BigData/Kafka

Kafka connect를 활용하여 Elasticsearch에 데이터 넣기

kih5893 2021. 8. 4. 16:27

Kafka, Elasticsearch 설치는 본문에서 다루지 않는다. (개념은 생략하며 sink connector만 다룬다.)


개요

 

아주 간략하게 Kafka Connect에 대해서만 설명하고 넘어가도록 한다.

 

Kafka Connect를 사용하기 위해서는 2가지가 필요하다.

1. Kafka Connect --> 메인, 총괄 역할

2. Kafka Connector --> 실제 파싱 sink 해주는 역할

 

※ 이름은 비슷한데 용도는 다르다.

(이론적으로 두 개가 다르다는 것은 알고 있었는데 어떻게 설치하는지 감이 안왔었다.)

Kafka Connect는 Apache Kafka를 설치하면 bin 밑에 들어있다.
(이 과정에서 약 3시간 동안 삽질..)

당연히 Connect 도 받아야 하는줄알고 열심히 Searc 했다.

(confluent kafka 받아서 그 안에 있다고 좋아했다.. 알고 보니 apache kafka bin 밑에도 있었다는..)
Elasticsearch sink connector는 검색하여 confluent hub에서 다운 받았다.


설정

 

다음은 설정 파일을 작성한다. (본문에서는 Kafka Connect를 distribute 모드로 동작시켰다.)

사실 작성할 필요없다. (kafka/config 아래에 있는 기본 setting을 사용했다.)

 

standalone: connect-standalone.properties
distiributed: connect-distributed.properties


실행

kafka 경로 아래에서 실행해보자.
bin/connect-distributed.sh config/connect-distributed.properties


커넥터 등록 방법 (Kafka Connect REST API 를 활용)

커넥터 등록을 위해서 미리 받아놓은 Elasticsearch Sink Connector의 라이브러리 파일들을 모두 kafka/libs 밑으로 옮겨놓는다. (옮길 수 없을 때는 path 지정하는 방법도 존재함 - plugin.path 참고)

다 옮기고 아래 명령을 콘솔에서 입력 (Connect의 REST API를 사용하여 Connector를 등록하는 구문이다.)


 echo '{
"name": "elasticsearch-sink",
"config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1", "topics":

 "test", "topic.index.map": "test:test_index", "connection.url": "http://localhost:9200", "type.name": "log", "key.ignore": 

"true", "schema.ignore": "true"}}
' | curl -X POST -d @- http://127.0.0.1:8083/connectors --header "content-Type:application/json"


 

elasticsearch-sink 라는 이름의 Connector를 만들건데 test라는 kafka topic에서 데이터를 가져와서 Elasticsearch test index에 넣어줘 라는 뜻이다.

 

 


동작 확인

콘솔 프로듀서, 콘솔 컨슈머, Elaticsearch 조회를 위한 콘솔 3개를 띄어 놓자.

데이터를 넣어서 실제 카프카에 들어가는 것을 확인해보기 위함

 

실행 방법

 

콘솔 프로듀서: bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
콘솔 컨슈머: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

엘라스틱 서치 조회 쿼리: http://localhost:9200/test/_search?size=10&pretty

참고로 데이터를 넣을 때는 반드시 json 형태로 넣어주어야 한다.

ex) {"name: "Song"}

 

캡쳐는 못찍었지만 잘 동작한다.


참고 (Kafka Connect API)

connector 등록
 echo '{
"name": "elasticsearch-sink",
"config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": "1",

 "topics": "logs", "topic.index.map": "logs:logs_index", "connection.url": "http://localhost:9020", "type.name": 

"log", "key.ignore": "true", "schema.ignore": "true"}}
' | curl -X POST -d @- http://127.0.0.1:8083/connectors --header "content-Type:application/json"

connector 제거
curl -X DELETE http://127.0.0.1:8083/connectors/elasticsearch-sink --header "content-Type:application/json"


기타 참고자료


https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
https://docs.confluent.io/home/connect/userguide.html#connect-installing-plugins
https://docs.confluent.io/platform/current/connect/references/restapi.html

반응형