[源码解析]Oozie来龙去脉之提交任务

[源码解析]Oozie来龙去脉之提交任务

0x00 摘要

Oozie是由Cloudera公司贡献给Apache的基于工作流引擎的开源框架,是Hadoop平台的开源的工作流调度引擎,用来管理Hadoop作业。本文是系列的第一篇,介绍Oozie的任务提交阶段。

0x01 问题

我们从需求逆推实现,即考虑如果我们从无到有实现工作流引擎,我们需要实现哪些部分?从而我们可以提出一系列问题从而去Oozie中探寻。

作为工作流引擎需要实现哪些部分?大致想了想,觉得需要有:

  • 任务提交
  • 任务持久化
  • 任务委托给某一个执行器执行
  • 任务调度
  • 任务回调,即任务被执行器完成后通知工作流引擎
  • 支持不同任务(同步,异步)
  • 控制任务之间逻辑关系(跳转,等待...)
  • 状态监控,监控任务进度
  • ......

因为篇幅和精力所限,我们无法研究所有源码,回答所有问题,所以我们先整理出部分问题,在后面Oozie源码分析中一一解答:

  • Oozie分为几个模块?
  • 每个模块功能是什么?
  • Oozie如何提交任务?
  • 任务提交到什么地方?如何持久化?
  • Oozie任务有同步异步之分吗?
  • Oozie如何处理同步任务?
  • Oozie如何处理异步任务?
  • 任务的控制流节点(Control Flow Nodes)和动作节点(Action Nodes)之间如何跳转?
  • Oozie都支持什么类型的任务?Shell?Java? Hive?
  • Oozie如何同Yarn交互?
  • Oozie如何知道Yarn任务完成?

0x02 Oozie 基本概念

2.1 组件

Oozie由Oozie client和Oozie Server两个组件构成,Oozie Server是运行于Java Servlet容器(Tomcat)中的web应用程序。Oozie client用于给Oozie Server提及任务,Oozie client 提交任务的途径是HTTP请求。

实际上Oozie Server就相当于Hadoop的一个客户端,当用户需要执行多个关联的MR任务时,只需要将MR执行顺序写入workflow.xml,然后使用Oozie Server提交本次任务,Oozie Server会托管此任务流。

Oozie Server 具体操作的是workflow,即Oozie主要维护workflow的执行 / workflow内部Action的串联和跳转。

具体Action的执行是由Yarn去执行,Yarn会把Action分配给有充足资源的节点执行。Action是异步执行,所以Action结束时候会通过回调方式通知Oozie执行结果,Oozie也会采用轮询方式去获取Action结果(为了提高可靠性)。

大致提交流程如下:

Oozie client ------> Oozie  Server -------> Yarn ------> Hadoop

2.2 特点

Oozie特点如下:

  • Oozie不是仅用来配置多个MR工作流的,它可以是各种程序夹杂在一起的工作流,比如执行一个MR1后,接着执行一个java脚本,再执行一个shell脚本,接着是Hive脚本,然后又是Pig脚本,最后又执行了一个MR2,使用Oozie可以轻松完成这种多样的工作流。使用Oozie时,若前一个任务执行失败,后一个任务将不会被调度。
  • Oozie定义了控制流节点(Control Flow Nodes)和动作节点(Action Nodes),其中控制流节点定义了流程的开始和结束,以及控制流程的执行路径(Execution Path),如decision,fork,join等;而动作节点包括Haoop map-reduce hadoop文件系统,Pig,SSH,HTTP,eMail和Oozie子流程。
  • Oozie以action为基本单位,可以将多个action构成一个DAG图的模式运行。
  • Oozie工作流必须是一个有向无环图,实际上Oozie就相当于Hadoop的一个客户端,当用户需要执行多个关联的MR任务时,只需要将MR执行顺序写入workflow.xml,然后使用Oozie提交本次任务,Oozie会托管此任务流。

2.3 功能模块

Oozie主要由以下功能模块构成:

  • workflow(工作流):该组件用于定义和执行一个特定顺序的mapreduce,hive和pig作业。由我们需要处理的每个工作组成,进行需求的流式处理。
  • Coordinator(协调器):可将多个工作流协调成一个工作流来进行处理。多个workflow可以组成一个coordinator,可以把前几个workflow的输出作为后 一个workflow的输入,也可以定义workflow的触发条件,来做定时触发。
  • Bundle Job:绑定多个coordinator,一起提交或触发所有coordinator,是对一堆coordinator的抽象。
  • Oozie SLA(服务器等级协定):该组件支持workflow应用程序执行过程的记录跟踪。

我们就从无到有,看看一个Workflow从提交到最后是如何运行的,假设这个workflow开始后,进入一个hive action,这个hive本身配置的是由tez引擎执行 。下面是代码简化版。

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">
  
    <start to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <script>hive.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
       <message>Hive failed,error message</message>
    </kill>
    
    <end name="end"/>
</workflow-app>

0x03 Oozie client

Oozie Client是用户用来提交任务给Oozie Server的途径,其可以启动任务,停止任务,提交任务,开始任务,查看任务执行情况。比如启动任务如下:

oozie job -oozie oozie_url -config job.properties_address -run

3.1 程序入口

既然有启动脚本,我们就直接去里面探寻程序入口。

${JAVA_BIN} ${OOZIE_CLIENT_OPTS} -cp ${OOZIECPPATH} org.apache.oozie.cli.OozieCLI "${@}"

这就看到了Client 的入口类,我们去看看。

public class OozieCLI {
      public static void main(String[] args) {
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP,"true");
        }
        System.exit(new OozieCLI().run(args));
    }
}

我们可以看到,经过验证之后,程序直接从main函数进入到了run函数。

public class OozieCLI {
     public synchronized int run(String[] args) {
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);
            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);

            if (doAsUser != null) {
                OozieClient.doAs(doAsUser,new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser,command);
                        return null;
                    }
                });
            }
            else {
                processCommand(parser,command);
            }
            return 0;
        }
    }
}

看来主要的内容是在这个processCommand里面,其会根据命令调用相应的命令方法。通过command.getName()我们可以清楚的知道Oozie目前支持什么种类的任务,比如 JOB_CMD,JOBS_CMD,PIG_CMD,SQOOP_CMD,MR_CMD。

public void processCommand(CLIParser parser,CLIParser.Command command) throws Exception {
        switch (command.getName()) {
            case JOB_CMD:
                jobCommand(command.getCommandLine());
                break;
            case JOBS_CMD:
                jobsCommand(command.getCommandLine());
                break;
            case HIVE_CMD:
                scriptLanguageCommand(command.getCommandLine(),HIVE_CMD);
                break;
            ......
            default:
                parser.showHelp(command.getCommandLine());
        }
}

3.2 Hive为例

我们以Hive为例看看如何处理。Hive就是调用 scriptLanguageCommand。

private void scriptLanguageCommand(CommandLine commandLine,String jobType){
    List<String> args = commandLine.getArgList();
    try {
        XOozieClient wc = createXOozieClient(commandLine);
        Properties conf = getConfiguration(wc,commandLine);
        String script = commandLine.getOptionValue(SCRIPTFILE_OPTION);
        List<String> paramsList = new ArrayList<>();
        ......
        System.out.println(JOB_ID_PREFIX + wc.submitScriptLanguage(conf,script,args.toArray(new String[args.size()]),paramsList.toArray(new String[paramsList.size()]),jobType));      
    }
}

这里关键代码是:wc.submitScriptLanguage,所以我们需要看看XOozieClient.submitScriptLanguage。其注释表明作用是通过HTTP来提交 Pig 或者 Hive。

public String submitScriptLanguage(Properties conf,String scriptFile,String[] args,String[] params,String jobType) throws IOException,OozieClientException {

    switch (jobType) {
        case OozieCLI.HIVE_CMD:
            script = XOozieClient.HIVE_SCRIPT;
            options = XOozieClient.HIVE_OPTIONS;
            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
            break;
        case OozieCLI.PIG_CMD:
            ......
    }

    conf.setProperty(script,readScript(scriptFile));
    setStrings(conf,options,args);
    setStrings(conf,scriptParams,params);

    return (new HttpJobSubmit(conf,jobType)).call();
}

HttpJobSubmit就是向Oozie Server提交job,所以我们最终是需要去Oozie Server探究。

private class HttpJobSubmit extends ClientCallable<String> {
    @Override
    protected String call(HttpURLConnection conn) throws IOException,OozieClientException {
        conn.setRequestProperty("content-type",RestConstants.XML_CONTENT_TYPE);
        writeToXml(conf,conn.getOutputStream());
        if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
            JSONObject json = (JSONObject) JSONValue.parse(
                    new InputStreamReader(conn.getInputStream(),StandardCharsets.UTF_8));
            return (String) json.get(JsonTags.JOB_ID);
        }
        return null;
    }
}

0x04 Oozie Server

4.1 我是个web程序

前面我们提到,Oozie Server是运行于Java Servlet容器(Tomcat)中的web应用程序。所以具体启动等配置信息是在web.xml中。很久没有看到web.xml了,突然觉得好陌生,嘿嘿。

<!-- Servlets -->
<servlet>
    <servlet-name>callback</servlet-name>
    <display-name>Callback Notification</display-name>
    <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>

<servlet>
    <servlet-name>v1jobs</servlet-name>
    <display-name>WS API for Workflow Jobs</display-name>
    <servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>

......

4.2 初始化服务

Ooize的很多基础工作是由Services来完成的,每一个service都是一个单例。这些服务的配置信息在ooze-default.xml中

<property>
    <name>oozie.services</name>
    <value>
        org.apache.oozie.service.HadoopAccessorService,org.apache.oozie.service.LiteWorkflowAppService,org.apache.oozie.service.JPAService,org.apache.oozie.service.DBLiteWorkflowStoreService,org.apache.oozie.service.CallbackService,org.apache.oozie.service.ActionService,org.apache.oozie.service.CallableQueueService,org.apache.oozie.service.CoordinatorEngineService,org.apache.oozie.service.BundleEngineService,org.apache.oozie.service.DagEngineService,......
    </value>
</property>

ServicesLoader这个类用来启动,加载配置的所有service。

public class ServicesLoader implements ServletContextListener {
    private static Services services;
    /**
     * Initialize Oozie services.
     */
    public void contextInitialized(ServletContextEvent event) {
        services = new Services();
        services.init();
    }
}

init函数是用来初始化所有配置好的Services,如果有同类型服务,则后来者会被存储。

public class Services {
      public void init() throws ServiceException {
 			 loadServices();	
      }  
  
    private void loadServices() throws ServiceException {
        try {
            Map<Class<?>,Service> map = new LinkedHashMap<Class<?>,Service>();
            Class<?>[] classes = ConfigurationService.getClasses(conf,CONF_SERVICE_CLASSES);
            Class<?>[] classesExt = ConfigurationService.getClasses(conf,CONF_SERVICE_EXT_CLASSES);

            List<Service> list = new ArrayList<Service>();
            loadServices(classes,list);
            loadServices(classesExt,list);

            //removing duplicate services,strategy: last one wins
            for (Service service : list) {
                if (map.containsKey(service.getInterface())) {
                      service.getClass());
                }
                map.put(service.getInterface(),service);
            }
            for (Map.Entry<Class<?>,Service> entry : map.entrySet()) {
                setService(entry.getValue().getClass());
            }
        } 
    }  
}

4.3 从Job到DAG

客户通过oozie脚本提交job之后,进入org.apache.oozie.cli.OozieCLI。会生成一个OozieClient,然后使用JobCommand,提交运行的信息到V1JosServlet的doPost接口,Oozier在doPos接口中会调用submitJob()方法。此时会生成一个DAG对象,然后DAG.submitJon(JobConf,startJob)。

我们从V1JosServlet.doPost入手。这里是基类。

public abstract class BaseJobsServlet extends JsonRestServlet {
      protected void doPost(HttpServletRequest request,HttpServletResponse response) throws ServletException,IOException {
         JSONObject json = submitJob(request,conf);
      }
}

然后回到 V1JosServlet.submitJob

@Override
protected JSONObject submitJob(HttpServletRequest request,Configuration conf) throws XServletException,IOException {
    String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

    if (jobType == null) {
            String wfPath = conf.get(OozieClient.APP_PATH);
 
            if (wfPath != null) {
                json = submitWorkflowJob(request,conf); // 我们的目标在这里
            }
            else if (coordPath != null) {
                json = submitCoordinatorJob(request,conf);
            }
            else {
                json = submitBundleJob(request,conf);
            }
    }
    else { // This is a http submission job
       ......
    }
    return json;
}

然后调用到了 DagEngine.submitJob。从其注释可以看出 The DagEngine provides all the DAG engine functionality for WS calls. 这样我们就正式来到了DAG的世界

private JSONObject submitWorkflowJob(HttpServletRequest request,Configuration conf) throws XServletException {
    try {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        if (action != null) {
            dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
        }
        if (dryrun) {
            id = dagEngine.dryRunSubmit(conf);
        }
        else {
            id = dagEngine.submitJob(conf,startJob); // 我们在这里
        }
        json.put(JsonTags.JOB_ID,id);
    }
    return json;
}

0x06 核心引擎

Oozie有三种核心引擎,其都是继承抽象类BaseEngine。

这三种引擎是:

  • DAGEngine,负责workflow执行,我们上面代码就会来到这里.....
  • CoordinatorEngine,负责coordinator执行
  • BundleEngine,负责bundle执行

分别对应

  • org.apache.oozie.service.CoordinatorEngineService
  • org.apache.oozie.service.BundleEngineService
  • org.apache.oozie.service.DagEngineService

我们之前提到,这些属于系统Services,都是Singletgon,在Oozie启动时候会加入到Services中。当需要时候通过get来获取。

public class Services {
		private Map<Class<? extends Service>,Service> services = new LinkedHashMap<Class<? extends Service>,Service>();
  
		public <T extends Service> T get(Class<T> serviceKlass) {
    		return (T) services.get(serviceKlass);
		}
}

具体在V1JosServlet中调用举例:

String user = conf.get(OozieClient.USER_NAME);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);

0x07 Command推动执行

Oozie把所有命令抽象成Command,这样其内部把程序执行总结成用Command来推动,类似于消息驱动

Command分为同步和异步。其基类都是XCommand。XCommand提供如下模式:

  • single execution: a command instance can be executed only once
  • eager data loading: loads data for eager precondition check
  • eager precondition check: verify precondition before obtaining lock
  • data loading: loads data for precondition check and execution
  • precondition check: verifies precondition for execution is still met
  • locking: obtains exclusive lock on key before executing the command
  • execution: command logic
public abstract class XCommand<T> implements XCallable<T> {
    ......
    private String key;
    private String name;
    private String type;
    private AtomicBoolean used = new AtomicBoolean(false);
    private Map<Long,List<XCommand<?>>> commandQueue;
    protected Instrumentation instrumentation;
    ......
}

XCommand的父接口XCallable继承了java.util.concurrent.Callable。最终目的是当异步执行时候,基于优先级来排列命令的执行计划。

所以XCommand的几个关键函数就是:queue,call,execute:

  • queue :向commandQueue加入一个command,这个command是在当前command执行之后,做delayed execution。在当前command执行过程中加入的具有同样的delay的commands,会后续顺序(single serial)执行。
  • call就是继承的Callable实现函数,会调用到execute。
  • execute则是具体Command实现自己的具体业务。

从我们常见的SubmitXCommand来看,继承关系如下:

public class SubmitXCommand extends WorkflowXCommand<String> 
public abstract class WorkflowXCommand<T> extends XCommand<T> 
public abstract class XCommand<T> implements XCallable<T> 
public interface XCallable<T> extends Callable<T> 

再比如TransitionXCommand的继承关系:

abstract class TransitionXCommand<T> extends XCommand<T> 
public abstract class SubmitTransitionXCommand extends TransitionXCommand<String>

从之前的组件可以看到,任务是有状态机的概念的,准备,开始,运行中,失败结束 等等,所以对任务进行操作的命令同时需要处理状态机的变化,oozie处理任务的命令都需要继承TransitionXCommand这个抽象类,而TransitionXCommand的父类是XCommand。

0x08 引擎处理提交

前面提到,doPost 会调用到 id = dagEngine.submitJob(conf,startJob);

我们看看DAGEngine是如何处理提交的任务。

首先通过SubmitXCommand直接运行其call()来提交job。

public String submitJob(Configuration conf,boolean startJob) throws DagEngineException {
    validateSubmitConfiguration(conf);
    try {
        String jobId;
        SubmitXCommand submit = new SubmitXCommand(conf);
        jobId = submit.call();
        if (startJob) {
            start(jobId);
        }
        return jobId;
    }
}

然后通过StartXCommand来启动Job。从注释中我们可以看到,此时依然是同步执行 (通过主动执行call()函数)。

public void start(String jobId) throws DagEngineException {
    // Changing to synchronous call from asynchronous queuing to prevent the
    // loss of command if the queue is full or the queue is lost in case of
    // failure.
    new StartXCommand(jobId).call();
}

8.1 SubmitXCommand

SubmitXCommand处理的是提交工作,将用户提交的任务解析后更新到数据库。

主要业务是在execute中实现。

  1. 解析配置,获取WorkflowApp
  2. 创建WorkflowInstance
  3. 生成 WorkflowJobBean
  4. 通过JPA保存WorkflowJobBean 到wf_jobs

代码摘要如下:

protected String execute() throws CommandException {

    WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
    try {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        FileSystem fs = has.createFileSystem(user,uri,fsConf);

        // 解析配置,获取WorkflowApp
        WorkflowApp app = wps.parseDef(conf,defaultConf);

        // 创建WorkflowInstance
        WorkflowInstance wfInstance;
        wfInstance = workflowLib.createInstance(app,conf);

        // 生成 WorkflowJobBean
        WorkflowJobBean workflow = new WorkflowJobBean();
        workflow.setId(wfInstance.getId());
        workflow.setAppName(ELUtils.resolveAppName(app.getName(),conf));
        workflow.setAppPath(conf.get(OozieClient.APP_PATH));
        workflow.setConf(XmlUtils.prettyPrint(conf).toString());
        ......
        workflow.setWorkflowInstance(wfInstance);
        workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));

        if (!dryrun) {
            workflow.setSlaXml(jobSlaXml);
            // 添加到临时list
            insertList.add(workflow); 
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // 保存WorkflowJobBean 到wf_jobs
    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList,null,null);
                }
            }
            return workflow.getId();
        }
}

其中insertList是用来临时存储 WorkflowJobBean

private List<JsonBean> insertList = new ArrayList<JsonBean>();

WorkflowJobBean 对应数据库中表 WF_JOBS。

public class WorkflowJobBean implements Writable,WorkflowJob,JsonBean {
    ......//省略其他变量
    @Transient
    private List<WorkflowActionBean> actions;
}

在Oozie为了方便将用户定义的Action以及Workflow进行管理,底层使用Jpa将这些数据存储于数据库中。具体是调用executeBatchInsertUpdateDelete来通过JPA插入到数据库。

BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList,null);

具体BatchQueryExecutor代码如下。

public class BatchQueryExecutor {
    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList,Collection<UpdateEntry> updateList,Collection<JsonBean> deleteList) {
        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();

        if (updateList != null) {
            for (UpdateEntry entry : updateList) {
                Query query = null;
                JsonBean bean = entry.getBean();
                if (bean instanceof WorkflowJobBean) {
                    // 我们程序在这里
                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowJobQuery) entry.getQueryName(),(WorkflowJobBean) entry.getBean(),em);
                }
                else if (bean instanceof WorkflowActionBean) {
                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowActionQuery) entry.getQueryName(),(WorkflowActionBean) entry.getBean(),em);
                }
                else if {
                  //此处省略众多其他类型
                }
                queryList.add(new QueryEntry(entry.getQueryName(),query));
            }
        }
        // 这里插入数据库
        jpaService.executeBatchInsertUpdateDelete(insertList,queryList,deleteList,em);
    }  
}

JPA摘要代码如下:

public class JPAService implements Service,Instrumentable {
  
    private OperationRetryHandler retryHandler;
  
    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans,final List<QueryEntry> updateQueryList,final Collection<JsonBean> deleteBeans,final EntityManager em) {
      
        try {
            retryHandler.executeWithRetry(new Callable<Void>() {

                public Void call() throws Exception {
                   ......
                    if (CollectionUtils.isNotEmpty(insertBeans)) {
                        for (final JsonBean bean : insertBeans) {
                            em.persist(bean);
                        }
                    }
                   ......
                }
            });
        }
    }
}

这样,一个Workflow Job就存储到了数据库中。

8.2 workflow生命周期

首先介绍下workflow生命周期,我们代码马上会用到PREP状态。

  • prep:一个工作流第一次创建就处于prep状态,表示工作流以及创建但是还没有运行。

  • running:当一个已经被创建的工作流job开始执行的时候,就处于running状态。它不会达到结束状态,只能因为出错而结束,或者被挂起。

  • suspended:一个running状态的工作流job会变成suspended状态,而且它会一直处于该状态,除非这个工作流job被重新开始执行或者被杀死。

  • killed:当一个工作流job处于被创建后的状态,或者处于running,suspended状态时,被杀死,则工作流job的状态变为killed状态。

  • failed:当一个工作流job不可预期的错误失败而终止,就会变为failed状态。

8.3 StartXCommand

处理完SubmitXCommand之后,Oozie Server 马上处理StartXCommand

StartXCommand 的作用是启动Command,其继承了SignalXCommand ,所以 StartXCommand(jobId).call();调用到了SignalXCommand的call。

public class StartXCommand extends SignalXCommand

相关代码如下:

首先,StartXCommand调用基类构造函数

public StartXCommand(String id) {
        super("start",1,id);
        InstrumentUtils.incrJobCounter(getName(),getInstrumentation());
}

然后,SignalXCommand得到了jobId,这个就是之前SubmitXCommand生成并且传回来的。

public SignalXCommand(String name,int priority,String jobId) {
    super(name,name,priority);
    this.jobId = ParamChecker.notEmpty(jobId,"jobId");
}

call()首先调用到 SignalXCommand.loadState。其会根据jobId从数据库中读取Workflow job信息。

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW,jobId);
            if (actionId != null) {
                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL,actionId);
            }
        }
}

SQL语句如下:

@NamedQuery(name = "GET_WORKFLOW",query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

call()接着调用SignalXCommand.execute(),这里具体操作如下:

  • 1)execute中,因为状态是PREP,所以调用workflowInstance.start,这里对应的实例是LiteWorkflowInstance
    • 1.1) LiteWorkflowInstance.start 调用 signal()
      • 1.1.1) signal() 调用 exiting = nodeHandler.enter(context),实际调用的是 LiteActionHandler.enter
        • 1.1.1.1) 调用 LiteWorkflowStoreService.liteExecute,这里是生成WorkflowActionBean,然后添加到临时变量ACTIONS_TO_START
          • 1.1.1.1.1) WorkflowActionBean action = new WorkflowActionBean();
          • 1.1.1.1.2) action.setJobId(jobId); 做其他各种设置
          • 1.1.1.1.3) List list = (List) context.getTransientVar(ACTIONS_TO_START);
          • 1.1.1.1.4) list.add(action); 添加到临时列表
    • 1.2) 回到 signal(),因为start 是 同步操作,所以exiting 为 true
      • 1.2.1) signal all new synch transitions。遍历 pathsToStar,如果有同步跳转,则开始进行后一步Action的跳转,即 signal(pathToStart,"::synch:

        版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。