Spring Batch - ItemProcessor
📌 ItemProcessor
ItemProcessor는 필수가 아니다 ❗
- 데이터를 가공하거나 필터링하는 역할
- 이는 Writer에서도 구현이 가능하다.
- 그래도 사용하는 이유는 비즈니스 코드가 섞이는 것을 방지하기 위해서이다.
배치 어플리케이션에서 비즈니스 로직을 추가할 때는 먼저 Processor를 고려해보자❗
- ( 읽기 / 처리 / 쓰기 )를 분리할 수 있는 좋은 방법이다.
ItemProcessor는 Reader에서 넘겨준 개별 데이터를 가공 및 처리해준다.
사용하는 두 가지 방법
- 변환
- Reader에서 읽은 데이터를 원하는 타입으로 변환해서 Writer에 넘겨 준다.
- 필터
- Reader에서 넘겨준 데이터를 Writer로 넘겨줄 것인지를 결정한다.
- null 을 반환하면 Writer에 전달되지 않음.
🤷♂️ 기본 사용법
ItemProcessor 인터페이스는 두 개의 제네릭 타입이 필요하다.
- I : ItemReader에서 받을 데이터 타입
- O : ItemWriter에 보낼 데이터 타입
구현할 메소드는 process() 하나이다.
❕자바 8 부터 인터페이스의 추상 메소드가 1개일 경우 람다식 사용가능
- ItemProcessor도 추상 메서드가 process() 하나로 람다식을 사용할 수 있다.
- 따라서 ItemProcessor를 익명 클래스 혹은 람다식으로 자주 사용한다.
@Bean(BEAN_PREFIX + "processor")
@StepScope
public ItemProcessor<ReadType, WriteType> processor() {
return item -> {
item.convert();
return item;
};
}
익명 클래스 혹은 람다식 사용 이유
- 불필요한 코드가 없어 코드 양이 적다
- 고정된 형태가 없어 원하는 형태의 어떤 처리도 가능하다
단점
- Batch Config 클래스 안에 코드가 포함되어 Batch Config 코드의 양이 많아 질 수 있다.
💨 변환
Reader에서 읽은 타입을 변환하여 Wrtier에게 전달
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorConvertJobConfiguration {
public static final String JOB_NAME = "ProcessorConvertBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(step())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step step() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Pay, String>chunk(chunkSize)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
public JpaPagingItemReader<Pay> reader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, String> processor() {
return pay -> {
return pay.getTxName();
};
}
private ItemWriter<String> writer() {
return items -> {
for (String item : items) {
log.info("Pay Name={}", item);
}
};
}
}
- Pay 클래스를 읽어와 name 필드(String 타입)을 Writer에 넘긴다.
- Reader에서 읽어올 타입 : Pay
- Writer에 전달할 타입 : String
- 따라서 제네릭 타입은 <Pay, String> 이다.
ChunkSize 앞에 선언될 타입 역시 Reader와 Writer 타입을 따라가야 한다
.<Pay, String>chunk(chunkSize)
실행
- Processor를 거쳐 Pay 클래스가 String으로 변환되었다.
👀 필터
Writer에 값을 넘길지 말지 Processor에서 판단한다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorNullJobConfiguration {
public static final String JOB_NAME = "processorNullBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job processorNullJob() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(processorNullStep())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step processorNullStep() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Pay, Pay>chunk(chunkSize)
.reader(processorNullReader())
.processor(processorNull())
.writer(processorNullWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> processorNullReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, Pay> processorNull() {
return pay -> {
boolean isIgnoreTarget = pay.getId() % 2 == 0L;
if(isIgnoreTarget){
log.info("====== Pay name={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget);
return null;
}
return pay;
};
}
private ItemWriter<Pay> processorNullWriter() {
return items -> {
for (Pay item : items) {
log.info("Pay Name={}", item.getTxName());
}
};
}
}
- Pay의 id 값이 짝수 일 경우를 필터링 해준다.
- id 가 짝수 일 경우 return null;
- writer에 넘기지 않는다.
실행
- id가 짝수인 경우 필터링 되었다.
📍 트랜잭션 범위
Spring Batch에서 트랜잭션 범위는 Chunk 단위이다.
Reader에서 Entity를 반환해주었다면 Entity간의 Lazy Loading이 가능하다.
- 이는 Processor, Writer에서도 가능하다.
Processor
@Slf4j
@RequiredArgsConstructor
@Configuration
public class TransactionProcessorJobConfiguration {
public static final String JOB_NAME = "transactionProcessorBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job transactionJob() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(transactionStep())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step transactionStep() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Teacher, ClassInformation>chunk(chunkSize)
.reader(transactionReader())
.processor(transactionProcessor())
.writer(transactionWriter())
.build();
}
@Bean
public JpaPagingItemReader<Teacher> transactionReader() {
return new JpaPagingItemReaderBuilder<Teacher>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT t FROM Teacher t")
.build();
}
public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
}
private ItemWriter<ClassInformation> transactionWriter() {
return items -> {
log.info("======== Item Write");
for (ClassInformation item : items) {
log.info("반 정보= {}", item);
}
};
}
}
- Reader에서 Teacher Entity를 반환하여 Processor에서 Entity의 하위 자식들인 Student를 Lazy loading한다.
Processor 부분에서 teacher.getStudents()로 가져온다.
public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
}
실행을 했을 때 Processor가 트랜잭션 범위 밖이라면 오류가 발생할 것이다.
실행
- 오류가 발생하지 않음.
- 즉, Processor는 트랜잭션 범위 안이다.
- Entity Lazy Loading이 가능하다.
Writer
Writer에서의 Lazy Loading
@Slf4j
@RequiredArgsConstructor
@Configuration
public class TransactionWriterJobConfiguration {
public static final String JOB_NAME = "transactionWriterBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job transactionWriterJob() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(transactionWriterStep())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step transactionWriterStep() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Teacher, Teacher>chunk(chunkSize)
.reader(transactionWriterReader())
.writer(transactionWriter())
.build();
}
@Bean
public JpaPagingItemReader<Teacher> transactionWriterReader() {
return new JpaPagingItemReaderBuilder<Teacher>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT t FROM Teacher t")
.build();
}
private ItemWriter<Teacher> transactionWriter() {
return items -> {
log.info("====== Item Write");
for (Teacher item : items) {
log.info("teacher={}, student Size={}", item.getName(), item.getStudents().size());
}
};
}
}
- Reader에서 Teacher Entity를 반환하여 Processor를 거치지 않고 바로 Writer로 넘겨
- Writer에서 Student를 Lazy Loading
실행
- Writer 역시 트랜잭션 범위 안이며, Lazy Loading이 가능하다.
📍 ItemProcessor 구현체
Spring Batch에는 자주 사용하는 용도의 Processor를 미리 클래스로 만들어 제공해주고 있다.
- ItemProcessorAdapter
- ValidatingItemProcessor
- CompositeItemProcessor
최근에는 대부분 Processor 구현을 직접하는 경우가 많다(람다식으로 빠르게 구현도 많이 한다.)
CompositeItemProcessor
- ItemProcessor 간의 체이닝을 지원하는 Processor
변환이 2번 필요할 때
하나의 Processor에서 모두 변환을 해야하나?
- 그럼 너무 역할이 커지지 않을까?
CompositeItemProcessor는 이런 의문에서 나오게 되었다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorCompositeJobConfiguration {
public static final String JOB_NAME = "processorCompositeBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job compositeJob() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(compositeStep())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step compositeStep() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Teacher, String>chunk(chunkSize)
.reader(compositeReader())
.processor(compositeProcessor())
.writer(compositeWriter())
.build();
}
@Bean
public JpaPagingItemReader<Teacher> compositeReader() {
return new JpaPagingItemReaderBuilder<Teacher>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT t FROM Teacher t")
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(compositeProcessor1());
delegates.add(compositeProcessor2());
CompositeItemProcessor processor = new CompositeItemProcessor<>();
processor.setDelegates(delegates);
return processor;
}
public ItemProcessor<Teacher, String> compositeProcessor1() {
return Teacher::getName;
}
public ItemProcessor<String, String> compositeProcessor2() {
return name -> " 안녕하세요 : "+ name + "입니다.";
}
private ItemWriter<String> compositeWriter() {
return items -> {
for (String item : items) {
log.info("Teacher Name={}", item);
}
};
}
}
- 첫 번째 processor 에서 “Teacher Name = “
- 두 번째 processor 에서 “안녕하세요 + name + 입니다” 이어서 Writer에게 전달
서로 다른 클래스 타입으로 변환해도 가능하다❗
CompositeItemProcessor에 ItemProcessor List인 delegates을 할당만 하면 모든 구현이 끝난다.
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(compositeProcessor1());
delegates.add(compositeProcessor2());
CompositeItemProcessor processor = new CompositeItemProcessor<>();
processor.setDelegates(delegates);
return processor;
}
여기서 제네릭 타입을 쓰지 못한다.
- 제네릭 타입을 쓰게 되면 delegates에 포함된 모든 ItemProcessor 는 같은 제네릭 타입을 가져야한다.
processor1 → <Teacher, String>
processor2 → <String, String>
- 같은 제네릭 타입을 쓰지 못한다.
실행
참고자료
https://jojoldu.tistory.com/347?category=902551