[Kafka] Troubleshooting

Broker

When kafka.common.InconsistentClusterIdException is thrown on Kafka startup

This typically means the cluster ID cached in meta.properties no longer matches the one the broker sees in ZooKeeper. Delete meta.properties from the log.dirs path configured in server.properties and restart the broker; the file is recreated on startup.
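A minimal sketch of the fix, assuming the broker runs from the Kafka install directory and log.dirs points at /tmp/kafka-logs (check your own server.properties for the actual path):

# stop the broker before touching its data directory
bin/kafka-server-stop.sh

# remove the stale cluster-id file; the broker recreates it on startup
rm /tmp/kafka-logs/meta.properties

# restart the broker
bin/kafka-server-start.sh -daemon config/server.properties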

JDBCSourceConnector
Issue
- Creating a JDBCSourceConnector against a large table (about 90 million rows) stores only 100 rows in the topic
- A TimeoutException is thrown and the connector stops

Solution
- When batch.max.rows is left unset, the connector fetches only the default of 100 rows per poll from the large DB; raising batch.max.rows resolves it (see the sketch below)
- Increase the JVM heap size (a heap sketch follows the OutOfMemoryError log further down)
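A hedged sketch of a source-connector config with batch.max.rows raised from the default 100; the connector name, connection details, and the 10000 value are illustrative, not tuned:

{
    "name": "source_large_table",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user": "root",
        "connection.password": "****",
        "topic.prefix": "topic_",
        "table.whitelist": "<large_table>",
        "mode": "bulk",
        "poll.interval.ms": 30000,
        "batch.max.rows": 10000,
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 1
    }
}

The TimeoutException from the original run: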
[2022-03-08 16:03:13,727] ERROR Failed to remove connector configuration from Kafka:  (org.apache.kafka.connect.storage.KafkaConfigBackingStore:346)
java.util.concurrent.TimeoutException: Timed out waiting for future
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:106)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.removeConnectorConfig(KafkaConfigBackingStore.java:344)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$3.call(DistributedHerder.java:781)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$3.call(DistributedHerder.java:768)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
[2022-03-08 16:03:13,728] ERROR Uncaught exception in REST call to /connectors/source_asp_mc_tb_iv_member_login_log (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
org.apache.kafka.connect.errors.ConnectException: Error removing connector configuration from Kafka
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.removeConnectorConfig(KafkaConfigBackingStore.java:347)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$3.call(DistributedHerder.java:781)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$3.call(DistributedHerder.java:768)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:371)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:295)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for future
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:106)
at org.apache.kafka.connect.storage.KafkaConfigBackingStore.removeConnectorConfig(KafkaConfigBackingStore.java:344)
... 9 more

After restarting the worker and recreating the source connector with batch.max.rows=10000, 10,000 messages do reach the topic, but the OutOfMemoryError below is thrown, so the worker needs a larger heap and a retest (see the sketch after this log).

[2022-03-08 16:23:11,383] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1425)
java.lang.OutOfMemoryError: Java heap space
[2022-03-08 16:23:11,383] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:366)
java.lang.OutOfMemoryError: Java heap space
[2022-03-08 16:23:11,383] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:366)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.LinkedHashMap.keySet(LinkedHashMap.java:534)
	at org.apache.kafka.clients.FetchSessionHandler.verifyIncrementalFetchResponsePartitions(FetchSessionHandler.java:348)
	at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:446)
	at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:287)
	at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:274)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1292)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:278)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:359)
[2022-03-08 16:23:11,485] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:303)
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1429)
Caused by: java.lang.OutOfMemoryError: Java heap space
[2022-03-08 16:23:11,597] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
[2022-03-08 16:23:11,597] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:332)
[2022-03-08 16:23:11,722] INFO Stopped http_8083@6846e4e8{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:381)
[2022-03-08 16:23:11,722] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:149)
[2022-03-08 16:23:11,723] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:349)
[2022-03-08 16:23:11,723] INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:682)
[2022-03-08 16:23:16,890] INFO [Worker clientId=connect-1, groupId=connect-cluster] Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:702)
[2022-03-08 16:23:16,890] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:72)
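The Connect worker takes its heap from the KAFKA_HEAP_OPTS environment variable (the stock connect-distributed.sh defaults to -Xms256M -Xmx2G when it is unset). A sketch of restarting the worker with a larger heap; the sizes are illustrative, not tested values:

export KAFKA_HEAP_OPTS="-Xms1G -Xmx4G"
bin/connect-distributed.sh config/connect-distributed.properties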

Issue
- An error occurs when the JDBCSourceConnector is created with bulk mode plus the query option

Solution
- The error occurs when the query option and the table.whitelist option are used at the same time; the two are mutually exclusive
- Topic names used to be formed as "topic.prefix + table.whitelist", but when query is used, topic.prefix must define the topic's full name by itself (a corrected config follows the error output below)
{
    "name": "source_join_test",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:sqlserver://***.**.***.**:1433;databasename=****",
        "connection.user" : "****",
        "connection.password" : "****",
        "topic.prefix" : "topic_",
        "poll.interval.ms" : 30000,
        "mode" : "bulk",
        "query": "SELECT 쿼리 작성",
        "table.whitelist" : "테이블1,테이블2",
        "validate.non.null": "false",
        "topic.creation.default.replication.factor" : 1,
        "topic.creation.default.partitions" : 1
    }
}

// error output
org.apache.kafka.connect.errors.ConnectException: query may not be combined with whole-table copying settings.
io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:108)
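For reference, a corrected sketch of the config above: table.whitelist is dropped and topic.prefix now carries the full topic name on its own (topic_join_test is an illustrative name; the masked values stay masked):

{
    "name": "source_join_test",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:sqlserver://***.**.***.**:1433;databasename=****",
        "connection.user" : "****",
        "connection.password" : "****",
        "topic.prefix" : "topic_join_test",
        "poll.interval.ms" : 30000,
        "mode" : "bulk",
        "query": "SELECT <query>",
        "validate.non.null": "false",
        "topic.creation.default.replication.factor" : 1,
        "topic.creation.default.partitions" : 1
    }
}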

Issue
- With JDBCSourceConnector, Timestamp columns land in the topic as epoch-millisecond numbers rather than formatted date strings
- "mem_join_date":1647320618000,"mem_join_date2":1647320618000

Solution
- Add transforms options (to convert several fields at once, a separate transform must be defined and listed for each field)
- "mem_join_date":"2022-03-15 05:03:38.000","mem_join_date2":"2022-03-15 05:03:38.000"
{
    "name": "source_테이블명",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:mysql://localhost:3306/kafka_test",
        "connection.user" : "root",
        "connection.password" : "qwer1234",
        "topic.prefix" : "topic_",
        "poll.interval.ms" : 10000,
        "table.whitelist" : "테이블명",
        "mode" : "bulk",
        "query": "",
        "validate.non.null": "false",
        "topic.creation.default.replication.factor" : 1,
        "topic.creation.default.partitions" : 1,
        "transforms": "memJoinDateConverter,memJoinDate2Converter",
        "transforms.memJoinDateConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.memJoinDateConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
        "transforms.memJoinDateConverter.target.type": "string",
        "transforms.memJoinDateConverter.field": "mem_join_date",
        "transforms.memJoinDate2Converter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.memJoinDate2Converter.format": "yyyy-MM-dd HH:mm:ss.SSS",
        "transforms.memJoinDate2Converter.target.type": "string",
        "transforms.memJoinDate2Converter.field": "mem_join_date2"
    }
}

S3SinkConnector
Issue
- When shipping data to Amazon S3 with S3SinkConnector, records are only committed in batches of the mandatory flush.size option
ex) If the topic holds 201 records and flush.size is set to 100, only two json files are created in S3 (the remaining record is not written until another 100 records accumulate for a third file)

Solution
- Use the rotate.(schedule.)interval.ms options. ex) With flush.size 100 and rotate.interval.ms 10000, a json file is written whenever 100 records accumulate in the topic or every 10 seconds. Note that rotate.interval.ms is evaluated against record timestamps, while rotate.schedule.interval.ms rotates on wall-clock time and requires timezone to be set.
{
    "name": "s3_sink_테이블명",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "topic_테이블명",
        "tasks.max": 1,
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "flush.size": 100,
        "rotate.interval.ms" : 10000,
        "s3.bucket.name": "버킷명",
        "s3.region": "ap-northeast-2",
        "path.format": "YYYY-MM-dd",
        "partition.duration.ms": 600000,
        "locale": "ko_KR",
        "timezone": "Asia/Seoul",
        "schema.compatibility": "NONE"
    }
}
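For reference, configs like the one above are registered through the Connect REST API; a sketch assuming the worker listens on localhost:8083 and the JSON is saved as s3_sink.json:

curl -X POST -H "Content-Type: application/json" \
     --data @s3_sink.json \
     http://localhost:8083/connectors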
