본문 바로가기

생각정리/Kafka

카프카 컨슈머

토픽 파티션에서 레코드 조회

pubilc void consumer() {
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "kafka1:9092, kafka2:9092, kafka3:9092");
    prop.put("group.id", "consumerGroupId");
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

    consumer.subscribe(Collections.singletton("simple")); // 토픽 구독
    while(조건) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
        }
    }

    consumer.close();
}

스프링 카프카 활용

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092, kafka2:9092, kafka3:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }
}

 

@KafkaListener(topics = "topicName", groupId = "consumerGroupId")
public void consumer(ConsumerRecord record) {
	System.out.println(record.value() + ":" + record.topic() + ":" + record.partition() + ":" + record.offset());
}

토픽 파티션은 그룹 단위 할당

  • 컨슈머 그룹 단위로 파티션 할당
    • 컨슈머가 파티션 보다 많으면 초과한 컨슈머는 놀게 된다.
  • 이전 커밋 오프셋부터 컨슈머 poll로 오프셋 커밋을 한다. 오프셋이 끝날 때까지 반복
  • 커밋된 오프셋이 없는 경우 : auto.offset.reset 설정 사용
    • earliest : 맨 처음 오프셋 사용
    • latest : 가장 마지막 오프셋 사용 (기본값)
    • none : 컨슈머 그룹에 대한 이전 커밋이 없으면 예외 발생

컨슈머 설정

조회에 영향을 주는 주요 설정

  • fetch.min.bytes : 조회시 브로커가 전송할 최소 데이터 크기
    • 기본값 : 1
    • 이 값이 크면 대기 시간은 늘지만 처리량이 증가
  • fetch.max.wait.ms : 데이터가 최소 크기가 될 때까지 기다릴 시간
    • 기본값 : 500
    • 브로커가 리턴할 때까지 대기하는 시간으로 pol() 메서드의 대기 시간과 다름
  • max.partition.fetch.bytes : 파티션 당 서버가 리턴할 수 있는 최대 크기
    • 기본값 : 1048576 (1MB)

재처리와 순서

  • 동일 메시지 조회 가능성
    • 일시적 커밋 실패, 리밸런스 등에 의해 발생
  • 컨슈머는 멱등성(idempotence)을 고려해야 함
    • ex) 조회수 1증가 → 좋아요 1 증가 → 조회수 1 증가
    • 단순 처리하면 조회수는 2가 아닌 4가 될 수 있음
  • 데이터 특성에 따라 타임스탬프, 일련번호 등을 활용