Spring Cloud Data Flow - Custom Task
In this article we will look at Spring Cloud Data Flow (SCDF) and how to create a custom task and install it with the Command Line Interface, or CLI.
Requirements
We’ll assume you have Spring Cloud Data Flow up and running. If you don’t, go through the intro first and then come back.
Timestamp Task
In the intro, we set up and ran the Timestamp task, which simply prints a timestamp to the log and exits. We can look at the source for that task here: https://github.com/spring-cloud-task-app-starters/timestamp/tree/master/spring-cloud-starter-task-timestamp
The first thing to notice is that the Timestamp task is a full-fledged Spring Boot application:
public class TimestampTask implements CommandLineRunner
The other thing to notice is the @EnableTask annotation on the application. Only one of your @Configuration classes needs to carry this annotation; once it is present, the Spring Cloud Task features are available throughout the app.
We can see from the run code in the app that it simply logs a timestamp:
logger.info(dateFormat.format(new Date()));
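Putting those pieces together, here is a condensed sketch of the task. The class and field names are illustrative; the full source is in the repository linked above.

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;

@SpringBootApplication
@EnableTask
public class TimestampTaskApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(TimestampTaskApplication.class);

    // Format string is illustrative; the real starter makes it configurable.
    private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) {
        SpringApplication.run(TimestampTaskApplication.class, args);
    }

    @Override
    public void run(String... args) {
        // Log the current time; the task exits when run() returns.
        logger.info(dateFormat.format(new Date()));
    }
}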
Developing a Task
Simply put, once we create a Spring Boot application with the correct metadata, we have a runnable task.
With that in mind, we’ll create a project with Spring Batch and Spring Cloud Task to deploy into our Data Flow instance.
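The two key dependencies are the Spring Batch and Spring Cloud Task starters. A minimal sketch of the relevant pom.xml entries (versions are assumed to come from the Spring Boot and Spring Cloud dependency management):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-task</artifactId>
</dependency>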
Custom Task
In our example code project, we have a simple batch job that squares the integers from 1 to 100 and prints each value to standard out. If we check out the project and run:
cd spring-cloud-data-flow-first-batch-job
mvn package
java -jar target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar
We can see the batch job run and print the squares of 1 … 100 along with the Spring logging:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.2.1.RELEASE)
2019-12-31 08:12:38.749 INFO 26976 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : Starting SpringCloudDataFlowFirstBatchJobApplicationKt v0.0.1-SNAPSHOT on mikes-mbp.raleigh.ibm.com with PID 26976 (/Users/mike/code/public/examples/spring-cloud-data-flow-first-batch-job/target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar started by mike in /Users/mike/code/public/examples/spring-cloud-data-flow-first-batch-job)
2019-12-31 08:12:38.751 INFO 26976 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : No active profile set, falling back to default profiles: default
2019-12-31 08:12:39.648 INFO 26976 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$bddc7f3] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.654 INFO 26976 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.677 INFO 26976 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$5433d1f6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-12-31 08:12:39.925 WARN 26976 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No datasource was provided...using a Map based JobRepository
2019-12-31 08:12:39.926 WARN 26976 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No transaction manager was provided, using a ResourcelessTransactionManager
2019-12-31 08:12:39.940 INFO 26976 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2019-12-31 08:12:39.991 INFO 26976 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : Started SpringCloudDataFlowFirstBatchJobApplicationKt in 1.552 seconds (JVM running for 1.925)
2019-12-31 08:12:39.992 INFO 26976 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2019-12-31 08:12:40.027 INFO 26976 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=int-squaring-job]] launched with the following parameters: [{}]
2019-12-31 08:12:40.034 INFO 26976 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 0 was run within the task execution 0
2019-12-31 08:12:40.043 INFO 26976 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [int-squaring-job-step1]
1
4
9
16
25
...
9801
10000
2019-12-31 08:12:40.197 INFO 26976 --- [ main] o.s.batch.core.step.AbstractStep : Step: [int-squaring-job-step1] executed in 154ms
2019-12-31 08:12:40.200 INFO 26976 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=int-squaring-job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 170ms
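For reference, here is a minimal sketch of what the job configuration could look like. The actual example project is written in Kotlin (note the ApplicationKt class name in the log), so treat this Java version as an illustrative translation; the class name is assumed, while the job and step names are taken from the log output above.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableTask
@EnableBatchProcessing
public class BatchIntSquaringApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchIntSquaringApplication.class, args);
    }

    // Single-step job; the names match the "int-squaring-job" entries in the log.
    @Bean
    public Job intSquaringJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
        Step step = steps.get("int-squaring-job-step1")
                .tasklet((contribution, chunkContext) -> {
                    for (int i = 1; i <= 100; i++) {
                        System.out.println(i * i); // square each integer and print it
                    }
                    return RepeatStatus.FINISHED;
                })
                .build();
        return jobs.get("int-squaring-job").start(step).build();
    }
}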
Now that we have our task defined in a jar, we can install it into Spring Cloud Data Flow.
Install via Shell
Copy jar to host
The first thing to do is copy target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar to /tmp on the system running SCDF. We can use scp (note that scp takes the port as an uppercase -P, unlike ssh’s lowercase -p):
scp -P 2222 target/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar mike@localhost:/tmp/.
Start SCDF shell
Since we’re using local Docker in this example, we’ll start SCDF’s shell like this:
docker exec -it dataflow-server java -jar shell.jar
For the shell reference guide, see here:
https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#shell
Register Application
First, we register our application with the system. You can do this from the Dashboard or the Shell; here is the shell command to register the jar we copied to /tmp:
dataflow:>app register --name 'batch-int-squaring' --type task --uri file:///tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar
Successfully registered application 'task:batch-int-squaring'
dataflow:>
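You can verify the registration with app list, which shows every registered application grouped by type:
dataflow:>app list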
Note: your jar must be in the dataflow-server container’s /tmp directory. If you have it in /tmp/ on the host machine, the following will put it into /tmp in the container (tar strips the leading slash, so extracting the stream at the container root recreates /tmp/…):
tar -cf - /tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar | docker cp - dataflow-server:/
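Alternatively, since docker cp can copy a single file directly, this should work as well:
docker cp /tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar dataflow-server:/tmp/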
Create Task
Next, we create a Task from the Application definition:
dataflow:>task create --name batch-int-squaring-task --definition 'batch-int-squaring'
Created new task 'batch-int-squaring-task'
dataflow:>
Launch Task
Finally, we can launch the task:
dataflow:>task launch batch-int-squaring-task
Launched task 'batch-int-squaring-task' with execution id 5
While the task is running, you can list executions and see that there is no start/end time or exit code yet:
dataflow:>task execution list
╔═══════════════════════╤══╤══════════╤════════╤═════════╗
║ Task Name │ID│Start Time│End Time│Exit Code║
╠═══════════════════════╪══╪══════════╪════════╪═════════╣
║batch-int-squaring-task│7 │ │ │ ║
╚═══════════════════════╧══╧══════════╧════════╧═════════╝
When the task has finished, the start/end time and exit code are filled in; an exit code of 0 means the task executed successfully:
dataflow:>task execution list
╔═══════════════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║ Task Name │ID│ Start Time │ End Time │Exit Code║
╠═══════════════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║batch-int-squaring-task│7 │Tue Dec 31 15:38:29 GMT 2019│Tue Dec 31 15:38:30 GMT 2019│0 ║
╚═══════════════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝
dataflow:>
We can see the log (returned as a single string with embedded newline escapes) with:
dataflow:>task execution log 7
"\n . ____ _ __ _ _\n /\\\\ / ___'_ __ _ _(_)_ __ __ _ \\ \\ \\ \\\n( ( )\\___ | '_ | '_| | '_ \\/ _` | \\ \\ \\ \\\n \\\\/ ___)| |_)| | | | | || (_| | ) ) ) )\n ' |____| .__|_| |_|_| |_\\__, | / / / /\n =========|_|==============|___/=/_/_/_/\n :: Spring Boot :: (v2.2.1.RELEASE)\n\n2019-12-31 15:38:26.829 INFO 333 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : Starting SpringCloudDataFlowFirstBatchJobApplicationKt v0.0.1-SNAPSHOT on 9306c75c494d with PID 333 (/tmp/spring-cloud-data-flow-first-batch-job-0.0.1-SNAPSHOT.jar started by root in /tmp/79208763755798/batch-int-squaring-task-ffeb3175-1036-4480-abf3-84abd35a711a)\n2019-12-31 15:38:26.840 INFO 333 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : No active profile set, falling back to default profiles: default\n2019-12-31 15:38:28.669 INFO 333 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration' of type [org.springframework.cloud.task.batch.configuration.TaskBatchAutoConfiguration$$EnhancerBySpringCGLIB$$5d5f33a3] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:28.686 INFO 333 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:28.734 INFO 333 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration' of type [org.springframework.cloud.task.batch.listener.BatchEventAutoConfiguration$$EnhancerBySpringCGLIB$$a5b53da6] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)\n2019-12-31 15:38:29.056 INFO 333 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...\n2019-12-31 15:38:29.114 INFO 333 --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Driver does not support get/set network timeout for connections. 
(Not yet supported)\n2019-12-31 15:38:29.123 INFO 333 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.\n2019-12-31 15:38:29.376 INFO 333 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL\n2019-12-31 15:38:29.402 INFO 333 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.\n2019-12-31 15:38:29.711 INFO 333 --- [ main] gCloudDataFlowFirstBatchJobApplicationKt : Started SpringCloudDataFlowFirstBatchJobApplicationKt in 3.768 seconds (JVM running for 4.436)\n2019-12-31 15:38:29.712 INFO 333 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: [--spring.cloud.data.flow.platformname=default, --spring.cloud.task.executionid=7]\n2019-12-31 15:38:29.789 INFO 333 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=int-squaring-job]] launched with the following parameters: [{-spring.cloud.data.flow.platformname=default, -spring.cloud.task.executionid=7}]\n2019-12-31 15:38:29.804 INFO 333 --- [ main] o.s.c.t.b.l.TaskBatchExecutionListener : The job execution id 2 was run within the task execution 7\n2019-12-31 15:38:29.822 INFO 333 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [int-squaring-job-step1]\n1\n4\n9\n16\n25\n36\n49\n64\n81\n100\n121\n144\n169\n196\n225\n256\n289\n324\n361\n400\n441\n484\n529\n576\n625\n676\n729\n784\n841\n900\n961\n1024\n1089\n1156\n1225\n1296\n1369\n1444\n1521\n1600\n1681\n1764\n1849\n1936\n2025\n2116\n2209\n2304\n2401\n2500\n2601\n2704\n2809\n2916\n3025\n3136\n3249\n3364\n3481\n3600\n3721\n3844\n3969\n4096\n4225\n4356\n4489\n4624\n4761\n4900\n5041\n5184\n5329\n5476\n5625\n5776\n5929\n6084\n6241\n6400\n6561\n6724\n6889\n7056\n7225\n7396\n7569\n7744\n7921\n8100\n8281\n8464\n8649\n8836\n9025\n9216\n9409\n9604\n9801\n10000\n2019-12-31 15:38:30.301 INFO 333 --- [ main] o.s.batch.core.step.AbstractStep : Step: [int-squaring-job-step1] executed in 479ms\n2019-12-31 15:38:30.312 INFO 333 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=int-squaring-job]] completed with the following parameters: [{-spring.cloud.data.flow.platformname=default, -spring.cloud.task.executionid=7}] and the following status: [COMPLETED] in 518ms\n2019-12-31 15:38:30.325 INFO 333 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...\n2019-12-31 15:38:30.329 INFO 333 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.\n"
dataflow:>
It’s pretty ugly, and I have not figured out how to “echo” it so the escapes are interpolated. There is an open issue to address this: https://github.com/spring-cloud/spring-cloud-dataflow/issues/3723