본문 바로가기

개발/Kafka

[Kafka] 테스트 서버 DB 마이그레이션

지난번 source connector의 bulk 모드를 이용하여 로컬 MySQL DB의 특정 테이블 데이터를 다른 테이블로 마이그레이션해보았다.

https://presentlee.tistory.com/7

 

[Kafka] JDBC Connector 옵션 테스트

앞서 진행한 jdbc connector 테스트 중 incrementing 모드에서는 기존 row의 수정이나 삭제는 감지하지 못하는 이슈가 발생하여 mode 옵션을 변경 후 테스트해보려고 한다. bulk : 데이터를 폴링할 때 마다

presentlee.tistory.com

이번에는 테스트 서버의 MSSQL DB 테이블의 데이터를 로컬 파일로 옮겨보려고 하는데

1. DB -> Topic polling : 5분마다 실행
2. polling된 Topic 메시지 삭제 : 1번 실행 후 2분 뒤 실행
3. Topic -> File Sink : 5분마다 저장된 Topic 데이터를 시간별로 분리하여 저장

위 조건을 추가하여 bulk 모드를 사용하더라도 중복된 데이터가 통째로 이관되지 않도록 설정해보자.

 

Source Connector 생성
{
    "name": "source_데이터베이스명_테이블명",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:sqlserver://***.**.***.**:1433;databasename=***_**",
        "connection.user" : "****",
        "connection.password" : "****",
        "topic.prefix" : "topic_데이터베이스명",
        "poll.interval.ms" : 300000,
        "table.whitelist" : "테이블명",
        "mode" : "bulk",
        "query": "",
        "validate.non.null": "false",
        "topic.creation.default.replication.factor" : 1,
        "topic.creation.default.partitions" : 1
    }
}

위와 같이 poll.interval.ms를 5분으로 설정하여 source connector를 생성하였다.

아무 오류는 없는데 Topic에 메시지가 쌓이지 않아 table.whitelist(테이블명)를 대문자로 수정하였더니 정상 동작하는 것이 확인되었다.

실제 DB에 등록된 테이블 명칭을 정확히 입력해야하는 것으로 보인다.

select count(*) from 테이블명 //8371

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_데이터베이스명_테이블명 --from-beginning //Topic 확인

Processed a total of 8371 messages //source connector 생성 직후
Processed a total of 16742 messages //5분 후

대상 테이블의 8371건이 5분 주기로 Topic에 누적되는 것이 확인된다.

 

config 옵션 수정
$ vi ./kafka_2.12-2.6.0/config/server.properties

#log.retention.hours=168
log.retention.minutes=2 //Topic 메시지 보관 기간

Topic 메시지를 지우기 위해 config/server.properties의 log.retention.minutes 옵션만 2분으로 수정하였는데 삭제되지 않아서 확인해보니

메시지 보관 기간 설정은 개별 메시지가 아닌 log segment 파일을 대상으로 처리된다.
이를 바탕으로 메시지가 카프카 브로커에 생성될 때, 해당 파티션의 log segment 파일 맨 마지막에 추가되는데, log.segment.bytes 값을 초과하면(기본 1GB) 해당 파일을 close 하여 만기 처리한 뒤 새 파일을 열어서 쓰게 된다.

이 때, log segment 파일이 close 되어 만기처리가 되어야만 retention.ms 또는 retention.bytes 로 삭제 처리할 수
 있다.

ex) log.segment.bytes 가 기본값인 1GB 이고, log.retention.ms 가 7일이면, 1GB 를 다 채우고 난 다음 7일이 지나야 해당 파일이 삭제된다. 단, log.segment.bytes 값이 너무 크면 retention 값이 제대로 동작하지 않을 수 있고, 너무 작으면 파일의 close & open 이 빈번히 발생하므로 적절한 조절이 필요하다.

위와 같은 Topic 메시지의 특징이 확인되었다.

#log.segment.bytes=1073741824
log.segment.bytes=10000000 //단일 로그의 최대 크기

#log.retention.check.interval.ms=300000
log.retention.check.interval.ms=1000 //삭제할 대상을 확인하는 시간주기 값

 

보관주기인 2분이 지났지만 로그 용량이 기본값 1GB를 넘지 않아 지워지지 않았던 것이다. 테스트를 위해 log.segment.bytes10MB 로 수정하였다. 

 

또한 브로커에서 log.retention.check.interval.ms 에 설정된 시간마다 삭제할 대상을 확인하는데 기본값이 5분이라 정상적으로 삭제되지 않는 문제가 발생하여 1초로 수정하였다.

 

테스트 결과
no Timestamp Topic messages count 로그 용량
1 2022-02-24 15:46 8371 8KB -> 14MB -> 34MB
2 2022-02-24 15:48 0 34MB -> 14MB -> 8KB
3 2022-02-24 15:51 8371 8KB -> 14MB -> 34MB
4 2022-02-24 15:53 0 34MB -> 14MB -> 8KB
5 2022-02-24 15:56 8371 8KB -> 14MB -> 34MB

테스트 결과, 15시 46분에 source connector를 생성하여 8371의 Topic 메시지가 생성되었고 2분 뒤 삭제되었다가 최초 생성 시간으로부터 5분 뒤 다시 생성 -> 2분 뒤 삭제가 반복되었다.

//8KB
00000000000000401798.timeindex
00000000000000401798.snapshot
00000000000000401798.log
leader-epoch-checkpoint

//14MB
00000000000000393427.index.deleted
00000000000000393427.timeindex.deleted
00000000000000393427.log.deleted
00000000000000399470.index.deleted
00000000000000399470.timeindex.deleted
00000000000000399470.log.deleted
00000000000000401798.timeindex
00000000000000401798.snapshot
00000000000000401798.log
leader-epoch-checkpoint

//34MB
00000000000000401798.index
00000000000000401798.timeindex
00000000000000401798.snapshot
00000000000000401798.log
00000000000000407834.index
00000000000000407834.timeindex
00000000000000407834.snapshot
00000000000000407834.log
leader-epoch-checkpoint

/tmp/kafka-logs/topic_데이터베이스명_테이블명-0 디렉토리에서 실시간으로 확인된 로그파일은 위와 같다.

테스트 결과 표에서 볼 수 있듯이, segment 파일이 close 되고 지워진 뒤 변경되는 작업에 일정 시간이 소요되는 듯 하다.

 

Sink Connector 생성
{
    "name": "sink_데이터베이스명_테이블명",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "file": "/Users/lhj/apps/데이터베이스명_테이블명.txt",
        "topics": "topic_데이터베이스명_테이블명",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
    }
}

File sink connector를 생성하여 로컬에 파일로 저장 완료하였다.

하지만 동일한 파일에 Topic 메시지가 중복으로 계속 쌓이고 있어, 날짜별 파일/디렉토리 단위로 분할하여 저장을 해야겠다.

 

 

출처

https://dev-jj.tistory.com/entry/Kafka-Topic-%EB%A9%94%EC%84%B8%EC%A7%80-%EB%B3%B4%EA%B4%80%EC%A3%BC%EA%B8%B0-%EC%84%A4%EC%A0%95-MSK

https://deep-dive-dev.tistory.com/63

'개발 > Kafka' 카테고리의 다른 글

[Kafka] Troubleshooting  (0) 2022.03.08
[Kafka] S3 Sink Connector  (0) 2022.03.02
[Kafka] Managing Platform  (0) 2022.02.23
[Kafka] JDBC Connector 옵션 테스트  (2) 2022.02.10
[Kafka] Sink Connector 생성  (0) 2022.02.08