🌱 spring/🚛 spring batch

Spring Batch - Job Flow

beomsic 2022. 12. 16. 01:03

📌 Spring Batch Job Flow


실전에서 사용할 수 있는 Spring Batch 내용

Step

  • 실제 Batch 작업을 수행하는 역할
  • Batch로 실제 처리하고자 하는 기능과 설정을 모두 포함하는 장소

Job 내부의 Step들 간의 순서 혹은 처리 흐름을 어떻게 제어(Step 들을 어떻게 관리 ?)

 

Next

StepNextJobConfiguration.java 샘플코드

@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextJobConfiguration {

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job stepNextJob() {
    return jobBuilderFactory.get("stepNextJob")
        .start(step1())
        .next(step2())
        .next(step3())
        .build();
  }

  @Bean
  public Step step1() {
    return stepBuilderFactory.get("step1")
        .tasklet((contribution, chunkContext) -> {
          log.info("====== This is Step1 ======");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step step2() {
    return stepBuilderFactory.get("step2")
        .tasklet((contribution, chunkContext) -> {
          log.info("====== This is Step2 ======");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step step3() {
    return stepBuilderFactory.get("step3")
        .tasklet((contribution, chunkContext) -> {
          log.info("====== This is Step3 ======");
          return RepeatStatus.FINISHED;
        })
        .build();
  }
}
  • next() 는 순차적으로 Step들을 연결시킬 때 사용한다.
  • step1 > step2 > step3 순으로 하나씩 실행시킬 때 사용하기에 좋다.

 

조건별 흐름 제어(Flow)

Next가 순차적으로 Step의 순서를 제어한다.

  • ⚠️ 앞의 step에서 오류가 발생할 경우 나머지 뒤에 있는 step들은 실행되지 못한다!!

하지만 상황에 따라 정상일때는 Step B로, 오류가 났을 때는 Step C로 수행해야 할 때가 있다.

 

Spring Batch Job에서는 조건별로 Step을 사용할 수 있다.

 

StepNextConditionalJobConfiguration.java

@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextConditionalJobConfiguration {

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job stepNextConditionalJob() {
    return jobBuilderFactory.get("stepNextConditionalJob")
        .start(conditionalJobStep1())
        .on("FAILED") // FAILED 일 경우
        .to(conditionalJobStep3()) // step3으로 이동한다.
        .on("*") // step3의 결과 관계 없이
        .end() // step3으로 이동하면 Flow가 종료한다.
        .from(conditionalJobStep1()) // step1로부터
        .on("*") // FAILED 외에 모든 경우
        .to(conditionalJobStep2()) // step2로 이동한다.
        .next(conditionalJobStep3()) // step2가 정상 종료되면 step3으로 이동한다.
        .on("*") // step3의 결과 관계 없이
        .end() // step3으로 이동하면 Flow가 종료한다.
        .end() // Job 종료
        .build();
  }

  @Bean
  public Step conditionalJobStep1() {
    return stepBuilderFactory.get("step1")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== StepNextConditionalJob Step1 ===");

          /**
           ExitStatus를 FAILED로 지정한다.
           해당 status를 보고 flow가 진행된다.
           **/
          contribution.setExitStatus(ExitStatus.FAILED);

          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step conditionalJobStep2() {
    return stepBuilderFactory.get("conditionalJobStep2")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== StepNextConditionalJob Step2 ===");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step conditionalJobStep3() {
    return stepBuilderFactory.get("conditionalJobStep3")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== StepNextConditionalJob Step3 ===");
          return RepeatStatus.FINISHED;
        })
        .build();
  }
}

위 코드는 Step1 의 성공 / 실패 여부에 따라 시나리오가 달라진다.

  • step1 성공 : step1 → step2 → step3
  • step1 실패 : step1 → step3

 

  @Bean
  public Job stepNextConditionalJob() {
    return jobBuilderFactory.get("stepNextConditionalJob")
        .start(conditionalJobStep1())
        .on("FAILED") // FAILED 일 경우
        .to(conditionalJobStep3()) // step3으로 이동한다.
        .on("*") // step3의 결과 관계 없이
        .end() // step3으로 이동하면 Flow가 종료한다.
        .from(conditionalJobStep1()) // step1로부터
        .on("*") // FAILED 외에 모든 경우
        .to(conditionalJobStep2()) // step2로 이동한다.
        .next(conditionalJobStep3()) // step2가 정상 종료되면 step3으로 이동한다.
        .on("*") // step3의 결과 관계 없이
        .end() // step3으로 이동하면 Flow가 종료한다.
        .end() // Job 종료
        .build();
  }

.on()

  • 캐치할 ExitStatus 지정
  • * 일 경우는 모든 ExitStatus가 지정된다.

.to()

  • 다음으로 이동할 Step 지정

.from()

  • 일종의 이벤트 리스너 역할
  • 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출
  • step1 의 이벤트 캐치가 FAILED 로 되어 있는 상태라면 추가로 이벤트 캐치를 하기 위해서는 from 을 사용해야 한다.

.end()

  • 두 가지 종류
    • FlowBuilder를 반환하는 end
    • FlowBuilder를 종료하는 end
  • on(”*”) 뒤에 있는 end : FlowBuilder를 반환
  • build() 앞에 있는 end : FlowBuilder를 종료

❗ on 이 캐치하는 상태 값이 BatchStatus가 아닌 ExitStatus 이다!!

 

Decide

위 방법과 다른 분기 처리 방식

 

위에서 진행 했던 방식의 2가지 문제점

  1. Step이 담당하는 역할이 2개 이상이다.
    • Step이 실제로 처리해야할 로직외에도 분기처리를 위해 ExitStatus 조작이 필요하다.
  2. 다양한 분기 로직 처리가 어렵다.
    • ExitStatus를 커스텀하게 고치기 위해 Listener를 생성하고
    • 이를 Job Flow에 등록하는 등 번거롭다.

Spring Batch에는 Step들의 Flow 속에서 분기만 담당하는 타입이 있다.

JobExecutionDecider

 

DeciderJobConfiguration.java

@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeciderJobConfiguration {
  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job deciderJob() {
    return jobBuilderFactory.get("deciderJob")
        .start(startStep())
        .next(decider()) // 홀수 | 짝수 구분
        .from(decider()) // decider의 상태가
        .on("ODD") // ODD라면
        .to(oddStep()) // oddStep로 간다.
        .from(decider()) // decider의 상태가
        .on("EVEN") // ODD라면
        .to(evenStep()) // evenStep로 간다.
        .end() // builder 종료
        .build();
  }

  @Bean
  public Step startStep() {
    return stepBuilderFactory.get("startStep")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== Start =====");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step evenStep() {
    return stepBuilderFactory.get("evenStep")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== 짝수 ====");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public Step oddStep() {
    return stepBuilderFactory.get("oddStep")
        .tasklet((contribution, chunkContext) -> {
          log.info("==== 홀수 ====");
          return RepeatStatus.FINISHED;
        })
        .build();
  }

  @Bean
  public JobExecutionDecider decider() {
    return new OddDecider();
  }

  public static class OddDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
      Random rand = new Random();

      int randomNumber = rand.nextInt(50) + 1;
      log.info("===== 랜덤숫자: {}", randomNumber);

      if(randomNumber % 2 == 0) {
        return new FlowExecutionStatus("EVEN");
      } else {
        return new FlowExecutionStatus("ODD");
      }
    }
  }
}

Flow

  • startJob → oddDecider 에서 홀수 짝수 구분 → oddStep or evenStep 진행

 

@Bean
public Job deciderJob() {
    return jobBuilderFactory.get("deciderJob")
        .start(startStep())
        .next(decider()) // 홀수 | 짝수 구분
        .from(decider()) // decider의 상태가
        .on("ODD") // ODD라면
        .to(oddStep()) // oddStep로 간다.
        .from(decider()) // decider의 상태가
        .on("EVEN") // ODD라면
        .to(evenStep()) // evenStep로 간다.
        .end() // builder 종료
        .build();
}

start()

  • Job Flow 의 첫 번째 Step 시작

next()

  • startStep 이후 decider를 실행

from()

  • from은 이벤트 리스너 역할
  • decider의 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출

 

public static class OddDecider implements JobExecutionDecider {

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
      Random rand = new Random();

      int randomNumber = rand.nextInt(50) + 1;
      log.info("===== 랜덤숫자: {}", randomNumber);

      if(randomNumber % 2 == 0) {
        return new FlowExecutionStatus("EVEN");
      } else {
        return new FlowExecutionStatus("ODD");
      }
    }
  }
  • JobExecutionDecider 인터페이스를 구현
  • 서로 다른 상태를 반환
    • Step으로 처리하는것이 아니기 때문에 Exitstatus가 아닌 FlowExecutionStatus 로 상태를 관리한다.

 

⚡ Batch Status 와 Exit Status

Batch Status 와 Exit Status 의 차이점을 아는 것은 매우 중요하다.

Batch Status

  • Job 또는 Step의 실행 결과를 Spring에 기록할 때 사용하는 Enum 이다.
    • COMPLETED
    • STARTING
    • STARTED
    • STOPPING
    • STOPPED
    • FAILED
    • ABANDONED
    • UNKNOWN
.on("FAILED").to(stepB())

위 코드에서 on 메소드가 참조하는 것은 BatchStatus가 아닌 Step의 Exit Status이다.

 

Exit Status

  • Step의 실행 후 상태
  • Exit Status 는 Enum이 아니다.

Spring Batch는 기본적으로 ExitStatus의 exitCode는 Step의 BatchStatus와 같도록 설정되어 있다.

 

커스텀한 exitCode를 사용하고자 하면 어떻게 ❓

.start(step1())
    .on("FAILED")
    .end()
.from(step1())
    .on("COMPLETED WITH SKIPS")
    .to(errorPrint1())
    .end()
.from(step1())
    .on("*")
    .to(step2())
    .end()

COMPLETED WITH SKIPS 는 ExitStatus에 없는 코드이다.

  • 이를 원하는대로 처리하기 위해서는 COMPLETED WITH SKIPS exitStatus 를 반환하는 별도의 로직이 필요

 

public class SkipCheckingListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) && 
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}
  • Stpe이 성공적으로 수행되었는 지 확인
  • 원하는 조건을 만족할 경우 새로운 ExitStatus 반환

 

⚡ 지정한 Batch Job만 실행되도록 하기

applicatin.yml에 코드 추가

spring.batch.job.names: ${job.name:NONE}
  • Spring Batch 가 실행될 때, Program argument로 job.name 값이 넘어오면 해당 값과 일치하는 Job만 실행하겠다는 의미

 

  • job.name:NONE
    • job.name이 있다면 job.name값을 할당
    • 없다면 NONE을 할당
    • spring.batch.job.names 에 NONE이 할당되면 어떤 배치도 실행하지 않는다는 의미
    • 값이 없을 때 모든 배치가 실행되지 않도록 막는 역할

 

실제 운영 환경

  • java -jar batch-application.jar --job.name=simpleJob 같이 배치를 실행

 

참고자료

https://jojoldu.tistory.com/328?category=902551

https://khj93.tistory.com/entry/Spring-Batch란-이해하고-사용하기

https://velog.io/@ehdrms2034/스프링-배치-Job-설정과-실행