ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch - ItemWriter
    🌱 spring/🚛 spring batch 2022. 12. 16. 15:45

    📌 ItemWriter


    ItemWriter는 SpringBatch에서 사용하는 출력 기능이다.

     

    Spring Batch가 처음 나왔을 때, ItemWritersms ItemReader와 마찬가지로 item을 하나씩 다루었다.

    • 업데이트 이후 부터 ItemWriter는 item 하나를 작성하지 않고 Chunk 단위로 묶인 item List를 다룬다.

     

    • Reader 의 read()는 Item 하나를 반환하는 반면, Writer의 write()는 인자로 Item List를 받는다.

    과정

    1. ItemReader로 통해 각 항목을 개별적으로 읽고 이를 처리하기 위해 ItemProcessor에 전달
    2. 이 과정은 chunk의 Item 개수만큼 처리될 때 까지 계속된다.
    3. 청크 단위만큼 처리가 완료되면 Writer에 전달되어 Writer에 명시되어 있는대로 일괄처리한다.

    즉, Reader와 Processor를 거쳐 처리된 Item을 Chunk 크기 만큼 쌓은 뒤 이를 Writer에 전달

     

    SpringBatch는 다양한 Output 타입을 처리할 수 있도록 많은 Writer를 제공한다.

     

    ✍️ Database Writer


    Java에서 JDBC 또는 ORM을 통해 RDBMS에 접근한다.

    Spring Batch는 JDBC와 ORM 모두 Writer를 제공한다.

     

    Writer는 Chunk 단위의 마지막 단계이다.

    • 따라서 Database의 영속성과 관련해서는 항상 마지막에 flush를 해주어야 한다.

    예 : 영속성을 사용하는 JPA, Hibernate의 경우 ItemWriter 구현체에서는 flush() 와 session.clear()가 따라온다.

     

    JpaItemWriter - write()

    • entityManager.flush()

     

    HibernateItemWriter

    • sessionFactory.getCurrentSession().clear()

    Writer가 받은 모든 Item이 처리 된 후, Spring Batch는 현재 트랜잭션을 커밋한다.

     

    Database와 관련된 Writer는 3가지 가 있다.

    • JdbcBatchItemWriter
    • HibernateItemWriter
    • JpaItemWriter

     

    JdbcBatchItemWriter

    fORM 을 사용하지 않는 경우 Writer는 대부분 JdbcBatchItemWriter를 사용한다.

     

    JdbcBatchItemWriter는 JDBC의 Batch 기능을 사용하여 한 번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 한다.

    이렇게 처리하는 이유

    • 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 횟수를 최소화하여 성능 향상을 꾀하기 위함
    • 업데이트를 일괄처리로 그룹화하면 데이터베이스와 어플리케이션 왕복 횟수가 줄어들어 성능이 향상된다.

     

    JdbcBatchItemWriter - write()

    • 일괄처리를 하고 있다.

     

    JdbcBatchItemWriterJobConfiguration.java

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JdbcBatchItemWriterJobConfiguration {
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource; // DataSource DI
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jdbcBatchItemWriterJob() {
            return jobBuilderFactory.get("jdbcBatchItemWriterJob")
                    .start(jdbcBatchItemWriterStep())
                    .build();
        }
    
        @Bean
        public Step jdbcBatchItemWriterStep() {
            return stepBuilderFactory.get("jdbcBatchItemWriterStep")
                    .<Pay, Pay>chunk(chunkSize)
                    .reader(jdbcBatchItemWriterReader())
                    .writer(jdbcBatchItemWriter())
                    .build();
        }
    
        @Bean
        public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() {
            return new JdbcCursorItemReaderBuilder<Pay>()
                    .fetchSize(chunkSize)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                    .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                    .name("jdbcBatchItemWriter")
                    .build();
        }
    
        /**
         * reader에서 넘어온 데이터를 하나씩 출력하는 writer
         */
        @Bean // beanMapped()을 사용할때는 필수
        public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() {
            return new JdbcBatchItemWriterBuilder<Pay>()
                    .dataSource(dataSource)
                    .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
                    .beanMapped()
                    .build();
        }
    }
    

    JdbcBatchItemWriterBuilder의 설정값

    Property Prameter Type 설명
    assertUpdates boolean - 적어도 하나의 항목이 행을 업데이트하거나 삭제하지 않을 경우 예외를 throw할지 여부를 설정
    - 기본 - true
    - Exception : EmptyResultDataAccessException
    columnMapped - Key, Value 기반으로 Insert SQL의 Values를 매핑
    - ex) Map<String, Object>
    beanMapped Pojo 기반으로 Insert SQL의 Values를 매핑

    ⚡ JdbcBatchItemWriter 설정에서 주의할 것.

    • JdbcBatchItemWriter의 제네릭 타입은 Reader에서 넘겨주는 값의 타입이다.

     

    columnMapped vs beanMapped

    columnMapped 사용시 코드

    new JdbcBatchItemWriterBuilder<Map<String, Object>>() // Map 사용
                    .columnMapped()
                    .dataSource(this.dataSource)
                    .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
                    .build();
    

     

    차이점

    • columnMapped : Reader에서 Writer로 넘겨주는 타입이 Map<String, Object>
    • beanMapped : Reader에서 Writer로 넘겨주는 타입이 Pay.class와 같은 Pojo 타입

    valeus(:field)

    • Dto의 Getter 혹은 Map의 Key에 매핑되어 값이 할당된다.

     

    메서드

    1. afterPropertiesSet
    2. InitializingBean
    • JdbcBatchItemWriter, JpaItemWriter, JpaItemWriter등 ItemWriter의 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.

     

    • afterPropertiesSet
      • 각각의 Writer들이 실행되기 위해 필요한 필수값들이 제대로 세팅되어있는지 체크
      • writer를 생성하고 메소드를 바로 실행하면 어느 값이 누락되었는지 명확하게 인지할 수 있어서 많이 사용한다.

     

    JpaItemWriter

    ORM을 사용할 수 있다.

    Writer에 전달하는 데이터가 Entity 클래스라면 JpaItemWriter를 사용

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JpaItemWriterJobConfiguration {
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final EntityManagerFactory entityManagerFactory;
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jpaItemWriterJob() {
            return jobBuilderFactory.get("jpaItemWriterJob")
                    .start(jpaItemWriterStep())
                    .build();
        }
    
        @Bean
        public Step jpaItemWriterStep() {
            return stepBuilderFactory.get("jpaItemWriterStep")
                    .<Pay, Pay2>chunk(chunkSize)
                    .reader(jpaItemWriterReader())
                    .processor(jpaItemProcessor())
                    .writer(jpaItemWriter())
                    .build();
        }
    
        @Bean
        public JpaPagingItemReader<Pay> jpaItemWriterReader() {
            return new JpaPagingItemReaderBuilder<Pay>()
                    .name("jpaItemWriterReader")
                    .entityManagerFactory(entityManagerFactory)
                    .pageSize(chunkSize)
                    .queryString("SELECT p FROM Pay p")
                    .build();
        }
    
        @Bean
        public ItemProcessor<Pay, Pay2> jpaItemProcessor() {
            return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
        }
    
        @Bean
        public JpaItemWriter<Pay2> jpaItemWriter() {
            JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>();
            jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
            return jpaItemWriter;
        }
    }
    

    JPA를 사용하기 때문에 영속성 관리를 위해 EntityManager를 할당해줘야 한다.

     

    spring-boot-starter-data-jpa를 의존성에 등록하면 EntityManager가 Bean으로 자동생성된다.

    • DI 코드만 추가해주면 된다.
    • 대신 필수로 설정해야할 값이 Entity Manager 뿐이다.

     

    JdbcBatchItemWriter와 다른 점

    • processor 가 추가되었다.
    • Pay Entity를 읽어서 Writer에는 Pay2 Entity를 전달해주기 때문

     

    이렇게 데이터 가공할 때 Processor 가 필요하다.

    • JpaItemWriter는 JdbcBatchItemWriter와 달리 넘어온 Entity를 데이터베이스에 반영한다.

     

    즉, JpaItemWriter는 Entity 클래스를 제네릭 타입으로 받아야만 한다.

    • JpaItemWriter는 넘어온 Item을 그대로 entityManager.merge()로 테이블에 반영을 하기 때문이다.

     

    JpaItemWriter.dowrite()

     

    🤲 Custome ItemWriter


    Reader와 달리 Writer의 경우에는 Custom 하게 구현해야할 일이 많다.

    • Reader도 있긴하다
      • Querydsl 기반의 ItemReader
      • Jooq 기반의 ItemReader
    • Writer의 경우
      • Reader에서 읽어온 데이터를 RestTemplate으로 외부 API로 전달해야 할 때
      • 여러 Entity를 동시에 save 해야 할 때

    공식적으로 지원하지 않는 Writer를 사용하고 싶다면 ItemWriter 인터페이스를 구현하면 된다.

     

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class CustomItemWriterJobConfiguration {
    
      private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;
      private final EntityManagerFactory entityManagerFactory;
    
      private static final int chunkSize = 10;
    
      @Bean
      public Job customItemWriterJob() {
        return jobBuilderFactory.get("myCustomItemWriterJob")
            .start(customItemWriterStep())
            .build();
      }
    
      @Bean
      public Step customItemWriterStep() {
        return stepBuilderFactory.get("myCustomItemWriterStep")
            .<Pay, Pay2>chunk(chunkSize)
            .reader(customItemWriterReader())
            .processor(customItemWriterProcessor())
            .writer(customItemWriter())
            .build();
      }
    
      @Bean
      public JpaPagingItemReader<Pay> customItemWriterReader() {
        return new JpaPagingItemReaderBuilder<Pay>()
            .name("myCustomItemWriterReader")
            .entityManagerFactory(entityManagerFactory)
            .pageSize(chunkSize)
            .queryString("SELECT p FROM Pay p")
            .build();
      }
    
      @Bean
      public ItemProcessor<Pay, Pay2> customItemWriterProcessor() {
        return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
      }
    
      @Bean
      public ItemWriter<Pay2> customItemWriter() {
        return items -> {
          for (Pay2 item : items) {
            System.out.println("==== data : === " + item);
          }
        };
      }
    }
    
    • write() @Override 하면 구현체 생성이 끝난다.

    실행

     

    참고자료


    https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch12s04.html

    https://jojoldu.tistory.com/339?category=902551

    '🌱 spring > 🚛 spring batch' 카테고리의 다른 글

    Quartz (1)  (0) 2022.12.22
    Spring Batch - ItemProcessor  (0) 2022.12.16
    SpringBatch - ItemReader  (0) 2022.12.16
    Spring Batch - Chunk 지향 처리  (1) 2022.12.16
    Spring Batch - Scope  (0) 2022.12.16

    댓글

Designed by Tistory.