위 그림과 같이 Kafka Connect를 이용하여 특정 테이블에 데이터를 insert하면 다른 테이블에 복제 시켜보고자 한다.
Source Connector를 사용해 왼쪽의 DB 데이터를 Kafka Broker로 보내고 Sink Connector를 사용해 Kafka에 담긴 데이터를 오른쪽 DB에 저장해보자.
- Source Connector: data source에 담긴 데이터를 topic에 담는 역할(Producer)을 하는 connector
- Sink Connector: topic에 담긴 데이터를 특정 data source로 보내는 역할(Consumer 역할)을 하는 connector
Kafka 설치 및 실행
$ wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz
$ tar xvf kafka_2.12-2.6.0.tgz
wget을 통해 설치하거나 https://kafka.apache.org/downloads에서 설치
웹 페이지에서 다운로드 시, Binary downloads가 아닌 Source download로 받아야 한다.
$ vi config/server.properties
압축 해제가 완료되면 kafka 디렉토리의 server.properties 파일을 열어
listeners=PLAINTEXT://:9092 주석을 풀고 localhost를 입력한다.
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
bin 디렉토리의 실행 파일을 사용하여 zookeeper server와 kafka broker를 실행시킨다.
$ ps -ef | grep zookeeper | grep kafka
정상적으로 실행되었는지 확인
Kafka Connect 설치 및 실행
$ wget https://packages.confluent.io/archive/6.1/confluent-6.1.0.tar.gz
$ tar xvf confluent-6.1.0.tar.gz
$ ./bin/connect-distributed -daemon ./etc/kafka/connect-distributed.properties
설치 후 kafka-connect 디렉토리에서 kafka-connect 실행(zookeeper와 kafka broker가 실행되어있어야함)
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
실행 후 kafka 디렉토리로 돌아와 topic list를 확인하면 기본 topic이 생성된 것이 확인된다.
JDBC Connector 설치
https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html 링크로 접속하여 JDBC Connector (Source and Sink)를 다운로드 해준다.
$ vi etc/kafka/connect-distributed.properties
Kafka Connect 디렉토리에서 connect-distributed.properties의 plugin.path를 jdbc connector의 lib 디렉토리로 수정해준다. (위에서 설치된 jdbc connector의 jar파일이 있는 경로)
Mysql Connector 설치
https://dev.mysql.com/downloads/connector/j/?os=26 에서 받은 mysql-connector-java-버전.jar 파일을
Kafka Connect 디렉토리의 /share/java/kafka에 복사한다.
세팅은 완료되었지만 REST API로 Connector 생성 시 500에러가 발생하여 datahub 서버의 plugin.path 디렉토리의 jar 파일을 복사하여 해결하였다.
출처
'개발 > Kafka' 카테고리의 다른 글
[Kafka] Managing Platform (0) | 2022.02.23 |
---|---|
[Kafka] JDBC Connector 옵션 테스트 (2) | 2022.02.10 |
[Kafka] Sink Connector 생성 (0) | 2022.02.08 |
[Kafka] Source Connector 생성 (0) | 2022.02.07 |
[Docker] Docker로 MySQL 설치하기 (0) | 2022.02.04 |