본문 바로가기

기타

[Kafka] Kafka Producer 예제

반응형

1. KafkaProducer

 

Kafka cluster record 를 송신하는 클라이언트, 프로듀서 객체를 생성하는 클래스이다.

매개변수로 입력받은 설정에 따라 카프카 클러스터와 연결하여 동작한다.

레코드를 전송하는 메서드는 send() 인데, 이는 비동기로 동작한다.

 

프로듀서 객체가 정상적으로 동작하기 위해서는 카프카 클러스터 서버 정보와 데이터 직렬화 정보를 입력해주어야 한다.

 

- bootstrap.servers

 

클라이언트가 레코드를 전송하기 위해서는 레코드가 전송될 토픽의 파티션 정보를 알아햐 한다. 클라이언트가 찾고자 하는 토픽 파티션의 메타데이터를 요청하기 위한 설정으로 클라이언트와 클러스터의 초기 연결을 위해 사용한다.

 

브로커의 'hostname:port' 정보의 리스트 형태를 가지며, 리스트의 순서대로 해당 호스트에 메타데이터를 요청한다. 아래의 예제에서 localhost:9092 주소로 먼저 접근하고 만약 연결되지 않는 경우 그 다음 192.168.1.93:9091 로 접근하여 메타데이터를 요청한다.

 

"localhost:9092;192.168.1.93:9091"

 

해당 주소가 유효한 경우 클라이언트가 브로커와 연결되어 메타데이터를 요쳥하는데, 그 과정은 다음과 같다.

 

[메타 데이터 요청 과정]

1. 해당 주소에서 동작하고 있는 브로커와 클라이언트가 연결된다.
2. 해당 브로커가 포함된 클러스터의 메타데이터 (모든 브로커, 토픽, 파티션 등의 정보) 를 클라이언트로 전송한다.
3. 클라이언트는 수신한 메타데이터에서 토픽 파티션의 위치 (브로커) 를 탐색한다.
4. 탐색하여 찾은 브로커로 요청을 보낸다.

 

메타데이터는 클라이언트가 브로커에 처음 접근할 때와 주기적으로 갱신할 때 요청된다.

  • 현재 접근하고 있는 브로커가 비정상인 경우 (중단되거나 롤링 리스타트) 재요청
  • 클라이언트가 접근하고 있던 리더 토픽 파티션의 위치가 변경된 경우 (Partition reassign)

 

프로튜서, 컨슈머, 스트림즈 어플리케이션, 커넥터 등 카프카 브로커에 접근하는 모든 클라이언트는 카프카 클러스터의 메타데이터 전달 과정을 위해 bootstrap.servers 를 설정해야 한다.

 

- key.serializer, value.serializer

 

래코드를 전달할 때 메시지는 키와 값의 쌍의 형태로 브로커에게 전달된다. 이떄 각 메시지는 직렬화되어 바이트 배열로 변환되어 전달된다. 프로듀서에서 레코드를 전달하기 전에 메시지의 직렬화를 수행해야 하는데, 이때 키와 값에 대해 각각 어떤 직렬화 클래스를 사용할 것인지 클래스를 설정해준다.

 

 

2. ProducerRecord()

 

메시지 전송에 사용하는 레코드 객체이다. 토픽 이름, 파티션 번호, 타임스탬프, , 값 등의 정보를 가지고 있다. 이 중에서  파티션 번호, 타임스탬프, 키 등은 옵셔널이다. 반면에 데이터를 보낼 토픽과 값은 반드시 포함해야한다.

 

 

3. KafkaProducer 구현 예제

 

package com.example.kafka.clients;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
    KafkaProducer<String, String> producer;

    public Producer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // bootstrap.servers 설정
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // key.serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value.serializer

        this.producer = new KafkaProducer<>(properties);
    }

    public void send(String topic, String message) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, message); // 메시지 전송에 사용할 record 객체 생성
            producer.send(record, (metadata, e) -> {
                if(e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println(metadata);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
        }
    }

    public void close(String topic) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, "exit");
            producer.send(record, (metadata, e) -> {
                if(e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println(metadata);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
        }
        producer.close();
    }
}

 

 

[reference]
-  https://always-kimkim.tistory.com/entry/kafka101-configuration-bootstrap-servers

 

[Kafka 101] bootstrap.servers 설정에 관하여 (Inside of bootstrap.server)

들어가며  카프카 클라이언트, 그 중 대표적으로 프로듀서와 컨슈머는 메시지를 발행, 구독하기 위해 필수로 bootstrap.servers 설정을 합니다. 하지만 이 bootstrap.servers 설정은 카프카 클러스터를 구

always-kimkim.tistory.com

-  https://coding-start.tistory.com/193

 

Apache Kafka - Kafka Producer(카프카 프로듀서) - 2

2019/07/06 - [Kafka&RabbitMQ] - Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1 Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1 이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로..

coding-start.tistory.com

 

반응형

'기타' 카테고리의 다른 글

[기타] Sync - Async / Blocking - Non-Blocking  (0) 2022.04.05
[Kafka] Kafka Consumer 예제  (0) 2022.03.22
[Kafka] Kafka 기본 개념  (0) 2022.03.19
[Web] html form 태그에서 PUT, DELETE 사용  (0) 2022.02.20
[Web] Rest URI 네이밍  (0) 2022.02.18