Spring Cloud Data Flow - Custom Task

Thursday Dec 26, 2019

Spring Cloud Data Flow

In this article we will look at Spring Cloud Data Flow (SCDF) and how to create a custom task with the Command Line Interface, or CLI.

Requirements

We’ll assume you have Spring Cloud Data Flow up and running. If you don’t, go through the intro and then come back.

Timestamp Task

In the intro, we set up and ran the Timestamp task. This task simply prints a timestamp to the log and exits. We can look at the source for that task here: https://github.com/spring-cloud-task-app-starters/timestamp/tree/master/spring-cloud-starter-task-timestamp

The first thing to notice is that the Timestamp task is a full-fledged Spring Boot application:

    public class TimestampTask implements CommandLineRunner

The other thing to notice is the @EnableTask annotation on the application. Only one of the @Configuration classes in the app needs to carry it; once it is present, the Spring Cloud Task features are available to the task.

We can see from the run code in the app that it simply logs a timestamp:

    logger.info(dateFormat.format(new Date()));
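
To make that one line concrete, here is a minimal plain-Java sketch of the same step with no Spring involved. The class name TimestampSketch and the pattern constant are our own for illustration; the real task takes its format from configuration, so treat the pattern below as an assumption rather than the task's actual setting.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Plain-Java sketch of the Timestamp task's core step; this is not the task's real source.
class TimestampSketch {

    // Assumed pattern for illustration only; the real task reads its format from configuration.
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    // Formats the given instant with the given pattern in the given time zone.
    static String format(Date when, String pattern, TimeZone zone) {
        SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);
        dateFormat.setTimeZone(zone);
        return dateFormat.format(when);
    }

    public static void main(String[] args) {
        // The task logs this value; here we simply print it.
        System.out.println(format(new Date(), PATTERN, TimeZone.getDefault()));
    }
}
```

In the real task this logic sits inside the run method of the CommandLineRunner, which Spring Boot calls once the application context has started.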

Developing a Task

Simply put, we create a Spring Boot application with the correct metadata and we have a runnable task.

With that in mind, we’ll create a project with Spring Batch and Spring Cloud Task to deploy into our Data Flow instance.

Custom task

In our example code project, we have a simple batch job that squares integers from 1 to 100 and prints the value to standard out. If we check out the project and run:

cd spring-cloud-data-flow-first-batch-job
mvn package
java -jar target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar

We can see the batch job run and print the squares of 1 … 100 along with the Spring logging:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.1.RELEASE)

2019-12-31 08:12:38.749  INFO 26976 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : Starting SpringCloudDataFlowFirstBatchJobApplicationKt v0.0.1-SNAPSHOT on mikes-mbp.raleigh.ibm.com with PID 26976 (/Users/mike/code/public/examples/spring-cloud-data-flow-first-batch-job/target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar started by mike in /Users/mike/code/public/examples/spring-cloud-data-flow-first-batch-job)
2019-12-31 08:12:38.751  INFO 26976 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : No active profile set, falling back to default profiles: default
2019-12-31 08:12:39.648  INFO 26976 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$bddc7f3] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.654  INFO 26976 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.677  INFO 26976 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$5433d1f6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.925  WARN 26976 --- [           main] o.s.b.c.c.a.DefaultBatchConfigurer       : No datasource was provided...using a Map based JobRepository
2019-12-31 08:12:39.926  WARN 26976 --- [           main] o.s.b.c.c.a.DefaultBatchConfigurer       : No transaction manager was provided, using a ResourcelessTransactionManager
2019-12-31 08:12:39.940  INFO 26976 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2019-12-31 08:12:39.991  INFO 26976 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : Started SpringCloudDataFlowFirstBatchJobApplicationKt in 1.552 seconds (JVM running for 1.925)
2019-12-31 08:12:39.992  INFO 26976 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
2019-12-31 08:12:40.027  INFO 26976 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=int-squaring-job]] launched with the following parameters: [{}]
2019-12-31 08:12:40.034  INFO 26976 --- [           main] o.s.c.t.b.l.TaskBatchExecutionListener   : The job execution id 0 was run within the task execution 0
2019-12-31 08:12:40.043  INFO 26976 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [int-squaring-job-step1]
1
4
9
16
25
...
9801
10000
2019-12-31 08:12:40.197  INFO 26976 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [int-squaring-job-step1] executed in 154ms
2019-12-31 08:12:40.200  INFO 26976 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=int-squaring-job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 170ms
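
The project's source is not reproduced in this article, so as a rough illustration of what the single step does, here is a plain-Java sketch of the read, process, write flow. It is not the project's Spring Batch configuration; the class and method names (IntSquaringSketch, read, process, write, run) are hypothetical and only mirror the roles a reader, processor, and writer play in a batch step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java sketch of the read -> process -> write flow; not the project's Spring Batch code.
class IntSquaringSketch {

    // "Reader": supplies the integers from 1 to max inclusive.
    static List<Integer> read(int max) {
        return IntStream.rangeClosed(1, max).boxed().collect(Collectors.toList());
    }

    // "Processor": squares a single item.
    static int process(int item) {
        return item * item;
    }

    // "Writer": prints each processed item on its own line.
    static void write(List<Integer> items) {
        items.forEach(System.out::println);
    }

    // Runs the three stages in order and returns what was written.
    static List<Integer> run(int max) {
        List<Integer> squares = new ArrayList<>();
        for (int item : read(max)) {
            squares.add(process(item));
        }
        write(squares);
        return squares;
    }

    public static void main(String[] args) {
        run(100); // prints 1, 4, 9, ... 10000, one value per line
    }
}
```

In the actual job, Spring Batch wires the equivalent pieces into int-squaring-job-step1 and records the execution, which is where the surrounding log lines come from.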

Now that we have our task defined in a jar, we can install it into Spring Cloud Data Flow.

Install via Shell

Copy jar to host

The first thing to do is copy target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar to /tmp on the system running SCDF. We can use scp (note that scp takes the port as an uppercase -P, unlike ssh, which uses a lowercase -p):

scp -P 2222 target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar mike@localhost:/tmp/.

Start SCDF shell

Since we’re using local Docker in this example, we’ll start SCDF’s shell like this:

docker exec -it dataflow-server java -jar shell.jar

For the shell reference guide, see here:

https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#shell

Register Application

First, we register our application with the system. You can do this from the Dashboard or the Shell - here is the shell command to register our jar that is in /tmp:

dataflow:>app register --name 'batch-int-squaring' --type task --uri file:///tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar 
Successfully registered application 'task:batch-int-squaring'
dataflow:>

Note: your jar must be in the /tmp directory of the dataflow-server container. If you have it in /tmp/ on the host machine, the following will put it into /tmp in the container:

tar -cf - -C / tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar | docker cp - dataflow-server:/

Create Task

Next, we create a Task from the Application definition:

dataflow:>task create --name batch-int-squaring-task --definition 'batch-int-squaring'
Created new task 'batch-int-squaring-task'
dataflow:>

Launch Task

Finally, we can launch the task:

dataflow:>task launch batch-int-squaring-task
Launched task 'batch-int-squaring-task' with execution id 5

While the task is running, you can list executions and see that there is no start/end time or exit code yet:

dataflow:>task execution list
╔═══════════════════════╤══╤══════════╤════════╤═════════╗
║       Task Name       │ID│Start Time│End Time│Exit Code║
╠═══════════════════════╪══╪══════════╪════════╪═════════╣
║batch-int-squaring-task│7 │          │        │         ║
╚═══════════════════════╧══╧══════════╧════════╧═════════╝

When the task is finished, you can see the start/end time and exit code. When the exit code is 0, we have successfully executed the task:

dataflow:>task execution list
╔═══════════════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║       Task Name       │ID│         Start Time         │          End Time          │Exit Code║
╠═══════════════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║batch-int-squaring-task│7 │Tue Dec 31 15:38:29 GMT 2019│Tue Dec 31 15:38:30 GMT 2019│0        ║
╚═══════════════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝

dataflow:>

We can see the log (with embedded newlines) with:

dataflow:>task execution log 7
"\n  .   ____          _            __ _ _\n /\\\\ / ___'_ __ _ _(_)_ __  __ _ \\ \\ \\ \\\n( ( )\\___ | '_ | '_| | '_ \\/ _` | \\ \\ \\ \\\n \\\\/  ___)| |_)| | | | | || (_| |  ) ) ) )\n  '  |____| .__|_| |_|_| |_\\__, | / / / /\n =========|_|==============|___/=/_/_/_/\n :: Spring Boot ::        (v2.2.1.RELEASE)\n\n2019-12-31 15:38:26.829  INFO 333 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : Starting SpringCloudDataFlowFirstBatchJobApplicationKt v0.0.1-SNAPSHOT on 9306c75c494d with PID 333 (/tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar started by root in /tmp/79208763755798/batch-int-squaring-task-ffeb3175-1036-4480-abf3-84abd35a711a)\n2019-12-31 15:38:26.840  INFO 333 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : No active profile set, falling back to default profiles: default\n2019-12-31 15:38:28.669  INFO 333 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$5d5f33a3] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:28.686  INFO 333 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:28.734  INFO 333 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$a5b53da6] is not eligible for getting processed by all 
BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:29.056  INFO 333 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...\n2019-12-31 15:38:29.114  INFO 333 --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Driver does not support get/set network timeout for connections. (Not yet supported)\n2019-12-31 15:38:29.123  INFO 333 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.\n2019-12-31 15:38:29.376  INFO 333 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL\n2019-12-31 15:38:29.402  INFO 333 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.\n2019-12-31 15:38:29.711  INFO 333 --- [           main] gCloudDataFlowFirstBatchJobApplicationKt : Started SpringCloudDataFlowFirstBatchJobApplicationKt in 3.768 seconds (JVM running for 4.436)\n2019-12-31 15:38:29.712  INFO 333 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: [--spring.cloud.data.flow.platformname=default, --spring.cloud.task.executionid=7]\n2019-12-31 15:38:29.789  INFO 333 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=int-squaring-job]] launched with the following parameters: [{-spring.cloud.data.flow.platformname=default, -spring.cloud.task.executionid=7}]\n2019-12-31 15:38:29.804  INFO 333 --- [           main] o.s.c.t.b.l.TaskBatchExecutionListener   : The job execution id 2 was run within the task execution 7\n2019-12-31 15:38:29.822  INFO 333 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: 
[int-squaring-job-step1]\n1\n4\n9\n16\n25\n36\n49\n64\n81\n100\n121\n144\n169\n196\n225\n256\n289\n324\n361\n400\n441\n484\n529\n576\n625\n676\n729\n784\n841\n900\n961\n1024\n1089\n1156\n1225\n1296\n1369\n1444\n1521\n1600\n1681\n1764\n1849\n1936\n2025\n2116\n2209\n2304\n2401\n2500\n2601\n2704\n2809\n2916\n3025\n3136\n3249\n3364\n3481\n3600\n3721\n3844\n3969\n4096\n4225\n4356\n4489\n4624\n4761\n4900\n5041\n5184\n5329\n5476\n5625\n5776\n5929\n6084\n6241\n6400\n6561\n6724\n6889\n7056\n7225\n7396\n7569\n7744\n7921\n8100\n8281\n8464\n8649\n8836\n9025\n9216\n9409\n9604\n9801\n10000\n2019-12-31 15:38:30.301  INFO 333 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [int-squaring-job-step1] executed in 479ms\n2019-12-31 15:38:30.312  INFO 333 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=int-squaring-job]] completed with the following parameters: [{-spring.cloud.data.flow.platformname=default, -spring.cloud.task.executionid=7}] and the following status: [COMPLETED] in 518ms\n2019-12-31 15:38:30.325  INFO 333 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...\n2019-12-31 15:38:30.329  INFO 333 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.\n"
dataflow:>

It’s pretty ugly, and I have not figured out how to “echo” it so the escapes are interpolated. There is an issue here to address this: https://github.com/spring-cloud/spring-cloud-dataflow/issues/3723
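
As a possible workaround outside the shell, we can paste the quoted string into a file and unescape it ourselves. The sketch below is one way to do that in plain Java; the class name LogUnescapeSketch is hypothetical, and it assumes the output uses only simple backslash escapes (\n, \t, \r, \" and \\) like the sample above. It is not part of SCDF.

```java
// Sketch: turn the quoted, escaped string printed by `task execution log` back into plain text.
class LogUnescapeSketch {

    static String unescape(String raw) {
        String s = raw.trim();
        // Drop the surrounding double quotes if both are present.
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            s = s.substring(1, s.length() - 1);
        }
        StringBuilder out = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c != '\\' || i + 1 == s.length()) {
                out.append(c); // ordinary character, or a trailing lone backslash
                continue;
            }
            char next = s.charAt(++i);
            switch (next) {
                case 'n': out.append('\n'); break;
                case 't': out.append('\t'); break;
                case 'r': out.append('\r'); break;
                case '"': out.append('"'); break;
                case '\\': out.append('\\'); break;
                default: out.append('\\').append(next); // unknown escape: keep it unchanged
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws java.io.IOException {
        // Reads the captured string from standard input and prints it with real newlines.
        // InputStream.readAllBytes requires Java 9 or later.
        String raw = new String(System.in.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.print(unescape(raw));
    }
}
```

Only the quoted string itself should be fed in, without the dataflow:> prompt lines. With Java 11 or later, saving the class as LogUnescapeSketch.java and running java LogUnescapeSketch.java < log.txt should print the log with the banner and line breaks restored, where log.txt is whatever file the string was pasted into.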