-
Spring Batch - ItemWriter🌱 spring/🚛 spring batch 2022. 12. 16. 15:45
📌 ItemWriter
ItemWriter는 SpringBatch에서 사용하는 출력 기능이다.
Spring Batch가 처음 나왔을 때, ItemWritersms ItemReader와 마찬가지로 item을 하나씩 다루었다.
- 업데이트 이후 부터 ItemWriter는 item 하나를 작성하지 않고 Chunk 단위로 묶인 item List를 다룬다.
- Reader 의 read()는 Item 하나를 반환하는 반면, Writer의 write()는 인자로 Item List를 받는다.
과정
- ItemReader로 통해 각 항목을 개별적으로 읽고 이를 처리하기 위해 ItemProcessor에 전달
- 이 과정은 chunk의 Item 개수만큼 처리될 때 까지 계속된다.
- 청크 단위만큼 처리가 완료되면 Writer에 전달되어 Writer에 명시되어 있는대로 일괄처리한다.
즉, Reader와 Processor를 거쳐 처리된 Item을 Chunk 크기 만큼 쌓은 뒤 이를 Writer에 전달
SpringBatch는 다양한 Output 타입을 처리할 수 있도록 많은 Writer를 제공한다.
✍️ Database Writer
Java에서 JDBC 또는 ORM을 통해 RDBMS에 접근한다.
Spring Batch는 JDBC와 ORM 모두 Writer를 제공한다.
Writer는 Chunk 단위의 마지막 단계이다.
- 따라서 Database의 영속성과 관련해서는 항상 마지막에 flush를 해주어야 한다.
예 : 영속성을 사용하는 JPA, Hibernate의 경우 ItemWriter 구현체에서는 flush() 와 session.clear()가 따라온다.
JpaItemWriter - write()
- entityManager.flush()
HibernateItemWriter
- sessionFactory.getCurrentSession().clear()
Writer가 받은 모든 Item이 처리 된 후, Spring Batch는 현재 트랜잭션을 커밋한다.
Database와 관련된 Writer는 3가지 가 있다.
- JdbcBatchItemWriter
- HibernateItemWriter
- JpaItemWriter
JdbcBatchItemWriter
fORM 을 사용하지 않는 경우 Writer는 대부분 JdbcBatchItemWriter를 사용한다.
JdbcBatchItemWriter는 JDBC의 Batch 기능을 사용하여 한 번에 Database로 전달하여 Database 내부에서 쿼리들이 실행되도록 한다.
이렇게 처리하는 이유
- 어플리케이션과 데이터베이스 간에 데이터를 주고 받는 횟수를 최소화하여 성능 향상을 꾀하기 위함
- 업데이트를 일괄처리로 그룹화하면 데이터베이스와 어플리케이션 왕복 횟수가 줄어들어 성능이 향상된다.
JdbcBatchItemWriter - write()
- 일괄처리를 하고 있다.
JdbcBatchItemWriterJobConfiguration.java
@Slf4j @RequiredArgsConstructor @Configuration public class JdbcBatchItemWriterJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final DataSource dataSource; // DataSource DI private static final int chunkSize = 10; @Bean public Job jdbcBatchItemWriterJob() { return jobBuilderFactory.get("jdbcBatchItemWriterJob") .start(jdbcBatchItemWriterStep()) .build(); } @Bean public Step jdbcBatchItemWriterStep() { return stepBuilderFactory.get("jdbcBatchItemWriterStep") .<Pay, Pay>chunk(chunkSize) .reader(jdbcBatchItemWriterReader()) .writer(jdbcBatchItemWriter()) .build(); } @Bean public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() { return new JdbcCursorItemReaderBuilder<Pay>() .fetchSize(chunkSize) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(Pay.class)) .sql("SELECT id, amount, tx_name, tx_date_time FROM pay") .name("jdbcBatchItemWriter") .build(); } /** * reader에서 넘어온 데이터를 하나씩 출력하는 writer */ @Bean // beanMapped()을 사용할때는 필수 public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() { return new JdbcBatchItemWriterBuilder<Pay>() .dataSource(dataSource) .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)") .beanMapped() .build(); } }
JdbcBatchItemWriterBuilder의 설정값
Property Prameter Type 설명 assertUpdates boolean - 적어도 하나의 항목이 행을 업데이트하거나 삭제하지 않을 경우 예외를 throw할지 여부를 설정
- 기본 - true
- Exception : EmptyResultDataAccessExceptioncolumnMapped ❌ - Key, Value 기반으로 Insert SQL의 Values를 매핑
- ex) Map<String, Object>beanMapped ❌ Pojo 기반으로 Insert SQL의 Values를 매핑 ⚡ JdbcBatchItemWriter 설정에서 주의할 것.
- JdbcBatchItemWriter의 제네릭 타입은 Reader에서 넘겨주는 값의 타입이다.
columnMapped vs beanMapped
columnMapped 사용시 코드
new JdbcBatchItemWriterBuilder<Map<String, Object>>() // Map 사용 .columnMapped() .dataSource(this.dataSource) .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)") .build();
차이점
- columnMapped : Reader에서 Writer로 넘겨주는 타입이 Map<String, Object>
- beanMapped : Reader에서 Writer로 넘겨주는 타입이 Pay.class와 같은 Pojo 타입
valeus(:field)
- Dto의 Getter 혹은 Map의 Key에 매핑되어 값이 할당된다.
메서드
- afterPropertiesSet
- InitializingBean
- JdbcBatchItemWriter, JpaItemWriter, JpaItemWriter등 ItemWriter의 구현체들은 모두 InitializingBean 인터페이스를 구현하고 있다.
- afterPropertiesSet
- 각각의 Writer들이 실행되기 위해 필요한 필수값들이 제대로 세팅되어있는지 체크
- writer를 생성하고 메소드를 바로 실행하면 어느 값이 누락되었는지 명확하게 인지할 수 있어서 많이 사용한다.
JpaItemWriter
ORM을 사용할 수 있다.
Writer에 전달하는 데이터가 Entity 클래스라면 JpaItemWriter를 사용
@Slf4j @RequiredArgsConstructor @Configuration public class JpaItemWriterJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory entityManagerFactory; private static final int chunkSize = 10; @Bean public Job jpaItemWriterJob() { return jobBuilderFactory.get("jpaItemWriterJob") .start(jpaItemWriterStep()) .build(); } @Bean public Step jpaItemWriterStep() { return stepBuilderFactory.get("jpaItemWriterStep") .<Pay, Pay2>chunk(chunkSize) .reader(jpaItemWriterReader()) .processor(jpaItemProcessor()) .writer(jpaItemWriter()) .build(); } @Bean public JpaPagingItemReader<Pay> jpaItemWriterReader() { return new JpaPagingItemReaderBuilder<Pay>() .name("jpaItemWriterReader") .entityManagerFactory(entityManagerFactory) .pageSize(chunkSize) .queryString("SELECT p FROM Pay p") .build(); } @Bean public ItemProcessor<Pay, Pay2> jpaItemProcessor() { return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime()); } @Bean public JpaItemWriter<Pay2> jpaItemWriter() { JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>(); jpaItemWriter.setEntityManagerFactory(entityManagerFactory); return jpaItemWriter; } }
JPA를 사용하기 때문에 영속성 관리를 위해 EntityManager를 할당해줘야 한다.
spring-boot-starter-data-jpa를 의존성에 등록하면 EntityManager가 Bean으로 자동생성된다.
- DI 코드만 추가해주면 된다.
- 대신 필수로 설정해야할 값이 Entity Manager 뿐이다.
JdbcBatchItemWriter와 다른 점
- processor 가 추가되었다.
- Pay Entity를 읽어서 Writer에는 Pay2 Entity를 전달해주기 때문
이렇게 데이터 가공할 때 Processor 가 필요하다.
- JpaItemWriter는 JdbcBatchItemWriter와 달리 넘어온 Entity를 데이터베이스에 반영한다.
즉, JpaItemWriter는 Entity 클래스를 제네릭 타입으로 받아야만 한다.
- JpaItemWriter는 넘어온 Item을 그대로 entityManager.merge()로 테이블에 반영을 하기 때문이다.
JpaItemWriter.dowrite()
🤲 Custome ItemWriter
Reader와 달리 Writer의 경우에는 Custom 하게 구현해야할 일이 많다.
- Reader도 있긴하다
- Querydsl 기반의 ItemReader
- Jooq 기반의 ItemReader
- Writer의 경우
- Reader에서 읽어온 데이터를 RestTemplate으로 외부 API로 전달해야 할 때
- 여러 Entity를 동시에 save 해야 할 때
공식적으로 지원하지 않는 Writer를 사용하고 싶다면 ItemWriter 인터페이스를 구현하면 된다.
@Slf4j @RequiredArgsConstructor @Configuration public class CustomItemWriterJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory entityManagerFactory; private static final int chunkSize = 10; @Bean public Job customItemWriterJob() { return jobBuilderFactory.get("myCustomItemWriterJob") .start(customItemWriterStep()) .build(); } @Bean public Step customItemWriterStep() { return stepBuilderFactory.get("myCustomItemWriterStep") .<Pay, Pay2>chunk(chunkSize) .reader(customItemWriterReader()) .processor(customItemWriterProcessor()) .writer(customItemWriter()) .build(); } @Bean public JpaPagingItemReader<Pay> customItemWriterReader() { return new JpaPagingItemReaderBuilder<Pay>() .name("myCustomItemWriterReader") .entityManagerFactory(entityManagerFactory) .pageSize(chunkSize) .queryString("SELECT p FROM Pay p") .build(); } @Bean public ItemProcessor<Pay, Pay2> customItemWriterProcessor() { return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime()); } @Bean public ItemWriter<Pay2> customItemWriter() { return items -> { for (Pay2 item : items) { System.out.println("==== data : === " + item); } }; } }
- write() 만 @Override 하면 구현체 생성이 끝난다.
실행
참고자료
https://docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch12s04.html
'🌱 spring > 🚛 spring batch' 카테고리의 다른 글
Quartz (1) (0) 2022.12.22 Spring Batch - ItemProcessor (0) 2022.12.16 SpringBatch - ItemReader (0) 2022.12.16 Spring Batch - Chunk 지향 처리 (1) 2022.12.16 Spring Batch - Scope (0) 2022.12.16