앞서 진행한 jdbc connector 테스트 중 incrementing 모드에서는 기존 row의 수정이나 삭제는 감지하지 못하는 이슈가 발생하여 mode 옵션을 변경 후 테스트해보려고 한다.
- bulk : 데이터를 폴링할 때 마다 전체 테이블을 복사
- incrementing : 특정 컬럼의 중가분만 감지되며, 기존 행의 수정과 삭제는 감지되지 않음
- incrementing.column.name : incrementing 모드에서 새 행을 감지하는 데 사용할 컬럼명
- timestamp : timestamp형 컬럼일 경우, 새 행과 수정된 행을 감지함
- timestamp.column.name : timestamp 모드에서 COALESCE SQL 함수를 사용하여 새 행 또는 수정된 행을 감지
- timestamp+incrementing : 위의 두 컬럼을 모두 사용하는 옵션
케이스별 Table 생성
--1. seq 만 존재
CREATE TABLE tb_iv_member(
mem_uno integer primary key,
mem_mid varchar(30) not null,
unique(mem_uno)
);
--2. seq, datetime 둘다 존재
CREATE TABLE tb_iv_member_join(
mem_uno integer primary key,
mem_join_type varchar(30) not null,
mem_join_date datetime
);
--3. datetime 만 존재
CREATE TABLE tb_iv_member_join_type(
mem_join_type varchar(30) not null,
reg_date datetime not null
);
--4. seq, datetime 둘다 없음
CREATE TABLE tb_iv_code(
code varchar(30) not null
);
테이블은 위와 같이 생성 후 mode 별 테스트를 진행해 보았다.
incrementing 모드(seq 컬럼)
{
"name": "source_tb_iv_member",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
"connection.user" : "root",
"connection.password" : "qwer1234",
"topic.prefix" : "topic_",
"poll.interval.ms" : 2000,
"table.whitelist" : "tb_iv_member",
"mode" : "incrementing",
"incrementing.column.name":"mem_uno",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1
}
}
/* insert test */
INSERT INTO tb_iv_member (mem_uno, mem_mid)
VALUES (900, 'lhj'); -- success
INSERT INTO tb_iv_member (mem_uno, mem_mid)
VALUES (901, 'theway'); -- success
insert 시 topic 데이터 생성 성공
/* update test */
UPDATE tb_iv_member SET mem_mid = 'lhjj' WHERE mem_uno = 900; -- fail
UPDATE tb_iv_member SET mem_uno = 903 WHERE mem_uno = 900; -- success
confluent 공식 문서를 보면 incrementing 모드에서는 update 시 topic에 produce되지 않는다고 명시되어 있는데, incrementing.column.name으로 지정된 seq 컬럼이 변경될 경우 감지되는 것 같다.
timestamp 모드
{
"name": "source_tb_iv_member_join_type",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
"connection.user" : "root",
"connection.password" : "qwer1234",
"topic.prefix" : "topic_",
"poll.interval.ms" : 2000,
"table.whitelist" : "tb_iv_member_join_type",
"mode" : "timestamp",
"timestamp.column.name":"reg_date",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1
}
}
INSERT INTO tb_iv_member_join_type (mem_join_type, reg_date) VALUES ('web', now());
INSERT INTO tb_iv_member_join_type (mem_join_type, reg_date) VALUES ('phone', now());
insert 시 topic 데이터 생성 성공
incrementing 모드 (seq 컬럼이 존재하지 않는 경우)
{
"name": "source_tb_iv_code",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
"connection.user" : "root",
"connection.password" : "qwer1234",
"topic.prefix" : "topic_",
"poll.interval.ms" : 2000,
"table.whitelist" : "tb_iv_code",
"mode" : "incrementing",
"incrementing.column.name":"code",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1
}
}
INSERT INTO tb_iv_code (code) VALUES ('aaaaa');
org.apache.kafka.connect.errors.ConnectException: Invalid type for incrementing column: STRING
io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:271)
io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues
insert 시 varchar형 컬럼은 incrementing.column.name으로 사용이 불가하여 integer형으로 수정해야했다.
timestamp+incrementing 모드(seq, datetime 컬럼)
{
"name": "source_tb_iv_member_join",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
"connection.user" : "root",
"connection.password" : "qwer1234",
"topic.prefix" : "topic_",
"poll.interval.ms" : 2000,
"table.whitelist" : "tb_iv_member_join",
"mode" : "timestamp+incrementing",
"incrementing.column.name":"mem_uno",
"timestamp.column.name":"mem_join_date",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1
"validate.non.null":"false"
}
}
/* insert test */
INSERT INTO tb_iv_member_join (mem_uno, mem_join_type, mem_join_date)
VALUES (900, 'phone', now()); -- fail
INSERT INTO tb_iv_member_join (mem_uno, mem_join_type, mem_join_date)
VALUES (901, 'phone', now()); -- fail
org.apache.kafka.connect.errors.ConnectException:
Cannot make incremental queries using timestamp columns [mem_join_date] on
`kafka_test`.`tb_iv_member_join` because all of these columns nullable
insert 시 mem_join_date에 not null 조건이 없어서 에러 발생
기본적으로 JDBC 커넥터는 모든 incrementing 및 timestamp 테이블에 ID/타임스탬프로 사용되는 열에 대해 NOT NULL이 설정되어 있는지 확인하기 때문
해결방법
1. validate.non.null 옵션을 false로 설정하여 null 검사를 비활성화하면 topic에 데이터가 정상적으로 생성된다.
2. 해당 컬럼에 not null 조건 추가 -> 기존 테이블 컬럼 조건을 변경할 순 없으므로, 1번 방법을 사용해야 할 것으로 보인다.
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"mem_uno"},{"type":"string","optional":false,"field":"mem_join_type"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"mem_join_date"}],"optional":false,"name":"tb_iv_member_join"},"payload":{"mem_uno":900,"mem_join_type":"phone","mem_join_date":1644487109000}}
다만 위와 같이 mem_join_date 값이 integer형으로 저장되기 때문에 추후 sink connector에서 convert가 필요해 보인다.
/* update test */
UPDATE tb_iv_member_join
SET mem_join_type = 'web' WHERE mem_uno = 900; -- fail
UPDATE tb_iv_member_join
SET mem_join_type = 'web', mem_join_date = now() WHERE mem_uno = 900; -- success
UPDATE tb_iv_member_join
SET mem_uno = 900 WHERE mem_uno = 902; -- success
UPDATE tb_iv_member_join
SET mem_uno = 902 WHERE mem_uno = 900; -- fail
UPDATE tb_iv_member_join
SET mem_uno = 902, mem_join_date = now() WHERE mem_uno = 900; -- success
update 테스트 결과 아래와 같이 두 가지 이슈를 확인하였다.
첫번째로, source connector 옵션의 incrementing.column.name 이나 timestamp.column.name 컬럼으로 지정되지 않은 컬럼의 update는 감지하지 못한다.
결국 당연한 말이지만, 옵션으로 설정된 두 컬럼 중 최소 하나의 컬럼이 update 되어야만 topic으로 produce 된다.
두번째로, source connector의 감지 기준이 DB table이 아닌 topic의 offset이라는 것이다.
incrementing 컬럼에 기존 offset과 중복된 값이 update되거나 insert될 경우, topic에 데이터가 쌓이지 않는다.
해결 방법으로는 source connector의 이름을 변경하거나, 직접 source connector의 table offset을 수정하는 방법이 있지만 근본적인 해결 방법은 되지 않기 때문에 debezium cdc를 사용하거나 bulk 모드로 해결될지 알아보자.
bulk 모드
CREATE TABLE tb_iv_member_detail(
mem_uno integer primary key,
mem_mid varchar(30) not null,
mem_phone_num varchar(30),
mem_birth varchar(30),
mem_join_date datetime
);
INSERT INTO tb_iv_member_detail (mem_uno, mem_mid, mem_phone_num, mem_birth, mem_join_date) VALUES
(900, 'lhj', '01012345678', '19930101', now()),
(901, 'theway', '01011112222', null, now()),
(902, 'hong', null, null, now());
위 테스트 과정에서 incrementing, timestamp 모드 사용 시 확인된 이슈를 해결하기 위해, bulk 모드를 사용해보자.
테스트를 위한 테이블을 만들어주었다.
{
"name": "source_tb_iv_member_detail",
"config": {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
"connection.user" : "root",
"connection.password" : "qwer1234",
"topic.prefix" : "topic_",
"poll.interval.ms" : 10000,
"table.whitelist" : "tb_iv_member_detail",
"mode" : "bulk",
"query": "",
"validate.non.null": "false",
"topic.creation.default.replication.factor" : 1,
"topic.creation.default.partitions" : 1
}
}
10초에 한번씩 모든 row를 토픽에 저장하는 source connector를 생성하여 CUD 테스트 시 특이사항은 없는 것으로 보인다.
query 옵션은 사용하지 않았지만 bulk 모드에서도 incrementing, timestamp 모드처럼 사용할 수 있다.
Sink Connector 생성
{
"name": "sink_tb_iv_member_detail",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"tasks.max": "1",
"auto.create": "false",
"auto.evolve": "false",
"topics": "topic_tb_iv_member_detail",
"insert.mode": "upsert",
"table.name.format":"kafka_test.tb_iv_member_detail_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
"
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.connect.errors.ConnectException: Write to table '\"kafka_test\".\"tb_iv_member_detail_sink\"' in UPSERT mode requires key field names to be known, check the primary key configuration
io.confluent.connect.jdbc.sink.BufferedRecords.getInsertSql(BufferedRecords.java:278)
io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:129)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
"
upsert mode에서 PK 체크 에러가 발생하여 insert mode로 변경해보자.
{
"name": "sink_tb_iv_member_detail",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"tasks.max": "1",
"auto.create": "false",
"auto.evolve": "false",
"topics": "topic_tb_iv_member_detail",
"insert.mode": "insert",
"table.name.format":"kafka_test.tb_iv_member_detail_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
"
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Duplicate entry '901' for key 'tb_iv_member_detail_sink.PRIMARY'
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '901' for key 'tb_iv_member_detail_sink.PRIMARY'
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:122)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
Caused by: java.sql.SQLException: Exception chain:
java.sql.BatchUpdateException: Duplicate entry '901' for key 'tb_iv_member_detail_sink.PRIMARY'
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '901' for key 'tb_iv_member_detail_sink.PRIMARY'
io.confluent.connect.jdbc.sink.JdbcSinkTask.getAllMessagesException(JdbcSinkTask.java:150)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:102)
"
비슷하게 PK 중복 에러가 발생한다.
{
"name": "sink_tb_iv_member_detail",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/kafka_test",
"connection.user":"root",
"connection.password":"qwer1234",
"tasks.max": "1",
"auto.create": "true",
"auto.evolve": "true",
"topics": "topic_tb_iv_member_detail",
"insert.mode": "insert",
"table.name.format":"kafka_test.tb_iv_member_detail_sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}
}
원인 파악을 위해 sink 테이블을 drop하고, sink시 테이블과 컬럼이 없을 경우 자동 생성해주는 옵션(auto.create, auto.evolve)을 true로 변경해보니
위와 같이 topic에 저장된 메세지 전체가 복사되는 것을 확인할 수 있었다.
내 생각대로라면 bulk 모드 사용 시 source connector에서 저장시키는 row 3개만 들어갈 줄 알았는데 topic의 전체 데이터를 복사하는 것 같다. 그렇기 때문에 PK 중복 삽입 오류가 발생했던 것이다.
bulk 모드를 사용하기 위해서는 sink 하기전 topic에 이전 데이터는 지워져야 할 것으로 보인다.
topic 메세지 삭제는 config/server.properties의 log.retention.hours 값을 수정해보자.
log.segment.bytes 값을 초과하면 해당 파일을 close하여 만기 처리한 뒤 새 파일을 열어서 쓰게 된다.
ex) log.segment.bytes 가 기본값인 1GB 이고, log.retention.hours 가 7일이면, 1GB 를 다 채우고 난 다음 7일이 지나야 해당 파일이 삭제된다고 하는데, 해당 옵션을 수정하여 테스트해봐야겠다.
참고 : config/server.properties의 log.dirs(파티션이 저장될 디렉토리)에서 용량 확인 가능
출처
https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
'개발 > Kafka' 카테고리의 다른 글
[Kafka] 테스트 서버 DB 마이그레이션 (0) | 2022.02.23 |
---|---|
[Kafka] Managing Platform (0) | 2022.02.23 |
[Kafka] Sink Connector 생성 (0) | 2022.02.08 |
[Kafka] Source Connector 생성 (0) | 2022.02.07 |
[Docker] Docker로 MySQL 설치하기 (0) | 2022.02.04 |