이슈 => Kafka producer를 통해 데이터를 생성할 때 memory buffer 크기에 도달하면 일시 hang이 걸린다.
풀어서 설명하면 다음과 같다.
Ex 1) buffer.memory=2 일 때 데이터 크기가 1인 데이터 send
첫 번째 send (성공 - buffer 에만 전송된 상태, broker로 전송되지 않은 상태인 것으로 추정)
두 번째 send (성공 - buffer 에만 전송된 상태, broker로 전송되지 않은 상태인 것으로 추정)
세 번째 send (대기 약 5초)
5초가 지난 시점에 buffer에 있는 데이터가 broker -> topic 저장되고 대기가 풀리면서 다음 send들이 모두 성공한다.
Ex 2) buffer.memory=2 일 때 데이터 크기가 3인 데이터 send
첫 번째 send (대기 약 5초)
5초가 지난 시점에 buffer에 있는 데이터가 broker -> topic 저장되고 대기가 풀리면서 다음 send들이 모두 성공한다.
기본 정보 (중간에 일부 설정들의 값이 변경되기도 한다.)
- Kafka Version 2.5.0
- 데이터 크기 약 500byte
- buffer.memory=32MB 설정 (33,554,432byte)
- batch.size=20000
- acks=1
- producer.type=async
- retries=1
- replication.factor=1
- max.in.flight.requests.per.connection=1
- queue.buffering.max.ms=1000
- batch.num.messages=200
- offsets.topic.replication.factor=1
첫 번째 시험, 5만 개씩 데이터 전송 - buffer.memory = 32MB 설정 (33,554,432byte)
- 5만개 => 20,000,000byte
- loop 2번째에서 약 5초 정도의 holding 걸림
실행 결과
time: 439ms
time: 5619ms
time: 344ms
time: 368ms
time: 306ms
time: 281ms
time: 326ms
time: 279ms
time: 333ms
time: 230ms
time: 212ms
time: 307ms
time: 253ms
time: 221ms
time: 238ms
time: 206ms
time: 224ms
time: 171ms
time: 197ms
두번째 시험, 1만 개씩 데이터 전송 - buffer.memory = 32MB 설정 (33,554,432byte)
- 1만개 => 5,000,000byte
- loop 7번째에서 약 5초 정도의 holding 걸림
- 7만 개 데이터 전송 시 holding 이 걸린다? 7만 개는 약 35,000,000byte => 내가 설정한 buffer.memory size랑 거의 비슷.
실행 결과
time: 148ms
time: 76ms
time: 56ms
time: 49ms
time: 56ms
time: 52ms
time: 5042ms
time: 271ms
time: 97ms
time: 70ms
time: 120ms
time: 71ms
time: 46ms
time: 89ms
time: 71ms
time: 51ms
time: 60ms
time: 59ms
time: 42ms
time: 60ms
뭔가 이상하다. buffer.memory 크기를 변경해서 다시 테스트해보자.
buffer.memory 크기 200,000,000byte로 변경 (약 190MB)
세 번째 시험, 5만 개씩 데이터 전송
- loop 8번째 데이터에서 OOM 발생
- 5만개 데이터 (25,000,000byte) * 8 = 200,000,000byte
- 이상하다.
- buffer.memory size가 32MB일 때는 OOM이 발생하지 않았는데, 갑자기 왜 발생할까?
- delay 대신 에러가 발생한다...
실행 결과
time: 418ms
time: 272ms
time: 211ms
time: 185ms
time: 172ms
time: 565ms
time: 735ms
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.StringCoding$StringEncoder.encode(StringCoding.java:300)
at java.lang.StringCoding.encode(StringCoding.java:344)
at java.lang.String.getBytes(String.java:918)
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:47)
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:750)
at com.wins.bd1.ai.controller.kafka.KafkaManager.run(KafkaManager.java:127)
at com.wins.bd1.ai.controller.kafka.KafkaManager.main(KafkaManager.java:33)
buffer.memory 크기를 크게 낮춰서 다시 테스트 해보자.
buffer.memory 크기 500byte로 변경
네번째 시험, 5만 개씩 데이터 전송 - 에러도 없고, 딜레이도 없이 잘 전송됨.(????)
call back 추가 후 다시 테스트해보니까 에러 발생함. Consumer 통해서 확인해보니 데이터도 들어가지 않았음.
call back 설정하지 않았을 때는 에러도 보이지 않음. (어쨋든 아래의 실행 결과는 send 시 에러가 발생)
실행 결과
time: 439ms
time: 248ms
time: 276ms
time: 147ms
time: 144ms
time: 147ms
time: 158ms
time: 176ms
time: 177ms
time: 184ms
time: 212ms
time: 205ms
time: 152ms
time: 179ms
time: 145ms
time: 180ms
time: 155ms
time: 201ms
time: 137ms
time: 135ms
여기서 잠깐 buffer.memory의 정의를 다시 살펴보자.
출처: https://kafka.apache.org/documentation/#producerconfigs_buffer.memory
특징
- 서버로 보내지기 전 데이터를 버퍼링 하는 데 사용한다.
- 브로커로 레코드를 전달하는 것보다 쌓이는 양이 더 많을 경우 max.block.ms 설정이 필요하다.
어떤 에러가 나는지 call back으로 확인해보자.
각종 시험을 통해 알아낸 사실
1) 어라? 내 메시지 크기가 563byte 였네..
에러 메세지
org.apache.kafka.common.errors.RecordTooLargeException: The message is 563 bytes when serialized which is larger than the total memory buffer you have configured with the buffer.memory configuration.
An error occurred
2) buffer.memory size 최소 크기, 적어도 batch.size 크기보다는 커야 한다.
- batch size를 16,384로 해놓고 buffer.memory를 1,000으로 해놓으니까 아래와 같은 에러가 발생했다.
- batch 당 1byte로 계산해서 16,384byte는 할당해야 할 것 같다. (16,384로 변경하니까 에러없이 동작한다.)
에러 메세지
- Attempt to allocate 16384 bytes, but there is a hard limit of 1000 on memory allocations.
3) 비동기 처리, 실제 저장은 delay가 된다는 것
- linger.ms=1000, batch.size=16384로 되어있는데, 왜 1초 뒤에 저장되지 않는 걸까?
- 1개의 데이터 삽입 시 약 5초 뒤에 저장되는 것을 확인할 수 있었음
- 10초로 설정하면 약 15초 뒤에 저장됨.. (5초 delay는 무엇일까..)
4) 현재의 설정 상태에서 max.block.ms 값을 5000이하로 설정하면 에러가 발생한다.
- max.block.ms는 buffer의 용량이 확보될 때까지 기다리는 시간을 적는 것인데, buffer가 비워지는 시점은 5초 이후이기 때문에 5초 이하로 설정 시 TimeoutException이 발생한다.
에러 메세지
An error occurred
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 1000 ms.
위의 시험 결과에서 delay가 발생하는 이유를 추정해보면 아래와 같다.
1) 메모리가 가득찰 때까지는 send가 가능하다.
2) 메모리가 가득차는 시점에는 send가 안된다. (대기상태, 버퍼가 빌 때까지)
3) 약 5초 뒤 저장되는 시점에 buffer에서 데이터가 비게 되면 그때부터 데이터가 sending된다.
그렇다면...
왜 다음 버퍼가 풀차는 시점이 없는걸까? (그 뒤로는 왜 delay없이 잘 들어가는걸까?)
왜 최초에 버퍼 풀찼을 때만 delay가 발생하고 그 뒤부터는 잘 들어갈까...
혹시 아시는 분이 계시다면 댓글 부탁드립니다.
'BigData > Kafka' 카테고리의 다른 글
Kafka 자주 사용하는 명령어 정리 (0) | 2023.03.15 |
---|---|
Kafka connect를 활용하여 HDFS에 데이터 넣기 (0) | 2023.03.14 |
Kafka connect를 활용하여 Elasticsearch에 데이터 넣기 (0) | 2021.08.04 |
실시간 데이터 처리를 위한 Kafka에 대해 알아보자 (0) | 2019.12.04 |