728x90
📌 들어가기 앞서

본 글은 향로 선생님의 4. Spring Batch 가이드 - Spring Batch Job Flow 의 내용을 바탕으로 작성되었습니다.
내용을 효과적으로 읽기 위해서는 위 글을 우선적으로 읽어보시는 것을 권장드립니다.


본 글은 아래의 내용을 포함하고 있습니다.
• Spring Batch Job Flow와 흐름 제어


원본 글에 대한 인용 및 요약은 음영 처리된 형태로 표시됩니다.
글의 작성 방향은 이 글을 참고해주시면 감사하겠습니다.

관련 시리즈

1. Spring Batch 5 가이드 - 배치 어플리케이션이란?
2. Spring Batch 5 가이드 - Batch Job 실행해보기
3. Spring Batch 5 가이드 - 메타테이블엿보기
4. Spring Batch 5 가이드 - Spring Batch Job Flow
5. Spring Batch 5 가이드 - Spring Batch Scope & Job Parameter
6. Spring Batch 5 가이드 - Chunk 지향 처리
7. Spring Batch 5 가이드 - ItemReader
8. Spring Batch 5 가이드 - ItemWriter
9. Spring Batch 5 가이드 - ItemProcessor 

 

앞서 Spring Batch의 Job을 구성하는데는 Step이 있다고 말씀드렸습니다.

Step은 실제 Batch 작업을 수행하는 역할을 합니다. 이전에 작성한 코드를 보시면 Job은 코드가 거의 없죠?
실제로 Batch 비지니스 로직을 처리하는 (ex: log.info()) 기능은 Step에 구현되어 있습니다.

이처럼 Step에서는 Batch로 실제 처리하고자 하는 기능과 설정을 모두 포함하는 장소라고 생각하시면 됩니다.

Batch 처리 내용을 담다보니, Job 내부의 Step들간에 순서 혹은 처리 흐름을 제어할 필요가 있는데요.
이번엔 여러 Step들을 어떻게 관리할지에 대해서 알아보겠습니다.

 

4.1. next

next()는 순차적으로 Step들 연결시킬때 사용됩니다.
step1 -> step2 -> stpe3 순으로 하나씩 실행시킬때 next()는 좋은 방법입니다.
@Configuration
class SimpleNextJobConfiguration(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager,
) {
    private val log = logger()

    @Bean
    fun stepNextJob(): Job {
        return JobBuilder("stepNextJob", jobRepository)
            .start(step1())
            .next(step2())
            .next(step3())
            .build()
    }

    @Bean
    fun step1(): Step {
        return StepBuilder("step1", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step1")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun step2(): Step {
        return StepBuilder("step2", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step2")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun step3(): Step {
        return StepBuilder("step3", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step3")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }
}

실행 결과

 

4.1.1. 지정한 Batch Job 만 실행

배치 코드 내에 2개 이상의 Job이 정의되어 있는 경우 아래와 같은 에러가 발생합니다.

Caused by: java.lang.IllegalArgumentException: Job name must be specified in case of multiple jobs

Job name must be specified in case of multiple jobs

 

이러한 경우 application.yml 파일 내에 아래와 같이 실행할 job name을 지정할 수 있게 하고,
Program argument 혹은 Environment vairables로 jobName을 지정해주시면 됩니다.

# application.yml

spring.batch.job.name: ${jobName:NONE}
Spring Boot 3.x.x 로 버전이 올라가면서 기존에 사용했던 spring.batch.job.names 프로퍼티는 spring.batch.job.name 으로 대체되었습니다.

관련 이슈: Remove support for running multiple Spring Batch jobs

 

 

4.2. 조건 별 흐름 제어

Next가 순차적으로 Step의 순서를 제어한다는 것을 알게 됐습니다.

여기서 중요한 것은, 앞의 Step에서 오류가 나면 나머지 뒤에 있는 Step 들은 실행되지 못한다는 것입니다.

하지만 상황에 따라 정상일때는 Step B로, 오류가 났을때는 Step C로 수행해야할때가 있습니다.

이 경우 FlowBuilder에서 제공하는 아래 메서드를 통해 Job의 흐름 및 순서를 제어할 수 있습니다.

  • .on()
    • 캐치할 ExitStatus 지정
    • * 일 경우 모든 ExitStatus가 지정
  • to()
    • 다음으로 이동할 Step 지정
  • from()
    • 일종의 이벤트 리스너 역할
    • 상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출
    • step1의 이벤트 캐치가 FAILED로 되있는 상태에서 추가로 이벤트 캐치하려면 from을 써야만 함
  • end()
    • end는 FlowBuilder를 반환하는 end와 FlowBuilder를 종료하는 end 2개가 있음
    • on("*")뒤에 있는 end는 FlowBuilder를 반환하는 end
    • build() 앞에 있는 end는 FlowBuilder를 종료하는 end
    • FlowBuilder를 반환하는 end 사용시 계속해서 from을 이어갈 수 있음

 

아래의 시나리오를 가정하여 예제를 작성해보겠습니다.

  • step1 실패 시나리오: step1 -> step3
  • step1 성공 시나리오: step1 -> step2 -> step3
@Configuration
class StepNextConditionalJobConfiguration(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager,
) {
    private val log = logger()

    @Bean
    fun stepNextConditionalJob(): Job {
        return JobBuilder("stepNextConditionalJob", jobRepository)
            .start(step1())
                .on("FAILED") // FAILED 일 경우
                .to(step3()) // step3으로 이동한다.
                .on("*") // step3의 결과 관계 없이
                .end() // step3으로 이동하면 Flow가 종료한다.
            .from(step1()) // step1로부터
                .on("*") // FAILED 외에 모든 경우
                .to(step2()) // step2로 이동한다.
                .next(step3()) // step2가 정상 종료되면 step3으로 이동한다.
                .on("*") // step3의 결과 관계 없이
                .end() // step3으로 이동하면 Flow가 종료한다.
            .end() // Job 종료
            .build()
    }

    @Bean
    fun step1(): Step {
        return StepBuilder("step1", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step1")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun step2(): Step {
        return StepBuilder("step2", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step2")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun step3(): Step {
        return StepBuilder("step3", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> This is Step3")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }
}

 

4.2.1. BatchStatus vs. ExitStatus

조건 별 흐름제어를 설정할 때 FlowBuilder에서 제공하는 on() 메서드는 BatchStatus가 아닌 ExitStatus를 기준으로 제어합니다.

@Configuration
class StepNextConditionalJobConfiguration(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager,
) {
    @Bean
    fun stepNextConditionalJob(): Job {
        return JobBuilder("stepNextConditionalJob", jobRepository)
            .start(step1())
                .on("FAILED") // ExitStExitStatus.FAILED 를 의미
                .to(step3())
            ...
    }
    ...
 }

 

BatchStatus는 Job 또는 Step 의 실행 결과를 Spring에서 기록할 때 사용하는 Enum입니다. BatchStatus로 사용 되는 값은 아래와 같습니다.

BatchStatus

 

반면 흐름 제어에서 참조하는 ExitStatus는 Step의 실행 후 상태를 얘기합니다. ExitStatus는 Enum class가 아닙니다.

ExitStatus

 

Spring Batch는 기본적으로 ExitStatus의 exitCode는 Step의 BatchStatus와 같도록 설정이 되어 있습니다.

 

만약 흐름 제어를 위해 별도의 커스텀 ExitStatus 가 필요한 경우 아래와 같이 별도의 exitCode를 반환할 수 있게 설정해주시면 됩니다.

class SkipCheckingListener : StepExecutionListener {
        override fun afterStep(stepExecution: StepExecution): ExitStatus? {
            val exitCode = stepExecution.exitStatus.exitCode
            return if (exitCode != ExitStatus.FAILED.exitCode &&
                stepExecution.skipCount > 0
            ) {
                ExitStatus("COMPLETED WITH SKIPS") // 커스텀 exitCode 반환
            } else {
                null
            }
        }
    }

 

4.3 decide

자 위에서 (4.2)에서 Step의 결과에 따라 서로 다른 Step으로 이동하는 방법을 알아보았습니다.

이번에는 다른 방식의 분기 처리를 알아 보겠습니다. 위에서 진행했던 방식은 2가지 문제가 있습니다.

1. Step이 담당하는 역할이 2개 이상 존재
    - 실제 해당 Step이 처리해야할 로직 외에도 분기처리를 시키기 위해 ExitStatus 조작이 필요합니다.

2. 다양한 분기 로직 처리의 어려움
    - ExitStatus를 커스텀하게 고치기 위해선 Listener를 생성하고 Job Flow에 등록하는 등 번거로움이 존재합니다.

명확하게 Step들간의 Flow 분기만 담당하면서 다양한 분기처리가 가능한 타입이 있으면 편하겠죠?

그래서 Spring Batch에서는 Step들의 Flow속에서 분기만 담당하는 타입이 있습니다. JobExecutionDecider 라고 하며, 이를 사용한 샘플 코드를 한번 만들어보겠습니다.

 

@Configuration
class DeciderJobConfiguration(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager,
) {
    private val log = logger()

    @Bean
    fun deciderJob(): Job {
        return JobBuilder("deciderJob", jobRepository)
            .start(deciderStartStep())
            .next(decider()) // 홀수 | 짝수 구분
            .from(decider()) // decider의 상태가
            .on("ODD") // ODD라면
            .to(oddStep()) // oddStep로 간다.
            .from(decider()) // decider의 상태가
            .on("EVEN") // ODD라면
            .to(evenStep()) // evenStep로 간다.
            .end() // builder 종료
            .build()
    }

    @Bean
    fun deciderStartStep(): Step {
        return StepBuilder("deciderStartStep", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> Start Step")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun evenStep(): Step {
        return StepBuilder("evenStep", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> 짝수입니다")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun oddStep(): Step {
        return StepBuilder("oddStep", jobRepository)
            .tasklet(
                { _, _ ->
                    log.info(">>>>> 홀수입니다")
                    RepeatStatus.FINISHED
                },
                transactionManager,
            )
            .build()
    }

    @Bean
    fun decider(): JobExecutionDecider {
        return OddDecider()
    }

    class OddDecider : JobExecutionDecider {
        private val log = logger()

        override fun decide(jobExecution: JobExecution, stepExecution: StepExecution?): FlowExecutionStatus {
            val pivot = Random().nextInt(50) + 1
            log.info("랜덤숫자: {}", pivot)

            return if (pivot % 2 == 0) {
                FlowExecutionStatus("EVEN")
            } else {
                FlowExecutionStatus("ODD")
            }
        }
    }
}

실행 결과

 

분기 로직에 대한 모든 일은 OddDecider가 전담하고 있습니다.
아무리 복잡한 분기로직이 필요하더라도 Step과는 명확히 역할과 책임이 분리된채 진행할 수 있습니다.

728x90

+ Recent posts