-
Spring Batch - ItemProcessor🌱 spring/🚛 spring batch 2022. 12. 16. 15:53
📌 ItemProcessor
ItemProcessor는 필수가 아니다 ❗
- 데이터를 가공하거나 필터링하는 역할
- 이는 Writer에서도 구현이 가능하다.
- 그래도 사용하는 이유는 비즈니스 코드가 섞이는 것을 방지하기 위해서이다.
배치 어플리케이션에서 비즈니스 로직을 추가할 때는 먼저 Processor를 고려해보자❗
- ( 읽기 / 처리 / 쓰기 )를 분리할 수 있는 좋은 방법이다.
ItemProcessor는 Reader에서 넘겨준 개별 데이터를 가공 및 처리해준다.
사용하는 두 가지 방법
- 변환
- Reader에서 읽은 데이터를 원하는 타입으로 변환해서 Writer에 넘겨 준다.
- 필터
- Reader에서 넘겨준 데이터를 Writer로 넘겨줄 것인지를 결정한다.
- null 을 반환하면 Writer에 전달되지 않음.
🤷♂️ 기본 사용법
ItemProcessor 인터페이스는 두 개의 제네릭 타입이 필요하다.
- I : ItemReader에서 받을 데이터 타입
- O : ItemWriter에 보낼 데이터 타입
구현할 메소드는 process() 하나이다.
❕자바 8 부터 인터페이스의 추상 메소드가 1개일 경우 람다식 사용가능
- ItemProcessor도 추상 메서드가 process() 하나로 람다식을 사용할 수 있다.
- 따라서 ItemProcessor를 익명 클래스 혹은 람다식으로 자주 사용한다.
@Bean(BEAN_PREFIX + "processor") @StepScope public ItemProcessor<ReadType, WriteType> processor() { return item -> { item.convert(); return item; }; }
익명 클래스 혹은 람다식 사용 이유
- 불필요한 코드가 없어 코드 양이 적다
- 고정된 형태가 없어 원하는 형태의 어떤 처리도 가능하다
단점
- Batch Config 클래스 안에 코드가 포함되어 Batch Config 코드의 양이 많아 질 수 있다.
💨 변환
Reader에서 읽은 타입을 변환하여 Wrtier에게 전달
@Slf4j @RequiredArgsConstructor @Configuration public class ProcessorConvertJobConfiguration { public static final String JOB_NAME = "ProcessorConvertBatch"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory emf; @Value("${chunkSize:1000}") private int chunkSize; @Bean(JOB_NAME) public Job job() { return jobBuilderFactory.get(JOB_NAME) .preventRestart() .start(step()) .build(); } @Bean(BEAN_PREFIX + "step") @JobScope public Step step() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Pay, String>chunk(chunkSize) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } @Bean public JpaPagingItemReader<Pay> reader() { return new JpaPagingItemReaderBuilder<Pay>() .name(BEAN_PREFIX+"reader") .entityManagerFactory(emf) .pageSize(chunkSize) .queryString("SELECT p FROM Pay p") .build(); } @Bean public ItemProcessor<Pay, String> processor() { return pay -> { return pay.getTxName(); }; } private ItemWriter<String> writer() { return items -> { for (String item : items) { log.info("Pay Name={}", item); } }; } }
- Pay 클래스를 읽어와 name 필드(String 타입)을 Writer에 넘긴다.
- Reader에서 읽어올 타입 : Pay
- Writer에 전달할 타입 : String
- 따라서 제네릭 타입은 <Pay, String> 이다.
ChunkSize 앞에 선언될 타입 역시 Reader와 Writer 타입을 따라가야 한다
.<Pay, String>chunk(chunkSize)
실행
- Processor를 거쳐 Pay 클래스가 String으로 변환되었다.
👀 필터
Writer에 값을 넘길지 말지 Processor에서 판단한다.
@Slf4j @RequiredArgsConstructor @Configuration public class ProcessorNullJobConfiguration { public static final String JOB_NAME = "processorNullBatch"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory emf; @Value("${chunkSize:1000}") private int chunkSize; @Bean(JOB_NAME) public Job processorNullJob() { return jobBuilderFactory.get(JOB_NAME) .preventRestart() .start(processorNullStep()) .build(); } @Bean(BEAN_PREFIX + "step") @JobScope public Step processorNullStep() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Pay, Pay>chunk(chunkSize) .reader(processorNullReader()) .processor(processorNull()) .writer(processorNullWriter()) .build(); } @Bean public JpaPagingItemReader<Pay> processorNullReader() { return new JpaPagingItemReaderBuilder<Pay>() .name(BEAN_PREFIX+"reader") .entityManagerFactory(emf) .pageSize(chunkSize) .queryString("SELECT p FROM Pay p") .build(); } @Bean public ItemProcessor<Pay, Pay> processorNull() { return pay -> { boolean isIgnoreTarget = pay.getId() % 2 == 0L; if(isIgnoreTarget){ log.info("====== Pay name={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget); return null; } return pay; }; } private ItemWriter<Pay> processorNullWriter() { return items -> { for (Pay item : items) { log.info("Pay Name={}", item.getTxName()); } }; } }
- Pay의 id 값이 짝수 일 경우를 필터링 해준다.
- id 가 짝수 일 경우 return null;
- writer에 넘기지 않는다.
실행
- id가 짝수인 경우 필터링 되었다.
📍 트랜잭션 범위
Spring Batch에서 트랜잭션 범위는 Chunk 단위이다.
Reader에서 Entity를 반환해주었다면 Entity간의 Lazy Loading이 가능하다.
- 이는 Processor, Writer에서도 가능하다.
Processor
@Slf4j @RequiredArgsConstructor @Configuration public class TransactionProcessorJobConfiguration { public static final String JOB_NAME = "transactionProcessorBatch"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory emf; @Value("${chunkSize:1000}") private int chunkSize; @Bean(JOB_NAME) public Job transactionJob() { return jobBuilderFactory.get(JOB_NAME) .preventRestart() .start(transactionStep()) .build(); } @Bean(BEAN_PREFIX + "step") @JobScope public Step transactionStep() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Teacher, ClassInformation>chunk(chunkSize) .reader(transactionReader()) .processor(transactionProcessor()) .writer(transactionWriter()) .build(); } @Bean public JpaPagingItemReader<Teacher> transactionReader() { return new JpaPagingItemReaderBuilder<Teacher>() .name(BEAN_PREFIX+"reader") .entityManagerFactory(emf) .pageSize(chunkSize) .queryString("SELECT t FROM Teacher t") .build(); } public ItemProcessor<Teacher, ClassInformation> transactionProcessor() { return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size()); } private ItemWriter<ClassInformation> transactionWriter() { return items -> { log.info("======== Item Write"); for (ClassInformation item : items) { log.info("반 정보= {}", item); } }; } }
- Reader에서 Teacher Entity를 반환하여 Processor에서 Entity의 하위 자식들인 Student를 Lazy loading한다.
Processor 부분에서 teacher.getStudents()로 가져온다.
public ItemProcessor<Teacher, ClassInformation> transactionProcessor() { return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size()); }
실행을 했을 때 Processor가 트랜잭션 범위 밖이라면 오류가 발생할 것이다.
실행
- 오류가 발생하지 않음.
- 즉, Processor는 트랜잭션 범위 안이다.
- Entity Lazy Loading이 가능하다.
Writer
Writer에서의 Lazy Loading
@Slf4j @RequiredArgsConstructor @Configuration public class TransactionWriterJobConfiguration { public static final String JOB_NAME = "transactionWriterBatch"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory emf; @Value("${chunkSize:1000}") private int chunkSize; @Bean(JOB_NAME) public Job transactionWriterJob() { return jobBuilderFactory.get(JOB_NAME) .preventRestart() .start(transactionWriterStep()) .build(); } @Bean(BEAN_PREFIX + "step") @JobScope public Step transactionWriterStep() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Teacher, Teacher>chunk(chunkSize) .reader(transactionWriterReader()) .writer(transactionWriter()) .build(); } @Bean public JpaPagingItemReader<Teacher> transactionWriterReader() { return new JpaPagingItemReaderBuilder<Teacher>() .name(BEAN_PREFIX+"reader") .entityManagerFactory(emf) .pageSize(chunkSize) .queryString("SELECT t FROM Teacher t") .build(); } private ItemWriter<Teacher> transactionWriter() { return items -> { log.info("====== Item Write"); for (Teacher item : items) { log.info("teacher={}, student Size={}", item.getName(), item.getStudents().size()); } }; } }
- Reader에서 Teacher Entity를 반환하여 Processor를 거치지 않고 바로 Writer로 넘겨
- Writer에서 Student를 Lazy Loading
실행
- Writer 역시 트랜잭션 범위 안이며, Lazy Loading이 가능하다.
📍 ItemProcessor 구현체
Spring Batch에는 자주 사용하는 용도의 Processor를 미리 클래스로 만들어 제공해주고 있다.
- ItemProcessorAdapter
- ValidatingItemProcessor
- CompositeItemProcessor
최근에는 대부분 Processor 구현을 직접하는 경우가 많다(람다식으로 빠르게 구현도 많이 한다.)
CompositeItemProcessor
- ItemProcessor 간의 체이닝을 지원하는 Processor
변환이 2번 필요할 때
하나의 Processor에서 모두 변환을 해야하나?
- 그럼 너무 역할이 커지지 않을까?
CompositeItemProcessor는 이런 의문에서 나오게 되었다.
@Slf4j @RequiredArgsConstructor @Configuration public class ProcessorCompositeJobConfiguration { public static final String JOB_NAME = "processorCompositeBatch"; public static final String BEAN_PREFIX = JOB_NAME + "_"; private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory emf; @Value("${chunkSize:1000}") private int chunkSize; @Bean(JOB_NAME) public Job compositeJob() { return jobBuilderFactory.get(JOB_NAME) .preventRestart() .start(compositeStep()) .build(); } @Bean(BEAN_PREFIX + "step") @JobScope public Step compositeStep() { return stepBuilderFactory.get(BEAN_PREFIX + "step") .<Teacher, String>chunk(chunkSize) .reader(compositeReader()) .processor(compositeProcessor()) .writer(compositeWriter()) .build(); } @Bean public JpaPagingItemReader<Teacher> compositeReader() { return new JpaPagingItemReaderBuilder<Teacher>() .name(BEAN_PREFIX+"reader") .entityManagerFactory(emf) .pageSize(chunkSize) .queryString("SELECT t FROM Teacher t") .build(); } @Bean public CompositeItemProcessor compositeProcessor() { List<ItemProcessor> delegates = new ArrayList<>(2); delegates.add(compositeProcessor1()); delegates.add(compositeProcessor2()); CompositeItemProcessor processor = new CompositeItemProcessor<>(); processor.setDelegates(delegates); return processor; } public ItemProcessor<Teacher, String> compositeProcessor1() { return Teacher::getName; } public ItemProcessor<String, String> compositeProcessor2() { return name -> " 안녕하세요 : "+ name + "입니다."; } private ItemWriter<String> compositeWriter() { return items -> { for (String item : items) { log.info("Teacher Name={}", item); } }; } }
- 첫 번째 processor 에서 “Teacher Name = “
- 두 번째 processor 에서 “안녕하세요 + name + 입니다” 이어서 Writer에게 전달
서로 다른 클래스 타입으로 변환해도 가능하다❗
CompositeItemProcessor에 ItemProcessor List인 delegates을 할당만 하면 모든 구현이 끝난다.
@Bean public CompositeItemProcessor compositeProcessor() { List<ItemProcessor> delegates = new ArrayList<>(2); delegates.add(compositeProcessor1()); delegates.add(compositeProcessor2()); CompositeItemProcessor processor = new CompositeItemProcessor<>(); processor.setDelegates(delegates); return processor; }
여기서 제네릭 타입을 쓰지 못한다.
- 제네릭 타입을 쓰게 되면 delegates에 포함된 모든 ItemProcessor 는 같은 제네릭 타입을 가져야한다.
processor1 → <Teacher, String>
processor2 → <String, String>
- 같은 제네릭 타입을 쓰지 못한다.
실행
참고자료
https://jojoldu.tistory.com/347?category=902551
'🌱 spring > 🚛 spring batch' 카테고리의 다른 글
Quartz2 (0) 2022.12.22 Quartz (1) (0) 2022.12.22 Spring Batch - ItemWriter (0) 2022.12.16 SpringBatch - ItemReader (0) 2022.12.16 Spring Batch - Chunk 지향 처리 (1) 2022.12.16