토픽 파티션에서 레코드 조회
pubilc void consumer() {
Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka1:9092, kafka2:9092, kafka3:9092");
prop.put("group.id", "consumerGroupId");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singletton("simple")); // 토픽 구독
while(조건) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}
}
consumer.close();
}
스프링 카프카 활용
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092, kafka2:9092, kafka3:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
}
@KafkaListener(topics = "topicName", groupId = "consumerGroupId")
public void consumer(ConsumerRecord record) {
System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}
토픽 파티션은 그룹 단위 할당
- 컨슈머 그룹 단위로 파티션 할당
- 컨슈머가 파티션 보다 많으면 초과한 컨슈머는 놀게 된다.
- 이전 커밋 오프셋부터 컨슈머 poll로 오프셋 커밋을 한다. 오프셋이 끝날 때까지 반복
- 커밋된 오프셋이 없는 경우 : auto.offset.reset 설정 사용
- earliest : 맨 처음 오프셋 사용
- latest : 가장 마지막 오프셋 사용 (기본값)
- none : 컨슈머 그룹에 대한 이전 커밋이 없으면 예외 발생
컨슈머 설정
조회에 영향을 주는 주요 설정
- fetch.min.bytes : 조회시 브로커가 전송할 최소 데이터 크기
- 기본값 : 1
- 이 값이 크면 대기 시간은 늘지만 처리량이 증가
- fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간
- 기본값 : 500
- 브로커가 리턴할 때까지 대기하는 시간으로 pol() 메서드의 대기 시간과 다름
- max.partition.fetch.bytes : 파티션 당 서버가 리턴할 수 있는 최대 크기
- 기본값 : 1048576 (1MB)
재처리와 순서
- 동일 메시지 조회 가능성
- 일시적 커밋 실패, 리밸런스 등에 의해 발생
- 컨슈머는 멱등성(idempotence)을 고려해야 함
- ex) 조회수 1증가 → 좋아요 1 증가 → 조회수 1 증가
- 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음
- 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용
'생각정리 > Kafka' 카테고리의 다른 글
카프카 프로듀서 (0) | 2023.11.24 |
---|---|
왜 Zookeeper 모드에서 KRaft 모드로 변경하는가? (0) | 2023.11.23 |
왜 주키퍼를 앙상블로 구축을 해야하는가? (0) | 2023.11.22 |
카프카의 기본 구조 (0) | 2023.11.21 |
왜 카프카를 사용하는가? (0) | 2023.11.20 |