-
SpringBatch - ItemReader🌱 spring/🚛 spring batch 2022. 12. 16. 15:34
📌 ItemReader
Step은 Tasklet 단위로 처리가 된다.
Tasklet 중에서 ChunkOrientedTasklet 을 통해 Chunk를 처리한다.
이를 구성하는 3요소로 ItemReader, ItemWriter, ItemProcessor 가 있다.
ItemReader 란
- Spring Batch의 Chunk Tasklet의 과정
ItemReader는 데이터를 읽어들인다.
- DB의 데이터만 이야기하는 것이 아니다
- File, XML, JSON 등 다른 데이터 소스를 배치 처리의 입력으로 사용할 수 있다.
- JMS(Java Message Service) 와 같은 다른 유형의 데이터 소스도 지원한다.
또한 Spring Batch에서 지원하지 않는 Reader가 필요한 경우 직접 Reader를 만들어 사용할 수 있다.
Spring Batch의 Reader에서 읽어올 수 있는 데이터 유형
- 입력 데이터
- 파일
- DB
- Java Message Service 등 다른 소스
- 본인이 커스텀한 Reader로 읽어오기
- JdbcPagingItemReader : 가장 대표적인 ItemReader의 구현체
ItemReader 외에도 ItemStream 인터페이스도 함께 구현하고 있다.
ItemReader
public interface ItemReader<T> { @Nullable T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
- 데이터를 읽어오는 메소드만 있다. (read)
ItemStream
public interface ItemStream { void open(ExecutionContext executionContext) throws ItemStreamException; void update(ExecutionContext executionContext) throws ItemStreamException; void close() throws ItemStreamException; }
IteamStream 인터페이스의 역할
- 주기적으로 상태를 저장하고 오류가 발생하면 해당 상태에서 복원하기 위한 마커 인터페이스이다.
- 배치 프로세스의 실행 컨텍스트와 연계해 ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할
open(), close()- 스트림을 열고 닫는다.
update() - Batch 처리의 상태를 업데이트 한다.
개발자는 ItemReader와 ItemStream 인터페이스를 직접 구현해 원하는 형태의 ItemReader를 만들 수 있다.
Database Reader
배치 작업은 많은 양의 데이터를 처리해야 한다.
수백만개의 데이터를 조회하는 쿼리가 있는 경우 해당 데이터를 모두 한 번에 가져오는 것은 힘들다.
Spring JdbcTemplate은 분할처리를 지원하지 않아 개발자가 직접 limit, offset 을 사용하는 등의 작업이 필요하다.
Spring Batch는 이런 문제점을 해결하기 위해 2개의 Reader 타입을 지원
- Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
- Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 하나씩 옮겨 지속적으로 데이터를 가져옴
Paging 방식은 한 번에 pageSize 만큼 데이터를 가져온다.
CursorItemReader
Paging과 다르게 Streaming 으로 데이터를 처리한다.
JdbcCursorItemReader
- Cursor 기반 JDBC Reader 구현체
예시 코드
@ToString @Getter @Setter @NoArgsConstructor @Entity public class Pay { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"); @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private Long amount; private String txName; private LocalDateTime txDateTime; public Pay(Long amount, String txName, String txDateTime) { this.amount = amount; this.txName = txName; this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER); } public Pay(Long id, Long amount, String txName, String txDateTime) { this.id = id; this.amount = amount; this.txName = txName; this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER); } } @Slf4j @RequiredArgsConstructor @Configuration public class JdbcCursorItemReaderJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final DataSource dataSource; // DataSource DI private static final int chunkSize = 10; @Bean public Job jdbcCursorItemReaderJob() { return jobBuilderFactory.get("jdbcCursorItemReaderJob") .start(jdbcCursorItemReaderStep()) .build(); } @Bean public Step jdbcCursorItemReaderStep() { return stepBuilderFactory.get("jdbcCursorItemReaderStep") .<Pay, Pay>chunk(chunkSize) .reader(jdbcCursorItemReader()) .writer(jdbcCursorItemWriter()) .build(); } @Bean public JdbcCursorItemReader<Pay> jdbcCursorItemReader() { return new JdbcCursorItemReaderBuilder<Pay>() .fetchSize(chunkSize) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(Pay.class)) .sql("SELECT id, amount, tx_name, tx_date_time FROM pay") .name("jdbcCursorItemReader") .build(); } private ItemWriter<Pay> jdbcCursorItemWriter() { return list -> { for (Pay pay: list) { log.info("Current Pay={}", pay); } }; } }
- chunk
- <Pay, Pay>
- 첫 번째 Pay : Reader에서 반환할 타입
- 두 번째 Pay : Writer에 파라미터로 넘어올 타입
- chunkSize - 트랜잭션 범위
- <Pay, Pay>
- fetchSize
- DB에서 한번에 가져올 데이터 양
- Paging과 다름
- Paging은 실제 쿼리를 limit, offset을 이용해서 분할 처리
- 반면, cursor는 분할 처리 없이 실행되나 내부적으로 가져오는 데이터는 fetchSize 만큼 가져와 read()를 통해 하나씩 가져온다.
- dataSource
- DB에 접근하기 위해 사용할 DataSource 객체 할당
- rowMapper
- 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
- sql
- Reader로 사용할 쿼리문을 사용
- name
- reader의 이름을 지정
- Bean의 이름이 아니다
- Spring Batch의 ExecutionContext에서 저장되어질 이름
ItemReader의 가장 큰 장점 : 데이터를 Streaming 할 수 있다는 것
- read() 메소드는 데이터를 하나씩 가져와 ItemWriter로 데이터를 전달하고 다음 데이터를 가져온다.
- 이를 통해 reader - processor - writer 가 chunk 단위로 수행되고 주기적으로 commit 된다.
- 이는 고성능 배치 처리에서 핵심이다
테스트
쿼리 실행
create table pay ( id bigint not null auto_increment, amount bigint, tx_name varchar(255), tx_date_time datetime, primary key (id) ) engine = InnoDB; insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00'); insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00'); insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00'); insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');
application.yml
logging.level.org.springframework.batch:DEBUG
- Reader에서 쿼리가 어떻게 생성되고 실행되는지 확인하기 위해 log level 변경
배치 실행
writer에 명시한대로 데이터를 출력한다.
CursorItemReader의 주의사항
Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용된다
- Batch가 끝나기전에 Database와 애플리케이션의 Connection이 먼저 끊어질 수 있다
따라서 Database와 SocketTimeout을 충분히 큰 값으로 설정해야만 한다.
Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는게 낫다.
- Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.
PagingItemReader
Spring Batch에서는 offset 과 limit을 pageSize에 맞게 자동으로 생성해준다.
다만 각 쿼리는 개별적으로 실행한다
- 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.
- order By 권장됨.
JdbcPagingItemReader
JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader
@Slf4j @RequiredArgsConstructor @Configuration public class JdbcPagingItemReaderJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final DataSource dataSource; // DataSource DI private static final int chunkSize = 10; @Bean public Job jdbcPagingItemReaderJob() throws Exception { return jobBuilderFactory.get("jdbcPagingItemReaderJob") .start(jdbcPagingItemReaderStep()) .build(); } @Bean public Step jdbcPagingItemReaderStep() throws Exception { return stepBuilderFactory.get("jdbcPagingItemReaderStep") .<Pay, Pay>chunk(chunkSize) .reader(jdbcPagingItemReader()) .writer(jdbcPagingItemWriter()) .build(); } @Bean public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception { Map<String, Object> parameterValues = new HashMap<>(); parameterValues.put("amount", 2000); return new JdbcPagingItemReaderBuilder<Pay>() .pageSize(chunkSize) .fetchSize(chunkSize) .dataSource(dataSource) .rowMapper(new BeanPropertyRowMapper<>(Pay.class)) .queryProvider(createQueryProvider()) .parameterValues(parameterValues) .name("jdbcPagingItemReader") .build(); } private ItemWriter<Pay> jdbcPagingItemWriter() { return list -> { for (Pay pay: list) { log.info("Current Pay={}", pay); } }; } @Bean public PagingQueryProvider createQueryProvider() throws Exception { SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean(); queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해 queryProvider.setSelectClause("id, amount, tx_name, tx_date_time"); queryProvider.setFromClause("from pay"); queryProvider.setWhereClause("where amount >= :amount"); Map<String, Order> sortKeys = new HashMap<>(1); sortKeys.put("id", Order.ASCENDING); queryProvider.setSortKeys(sortKeys); return queryProvider.getObject(); } }
JdbcCursorItemReader와 다른 점
- 쿼리 - createQueryProvider()
JdbcCursorItemReader를 사용할 때는 단순히 String 타입으로 쿼리를 생성했다.
하지만 PagingItemReader에서는 PagingQueryProvider 를 통해 쿼리를 생성한다.
- why❓
- ❗각 Database에는 paging을 지원하는 자체적인 전략들이 있다.
- 때문에 Spring Batch에는 각 DB의 paging 전략에 맞춰 구현되어야 한다.
- 그래서 Database에 맞는 Provider들이 존재한다.
하지만 이러면 Database마다 Provider 코드를 바꿔야해 불편하다
그래서 Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 Datasource 설정 값을 보고 provider중 하나를 자동으로 선택하도록 한다.
배치 실행
- 쿼리 로그에 LIMIT 10 이 들어갔다.
- JdbcPagingItemReader에 선언된 pageSize 에 맞게 자동으로 쿼리에 추가해줬기 때문
JpaPagingItemReader
Spring Batch가 JPA를 지원하기 위해 JpaPagingItemReader를 지원한다.
JPA 와 Hibernate는 많은 유사점이 있다.
- 하지만 Hibernate는 Cursor가 지원되지만
- JPA에는 Cursor 기반 Database 접근을 지원하지 않는다.
@Slf4j @RequiredArgsConstructor @Configuration public class JpaPagingItemReaderJobConfiguration { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final EntityManagerFactory entityManagerFactory; private int chunkSize = 10; @Bean public Job jpaPagingItemReaderJob() { return jobBuilderFactory.get("jpaPagingItemReaderJob") .start(jpaPagingItemReaderStep()) .build(); } @Bean public Step jpaPagingItemReaderStep() { return stepBuilderFactory.get("jpaPagingItemReaderStep") .<Pay, Pay>chunk(chunkSize) .reader(jpaPagingItemReader()) .writer(jpaPagingItemWriter()) .build(); } @Bean public JpaPagingItemReader<Pay> jpaPagingItemReader() { return new JpaPagingItemReaderBuilder<Pay>() .name("jpaPagingItemReader") .entityManagerFactory(entityManagerFactory) .pageSize(chunkSize) .queryString("SELECT p FROM Pay p WHERE amount >= 2000") .build(); } private ItemWriter<Pay> jpaPagingItemWriter() { return list -> { for (Pay pay: list) { log.info("Current Pay={}", pay); } }; } }
- JdbcPagingItemReader와 달리 EntityManagerFactory를 지정해준다.
실행
PagingItemReader 주의사항
정렬(Order) 가 무조건 포함되어 있어야 한다.
참고자료
https://jojoldu.tistory.com/336?category=902551
https://renuevo.github.io/spring/batch/spring-batch-chapter-3/
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/ItemReader.html
'🌱 spring > 🚛 spring batch' 카테고리의 다른 글
Spring Batch - ItemProcessor (0) 2022.12.16 Spring Batch - ItemWriter (0) 2022.12.16 Spring Batch - Chunk 지향 처리 (1) 2022.12.16 Spring Batch - Scope (0) 2022.12.16 Spring Batch - Job Flow (0) 2022.12.16