Spring Batch - ItemWriter
📌 ItemWriter
ItemWriter는 SpringBatch에서 사용하는 출력 기능이다.
Spring Batch가 처음 나왔을 때, ItemWritersms ItemReader와 마찬가지로 item을 하나씩 다루었다.
- 업데이트 이후 부터 ItemWriter는 item 하나를 작성하지 않고 Chunk 단위로 묶인 item List를 다룬다.
- Reader 의 read()는 Item 하나를 반환하는 반면, Writer의 write()는 인자로 Item List를 받는다.
과정
- ItemReader로 통해 각 항목을 개별적으로 읽고 이를 처리하기 위해 ItemProcessor에 전달
- 이 과정은 chunk의 Item 개수만큼 처리될 때 까지 계속된다.
- 청크 단위만큼 처리가 완료되면 Writer에 전달되어 Writer에 명시되어 있는대로 일괄처리한다.
즉, Reader와 Processor를 거쳐 처리된 Item을 Chunk 크기 만큼 쌓은 뒤 이를 Writer에 전달
SpringBatch는 다양한 Output 타입을 처리할 수 있도록 많은 Writer를 제공한다.
✍️ Database Writer
Java에서 JDBC 또는 ORM을 통해 RDBMS에 접근한다.
Spring Batch는 JDBC와 ORM 모두 Writer를 제공한다.
Writer는 Chunk 단위의 마지막 단계이다.
- 따라서 Database의 영속성과 관련해서는 항상 마지막에 flush를 해주어야 한다.
예 : 영속성을 사용하는 JPA, Hibernate의 경우 ItemWriter 구현체에서는 flush() 와 session.clear()가 따라온다.
JpaItemWriter - write()
- entityManager.flush()
HibernateItemWriter
- sessionFactory.getCurrentSession().clear()
Writer가 받은 모든 Item이 처리 된 후, Spring Batch는 현재 트랜잭션을 커밋한다.
Database와 관련된 Writer는 3가지 가 있다.
- JdbcBatchItemWriter
- HibernateItemWriter
- JpaItemWriter
JdbcBatchItemWriter
fORM 을 사용하지 않는 경우 Writer는 대부분 JdbcBatchItemWriter를 사용한다.
JdbcBatchItemWriter는 JDBC의 Batch 기능을 사용하여 한 번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 한다.
이렇게 처리하는 이유
- 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 횟수를 최소화하여 성능 향상을 꾀하기 위함
- 업데이트를 일괄처리로 그룹화하면 데이터베이스와 어플리케이션 왕복 횟수가 줄어들어 성능이 향상된다.
JdbcBatchItemWriter - write()
- 일괄처리를 하고 있다.
JdbcBatchItemWriterJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcBatchItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcBatchItemWriterJob() {
return jobBuilderFactory.get("jdbcBatchItemWriterJob")
.start(jdbcBatchItemWriterStep())
.build();
}
@Bean
public Step jdbcBatchItemWriterStep() {
return stepBuilderFactory.get("jdbcBatchItemWriterStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcBatchItemWriterReader())
.writer(jdbcBatchItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() {
return new JdbcCursorItemReaderBuilder<Pay>()
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
.name("jdbcBatchItemWriter")
.build();
}
/**
* reader에서 넘어온 데이터를 하나씩 출력하는 writer
*/
@Bean // beanMapped()을 사용할때는 필수
public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Pay>()
.dataSource(dataSource)
.sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
.beanMapped()
.build();
}
}
JdbcBatchItemWriterBuilder의 설정값
Property | Prameter Type | 설명 |
assertUpdates | boolean | - 적어도 하나의 항목이 행을 업데이트하거나 삭제하지 않을 경우 예외를 throw할지 여부를 설정 - 기본 - true - Exception : EmptyResultDataAccessException |
columnMapped | ❌ | - Key, Value 기반으로 Insert SQL의 Values를 매핑 - ex) Map<String, Object> |
beanMapped | ❌ | Pojo 기반으로 Insert SQL의 Values를 매핑 |
⚡ JdbcBatchItemWriter 설정에서 주의할 것.
- JdbcBatchItemWriter의 제네릭 타입은 Reader에서 넘겨주는 값의 타입이다.
columnMapped vs beanMapped
columnMapped 사용시 코드
new JdbcBatchItemWriterBuilder<Map<String, Object>>() // Map 사용
.columnMapped()
.dataSource(this.dataSource)
.sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
.build();
차이점
- columnMapped : Reader에서 Writer로 넘겨주는 타입이 Map<String, Object>
- beanMapped : Reader에서 Writer로 넘겨주는 타입이 Pay.class와 같은 Pojo 타입
valeus(:field)
- Dto의 Getter 혹은 Map의 Key에 매핑되어 값이 할당된다.
메서드
- afterPropertiesSet
- InitializingBean
- JdbcBatchItemWriter, JpaItemWriter, JpaItemWriter등 ItemWriter의 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.
- afterPropertiesSet
- 각각의 Writer들이 실행되기 위해 필요한 필수값들이 제대로 세팅되어있는지 체크
- writer를 생성하고 메소드를 바로 실행하면 어느 값이 누락되었는지 명확하게 인지할 수 있어서 많이 사용한다.
JpaItemWriter
ORM을 사용할 수 있다.
Writer에 전달하는 데이터가 Entity 클래스라면 JpaItemWriter를 사용
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private static final int chunkSize = 10;
@Bean
public Job jpaItemWriterJob() {
return jobBuilderFactory.get("jpaItemWriterJob")
.start(jpaItemWriterStep())
.build();
}
@Bean
public Step jpaItemWriterStep() {
return stepBuilderFactory.get("jpaItemWriterStep")
.<Pay, Pay2>chunk(chunkSize)
.reader(jpaItemWriterReader())
.processor(jpaItemProcessor())
.writer(jpaItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> jpaItemWriterReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("jpaItemWriterReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, Pay2> jpaItemProcessor() {
return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
}
@Bean
public JpaItemWriter<Pay2> jpaItemWriter() {
JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>();
jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
return jpaItemWriter;
}
}
JPA를 사용하기 때문에 영속성 관리를 위해 EntityManager를 할당해줘야 한다.
spring-boot-starter-data-jpa를 의존성에 등록하면 EntityManager가 Bean으로 자동생성된다.
- DI 코드만 추가해주면 된다.
- 대신 필수로 설정해야할 값이 Entity Manager 뿐이다.
JdbcBatchItemWriter와 다른 점
- processor 가 추가되었다.
- Pay Entity를 읽어서 Writer에는 Pay2 Entity를 전달해주기 때문
이렇게 데이터 가공할 때 Processor 가 필요하다.
- JpaItemWriter는 JdbcBatchItemWriter와 달리 넘어온 Entity를 데이터베이스에 반영한다.
즉, JpaItemWriter는 Entity 클래스를 제네릭 타입으로 받아야만 한다.
- JpaItemWriter는 넘어온 Item을 그대로 entityManager.merge()로 테이블에 반영을 하기 때문이다.
JpaItemWriter.dowrite()
🤲 Custome ItemWriter
Reader와 달리 Writer의 경우에는 Custom 하게 구현해야할 일이 많다.
- Reader도 있긴하다
- Querydsl 기반의 ItemReader
- Jooq 기반의 ItemReader
- Writer의 경우
- Reader에서 읽어온 데이터를 RestTemplate으로 외부 API로 전달해야 할 때
- 여러 Entity를 동시에 save 해야 할 때
공식적으로 지원하지 않는 Writer를 사용하고 싶다면 ItemWriter 인터페이스를 구현하면 된다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class CustomItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private static final int chunkSize = 10;
@Bean
public Job customItemWriterJob() {
return jobBuilderFactory.get("myCustomItemWriterJob")
.start(customItemWriterStep())
.build();
}
@Bean
public Step customItemWriterStep() {
return stepBuilderFactory.get("myCustomItemWriterStep")
.<Pay, Pay2>chunk(chunkSize)
.reader(customItemWriterReader())
.processor(customItemWriterProcessor())
.writer(customItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> customItemWriterReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("myCustomItemWriterReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, Pay2> customItemWriterProcessor() {
return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
}
@Bean
public ItemWriter<Pay2> customItemWriter() {
return items -> {
for (Pay2 item : items) {
System.out.println("==== data : === " + item);
}
};
}
}
- write() 만 @Override 하면 구현체 생성이 끝난다.
실행
참고자료
https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch12s04.html