-
🚐 Kafka 메시지 발행 최적화: 동기 방식에서 코루틴까지의 여정🧑🏻💻 프로젝트 2025. 3. 24. 17:45
⚠️ 문제 상황
트랜잭션 아웃박스 패턴을 사용하여 outbox 테이블에서 배치 단위로 데이터를 가져와 Kafka로 메시지를 발행하는 과정에서 메시지가 실제로 Kafka에 정상적으로 전달되었는지 확인하는 기능이 필요했습니다.
🤔 초기 접근법 - 동기적 처리 KafkaTemplate.send().get()
Kafka에 메시지가 정상적으로 전송되었는지를 확인하기 위해 get() 메서드를 사용했습니다.
val successfulIds = mutableListOf<Long>() pendingMessages.forEach { message -> try { val topic = kafkaProperties.getTopicForOutboxType(message.outboxType) kafkaTemplate.send(topic, message.storyId.toString(), message.payload).get() successfulIds.add(message.id!!) } catch (e: Exception) { logger.error("Kafka 전송 실패: ${e.message}") } }
💣 문제점
- 동기적 처리로 메시지당 응답을 기다려야 함
- 대량 메시지 처리 시 성능 저하
- Kafka의 비동기적 특성을 활용하지 못함
- 순차적 처리로 전체 소요 시간 증가
이렇게 send().get() 을 사용한 동기적 처리는 성능 저하를 초래할 수 있어 다른 방법을 고려하게 되었습니다.
💡 Callback 함수를 사용하자!
이 문제를 해결하기 위해 callback 함수를 적용하는 방법을 고려했습니다.
👍 장점
- send()가 호출된 후 별도의 스레드에서 전송 결과를 처리할 수 있어 성능 저하 없이 빠르게 메시지를 publish
- 비동기적으로 메시지를 보낼 수 있어 batch size 만큼의 메시지를 한 번에 처리하는 데 유리
❗ ListenableFutureCallback
fun <K : Any, V> KafkaTemplate<K, V>.sendMessageWithCallback(topic: String, key: K, value: V) { send(topic, key, value).addCallback(object : ListenableFutureCallback<SendResult<K, V>> { override fun onSuccess(result: SendResult<K, V>?) { println("Message delivered to Kafka with offset: \\${result?.recordMetadata?.offset()}") } override fun onFailure(ex: Throwable) { logger.error("Kafka message delivery failed", ex) } }) }
⚠️ ListenableFutureCallback 사용 시 문제점
Spring Kafka 6.0에서는 ListenableFutureCallback이 deprecated 되었습니다.
📌 CompletableFuture 기반 구현
pendingMessages.content .map { message -> try { val topic = outboxTopicResolver.getTopicForOutboxType(message.outboxType) CompletableFuture.supplyAsync { kafkaTemplate.send(topic, message.storyId.toString(), message).get() }.thenAccept { result -> successfulIds.add(message.id!!) logger.info("Message delivered to Kafka with offset: ${result.recordMetadata.offset()}") }.exceptionally { ex -> logger.error("Kafka message delivery failed", ex) null } } catch (e: Exception) { logger.error("Kafka 전송 실패: ${e.message}") CompletableFuture.completedFuture(null) } } .toTypedArray() .let { futuresArray -> CompletableFuture.allOf(*futuresArray).join() } if (successfulIds.isNotEmpty()) { outboxRepository.deleteAllByIds(successfulIds) }
🤔 CompletableFuture.supplyAsync()를 사용한 이유
- supplyAsync는 별도의 스레드에서 실행되므로 Kafka 전송이 블로킹되지 않음
- .thenAccept를 사용하여 전송 성공 시 후속 작업을 정의 가능
- .exceptionally를 통해 예외 처리를 수행할 수 있어 실패 시 로깅 또는 재시도 로직을 추가 가능
⚠️ 구현 과정에서 발생한 이슈 / 처리
1️⃣ 멀티스레드 환경에서의 데이터 무결성
- 비동기 처리로 인해 여러 스레드가 동시에 successfulIds 리스트에 접근하면서 발생 가능한 동시성 문제
⇒ 스레드 안전한 컬렉션 사용
val successfulIds = Collections.synchronizedList(mutableListOf<Long>())
2️⃣ 예외 처리 계층화
- 비동기 실행 시 발생하는 예외와 초기화/설정 단계에서 발생하는 예외의 구분 필요
⇒ 이중 예외 처리
- try-catch 블록: 즉시 발생하는 예외 처리 (카프카 메시지 전송 시도 전)
- .exceptionally { } 블록: 비동기 전송 중 발생하는 예외 처리
3️⃣ 실행 순서 보장
- 메시지 발행 성공 후에만 ID를 추가하도록 보장 필요
⇒ thenAccept를 사용하여 성공 시에만 ID 추가 로직 실행
.thenAccept { result -> successfulIds.add(message.id!!) // 추가 로직 }
4️⃣ 데이터 일관성 보장
- 모든 kafka에 메세지를 publish 하는 과정이 끝나기 전에 deleteAllByIds 로직이 실행되어 데이터 일관성이 깨질 수 있어 모든 메시지가 발행된 이후에 삭제로직이 실행되도록 해주어야 함.
- 각 메시지마다 CompletableFuture를 생성
- Stream을 CompletableFuture 배열로 변환
- CompletableFuture.allOf()를 사용하여 모든 작업이 완료될 때까지 대기
⭐ Coroutine 사용
기존 코드에서는 CompletableFuture.supplyAsync를 활용하여 Kafka 메시지 전송을 비동기적으로 수행하고, 결과를 thenAccept와 exceptionally를 사용하여 처리하고 있었습니다.
⚠️ 기존 코드의 문제점
CompletableFuture 를 사용한다고 했을 때 발생할 수 있는 문제는 아래와 같다고 생각했습니다.
- 스레드 풀 관리의 어려움
- CompletableFuture.supplyAsync는 기본적으로 ForkJoinPool을 사용하여 스레드 풀 크기 조절 및 관리가 어렵습니다.
- 처리량이 많아지면 ForkJoinPool이 포화될 가능성이 있습니다.
- 블로킹 I/O 작업 문제
- kafkaTemplate.send(...).get()은 완전히 블로킹 호출로 비동기 컨텍스트 내에서 스레드를 불필요하게 차단합니다.
- 과도한 스레드 사용
- 작업당 하나의 스레드를 사용하여 대량의 메시지 처리 시 시스템 리소스에 부담이 커집니다.
- 예외 전파 및 관리 어려움
- exceptionally로 개별 예외를 처리하지만 전체 흐름 관리가 어렵습니다.
- 일부 메시지 실패가 전체 처리에 영향을 줄 수 있습니다.
- 리스트 변환 과정의 복잡성
- toTypedArray(), allOf(), join() 등의 복잡한 변환 과정이 필요합니다.
- join()은 블로킹 호출로 코루틴 기반 코드와 호환성이 떨어집니다.
- 코드 가독성
- 중첩된 콜백 구조로 코드 복잡성이 높고 유지보수가 어렵습니다.
또한, 프로젝트가 Kotlin과 Spring Boot를 기반으로 하므로 Kotlin의 기본 비동기 처리 메커니즘인 코루틴을 활용하는 것이 기술 스택의 일관성 측면에서 더 적합하다고 생각해 코루틴을 활용하기로 했습니다.
💻 개선된 코드 (Kotlin Coroutine 기반)
개선된 코드는 Kotlin Coroutine을 활용하여 보다 효율적으로 비동기 작업을 수행합니다.
💻 코루틴을 사용한 코드
@Service class ...( // ... @Transactional fun publishPendingMessages() { val pageable = Pageable.ofSize(batchSize) val pendingMessages = outboxRepository.findAllByOrderByCreatedAtAsc(pageable) if (pendingMessages.isEmpty()) { return } val successfulIds = runBlocking { publishPendingMessagesAsync(pendingMessages) } if (successfulIds.isNotEmpty()) { outboxRepository.deleteAllById(successfulIds) } } private suspend fun publishPendingMessagesAsync(pendingMessages: List<StoryOutbox>): List<Long> = supervisorScope { pendingMessages.map { message -> async { try { val topic = outboxTopicResolver.getTopicForOutboxType(message.outboxType) val result = withContext(Dispatchers.IO) { kafkaTemplate.send(topic, message.storyId.toString(), message).await() } logger.info("Message delivered to Kafka with offset: ${result.recordMetadata.offset()}") message.id // ✅ 성공한 경우 ID 반환 } catch (e: Exception) { logger.error("Kafka message delivery failed: ${e.message}", e) null // ✅ 실패 시 null 반환 } } }.awaitAll().filterNotNull() } }
✅ Coroutine을 활용한 개선점
- 스레드 풀 관리의 어려움 ✅
- Dispatchers.IO를 사용하여 I/O 작업에 최적화된 스레드 풀 사용
- 코루틴은 스레드보다 가벼운 작업 단위로 보다 효율적인 자원 관리 가능
- 블로킹 I/O 작업 문제 ✅
- withContext(Dispatchers.IO)로 블로킹 작업을 I/O 전용 스레드 풀로 격리
- await() 구현으로 완전한 논블로킹 처리 가능
- 과도한 스레드 사용 ✅
- 코루틴은 적은 수의 스레드로 많은 동시 작업 처리 가능
- 하나의 스레드에서 여러 코루틴이 실행되어 자원 효율성 증가
- 예외 전파 및 관리 어려움 ✅
- supervisorScope를 사용하여 한 작업의 실패가 다른 작업에 영향을 주지 않도록 함
- try-catch로 각 작업의 예외를 명확하게 처리하고 null 반환으로 실패 표시
- 리스트 변환 과정의 복잡성 ✅
- 복잡한 타입 변환 과정 제거
- 코드 가독성 ✅
- 순차적인 코드 흐름으로 이해하기 쉬운 구조
- 콜백 중첩 제거로 코드 흐름 파악이 용이
🤔 Coroutine을 사용하면서 추가 고려한 점
1️⃣ launch vs async-awaitAll
- async를 활용한 병렬 실행
- launch 대신 async를 사용하여 Deferred 리스트를 만들고 마지막에 awaitAll()을 호출하여 병렬 실행된 모든 작업이 완료될 때까지 기다립니다.
- 성공한 ID 리스트 반환 방식 변경
- launch 대신 async를 사용하여 successfulIds를 따로 관리할 필요 없이 자연스럽게 성공한 메시지만 반환합니다.
- 개별적으로 successfulIds.add(...)를 호출하는 대신 성공한 경우 해당 ID를 반환하도록 하고 awaitAll() 이후 filterNotNull()을 적용하여 성공한 ID만 남깁니다.
2️⃣ supervisorScope vs coroutineScope
- coroutineScope를 사용하면 내부 코루틴 중 하나라도 예외가 발생하면 전체 스코프가 취소됩니다.
- 반면, supervisorScope를 사용하면 개별 실패가 다른 코루틴에 영향을 주지 않으며 실패한 작업만 개별적으로 처리됩니다.
- Kafka 메시지 전송은 개별 메시지 단위로 수행되므로 하나의 메시지 전송 실패가 전체 배치 작업을 중단시키지 않도록 supervisorScope를 사용했습니다.
3️⃣ withContext(Dispatchers.IO) 사용
- Kafka 메시지 전송(kafkaTemplate.send(...))은 네트워크 I/O가 포함된 작업으로 Dispatchers.IO에서 실행하는 것이 적절하다고 생각.
- Dispatchers.IO는 I/O 작업에 최적화된 스레드 풀을 활용해 CompletableFuture.supplyAsync에서 공용 스레드 풀을 사용할 때보다 성능 및 안정성이 향상됩니다.
4️⃣ runBlocking 사용 이유
- publishPendingMessages 함수는 @Transactional 어노테이션이 적용된 함수이므로 최상위 수준에서 동기적으로 실행되어야 합니다.
- 내부적으로 suspend 함수를 호출해야 하므로 runBlocking을 사용하여 Coroutine 컨텍스트를 생성하고 publishPendingMessagesAsync를 실행하도록 했습니다.
👍 최종 개선된 동작 방식과 장점
최종적으로 코루틴 기반의 비동기 메시지 전송 방식을 사용하면서 처음 get()을 사용했던 방식과 비교해 다음과 같은 개선이 이루어졌습니다.
✅ Coroutine 기반으로 보다 자연스럽고 효율적인 비동기 처리 가능
✅ supervisorScope를 사용하여 개별 실패가 전체 프로세스 중단을 유발하지 않도록 개선
✅ withContext(Dispatchers.IO)를 사용하여 블로킹 I/O가 메인 스레드에 영향을 주지 않도록 최적화
✅ 동시성 안전성을 고려하여 성공한 메시지를 저장하는 구조 개선
이러한 개선을 통해 Kafka 메시지 전송의 안정성과 성능을 동시에 확보하고자 했습니다. 🚀
📚 참고
https://www.baeldung.com/java-callbacks-listenablefuture-completablefuture
https://www.baeldung.com/java-completablefuture-allof-join