로컬 환경
- zookeeper 3대(localhost:2181, localhost:2182, localhost:2183)
- kafka broker 3대(localhost:9092, localhost:9093, localhost:9094)
프로듀서란??
메시지를 생산(produce)해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 애플리케이션, 서버 등을 모두 프로듀서라고 한다.
주요기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것이다.
메시지를 보낼 때 키 값을 정해서 메시지를 보낼 수 있는데, 해당 키를 가진 모든 메시지를 동일한 파티션으로 전송할 수 있다.
키 값이 존재하지 않는다면, Round-Robin 방식으로 파티션에 균등 분배하게 된다.
1. 콘솔 프로듀서로 메시지 보내기
토픽 생성
kafka-topics.sh \
--zookeeper localhost:2181,localhost:2182,localhost:2183 \
--topic dcb-topic --partitions 3 --replication-factor 3 --create
파티션 3개, 레플리케이션 3개
토픽 확인
kafka-topics.sh \
--zookeeper localhost:2181 --topic dcb-topic --describe
토픽 메시지 보내기
kafka-console-producer.sh \
--broker-list localhost:9092,localhost:9093,localhost:9094 \
--topic dcb-topic
컨슘하기
kafka-console-consumer.sh \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--topic dcb-topic --from-beginning
자바를 이용한 프로듀서
메시지를 보내고 확인하지 않기
프로듀서는 메시지가 성공적으로 도착했는지 확인하지 않는다.
카프카가 항상 살아있는 상태고 프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만, 일부 메시지는 손실될 수도 있다.
public class KafkaDcbProducer {
public static void main(String[] args) {
Properties props = new Properties();
// 브로커 리스트 정의
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
// 사용할 serializer 정의
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Properties를 전달하여 새 프로듀서 생성
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<>("dcb-topic", "Apache kafka is a distributed streaming platform"));
}catch(Exception e){
// 메시지를 보내기 전 에러는 예외 처리가 가능
e.printStackTrace();
}finally {
producer.close();
}
}
}
send()메소드를 통해 ProducerRecord를 보낸다. 메시지는 버퍼에 저장되고 별도의 스레드를 통해 브로커로 전송된다.
send()는 자바 퓨처(future) 객체로 RecordMetadata을 리턴받지만 사용하지 않기 때문에 성공적으로 전송됬는지 알 수 없게 된다. 즉, 메시지 손실 가능성이 있기 때문에 일반적인 서비스 환경에서 사용하지 않는다.
동기 전송
send() 메소드 후 Future 객체를 리턴하기 때문에, 메시지 전달이 성공했는지 실패했는지 확인이 가능하다. (신뢰성)
public class SyncKafkaDcbProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
RecordMetadata metadata
= producer.send(new ProducerRecord<>("dcb-topic", "Apache kafka is a distributed streaming platform")).get();
System.out.printf("Partition: %d, Offset : %d", metadata.partition(), metadata.offset());
}catch(Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
}
get() 메소드를 이용해 카프카의 응답을 기다리게 된다.
메시지가 성공적으로 전송되지 않으면 예외가 발생하고, 에러가 없다면 메시지가 기록된 오프셋을 알 수 있는 RecordMetadata를 얻게된다.
메시지를 보내기 전과 보내는 동안 에러가 나는 경우 예외를 발생 - 두 가지로 구분
- 재시도 가능 예외 - 다시 전송하여 해결 (커넥션 에러)
- 재시도 불가능 예외 - 메시지가 너무 큰 경우 등..?
비동기 전송
프로듀서가 보낸 모든 메시지에 대한 응답을 기다린다면 시간이 많이 소요되기 때문에 비동기전송을 사용할 수있다.
카프카에서 제공하는 Callback 인터페이스를 구현하면 콜백을 사용하여 비동기 전송이 가능해진다.(전송 실패시 에외 처리 가능)
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<>("dcb-topic", "Apache kafka is a distributed streaming platform(async)"), new ProducerCallback());
}catch(Exception e){
e.printStackTrace();
}finally {
producer.close();
}
}
static class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (metadata != null) {
System.out.println("Partition : " + metadata.partition() + ", Offset : " + metadata.offset());
} else {
// 예외처리
e.printStackTrace();
}
}
}
프로듀서의 주요 옵션 참고
https://kafka.apache.org/documentation/#producerconfigs
'프로그래밍 노트 > kafka' 카테고리의 다른 글
[Kafka] Embedded Kafka Broker를 활용한 Kafka 로컬 테스트 (0) | 2024.12.18 |
---|---|
[kafka] 카프카 프로듀서 주요 옵션 및 전송방법 (0) | 2021.08.09 |
[kafka] Docker 사용하여 카프카 클러스터 구성하기 (1) | 2021.07.16 |
[kafka] 카프카 설치 및 실행 (환경설정) (0) | 2021.07.10 |
[kafka] 주키퍼(zookeeper) 설치 및 실행 (환경설정) (0) | 2021.06.30 |