开源基于docker的任务调度器pipeline,比`quartzs` 更强大的分布式任务调度器

pipeline 分布式任务调度器

目标: 基于docker的布式任务调度器, 比quartzs,xxl-job 更强大的分布式任务调度器。

可以将要执行的任务打包为docker镜像,或者选择已有镜像,自定义脚本程序,通过pipeline框架来实现调度。

开源地址: https://github.com/jadepeng/docker-pipeline

架构

架构

  • pipeline master 中心节点,管理和调度任务
  • pipeline agent 执行任务的节点,接收到任务后,调用docker执行pipeline任务

功能特性 && TODO List

进展

2021.07.31

  • 支持定时执行任务(固定周期和cron表达式)
  • 增加分布式mongodb锁,多master时,同时只能有一个master schedule任务

2021.07.28

  • 新增运行老版本pipeline任务能力
  • 增加日志接口

2021.07.27

  • 引入bk-job的ui,待修改

2021.07.21

  • Master 调用 agent执行任务
  • agnet 启动docker执行任务

2021.07.19

  • 基于jhipster搭建框架
  • 分布式实现

数据结构

一个pipeline 任务:

  • 支持多个pipelineTask
  • 一个pipelineTask 包含多个Step
@Data
public class Pipeline {

    @Id
    private String id;

    private String name;

    @JSONField(name = "pipeline")
    private List<PipelineTask> pipelineTasks = new ArrayList<>();

    private List<Network> networks = Lists.newArrayList(new Network());

    private List<Volume> volumes = Lists.newArrayList(new Volume());

    private String startNode;

    /**
     * 调度类型:
     *      1) CRON, 设置cronExpression
     *      2) FIX_RATE, 设置fixRateInSeconds
     */
    private ScheduleType scheduleType = ScheduleType.NONE;

    /**
     * CRON表达式,在scheduleType=CRON 时生效
     */
    private String cronExpression;

    /**
     * 固定周期运行,比如每隔多少s,在scheduleType=FIX_RATE 时生效
     */
    private int fixRateInSeconds;

    /**
     * 是否需要调度,为true时,才调度
     */
    @Indexed
    private boolean enableTrigger;

    private long lastTriggerTime;

    @Indexed
    private long nextTriggerTime;

    /**
     * 执行超时时间
     */
    private int executorTimeout;

    /**
     * 重试次数
     */
    private int executorFailRetryCount;

    /**
     * 内存限制
     */
    private String memory;

    /**
     * CPU 限制
     */
    private String cpu;

    @Data
    @Builder
    public static class PipelineTask {

        /**
         * 名称
         */
        String name;

        /**
         * 别名
         */
        String alias;

        /**
         * 依赖的pipelines,必须依赖的执行完成才能运行该PipelineTask
         */
        List<String> dependencies;

        /**
         * 任务步骤,顺序执行
         */
        List<Step> steps;
    }

    @Data
    public static class Network {
        String name = "pipeline_default";
        String driver = "bridge";
    }

    @Data
    public static class Volume {
        String name = "pipeline_default";
        String driver = "local";
    }

    @Data
    public static class StepNetwork {
        private String name;
        private List<String> aliases = Lists.newArrayList("default");

        public StepNetwork(String name) {
            this.name = name;
        }
    }

}

    

举例:

{
    "_id" : "29103d5e4a77409b9f6050eea8110bb3",
    "name" : "docker image pipeline",
    "pipelineTasks" : [ 
        {
            "name" : "docker image pipeline",
            "steps" : [ 
                {
                    "name" : "defaultJob",
                    "image" : "java-pipeline:1.0.1",
                    "workingDir" : "/workspace",
                    "environment" : {},
                    "networks" : [ 
                        {
                            "name" : "pipeline_default",
                            "aliases" : [ 
                                "default"
                            ]
                        }
                    ],
                    "onSuccess" : false,
                    "authConfig" : {}
                }
            ]
        }
    ],
    "networks" : [ 
        {
            "name" : "pipeline_default",
            "driver" : "bridge"
        }
    ],
    "volumes" : [ 
        {
            "name" : "pipeline_default",
            "driver" : "local"
        }
    ],
    "cronExpression" : "0 0 * * * ?",
    "fixRateInSeconds" : 0,
    "scheduleType" : "CRON",
    "enableTrigger" : true,
    "lastTriggerTime" : 1627744509047,
    "nextTriggerTime" : 1627747200000,
    "executorTimeout" : 0,
    "executorFailRetryCount" : 0,
    "isAvailable" : 1,
    "runningPipelines" : [],
    "finishedPipeliens" : [],
    "created_by" : "admin",
    "created_date" : "2021-07-20T04:33:16.477Z",
    "last_modified_by" : "system",
    "last_modified_date" : "2021-07-31T15:15:09.048Z"
}

使用说明

安装部署

编译

使用mvn编译

mvn package -DskipTests

部署master

根据需要,修改master的prod配置文件application-prod.yml

包含kafka配置,server端口,mongodb地址,jwt secret配置。

mongodb 会自动新建collection和初始化数据,无需手动导入数据。

kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      retries: 3
      batch-size: 2000
      buffer-memory: 33554432
    consumer:
      group-id: consumer-pipeline
      auto-offset-reset: earliest
      enable-auto-commit: true
      bootstrap-servers: 172.31.161.38:9092

server:
  port: 8080

spring:
  data:
    mongodb:
      uri: mongodb://127.0.0.1:28017
      database: pipeline

jhipster:
  security:
    authentication:
      jwt:
        base64-secret:

注意master的jwt secret需要和agent的保持一致。

配置好后,启动:

nohup java -jar pipeline-master-$version.jar --spring.profiles.active=prod &

可以将application-prod.yml放在和jar包同一目录。

部署agent

根据需要,修改master的prod配置文件application-prod.yml

包含:

  • eureka的defaultZone,配置master的地址
  • 端口
  • docker地址
    • docker-tls-verify: 是否启动tls验证
    • docker-cert-path:启动tls验证的ca证书
    • pipeline-log-path: 运行日志存储路径

eureka:
  instance:
    prefer-ip-address: true
  client:
    service-url:
      defaultZone: http://admin:${jhipster.registry.password}@127.0.0.1:8080/eureka/

server:
  port: 8081

application:
  docker-server: 
  docker-tls-verify: true
  docker-cert-path: /mnt/parastor/pipeline/ca/
  pipeline-log-path: /mnt/parastor/pipeline/logs/


jhipster:
  security:
    authentication:
      jwt:
        base64-secret:

执行老版本任务

POST /api/pipelines/exec-old


Body:
{
	"networks":[
		{
			"driver":"bridge",
			"name":"pipeline_network_3eac4b36209a41e58a5f22dd403fee50"
		}
	],
	"pipeline":[
		{
			"alias":"Word",
			"dependencies":[],
			"name":"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
			"nextPipelines":[],
			"steps":[
				{
					"alias":"Word",
					"auth_config":{},
					"command":[
						"echo $CI_SCRIPT | base64 -d | /bin/bash -e"
					],
					"entrypoint":[
						"/bin/bash",
						"-c"
					],
					"environment":{
						"CI_SCRIPT":"CmlmIFsgLW4gIiRDSV9ORVRSQ19NQUNISU5FIiBdOyB0aGVuCmNhdCA8PEVPRiA+ICRIT01FLy5uZXRyYwptYWNoaW5lICRDSV9ORVRSQ19NQUNISU5FCmxvZ2luICRDSV9ORVRSQ19VU0VSTkFNRQpwYXNzd29yZCAkQ0lfTkVUUkNfUEFTU1dPUkQKRU9GCmNobW9kIDA2MDAgJEhPTUUvLm5ldHJjCmZpCnVuc2V0IENJX05FVFJDX1VTRVJOQU1FCnVuc2V0IENJX05FVFJDX1BBU1NXT1JECnVuc2V0IENJX1NDUklQVAplY2hvICsgamF2YSAtY3AgL2RhdGF2b2x1bWUvcGRmX3RvX3dvcmQvcGRmYm94X3V0aWwtMS4wLVNOQVBTSE9ULmphciBjb20uaWZseXRlay5pbmRleGVyLlJ1bm5lciAtLWlucHV0UERGIC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkL0hTNy5wZGYgLS1vdXRwdXRXb3JkIC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmVG9Xb3JkL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkLyAtLXNjaGVtYUlucHV0UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2ticWEvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkgLS1lbnRpdHlJbmRleFBhdGggL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZW50aXR5IC0tZmllbGRJbmRleFBhdGggL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZmllbGQgLS10eXBlIGx1Y2VuZSAtLW91dHB1dCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9lYzVjMGI5NDNkMGI0ZjQyOTM3MjJhNWRkYzYxZTY2ZC9lbnRpdHlJbmZvLnR4dApqYXZhIC1jcCAvZGF0YXZvbHVtZS9wZGZfdG9fd29yZC9wZGZib3hfdXRpbC0xLjAtU05BUFNIT1QuamFyIGNvbS5pZmx5dGVrLmluZGV4ZXIuUnVubmVyIC0taW5wdXRQREYgL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGYvZWM1YzBiOTQzZDBiNGY0MjkzNzIyYTVkZGM2MWU2NmQvSFM3LnBkZiAtLW91dHB1dFdvcmQgL2RhdGF2b2x1bWUvZXh0cmFjdC9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OS9wZGZUb1dvcmQvZWM1YzBiOTQzZDBiNGY0MjkzNzIyYTVkZGM2MWU2NmQvIC0tc2NoZW1hSW5wdXRQYXRoIC9kYXRhdm9sdW1lL2V4dHJhY3Qva2JxYS9mOTJjMmE3NWJhZTg0YmIwODgzMjA1ZGIzZjJkYWU3OSAtLWVudGl0eUluZGV4UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9lbnRpdHkgLS1maWVsZEluZGV4UGF0aCAvZGF0YXZvbHVtZS9leHRyYWN0L2Y5MmMyYTc1YmFlODRiYjA4ODMyMDVkYjNmMmRhZTc5L3BkZlRvV29yZC9maWVsZCAtLXR5cGUgbHVjZW5lIC0tb3V0cHV0IC9kYXRhdm9sdW1lL2V4dHJhY3QvZjkyYzJhNzViYWU4NGJiMDg4MzIwNWRiM2YyZGFlNzkvcGRmVG9Xb3JkL2VjNWMwYjk0M2QwYjRmNDI5MzcyMmE1ZGRjNjFlNjZkL2VudGl0eUluZm8udHh0Cg=="
					},
					"image":"registry.iflyresearch.com/aimind/java:v1.0.0",
					"name":"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
					"networks":[
						{
							"aliases":[
								"default"
							],
							"name":"pipeline_network_3eac4b36209a41e58a5f22dd403fee50"
						}
					],
					"on_success":true,
					"volumes":[
						"pipeline_default:/aimind",
						"/mnt/parastor/aimind/shared/:/share",
						"/mnt/parastor/aimind/pipeline-jobs/2021/07/26/3eac4b36209a41e58a5f22dd403fee50:/workspace",
						"/mnt/parastor/aimind/datavolumes/carmaster:/datavolume"
					],
					"working_dir":"/workspace"
				}
			]
		}
	],
	"volumes":[
		{
			"driver":"local",
			"name":"pipeline_default"
		}
	]
}

成功返回:

{
    "retcode": "000000",
    "desc": "成功",
    "data": {
        "id": "8137f344-f52d-4595-bdbb-425363847b61",
    }
}

可根据id获取日志。

获取job执行日志

GET /api/pipelines/jobLog/{jobid}/

结果:

{
    "retcode": "000000",
    "desc": "成功",
    "data": {
        "currentTask": null,
        "logs": [
            {
                "id": "e76a686f68b64c0783b7721b058be137",
                "jobId": "8137f344-f52d-4595-bdbb-425363847b61",
                "status": "FINISHED",
                "taskName": "pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1",
                "exitedValue": 0,
                "logs": [
                    "proc \"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1\" started",
                    "pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1:+ java -cp /datavolume/pdf_to_word/pdfbox_util-1.0-SNAPSHOT.jar com.iflytek.indexer.Runner --inputPDF /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdf/ec5c0b943d0b4f4293722a5ddc61e66d/HS7.pdf --outputWord /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/ec5c0b943d0b4f4293722a5ddc61e66d/ --schemaInputPath /datavolume/extract/kbqa/f92c2a75bae84bb0883205db3f2dae79 --entityIndexPath /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/entity --fieldIndexPath /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/field --type lucene --output /datavolume/extract/f92c2a75bae84bb0883205db3f2dae79/pdfToWord/ec5c0b943d0b4f4293722a5ddc61e66d/entityInfo.txt",
                    "proc \"pipeline_task_3eac4b36209a41e58a5f22dd403fee50_1\" exited with status 0"
                ]
            }
        ],
        "exitedValue": 0,
        "status": "FINISHED",
        "pipelineJobSt": 1627477250599,
        "pipelineJobFt": 1627477274299
    }
}

周期任务

如果pipelien需要周期执行,需要配置enableTrigger为true,同时设置按照CRON或者FIX_RATE` 运行:

  • FIX_RATE: 固定周期,通过fixRateInSeconds配置周期运行时间

示例:每360秒运行一次:

{
    // pipeline ...
    "pipelineTasks" : [ ],
    "fixRateInSeconds" : 360,
    "scheduleType" : "FIX_RATE",
    "enableTrigger" : true
}
  • CRON: 按照CRON表达式周期执行,通过cronExpression配置.

示例:每小时开始的时候运行一次:

{
    // pipeline ...
    "pipelineTasks" : [ ],
    "cronExpression" : "0 0 * * * ?",
    "scheduleType" : "CRON",
    "enableTrigger" : true
}

更多待解锁

原文地址:https://www.cnblogs.com/xiaoqi/p/docker-pipeline.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 连接 连接池产生原因 连接池实现原理 小结 TEMPERANCE:Eat not to dullness;drink not to elevation.节制
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 一个优秀的工程师和一个普通的工程师的区别,不是满天飞的架构图,他的功底体现在所写的每一行代码上。-- 毕玄 1. 命名风格 【书摘】类名用 UpperCamelC
今天犯了个错:“接口变动,伤筋动骨,除非你确定只有你一个人在用”。哪怕只是throw了一个新的Exception。哈哈,这是我犯的错误。一、接口和抽象类类,即一个对象。先抽象类,就是抽象出类的基础部分,即抽象基类(抽象类)。官方定义让人费解,但是记忆方法是也不错的 —包含抽象方法的类叫做抽象类。接口
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket一、引子文件,作为常见的数据源。关于操作文件的字节流就是 —FileInputStream&amp;FileOutputStream。
作者:泥沙砖瓦浆木匠网站:http://blog.csdn.net/jeffli1993个人签名:打算起手不凡写出鸿篇巨作的人,往往坚持不了完成第一章节。交流QQ群:【编程之美 365234583】http://qm.qq.com/cgi-bin/qm/qr?k=FhFAoaWwjP29_Aonqz
本文目录 线程与多线程 线程的运行与创建 线程的状态 1 线程与多线程 线程是什么? 线程(Thread)是一个对象(Object)。用来干什么?Java 线程(也称 JVM 线程)是 Java 进程内允许多个同时进行的任务。该进程内并发的任务成为线程(Thread),一个进程里至少一个线程。 Ja
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket在面向对象编程中,编程人员应该在意“资源”。比如?1String hello = &quot;hello&quot;; 在代码中,我们
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 这是泥瓦匠的第103篇原创 《程序兵法:Java String 源码的排序算法(一)》 文章工程:* JDK 1.8* 工程名:algorithm-core-le
摘要: 原创出处 https://www.bysocket.com 「公众号:泥瓦匠BYSocket 」欢迎关注和转载,保留摘要,谢谢! 目录 一、父子类变量名相同会咋样? 有个小故事,今天群里面有个人问下面如图输出什么? 我回答:60。但这是错的,答案结果是 40 。我知错能改,然后说了下父子类变
作者:泥瓦匠 出处:https://www.bysocket.com/2021-10-26/mac-create-files-from-the-root-directory.html Mac 操作系统挺适合开发者进行写代码,最近碰到了一个问题,问题是如何在 macOS 根目录创建文件夹。不同的 ma
作者:李强强上一篇,泥瓦匠基础地讲了下Java I/O : Bit Operation 位运算。这一讲,泥瓦匠带你走进Java中的进制详解。一、引子在Java世界里,99%的工作都是处理这高层。那么二进制,字节码这些会在哪里用到呢?自问自答:在跨平台的时候,就凸显神功了。比如说文件读写,数据通信,还
1 线程中断 1.1 什么是线程中断? 线程中断是线程的标志位属性。而不是真正终止线程,和线程的状态无关。线程中断过程表示一个运行中的线程,通过其他线程调用了该线程的 方法,使得该线程中断标志位属性改变。 深入思考下,线程中断不是去中断了线程,恰恰是用来通知该线程应该被中断了。具体是一个标志位属性,
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want需求 项目在设计表的时候,要处理并发多的一些数据,类似订单号不能重复,要保持唯一。原本以为来个时间戳,精确到毫秒应该不错了。后来觉得是错了,测试环境下很多一
纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 纯技术交流群 每日推荐 - 技术干货推送 跟着泥瓦匠,一起问答交流 扫一扫,我邀请你入群 加微信:bysocket01
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocketReprint it anywhere u want.文章Points:1、介绍RESTful架构风格2、Spring配置CXF3、三层初设计,实现WebService接口层4、撰写HTTPClient 客户
Writer :BYSocket(泥沙砖瓦浆木匠)什么是回调?今天傻傻地截了张图问了下,然后被陈大牛回答道“就一个回调…”。此时千万个草泥马飞奔而过(逃哈哈,看着源码,享受着这种回调在代码上的作用,真是美哉。不妨总结总结。一、什么是回调回调,回调。要先有调用,才有调用者和被调用者之间的回调。所以在百
Writer :BYSocket(泥沙砖瓦浆木匠)一、什么大小端?大小端在计算机业界,Endian表示数据在存储器中的存放顺序。百度百科如下叙述之:大端模式,是指数据的高字节保存在内存的低地址中,而数据的低字节保存在内存的高地址中,这样的存储模式有点儿类似于把数据当作字符串顺序处理:地址由小向大增加
What is a programming language? Before introducing compilation and decompilation, let&#39;s briefly introduce the Programming Language. Programming la
Writer :BYSocket(泥沙砖瓦浆木匠)微 博:BYSocket豆 瓣:BYSocketFaceBook:BYSocketTwitter :BYSocket泥瓦匠喜欢Java,文章总是扯扯Java。 I/O 基础,就是二进制,也就是Bit。一、Bit与二进制什么是Bit(位)呢?位是CPU
Writer:BYSocket(泥沙砖瓦浆木匠)微博:BYSocket豆瓣:BYSocket一、前言 泥瓦匠最近被项目搞的天昏地暗。发现有些要给自己一些目标,关于技术的目标:专注很重要。专注Java 基础 + H5(学习) 其他操作系统,算法,数据结构当成课外书博览。有时候,就是那样你越是专注方面越