본문 바로가기

생각정리/Kafka

카프카 프로듀서

프로듀서의 기본 흐름

  1. send : send 메서드를 이용해서 메시지를 전송받음
  2. Serializer : 메시지를 byte 배열로 변환
  3. Partitioner : 메시지를 어느 토픽의 파티션으로 보낼지 결정
  4. 버퍼 : 변환된 byte 메시지를 버퍼에 모음
  5. Sender : 배치로 묶어서 카프카 브로커로 전송
    • 별도 스레드로 동작한다.
    • 배치를 버퍼에서 꺼내서 차례대로 브로커로 전송 (동시작업)
    • 배치를 보내는 동안 send메서드에서 배치를 모음 (동시작업)

토픽에 메시지 전송

  • 프로퍼티를 이용해서 프로듀서가 사용할 속성을 지정한다.
    • 브로커 목록, 키와 벨류 정렬 방식, 에크, 배치 사이즈등의 설정을 지정 가능
  • 프로퍼티를 이용해 카프카 프로듀서 객체를 생성한 후, send 메서드를 이용해서 메시지를 보낸다.
    • send를 할 때 프로듀서 레코드를 생성해서 보내는데 토픽명, 키, 값 혹은 토픽명, 값으로 생성할 수 있다.
  • 프로듀서를 사용한 뒤 닫아줘야 한다.
public void producer() { 
    Properties prop = new Properties();
    prop.put("bootstrap.servers", "kafka1:9092, kafka2:9092, kafka3:9092");
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

    producer.send(new ProducerRecord<>("topicname", "key", "value"));
    producer.send(new ProducerRecord<>("topicname", "value"));

    producer.close();
}

 

스프링 카프카 활용

프로듀서 설정을 빈객체로 추가를 하고 해당 프로듀서 설정을 기본으로 생성되는 카프카 템플릿을 만들어서 사용한다.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092, kafka2:9092, kafka3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

카프카 탬플릿을 추가해서 간편히 send를 사용할 수 있다.

private final KafkaTemplate<String, String> kafkaTemplate;

public void producer() { 
	kafkaTemplate.send("topicname", "key", "value");
	kafkaTemplate.send("topicname", "value");
}

전송 결과 확인 방법

Callback 사용 시

producer.send(new ProducerRecord<>("simple", "value"),
	new Callback() {
		@Override
		public void onCompletion(RecordMetadata metadata, Exception ex) {
			if (metadata != null) {
				// 성공	
			} else {
				// 실패
			}
		}
	});
  • 주로 쓰는 메시지 큐 처리 방식으로 처리량 저하가 없이 사용할 수 있다.

Future 사용 시

Future<RecordMetadata> f = producer.send(new ProducerRecord<>("topic", "value"));
try {
	Recordmetadata meta = f.get(); // 블로킹
} catch (ExecutionException ex) {
	
}
  • 하나의 메시지를 보낼 때마다 블로킹이 되어 배치에 메시지가 쌓이지 않아 배치 효과가 떨어져 처리량이 저하된다.
  • 처리량이 낮아도 되는 경우에 사용
CompletableFuture<SendResult<String, String>> f = producer.send(new ProducerRecord<>("topic", "value"));
f.whenComplete((result, ex) -> {
    if (ex == null) {
        // 성공
    } else {
        // 실패
    }
});
  • CompletableFuture를 이용해서 비동기 처리도 가능하다.

전송 보장과 ack

프로듀서는 전송을 보장하기 위해서 ack를 사용한다.

기본 

  • ack = 0
    • 서버 응답을 기다리지 않음
    • 전송 보장도 zero, 처리량 증가
    • 메시지가 유실되더라도 실패여부를 알 수 없기 때문에 메시지 전송여부가 중요한 경우 사용하면 안 된다.
  • ack = 1
    • 파티션의 리더에 저장되면 성공 응답받음
    • 리더 장애 시 메시지 유실 가능
    • 리더에 저장되면서 성공 응답을 받고 다음 팔로워로 복제되지 않은 상황에서 리더에서 오류 시 메시지 유실될 수 있다.
  • ack = all (또는 -1)
    • 모든 레플리카에 저장되면 응답받음
      • 브로커 min.insync.replicas 설정에 따라 달라짐
      • min.insync.replicas (브로커 옵션) : 프로듀서 ack 옵션이 all일 때 저장에 성공했다고 응답할 수 있는 동기화된 레플리카 최소 개수 (브로커가 3개일 경우 2를 권장)
      • ex) 레플리카 개수 3, ack = all, min.insync.replicas = 2일 경우, 리더와 팔로워 1개에 저장 되면 성공 응답
      • ex) 레플리카 개수 3, ack = all, min.insync.replicas = 1일 경우, 리더에 저장되면 성공 응답, ack = 1과 동일
      • ex) 레플리카 개수 3, ack = all, min.insync.replicas = 3일 경우, 리더와 팔로워 2개에 저장되면 성공 응답, 팔로워 중 한 개라도 장애가 나면 레플리카 부족으로 저장에 실패함

프로듀서 에러 유형

  • 전송 과정에서 실패
    • 전송 타임 아웃(일시적인 네트워크 오류 등)
    • 리더 다운에 의한 새 리더 선출 진행 중
    • 브로커 설정 메시지 크기 한도 초과
    • 등등
  • 전송 전에 실패
    • 직렬화 실패, 프로듀서 자체 요청 크기 제한 초과
    • 프로듀서 버퍼가 차서 기다린 시간이 최대 대기 시간 초과
    • 등등

실패 대응 방법

1. 재시도

  • 재시도 가능한 에러는 재시도 처리
    • ex) 브로커 응답 타임 아웃, 일시적인 리더 없음 등
  • 재시도 위치
    • 프로듀서는 자체적으로 브로커 전송 과정에서 에러가 발생하면 재시도 가능한 에러에 대해 재전송 시도
    • send() 메서드에서 예외 발생 시 예외 타입에 따라 send() 재호출
    • 콜백 메서드에서 예외 받으면 타입에 따라 send() 재호출
  • 아주 특별한 이유가 없다면 무한 재시도는 하면 안된다.
    • 무한 재시도 과정에서 전체 메시지가 멈춰 있어 횟수나 시간을 제한 둬야한다.
  • 브로커 응답이 늦게 와서 재시도할 경우 중복 발송 가능성이 있다.
    • enable.idempotence속성을 이용해서 중복 발송을 줄일 수 있다.
    • 멱등성 프로듀서로 설정하여 동일한 데이터를 여러번 전송하더라도 카프카 클러스터에 단 한번만 저장되도록 한다.
  • max.in.flight.requests.per.connection
    • 블록킹없이 한 커넥션에서 전송할 수 있는 최대 전송중인 요청 개수
    • 이 값이 1보다 크면 재시도 시점에 따라 메시지 순서가 바뀔 수 있음 (기본값 5) 
      • 전송 순서가 중요하면 이 값을 1로 지정
    • ex) 배치 1 전송 → 실패, 배치2 전송 → 성공, 배치3 전송 → 성공, 배치1 전송 재시도 → 성공
    • 원래 순서는 1 → 2 → 3 이지만 카프카에 저장된 배치 순서 2 → 3 → 1로 저장이 될 수 있다.

2. 기록

  • 추후 처리를 위해 기록
    • 별도 파일, DB 등을 이용해서 실패한 메시지 기록
    • 추후에 수동(또는 자동) 보정 작업 진행
  • 기록 위치
    • send() 메서드에서 예외 발생시
    • send() 메서드에 전달한 콜백에서 예외 받은 경우
    • send() 메서드가 리턴한 Future의 get() 메서드에서 예외 발생 시

추가 설정 적용

프로듀서 설정

@Configuration
public class KafkaProducerConfig {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092, kafka2:9092, kafka3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 프로듀서
        props.put(ProducerConfig.RETRIES_CONFIG, 10); // 전송 실패 시 재전송 횟수

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

토픽 설정

@Configuration
public class KafkaTopicConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092, kafka2:9092, kafka3:9092");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("test")
                .partitions(3)
                .replicas(3)
                .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") // 성공 처리 레플리카 최소 개수
                .config(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
                .build();
    }
}