Kafka 적용
Kafka
주문 결과를 우선적으로 반환 후 재고 업데이트, 주문 결과 저장 등 이후에 수행 되어도 사용자에게 큰 상관이 없는 로직들을 비동기 처리함으로 주문로직에 대한 성능을 개선했습니다.
주문 로직을 따로 데이터 스트림 구조로 만들어 성능의 저하 없이 동시에 지속적인 생산과 소비를 할 수 있습니다.
주문 API 단계
장바구니 조회 → 장바구니 구매 (담은 모든 상품) → 이벤트 상품 확인 → 주문 생성 → 상품 재고 관리 → 주문 결과 확정
Kafka 적용
카프카를 사용하지 않으면 모든 단계를 거친 후 사용자에게 응답을 해주는데, 카프카를 사용하면서 사용자들이 주문 후, 주문이 생성되고 주문 결과가 “주문 중” 으로 넘어가고 상품 재고 관리, 주문 결과 확정 과정을 비동기적으로 처리하여 사용자들이 주문 후 주문이 완료되기까지 기다리지 않고 다른 작업할 수 있도록 로직을 구성하였습니다. 그로 인해 처리량 자체도 매우 높일 수 있었습니다.
카프카 메시지로 무엇을 보낼 것인가 ?
카프카를 사용 시 어느 부분에서 메시지로 보내서 데이터 스트림을 나눌 것을 생각해보았습니다.
- basket Id 만 넘겨서 구매 처리 시 모든 작업을 order 컨슈머 쪽에서 한다.
- 장바구니를 확인 한 뒤 order 정보를 order 컨슈머에 넘기고 주문 생성 작업 부터 한다.
- 주문 생성 이후 주문 상품 정보를 order 컨슈머에 넘기고 주문 생성 이후 작업을 한다.
- basket Id 만 넘겨서 구매 처리 시 모든 작업을 order 컨슈머 쪽에서 한다.
→ 모든 작업을 컨슈머에서 할 경우 컨슈머에서 주문에 관한 모든 작업이 이루어 지게 되고 주문이 많아 질 경우 컨슈머 서버에서 과부하가 되고 처리효율과 처리속도가 안 좋아 질 수 있다.
- 장바구니를 확인 한 뒤 order 정보를 order 컨슈머에 넘기고 주문 생성 작업부터 한다.
→ 주문 생성 이전 과 이후의 작업이 나뉘어 분산 처리가 되지만 주문 처리량이 많을 경우 주문 여부를 확인하기가 늦어 질 수 있다.
- 주문 생성 이후 주문 상품 정보를 order 컨슈머에 넘기고 주문 생성 이후 작업을 한다.
→ 주문 생성까지의 작업을 하여 주문 생성 시 “진행 중” 처리로 만들고 부하가 많이 걸리는 재고 처리 부분을 비동기 처리 후 주문 결과에 따른 상태를 적용시킨다.
사용자는 주문 이후 바로 주문 상태를 확인 해볼 수 있고 다른 작업을 할 수 있다.
Spring Kafka 설정
spring kafka 토픽 설정
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic() {
return new NewTopic("order", 3, (short) 2);
}
spring kafka 프로듀서 설정
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
spring kafka 컨슈머 설정
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Producer OrderService
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
public void createOrder(List<OrderProduct> orderProductList) {
try {
String orderMessage = objectMapper.writeValueAsString(orderProductList);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("order",
orderMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.debug(
"Send message=[ orderId: " + orderProductList.get(0).getOrder().getId()
+ " ] with partition=[" + result.getRecordMetadata().partition()
+ "], offset=[" + result.getRecordMetadata().offset() + "]");
} else {
log.error("Unable to send message=[ orderId: " + orderProductList.get(0)
.getOrder().getId() + " ] due to : " + ex.getMessage());
}
});
} catch (JsonProcessingException e) {
log.error("메세지 변환 실패");
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
Consumer OrderService
@KafkaListener(topics = "order", groupId = "group_1")
public void listener(ConsumerRecord data) throws JsonProcessingException {
log.debug(data.toString());
long startTime = System.currentTimeMillis();
List<OrderProduct> orderProductList = Arrays.asList(objectMapper.readValue(data.value().toString(), OrderProduct[].class));
...
}
카프카를 주문 생성 이후 적용해서 사용자가 주문 이후에 작업을 편하게 할 수 있도록 비동기 처리를 하였다. 카프카를 비동기 처리를 하는데 주문 로직을 통째로 컨슈머 부분으로 넘기는 것도 고려를 해보았는데, 이렇게 되면 컨슈머 쪽에서 다른 처리들까지 부하가 걸리면서 오히려 컨슈머에서 느려질 경우가 있을 수 있어 주문 정보를 오브젝트맵퍼를 이용해서 문자형태로 만들어 메시지로 넘겨 주문 로직을 분리하는 게 부하분산에 좋다고 판단해 그렇게 적용하였다.
카프카 서버에서 설정을 할 수 있는 부분들이 있지만, Spring Kafka를 활용해서 설정을 하였다. 설정들은 기본적으로 확인된 부분들만 사용하고 추후 확인을 좀 더 하면서 설정을 추가하면서 수정을 해야 할 것 같다.
'생각정리 > 항해99' 카테고리의 다른 글
[실전 프로젝트] 5주차 - 1 (0) | 2023.11.17 |
---|---|
[실전 프로젝트] 4주차 - 2 (0) | 2023.11.17 |
[실전 프로젝트] 3주차 - 2 (0) | 2023.11.17 |
[실전 프로젝트] 3주차 - 1 (0) | 2023.10.14 |
[실전 프로젝트] 2주차 - 2 (0) | 2023.10.13 |