Spring Batch, used with Spring Boot, is a very handy way to implement pipeline processing for our applications.
When writing a batch job, we need to consider what happens if a step fails: do we retry, fail the whole job, or log and continue?
Let’s look at how to retry when a step fails with Spring Batch on Spring Boot.
If you need to, you can see the primer on Batch here
From the documentation:
“To make processing more robust and less prone to failure, it sometimes helps to automatically retry a failed operation in case it might succeed on a subsequent attempt. Errors that are susceptible to intermittent failure are often transient in nature.”
To enable Spring Retry we need to add the @EnableRetry annotation to a configuration class, or to the main class of the application itself:
@SpringBootApplication
@EnableBatchProcessing
@EnableRetry(proxyTargetClass=true)
class TextFilteringSpringBootApplication {
...
}
Now, let’s create an ItemProcessor whose process method is attempted up to 3 times if it fails. We’ll make it fail every time it runs so we can also see the recover method get called once the attempts run out:
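import org.slf4j.LoggerFactory
import org.springframework.batch.item.ItemProcessor
import org.springframework.retry.annotation.Recover
import org.springframework.retry.annotation.Retryable
import org.springframework.stereotype.Service

@Service
@Retryable
class TextFilteringProcessorDeclarativeRetryDefaultConfiguration : ItemProcessor<String, String> {
    companion object {
        val LOG = LoggerFactory.getLogger(TextFilteringProcessorDeclarativeRetryDefaultConfiguration::class.java)
    }

    // Retried on any Exception, using the default retry settings
    @Retryable(Exception::class)
    @Throws(Exception::class)
    override fun process(stringToFilter: String): String? {
        LOG.warn("Processing String ${stringToFilter}")
        // Always fail so that every attempt is used up and recover is called
        throw Exception("Simulating something failed")
    }

    // Called once all attempts have failed; its return value becomes the result of process
    @Recover
    fun recover(e: Exception, stringToFilter: String): String {
        LOG.warn("Recover called with exception ${e.message} for argument ${stringToFilter}")
        return "Error with filter"
    }
}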
The process method throws an exception, so Spring Retry attempts it up to 3 times (the default). Once the attempts are used up, Spring Retry calls our recover method, which takes the exception followed by all of the arguments of the process method and returns the same type as the process method.
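Let’s talk about the annotations here:
@Retryable marks the method to retry when it throws one of the listed exceptions. With the default settings it makes up to 3 attempts with a fixed 1 second delay between them.
@Recover marks the method to call once all attempts have failed. Its first parameter is the exception and the remaining parameters match those of the retried method.
@Throws(Exception::class) declares the exception on the generated method signature, which is how Kotlin exposes a checked exception to Java callers.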
Now let’s create a simple job to demonstrate what is happening. We’ll use the Reader and Writer from the previous Batch example:
@Bean(name=["Batch example 2"])
fun jobLogAllFiltereddTextItemsAndFailures(jbf: JobBuilderFactory, sbf: StepBuilderFactory): Job {
    return jbf.get("demo-text-job2").start(
        sbf.get("demo-text-step2")
            .chunk<String, String>(1)
            .reader(TextFilteringItemReader())
            // The Spring-managed bean, injected elsewhere (not shown); retries only apply through Spring's proxy
            .processor(textFilteringProcessorDeclarativeRetryDefaultConfiguration)
            .writer(TextFilteringItemWriter())
            .build()).build()
}
When we run this, we can see the retry messages on standard out:
2019-11-24 09:17:09.295 INFO 82473 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [demo-text-step2]
2019-11-24 09:17:09.310 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words
2019-11-24 09:17:10.314 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words
2019-11-24 09:17:11.315 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words
2019-11-24 09:17:11.316 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Recover called with exception Simulating something failed for argument This has no bad words
Error with filter
2019-11-24 09:17:11.318 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word
2019-11-24 09:17:12.323 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word
2019-11-24 09:17:13.327 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word
2019-11-24 09:17:13.328 WARN 82473 --- [ main] ssorDeclarativeRetryDefaultConfiguration : Recover called with exception Simulating something failed for argument This has one damn bad word
Error with filter
The process method is called 3 times, fails 3 times and then the recover method is called. The return from the processor is the return value of the recover method and the ItemWriter prints Error with filter from the processor.
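To make the retry-then-recover flow concrete, here is a minimal plain-Kotlin sketch of the same behaviour. It is not how Spring Retry is implemented, it leaves out the delay between attempts, and the name retryThenRecover is made up for this illustration:

```kotlin
// Hypothetical helper (not Spring Retry's API): run the operation up to
// maxAttempts times, then hand the last failure to the recover function.
fun <T> retryThenRecover(maxAttempts: Int, operation: () -> T, recover: (Exception) -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastFailure: Exception? = null
    repeat(maxAttempts) {
        try {
            return operation() // success: stop retrying and return the result
        } catch (e: Exception) {
            lastFailure = e // remember the failure and try again
        }
    }
    // Every attempt failed: the recover result becomes the overall result
    return recover(lastFailure!!)
}

fun main() {
    var calls = 0
    val result = retryThenRecover<String>(
        maxAttempts = 3,
        operation = { calls++; throw Exception("Simulating something failed") },
        recover = { "Error with filter" }
    )
    println("$calls attempts, result: $result") // 3 attempts, result: Error with filter
}
```

Just like in the log output, the operation runs 3 times and the caller only ever sees the value returned by the recover function.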
Let’s say we want to wait 1 second before the first retry, then 2 seconds before the next, then 4, and finally 8, failing after that. That’s 5 attempts in total, with a 1 second initial delay and a 2x multiplier. Easy:
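import org.slf4j.LoggerFactory
import org.springframework.batch.item.ItemProcessor
import org.springframework.retry.annotation.Backoff
import org.springframework.retry.annotation.Recover
import org.springframework.retry.annotation.Retryable
import org.springframework.stereotype.Service

@Service
@Retryable
class TextFilteringProcessorDeclarativeRetryAnnotatedConfiguration : ItemProcessor<String, String> {
    companion object {
        val LOG = LoggerFactory.getLogger(TextFilteringProcessorDeclarativeRetryAnnotatedConfiguration::class.java)
    }

    // Up to 5 attempts; the wait starts at 1000 ms and doubles after each failure
    @Retryable(Exception::class, maxAttempts = 5, backoff = Backoff(delay = 1000L, multiplier = 2.0))
    @Throws(Exception::class)
    override fun process(stringToFilter: String): String? {
        LOG.warn("Processing String ${stringToFilter}")
        // Always fail so that every attempt is used up and recover is called
        throw Exception("Simulating something failed")
    }

    // Called once all attempts have failed; its return value becomes the result of process
    @Recover
    fun recover(e: Exception, stringToFilter: String): String {
        LOG.warn("Recover called with exception ${e.message} for argument ${stringToFilter}")
        return "Error with filter"
    }
}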
Now, let’s create a job using this ItemProcessor:
@Bean(name=["Batch example 3"])
fun jobLogAllFiltereddTextItemsAndFailuresConfigured(jbf: JobBuilderFactory, sbf: StepBuilderFactory): Job {
    return jbf.get("demo-text-job3").start(
        sbf.get("demo-text-step3")
            .chunk<String, String>(1)
            .reader(TextFilteringItemReader())
            // The Spring-managed bean, injected elsewhere (not shown); retries only apply through Spring's proxy
            .processor(textFilteringProcessorDeclarativeRetryAnnotatedConfiguration)
            .writer(TextFilteringItemWriter())
            .build()).build()
}
Here is the output. Pay close attention to the timestamps to see the seconds between attempts (1, 2, 4, 8):
2019-11-25 19:52:51.726 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words
2019-11-25 19:52:52.729 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words
2019-11-25 19:52:54.729 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words
2019-11-25 19:52:58.730 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words
2019-11-25 19:53:06.735 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words
2019-11-25 19:53:06.736 WARN 94589 --- [ main] orDeclarativeRetryAnnotatedConfiguration : Recover called with exception Simulating something failed for argument This has no bad words
Error with filter
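The gaps between those timestamps follow directly from the delay and multiplier values. As a rough sketch in plain Kotlin, we can compute the schedule ourselves. The helper backoffDelaysMillis is a made-up name for this illustration, not part of Spring Retry, and it ignores any cap on the maximum delay:

```kotlin
// Hypothetical helper (not Spring Retry's API): the wait in milliseconds
// before each retry, given the attempt count, initial delay and multiplier.
fun backoffDelaysMillis(maxAttempts: Int, delayMillis: Long, multiplier: Double): List<Long> {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    return generateSequence(delayMillis) { (it * multiplier).toLong() }
        .take(maxAttempts - 1) // there is no wait after the final attempt
        .toList()
}

fun main() {
    val delays = backoffDelaysMillis(maxAttempts = 5, delayMillis = 1000L, multiplier = 2.0)
    println(delays)       // [1000, 2000, 4000, 8000]
    println(delays.sum()) // 15000
}
```

The total of 15 seconds matches the time between the first Processing String line (19:52:51) and the Recover line (19:53:06) in the output above.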
If our @Retryable bean implements an interface (here, ItemProcessor), we need to tell Spring AOP to use class-based proxies instead of interface-based proxies.
This is done with @EnableRetry(proxyTargetClass=true).
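One likely reason this matters: an interface-based (JDK dynamic) proxy only implements the interfaces, so it is not an instance of our concrete processor class and cannot be used where that class is expected. Here is a small plain-Kotlin sketch of that effect, with no Spring involved. Processor and PrefixingProcessor are hypothetical stand-ins for ItemProcessor and our processor class:

```kotlin
import java.lang.reflect.Proxy

// Hypothetical stand-ins for ItemProcessor and a concrete processor class
interface Processor {
    fun process(item: String): String
}

class PrefixingProcessor : Processor {
    override fun process(item: String) = "filtered: $item"
}

// Builds a JDK dynamic proxy that forwards every call to the target
fun interfaceProxyFor(target: Processor): Any =
    Proxy.newProxyInstance(
        Processor::class.java.classLoader,
        arrayOf<Class<*>>(Processor::class.java)
    ) { _, method, args -> method.invoke(target, *(args ?: emptyArray<Any?>())) }

fun main() {
    val proxy = interfaceProxyFor(PrefixingProcessor())
    println(proxy is Processor)                   // true
    println(proxy is PrefixingProcessor)          // false
    println((proxy as Processor).process("text")) // filtered: text
}
```

A class-based proxy is generated as a subclass of the target class instead, so it can stand in for the concrete type as well.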
If we use the retry annotations in Spring Batch, we need an additional dependency on AspectJ. Add one of the following to your pom.xml.
For Spring Boot use:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
Otherwise:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
</dependency>