jdbc-connector-user 토픽에 저장된 데이터를 Sink Connector를 이용해 백업 테이블로 복제시켜보자.
POST localhost:8083/connectors
{
"name": "jdbc_sink_mysql_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"auto.create": "false",
"auto.evolve": "false",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"topics.regex": "jdbc-connector-user",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
Sink Connector 생성
topic의 key를 읽어오기 위한 converter 옵션을 추가하였다.
(참고 : https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/ )
GET localhost:8083/connectors?expand=info&expand=status
DB insert 시 복제가 안되어 connector의 상태를 조회할 수 있는 위 API를 호출하여 해당 tasks의 trace를 확인해보면
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'jdbc_sink_mysql_03' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='jdbc-connector-user',partition=0,offset=0,timestamp=1643871205337) with a null key and null key schema.
io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
config 에러가 찍힌다.
{
"name": "jdbc_sink_mysql_08",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"tasks.max": "2",
"auto.create": "false",
"auto.evolve": "false",
"topics": "jdbc-connector-user",
"insert.mode": "insert",
"table.name.format":"kafka_test.user_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
에러가 발생하는 옵션 제거 후 다시 Sink Connector를 생성해 보았다.
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Duplicate entry '9' for key 'user_sink.user_id'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry '9' for key 'user_sink.user_id'
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
Caused by: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry '9' for key 'user_sink.user_id'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry '9' for key 'user_sink.user_id'
io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
user_id 컬럼에서 SQL 에러 발생하였다.
CREATE TABLE user(
user_id integer not null,
user_name varchar(30) not null,
user_age integer not null
);
CREATE TABLE user_sink(
user_id integer not null,
user_name varchar(30) not null,
user_age integer not null
);
{
"name": "jdbc_sink_mysql_11",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"tasks.max": "2",
"auto.create": "false",
"auto.evolve": "false",
"topics": "jdbc-connector-user",
"insert.mode": "insert",
"table.name.format":"kafka_test.user_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
테이블의 unique 제약조건을 제거 후 table.name.format 옵션에 sink 테이블을 입력하여 재시도 해보자.
INSERT INTO user (user_id, user_name, user_age) VALUES (14, 'user14', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (15, 'user15', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (16, 'user16', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (17, 'user17', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (18, 'user18', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (19, 'user19', 30);
INSERT INTO user (user_id, user_name, user_age) VALUES (20, 'user20', 30);
14 ~ 20번의 user 데이터를 insert 하였다.
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":14,"user_name":"user14","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":15,"user_name":"user15","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":16,"user_name":"user16","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":17,"user_name":"user17","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":18,"user_name":"user18","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":19,"user_name":"user19","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":20,"user_name":"user20","user_age":30}}
jdbc-connector-user 토픽 조회시 14 ~ 20번 데이터 삽입이 확인된다.
user 테이블에서 user_sink 테이블로 데이터 복제가 확인된다.
캡쳐에서는 다른 connector에서도 해당 테이블에 sink 하기 때문에 데이터가 여러개 들어가고 있는데, 모두 삭제 후 하나의 connector만 다시 생성하면 정상적으로 insert 되는 것이 확인된다.
다음장에서는 source connector의 mode 별 테스트 케이스를 생성해보겠다.
'개발 > Kafka' 카테고리의 다른 글
[Kafka] Managing Platform (0) | 2022.02.23 |
---|---|
[Kafka] JDBC Connector 옵션 테스트 (2) | 2022.02.10 |
[Kafka] Source Connector 생성 (0) | 2022.02.07 |
[Docker] Docker로 MySQL 설치하기 (0) | 2022.02.04 |
[Kafka] Kafka Connect 세팅 (0) | 2022.02.03 |