Batch란?
배치(Batch)는 일괄처리 라는 뜻을 갖고 있습니다.
Batch Application의 조건
•
대용량 데이터 : 배치 어플리케이션은 대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 있어야함
•
자동화 : 배치 어플리케이션은 심각한 문제 해결을 제외하고는 사용자 개입 없이 실행되어야함
•
견고성 : 배치 어플리케이션은 잘못된 데이터를 충돌/중단 없이 처리할 수 있어야함
•
신뢰성 : 배치 어플리케이션은 무엇이 잘못되었는지를 추적할 수 있어야함 (로깅, 알람)
•
성능 : 배치 어플리케이션은 지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 어플리케이션을 방해하지 않도록 수행되어야함
Spring Batch의 원칙 및 가이드
•
Batch와 Service에 영향을 최소화 할 수 있도록 구조와 환경에 맞게 디자인한다.
•
Batch Application 내에서 가능한한 복잡한 로직은 피하고 단순하게 설계한다.
•
데이터 처리하는 곳과 데이터의 저장소는 물리적으로 가능한한 가까운 곳에 위치한다.
•
I/O 등의 시스템 리소스의 사용을 최소화하며 최대한 많은 데이터를 메모리 위에서 처리한다.
•
처리 시간이 많이 걸리는 작업을 시작하기 전 메모리 재할당에 소모되는 시간을 피하기 위해 충분한 메모리를 할당한다.
•
데이터 무결성을 위해 적절한 검사 및 기록하는 코드를 추가한다.
Batch vs Quartz
Batch는 대용량 데이터 배치 처리에 대한 목적이고 Quartz는 스케줄링에 대한 목적입니다.
각각의 목적에 대한 기능만 지원하기 때문에 보통은 Batch + Quartz 를 조합해서 사용합니다.
Spring Batch는 정해진 스케줄마다 Quartz가 Batch를 실행하는 구조입니다.
Spring Batch Architecture
스프링 배치의 아키텍처의 큰 그림을 먼저 살펴보면 아래와 같습니다.
•
Application Layer
◦
개발자가 작성한 모든 배치 작업과 사용자 정의 코드가 포함되어 있습니다.
•
Core Layer
◦
배치 작업을 시작하고 제어하는데 필요한 핵심 클래스들이 포함되어 있습니다.
◦
JobLauncher, Job, Step 등
•
Infrastructure Layer
◦
외부와 상호작용을 하기 위한 클래스들이 포함되어 있습니다.
◦
ItemReader, ItemWriter, RetryTemplate
위 Layer 중 Core와 Infrastructure를 좀 더 자세히 살펴보면 아래와 같습니다.
Spring Batch는 스케줄링된 시간에 Bean이 생성되면 JobLauncher 객체에 의해서 정의된 Job을 수행합니다.
JobRepository
JobRepository는 DB 또는 Memory 등 Storage에 배치의 메타데이터를 관리하는 클래스로 Spring Batch의 전반적인 데이터를 관리하는 클래스입니다.
public class SimpleJobRepository implements JobRepository {
private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);
private JobInstanceDao jobInstanceDao;
private JobExecutionDao jobExecutionDao;
private StepExecutionDao stepExecutionDao;
private ExecutionContextDao ecDao;
...
Java
복사
JobRepository는 인터페이스로 SimpleJobRepository 구현체가 존재합니다.
JobInstance, JobExecution, StepExecution 등 Batch와 관련된 도메인의 DAO가 존재하며 CRUD를 처리하는 역할을 합니다.
아래 사진은 Spring Batch에서 자동으로 구성해주는 테이블의 ERD입니다.
•
BATCH_JOB_INSTANCE
◦
JobInstance 클래스에 해당
◦
job_name과 job_key를 기준으로 하나의 row가 생성
◦
job_key는 JobParameters를 나열해 암호화하여 저장
•
BATCH_JOB_EXECUTION
◦
JobExecution 클래스에 해당
◦
Job이 실행되는 동안 시작/종료 시간, Job 상태 등을 저장
•
BATCH_JOB_EXECUTION_PARAMS
◦
JobParameters 클래스에 해당
◦
Job을 실행하기 위해 주입된 parameter 정보를 저장
•
BATCH_JOB_EXECUTION_CONTEXT
◦
ExecutionContext 클래스에 해당
◦
Job이 실행되며 공유해야할 데이터를 직렬화하여 저장
•
BATCH_STEP_EXECUTION
◦
StepExecution 클래스에 해당
◦
Step이 실행되는 동안 시작/종료 시간, Step 상태, 처리 횟수 등을 저장
•
BATCH_STEP_EXECUTION_CONTEXT
◦
ExecutionContext 클래스에 해당
◦
Step이 실행되며 공유해야할 데이터를 직렬화하여 저장
Spring Batch는 기본적으로 낙관적 락 전략을 사용하기 때문에 모든 테이블에는 version 컬럼이 존재합니다.
그러므로 같은 데이터에 동시에 엑세스가 된다면 OptimisticLockingFailureException이 발생합니다.
JobLauncher
JobLauncher는 Spring Batch Job을 실행시키는 역할로 BatchAutoConfiguration을 통해 생성되는 JobLauncherApplicationRunner 에 의해 Job과 JobParameters를 전달받아 실행됩니다.
//JobLauncher
@FunctionalInterface
public interface JobLauncher {
JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
//SimpleJobLauncher
@Deprecated(since = "5.0.0", forRemoval = true)
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
private JobRepository jobRepository;
private TaskExecutor taskExecutor;
private MeterRegistry meterRegistry = Metrics.globalRegistry;
private Counter jobLaunchCount; // NoopCounter is still incubating
...
Java
복사
JobLauncher는 TaskExecutor 인터페이스를 갖고 있어서 어떤 구현체냐에 따라 동기, 비동기로 Job 실행이 가능합니다.
//SimpleJobLauncher의 run() 메서드
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
Assert.notNull(job, "The Job must not be null.");
Assert.notNull(jobParameters, "The JobParameters must not be null.");
if (this.jobLaunchCount != null) {
this.jobLaunchCount.increment();
}
final JobExecution jobExecution;
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (lastExecution != null) {
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
/*
* validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED
* and STOPPING retrieve the previous execution and check
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning()) {
throw new JobExecutionAlreadyRunningException(
"A job execution for this job is already running: " + lastExecution);
}
else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}
job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
}
job.execute(jobExecution);
if (logger.isInfoEnabled()) {
Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(),
jobExecution.getEndTime());
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+ "] and the following status: [" + jobExecution.getStatus() + "]"
+ (jobExecutionDuration == null ? ""
: " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
}
}
catch (Throwable t) {
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job
+ "] failed unexpectedly and fatally with the following parameters: ["
+ jobParameters + "]", t);
}
rethrow(t);
}
}
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
catch (TaskRejectedException e) {
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
jobRepository.update(jobExecution);
}
return jobExecution;
}
Java
복사
메서드 구현 내용을 확인해보면 이미 실행된 Job인지 확인한 후 JobParameters를 validate하고 JobExecution을 생성하여 taskExecutor를 통해 execute하는걸 확인할 수 있습니다.
Job
Job은 전체 배치 프로세스를 캡슐화하는 도매인 객체이며 단순히 Step 인스턴스들의 컨테이너입니다.
JobInstance
Job의 실행 단위를 나타내며 Job이 하나 실행되면 하나의 JobInstance가 생성됩니다.
JobParameter가 있는 경우엔 식별 가능한 parameter 별로 JobInstance가 생성되고 JobInstance = Job + 식별 가능한 JobParameter로 정의할 수 있습니다.
JobParameters
Job이 시작될 때 필요한 시작 조건으로 parameter를 통해 Job을 특정 조건에서 동작하게끔 할 수 있습니다.
JobParametersValidator
Job 실행 시 필수적인 파라미터를 검증하는 역할을 합니다.
기본적으로 DefaultJobParametersValidator 구현체를 지원하며 requiredKeys, optionalKeys 두 가지를 검증할 수 있습니다.
JobExeuction
JobInstance의 실행 시도에 대한 객체로 상태, 시작시간, 종료시간, 생성시간 등의 정보를 담고 있고 동일한 JobInstance를 실행하더라도 JobExecution은 개별로 생성됩니다.
Step
Step은 Job의 독립적인 순차적 단계를 캡슐화하는 도메인 객체입니다.
StepExecution
Step의 실행에 대한 객체로 JobExecution과 Step에 대한 커밋 횟수, 롤백 횟수, 시작시간, 종료시간 등의 트랜잭션 관련 정보를 담고 있습니다.
Flow
Flow는 Step을 순차적으로만 구성하는 것이 아닌 특정한 상태에 따라 흐름을 전환하도록 구성할 수 있게 해주는 Job입니다.
FlowJobBuilder()엔 on(), to(), stop(), fail(), end(), stopAndRestart() 와 같은 메서드가 존재하며 BatchStatus, ExitStatus, FlowExecutionStatus 와 같은 배치상태들로 흐름 제어를 할 수 있습니다.
또한 미리 정의 해놓은 Flow를 Step처럼 Flow 내에서 또는 Step 내에서 사용할 수 있습니다.
JobExecutionDecider
Flow의 흐름 제어를 위한 인터페이스로 필요한 분기로직을 구현체로 작성하여 원하는 배치상태를 return할 수 있습니다.
Chunk, Tasklet
Chunk와 Tasklet은 배치를 처리할 수 있는 두 가지 방법입니다.
@Bean
@JobScope
public Step step(
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new StepBuilder("stepName", jobRepository)
.<I, O>chunk(CHUNK_SIZE, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
Java
복사
먼저 Chunk란 여러 개의 아이템을 묶은 하나의 덩어리를 의미하며 대용량 데이터를 한 번에 처리하는 것이 아닌 청크 단위로 쪼개서 반복해서 처리하는 방법입니다.
청크 단위만큼 ItemReader, ItemProcessor, ItemWriter를 반복하며 각 청크가 트랜잭션의 단위가 됩니다.
@Bean
@JobScope
public Step step(
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new StepBuilder("stepName", jobRepository)
.tasklet(((contribution, chunkContext) -> {
System.out.println("Hello World");
return RepeatStatus.FINISHED;
}), transactionManager)
.build();
}
Java
복사
그리고 Tasklet은 그 반대로 단순하게 하나의 덩어리를 쪼개지 않고 한 번에 처리할 수 있는 방법이지만 대용량 데이터를 처리하는 부하를 감당하기엔 버거울 수 있습니다.
단순한 만큼 람다식을 통해 구현할 수 있고 종료할 땐 RepeatStatus의 CONTINUABLE 또는 FINISHED를 통해 처리를 완료할 수 있습니다.
ExecutionContext
Job마다 여러 개의 Step과 함께 상태를 공유하기 위한 공간으로 Key/Value 쌍으로 이루어져있습니다.
이러한 ExecutionContext는 Job 혹은 Step의 재시작을 용이하게 도와줍니다.
//JobExecution
public class JobExecution extends Entity {
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
...
private volatile ExecutionContext executionContext = new ExecutionContext();
//StepExecution
public class StepExecution extends Entity {
private final JobExecution jobExecution;
private final String stepName;
private volatile BatchStatus status = BatchStatus.STARTING;
...
private volatile ExecutionContext executionContext = new ExecutionContext();
Java
복사
JobExecution, StepExecution 내부에 각각 존재하며 JobExecution 내부의 ExecutionContext는 Job과 Step이 함께 공유하지만 StepExecution 내부의 ExecutionContext는 Step 간의 공유가 불가능합니다.
@JobScope, @StepScope
@Scope란 스프링 컨테이너에서 Bean이 관리되는 범위로 singleton, prototype, request, session, application 이 있으며 기본값은 singleton입니다.
//@JobScope
@Scope(value = "job", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JobScope {
}
//@StepScope
@Scope(value = "step", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StepScope {
}
Java
복사
@JobScope와 @StepScope 내부엔 이러한 @Scope를 갖고 있으며 Job과 Step의 Bean 생성과 실행에 관여합니다.
기본적으로 proxyMode를 사용하며 Bean 생성과 초기화가 앱 구동 시점이 아닌 Bean의 실행시점에 이뤄지게됩니다.
@Bean
@JobScope
public Step step(
JobRepository jobRepository,
PlatformTransanactionManager transactionManager,
@Value("#{jobParameters[파라미터명]}") Object object
) {
...
}
Java
복사
이로서 @Value를 통해 jobParameters, jobExecutionContext 혹은 stepExecutionContext 등을 주입 받으며 Lazy Binding이 가능해집니다.