이전에는 Kafka 데이터를 Elasticsearch에 넣어봤는데, 이번에는 HDFS에 넣어보고자 한다.
이전글: Kafka connect를 활용하여 Elasticsearch에 데이터 넣기
설정
connect-distributed.properties 파일을 사용한다. (기본으로 제공된 파일 그대로 사용)
실행
bin/connect-distributed.sh config/connect-distributed.properties (카프카 설치 경로에서 명령어 실행)
커넥터 등록 방법 (Kafka Connect REST API 활용)
커넥터 등록을 위해서 미리 받아놓은 HDFS Sink Connector 라이브러리 파일들을 kafka/libs 밑으로 옮긴다.
(다른 경로에서 사용하려면 path를 지정해서 사용할 수 있다. - 공식 문서 plugin.path 참고)
echo '{"name": "hdfs3-sink", "config": {"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector", "tasks.max": "1",
"topics": "test_hdfs", "hdfs.url": "hdfs://localhost:9000", "flush.size": "3", "key.converter": "org.apache.k
afka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "val
ue.converter.schema.registry.url":"http://localhost:8081", "confluent.topic.bootstrap.servers": "localhost:90
92", "confluent.topic.replication.factor": "1"}}' | curl -X POST -d @- http://127.0.0.1:8083/connectors --head
er "content-Type:application/json"
kafka의 test_hdfs topic에서 데이터를 가져다가 hadoop에 넣어준다.
동작 확인
콘솔 3개를 띄어 놓고 데이터를 확인해보자.
1. 콘솔 프로듀서: bin/kafka-console-producer.sh --topic test_hdfs --broker-list localhost:9092
2. 콘솔 컨슈머: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_hdfs --from-beginning
3. HDFS 데이터 확인용: bin/hadoop fs -ls /
참고 자료
https://www.confluent.io/hub/confluentinc/kafka-connect-hdfs3
https://docs.confluent.io/kafka-connect-hdfs3-sink/current/overview.html
'BigData > Kafka' 카테고리의 다른 글
Kafka 자주 사용하는 명령어 정리 (0) | 2023.03.15 |
---|---|
Kafka connect를 활용하여 Elasticsearch에 데이터 넣기 (0) | 2021.08.04 |
(해결X) Kafka producer memory buffer issue (0) | 2021.07.22 |
실시간 데이터 처리를 위한 Kafka에 대해 알아보자 (0) | 2019.12.04 |