본문 바로가기

개발/Kafka

[Kafka] Source Connector 생성

Docker로 생성된 kafka_test DB에 테이블을 생성하여 REST 방식으로 Source Connector를 생성해보자.

CREATE TABLE user(
    user_id integer not null,
    user_name varchar(30) not null,
    user_age integer not null,
    unique(user_id)
);

샘플 테이블은 위와 같이 만들어 주었다.

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc_source_mysql_01",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user" : "root",
        "connection.password" : "qwer1234",
        "topic.prefix" : "jdbc-connector-",
        "poll.interval.ms" : 2000,
        "table.whitelist" : "user",
        "mode" : "incrementing",
        "incrementing.column.name" : "user_id",
        "topic.creation.default.replication.factor" : 1,
        "topic.creation.default.partitions" : 1
    }
}'

위와 같이 POST 방식을 통해 /connectors로 Connector 생성 요청을 한다.

 

connector.class
Connector를 생성하기 위해 필요한 클래스
connection.url
데이터베이스 접근을 위한 주소
connection.user
connection.password
데이터베이스 접속을 위한 ID와 Password
topic.prefix
Topic 생성시 이름앞에 붙일 접두어.

여기서 작성한 prefix와 테이블명이 Topic의 이름이 된다. (prefix + 테이블명) 
poll.interval.ms
테이블에서 새로운 데이터에 대해 데이터를 폴링하는 주기
table.whitelist
데이터를 poll할 테이블의 목록을 셋팅한다.

복수개의 테이블에서 데이터를 가져오는경우 콤마(,)를 통해서 작성한다.
mode
테이블에 변경이 발생했을때 어떤 방식으로 데이터를 poll할지 셋팅한다. bulk를 사용하면 이벤트가 발생한 테이블의 내용을 모두 poll한다.

incrementing은 incrementing column을 통해서 신규로 들어온 row를 판단하고, 해당 데이터만 poll해온다.
여기서 주의해야할점은 incrementing모드의 경우에 "삭제(delete)"와 "수정(update)"에 대해선 작동하지 않는다는 점이다.

따라서 수정과 삭제정보도 poll하고 싶다면 shadow테이블을 만들어야 할 것이다. 
incrementing.column.name
incrementing column을 셋팅한다.

id라는 컬럼명을 보고 어떤 row부터 poll할지 판단한다. 설정한 column의 타입이 varchar인 경우 에러가 난다.
topic.creation.default.replication.factor
 default 그룹으로 topic을 자동생성할때 생성되는 replication 개수.

Source Connector를 실행했을때 Topic이 존재하지 않는다면 Source Connector는 자동으로 Topic을 생성할 수있다.
이때 몇가지 조건이 존재하는데 먼저 worker의 설정파일에서 topic.creation.enable=true 로 셋팅해야한다. (default값은 true)
topic.creation.default.partitions
default 그룹을 통해 topic을 자동생성할때 파티션을 몇개로 셋팅할지 정하는 옵션.

추가로 사용자가 default이외의 그룹을 만들수 있으며 그룹마다 이런 replication factor나 partition옵션을 셋팅할 수 있다.
그리고 include, exclude 옵션을 통해서 사용자가 생성한 그룹으로 topic을 생성할지 제외할지 선택할 수 있다.

Source Connector 옵션

 

com-aa7b93e6ea7:kafka_2.12-2.6.0 lhj$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
example_topic_users
jdbc-connector-user

위와 같이 user 테이블에 데이터를 Insert하면 위에서 정의한 jdbc-connector-user 토픽이 생성된 것을 확인할 수 있고

com-aa7b93e6ea7:kafka_2.12-2.6.0 lhj$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic jdbc-connector-user --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":1,"user_name":"user1","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":2,"user_name":"user2","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":1,"user_name":"user1","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":2,"user_name":"user2","user_age":30}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":3,"user_name":"user3","user_age":20}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":3,"user_name":"user3","user_age":20}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":4,"user_name":"user4","user_age":20}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"user_id"},{"type":"string","optional":false,"field":"user_name"},{"type":"int32","optional":false,"field":"user_age"}],"optional":false,"name":"user"},"payload":{"user_id":4,"user_name":"user4","user_age":20}}

Consumer를 실행하여 Topic에서 성공적으로 데이터를 가져오는 것이 확인된다.

'개발 > Kafka' 카테고리의 다른 글

[Kafka] Managing Platform  (0) 2022.02.23
[Kafka] JDBC Connector 옵션 테스트  (2) 2022.02.10
[Kafka] Sink Connector 생성  (0) 2022.02.08
[Docker] Docker로 MySQL 설치하기  (0) 2022.02.04
[Kafka] Kafka Connect 세팅  (0) 2022.02.03