🌱 spring/🚛 spring batch

Spring Batch - ItemWriter

beomsic 2022. 12. 16. 15:45

📌 ItemWriter


ItemWriter는 SpringBatch에서 사용하는 출력 기능이다.

 

Spring Batch가 처음 나왔을 때, ItemWritersms ItemReader와 마찬가지로 item을 하나씩 다루었다.

  • 업데이트 이후 부터 ItemWriter는 item 하나를 작성하지 않고 Chunk 단위로 묶인 item List를 다룬다.

 

  • Reader 의 read()는 Item 하나를 반환하는 반면, Writer의 write()는 인자로 Item List를 받는다.

과정

  1. ItemReader로 통해 각 항목을 개별적으로 읽고 이를 처리하기 위해 ItemProcessor에 전달
  2. 이 과정은 chunk의 Item 개수만큼 처리될 때 까지 계속된다.
  3. 청크 단위만큼 처리가 완료되면 Writer에 전달되어 Writer에 명시되어 있는대로 일괄처리한다.

즉, Reader와 Processor를 거쳐 처리된 Item을 Chunk 크기 만큼 쌓은 뒤 이를 Writer에 전달

 

SpringBatch는 다양한 Output 타입을 처리할 수 있도록 많은 Writer를 제공한다.

 

✍️ Database Writer


Java에서 JDBC 또는 ORM을 통해 RDBMS에 접근한다.

Spring Batch는 JDBC와 ORM 모두 Writer를 제공한다.

 

Writer는 Chunk 단위의 마지막 단계이다.

  • 따라서 Database의 영속성과 관련해서는 항상 마지막에 flush를 해주어야 한다.

예 : 영속성을 사용하는 JPA, Hibernate의 경우 ItemWriter 구현체에서는 flush() 와 session.clear()가 따라온다.

 

JpaItemWriter - write()

  • entityManager.flush()

 

HibernateItemWriter

  • sessionFactory.getCurrentSession().clear()

Writer가 받은 모든 Item이 처리 된 후, Spring Batch는 현재 트랜잭션을 커밋한다.

 

Database와 관련된 Writer는 3가지 가 있다.

  • JdbcBatchItemWriter
  • HibernateItemWriter
  • JpaItemWriter

 

JdbcBatchItemWriter

fORM 을 사용하지 않는 경우 Writer는 대부분 JdbcBatchItemWriter를 사용한다.

 

JdbcBatchItemWriter는 JDBC의 Batch 기능을 사용하여 한 번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 한다.

이렇게 처리하는 이유

  • 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 횟수를 최소화하여 성능 향상을 꾀하기 위함
  • 업데이트를 일괄처리로 그룹화하면 데이터베이스와 어플리케이션 왕복 횟수가 줄어들어 성능이 향상된다.

 

JdbcBatchItemWriter - write()

  • 일괄처리를 하고 있다.

 

JdbcBatchItemWriterJobConfiguration.java

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcBatchItemWriterJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcBatchItemWriterJob() {
        return jobBuilderFactory.get("jdbcBatchItemWriterJob")
                .start(jdbcBatchItemWriterStep())
                .build();
    }

    @Bean
    public Step jdbcBatchItemWriterStep() {
        return stepBuilderFactory.get("jdbcBatchItemWriterStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcBatchItemWriterReader())
                .writer(jdbcBatchItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() {
        return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcBatchItemWriter")
                .build();
    }

    /**
     * reader에서 넘어온 데이터를 하나씩 출력하는 writer
     */
    @Bean // beanMapped()을 사용할때는 필수
    public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() {
        return new JdbcBatchItemWriterBuilder<Pay>()
                .dataSource(dataSource)
                .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
                .beanMapped()
                .build();
    }
}

JdbcBatchItemWriterBuilder의 설정값

Property Prameter Type 설명
assertUpdates boolean - 적어도 하나의 항목이 행을 업데이트하거나 삭제하지 않을 경우 예외를 throw할지 여부를 설정
- 기본 - true
- Exception : EmptyResultDataAccessException
columnMapped - Key, Value 기반으로 Insert SQL의 Values를 매핑
- ex) Map<String, Object>
beanMapped Pojo 기반으로 Insert SQL의 Values를 매핑

⚡ JdbcBatchItemWriter 설정에서 주의할 것.

  • JdbcBatchItemWriter의 제네릭 타입은 Reader에서 넘겨주는 값의 타입이다.

 

columnMapped vs beanMapped

columnMapped 사용시 코드

new JdbcBatchItemWriterBuilder<Map<String, Object>>() // Map 사용
                .columnMapped()
                .dataSource(this.dataSource)
                .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
                .build();

 

차이점

  • columnMapped : Reader에서 Writer로 넘겨주는 타입이 Map<String, Object>
  • beanMapped : Reader에서 Writer로 넘겨주는 타입이 Pay.class와 같은 Pojo 타입

valeus(:field)

  • Dto의 Getter 혹은 Map의 Key에 매핑되어 값이 할당된다.

 

메서드

  1. afterPropertiesSet
  2. InitializingBean
  • JdbcBatchItemWriter, JpaItemWriter, JpaItemWriter등 ItemWriter의 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.

 

  • afterPropertiesSet
    • 각각의 Writer들이 실행되기 위해 필요한 필수값들이 제대로 세팅되어있는지 체크
    • writer를 생성하고 메소드를 바로 실행하면 어느 값이 누락되었는지 명확하게 인지할 수 있어서 많이 사용한다.

 

JpaItemWriter

ORM을 사용할 수 있다.

Writer에 전달하는 데이터가 Entity 클래스라면 JpaItemWriter를 사용

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaItemWriterJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private static final int chunkSize = 10;

    @Bean
    public Job jpaItemWriterJob() {
        return jobBuilderFactory.get("jpaItemWriterJob")
                .start(jpaItemWriterStep())
                .build();
    }

    @Bean
    public Step jpaItemWriterStep() {
        return stepBuilderFactory.get("jpaItemWriterStep")
                .<Pay, Pay2>chunk(chunkSize)
                .reader(jpaItemWriterReader())
                .processor(jpaItemProcessor())
                .writer(jpaItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Pay> jpaItemWriterReader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name("jpaItemWriterReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Pay p")
                .build();
    }

    @Bean
    public ItemProcessor<Pay, Pay2> jpaItemProcessor() {
        return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
    }

    @Bean
    public JpaItemWriter<Pay2> jpaItemWriter() {
        JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>();
        jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
        return jpaItemWriter;
    }
}

JPA를 사용하기 때문에 영속성 관리를 위해 EntityManager를 할당해줘야 한다.

 

spring-boot-starter-data-jpa를 의존성에 등록하면 EntityManager가 Bean으로 자동생성된다.

  • DI 코드만 추가해주면 된다.
  • 대신 필수로 설정해야할 값이 Entity Manager 뿐이다.

 

JdbcBatchItemWriter와 다른 점

  • processor 가 추가되었다.
  • Pay Entity를 읽어서 Writer에는 Pay2 Entity를 전달해주기 때문

 

이렇게 데이터 가공할 때 Processor 가 필요하다.

  • JpaItemWriter는 JdbcBatchItemWriter와 달리 넘어온 Entity를 데이터베이스에 반영한다.

 

즉, JpaItemWriter는 Entity 클래스를 제네릭 타입으로 받아야만 한다.

  • JpaItemWriter는 넘어온 Item을 그대로 entityManager.merge()로 테이블에 반영을 하기 때문이다.

 

JpaItemWriter.dowrite()

 

🤲 Custome ItemWriter


Reader와 달리 Writer의 경우에는 Custom 하게 구현해야할 일이 많다.

  • Reader도 있긴하다
    • Querydsl 기반의 ItemReader
    • Jooq 기반의 ItemReader
  • Writer의 경우
    • Reader에서 읽어온 데이터를 RestTemplate으로 외부 API로 전달해야 할 때
    • 여러 Entity를 동시에 save 해야 할 때

공식적으로 지원하지 않는 Writer를 사용하고 싶다면 ItemWriter 인터페이스를 구현하면 된다.

 

@Slf4j
@RequiredArgsConstructor
@Configuration
public class CustomItemWriterJobConfiguration {

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory entityManagerFactory;

  private static final int chunkSize = 10;

  @Bean
  public Job customItemWriterJob() {
    return jobBuilderFactory.get("myCustomItemWriterJob")
        .start(customItemWriterStep())
        .build();
  }

  @Bean
  public Step customItemWriterStep() {
    return stepBuilderFactory.get("myCustomItemWriterStep")
        .<Pay, Pay2>chunk(chunkSize)
        .reader(customItemWriterReader())
        .processor(customItemWriterProcessor())
        .writer(customItemWriter())
        .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> customItemWriterReader() {
    return new JpaPagingItemReaderBuilder<Pay>()
        .name("myCustomItemWriterReader")
        .entityManagerFactory(entityManagerFactory)
        .pageSize(chunkSize)
        .queryString("SELECT p FROM Pay p")
        .build();
  }

  @Bean
  public ItemProcessor<Pay, Pay2> customItemWriterProcessor() {
    return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
  }

  @Bean
  public ItemWriter<Pay2> customItemWriter() {
    return items -> {
      for (Pay2 item : items) {
        System.out.println("==== data : === " + item);
      }
    };
  }
}
  • write() @Override 하면 구현체 생성이 끝난다.

실행

 

참고자료


https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch12s04.html

https://jojoldu.tistory.com/339?category=902551