본문 바로가기

개발/Kafka

[Kafka] S3 Sink Connector

지난번 bulk로 생성된 topic life cycle을 구성해보았는데

FileStreamSinkConnector를 사용할 경우, 하나의 파일에 bulk 데이터가 중복으로 쌓이는 이슈가 발생하였다.

1. FileStreamSinkConnector
- 간단한 옵션만을 제공하여 테스트용으로 사용하기에 적절하며, production용으로 사용하기엔 부적합

2. HdfsSinkConnector
- HDFS 2.x 파일로 날짜별 데이터를 내보낼 수 있으며, Hive 연동에 적합

3. SftpSinkConnector
- SFTP 디렉토리로 날짜별 파일을 내보낼 수 있으며, 지원되는 형식은 CSV/TSV, Avro, JSON, Parquet

4. S3SinkConnector
- S3 객체로 날짜별 파일을 내보낼 수 있으며, 지원되는 형식은 Avro, JSON, Parquet, Byte, gzip

FileStreamSinkConnector는 날짜별 파일 생성이 불가능한 것으로 확인되어

나머지 Connector 중 AWS S3 버킷으로 파일을 보내기 위한 S3SinkConnector를 사용해보자.


AWS 세팅

AWS S3 콘솔에서 기본옵션으로 sink 파일을 저장할 버킷을 생성하였다.

AWS CLI란?
AWS 명령줄 인터페이스(CLI)는 AWS 서비스를 관리하는 통합 도구입니다. 도구 하나만 다운로드하여 구성하면 여러 AWS 서비스를 명령줄에서 제어하고 스크립트를 통해 자동화할 수 있습니다.

위에서 생성한 S3 버킷 접근을 위해 AWS CLI를 통해 권한 설정을 진행해보자.

$ curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
$ sudo installer -pkg AWSCLIV2.pkg -target /

AWS CLI 최신 버전 설치

$ aws --version
aws-cli/2.4.22 Python/3.8.8 Darwin/18.7.0 exe/x86_64 prompt/off

위 명령어로 설치가 완료됐는지 확인이 가능하다.

IAM 콘솔 -> 내 보안 자격 증명 -> 액세스 키 만들기 에서 엑세스 키를 발급 받는다.

$ aws configure

위 명령어를 입력하여 발급 받은 액세스 키와 시크릿 키를 설정하고, 리전은 버킷과 동일하게 ap-northeast-2 로 설정하였다.

$ aws configure list

설정한 configure list를 확인하면 액세스 키와 시크릿 키가 적용된 것이 확인된다.


S3 Sink Connector 생성
{
    "name": "s3_sink_tb_iv_member_detail",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "topic_tb_iv_member_detail",
        "tasks.max": 1,
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "flush.size": 100,
        "s3.bucket.name": "버킷명",
        "s3.region": "ap-northeast-2",
        "path.format": "YYYY-MM-dd",
        "partition.duration.ms": 600000,
        "locale": "ko_KR",
        "timezone": "Asia/Seoul",
        "schema.compatibility": "NONE"
        }
}
이전에 생성해두었던 source connector를 통해 쌓인 topic 데이터를 S3로 이관시키는 S3 Sink Connector를 생성하였다.

 

위 경로에 topic 메시지가 flush.size (100개) 마다 분할되어 저장되었다.

s3.compression.type 옵션을 gzip으로 설정하였더니 gz 파일로 저장되어 제거 후 json 파일로 저장되는 것을 확인할 수 있었다.

파일을 다운받아 확인해보니 위와 같이 topic의 payload(실제 사용 데이터)만 저장되는 것을 확인 할 수 있었다. (10초마다 polling 되는 topic이기 때문에 중복 데이터로 보임)


 

출처

https://dev.classmethod.jp/articles/setting-up-the-aws-cli-environment-in-a-mac/

https://docs.confluent.io/kafka-connect-s3-sink/current/overview.html

https://docs.confluent.io/5.0.0/connect/kafka-connect-s3/configuration_options.html#s3-configuration-options

https://swalloow.github.io/kafka-connect/

https://data-engineer-tech.tistory.com/34

'개발 > Kafka' 카테고리의 다른 글

[Kafka] Option 및 Config 정리  (0) 2022.03.16
[Kafka] Troubleshooting  (0) 2022.03.08
[Kafka] 테스트 서버 DB 마이그레이션  (0) 2022.02.23
[Kafka] Managing Platform  (0) 2022.02.23
[Kafka] JDBC Connector 옵션 테스트  (2) 2022.02.10