📕 book/아파치 카프카

Kafka 스트림즈 구현

beomsic 2022. 9. 18. 18:01

스트림즈 DSL을 이용하여 구현


스트림즈 DSL - stream(), to()

특정 토픽을 KStream 형태로 가져오려면 스트림즈 DSL의 stream() 메서드를 사용하면 된다.

KStream의 데이터를 특정 토픽으로 저장하려면 스트림즈 DSL의 to() 메서드를 사용하면 된다.

build.gradle

스트림즈 애플리케이션을 개발하기 위한 라이브러리 추가

dependencies {
	compile 'org.apache.kafka:kafka-streams:2.5.0'
}

SimpleStreamApplication.java

스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class SimpleStreamApplication {

    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(STREAM_LOG);

        stream.to(STREAM_LOG_COPY);

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}
  • 스트림즈 애플리케이션은 애플리케이션 아이디를 지정해야 한다(application.id)
    • 애플리케이션 아이디 값을 기준으로 병렬처리하기 때문
  • 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보를 입력
    • 카프카 브로커 호스트와 포트 정보 입력
  • 스트림 처리를 위한 메시지 키와 메시지 값의 역직렬화, 직렬화 방식 지정
  • stream_log 토픽으로부터 KStream 객체를 만들기 위해 StreamBuilder의 stream() 메서드 사용
    • StreamBuilder는 stream() 외에
    • KTable을 만드는 table()
    • GlobalKTable을 만드는 globalTable() 메서드도 지원한다.
    • 이 메서드들은 최초 토픽 데이터를 가져오는 소스 프로세서
  • KStream 객체를 다른 토픽으로 전송하기 위해 to() 메서드를 사용
    • KStream 인스턴스의 데이터들을 특정 토픽으로 저장하기 위한 용도로 사용
    • to()싱크 프로세서
  • StreamBuilder로 정의한 토폴로지에 대한 정보와 스트림즈 실행을 위한 기본 옵션을 파라미터로 KafkaStreams 인스턴스를 생성
    • 이 인스턴스를 실행하려면 start() 메서드 사용

스트림즈 DSL - filter()

메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 떄 filter() 메서드를 사용

  • 스트림즈 DSL에서 사용 가능한 필터링 스트림 프로세서

스트림즈 DSL - KTable와 KStream을 Join()

KTable과 KStream을 함께 사용하는 경우

  • KTable과 KStream은 메시지 키를 기준으로 조인할 수 있다.

데이터베이스는 정적으로 저장된 데이터를 조인하여 사용했지만, 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다.

⇒ 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다는 장점 존재

→ 이벤트 기반 스트리밍 데이터 파이프라인 구성 가능

KStreamJoinKTable.java

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class KStreamJoinKTable {

    private static String APPLICATION_NAME = "order-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_TABLE = "address";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}
//
//
//./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:"
//        >wonyoung:Seoul
//        >somin:Newyork
//        >wonyoung:Seoul
//        >somin:Newyork
//
//        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:"
//        >somin:cup
//        >somin:cup
//        >wonyoung:iPhone
//
//        ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning
//        cup send to Newyork
//        cup send to Newyork
//        cup send to Newyork
//        iPhone send to Busan
//
  • 토픽을 KTable로 가져올 때 table() 메서드를 소스 프로세서 로 사용
  • KStream으로 가져올 때 stream() 메서드를 소스 프로세서 로 사용
  • 조인을 위해 KStream 인스턴스에 정의되어 있는 join() 메서드를 사용
    • 첫 번째 파라미터로 조인을 수행할 KTable 인스턴스를 넣는다.
    • 스트림 프로세서
  • KStream 과 KTable에서 동일한 메시지 키를 가진 데이터를 찾았을 경우, 각각의 메시지 값을 조합해서 어떤 데이터를 만들기 결정
  • 조인을 통해 생성된 데이터를 다른 토픽에 저장하기 위해 to() 싱크 프로세서 를 사용

스트림즈 DSL - GlobalKTable와 KStream을 Join()

코파티셔닝이 되어 있지 않은 토픽을 조인해야 할 경우?!?!

  1. 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리
  2. KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용

KStreamJoinGlobalTable.java

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KStreamJoinGlobalKTable {

    private static String APPLICATION_NAME = "global-table-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_GLOBAL_TABLE = "address_v2";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressGlobalTable,
                (orderKey, orderValue) -> orderKey,
                (order, address) -> order + " send to " + address)
                .to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}

//
// ./kafka-topics.sh --bootstrap-server my-kafka:9092 --create --partitions 2 --topic address_v2
//         WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
//         Created topic address_v2.
//
//
//
//        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address_v2 --property "parse.key=true" --property "key.separator=:"
//        >wonyoung:Jeju

//
//
//./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:"
//        >wonyoung:Seoul
//        >somin:Newyork
//        >wonyoung:Seoul
//        >somin:Newyork
//
//        ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:"
//        >somin:cup
//        >somin:cup
//        >wonyoung:iPhone
//
//        ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning
//        cup send to Newyork
//        cup send to Newyork
//        cup send to Newyork
//        iPhone send to Busan
//
  • 서로 다른 파티션으로 이루어진 토픽을 정의
  • 토픽을 GlobalKTable로 정의하기 위해 globalTable() 메서드를 사용
  • KStream으로 사용하기 위해 stream() 메서드 사용
  • GlobalKTable을 조인하기 위해 join() 메서드를 사용
  • GlobalKTable 은 KTable의 조인과 다르게 레코드를 매칭할 때 KStream의 메시지 키와 메시지 값 둘 다 사용할 수 있다.
    • 예: KStream의 메시지 값을 GlobalKTable의 메시지 키와 조인할 수 있다.

GlobalKTable로 조인한 결과와 KTable과 조인한 결과는 크게 다르지 않아 보인다.

  • 하지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하는 점이 다르다.
  • 그리고 조인을 수행할 때 마다 KStream 메시지 키뿐만 아니라 메시지 값을 기준으로도 매칭하여 조인할 수 있는 점도 다르다.

프로세서 API를 이용하여 구현


build.gradle

dependencies {
	compile 'org.apache.kafka:kafka-streams:2.5.0'
}

스트림즈 DSL에서는 filter() 메서드를 스트림 프로세서로 사용해서 구현할 수 있었지만,

프로세서 API에서 동일한 로직을 구현하기 위해서 스트림 프로세서 역할을 하는 클래스를 만들어 줘야 한다.

FilterProcessor.java

필터링 처리 코드

package com.example;

import org.apache.kafka.streams.processor.ProcessorContext;

import org.apache.kafka.streams.processor.Processor;

public class FilterProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.length() > 5) {
            context.forward(key, value);
        }
        context.commit();
    }

    @Override
    public void close() {
    }

}
  • 스트림 프로세서 클래스를 생성하기 위해서는 kafka-streams 라이브러리에서 제공하는 Processor 또는 Transformer 인터페이스를 사용해야 한다.
  • ProcessorContext 클래스는 프로세서에 대한 정보를 담고 있다.
    • ProcessorContext 클래스로 생성된 인스턴스로 현재 스트림 처리 중인 토폴로지의 토픽 정보, 애플리케이션 아이디를 조회할 수 있다.
  • init() 메서드는 스트림 프로세서의 생성자
    • ProcessorContext를 파라미터로 받아서 초기화
  • forward() - 필터링된 데이터의 경우 이 메서드를 사용해 다음 토폴로지(다음 프로세서)로 넘어가도록 함.
  • commit() - 처리가 완료된 이후, 메서드를 호출하여 명시적으로 데이터가 처리되었음을 선언
  • close() - FilterProcessor가 종료되기 전에 호출되는 메서드

SimpleKafkaProcessor.java

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class SimpleKafkaProcessor {

    private static String APPLICATION_NAME = "processor-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Topology topology = new Topology();
        topology.addSource("Source",
                        STREAM_LOG)
                .addProcessor("Process",
                        () -> new FilterProcessor(),
                        "Source")
                .addSink("Sink",
                        STREAM_LOG_FILTER,
                        "Process");

        KafkaStreams streaming = new KafkaStreams(topology, props);
        streaming.start();
    }
}
  • Topology 클래스는 프로세서 API를 사용한 토폴로지를 구성하기 위해 사용
  • 토픽을 소스 프로세서로 가져오기 위해 addSource() 메서드를 사용
    • 첫 번째 파라미터 - 소스 프로세서의 이름
    • 두 번째 파라미터 - 대상 토픽 이름
  • 스트림 프로세서를 사용하기 위해 addProcessor() 메서드를 사용
    • 첫 번째 파라미터 - 스트림 프로세서의 이름
    • 두 번째 파라미터 - 저장할 토픽의 이름을 입력
    • 세 번째 파라미터 - 부모 노드를 입력, 필터링 처리가 완료된 데이터를 저장해야 하므로
  • 작성 완료한 topology 인스턴스를 KafkaStreams 인스턴스의 파라미터로 넣어서 스트림즈를 생성하고 실행할 수 있다.

출처

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

아파치 카프카 애플리케이션 프로그래밍 with 자바 - YES24

아파치 카프카 애플리케이션 개발을 위한 「실전 가이드」아파치 카프카란 무엇일까? 카프카 애플리케이션은 어떻게 만들까? 데이터 파이프라인을 만들기 위해 어떤 카프카 라이브러리를 사용

www.yes24.com