ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • SpringBatch - ItemReader
    🌱 spring/🚛 spring batch 2022. 12. 16. 15:34

    📌 ItemReader


    Step은 Tasklet 단위로 처리가 된다.

    Tasklet 중에서 ChunkOrientedTasklet 을 통해 Chunk를 처리한다.

    이를 구성하는 3요소로 ItemReader, ItemWriter, ItemProcessor 가 있다.

     

    ItemReader 란

    • Spring Batch의 Chunk Tasklet의 과정

     

    ItemReader는 데이터를 읽어들인다.

    • DB의 데이터만 이야기하는 것이 아니다
    • File, XML, JSON 등 다른 데이터 소스를 배치 처리의 입력으로 사용할 수 있다.
    • JMS(Java Message Service) 와 같은 다른 유형의 데이터 소스도 지원한다.

    또한 Spring Batch에서 지원하지 않는 Reader가 필요한 경우 직접 Reader를 만들어 사용할 수 있다.

    Spring Batch의 Reader에서 읽어올 수 있는 데이터 유형

    • 입력 데이터
    • 파일
    • DB
    • Java Message Service 등 다른 소스
    • 본인이 커스텀한 Reader로 읽어오기

     

    • JdbcPagingItemReader : 가장 대표적인 ItemReader의 구현체

    ItemReader 외에도 ItemStream 인터페이스도 함께 구현하고 있다.

     

    ItemReader

    public interface ItemReader<T> {
    	@Nullable
    	T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
    
    }
    • 데이터를 읽어오는 메소드만 있다. (read)

     

    ItemStream

    public interface ItemStream {
    
    	void open(ExecutionContext executionContext) throws ItemStreamException;
    
    	void update(ExecutionContext executionContext) throws ItemStreamException;
    
    	void close() throws ItemStreamException;
    }

    IteamStream 인터페이스의 역할

    • 주기적으로 상태를 저장하고 오류가 발생하면 해당 상태에서 복원하기 위한 마커 인터페이스이다.
    • 배치 프로세스의 실행 컨텍스트와 연계해 ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할

    open(), close()- 스트림을 열고 닫는다.

    update() - Batch 처리의 상태를 업데이트 한다.

     

    개발자는 ItemReader와 ItemStream 인터페이스를 직접 구현해 원하는 형태의 ItemReader를 만들 수 있다.

     

    Database Reader

    배치 작업은 많은 양의 데이터를 처리해야 한다.

    수백만개의 데이터를 조회하는 쿼리가 있는 경우 해당 데이터를 모두 한 번에 가져오는 것은 힘들다.

    Spring JdbcTemplate은 분할처리를 지원하지 않아 개발자가 직접 limit, offset 을 사용하는 등의 작업이 필요하다.

     

    Spring Batch는 이런 문제점을 해결하기 위해 2개의 Reader 타입을 지원

    • Cursor 기반 ItemReader 구현체
      • JdbcCursorItemReader
      • HibernateCursorItemReader
      • StoredProcedureItemReader
    • Paging 기반 ItemReader 구현체
      • JdbcPagingItemReader
      • HibernatePagingItemReader
      • JpaPagingItemReader

    Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 하나씩 옮겨 지속적으로 데이터를 가져옴

    Paging 방식은 한 번에 pageSize 만큼 데이터를 가져온다.

    CursorItemReader

    Paging과 다르게 Streaming 으로 데이터를 처리한다.

    JdbcCursorItemReader

    • Cursor 기반 JDBC Reader 구현체

    예시 코드

    @ToString
    @Getter
    @Setter
    @NoArgsConstructor
    @Entity
    public class Pay {
        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");
    
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
        private Long amount;
        private String txName;
        private LocalDateTime txDateTime;
    
        public Pay(Long amount, String txName, String txDateTime) {
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
        }
    
        public Pay(Long id, Long amount, String txName, String txDateTime) {
            this.id = id;
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
        }
    }
    
    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JdbcCursorItemReaderJobConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource; // DataSource DI
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jdbcCursorItemReaderJob() {
            return jobBuilderFactory.get("jdbcCursorItemReaderJob")
                    .start(jdbcCursorItemReaderStep())
                    .build();
        }
    
        @Bean
        public Step jdbcCursorItemReaderStep() {
            return stepBuilderFactory.get("jdbcCursorItemReaderStep")
                    .<Pay, Pay>chunk(chunkSize)
                    .reader(jdbcCursorItemReader())
                    .writer(jdbcCursorItemWriter())
                    .build();
        }
    
        @Bean
        public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
            return new JdbcCursorItemReaderBuilder<Pay>()
                    .fetchSize(chunkSize)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                    .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                    .name("jdbcCursorItemReader")
                    .build();
        }
    
        private ItemWriter<Pay> jdbcCursorItemWriter() {
            return list -> {
                for (Pay pay: list) {
                    log.info("Current Pay={}", pay);
                }
            };
        }
    }
    
    • chunk
      • <Pay, Pay>
        • 첫 번째 Pay : Reader에서 반환할 타입
        • 두 번째 Pay : Writer에 파라미터로 넘어올 타입
      • chunkSize - 트랜잭션 범위
    • fetchSize
      • DB에서 한번에 가져올 데이터 양
      • Paging과 다름
        • Paging은 실제 쿼리를 limit, offset을 이용해서 분할 처리
        • 반면, cursor는 분할 처리 없이 실행되나 내부적으로 가져오는 데이터는 fetchSize 만큼 가져와 read()를 통해 하나씩 가져온다.
    • dataSource
      • DB에 접근하기 위해 사용할 DataSource 객체 할당
    • rowMapper
      • 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
    • sql
      • Reader로 사용할 쿼리문을 사용
    • name
      • reader의 이름을 지정
      • Bean의 이름이 아니다
      • Spring Batch의 ExecutionContext에서 저장되어질 이름

    ItemReader의 가장 큰 장점 : 데이터를 Streaming 할 수 있다는 것

    • read() 메소드는 데이터를 하나씩 가져와 ItemWriter로 데이터를 전달하고 다음 데이터를 가져온다.
    • 이를 통해 reader - processor - writer 가 chunk 단위로 수행되고 주기적으로 commit 된다.
    • 이는 고성능 배치 처리에서 핵심이다

     

    테스트

    쿼리 실행

    create table pay (
      id         bigint not null auto_increment,
      amount     bigint,
      tx_name     varchar(255),
      tx_date_time datetime,
      primary key (id)
    ) engine = InnoDB;
    
    insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
    insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');
    

     

    application.yml

    logging.level.org.springframework.batch:DEBUG
    
    • Reader에서 쿼리가 어떻게 생성되고 실행되는지 확인하기 위해 log level 변경

     

    배치 실행

    writer에 명시한대로 데이터를 출력한다.

     

    CursorItemReader의 주의사항

    Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용된다

    • Batch가 끝나기전에 Database와 애플리케이션의 Connection이 먼저 끊어질 수 있다

    따라서 Database와 SocketTimeout을 충분히 큰 값으로 설정해야만 한다.

     

    Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는게 낫다.

    • Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.

     

    PagingItemReader

    Spring Batch에서는 offsetlimit을 pageSize에 맞게 자동으로 생성해준다.

    다만 각 쿼리는 개별적으로 실행한다

    • 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.
      • order By 권장됨.

    JdbcPagingItemReader

    JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader

    @Slf4j
    @RequiredArgsConstructor
    @Configuration
    public class JdbcPagingItemReaderJobConfiguration {
    
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final DataSource dataSource; // DataSource DI
    
        private static final int chunkSize = 10;
    
        @Bean
        public Job jdbcPagingItemReaderJob() throws Exception {
            return jobBuilderFactory.get("jdbcPagingItemReaderJob")
                    .start(jdbcPagingItemReaderStep())
                    .build();
        }
    
        @Bean
        public Step jdbcPagingItemReaderStep() throws Exception {
            return stepBuilderFactory.get("jdbcPagingItemReaderStep")
                    .<Pay, Pay>chunk(chunkSize)
                    .reader(jdbcPagingItemReader())
                    .writer(jdbcPagingItemWriter())
                    .build();
        }
    
        @Bean
        public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
            Map<String, Object> parameterValues = new HashMap<>();
            parameterValues.put("amount", 2000);
    
            return new JdbcPagingItemReaderBuilder<Pay>()
                    .pageSize(chunkSize)
                    .fetchSize(chunkSize)
                    .dataSource(dataSource)
                    .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                    .queryProvider(createQueryProvider())
                    .parameterValues(parameterValues)
                    .name("jdbcPagingItemReader")
                    .build();
        }
    
        private ItemWriter<Pay> jdbcPagingItemWriter() {
            return list -> {
                for (Pay pay: list) {
                    log.info("Current Pay={}", pay);
                }
            };
        }
    
        @Bean
        public PagingQueryProvider createQueryProvider() throws Exception {
            SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
            queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해 
            queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
            queryProvider.setFromClause("from pay");
            queryProvider.setWhereClause("where amount >= :amount");
    
            Map<String, Order> sortKeys = new HashMap<>(1);
            sortKeys.put("id", Order.ASCENDING);
    
            queryProvider.setSortKeys(sortKeys);
    
            return queryProvider.getObject();
        }
    }

    JdbcCursorItemReader와 다른 점

    • 쿼리 - createQueryProvider()

    JdbcCursorItemReader를 사용할 때는 단순히 String 타입으로 쿼리를 생성했다.

    하지만 PagingItemReader에서는 PagingQueryProvider 를 통해 쿼리를 생성한다.

    • why❓
    • ❗각 Database에는 paging을 지원하는 자체적인 전략들이 있다.
      • 때문에 Spring Batch에는 각 DB의 paging 전략에 맞춰 구현되어야 한다.
      • 그래서 Database에 맞는 Provider들이 존재한다.

     

    하지만 이러면 Database마다 Provider 코드를 바꿔야해 불편하다

    그래서 Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 Datasource 설정 값을 보고 provider중 하나를 자동으로 선택하도록 한다.

     

    배치 실행

    • 쿼리 로그에 LIMIT 10 이 들어갔다.
    • JdbcPagingItemReader에 선언된 pageSize 에 맞게 자동으로 쿼리에 추가해줬기 때문

     

    JpaPagingItemReader

    Spring Batch가 JPA를 지원하기 위해 JpaPagingItemReader를 지원한다.

     

    JPA 와 Hibernate는 많은 유사점이 있다.

    • 하지만 Hibernate는 Cursor가 지원되지만
    • JPA에는 Cursor 기반 Database 접근을 지원하지 않는다.
    @Slf4j 
    @RequiredArgsConstructor
    @Configuration
    public class JpaPagingItemReaderJobConfiguration {
        private final JobBuilderFactory jobBuilderFactory;
        private final StepBuilderFactory stepBuilderFactory;
        private final EntityManagerFactory entityManagerFactory;
    
        private int chunkSize = 10;
    
        @Bean
        public Job jpaPagingItemReaderJob() {
            return jobBuilderFactory.get("jpaPagingItemReaderJob")
                    .start(jpaPagingItemReaderStep())
                    .build();
        }
    
        @Bean
        public Step jpaPagingItemReaderStep() {
            return stepBuilderFactory.get("jpaPagingItemReaderStep")
                    .<Pay, Pay>chunk(chunkSize)
                    .reader(jpaPagingItemReader())
                    .writer(jpaPagingItemWriter())
                    .build();
        }
    
        @Bean
        public JpaPagingItemReader<Pay> jpaPagingItemReader() {
            return new JpaPagingItemReaderBuilder<Pay>()
                    .name("jpaPagingItemReader")
                    .entityManagerFactory(entityManagerFactory)
                    .pageSize(chunkSize)
                    .queryString("SELECT p FROM Pay p WHERE amount >= 2000")
                    .build();
        }
    
        private ItemWriter<Pay> jpaPagingItemWriter() {
            return list -> {
                for (Pay pay: list) {
                    log.info("Current Pay={}", pay);
                }
            };
        }
    }
    
    • JdbcPagingItemReader와 달리 EntityManagerFactory를 지정해준다.

    실행

     

    PagingItemReader 주의사항

    정렬(Order) 가 무조건 포함되어 있어야 한다.

    참고자료

    https://jojoldu.tistory.com/336?category=902551

    https://renuevo.github.io/spring/batch/spring-batch-chapter-3/

    https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/ItemReader.html

     

    '🌱 spring > 🚛 spring batch' 카테고리의 다른 글

    Spring Batch - ItemProcessor  (0) 2022.12.16
    Spring Batch - ItemWriter  (0) 2022.12.16
    Spring Batch - Chunk 지향 처리  (1) 2022.12.16
    Spring Batch - Scope  (0) 2022.12.16
    Spring Batch - Job Flow  (0) 2022.12.16

    댓글

Designed by Tistory.