전체 글 (10) 썸네일형 리스트형 [Kafka] Option 및 Config 정리 Zookeeper Config (zookeeper.properties) 옵션 설명 dataDir=/data/zookeeper/data 주키퍼의 상태, 스냅션, 트랜잭션 로그들을 저장하고 업데이트하는 디렉토리의 위치 dataLogDir=/data/zookeeper/logs 트랜잭션 로그를 저장하는 디렉터리. 특별한 설정을 하지 않으면 dataDir에 저장. 성능상 다른 디스크의 디렉터리에 분리하는것이 좋다. clientPort=2181 클라이언트로 요청을 받기 위한 포트 initLimit=5 처음 주키퍼의 follower가 leader에 접속하거나 데이터를 동기화 시키기 위해 사용되는 최대 시간을 제한하기 위해 사용하는 timeout 단위. 초기에 팔로워가 리더에 접속하거나 데이터를 동기화 시키기 위한 시.. [Kafka] Troubleshooting Broker kafka 실행 시 kafka.common.InconsistentClusterIdException 이 발생할 경우 server.properties 에 설정된 log.dirs 경로의 로그 파일 중 meta.properties 를 삭제한 뒤 재기동하면 해결됨 JDBCSourceConnector 이슈 - 대용량 테이블(약 9천만건) JDBCSourceConnector 생성 시 100개 행만 토픽에 저장됨 - TimeoutException 발생하며 connector 중단 발생 해결 - batch.max.rows 옵션을 사용하지 않았을 경우, 대용량 DB를 polling할 때 디폴트 값인 100개만 가져오기 때문에 batch.max.row를 늘려 해결 - jvm heap size를 늘려 해결 [202.. [Kafka] S3 Sink Connector 지난번 bulk로 생성된 topic life cycle을 구성해보았는데 FileStreamSinkConnector를 사용할 경우, 하나의 파일에 bulk 데이터가 중복으로 쌓이는 이슈가 발생하였다. 1. FileStreamSinkConnector - 간단한 옵션만을 제공하여 테스트용으로 사용하기에 적절하며, production용으로 사용하기엔 부적합 2. HdfsSinkConnector - HDFS 2.x 파일로 날짜별 데이터를 내보낼 수 있으며, Hive 연동에 적합 3. SftpSinkConnector - SFTP 디렉토리로 날짜별 파일을 내보낼 수 있으며, 지원되는 형식은 CSV/TSV, Avro, JSON, Parquet 4. S3SinkConnector - S3 객체로 날짜별 파일을 내보낼 수 .. [Kafka] 테스트 서버 DB 마이그레이션 지난번 source connector의 bulk 모드를 이용하여 로컬 MySQL DB의 특정 테이블 데이터를 다른 테이블로 마이그레이션해보았다. https://presentlee.tistory.com/7 [Kafka] JDBC Connector 옵션 테스트 앞서 진행한 jdbc connector 테스트 중 incrementing 모드에서는 기존 row의 수정이나 삭제는 감지하지 못하는 이슈가 발생하여 mode 옵션을 변경 후 테스트해보려고 한다. bulk : 데이터를 폴링할 때 마다 presentlee.tistory.com 이번에는 테스트 서버의 MSSQL DB 테이블의 데이터를 로컬 파일로 옮겨보려고 하는데 1. DB -> Topic polling : 5분마다 실행 2. polling된 Topic 메시.. [Kafka] Managing Platform Confluent Control Center Confluent Platform/Cloud를 통한 추가 기능을 갖춘 올인원 솔루션 설치 : https://docs.confluent.io/platform/current/installation/index.html 가격 - Enterprise : 견적 요청 필요 - 30일 무료 평가판 제공 Topic 관리 Connector 생성 Lenses (Landoop) Confluent Control Center와 유사한 기능 제공. Control Center에 비해 관련 문서가 적음. 설치 : https://docs.lenses.io/4.0/installation/ 가격 - Single developer : 월 49$ - Bigger teams/Enterprise : 견.. [Kafka] JDBC Connector 옵션 테스트 앞서 진행한 jdbc connector 테스트 중 incrementing 모드에서는 기존 row의 수정이나 삭제는 감지하지 못하는 이슈가 발생하여 mode 옵션을 변경 후 테스트해보려고 한다. bulk : 데이터를 폴링할 때 마다 전체 테이블을 복사 incrementing : 특정 컬럼의 중가분만 감지되며, 기존 행의 수정과 삭제는 감지되지 않음 incrementing.column.name : incrementing 모드에서 새 행을 감지하는 데 사용할 컬럼명 timestamp : timestamp형 컬럼일 경우, 새 행과 수정된 행을 감지함 timestamp.column.name : timestamp 모드에서 COALESCE SQL 함수를 사용하여 새 행 또는 수정된 행을 감지 timestamp+inc.. [Kafka] Sink Connector 생성 jdbc-connector-user 토픽에 저장된 데이터를 Sink Connector를 이용해 백업 테이블로 복제시켜보자. POST localhost:8083/connectors { "name": "jdbc_sink_mysql_03", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:mysql://localhost:3306/kafka_test", "connection.user":"root", "connection.password":"qwer1234", "auto.create": "false", "auto.evolve": "false", "delete.enabled": "t.. [Kafka] Source Connector 생성 Docker로 생성된 kafka_test DB에 테이블을 생성하여 REST 방식으로 Source Connector를 생성해보자. CREATE TABLE user( user_id integer not null, user_name varchar(30) not null, user_age integer not null, unique(user_id) ); 샘플 테이블은 위와 같이 만들어 주었다. curl --location --request POST 'http://localhost:8083/connectors' \ --header 'Content-Type: application/json' \ --data-raw '{ "name": "jdbc_source_mysql_01", "config": { "connector.. 이전 1 2 다음