ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch - ItemProcessor
    🌱 spring/🚛 spring batch 2022. 12. 16. 15:53

    📌 ItemProcessor


    ItemProcessor는 필수가 아니다

    • 데이터를 가공하거나 필터링하는 역할
    • 이는 Writer에서도 구현이 가능하다.
    • 그래도 사용하는 이유는 비즈니스 코드가 섞이는 것을 방지하기 위해서이다.

     

    배치 어플리케이션에서 비즈니스 로직을 추가할 때는 먼저 Processor를 고려해보자❗

    •  ( 읽기 / 처리 / 쓰기 )를 분리할 수 있는 좋은 방법이다.

    ItemProcessor는 Reader에서 넘겨준 개별 데이터를 가공 및 처리해준다.

     

    사용하는 두 가지 방법

    1. 변환
      • Reader에서 읽은 데이터를 원하는 타입으로 변환해서 Writer에 넘겨 준다.
    2. 필터
      • Reader에서 넘겨준 데이터를 Writer로 넘겨줄 것인지를 결정한다.
      • null 을 반환하면 Writer에 전달되지 않음.

     

    🤷‍♂️ 기본 사용법


    ItemProcessor 인터페이스는 두 개의 제네릭 타입이 필요하다.

    • I : ItemReader에서 받을 데이터 타입
    • O : ItemWriter에 보낼 데이터 타입

    구현할 메소드는 process() 하나이다.

    자바 8 부터 인터페이스의 추상 메소드가 1개일 경우 람다식 사용가능

    • ItemProcessor도 추상 메서드가 process() 하나로 람다식을 사용할 수 있다.
    • 따라서 ItemProcessor를 익명 클래스 혹은 람다식으로 자주 사용한다.
    @Bean(BEAN_PREFIX + "processor")
    @StepScope
    public ItemProcessor<ReadType, WriteType> processor() {
        return item -> {
            item.convert();
            return item;
        };
    }

    익명 클래스 혹은 람다식 사용 이유

    • 불필요한 코드가 없어 코드 양이 적다
    • 고정된 형태가 없어 원하는 형태의 어떤 처리도 가능하다

    단점

    • Batch Config 클래스 안에 코드가 포함되어 Batch Config 코드의 양이 많아 질 수 있다.

     

    💨 변환


    Reader에서 읽은 타입을 변환하여 Wrtier에게 전달

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class ProcessorConvertJobConfiguration {
    
      public static final String JOB_NAME = "ProcessorConvertBatch";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory emf;
    
      @Value("${chunkSize:1000}")
      private int chunkSize;
    
      @Bean(JOB_NAME)
      public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(step())
            .build();
      }
    
      @Bean(BEAN_PREFIX + "step")
      @JobScope
      public Step step() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Pay, String>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Pay> reader() {
        return new JpaPagingItemReaderBuilder<Pay>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT p FROM Pay p")
            .build();
      }
    
      @Bean
      public ItemProcessor<Pay, String> processor() {
        return pay -> {
          return pay.getTxName();
        };
      }
    
      private ItemWriter<String> writer() {
        return items -> {
          for (String item : items) {
            log.info("Pay Name={}", item);
          }
        };
      }
    }
    
    • Pay 클래스를 읽어와 name 필드(String 타입)을 Writer에 넘긴다.
    • Reader에서 읽어올 타입 : Pay
    • Writer에 전달할 타입 : String
    • 따라서 제네릭 타입은 <Pay, String> 이다.

    ChunkSize 앞에 선언될 타입 역시 Reader와 Writer 타입을 따라가야 한다

      .<Pay, String>chunk(chunkSize)
    

    실행

    • Processor를 거쳐 Pay 클래스가 String으로 변환되었다.

    👀 필터


    Writer에 값을 넘길지 말지 Processor에서 판단한다.

     

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class ProcessorNullJobConfiguration {
    
      public static final String JOB_NAME = "processorNullBatch";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory emf;
    
      @Value("${chunkSize:1000}")
      private int chunkSize;
    
      @Bean(JOB_NAME)
      public Job processorNullJob() {
        return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(processorNullStep())
            .build();
      }
    
      @Bean(BEAN_PREFIX + "step")
      @JobScope
      public Step processorNullStep() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Pay, Pay>chunk(chunkSize)
            .reader(processorNullReader())
            .processor(processorNull())
            .writer(processorNullWriter())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Pay> processorNullReader() {
        return new JpaPagingItemReaderBuilder<Pay>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT p FROM Pay p")
            .build();
      }
    
      @Bean
      public ItemProcessor<Pay, Pay> processorNull() {
        return pay -> {
    
          boolean isIgnoreTarget = pay.getId() % 2 == 0L;
          if(isIgnoreTarget){
            log.info("====== Pay name={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget);
            return null;
          }
    
          return pay;
        };
      }
    
      private ItemWriter<Pay> processorNullWriter() {
        return items -> {
          for (Pay item : items) {
            log.info("Pay Name={}", item.getTxName());
          }
        };
      }
    }
    
    • Pay의 id 값이 짝수 일 경우를 필터링 해준다.
    • id 가 짝수 일 경우 return null;
      • writer에 넘기지 않는다.

    실행

    • id가 짝수인 경우 필터링 되었다.

     

    📍 트랜잭션 범위


    Spring Batch에서 트랜잭션 범위Chunk 단위이다.

    Reader에서 Entity를 반환해주었다면 Entity간의 Lazy Loading이 가능하다.

    • 이는 Processor, Writer에서도 가능하다.

     

    Processor

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class TransactionProcessorJobConfiguration {
    
      public static final String JOB_NAME = "transactionProcessorBatch";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory emf;
    
      @Value("${chunkSize:1000}")
      private int chunkSize;
    
      @Bean(JOB_NAME)
      public Job transactionJob() {
        return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(transactionStep())
            .build();
      }
    
      @Bean(BEAN_PREFIX + "step")
      @JobScope
      public Step transactionStep() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Teacher, ClassInformation>chunk(chunkSize)
            .reader(transactionReader())
            .processor(transactionProcessor())
            .writer(transactionWriter())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Teacher> transactionReader() {
        return new JpaPagingItemReaderBuilder<Teacher>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT t FROM Teacher t")
            .build();
      }
    
      public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
        return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
      }
    
      private ItemWriter<ClassInformation> transactionWriter() {
        return items -> {
          log.info("========   Item Write");
          for (ClassInformation item : items) {
            log.info("반 정보= {}", item);
          }
        };
      }
    }
    
    • Reader에서 Teacher Entity를 반환하여 Processor에서 Entity의 하위 자식들인 Student를 Lazy loading한다.

     

    Processor 부분에서 teacher.getStudents()로 가져온다.

     public ItemProcessor<Teacher, ClassInformation> transactionProcessor() {
        return teacher -> new ClassInformation(teacher.getName(), teacher.getStudents().size());
      }

    실행을 했을 때 Processor가 트랜잭션 범위 밖이라면 오류가 발생할 것이다.

    실행

    wj

    • 오류가 발생하지 않음.
    • 즉, Processor는 트랜잭션 범위 안이다.
    • Entity Lazy Loading이 가능하다.

     

    Writer

    Writer에서의 Lazy Loading

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class TransactionWriterJobConfiguration {
    
      public static final String JOB_NAME = "transactionWriterBatch";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory emf;
    
      @Value("${chunkSize:1000}")
      private int chunkSize;
    
      @Bean(JOB_NAME)
      public Job transactionWriterJob() {
        return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(transactionWriterStep())
            .build();
      }
    
      @Bean(BEAN_PREFIX + "step")
      @JobScope
      public Step transactionWriterStep() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Teacher, Teacher>chunk(chunkSize)
            .reader(transactionWriterReader())
            .writer(transactionWriter())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Teacher> transactionWriterReader() {
        return new JpaPagingItemReaderBuilder<Teacher>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT t FROM Teacher t")
            .build();
      }
    
      private ItemWriter<Teacher> transactionWriter() {
        return items -> {
          log.info("====== Item Write");
          for (Teacher item : items) {
            log.info("teacher={}, student Size={}", item.getName(), item.getStudents().size());
          }
        };
      }
    }
    
    • Reader에서 Teacher Entity를 반환하여 Processor를 거치지 않고 바로 Writer로 넘겨
    • Writer에서 Student를 Lazy Loading

    실행

    • Writer 역시 트랜잭션 범위 안이며, Lazy Loading이 가능하다.

     

    📍 ItemProcessor 구현체


     

    Spring Batch에는 자주 사용하는 용도의 Processor를 미리 클래스로 만들어 제공해주고 있다.

    • ItemProcessorAdapter
    • ValidatingItemProcessor
    • CompositeItemProcessor

    최근에는 대부분 Processor 구현을 직접하는 경우가 많다(람다식으로 빠르게 구현도 많이 한다.)

     

    CompositeItemProcessor

    • ItemProcessor 간의 체이닝을 지원하는 Processor

    변환이 2번 필요할 때

    하나의 Processor에서 모두 변환을 해야하나?

    • 그럼 너무 역할이 커지지 않을까?

    CompositeItemProcessor는 이런 의문에서 나오게 되었다.

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class ProcessorCompositeJobConfiguration {
    
      public static final String JOB_NAME = "processorCompositeBatch";
      public static final String BEAN_PREFIX = JOB_NAME + "_";
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory emf;
    
      @Value("${chunkSize:1000}")
      private int chunkSize;
    
      @Bean(JOB_NAME)
      public Job compositeJob() {
        return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(compositeStep())
            .build();
      }
    
      @Bean(BEAN_PREFIX + "step")
      @JobScope
      public Step compositeStep() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Teacher, String>chunk(chunkSize)
            .reader(compositeReader())
            .processor(compositeProcessor())
            .writer(compositeWriter())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Teacher> compositeReader() {
        return new JpaPagingItemReaderBuilder<Teacher>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT t FROM Teacher t")
            .build();
      }
    
      @Bean
      public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(compositeProcessor1());
        delegates.add(compositeProcessor2());
    
        CompositeItemProcessor processor = new CompositeItemProcessor<>();
    
        processor.setDelegates(delegates);
    
        return processor;
      }
    
      public ItemProcessor<Teacher, String> compositeProcessor1() {
        return Teacher::getName;
      }
    
      public ItemProcessor<String, String> compositeProcessor2() {
        return name -> " 안녕하세요  :  "+ name + "입니다.";
      }
    
      private ItemWriter<String> compositeWriter() {
        return items -> {
          for (String item : items) {
            log.info("Teacher Name={}", item);
          }
        };
      }
    }
    
    • 첫 번째 processor 에서 “Teacher Name = “
    • 두 번째 processor 에서 “안녕하세요 + name + 입니다” 이어서 Writer에게 전달

    서로 다른 클래스 타입으로 변환해도 가능하다❗

    CompositeItemProcessor에 ItemProcessor List인 delegates을 할당만 하면 모든 구현이 끝난다.

      @Bean
      public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(compositeProcessor1());
        delegates.add(compositeProcessor2());
    
        CompositeItemProcessor processor = new CompositeItemProcessor<>();
    
        processor.setDelegates(delegates);
    
        return processor;
      }

    여기서 제네릭 타입을 쓰지 못한다.

    • 제네릭 타입을 쓰게 되면 delegates에 포함된 모든 ItemProcessor 는 같은 제네릭 타입을 가져야한다.

    processor1<Teacher, String>

    processor2<String, String>

    • 같은 제네릭 타입을 쓰지 못한다.

     

    실행

     

    참고자료

    https://jojoldu.tistory.com/347?category=902551

     

    '🌱 spring > 🚛 spring batch' 카테고리의 다른 글

    Quartz2  (0) 2022.12.22
    Quartz (1)  (0) 2022.12.22
    Spring Batch - ItemWriter  (0) 2022.12.16
    SpringBatch - ItemReader  (0) 2022.12.16
    Spring Batch - Chunk 지향 처리  (1) 2022.12.16

    댓글

Designed by Tistory.