본문 바로가기
CS/Kafka

카프카 프로듀서: 카프카에 메시지 쓰기

by hyelog 2023. 4. 29.

오늘은 이전 시리즈에 이어서

신뢰성 있는 데이터 전달을 할 수 있는 카프카의 프로듀서에 대해 알아보고자 한다.

 

이 포스팅은 카프카의 브로커, 토픽, 파티션에 대한 개념을 알고있으면 좋다.

 

알아두면 좋은 카프카의 용어 비유

- 브로커(Broker) : 카프카가 설치되어 있는  '서버' 단위

- 토픽(Topic) : FTP의 폴더와 비슷한 개념, 우리가 흔히 사용하는 드라이브 내 폴더들과 비슷한 역할을 한다.

- 파티션(Partition) : FTP의 파일과 비슷한 개념, 파티션은 토픽의 키(일종의 내용의 종류) 값에 따라 맵핑된다.

프로듀서 개념

카프카의 클라이언트의 한 형태인 프로듀서는 새로운 메시지를 생성한다 (카프카에 쓰다)

다른 발행/구독 시스템에서 발행자 혹은 작성자라고 부른다

: Queue인 토픽에 데이터를 넣는 발행자,

: 메세지를 생산해서 브로커의 토픽으로 보내는 서버 또는 어플

 

메시지는 아래와 같이 특정 토픽으로 생성된다.

메세지(프로듀서 레코드) 형식

  • 기본적으로 프로듀서는 메시지가 어떤 파티션에 수록되는지는 관여하지 않는다.
    • 하지만 때로, 특정 파티션에 직접 메시지를 작성하기도 한다. → 키와 파티셔너를 사용
      • 파티셔너 : 키의 해시 값 생성 → 키 별로 파티션에 대응시킴

프로듀서 생성하기

1. 기능을 수행하는 객체 생성하기

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1(host):9092(port)");

kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

Producer<String, String> producer = new KafkaProducer
    <String, String>(kafkaProps);

이 코드를 분해해서 설명해 보면,  아래와 같다.

 

bootstrap.servers

kafkaProps.put("bootstrap.servers", "broker1(host):9092(port), broker2(host):9092(port)");
  • 프로듀서가 사용하는 브로커들의 host_name : port_num을 작성한다.
    • 모든 브로커에 대해서 필요하지는 않지만, 한 브로커가 중단될 것을 대비하여 적얻도 2개의 브로커를 포함시켜야 한다.

 

key.serializer & value.serializer

kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  • 메세지의 키와 값을 직렬화 하기 위해 사용되는 클래스
    • 프로듀서에서 객체도 전송할 수 있으므로 여러 객체를 바이트 배열로 변환하는 방법(직렬화)를 프로듀서가 알고 있어야 한다.

2. 메세지를 포함한 실제 객체 생성하기

  // 프로듀서 레코드(메세지) 보내기
producer.send(new ProducerRecord<>("topicname", "key", "value")); // key를 넣는 방법
producer.send(new ProducerRecord<>("topicname", "value")); // key 없이 사용하기(라운드로빈)
  • 전송하기 원하는 topic 이름과 key 값, value 값을 포함한다.
    • key와 partition 지정은 선택적이다.

3. 이후 과정

1. 직렬화(Serialization) : key와 value 페어로 구성되는 메세지 객체들이 네트워크로 전송될 수 있도록

바이트 배열로 직렬화한다.

  • Avro 직렬 처리기(카프카에서 지원하는 직렬처리기)
    • 데이터를 읽을 어플리케이션을 변경하지 않는다.
    • 스키마를 변경하더라도 기존 데이터를 변경하지 않아도 된다. ➡ 어떤 에러도 발생시키지 않음
    • 스키마 레지스트리 : 카프카에 사용되는 스키마들을 저장하는 저장소
    • 추후 컨슈머에서는 식별자를 지정하여 레지스트리에서 스키마를 가져와 역직렬화를 수행한다.  ➡ 프로듀서에서 신경쓸 필요 X
    • 처리 방식

직렬화와 역직렬화

2. 파티셔너(partitioner) :  레코드에 파티셔너가 지정되어 있지 않다면, 레코드의 키 기준으로 파티셔너가 파티션을 선택한다.

3. 파티션이 선택되면 해당 레코드의 메세지가 저장될 토픽과 파티션을 프로듀서가 알게 된다.

4. 같은 토픽과 파티션으로 전송될 레코드들을 레코드 배치에 추가하고,

5. 별개의 스래드가 해당 배치들을 카프카 브로커에게 전송한다.

6. 그러면, 브로커는 해당 메세지 레코드의 수신 결과에 대한 응답을 다시 전송한다.

  • 성공 : RecordMetadata 객체 반환
    • 토픽
    • 파티션
    • 파티션 내부 메시지 오프셋
  • 실패 : 에러 반환 (but, 반환 전 몇 번 더 재시도 가능)

메세지 전송 방법

1. Fire-and-forget 전송 후 망각

ProducerRecord<String. String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
  • 전송 후 메시지 전송 성공 여부를 궁금해 하지x
  • 카프카 자체 가용성 + 자동 재시도 → 성공률⬆️
  • 단) 일부 메시지 유실 위험

2. Synchronous send 동기식 전송

ProducerRecord<String. String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
  • send 메서드 반환 값인 Future 객체의 get 메서드 호출 시, 작업 완료까지 대기하다 처리 결과 반환

3. Asynchronous send 비동기식 전송

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata RecordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace()
        }
    }
}


ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback())
  • 콜백 메서드를 사용한다

댓글