
[Kafka] Creating a Sink Connector

 

Let's use a Sink Connector to replicate the data stored in the jdbc-connector-user topic into a backup table.

POST localhost:8083/connectors
{
    "name": "jdbc_sink_mysql_03",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user": "root",
        "connection.password": "qwer1234",
        "auto.create": "false",
        "auto.evolve": "false",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "table.name.format": "${topic}",
        "tombstones.on.delete": "true",
        "topics.regex": "jdbc-connector-user",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
    }
}

Sink Connector creation

Converter options were added so that the sink can read the topic's key.

(Reference: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/)
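With schemas.enable set to true, the JsonConverter expects every key and value to be a JSON envelope containing schema and payload fields. As an illustration only (not output from this setup), a record keyed by user_id would carry a key roughly like this:

{"schema":{"type":"int32","optional":false},"payload":14}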

GET localhost:8083/connectors?expand=info&expand=status
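A single connector's status (including any task error trace) can also be fetched directly, for example:

GET localhost:8083/connectors/jdbc_sink_mysql_03/status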

Inserts into the source DB were not being replicated, so I called the status API above and checked the failing task's trace:

Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'jdbc_sink_mysql_03' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='jdbc-connector-user',partition=0,offset=0,timestamp=1643871205337) with a null key and null key schema.
io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)

A configuration error is logged: with delete.enabled=true and pk.mode=record_key, the sink requires every record to have a non-null key (with a Struct or primitive key schema), but the records in the topic have no key at all.
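The records in jdbc-connector-user were produced without a message key, so the delete.enabled=true / pk.mode=record_key combination cannot work as configured. One common way to get keyed records (not what is done in this post, just a sketch) is to add a key-extraction SMT to the source connector, e.g.:

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "user_id"

With this, each record's key becomes a Struct containing user_id, which would satisfy the sink's non-null key requirement.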

 

{
    "name": "jdbc_sink_mysql_08",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user":"root",
        "connection.password":"qwer1234",
        "tasks.max": "2",
        "auto.create": "false",
        "auto.evolve": "false",
        "topics": "jdbc-connector-user",
        "insert.mode": "insert",
        "table.name.format":"kafka_test.user_sink",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
    }
}

After removing the options that caused the error, I created the Sink Connector again.

Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Duplicate entry '9' for key 'user_sink.user_id'
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '9' for key 'user_sink.user_id'
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Duplicate entry '9' for key 'user_sink.user_id'
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '9' for key 'user_sink.user_id'
io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)

An SQL error occurred on the user_id column: the insert violates the unique key on user_sink.user_id (duplicate entry).

 

CREATE TABLE user(
    user_id integer not null,
    user_name varchar(30) not null,
    user_age integer not null
);

CREATE TABLE user_sink(
    user_id integer not null,
    user_name varchar(30) not null,
    user_age integer not null
);

{
    "name": "jdbc_sink_mysql_11",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user":"root",
        "connection.password":"qwer1234",
        "tasks.max": "2",
        "auto.create": "false",
        "auto.evolve": "false",
        "topics": "jdbc-connector-user",
        "insert.mode": "insert",
        "table.name.format":"kafka_test.user_sink",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
    }
}

After removing the unique constraint from the table and pointing the table.name.format option at the sink table (kafka_test.user_sink), let's retry.

INSERT INTO user (user_id, user_name, user_age) VALUES (14, 'user14', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (15, 'user15', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (16, 'user16', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (17, 'user17', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (18, 'user18', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (19, 'user19', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (20, 'user20', 30);

User rows 14 through 20 were inserted.

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":14,"user_name":"user14","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":15,"user_name":"user15","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":16,"user_name":"user16","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":17,"user_name":"user17","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":18,"user_name":"user18","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":19,"user_name":"user19","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":20,"user_name":"user20","user_age":30}}

Querying the jdbc-connector-user topic confirms that records 14 through 20 were produced.
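For reference, the topic contents above can be read with the console consumer (assuming a broker on localhost:9092):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jdbc-connector-user --from-beginning --property print.key=true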

(screenshots: user table and user_sink table)

The data has been replicated from the user table to the user_sink table.

In the screenshots, each row appears multiple times because other connectors were also sinking to this table; after deleting them all and recreating just one connector, the rows are inserted exactly once.
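The leftover connectors from the earlier attempts can be removed through the Connect REST API before recreating a single one, for example:

DELETE localhost:8083/connectors/jdbc_sink_mysql_03
DELETE localhost:8083/connectors/jdbc_sink_mysql_08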

 

In the next post, I will create test cases for each source connector mode.
