🌱 spring/🚛 spring batch

Spring Batch - ItemProcessor

beomsic 2022. 12. 16. 15:53

📌 ItemProcessor


ItemProcessor는 필수가 아니다

  • 데이터를 가공하거나 필터링하는 역할
  • 이는 Writer에서도 구현이 가능하다.
  • 그래도 사용하는 이유는 비즈니스 코드가 섞이는 것을 방지하기 위해서이다.

 

배치 어플리케이션에서 비즈니스 로직을 추가할 때는 먼저 Processor를 고려해보자❗

  •  ( 읽기 / 처리 / 쓰기 )를 분리할 수 있는 좋은 방법이다.

ItemProcessor는 Reader에서 넘겨준 개별 데이터를 가공 및 처리해준다.

 

사용하는 두 가지 방법

  1. 변환
    • Reader에서 읽은 데이터를 원하는 타입으로 변환해서 Writer에 넘겨 준다.
  2. 필터
    • Reader에서 넘겨준 데이터를 Writer로 넘겨줄 것인지를 결정한다.
    • null 을 반환하면 Writer에 전달되지 않음.

 

🤷‍♂️ 기본 사용법


ItemProcessor 인터페이스는 두 개의 제네릭 타입이 필요하다.

  • I : ItemReader에서 받을 데이터 타입
  • O : ItemWriter에 보낼 데이터 타입

구현할 메소드는 process() 하나이다.

자바 8 부터 인터페이스의 추상 메소드가 1개일 경우 람다식 사용가능

  • ItemProcessor도 추상 메서드가 process() 하나로 람다식을 사용할 수 있다.
  • 따라서 ItemProcessor를 익명 클래스 혹은 람다식으로 자주 사용한다.
@Bean(BEAN_PREFIX + "processor")
@StepScope
public ItemProcessor<ReadType, WriteType> processor() {
    return item -> {
        item.convert();
        return item;
    };
}

익명 클래스 혹은 람다식 사용 이유

  • 불필요한 코드가 없어 코드 양이 적다
  • 고정된 형태가 없어 원하는 형태의 어떤 처리도 가능하다

단점

  • Batch Config 클래스 안에 코드가 포함되어 Batch Config 코드의 양이 많아 질 수 있다.

 

💨 변환


Reader에서 읽은 타입을 변환하여 Wrtier에게 전달

@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorConvertJobConfiguration {

  public static final String JOB_NAME = "ProcessorConvertBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job job() {
    return jobBuilderFactory.get(JOB_NAME)
        .preventRestart()
        .start(step())
        .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step step() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
        .<Pay, String>chunk(chunkSize)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> reader() {
    return new JpaPagingItemReaderBuilder<Pay>()
        .name(BEAN_PREFIX+"reader")
        .entityManagerFactory(emf)
        .pageSize(chunkSize)
        .queryString("SELECT p FROM Pay p")
        .build();
  }

  @Bean
  public ItemProcessor<Pay, String> processor() {
    return pay -> {
      return pay.getTxName();
    };
  }

  private ItemWriter<String> writer() {
    return items -> {
      for (String item : items) {
        log.info("Pay Name={}", item);
      }
    };
  }
}
  • Pay 클래스를 읽어와 name 필드(String 타입)을 Writer에 넘긴다.
  • Reader에서 읽어올 타입 : Pay
  • Writer에 전달할 타입 : String
  • 따라서 제네릭 타입은 <Pay, String> 이다.

ChunkSize 앞에 선언될 타입 역시 Reader와 Writer 타입을 따라가야 한다

  .<Pay, String>chunk(chunkSize)

실행

  • Processor를 거쳐 Pay 클래스가 String으로 변환되었다.

👀 필터


Writer에 값을 넘길지 말지 Processor에서 판단한다.

 

@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorNullJobConfiguration {

  public static final String JOB_NAME = "processorNullBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job processorNullJob() {
    return jobBuilderFactory.get(JOB_NAME)
        .preventRestart()
        .start(processorNullStep())
        .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step processorNullStep() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
        .<Pay, Pay>chunk(chunkSize)
        .reader(processorNullReader())
        .processor(processorNull())
        .writer(processorNullWriter())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> processorNullReader() {
    return new JpaPagingItemReaderBuilder<Pay>()
        .name(BEAN_PREFIX+"reader")
        .entityManagerFactory(emf)
        .pageSize(chunkSize)
        .queryString("SELECT p FROM Pay p")
        .build();
  }

  @Bean
  public ItemProcessor<Pay, Pay> processorNull() {
    return pay -> {

      boolean isIgnoreTarget = pay.getId() % 2 == 0L;
      if(isIgnoreTarget){
        log.info("====== Pay name={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget);
        return null;
      }

      return pay;
    };
  }

  private ItemWriter<Pay> processorNullWriter() {
    return items -> {
      for (Pay item : items) {
        log.info("Pay Name={}", item.getTxName());
      }
    };
  }
}
  • Pay의 id 값이 짝수 일 경우를 필터링 해준다.
  • id 가 짝수 일 경우 return null;
    • writer에 넘기지 않는다.

실행

  • id가 짝수인 경우 필터링 되었다.

 

📍 트랜잭션 범위


Spring Batch에서 트랜잭션 범위Chunk 단위이다.

Reader에서 Entity를 반환해주었다면 Entity간의 Lazy Loading이 가능하다.

  • 이는 Processor, Writer에서도 가능하다.

 

Processor

@Slf4j
@RequiredArgsConstructor
@Configuration
public class TransactionProcessorJobConfiguration {

  public static final String JOB_NAME = "transactionProcessorBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job transactionJob() {
    return jobBuilderFactory.get(JOB_NAME)
        .preventRestart()
        .start(transactionStep())
        .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step transactionStep() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
        .<Teacher, ClassInformation>chunk(chunkSize)
        .reader(transactionReader())
        .processor(transactionProcessor())
        .writer(transactionWriter())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Teacher> transactionReader() {
    return new JpaPagingItemReaderBuilder<Teacher>()
        .name(BEAN_PREFIX+"reader")
        .entityManagerFactory(emf)
        .pageSize(chunkSize)
        .queryString("SELECT t FROM Teacher t")
        .build();
  }

  public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
    return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
  }

  private ItemWriter<ClassInformation> transactionWriter() {
    return items -> {
      log.info("========   Item Write");
      for (ClassInformation item : items) {
        log.info("반 정보= {}", item);
      }
    };
  }
}
  • Reader에서 Teacher Entity를 반환하여 Processor에서 Entity의 하위 자식들인 Student를 Lazy loading한다.

 

Processor 부분에서 teacher.getStudents()로 가져온다.

 public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
    return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
  }

실행을 했을 때 Processor가 트랜잭션 범위 밖이라면 오류가 발생할 것이다.

실행

wj

  • 오류가 발생하지 않음.
  • 즉, Processor는 트랜잭션 범위 안이다.
  • Entity Lazy Loading이 가능하다.

 

Writer

Writer에서의 Lazy Loading

@Slf4j
@RequiredArgsConstructor
@Configuration
public class TransactionWriterJobConfiguration {

  public static final String JOB_NAME = "transactionWriterBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job transactionWriterJob() {
    return jobBuilderFactory.get(JOB_NAME)
        .preventRestart()
        .start(transactionWriterStep())
        .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step transactionWriterStep() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
        .<Teacher, Teacher>chunk(chunkSize)
        .reader(transactionWriterReader())
        .writer(transactionWriter())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Teacher> transactionWriterReader() {
    return new JpaPagingItemReaderBuilder<Teacher>()
        .name(BEAN_PREFIX+"reader")
        .entityManagerFactory(emf)
        .pageSize(chunkSize)
        .queryString("SELECT t FROM Teacher t")
        .build();
  }

  private ItemWriter<Teacher> transactionWriter() {
    return items -> {
      log.info("====== Item Write");
      for (Teacher item : items) {
        log.info("teacher={}, student Size={}", item.getName(), item.getStudents().size());
      }
    };
  }
}
  • Reader에서 Teacher Entity를 반환하여 Processor를 거치지 않고 바로 Writer로 넘겨
  • Writer에서 Student를 Lazy Loading

실행

  • Writer 역시 트랜잭션 범위 안이며, Lazy Loading이 가능하다.

 

📍 ItemProcessor 구현체


 

Spring Batch에는 자주 사용하는 용도의 Processor를 미리 클래스로 만들어 제공해주고 있다.

  • ItemProcessorAdapter
  • ValidatingItemProcessor
  • CompositeItemProcessor

최근에는 대부분 Processor 구현을 직접하는 경우가 많다(람다식으로 빠르게 구현도 많이 한다.)

 

CompositeItemProcessor

  • ItemProcessor 간의 체이닝을 지원하는 Processor

변환이 2번 필요할 때

하나의 Processor에서 모두 변환을 해야하나?

  • 그럼 너무 역할이 커지지 않을까?

CompositeItemProcessor는 이런 의문에서 나오게 되었다.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorCompositeJobConfiguration {

  public static final String JOB_NAME = "processorCompositeBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job compositeJob() {
    return jobBuilderFactory.get(JOB_NAME)
        .preventRestart()
        .start(compositeStep())
        .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step compositeStep() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
        .<Teacher, String>chunk(chunkSize)
        .reader(compositeReader())
        .processor(compositeProcessor())
        .writer(compositeWriter())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Teacher> compositeReader() {
    return new JpaPagingItemReaderBuilder<Teacher>()
        .name(BEAN_PREFIX+"reader")
        .entityManagerFactory(emf)
        .pageSize(chunkSize)
        .queryString("SELECT t FROM Teacher t")
        .build();
  }

  @Bean
  public CompositeItemProcessor compositeProcessor() {
    List<ItemProcessor> delegates = new ArrayList<>(2);
    delegates.add(compositeProcessor1());
    delegates.add(compositeProcessor2());

    CompositeItemProcessor processor = new CompositeItemProcessor<>();

    processor.setDelegates(delegates);

    return processor;
  }

  public ItemProcessor<Teacher, String> compositeProcessor1() {
    return Teacher::getName;
  }

  public ItemProcessor<String, String> compositeProcessor2() {
    return name -> " 안녕하세요  :  "+ name + "입니다.";
  }

  private ItemWriter<String> compositeWriter() {
    return items -> {
      for (String item : items) {
        log.info("Teacher Name={}", item);
      }
    };
  }
}
  • 첫 번째 processor 에서 “Teacher Name = “
  • 두 번째 processor 에서 “안녕하세요 + name + 입니다” 이어서 Writer에게 전달

서로 다른 클래스 타입으로 변환해도 가능하다❗

CompositeItemProcessor에 ItemProcessor List인 delegates을 할당만 하면 모든 구현이 끝난다.

  @Bean
  public CompositeItemProcessor compositeProcessor() {
    List<ItemProcessor> delegates = new ArrayList<>(2);
    delegates.add(compositeProcessor1());
    delegates.add(compositeProcessor2());

    CompositeItemProcessor processor = new CompositeItemProcessor<>();

    processor.setDelegates(delegates);

    return processor;
  }

여기서 제네릭 타입을 쓰지 못한다.

  • 제네릭 타입을 쓰게 되면 delegates에 포함된 모든 ItemProcessor 는 같은 제네릭 타입을 가져야한다.

processor1<Teacher, String>

processor2<String, String>

  • 같은 제네릭 타입을 쓰지 못한다.

 

실행

 

참고자료

https://jojoldu.tistory.com/347?category=902551