SpringBatch - ItemReader
📌 ItemReader
Step은 Tasklet 단위로 처리가 된다.
Tasklet 중에서 ChunkOrientedTasklet 을 통해 Chunk를 처리한다.
이를 구성하는 3요소로 ItemReader, ItemWriter, ItemProcessor 가 있다.
ItemReader 란
- Spring Batch의 Chunk Tasklet의 과정
ItemReader는 데이터를 읽어들인다.
- DB의 데이터만 이야기하는 것이 아니다
- File, XML, JSON 등 다른 데이터 소스를 배치 처리의 입력으로 사용할 수 있다.
- JMS(Java Message Service) 와 같은 다른 유형의 데이터 소스도 지원한다.
또한 Spring Batch에서 지원하지 않는 Reader가 필요한 경우 직접 Reader를 만들어 사용할 수 있다.
Spring Batch의 Reader에서 읽어올 수 있는 데이터 유형
- 입력 데이터
- 파일
- DB
- Java Message Service 등 다른 소스
- 본인이 커스텀한 Reader로 읽어오기
- JdbcPagingItemReader : 가장 대표적인 ItemReader의 구현체
ItemReader 외에도 ItemStream 인터페이스도 함께 구현하고 있다.
ItemReader
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
- 데이터를 읽어오는 메소드만 있다. (read)
ItemStream
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
IteamStream 인터페이스의 역할
- 주기적으로 상태를 저장하고 오류가 발생하면 해당 상태에서 복원하기 위한 마커 인터페이스이다.
- 배치 프로세스의 실행 컨텍스트와 연계해 ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할
open(), close()- 스트림을 열고 닫는다.
update() - Batch 처리의 상태를 업데이트 한다.
개발자는 ItemReader와 ItemStream 인터페이스를 직접 구현해 원하는 형태의 ItemReader를 만들 수 있다.
Database Reader
배치 작업은 많은 양의 데이터를 처리해야 한다.
수백만개의 데이터를 조회하는 쿼리가 있는 경우 해당 데이터를 모두 한 번에 가져오는 것은 힘들다.
Spring JdbcTemplate은 분할처리를 지원하지 않아 개발자가 직접 limit, offset 을 사용하는 등의 작업이 필요하다.
Spring Batch는 이런 문제점을 해결하기 위해 2개의 Reader 타입을 지원
- Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 하나씩 옮겨 지속적으로 데이터를 가져옴
Paging 방식은 한 번에 pageSize 만큼 데이터를 가져온다.
CursorItemReader
Paging과 다르게 Streaming 으로 데이터를 처리한다.
JdbcCursorItemReader
- Cursor 기반 JDBC Reader 구현체
예시 코드
@ToString
@Getter
@Setter
@NoArgsConstructor
@Entity
public class Pay {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
public Pay(Long amount, String txName, String txDateTime) {
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
public Pay(Long id, Long amount, String txName, String txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
}
}
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcCursorItemReaderJob() {
return jobBuilderFactory.get("jdbcCursorItemReaderJob")
.start(jdbcCursorItemReaderStep())
.build();
}
@Bean
public Step jdbcCursorItemReaderStep() {
return stepBuilderFactory.get("jdbcCursorItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcCursorItemReader())
.writer(jdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Pay>()
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
.name("jdbcCursorItemReader")
.build();
}
private ItemWriter<Pay> jdbcCursorItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
}
- chunk
- <Pay, Pay>
- 첫 번째 Pay : Reader에서 반환할 타입
- 두 번째 Pay : Writer에 파라미터로 넘어올 타입
- chunkSize - 트랜잭션 범위
- <Pay, Pay>
- fetchSize
- DB에서 한번에 가져올 데이터 양
- Paging과 다름
- Paging은 실제 쿼리를 limit, offset을 이용해서 분할 처리
- 반면, cursor는 분할 처리 없이 실행되나 내부적으로 가져오는 데이터는 fetchSize 만큼 가져와 read()를 통해 하나씩 가져온다.
- dataSource
- DB에 접근하기 위해 사용할 DataSource 객체 할당
- rowMapper
- 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
- sql
- Reader로 사용할 쿼리문을 사용
- name
- reader의 이름을 지정
- Bean의 이름이 아니다
- Spring Batch의 ExecutionContext에서 저장되어질 이름
ItemReader의 가장 큰 장점 : 데이터를 Streaming 할 수 있다는 것
- read() 메소드는 데이터를 하나씩 가져와 ItemWriter로 데이터를 전달하고 다음 데이터를 가져온다.
- 이를 통해 reader - processor - writer 가 chunk 단위로 수행되고 주기적으로 commit 된다.
- 이는 고성능 배치 처리에서 핵심이다
테스트
쿼리 실행
create table pay (
id bigint not null auto_increment,
amount bigint,
tx_name varchar(255),
tx_date_time datetime,
primary key (id)
) engine = InnoDB;
insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');
application.yml
logging.level.org.springframework.batch:DEBUG
- Reader에서 쿼리가 어떻게 생성되고 실행되는지 확인하기 위해 log level 변경
배치 실행
writer에 명시한대로 데이터를 출력한다.
CursorItemReader의 주의사항
Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용된다
- Batch가 끝나기전에 Database와 애플리케이션의 Connection이 먼저 끊어질 수 있다
따라서 Database와 SocketTimeout을 충분히 큰 값으로 설정해야만 한다.
Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는게 낫다.
- Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.
PagingItemReader
Spring Batch에서는 offset 과 limit을 pageSize에 맞게 자동으로 생성해준다.
다만 각 쿼리는 개별적으로 실행한다
- 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.
- order By 권장됨.
JdbcPagingItemReader
JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcPagingItemReaderJob() throws Exception {
return jobBuilderFactory.get("jdbcPagingItemReaderJob")
.start(jdbcPagingItemReaderStep())
.build();
}
@Bean
public Step jdbcPagingItemReaderStep() throws Exception {
return stepBuilderFactory.get("jdbcPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcPagingItemReader())
.writer(jdbcPagingItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("amount", 2000);
return new JdbcPagingItemReaderBuilder<Pay>()
.pageSize(chunkSize)
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.queryProvider(createQueryProvider())
.parameterValues(parameterValues)
.name("jdbcPagingItemReader")
.build();
}
private ItemWriter<Pay> jdbcPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
queryProvider.setFromClause("from pay");
queryProvider.setWhereClause("where amount >= :amount");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
}
JdbcCursorItemReader와 다른 점
- 쿼리 - createQueryProvider()
JdbcCursorItemReader를 사용할 때는 단순히 String 타입으로 쿼리를 생성했다.
하지만 PagingItemReader에서는 PagingQueryProvider 를 통해 쿼리를 생성한다.
- why❓
- ❗각 Database에는 paging을 지원하는 자체적인 전략들이 있다.
- 때문에 Spring Batch에는 각 DB의 paging 전략에 맞춰 구현되어야 한다.
- 그래서 Database에 맞는 Provider들이 존재한다.
하지만 이러면 Database마다 Provider 코드를 바꿔야해 불편하다
그래서 Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 Datasource 설정 값을 보고 provider중 하나를 자동으로 선택하도록 한다.
배치 실행
- 쿼리 로그에 LIMIT 10 이 들어갔다.
- JdbcPagingItemReader에 선언된 pageSize 에 맞게 자동으로 쿼리에 추가해줬기 때문
JpaPagingItemReader
Spring Batch가 JPA를 지원하기 위해 JpaPagingItemReader를 지원한다.
JPA 와 Hibernate는 많은 유사점이 있다.
- 하지만 Hibernate는 Cursor가 지원되지만
- JPA에는 Cursor 기반 Database 접근을 지원하지 않는다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private int chunkSize = 10;
@Bean
public Job jpaPagingItemReaderJob() {
return jobBuilderFactory.get("jpaPagingItemReaderJob")
.start(jpaPagingItemReaderStep())
.build();
}
@Bean
public Step jpaPagingItemReaderStep() {
return stepBuilderFactory.get("jpaPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jpaPagingItemReader())
.writer(jpaPagingItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> jpaPagingItemReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p WHERE amount >= 2000")
.build();
}
private ItemWriter<Pay> jpaPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
}
- JdbcPagingItemReader와 달리 EntityManagerFactory를 지정해준다.
실행
PagingItemReader 주의사항
정렬(Order) 가 무조건 포함되어 있어야 한다.
참고자료
https://jojoldu.tistory.com/336?category=902551
https://renuevo.github.io/spring/batch/spring-batch-chapter-3/
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/ItemReader.html