🌱 spring/🚛 spring batch

SpringBatch - ItemReader

beomsic 2022. 12. 16. 15:34

📌 ItemReader


Step은 Tasklet 단위로 처리가 된다.

Tasklet 중에서 ChunkOrientedTasklet 을 통해 Chunk를 처리한다.

이를 구성하는 3요소로 ItemReader, ItemWriter, ItemProcessor 가 있다.

 

ItemReader 란

  • Spring Batch의 Chunk Tasklet의 과정

 

ItemReader는 데이터를 읽어들인다.

  • DB의 데이터만 이야기하는 것이 아니다
  • File, XML, JSON 등 다른 데이터 소스를 배치 처리의 입력으로 사용할 수 있다.
  • JMS(Java Message Service) 와 같은 다른 유형의 데이터 소스도 지원한다.

또한 Spring Batch에서 지원하지 않는 Reader가 필요한 경우 직접 Reader를 만들어 사용할 수 있다.

Spring Batch의 Reader에서 읽어올 수 있는 데이터 유형

  • 입력 데이터
  • 파일
  • DB
  • Java Message Service 등 다른 소스
  • 본인이 커스텀한 Reader로 읽어오기

 

  • JdbcPagingItemReader : 가장 대표적인 ItemReader의 구현체

ItemReader 외에도 ItemStream 인터페이스도 함께 구현하고 있다.

 

ItemReader

public interface ItemReader<T> {
	@Nullable
	T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}
  • 데이터를 읽어오는 메소드만 있다. (read)

 

ItemStream

public interface ItemStream {

	void open(ExecutionContext executionContext) throws ItemStreamException;

	void update(ExecutionContext executionContext) throws ItemStreamException;

	void close() throws ItemStreamException;
}

IteamStream 인터페이스의 역할

  • 주기적으로 상태를 저장하고 오류가 발생하면 해당 상태에서 복원하기 위한 마커 인터페이스이다.
  • 배치 프로세스의 실행 컨텍스트와 연계해 ItemReader의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할

open(), close()- 스트림을 열고 닫는다.

update() - Batch 처리의 상태를 업데이트 한다.

 

개발자는 ItemReader와 ItemStream 인터페이스를 직접 구현해 원하는 형태의 ItemReader를 만들 수 있다.

 

Database Reader

배치 작업은 많은 양의 데이터를 처리해야 한다.

수백만개의 데이터를 조회하는 쿼리가 있는 경우 해당 데이터를 모두 한 번에 가져오는 것은 힘들다.

Spring JdbcTemplate은 분할처리를 지원하지 않아 개발자가 직접 limit, offset 을 사용하는 등의 작업이 필요하다.

 

Spring Batch는 이런 문제점을 해결하기 위해 2개의 Reader 타입을 지원

  • Cursor 기반 ItemReader 구현체
    • JdbcCursorItemReader
    • HibernateCursorItemReader
    • StoredProcedureItemReader
  • Paging 기반 ItemReader 구현체
    • JdbcPagingItemReader
    • HibernatePagingItemReader
    • JpaPagingItemReader

Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 하나씩 옮겨 지속적으로 데이터를 가져옴

Paging 방식은 한 번에 pageSize 만큼 데이터를 가져온다.

CursorItemReader

Paging과 다르게 Streaming 으로 데이터를 처리한다.

JdbcCursorItemReader

  • Cursor 기반 JDBC Reader 구현체

예시 코드

@ToString
@Getter
@Setter
@NoArgsConstructor
@Entity
public class Pay {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    public Pay(Long amount, String txName, String txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }

    public Pay(Long id, Long amount, String txName, String txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }
}

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcCursorItemReaderJob() {
        return jobBuilderFactory.get("jdbcCursorItemReaderJob")
                .start(jdbcCursorItemReaderStep())
                .build();
    }

    @Bean
    public Step jdbcCursorItemReaderStep() {
        return stepBuilderFactory.get("jdbcCursorItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcCursorItemReader())
                .writer(jdbcCursorItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcCursorItemReader")
                .build();
    }

    private ItemWriter<Pay> jdbcCursorItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }
}
  • chunk
    • <Pay, Pay>
      • 첫 번째 Pay : Reader에서 반환할 타입
      • 두 번째 Pay : Writer에 파라미터로 넘어올 타입
    • chunkSize - 트랜잭션 범위
  • fetchSize
    • DB에서 한번에 가져올 데이터 양
    • Paging과 다름
      • Paging은 실제 쿼리를 limit, offset을 이용해서 분할 처리
      • 반면, cursor는 분할 처리 없이 실행되나 내부적으로 가져오는 데이터는 fetchSize 만큼 가져와 read()를 통해 하나씩 가져온다.
  • dataSource
    • DB에 접근하기 위해 사용할 DataSource 객체 할당
  • rowMapper
    • 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
  • sql
    • Reader로 사용할 쿼리문을 사용
  • name
    • reader의 이름을 지정
    • Bean의 이름이 아니다
    • Spring Batch의 ExecutionContext에서 저장되어질 이름

ItemReader의 가장 큰 장점 : 데이터를 Streaming 할 수 있다는 것

  • read() 메소드는 데이터를 하나씩 가져와 ItemWriter로 데이터를 전달하고 다음 데이터를 가져온다.
  • 이를 통해 reader - processor - writer 가 chunk 단위로 수행되고 주기적으로 commit 된다.
  • 이는 고성능 배치 처리에서 핵심이다

 

테스트

쿼리 실행

create table pay (
  id         bigint not null auto_increment,
  amount     bigint,
  tx_name     varchar(255),
  tx_date_time datetime,
  primary key (id)
) engine = InnoDB;

insert into pay (amount, tx_name, tx_date_time) VALUES (1000, 'trade1', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (2000, 'trade2', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (3000, 'trade3', '2018-09-10 00:00:00');
insert into pay (amount, tx_name, tx_date_time) VALUES (4000, 'trade4', '2018-09-10 00:00:00');

 

application.yml

logging.level.org.springframework.batch:DEBUG
  • Reader에서 쿼리가 어떻게 생성되고 실행되는지 확인하기 위해 log level 변경

 

배치 실행

writer에 명시한대로 데이터를 출력한다.

 

CursorItemReader의 주의사항

Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용된다

  • Batch가 끝나기전에 Database와 애플리케이션의 Connection이 먼저 끊어질 수 있다

따라서 Database와 SocketTimeout을 충분히 큰 값으로 설정해야만 한다.

 

Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는게 낫다.

  • Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.

 

PagingItemReader

Spring Batch에서는 offsetlimit을 pageSize에 맞게 자동으로 생성해준다.

다만 각 쿼리는 개별적으로 실행한다

  • 각 페이지마다 새로운 쿼리를 실행하므로 페이징시 결과를 정렬하는 것이 중요하다.
    • order By 권장됨.

JdbcPagingItemReader

JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader

@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource; // DataSource DI

    private static final int chunkSize = 10;

    @Bean
    public Job jdbcPagingItemReaderJob() throws Exception {
        return jobBuilderFactory.get("jdbcPagingItemReaderJob")
                .start(jdbcPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jdbcPagingItemReaderStep() throws Exception {
        return stepBuilderFactory.get("jdbcPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jdbcPagingItemReader())
                .writer(jdbcPagingItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", 2000);

        return new JdbcPagingItemReaderBuilder<Pay>()
                .pageSize(chunkSize)
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameterValues)
                .name("jdbcPagingItemReader")
                .build();
    }

    private ItemWriter<Pay> jdbcPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해 
        queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
        queryProvider.setFromClause("from pay");
        queryProvider.setWhereClause("where amount >= :amount");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }
}

JdbcCursorItemReader와 다른 점

  • 쿼리 - createQueryProvider()

JdbcCursorItemReader를 사용할 때는 단순히 String 타입으로 쿼리를 생성했다.

하지만 PagingItemReader에서는 PagingQueryProvider 를 통해 쿼리를 생성한다.

  • why❓
  • ❗각 Database에는 paging을 지원하는 자체적인 전략들이 있다.
    • 때문에 Spring Batch에는 각 DB의 paging 전략에 맞춰 구현되어야 한다.
    • 그래서 Database에 맞는 Provider들이 존재한다.

 

하지만 이러면 Database마다 Provider 코드를 바꿔야해 불편하다

그래서 Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 Datasource 설정 값을 보고 provider중 하나를 자동으로 선택하도록 한다.

 

배치 실행

  • 쿼리 로그에 LIMIT 10 이 들어갔다.
  • JdbcPagingItemReader에 선언된 pageSize 에 맞게 자동으로 쿼리에 추가해줬기 때문

 

JpaPagingItemReader

Spring Batch가 JPA를 지원하기 위해 JpaPagingItemReader를 지원한다.

 

JPA 와 Hibernate는 많은 유사점이 있다.

  • 하지만 Hibernate는 Cursor가 지원되지만
  • JPA에는 Cursor 기반 Database 접근을 지원하지 않는다.
@Slf4j 
@RequiredArgsConstructor
@Configuration
public class JpaPagingItemReaderJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private int chunkSize = 10;

    @Bean
    public Job jpaPagingItemReaderJob() {
        return jobBuilderFactory.get("jpaPagingItemReaderJob")
                .start(jpaPagingItemReaderStep())
                .build();
    }

    @Bean
    public Step jpaPagingItemReaderStep() {
        return stepBuilderFactory.get("jpaPagingItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jpaPagingItemReader())
                .writer(jpaPagingItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Pay> jpaPagingItemReader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Pay p WHERE amount >= 2000")
                .build();
    }

    private ItemWriter<Pay> jpaPagingItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }
}
  • JdbcPagingItemReader와 달리 EntityManagerFactory를 지정해준다.

실행

 

PagingItemReader 주의사항

정렬(Order) 가 무조건 포함되어 있어야 한다.

참고자료

https://jojoldu.tistory.com/336?category=902551

https://renuevo.github.io/spring/batch/spring-batch-chapter-3/

https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/ItemReader.html