mikeski.net kotlin java javascript hugo development

mikeski.net development blog

Spring Boot Retry with Batch

Spring Boot’s Batch process is a very handy way to implement pipleine processing for our applications.

When writing a batch job, we need to consider what happens if a step fails - do we retry, fail the whole job, or log and continue?

Let’s look at how to do retry when a step fails with Spring Boot’s Batch.

If you need to, you can see the primer on Batch here

Spring Retry

From the documentation

To make processing more robust and less prone to failure, it sometimes helps to automatically retry a failed operation in case it might succeed on a subsequent attempt. Errors that are susceptible to intermittent failure are often transient in nature

Enable in configuration

To enable Spring Retry we need to add the @EnableRetry annotation to a configuration class, or to the main class of the application itself:

@SpringBootApplication
@EnableBatchProcessing
@EnableRetry(proxyTargetClass=true)
class TextFilteringSpringBootApplication {
    ...
}

Annotated service bean

Now, let’s create an ItemProcessor that will retry its process method 3 times if it fails. We’ll just make it fail every time it runs so we can also see the recover method get called when the method fails too many times:

@Service
@Retryable
class TextFilteringProcessorDeclarativeRetryDefaultConfiguration : ItemProcessor<String, String> {
    companion object {
        val LOG = LoggerFactory.getLogger(TextFilteringProcessorDeclarativeRetryDefaultConfiguration::class.toString())
    }
    @Retryable(Exception::class)
    @Throws(Exception::class)
    override fun process(stringToFilter: String): String? {
        LOG.warn("Processing String ${stringToFilter}");
        throw Exception("Simulating something failed")
    }

    @Recover
    fun recover(e: Exception, stringToFilter: String): String {
        LOG.warn("Recover called with exception ${e.message} for argument ${stringToFilter}");
        return "Error with filter"
    }
}

The process method throws an exception that Spring Retry will retry 3 times (in this case). If it fails, we have our recover method that takes the exception + all of the arguments that the process method takes and returns the same type as the process method.

Let’s talk about the annotations here:

  1. @Service means Spring creates this bean and will @Autowire it when we need it
  2. @Retryable on the class tells Spring that we are going to implement a @Retryable method and a @Recover method
  3. The method that is @Retryable is the method that Spring will re-run when it fails. @Throws(Exception::class) means that Kotlin will let the Exception that is thrown pass through.
  4. @Recover is the method that gets called if the @Retryable method fails 3 times. 3 retrys is the default, we will see how to change that later.

Running the job

Now let’s create a simple job to demonstrate what is happening. We’ll use the Reader and Writer from the previous Batch example:

@Bean(name=["Batch example 2"])
fun jobLogAllFiltereddTextItemsAndFailures(jbf: JobBuilderFactory, sbf: StepBuilderFactory): Job {
    return jbf.get("demo-text-job2").start(
            sbf.get("demo-text-step2")
                    .chunk<String, String>(1)
                    .reader(TextFilteringItemReader())
                    .processor(textFilteringProcessorDeclarativeRetryDefaultConfiguration)
                    .writer(TextFilteringItemWriter())
                    .build()).build()
}

When we run this, we can see the retry messages on standard out:

2019-11-24 09:17:09.295 INFO 82473 — [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [demo-text-step2]

2019-11-24 09:17:09.310 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words

2019-11-24 09:17:10.314 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words

2019-11-24 09:17:11.315 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has no bad words

2019-11-24 09:17:11.316 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Recover called with exception Simulating something failed for argument This has no bad words

Error with filter

2019-11-24 09:17:11.318 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word

2019-11-24 09:17:12.323 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word

2019-11-24 09:17:13.327 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Processing String This has one damn bad word

2019-11-24 09:17:13.328 WARN 82473 — [ main] ssorDeclarativeRetryDefaultConfiguration : Recover called with exception Simulating something failed for argument This has one damn bad word

Error with filter

The process method is called 3 times, fails 3 times and then the recover method is called. The return from the processor is the return value of the recover method and the ItemWriter prints Error with filter from the processor.

Configuring Backoff

Let’s say we wanted to start with a 1 second retry, and if the retry fails wait 2 seconds before retrying, then 4, finally 8 and then failing after that. That’s 5 tries total with 1 second retry interval with a 2x multiplier. Easy:

@Service
@Retryable
class TextFilteringProcessorDeclarativeRetryAnnotatedConfiguration : ItemProcessor<String, String> {
    companion object {
        val LOG = LoggerFactory.getLogger(TextFilteringProcessorDeclarativeRetryAnnotatedConfiguration::class.toString())
    }
    @Retryable(Exception::class, maxAttempts = 5, backoff = Backoff(delay = 1000L, multiplier = 2.0))
    @Throws(Exception::class)
    override fun process(stringToFilter: String): String? {
        LOG.warn("Processing String ${stringToFilter}");
        throw Exception("Simulating something failed")
    }

    @Recover
    fun recover(e: Exception, stringToFilter: String): String {
        LOG.warn("Recover called with exception ${e.message} for argument ${stringToFilter}");
        return "Error with filter"
    }
}

Now, let’s create a job using this ItemProcessor:

@Bean(name=["Batch example 3"])
    fun jobLogAllFiltereddTextItemsAndFailuresConfigured(jbf: JobBuilderFactory, sbf: StepBuilderFactory): Job {
        return jbf.get("demo-text-job3").start(
                sbf.get("demo-text-step3")
                        .chunk<String, String>(1)
                        .reader(TextFilteringItemReader())
                        .processor(textFilteringProcessorDeclarativeRetryAnnotatedConfiguration)
                        .writer(TextFilteringItemWriter())
                        .build()).build()
    }

Here is the output, let’s pay close attention to the timestamps to see the seconds between tries (1, 2, 4, 8):

2019-11-25 19:52:51.726 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words

2019-11-25 19:52:52.729 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words

2019-11-25 19:52:54.729 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words

2019-11-25 19:52:58.730 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words

2019-11-25 19:53:06.735 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Processing String This has no bad words

2019-11-25 19:53:06.736 WARN 94589 — [ main] orDeclarativeRetryAnnotatedConfiguration : Recover called with exception Simulating something failed for argument This has no bad words

Error with filter

Problems

org.springframework.retry.ExhaustedRetryException: Cannot locate recovery method

If we are implementing other interfaces with our @Retryable bean we need to tell AOP to use class based proxies instead of interface based proxies.

This is done with @EnableRetry(proxyTargetClass=true)

java.lang.NoClassDefFoundError or java.lang.ClassNotFoundException: org.aspectj.lang.annotation.Pointcut

If we are using annotations in Spring Batch, we need to add an additional dependency on Aspectj. Simply add to your pom.xml:

For Spring Boot use:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

Otherwise:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-aspects</artifactId>
</dependency>