ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka 스트림즈 구현
    📕 book/아파치 카프카 2022. 9. 18. 18:01

    스트림즈 DSL을 이용하여 구현


    스트림즈 DSL - stream(), to()

    특정 토픽을 KStream 형태로 가져오려면 스트림즈 DSL의 stream() 메서드를 사용하면 된다.

    KStream의 데이터를 특정 토픽으로 저장하려면 스트림즈 DSL의 to() 메서드를 사용하면 된다.

    build.gradle

    스트림즈 애플리케이션을 개발하기 위한 라이브러리 추가

    dependencies {
    	compile 'org.apache.kafka:kafka-streams:2.5.0'
    }
    

    SimpleStreamApplication.java

    스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성

    package com.example;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    
    import java.util.Properties;
    
    public class SimpleStreamApplication {
    
        private static String APPLICATION_NAME = "streams-application";
        private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
        private static String STREAM_LOG = "stream_log";
        private static String STREAM_LOG_COPY = "stream_log_copy";
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> stream = builder.stream(STREAM_LOG);
    
            stream.to(STREAM_LOG_COPY);
    
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
    
        }
    }
    
    • 스트림즈 애플리케이션은 애플리케이션 아이디를 지정해야 한다(application.id)
      • 애플리케이션 아이디 값을 기준으로 병렬처리하기 때문
    • 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보를 입력
      • 카프카 브로커 호스트와 포트 정보 입력
    • 스트림 처리를 위한 메시지 키와 메시지 값의 역직렬화, 직렬화 방식 지정
    • stream_log 토픽으로부터 KStream 객체를 만들기 위해 StreamBuilder의 stream() 메서드 사용
      • StreamBuilder는 stream() 외에
      • KTable을 만드는 table()
      • GlobalKTable을 만드는 globalTable() 메서드도 지원한다.
      • 이 메서드들은 최초 토픽 데이터를 가져오는 소스 프로세서
    • KStream 객체를 다른 토픽으로 전송하기 위해 to() 메서드를 사용
      • KStream 인스턴스의 데이터들을 특정 토픽으로 저장하기 위한 용도로 사용
      • to()싱크 프로세서
    • StreamBuilder로 정의한 토폴로지에 대한 정보와 스트림즈 실행을 위한 기본 옵션을 파라미터로 KafkaStreams 인스턴스를 생성
      • 이 인스턴스를 실행하려면 start() 메서드 사용

    스트림즈 DSL - filter()

    메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 떄 filter() 메서드를 사용

    • 스트림즈 DSL에서 사용 가능한 필터링 스트림 프로세서

    스트림즈 DSL - KTable와 KStream을 Join()

    KTable과 KStream을 함께 사용하는 경우

    • KTable과 KStream은 메시지 키를 기준으로 조인할 수 있다.

    데이터베이스는 정적으로 저장된 데이터를 조인하여 사용했지만, 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다.

    ⇒ 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다는 장점 존재

    → 이벤트 기반 스트리밍 데이터 파이프라인 구성 가능

    KStreamJoinKTable.java

    package com.example;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    
    import java.util.Properties;
    
    public class KStreamJoinKTable {
    
        private static String APPLICATION_NAME = "order-join-application";
        private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
        private static String ADDRESS_TABLE = "address";
        private static String ORDER_STREAM = "order";
        private static String ORDER_JOIN_STREAM = "order_join";
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            StreamsBuilder builder = new StreamsBuilder();
            KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
            KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
    
            orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);
    
            KafkaStreams streams;
            streams = new KafkaStreams(builder.build(), props);
            streams.start();
    
        }
    }
    //
    //
    //./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:"
    //        >wonyoung:Seoul
    //        >somin:Newyork
    //        >wonyoung:Seoul
    //        >somin:Newyork
    //
    //        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:"
    //        >somin:cup
    //        >somin:cup
    //        >wonyoung:iPhone
    //
    //        ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning
    //        cup send to Newyork
    //        cup send to Newyork
    //        cup send to Newyork
    //        iPhone send to Busan
    //
    
    • 토픽을 KTable로 가져올 때 table() 메서드를 소스 프로세서 로 사용
    • KStream으로 가져올 때 stream() 메서드를 소스 프로세서 로 사용
    • 조인을 위해 KStream 인스턴스에 정의되어 있는 join() 메서드를 사용
      • 첫 번째 파라미터로 조인을 수행할 KTable 인스턴스를 넣는다.
      • 스트림 프로세서
    • KStream 과 KTable에서 동일한 메시지 키를 가진 데이터를 찾았을 경우, 각각의 메시지 값을 조합해서 어떤 데이터를 만들기 결정
    • 조인을 통해 생성된 데이터를 다른 토픽에 저장하기 위해 to() 싱크 프로세서 를 사용

    스트림즈 DSL - GlobalKTable와 KStream을 Join()

    코파티셔닝이 되어 있지 않은 토픽을 조인해야 할 경우?!?!

    1. 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리
    2. KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용

    KStreamJoinGlobalTable.java

    package com.example;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    
    import java.util.Properties;
    
    public class KStreamJoinGlobalKTable {
    
        private static String APPLICATION_NAME = "global-table-join-application";
        private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
        private static String ADDRESS_GLOBAL_TABLE = "address_v2";
        private static String ORDER_STREAM = "order";
        private static String ORDER_JOIN_STREAM = "order_join";
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            StreamsBuilder builder = new StreamsBuilder();
            GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
            KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
    
            orderStream.join(addressGlobalTable,
                    (orderKey, orderValue) -> orderKey,
                    (order, address) -> order + " send to " + address)
                    .to(ORDER_JOIN_STREAM);
    
            KafkaStreams streams;
            streams = new KafkaStreams(builder.build(), props);
            streams.start();
    
        }
    }
    
    //
    // ./kafka-topics.sh --bootstrap-server my-kafka:9092 --create --partitions 2 --topic address_v2
    //         WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    //         Created topic address_v2.
    //
    //
    //
    //        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address_v2 --property "parse.key=true" --property "key.separator=:"
    //        >wonyoung:Jeju
    
    //
    //
    //./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:"
    //        >wonyoung:Seoul
    //        >somin:Newyork
    //        >wonyoung:Seoul
    //        >somin:Newyork
    //
    //        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:"
    //        >somin:cup
    //        >somin:cup
    //        >wonyoung:iPhone
    //
    //        ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning
    //        cup send to Newyork
    //        cup send to Newyork
    //        cup send to Newyork
    //        iPhone send to Busan
    //
    
    • 서로 다른 파티션으로 이루어진 토픽을 정의
    • 토픽을 GlobalKTable로 정의하기 위해 globalTable() 메서드를 사용
    • KStream으로 사용하기 위해 stream() 메서드 사용
    • GlobalKTable을 조인하기 위해 join() 메서드를 사용
    • GlobalKTable 은 KTable의 조인과 다르게 레코드를 매칭할 때 KStream의 메시지 키와 메시지 값 둘 다 사용할 수 있다.
      • 예: KStream의 메시지 값을 GlobalKTable의 메시지 키와 조인할 수 있다.

    GlobalKTable로 조인한 결과와 KTable과 조인한 결과는 크게 다르지 않아 보인다.

    • 하지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하는 점이 다르다.
    • 그리고 조인을 수행할 때 마다 KStream 메시지 키뿐만 아니라 메시지 값을 기준으로도 매칭하여 조인할 수 있는 점도 다르다.

    프로세서 API를 이용하여 구현


    build.gradle

    dependencies {
    	compile 'org.apache.kafka:kafka-streams:2.5.0'
    }
    

    스트림즈 DSL에서는 filter() 메서드를 스트림 프로세서로 사용해서 구현할 수 있었지만,

    프로세서 API에서 동일한 로직을 구현하기 위해서 스트림 프로세서 역할을 하는 클래스를 만들어 줘야 한다.

    FilterProcessor.java

    필터링 처리 코드

    package com.example;
    
    import org.apache.kafka.streams.processor.ProcessorContext;
    
    import org.apache.kafka.streams.processor.Processor;
    
    public class FilterProcessor implements Processor<String, String> {
    
        private ProcessorContext context;
    
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }
    
        @Override
        public void process(String key, String value) {
            if (value.length() > 5) {
                context.forward(key, value);
            }
            context.commit();
        }
    
        @Override
        public void close() {
        }
    
    }
    
    • 스트림 프로세서 클래스를 생성하기 위해서는 kafka-streams 라이브러리에서 제공하는 Processor 또는 Transformer 인터페이스를 사용해야 한다.
    • ProcessorContext 클래스는 프로세서에 대한 정보를 담고 있다.
      • ProcessorContext 클래스로 생성된 인스턴스로 현재 스트림 처리 중인 토폴로지의 토픽 정보, 애플리케이션 아이디를 조회할 수 있다.
    • init() 메서드는 스트림 프로세서의 생성자
      • ProcessorContext를 파라미터로 받아서 초기화
    • forward() - 필터링된 데이터의 경우 이 메서드를 사용해 다음 토폴로지(다음 프로세서)로 넘어가도록 함.
    • commit() - 처리가 완료된 이후, 메서드를 호출하여 명시적으로 데이터가 처리되었음을 선언
    • close() - FilterProcessor가 종료되기 전에 호출되는 메서드

    SimpleKafkaProcessor.java

    package com.example;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    
    import java.util.Properties;
    
    public class SimpleKafkaProcessor {
    
        private static String APPLICATION_NAME = "processor-application";
        private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
        private static String STREAM_LOG = "stream_log";
        private static String STREAM_LOG_FILTER = "stream_log_filter";
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
            Topology topology = new Topology();
            topology.addSource("Source",
                            STREAM_LOG)
                    .addProcessor("Process",
                            () -> new FilterProcessor(),
                            "Source")
                    .addSink("Sink",
                            STREAM_LOG_FILTER,
                            "Process");
    
            KafkaStreams streaming = new KafkaStreams(topology, props);
            streaming.start();
        }
    }
    
    • Topology 클래스는 프로세서 API를 사용한 토폴로지를 구성하기 위해 사용
    • 토픽을 소스 프로세서로 가져오기 위해 addSource() 메서드를 사용
      • 첫 번째 파라미터 - 소스 프로세서의 이름
      • 두 번째 파라미터 - 대상 토픽 이름
    • 스트림 프로세서를 사용하기 위해 addProcessor() 메서드를 사용
      • 첫 번째 파라미터 - 스트림 프로세서의 이름
      • 두 번째 파라미터 - 저장할 토픽의 이름을 입력
      • 세 번째 파라미터 - 부모 노드를 입력, 필터링 처리가 완료된 데이터를 저장해야 하므로
    • 작성 완료한 topology 인스턴스를 KafkaStreams 인스턴스의 파라미터로 넣어서 스트림즈를 생성하고 실행할 수 있다.

    출처

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org

     

    아파치 카프카 애플리케이션 프로그래밍 with 자바 - YES24

    아파치 카프카 애플리케이션 개발을 위한 「실전 가이드」아파치 카프카란 무엇일까? 카프카 애플리케이션은 어떻게 만들까? 데이터 파이프라인을 만들기 위해 어떤 카프카 라이브러리를 사용

    www.yes24.com

     

    댓글

Designed by Tistory.