ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 🚐 Kafka 메시지 발행 최적화: 동기 방식에서 코루틴까지의 여정
    🧑🏻‍💻 프로젝트 2025. 3. 24. 17:45

    ⚠️ 문제 상황

    트랜잭션 아웃박스 패턴을 사용하여 outbox 테이블에서 배치 단위로 데이터를 가져와 Kafka로 메시지를 발행하는 과정에서 메시지가 실제로 Kafka에 정상적으로 전달되었는지 확인하는 기능이 필요했습니다.

     

    🤔  초기 접근법 - 동기적 처리 KafkaTemplate.send().get()

    Kafka에 메시지가 정상적으로 전송되었는지를 확인하기 위해 get() 메서드를 사용했습니다.

    val successfulIds = mutableListOf<Long>()
    
    pendingMessages.forEach { message ->
        try {
            val topic = kafkaProperties.getTopicForOutboxType(message.outboxType)
            kafkaTemplate.send(topic, message.storyId.toString(), message.payload).get()
            successfulIds.add(message.id!!)
        } catch (e: Exception) {
            logger.error("Kafka 전송 실패: ${e.message}")
        }
    }

     

    💣 문제점

    • 동기적 처리로 메시지당 응답을 기다려야 함
    • 대량 메시지 처리 시 성능 저하
    • Kafka의 비동기적 특성을 활용하지 못함
    • 순차적 처리로 전체 소요 시간 증가

    이렇게 send().get() 을 사용한 동기적 처리는 성능 저하를 초래할 수 있어 다른 방법을 고려하게 되었습니다.

     

    💡 Callback 함수를 사용하자!

    이 문제를 해결하기 위해 callback 함수를 적용하는 방법을 고려했습니다.

     

    👍 장점

    • send()가 호출된 후 별도의 스레드에서 전송 결과를 처리할 수 있어 성능 저하 없이 빠르게 메시지를 publish
    • 비동기적으로 메시지를 보낼 수 있어 batch size 만큼의 메시지를 한 번에 처리하는 데 유리

     

    ❗ ListenableFutureCallback

    fun <K : Any, V> KafkaTemplate<K, V>.sendMessageWithCallback(topic: String, key: K, value: V) {
        send(topic, key, value).addCallback(object : ListenableFutureCallback<SendResult<K, V>> {
            override fun onSuccess(result: SendResult<K, V>?) {
                println("Message delivered to Kafka with offset: \\${result?.recordMetadata?.offset()}")
            }
            override fun onFailure(ex: Throwable) {
                logger.error("Kafka message delivery failed", ex)
            }
        })
    }

     

    ⚠️ ListenableFutureCallback 사용 시 문제점

     

    Spring Kafka 6.0에서는 ListenableFutureCallback이 deprecated 되었습니다.

     

    📌  CompletableFuture 기반 구현

    pendingMessages.content
        .map { message ->
            try {
                val topic = outboxTopicResolver.getTopicForOutboxType(message.outboxType)
                CompletableFuture.supplyAsync {
                    kafkaTemplate.send(topic, message.storyId.toString(), message).get()
                }.thenAccept { result ->
                    successfulIds.add(message.id!!)
                    logger.info("Message delivered to Kafka with offset: ${result.recordMetadata.offset()}")
                }.exceptionally { ex ->
                    logger.error("Kafka message delivery failed", ex)
                    null
                }
            } catch (e: Exception) {
                logger.error("Kafka 전송 실패: ${e.message}")
                CompletableFuture.completedFuture(null)
            }
        }
        .toTypedArray()
        .let { futuresArray -> CompletableFuture.allOf(*futuresArray).join() }
    
    if (successfulIds.isNotEmpty()) {
        outboxRepository.deleteAllByIds(successfulIds)
    }

     

    🤔 CompletableFuture.supplyAsync()를 사용한 이유

    • supplyAsync는 별도의 스레드에서 실행되므로 Kafka 전송이 블로킹되지 않음
    • .thenAccept를 사용하여 전송 성공 시 후속 작업을 정의 가능
    • .exceptionally를 통해 예외 처리를 수행할 수 있어 실패 시 로깅 또는 재시도 로직을 추가 가능

     

    ⚠️ 구현 과정에서 발생한 이슈 / 처리

     

    1️⃣ 멀티스레드 환경에서의 데이터 무결성

    • 비동기 처리로 인해 여러 스레드가 동시에 successfulIds 리스트에 접근하면서 발생 가능한 동시성 문제

    ⇒ 스레드 안전한 컬렉션 사용

    val successfulIds = Collections.synchronizedList(mutableListOf<Long>())
    

     

    2️⃣ 예외 처리 계층화

    • 비동기 실행 시 발생하는 예외와 초기화/설정 단계에서 발생하는 예외의 구분 필요

    ⇒ 이중 예외 처리

    1. try-catch 블록: 즉시 발생하는 예외 처리 (카프카 메시지 전송 시도 전)
    2. .exceptionally { } 블록: 비동기 전송 중 발생하는 예외 처리

     

    3️⃣ 실행 순서 보장

    • 메시지 발행 성공 후에만 ID를 추가하도록 보장 필요

    ⇒ thenAccept를 사용하여 성공 시에만 ID 추가 로직 실행

    .thenAccept { result ->
        successfulIds.add(message.id!!)
        // 추가 로직
    }

     

    4️⃣ 데이터 일관성 보장

    • 모든 kafka에 메세지를 publish 하는 과정이 끝나기 전에 deleteAllByIds 로직이 실행되어 데이터 일관성이 깨질 수 있어 모든 메시지가 발행된 이후에 삭제로직이 실행되도록 해주어야 함.
    • 각 메시지마다 CompletableFuture를 생성
    • Stream을 CompletableFuture 배열로 변환
    • CompletableFuture.allOf()를 사용하여 모든 작업이 완료될 때까지 대기

     

    ⭐ Coroutine 사용

    기존 코드에서는 CompletableFuture.supplyAsync를 활용하여 Kafka 메시지 전송을 비동기적으로 수행하고, 결과를 thenAccept와 exceptionally를 사용하여 처리하고 있었습니다.

     

    ⚠️ 기존 코드의 문제점

    CompletableFuture 를 사용한다고 했을 때 발생할 수 있는 문제는 아래와 같다고 생각했습니다.

     

    • 스레드 풀 관리의 어려움
      • CompletableFuture.supplyAsync는 기본적으로 ForkJoinPool을 사용하여 스레드 풀 크기 조절 및 관리가 어렵습니다.
      • 처리량이 많아지면 ForkJoinPool이 포화될 가능성이 있습니다.

     

    • 블로킹 I/O 작업 문제
      • kafkaTemplate.send(...).get()은 완전히 블로킹 호출로 비동기 컨텍스트 내에서 스레드를 불필요하게 차단합니다.

     

    • 과도한 스레드 사용
      • 작업당 하나의 스레드를 사용하여 대량의 메시지 처리 시 시스템 리소스에 부담이 커집니다.

     

    • 예외 전파 및 관리 어려움
      • exceptionally로 개별 예외를 처리하지만 전체 흐름 관리가 어렵습니다.
      • 일부 메시지 실패가 전체 처리에 영향을 줄 수 있습니다.

     

    • 리스트 변환 과정의 복잡성
      • toTypedArray(), allOf(), join() 등의 복잡한 변환 과정이 필요합니다.
      • join()은 블로킹 호출로 코루틴 기반 코드와 호환성이 떨어집니다.

     

    • 코드 가독성
      • 중첩된 콜백 구조로 코드 복잡성이 높고 유지보수가 어렵습니다.

     

    또한, 프로젝트가 Kotlin과 Spring Boot를 기반으로 하므로 Kotlin의 기본 비동기 처리 메커니즘인 코루틴을 활용하는 것이 기술 스택의 일관성 측면에서 더 적합하다고 생각해 코루틴을 활용하기로 했습니다.

     

    💻  개선된 코드 (Kotlin Coroutine 기반)

    개선된 코드는 Kotlin Coroutine을 활용하여 보다 효율적으로 비동기 작업을 수행합니다.

     

    💻 코루틴을 사용한 코드

    @Service
    class ...(
        // ... 
        @Transactional
        fun publishPendingMessages() {
    
            val pageable = Pageable.ofSize(batchSize)
            val pendingMessages = outboxRepository.findAllByOrderByCreatedAtAsc(pageable)
    
            if (pendingMessages.isEmpty()) {
                return
            }
    
            val successfulIds = runBlocking {
                publishPendingMessagesAsync(pendingMessages)
            }
    
            if (successfulIds.isNotEmpty()) {
                outboxRepository.deleteAllById(successfulIds)
            }
        }
    
        private suspend fun publishPendingMessagesAsync(pendingMessages: List<StoryOutbox>): List<Long> = supervisorScope {
            pendingMessages.map { message ->
                async {
                    try {
                        val topic = outboxTopicResolver.getTopicForOutboxType(message.outboxType)
                        val result = withContext(Dispatchers.IO) {
                            kafkaTemplate.send(topic, message.storyId.toString(), message).await()
                        }
                        logger.info("Message delivered to Kafka with offset: ${result.recordMetadata.offset()}")
                        message.id  // ✅ 성공한 경우 ID 반환
                    } catch (e: Exception) {
                        logger.error("Kafka message delivery failed: ${e.message}", e)
                        null  // ✅ 실패 시 null 반환
                    }
                }
            }.awaitAll().filterNotNull()
        }
    }

     

    Coroutine을 활용한 개선점

    • 스레드 풀 관리의 어려움
      • Dispatchers.IO를 사용하여 I/O 작업에 최적화된 스레드 풀 사용
      • 코루틴은 스레드보다 가벼운 작업 단위로 보다 효율적인 자원 관리 가능

     

    • 블로킹 I/O 작업 문제
      • withContext(Dispatchers.IO)로 블로킹 작업을 I/O 전용 스레드 풀로 격리
      • await() 구현으로 완전한 논블로킹 처리 가능

     

    • 과도한 스레드 사용
      • 코루틴은 적은 수의 스레드로 많은 동시 작업 처리 가능
      • 하나의 스레드에서 여러 코루틴이 실행되어 자원 효율성 증가

     

    • 예외 전파 및 관리 어려움
      • supervisorScope를 사용하여 한 작업의 실패가 다른 작업에 영향을 주지 않도록 함
      • try-catch로 각 작업의 예외를 명확하게 처리하고 null 반환으로 실패 표시

     

    • 리스트 변환 과정의 복잡성
      • 복잡한 타입 변환 과정 제거

     

    • 코드 가독성
      • 순차적인 코드 흐름으로 이해하기 쉬운 구조
      • 콜백 중첩 제거로 코드 흐름 파악이 용이

     

    🤔 Coroutine을 사용하면서 추가 고려한 점

     

    1️⃣ launch vs async-awaitAll

    • async를 활용한 병렬 실행
      • launch 대신 async를 사용하여 Deferred 리스트를 만들고 마지막에 awaitAll()을 호출하여 병렬 실행된 모든 작업이 완료될 때까지 기다립니다.
    • 성공한 ID 리스트 반환 방식 변경
      • launch 대신 async를 사용하여 successfulIds를 따로 관리할 필요 없이 자연스럽게 성공한 메시지만 반환합니다.
      • 개별적으로 successfulIds.add(...)를 호출하는 대신 성공한 경우 해당 ID를 반환하도록 하고 awaitAll() 이후 filterNotNull()을 적용하여 성공한 ID만 남깁니다.

     

    2️⃣ supervisorScope vs coroutineScope

    • coroutineScope를 사용하면 내부 코루틴 중 하나라도 예외가 발생하면 전체 스코프가 취소됩니다.
    • 반면, supervisorScope를 사용하면 개별 실패가 다른 코루틴에 영향을 주지 않으며 실패한 작업만 개별적으로 처리됩니다.
    • Kafka 메시지 전송은 개별 메시지 단위로 수행되므로 하나의 메시지 전송 실패가 전체 배치 작업을 중단시키지 않도록 supervisorScope를 사용했습니다.

     

    3️⃣ withContext(Dispatchers.IO) 사용

    • Kafka 메시지 전송(kafkaTemplate.send(...))은 네트워크 I/O가 포함된 작업으로 Dispatchers.IO에서 실행하는 것이 적절하다고 생각.
    • Dispatchers.IO는 I/O 작업에 최적화된 스레드 풀을 활용해 CompletableFuture.supplyAsync에서 공용 스레드 풀을 사용할 때보다 성능 및 안정성이 향상됩니다.

     

    4️⃣ runBlocking 사용 이유

    • publishPendingMessages 함수는 @Transactional 어노테이션이 적용된 함수이므로 최상위 수준에서 동기적으로 실행되어야 합니다.
    • 내부적으로 suspend 함수를 호출해야 하므로 runBlocking을 사용하여 Coroutine 컨텍스트를 생성하고 publishPendingMessagesAsync를 실행하도록 했습니다.

     

    👍 최종 개선된 동작 방식과 장점

    최종적으로 코루틴 기반의 비동기 메시지 전송 방식을 사용하면서 처음 get()을 사용했던 방식과 비교해 다음과 같은 개선이 이루어졌습니다.

     

    Coroutine 기반으로 보다 자연스럽고 효율적인 비동기 처리 가능

     

    ✅ supervisorScope를 사용하여 개별 실패가 전체 프로세스 중단을 유발하지 않도록 개선

     

    ✅ withContext(Dispatchers.IO)를 사용하여 블로킹 I/O가 메인 스레드에 영향을 주지 않도록 최적화

     

    ✅ 동시성 안전성을 고려하여 성공한 메시지를 저장하는 구조 개선

     

    이러한 개선을 통해 Kafka 메시지 전송의 안정성과 성능을 동시에 확보하고자 했습니다. 🚀

     

    📚 참고

    https://www.baeldung.com/java-callbacks-listenablefuture-completablefuture

    https://www.baeldung.com/java-completablefuture-allof-join

    https://yeon-kr.tistory.com/228

    https://medium.com/@appdevinsights/difference-between-coroutinescope-and-supervisorscope-in-kotlin-coroutine-cfedd1a8d3af

    댓글

Designed by Tistory.