-
Kafka 스트림즈 구현📕 book/아파치 카프카 2022. 9. 18. 18:01
스트림즈 DSL을 이용하여 구현
스트림즈 DSL - stream(), to()
특정 토픽을 KStream 형태로 가져오려면 스트림즈 DSL의 stream() 메서드를 사용하면 된다.
KStream의 데이터를 특정 토픽으로 저장하려면 스트림즈 DSL의 to() 메서드를 사용하면 된다.
build.gradle
스트림즈 애플리케이션을 개발하기 위한 라이브러리 추가
dependencies { compile 'org.apache.kafka:kafka-streams:2.5.0' }
SimpleStreamApplication.java
스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성
package com.example; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import java.util.Properties; public class SimpleStreamApplication { private static String APPLICATION_NAME = "streams-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String STREAM_LOG = "stream_log"; private static String STREAM_LOG_COPY = "stream_log_copy"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> stream = builder.stream(STREAM_LOG); stream.to(STREAM_LOG_COPY); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
- 스트림즈 애플리케이션은 애플리케이션 아이디를 지정해야 한다(application.id)
- 애플리케이션 아이디 값을 기준으로 병렬처리하기 때문
- 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보를 입력
- 카프카 브로커 호스트와 포트 정보 입력
- 스트림 처리를 위한 메시지 키와 메시지 값의 역직렬화, 직렬화 방식 지정
- stream_log 토픽으로부터 KStream 객체를 만들기 위해 StreamBuilder의 stream() 메서드 사용
- StreamBuilder는 stream() 외에
- KTable을 만드는 table()
- GlobalKTable을 만드는 globalTable() 메서드도 지원한다.
- 이 메서드들은 최초 토픽 데이터를 가져오는 소스 프로세서
- KStream 객체를 다른 토픽으로 전송하기 위해 to() 메서드를 사용
- KStream 인스턴스의 데이터들을 특정 토픽으로 저장하기 위한 용도로 사용
- to() → 싱크 프로세서
- StreamBuilder로 정의한 토폴로지에 대한 정보와 스트림즈 실행을 위한 기본 옵션을 파라미터로 KafkaStreams 인스턴스를 생성
- 이 인스턴스를 실행하려면 start() 메서드 사용
스트림즈 DSL - filter()
메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 떄 filter() 메서드를 사용
- 스트림즈 DSL에서 사용 가능한 필터링 스트림 프로세서
스트림즈 DSL - KTable와 KStream을 Join()
KTable과 KStream을 함께 사용하는 경우
- KTable과 KStream은 메시지 키를 기준으로 조인할 수 있다.
데이터베이스는 정적으로 저장된 데이터를 조인하여 사용했지만, 카프카에서는 실시간으로 들어오는 데이터들을 조인할 수 있다.
⇒ 사용자의 이벤트 데이터를 데이터베이스에 저장하지 않고도 조인하여 스트리밍 처리할 수 있다는 장점 존재
→ 이벤트 기반 스트리밍 데이터 파이프라인 구성 가능
KStreamJoinKTable.java
package com.example; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import java.util.Properties; public class KStreamJoinKTable { private static String APPLICATION_NAME = "order-join-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String ADDRESS_TABLE = "address"; private static String ORDER_STREAM = "order"; private static String ORDER_JOIN_STREAM = "order_join"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KTable<String, String> addressTable = builder.table(ADDRESS_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM); orderStream.join(addressTable, (order, address) -> order + " send to " + address).to(ORDER_JOIN_STREAM); KafkaStreams streams; streams = new KafkaStreams(builder.build(), props); streams.start(); } } // // //./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:" // >wonyoung:Seoul // >somin:Newyork // >wonyoung:Seoul // >somin:Newyork // // ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:" // >somin:cup // >somin:cup // >wonyoung:iPhone // // ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning // cup send to Newyork // cup send to Newyork // cup send to Newyork // iPhone send to Busan //
- 토픽을 KTable로 가져올 때 table() 메서드를 소스 프로세서 로 사용
- KStream으로 가져올 때 stream() 메서드를 소스 프로세서 로 사용
- 조인을 위해 KStream 인스턴스에 정의되어 있는 join() 메서드를 사용
- 첫 번째 파라미터로 조인을 수행할 KTable 인스턴스를 넣는다.
- 스트림 프로세서
- KStream 과 KTable에서 동일한 메시지 키를 가진 데이터를 찾았을 경우, 각각의 메시지 값을 조합해서 어떤 데이터를 만들기 결정
- 조인을 통해 생성된 데이터를 다른 토픽에 저장하기 위해 to() 싱크 프로세서 를 사용
스트림즈 DSL - GlobalKTable와 KStream을 Join()
코파티셔닝이 되어 있지 않은 토픽을 조인해야 할 경우?!?!
- 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리
- KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용
KStreamJoinGlobalTable.java
package com.example; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import java.util.Properties; public class KStreamJoinGlobalKTable { private static String APPLICATION_NAME = "global-table-join-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String ADDRESS_GLOBAL_TABLE = "address_v2"; private static String ORDER_STREAM = "order"; private static String ORDER_JOIN_STREAM = "order_join"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM); orderStream.join(addressGlobalTable, (orderKey, orderValue) -> orderKey, (order, address) -> order + " send to " + address) .to(ORDER_JOIN_STREAM); KafkaStreams streams; streams = new KafkaStreams(builder.build(), props); streams.start(); } } // // ./kafka-topics.sh --bootstrap-server my-kafka:9092 --create --partitions 2 --topic address_v2 // WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. // Created topic address_v2. // // // // ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address_v2 --property "parse.key=true" --property "key.separator=:" // >wonyoung:Jeju // // //./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic address --property "parse.key=true" --property "key.separator=:" // >wonyoung:Seoul // >somin:Newyork // >wonyoung:Seoul // >somin:Newyork // // ./kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic order --property "parse.key=true" --property "key.separator=:" // >somin:cup // >somin:cup // >wonyoung:iPhone // // ./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic order_join --from-beginning // cup send to Newyork // cup send to Newyork // cup send to Newyork // iPhone send to Busan //
- 서로 다른 파티션으로 이루어진 토픽을 정의
- 토픽을 GlobalKTable로 정의하기 위해 globalTable() 메서드를 사용
- KStream으로 사용하기 위해 stream() 메서드 사용
- GlobalKTable을 조인하기 위해 join() 메서드를 사용
- GlobalKTable 은 KTable의 조인과 다르게 레코드를 매칭할 때 KStream의 메시지 키와 메시지 값 둘 다 사용할 수 있다.
- 예: KStream의 메시지 값을 GlobalKTable의 메시지 키와 조인할 수 있다.
GlobalKTable로 조인한 결과와 KTable과 조인한 결과는 크게 다르지 않아 보인다.
- 하지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하는 점이 다르다.
- 그리고 조인을 수행할 때 마다 KStream 메시지 키뿐만 아니라 메시지 값을 기준으로도 매칭하여 조인할 수 있는 점도 다르다.
프로세서 API를 이용하여 구현
build.gradle
dependencies { compile 'org.apache.kafka:kafka-streams:2.5.0' }
스트림즈 DSL에서는 filter() 메서드를 스트림 프로세서로 사용해서 구현할 수 있었지만,
프로세서 API에서 동일한 로직을 구현하기 위해서 스트림 프로세서 역할을 하는 클래스를 만들어 줘야 한다.
FilterProcessor.java
필터링 처리 코드
package com.example; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.Processor; public class FilterProcessor implements Processor<String, String> { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(String key, String value) { if (value.length() > 5) { context.forward(key, value); } context.commit(); } @Override public void close() { } }
- 스트림 프로세서 클래스를 생성하기 위해서는 kafka-streams 라이브러리에서 제공하는 Processor 또는 Transformer 인터페이스를 사용해야 한다.
- ProcessorContext 클래스는 프로세서에 대한 정보를 담고 있다.
- ProcessorContext 클래스로 생성된 인스턴스로 현재 스트림 처리 중인 토폴로지의 토픽 정보, 애플리케이션 아이디를 조회할 수 있다.
- init() 메서드는 스트림 프로세서의 생성자
- ProcessorContext를 파라미터로 받아서 초기화
- forward() - 필터링된 데이터의 경우 이 메서드를 사용해 다음 토폴로지(다음 프로세서)로 넘어가도록 함.
- commit() - 처리가 완료된 이후, 메서드를 호출하여 명시적으로 데이터가 처리되었음을 선언
- close() - FilterProcessor가 종료되기 전에 호출되는 메서드
SimpleKafkaProcessor.java
package com.example; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; public class SimpleKafkaProcessor { private static String APPLICATION_NAME = "processor-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String STREAM_LOG = "stream_log"; private static String STREAM_LOG_FILTER = "stream_log_filter"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); Topology topology = new Topology(); topology.addSource("Source", STREAM_LOG) .addProcessor("Process", () -> new FilterProcessor(), "Source") .addSink("Sink", STREAM_LOG_FILTER, "Process"); KafkaStreams streaming = new KafkaStreams(topology, props); streaming.start(); } }
- Topology 클래스는 프로세서 API를 사용한 토폴로지를 구성하기 위해 사용
- 토픽을 소스 프로세서로 가져오기 위해 addSource() 메서드를 사용
- 첫 번째 파라미터 - 소스 프로세서의 이름
- 두 번째 파라미터 - 대상 토픽 이름
- 스트림 프로세서를 사용하기 위해 addProcessor() 메서드를 사용
- 첫 번째 파라미터 - 스트림 프로세서의 이름
- 두 번째 파라미터 - 저장할 토픽의 이름을 입력
- 세 번째 파라미터 - 부모 노드를 입력, 필터링 처리가 완료된 데이터를 저장해야 하므로
- 작성 완료한 topology 인스턴스를 KafkaStreams 인스턴스의 파라미터로 넣어서 스트림즈를 생성하고 실행할 수 있다.
출처
'📕 book > 아파치 카프카' 카테고리의 다른 글
4장. 카프카 상세 개념 - 토픽과 파티션 (0) 2023.01.17 2장 - 카프카 빠르게 시작해보기 (0) 2023.01.12 3장 - 카프카 스트림즈 (0) 2022.09.18 3장 - 카프카 기본 개념 (0) 2022.09.18 1장 - 들어가며 (0) 2022.09.18 - 스트림즈 애플리케이션은 애플리케이션 아이디를 지정해야 한다(application.id)