ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 3장 - 카프카 기본 개념
    📕 book/아파치 카프카 2022. 9. 18. 13:18

    3. 1 카프카 브로커, 클러스터, 주키퍼


    💡 카프카 브로커는 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체
    데이터를 분산 저장하여 장애가 발생하더라도 안전하게 사용할 수 있도록 도와주는 애플리케이션

    하나의 서버에는 한 개의 카프카 브로커 프로세스가 실행

    • 3대 이상의 브로커 서버를 1개의 클러스터로 묶어 운영
    • 브로커들은 프로듀서가 보낸 데이터를 안전하게 분산 저장하고 복제하는 역할 수행

    데이터 저장, 전송

    프로듀서로 부터 데이터를 전달 받으면 카프카 브로커는 프로듀서가 요청한 토픽의 파티션에 데이터를 저장

    • 프로듀서로부터 전달된 데이터는 파일 시스템에 저장된다.

    컨슈머가 데이터를 요청하면 파티션에 저장된 데이터를 전달.

    카프카는 메모리나 DB에 저장하지 않고 캐시 메모리를 구현하여 사용하지도 않는다.

    ❓ 파일 시스템에 저장하기 때문에 속도 이슈가 생기지 않음??!?

    ❗ 카프카는 페이지 캐시(page cache)를 사용하여 디스크 입출력 속도를 높여 이 문제를 해결

    • 페이지 캐시 : OS에서 파일 입출력의 성능 향상을 위해 만들어 놓은 메모리 영역
    • 한번 읽은 파일의 내용은 메모리의 페이지 캐시 영역에 저장시킨다.

    데이터 복제, 싱크

    데이터 복제(Replication)는 카프카를 장애 허용 시스템(fault tolerant system)으로 동작하도록 하는 원동력

    복제의 이유

    • 클러스터로 묶인 브로커중 일부에 장애가 발생하더라도 데이터를 유실하지 않고 안전하게 사용하기 위함.

    카프카의 데이터 복제는 파티션 단위로 이루어진다.

    • 복제의 개수 최솟값은 1
    • 최댓값은 브로커 개수

    복제된 파티션은 리더와 팔로워로 구성된다.

    리더 : 프로듀서 또는 컨슈머와 직접 통신하는 파티션

    팔로워 : 나머지 복제 데이터를 가지고 있는 파티션

    • 팔로워 파티션들은 리더 파티션의 오프셋을 확인하여 현재 자신이 가지고 있는 오프셋과 차이가 나는 경우 리더 파티션으로부터 데이터를 가져와서 자신의 파티션에 저장한다. ⇒ 복제

    브로커가 다운되면 해당 브로커에 있는 리더 파티션은 사용할 수 없기 때문에 팔로워 파티션중 하나가 리더 파티션 지위를 넘겨 받는다.

    • 운영시에는 데이터 종류마다 다른 복제 개수를 설정하고 상황에 따라서는 토픽마다 복제 개수를 다르게 설정하여 운영하기도 한다.

    컨트롤러(controller)

    클러스터의 다수 브로커 중 한대가 컨트롤러의 역할을 한다.

    컨트롤러는 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재 분배

    데이터 삭제

    카프카는 다른 메시징 플랫폼과 다르게 컨슈머가 데이터를 가져가더라도 토픽의 데이터는 삭제되지 않는다.

    • 컨슈머, 프로듀서가 데이터 삭제를 요청할 수도 없다
    • 오직 브로커 만이 데이터를 삭제할 수 있다.

    데이터 삭제는 파일 단위로 이루어지고 이 단위를 ‘로그 세그먼트’라고 부른다.

    컨슈머 오프셋 저장

    컨슈머 그룹은 토픽이 특정 파티션으로부터 데이터를 가져가서 처리하고 이 파티션의 어느 레코드까지 가져갔는지 확인하기 위해 오프셋을 커밋한다.

    • 커밋한 오프셋은 __consumer_offsets 토픽에 저장
    • 이 토픽에 저장된 오프셋을 토대로 컨슈머 그룹은 다음 레코드를 가져와 처리

    코디네이터(coordinator)

    클러스터의 다수 브로커 중 한 대는 코디네이터의 역할을 수행한다.

    코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다.

    컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당하여 끊임없이 데이터가 처리되도록 도와준다.

    • 파티션을 컨슈머로 재할당하는 과정을 ‘리밸런스(rebalance)’ 라고 부른다.

    3. 2 토픽과 파티션


    💡 토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위.
         1 개 이상의 파티션을 소유하고 있다

    파티션에는 프로듀서가 보낸 데이터들이 들어가 저장되는데 이 데이터를 ‘레코드'라고 한다.

    파티션은 카프카의 병렬처리의 핵심

    • 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭된다.
    • 컨슈머 개수를 늘림과 동시에 파티션 개수도 늘리면 처리량이 증가하는 효과를 볼 수 있다.

    파티션은 자료구조에서 접하는 큐(Queue)와 비슷한 구조

    • FIFO
    • 다만, 카프카에서는 데이터를 가져가도 삭제하지 않는다.
    • ⇒ 이러한 특징 때문에 토픽의 레코드는 다양한 목적을 가진 여러 컨슈머 그룹들이 토픽의 데이터를 여러번 가져갈 수 있다.

    토픽 이름 제약 조건

    1. 빈 문자열 토픽 이름 ❌
    2. 토픽 이름은 마침표 하나(.) 또는 마침표 둘(..)로 생성 ❌
    3. 토픽의 이름의 길이는 249자 미만으로 생성
    4. 토픽 이름은 영어 대소문자와 숫자 0-9, 마침표(.), 언더바(_), 하이픈(-)의 조합으로 생성
      • 이외의 문자열이 포함된 토픽 ❌
    5. 카프카 내부 로직 관리 목적으로 사용되는 2개 토픽 (__consumer_offsets, ___transaction_state)과 동일한 이름으로 생성 ❌
    6. 카프카 내부적으로 사용하는 로직 때문에 토픽 이름에 마침표(.)와 언더바(_)가 동시에 들어가면 ❌
      • 생성 가능
      • 하지만 사용시 이슈가 발생할 수 있어 WARNING 메시지가 발생
    7. 이미 생성된 토픽이름을 마침표 → 언더바 로 바꾸거나 언더바 → 마침표로 바꾼경우 신규 토픽 이름과 동일하다면 생성 ❌
      • Ex) to.pic → to_pic 생성 못함.

    의미 있는 토픽 이름 작명 방법

    최소한 토픽 이름을 통해 어떤 개발환경에서 사용되는 것인지 판단 가능해야 하고 어떤 애플리케이션에서 어떤 데이터 타입으로 사용되는지 유추할 수 있어야 한다.

    중요한 것은 토픽 이름에 대한 규칙을 사전에 정의하고 구성원들이 그 규칙을 잘 따르는 것.

    카프카는 토픽 이름 변경을 지원하지 ❌

    • 이름을 변경하기 위해서는 삭제 후 다시 생성하는 것 외에는 방법이 없다.
    • MSG-ORDER-DELIVERY-COMPLETED
    • LOG-APPLICATION-ORDER-DAY-DELIVERY-CONSUMER...등등

    3. 3 레코드


    💡 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다.

    프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프가 지정되어 저장된다.

    브로커에 한번 적재된 레코드는 수정할 수 ❌, 로그 리텐션 기간 또는 용량에 따라서만 삭제된다.

    타임 스탬프

    프로듀서에서 해당 레코드가 생성된 시점의 유닉스 타임이 설정

    • 컨슈머가 이를 토대로 레코드가 언제 생성되었는지 알 수 있다.
    • 프로듀서가 레코드를 생성할 때 임의의 타임스탬프 값을 설정 가능,
    • 토픽 설정에 따라 브로커에 적재된 시간으로 설정될 수 도 있다.

    메시지 키

    메시지 값을 순서대로 처리하거나 메시지 값의 종류를 나타내기 위해 사용

    메시지 키를 사용하면 프로듀서가 토픽에 레코드를 전송할 때 메시지 키의 해시값을 토대로 파티션을 지정하게 됨.

    → 즉, 동일한 메시지 키라면 동일 파티션에 들어간다.

    • 하지만, 어느 파티션에 지정될지 알 수 없고 파티션 개수가 변경되면 메시지 키와 파티션 매칭이 달라져 주의해야 한다.

    메시지키를 선언하지 않으면 null로 설정된다.

    • null로 설정된 레코드는 프로듀서 기본 설정 파티셔너에 따라서 파티션에 분배되 적재된다.

    메시지 값

    실질적으로 처리할 데이터가 들어 있다.

    • 메시지 키와 메시지 값은 직렬화 되어 브로커로 전송된다.

    오프셋

    레코드의 오프셋은 직접 지정할 수 없고 브로커에 저장될 때 이전에 전송된 레코드의 오프셋 + 1 값으로 생성된다.

    헤더

    레코드의 추가적인 정보를 담는 메타데이터 저장소 용도로 사용한다.

    3. 4 카프카 클라이언트


    💡 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리는
        카프카 프로듀서, 카프카 컨슈머, 어드민 클라이언트
        를 제공하는 카프카 클라이언트를 사용하여 애플리케이션을 개발한다.

    카프카 클라이언트는 라이브러리이기 때문에 자체 라이플사이클을 가진 프레임워크나 애플리케이션 위에서 구현하고 실행해야 한다.

    프로듀서 API

    카프카에서 데이터의 시작점은 프로듀서이다.

    프로듀서 애플리케이션

    • 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송한다.
    • 데이터를 전송시 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신한다.

    프로듀서를 구현하는 가장 기초적인 방법

    → 카프카 클라이언트를 라이브러리로 추가하여 자바 기본 애플리케이션 만들기

    • 그래들 기반의 자바 애플리케이션을 만들면 개발에 필요한 각종 라이브러리를 미리 선언하여 가져올 수 있다.

    프로듀서는 데이터를 직렬화 하여 카프카 브로커로 보내기 때문에 자바에서 선언 가능한 모든 형태를 브로커로 전송할 수 있다.

    직렬화

    • 자바 또는 외부 시스템에서 사용가능하도록 바이트 형태로 데이터를 변환하는 기술

    카프카 프로듀서 프로젝트 생성

    아파치 카프카는 공식 라이브러리로 자바 라이브러리를 지원한다.

    // gradle 디펜던시 정의
    
    plugins {
    	id 'java'.       - 자바 프로그램을 위한 기능을 제공하는 플러그인
    }
    
    group 'com.example'      - 프로젝트 생성 시 설정한 group 명
    version '1.0'      - 프로젝트 버전
    
    repositories {
    	mavenCentral()     
    }
    
    dependencies {
    	compile 'org.apache.kafka:kafka-clients:2.5.0'        
    	complie 'org.slf4j:slf4j-simple:1.7.30'         
    	
    }
    1. 카프카 클라이언트 라이브러리를 가져오기 위한 디펜던시 설정

    프로듀서 애플리케이션

    // simple - kafka - producer
    
    package com.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    public class SimpleProducer {
        private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
        private final static String TOPIC_NAME = "test";  - 1
        private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";   - 2
    
        public static void main(String[] args) {
    
            Properties configs = new Properties();  - 3
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);  - 2
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - 4
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(configs); - 5
    
            String messageValue = "testMessage";  - 6
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue); - 7
            producer.send(record); - 8
            logger.info("{}", record);
            producer.flush();  - 9
            producer.close();  - 10
        }
    }
    
    1. 프로듀서는 생성한 레코드를 전송하기 위해 전송하고자 하는 토픽을 알고 있어야 한다.
      • 토픽을 지정하지 않고서는 데이터를 전송 ❌
      • 토픽이름은 Producer Record 인스턴스를 생성할 때 사용한다.
    2. 전송하고자 하는 카프카 클러스터 서버의 host 와 IP 를 지정
    3. Properties에는 KafkaProducer 인스턴스를 생성하기 위한 프로듀서 옵션들을 Key-Value 값으로 선언
      • 필수 옵션을 반드시 선언
      • 선택 옵션은 선언하지 않아도 된다. ( 선택 안하면 기본 옵션값으로 설정되어 동작 )
    4. 메시지 키, 값을 직렬화 하기 위한 직렬화 클래스 선언
      • 위 코드에서는 String 객체를 전송하기 위해 String 을 직렬화하는 클래스인 카프카 라이브러리의 StringSerializer를 사용
    5. Properties를 KafkaProducer의 생성 파라미터로 추가하여 인스턴스를 생성한다.
    6. producer 인스턴스는 ProducerRecord를 전송할 때 사용된다.
    7. 메시지 값 설정
    8. 카프카 브로커로 데이터를 보내기 위해 ProducerRecord를 생성한다.
      • Producer Record는 생성자를 여러개 가진다.
        • 생성자 개수에 따라 오버로딩
        • 토픽이름, 메시지 값, 메시지 키(null)
      • 메시지 키와 값의 타입은 직렬화 클래스와 동일하게 설정
    9. 생성한 ProducerRecord를 전송하기 위해 record를 파라미터로 가지는 send() 메서드 호출
      • 프로듀서에서 send()는 즉각적인 전송을 뜻 ❌
      • 파라미터로 들어간 record를 프로듀서 내부에 가지고 있다가 배치 형태로 묶어서 브로커에 전송
      ⇒ ‘배치 전송
    10. flush()를 통해 프로듀서 내부 버퍼에 가지고 있던 레코드 배치를 브로커에 전송한다.
    11. 애플리케이션 종료전 close()메소드를 호출하여 producer 인스턴스의 리소스들을 안전하게 종료한다.

    로컬에서 토픽을 생성하고 프로젝트 실행

    • 카프카 라이브러리 로그가 출력된다.
      • 카프카 프로듀서 구동시 설정한 옵션 출력
      • 카프카 클라이언트의 버전 출력
      • 전송한 ProducerRecord가 출력된다.

    프로듀서 중요 개념

    💡 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.

    전송하고자 하는 데이터는 ProducerRecord 클래스를 통해 인스턴스를 생성했지만

    • ProducerRecord 인스턴스를 생성 필수 파라미터인 토픽과 메시지값 만 설정

    ProducerRecord 생성 시 추가 파라미터를 사용하여 오버로딩을 통해 ProducerRecord의 내부 변수를 선언할 수 도 있다.

    Kafka Producer 인스턴스가 send() 메서드를 호출하면

    • ProducerRecord는 파티셔너(partitioner)에서 토픽의 어느 파티션으로 전송될 것인지 정해진다.

    파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송한다.

    프로듀서 API를 사용하면 ‘UniformStickyPartitioner’ 와 ‘RoundRobinPartitioner’ 2개 파티션을 제공한다.

    • 파티셔너를 지정하지 않으면 UniformStickyPartitioner가 파티셔너로 기본 설정된다.
    • 둘 다 메시지 키가 있을 때는 메시지 키의 해시값과 파티션을 매칭하여 데이터를 전송하는 것이 동일하다.
    • 메시지 키가 없을 때는 파티션에 최대한 동일하게 분배하는 로직이 들어있다.
      • UniformStickyPartitioner 가 RoundRobinPartitioner의 단점을 개선

    UniformStickPartitioner

    프로듀서 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가지는 특징이 있다.

    RoundRobinPartitioner 은 ProducerRecord가 들어오는 대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적다.

    • 될 수 있으면 많은 데이터가 배치로 묶여 전송되어야 성능 향상을 기대할 수 있다.
    • 이를 해결하고자 UniformStickyPartitioner 가 기본 파티셔너로 설정

    UniformStickyPartitioner 은 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 전송해 더 향상된 성능을 가지게 되었다.

    카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위한 Partitioner 인터페이스를 제공

    • sender 스레드는 어큐뮬레이터에 쌓인 배치 데이터를 가져가 카프카 브로커로 전송한다.

    프로듀서 주요 옵션

    프로듀서 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택옵션 이 있다.

    필수 옵션

    • bootstrap.servers
      • 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트이름, 포트 를 1개 이상 작성
    • key.serializer
      • 레코드의 메시지 키를 직렬화하는 클래스를 지정
    • value.serializer
      • 레코드의 메시지 값을 직렬화하는 클래스를 지정

    선택옵션

    • acks - 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부 확인하는데 사용
    • buffer.memory
    • retries
    • batch.size
    • linger.ms
    • partitioner.class
    • enable.idempotence
    • transactional.id

    메시지 키를 가진 데이터를 전송하는 프로듀서

    메시지 키가 포함된 레코드를 전송하고 싶다면 ProducerRecord 생성 시 파리미터로 추가해야 한다.

    커스텀 파티셔너를 가지는 프로듀서

    프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있다.

    → 커스텀 파티셔너를 사용

    커스텀 파티셔너를 지정한 경우 ProducerConfig의 PARTITIONER_CLASS_CONFIG 옵션을 사용자 생성 파티셔너로 설정하여 KafkaProducer 인스턴스를 생성해야 한다.

    브로커 정상 전송 여부를 확인하는 프로듀서

    KafkaProducer의 send()메서드는 Future 객체를 반환한다.

    이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있다.

    • get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다.
    • 동기로 프로듀서의 전송 결과를 확인하는 것은 빠른 전송에 문제가 있을 수 있다.
    • 프로듀서는 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공하고 있다.
    • → 사용자는 사용자 정의 Callback 클래스를 생성하여 레코드의 전송 결과에 대응하는 로직을 만들 수 있다.
    • 비동기로 결과를 받는 경우 동기보다 더 빠른 속도로 데이터를 추가 처리할 수 있지만 전송하는 데이터의 순서가 중요한 경우 사용하면 안된다.
    • 앞서 보낸 데이터의 결과가 실패할 경우 재 전송으로 인해 데이터 순서가 역전될 수 있기 때문

    컨슈머 API

    💡 프로듀서가 전송한 데이터는 카프카 브로커에 적재된다.
    컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.

    ex)

    마케팅 문자를 고객에 보내는 기능

    • 컨슈머는 토픽으로부터 고객 데이터를 가져와서 문자 발송 처리를 하게 된다.

    카프카 컨슈머 프로젝트 생성

    컨슈머 코드

    package com.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class SimpleConsumer {
        private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
        private final static String TOPIC_NAME = "test";     - 1
        private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";   - 2
        private final static String GROUP_ID = "test-group";   - 3
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);   - 2
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);    -  3
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  - 4
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - 4
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); - 5
    
            consumer.subscribe(Arrays.asList(TOPIC_NAME)); - 6
    
            while (true) { - 7
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); - 8
                for (ConsumerRecord<String, String> record : records) { - 9
                    logger.info("record:{}", record);
                }
            }
        }
    }
    
    1. 컨슈머 그룹 이름 선언
      컨슈머 그룹 ❓
      • 컨슈머 그룹을 통해 컨슈머의 목적을 구분할 수 있다.
      • 컨슈머 그룹을 지정하여 동일한 역할을 하는 컨슈머를 묶어 관리
      • 컨슈머 그룹을 기준으로 컨슈머 오프셋을 관리하기 때문에 subscribe() 메서드를 사용하여 토픽을 구독하는 경우 에는 컨슈머 그룹을 선언해야 한다.
      • 컨슈머 그룹을 선언하지 않으면 어떤 그룹에도 속하지 않는 컨슈머로 동작하게 된다.
    2. 프로듀서가 직렬화하여 전송한 데이터를 역직렬화하기 위해 역직렬화 클래스를 지정
    3. Properties로 지정한 카프카 컨슈머 옵션을 파라미터로 받아 KafkaConsumer 인스턴스를 생성
      • 이 인스턴스를 통해 데이터를 가져올 수 있다.
    4. 컨슈머에게 토픽을 할당하기 위해 subscribe() 메서드를 사용
      • 이 메서드는 Collection 타입의 String값들을 받는데, 1개 이상의 토픽 이름을 받을 수 있다.
    5. 컨슈머는 poll() 메서드를 호출하여 데이터를 가져와서 처리한다.
      • 지속적으로 데이터를 처리하기 위해서 반복호출을 해야 한다.
      • 지속적으로 반복 호출하기 위한 가장 쉬운 방법은 while(true)처럼 무한 루프를 만드는 것.
      • 무한 루프 내에서 poll() 메서드를 통해 데이터를 가져오고 사용자가 원하는 데이터 처리를 수행
    6. 컨슈머는 poll() 메서드를 통해 ConsumerRecord 리스트를 반환한다.
      • poll() 메서드는 Duration 타입을 인자로 받는다.
      • 인자 값은 브로커로부터 데이터를 가져올 때 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 간격을 뜻
    7. for loop를 통해 poll() 메서드가 반환한 ConsumerRecord 데이터들을 순차적으로 처리한다.

    카프카 컨슈머 애플리케이션이 실행되면서 카프카 라이브러리 로그가 출력됨과 동시에 컨슈머가 토픽을 구독하면서 컨슈머는 브로커로부터 polling을 시작한다.

    컨슈머 주요 개념

    토픽의 파티션으로부터 데이터를 가져가기 위해 컨슈머를 운영하는 방법은 크게 2가지

    1. 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것
    2. 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것.

    컨슈머 그룹으로 운영하는 방법

    컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식

    • 컨슈머 그룹으로 묶인 컨슈머들은 토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있다.
    • 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하다.
    • 1개 컨슈머는 여러 개의 파티션에 할당될 수 있다.

    이러한 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.

    컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있다.

    카프카는 파이프라인을 운영함에 있어 최종 적재되는 저장소의 장애에 유연하게 대응할 수 있도록 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저장소의 장애에 격리되어 운영할 수 있다.

    ❓컨슈머 그룹의 컨슈머에 장애가 발생하면 어떻게 될까

    컨슈머 그룹으로 이루어진 컨슈머들 중 일부 컨슈머에 장애가 발생하면, 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.

    리밸런싱(rebalancing) 이라고 부른다.

    리밸런싱은 크게 두가지 상황에서 일어난다.

    1. 컨슈머가 추가되는 상황
    2. 컨슈머가 제외되는 상황

    리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생할 수 있으므로 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야 한다.

    가용성을 높이면서도 안정적인 운영을 도와주는 리밸런싱은 유용하지만 자주 일어나서는 안된다.

    → 리밸런싱이 발생할 때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머의 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문

    • 그룹 조정자(group coordinator)는 리밸런싱을 발동시키는 역할을 한다.
    • 컨슈머 그룹의 컨슈머가 추가되고 삭제될 때를 감지
    • 카프카 브로커 중 한 대가 그룹 조정자의 역할을 수행

    컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 기록한다.

    • 컨슈머 동작 이슈가 발생하여 consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있다.
    • 데이터 처리의 중복이 발생하지 않게 하기 위해서는 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야만 한다.

    오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있다.

    • 일정 간격마다 자동으로 커밋되는 것을 비명시 ‘오프셋 커밋'이라고 부른다.
      • 편리하지만 poll() 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있다.
      그러므로 데이터 중복이나 유실을 허용하지 않는 서비스라면 자동 커밋을 사용해서는 안된다.

    명시적으로 오프셋을 커밋하려면 poll() 메서드 호출 이후에 반환받은 데이터의 처리가 완료되고 commitSync()메서드를 호출하면 된다.

    • commitSync()메서드는 poll()메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행
    • 하지만 정상적으로 처리되었는지 응답하기 까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다.

    → 해결 위해 commtiAsync() 메서드를 사용하여 커밋 요청을 전송하고 응답이 오기 전까지 데이터 처리를 수행할 수 있다.

    • 비동기 커밋은 커밋 요청이 실패했을 경우 현재 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수 있다.

    컨슈머 주요 옵션

    필수 옵션

    선택 옵션

    동기 오프셋 커밋

    • poll() 메서드로 받은 모든 레코드의 처리가 끝난 이후 commitSync() 메서드를 호출해야 한다.
    • 개별 레코드 단위로 매번 오프셋을 커밋하고 싶다면, commitSync() 메서드에 Map<TopicPartition, OffsetAndMetadata> 인스턴스를 파라미터로 넣으면 된다.

    비동기 오프셋 커밋

    • 동기 오프셋 커밋을 사용하면 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단된다
    • 더 많은 데이터를 처리하기 위해 비동기 오프셋 커밋을 사용

    동기 오프셋 커밋과 마찬가지로 poll()메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋한다.

    커밋이 완료될 때까지 응답을 기다리지 않는다.

    • 비동기 오프셋 커밋을 사용할 경우 비동기로 커밋 응답을 받기 때문에 callback 함수를 파라미터로 받아 결과를 얻을 수 있다.

    리밸런스 리스너를 가진 컨슈머

    리밸런스 발생시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 한다.

    • 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스 지원

    파티션 할당 컨슈머

    컨슈머를 운영할 때 subscribe()메서드를 사용하여 구독 형태로 사용하는 것 외에더 직접 파티션을 컨슈머에 명시적으로 할당하여 운영할 수도 있다.

    • 컨슈머가 어떤 토픽, 파티션을 할당할지 명시적으로 선언할 때는 assign()메서드를 사용하면 된다.
    • 직접 컨슈머가 특정 토픽, 특정 파티션에 할당되므로 리밸런싱 하는 과정이 없다.

    컨슈머에 할당된 파티션 확인 방법

    컨슈머에 할당된 토픽과 파티션에 대한 정보는 assignment() 메서드로 확인할 수 있다.

    컨슈머의 안전한 종료

    컨슈머를 안전하게 종료하기 위해 KafkaConsumer 클래스는 wakeup()메서드르 지원한다.

    마지막에는 close() 메서드를 호출하여 카프카 클러스터에 컨슈머가 안전하게 종료되었음을 명시적으로 알려주면 종료가 완료

    어드민 API

    💡 실제 운영환경에서는 프로듀서와 컨슈머를 통해 데이터를 주고받는 것만큼 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요

    카프카 클라이언트에서는 내부 옵션들을 설정하거나 조회 하기 위해 AdminClient 클래스를 제공한다.

    • 이를 활용하면 클러스터의 옵션과 관련된 부분을 자동화할 수 있다.

    프로듀서, 컨슈머 API와 다르게 추가 설정없이 클러스터 정보에 대한 설정만 하면 된다.

    create() 메서드로 KafkaAdminClient를 반환받는다.

    어드민 API를 활용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해야 한다.

     

    참고

     

    아파치 카프카 애플리케이션 프로그래밍 with 자바 - YES24

    아파치 카프카 애플리케이션 개발을 위한 「실전 가이드」아파치 카프카란 무엇일까? 카프카 애플리케이션은 어떻게 만들까? 데이터 파이프라인을 만들기 위해 어떤 카프카 라이브러리를 사용

    www.yes24.com

     

    댓글

Designed by Tistory.