본문 바로가기

기타

[Kafka] Kafka Consumer 예제

반응형

1. KafkaConsumer

 

Kafka cluster 의 레코드를 사용하는 클라이언트, 컨슈머 객체를 생성하는 클래스이다.

매개변수로 넘겨주는 서버 정보, 데이터 역직렬화 정보, 그룹 ID 등을 설정으로하여 객체를 생성한다.

컨슈머는 poll 메서드를 사용해서 브로커에서 데이터를 가져온다. poll 메서드는 마지막으로 사용한 데이터 이후 입력된 데이터들을 가져온다. poll 메서드는 timeout 을 매개변수로 줄 수 있는데, 만약 레코드가 존재하면 해당 레코드를 반환하지만 timeout 을 초과하면 빈 레코드 집합이 반환된다.

 

- bootstrap.servers

 

클라이언트가 레코드를 전송하기 위해서는 레코드가 전송될 토픽의 파티션 정보를 알아햐 한다. 클라이언트가 찾고자 하는 토픽 파티션의 메타데이터를 요청하기 위한 설정으로 클라이언트와 클러스터의 초기 연결을 위해 사용한다.

 

브로커의 'hostname:port' 정보의 리스트 형태를 가지며, 리스트의 순서대로 해당 호스트에 메타데이터를 요청한다. 아래의 예제에서 localhost:9092 주소로 먼저 접근하고 만약 연결되지 않는 경우 그 다음 192.168.1.93:9091 로 접근하여 메타데이터를 요청한다.

 

"localhost:9092;192.168.1.93:9091"

 

해당 주소가 유효한 경우 클라이언트가 브로커와 연결되어 메타데이터를 요쳥하는데, 그 과정은 다음과 같다.

 

[메타데이터 요청 과정]

1. 해당 주소에서 동작하고 있는 브로커와 클라이언트가 연결된다.
2. 해당 브로커가 포함된 클러스터의 메타데이터 (모든 브로커, 토픽, 파티션 등의 정보) 를 클라이언트로 전송한다.
3. 클라이언트는 수신한 메타데이터에서 토픽 파티션의 위치 (브로커) 를 탐색한다.
4. 탐색하여 찾은 브로커로 요청을 보낸다.

 

메타데이터는 클라이언트가 브로커에 처음 접근할 때와 주기적으로 갱신할 때 요청된다.

  • 현재 접근하고 있는 브로커가 비정상인 경우 (중단되거나 롤링 리스타트) 재요청
  • 클라이언트가 접근하고 있던 리더 토픽 파티션의 위치가 변경된 경우 (Partition reassign)

 

프로튜서, 컨슈머, 스트림즈 어플리케이션, 커넥터 등 카프카 브로커에 접근하는 모든 클라이언트는 카프카 클러스터의 메타데이터 전달 과정을 위해 bootstrap.servers 를 설정해야 한다.

 

- key.deserializer, value.deserializer

 

레코드의 메시지는 바이트 형식으로 직렬화되어서 전송된다. 이때 컨슈머가 어떤 클래스를 이용하여 메시지의 key, value 를 각각 역직렬화할지 설정하는 속성이다.

 

- group.id

 

카프카 컨슈머가 포함될 그룹의 id 를 의미한다.

카프카는 컨슈머 그룹로 묶인 여러개의 컨슈머를 통해 레코드를 분산처리한다. 이를 통해 확장성과 내결함성을 제공한다.

같은 group.id 를 사용하는 컨슈머들로 컨슈머 그룹이 구성된다. 카프카는 파티션에 로드밸런싱을 통해 분산처리를 수행하는데, 컨슈머 그룹의 한 컨슈머당 한 파티션을 할당하여 메시지를 전달한다.

 

 

2. ConsumerRecord, ConsumerRecords

 

ConsumerRecord 는 카프카에서 수신한 key, value 쌍의 레코드 객체이다. 토픽 이름과 파티션 번호, 카프카 파티션에서의 해당 레코드의 오프셋, 타임스탬프 등의 정보를 가지고 있다.

 

ConsumerRecords 는 이름에서 알 수 있듯이 ConsumerRecord 의 리스트이다. 컨슈머의 poll 메서드의 결과값으로 특정 토픽의 한 파티션의 레코드들을 담고있다.

 

 

[reference]

- https://soft.plusblog.co.kr/30

 

[Kafka] #7 아파치 카프카 컨슈머(Kafka Consumer) 자바(Java)예제 코드

카프카 컨슈머 그룹과 컨슈머의 동작 방식에 대해서 알아봤다. 이제 자바를 이용해서 카프카 데이터를 읽어가는 예제 프로그램을 작성해보겠다. [Kafka] #2 - 아파치 카프카(Apache Kafka) 설치 및 실

soft.plusblog.co.kr

- https://d2.naver.com/helloworld/0974525

- https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

 

KafkaConsumer (kafka 3.1.0 API)

Alert the consumer to trigger a new rebalance by rejoining the group. This is a nonblocking call that forces the consumer to trigger a new rebalance on the next poll(Duration) call. Note that this API does not itself initiate the rebalance, so you must sti

kafka.apache.org

 

반응형

'기타' 카테고리의 다른 글

[Gradle] Gradle 이란?  (1) 2023.01.14
[기타] Sync - Async / Blocking - Non-Blocking  (0) 2022.04.05
[Kafka] Kafka Producer 예제  (0) 2022.03.21
[Kafka] Kafka 기본 개념  (0) 2022.03.19
[Web] html form 태그에서 PUT, DELETE 사용  (0) 2022.02.20