ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch - Chunk 지향 처리
    🌱 spring/🚛 spring batch 2022. 12. 16. 01:35

    📌 Chunk 지향 처리


    Chunk 지향 처리는 Spring Batch의 큰 장점 중 하나이다.

    ❔ Chunk

    Spring Batch에서 Chunk 란 데이터 덩어리로 작업할 때 각 커밋사이에 처리되는 row의 수이다.

     

    즉, Chunk 지향 처리

    • 한 번에 하나씩 데이터를 읽어 Chunk 라는 덩어리를 만든 뒤
    • Chunk 단위로 트랜잭션을 다루는 것이다.

    Chunk 단위로 트랜잭션을 수행하기 때문에

    • 실패할 경우 해당 Chunk 만큼만 롤백
    • 이전에 커밋된 트랜잭션 범위까지는 반영이 된다.

    • Reader에서 데이터를 하나 읽어온다
    • 읽어온 데이터를 Processor에서 가공한다.
    • 가공된 데이터들을 별도의 공간에 모은뒤, Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 저장한다.

    Reader, Processor 에서는 1건씩 다뤄지고, Writer에선 Chunk 단위로 처리가 된다.

     

    간단한 자바코드로 표현

    for(int i = 0; i< totalSize; i += chunkSize) { // chunkSize 단위로 묶어서 처리
        List items = new Arraylist();
        for(int j = 0; j < chunkSize; j++){
            Object item = itemReader.read()
            Object processedItem = itemProcessor.process(item);
            items.add(processedItem);
        }
        itemWriter.write(items);
    }

     

    ChunkOrientedTasklet

    Spring Batch에서 Chunk 지향 처리의 전체 로직을 다루는 것은 ChunkOrientedTasklet 클래스이다.

     

    ChunkOrientedTasklet의 execute()

    @Nullable
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    
      @SuppressWarnings("unchecked")
      Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
      if (inputs == null) {
      	inputs = chunkProvider.provide(contribution); // Reader에서 데이터 가져옴
    	if (buffering) {
    		chunkContext.setAttribute(INPUTS_KEY, inputs);
    	}
      }
    
      chunkProcessor.process(contribution, inputs); // Processor & Writer 처리
      chunkProvider.postProcess(contribution, inputs);
    
      // Allow a message coming back from the processor to say that we
      // are not done yet
      if (inputs.isBusy()) {
    	logger.debug("Inputs still busy");
    	return RepeatStatus.CONTINUABLE;
      }
    
      chunkContext.removeAttribute(INPUTS_KEY);
      chunkContext.setComplete();
    
      if (logger.isDebugEnabled()) {
    	logger.debug("Inputs not busy, ended: " + inputs.isEnd());
      }
      return RepeatStatus.continueIf(!inputs.isEnd());
    }
    • chunkProvider.provide() : Reader에서 Chunk size 만큼 데이터를 가져온다.
    • chunkProcessor.process() : Reader에서 받은 데이터를 가공(process)하고 저장(write)한다.

     

    chunkProvider.provide()

    • Chunk가 ChunkSize 만큼 쌓일 때 까지 read()를 호출한다.
    • read()는 ItemReader.read 를 호출한다.
    • ItemReader.read 에서 1건씩 데이터를 조회하여 Chunk size 만큼 데이터를 쌓는다.

     

    SimpleChunkProcessor

    • ChunkSize 만큼 쌓은 데이터를 어떻게 가공하고 저장하는가❓

    Processor와 Writer 로직을 담고 있는 것은 ChunkProcessor 가 담당하고 있다.

    • 인터페이스

     

    SimpleChunkProcessor가 기본적으로 사용하는 실제 구현체이다.

     

    SimpleChunkProcessor의 process() - 처리를 담당하는 핵심 로직

    @Override
    public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
    
      // Allow temporary state to be stored in the user data field
      initializeUserData(inputs);
    
      // If there is no input we don't have to do anything more
      if (isComplete(inputs)) {
    	return;
      }
    
      // Make the transformation, calling remove() on the inputs iterator if
      // any items are filtered. Might throw exception and cause rollback.
      Chunk<O> outputs = transform(contribution, inputs);
    
      // Adjust the filter count based on available data
      contribution.incrementFilterCount(getFilterCount(inputs, outputs));
    
      // Adjust the outputs if necessary for housekeeping purposes, and then
      // write them out...
      write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
    
    }
    • Chunk<I> inputs 를 파라미터로 받는다.
      • chunkProvider.provide()에서 받은 ChunkSize만큼 쌓인 item
    • transform()에서는 전달받은 inputs을 doProcess()로 전달하고 변환값을 받는다.
    • transform()에서 가공된 대량의 데이터는 writer()를 통해 일괄 저장된다.
      • writer()는 저장이 될수도 있고 외부 API로 전송할 수도 있다.
      • 이는 개발자가 ItemWriter를 어떻게 구현했는지에 따라 다르다.

     

    transform()

    protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
      Chunk<O> outputs = new Chunk<>();
      for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
      	final I item = iterator.next();
    	O output;
    	Timer.Sample sample = BatchMetrics.createTimerSample();
    	String status = BatchMetrics.STATUS_SUCCESS;
    	try {
    		output = doProcess(item);
    	}
    	catch (Exception e) {
    		/*
    		 * For a simple chunk processor (no fault tolerance) we are done
    		 * here, so prevent any more processing of these inputs.
    		 */
    		inputs.clear();
    		status = BatchMetrics.STATUS_FAILURE;
    		throw e;
    	}
    	finally {
    		stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
    	}
    	if (output != null) {
    		outputs.add(output);
    	}
    	else {
    		iterator.remove();
    	}
      }
      return outputs;
    }
    • 반복문을 통해 doProcess()를 호출한다.
      • doProcess()는 ItemProcessor의 process()를 사용한다.

     

    protected final O doProcess(I item) throws Exception {
    
      if (itemProcessor == null) {
    	@SuppressWarnings("unchecked")
        O result = (O) item;
    	return result;
      }
    
      try {
    	listener.beforeProcess(item);
    	O result = itemProcessor.process(item);
    	listener.afterProcess(item, result);
    	return result;
      }
      catch (Exception e) {
    	listener.onProcessError(item, e);
    	throw e;
      }
    }
    • doProcess()를 처리하는데 만약 ItemProcessor 가 없다면 item을 그대로 반환한다.
    • 있다면 process()로 가공하여 반환한다.

     

    가공된 데이터들은 SimpleChunkProcessor의 doWriter()를 호출하여 일괄 처리가 된다.

    ⚠️ Page Size vs Chunk Size

    Chunk Size : 한번에 처리될 트랜잭션 단위

    Page Size : 한번에 조회할 Item의 양

     

    Page Size

    • Page 단위로 끊어서 조회하는 것
    • 페이징 쿼리에서 Page의 Size를 지정하기 위함

    Page Size가 10, Chunk Size가 50 인 경우 ItemReader에서 Page 조회가 5번 일어나면❓

    • 1 번의 트랜잭션이 발생하여 Chunk가 처리가 된다.
    • 한 번의 트랜잭션 처리를 위해 5번의 쿼리 조회가 발생 → 성능상 이슈가 발생할 수 있다.

     

    Page size 와 Chunk Size의 값을 일치시키는 것이 보편적으로 좋은 방법이다.

    참고자료

    https://jojoldu.tistory.com/331?category=902551

     

    '🌱 spring > 🚛 spring batch' 카테고리의 다른 글

    Spring Batch - ItemWriter  (0) 2022.12.16
    SpringBatch - ItemReader  (0) 2022.12.16
    Spring Batch - Scope  (0) 2022.12.16
    Spring Batch - Job Flow  (0) 2022.12.16
    메타데이터 테이블  (0) 2022.12.16

    댓글

Designed by Tistory.