[源码解析]Oozie来龙去脉之内部执行

[源码解析]Oozie来龙去脉之内部执行

0x00 摘要

Oozie由Cloudera公司贡献给Apache的基于工作流引擎的开源框架,是用于Hadoop平台的开源的工作流调度引擎,用来管理Hadoop作业,进行。本文是系列的第二篇,介绍Oozie的内部执行阶段。

前文[源码解析]Oozie的来龙去脉 --- (1)提交任务阶段 已经为大家展示了用户提交一个Oozie Job之后做了什么,本文将沿着一个Workflow的执行流程为大家继续剖析Oozie接下来做什么。

大致如下:

  • 在Oozie中准备Yarn Application Master
  • 介绍新旧两版本的Yarn Application Master区别
  • 介绍Hive on Yarn
  • Tez是如何乱入到这个流程中的
  • Java on Yarn会是如何执行
  • Yarn Job结束之后如何返回Oozie

0x01 Oozie阶段

1.1 ActionStartXCommand

我们假设Workflow在start之后,就进入到了一个Hive命令。

ActionStartXCommand的主要作用就是和Yarn交互,最后提交一个Yarn Application Master

ActionStartXCommand是 WorkflowXCommand的子类。重点函数还是loadState和execute。

public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;  
}

loadState 的作用就是从数据库中获取 WorkflowJobBean 和 WorkflowActionBean 信息

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW,jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION,actionId);
        }
    }
}

execute函数如下。其主要业务就是executor.start(context,wfAction); 这里的executor是HiveActionExecutor。

@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if(!caught) {
            // 这里是业务重点,就是启动任务
            executor.start(context,wfAction);
          
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                } else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(),wfAction.getType()));
                    }
                    else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,wfAction));
        }
    }
    finally {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList,updateList,null);
            ......
            if (execSynchronous) {
                // Changing to synchronous call from asynchronous queuing to prevent
                // undue delay from ::start:: to action due to queuing
                callActionEnd();
            }
        }
    }
    return null;
}

ActionExecutor.start是异步的,还需要检查Action执行状态来推进流程,oozie通过两种方式来检查任务是否完成。

  • 回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie

  • 轮询:在任务执行回调失败的情况下,无论任何原因,都支持以轮询的方式进行查询。

oozie提供这两种方式来控制任务。后续我们会再提到。

1.2 HiveActionExecutor

上面代码中 executor.start(context,wfAction); 就是启动任务。

HiveActionExecutor继承 ScriptLanguageActionExecutor,ScriptLanguageActionExecutor继承 JavaActionExecutor,所以后续很多函数执行的是JavaActionExecutor中的函数。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {}

ActionExecutor.start就是执行的JavaActionExecutor.start()。

其会检查文件系统,比如hdfs是不是支持,Action Dir是否ready,然后会submitLauncher。

public void start(Context context,WorkflowAction action) throws ActionExecutorException {
        FileSystem actionFs = context.getAppFileSystem();
        prepareActionDir(actionFs,context);
        submitLauncher(actionFs,context,action); // 这里是业务
        check(context,action);
}

submitLauncher主要功能是:

  • 1)对于某些类型job,调用injectActionCallback配置回调Action
  • 2)配置 action job
  • 3)调用createLauncherConf配置LauncherAM,即Application Master
    • 3.1)配置回调conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL,callback);
    • 3.2)设置"launcher Main Class"。LauncherHelper.setupMainClass(launcherJobConf,getLauncherMain(launcherJobConf,actionXml));
  • 4)调用HadoopAccessorService.createYarnClient来创建一个YarnClient
  • 5)调用UserGroupInformation继续配置
  • 6)调用yarnClient.createApplication创建一个YarnClientApplication
  • 7)记录ApplicationId
  • 8)调用createAppSubmissionContext建立Yarn App的执行环境
    • 8.1)appContext.setApplicationType("Oozie Launcher");
    • 8.2)设置容器信息 ContainerLaunchContext
    • 8.3)vargs.add(LauncherAM.class.getCanonicalName()); 比如设置AM启动类
    • 8.4)return appContext;
  • 9)提交App,yarnClient.submitApplication(appContext); appContext就是前面return的。

具体代码如下:

public void submitLauncher(final FileSystem actionFs,final Context context,final WorkflowAction action)throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context,actionXml);
        setupActionConf(actionConf,actionXml,appPathRoot);
        addAppNameContext(context,action);
        setLibFilesArchives(context,appPathRoot,actionConf);
				// 配置回调Action
        injectActionCallback(context,actionConf);

        Configuration launcherConf = createLauncherConf(actionFs,action,actionConf);
        yarnClient = createYarnClient(context,launcherConf);
      
        //继续配置各种Credentials
        if (UserGroupInformation.isSecurityEnabled()) {
           ......
        }

        if (alreadyRunning && !isUserRetry) {
          ......
        }
        else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext =
                    createAppSubmissionContext(appId,launcherConf,actionConf,credentials,actionXml);
            // 这里正式与 Yarn 交互。
            yarnClient.submitApplication(appContext);

            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId,jobTracker,consoleUrl);
    }
}

protected YarnClient createYarnClient(Context context,Configuration jobConf) throws HadoopAccessorException {
        String user = context.getWorkflow().getUser();
        return Services.get().get(HadoopAccessorService.class).createYarnClient(user,jobConf);
}

0x2 旧版本LauncherMapper

这里我们有必要提一下旧版本的实现:LauncherMapper。

网上关于Oozie的文章很多都是基于旧版本,所以基本都提到了 LauncherMapper,比如:

Oozie本质就是一个作业协调工具(底层原理是通过将xml语言转换成mapreduce程序来做,但只是在集中map端做处理,避免shuffle的过程)。

Oozie执行Action时,即ActionExecutor(最主要的子类是JavaActionExecutor,hive、spark等action都是这个类的子类),JavaActionExecutor首先会提交一个LauncherMapper(map任务)到yarn,其中会执行LauncherMain(具体的action是其子类,比如JavaMain、SparkMain等),spark任务会执行SparkMain,在SparkMain中会调用org.apache.spark.deploy.SparkSubmit来提交任务。其实诉我的map任务就是识别你是什么样的任务(hive,shell,spark等),并通过该任务来启动任务所需要的环境来提交任务。提供了提交任务的接口(如hive任务,启动hive客户端或beeline等)

从文档看,OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko) 这时候被移除了。

我们从旧版本代码中大致看看LauncherMapper的实现。

LauncherMapper继承了 import org.apache.hadoop.mapred.Mapper;,实现了 map 函数。其内部就是调用用户代码的主函数。

import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1,V1,K2,V2> implements Mapper<K1,V2>,Runnable {
   @Override
    public void map(K1 key,V1 value,OutputCollector<K2,V2> collector,Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            else {
                String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);

                    new LauncherSecurityManager();
                    setupHeartBeater(reporter);
                    setupMainConfiguration();
                    // Propagating the conf to use by child job.
                    propagateToHadoopConf();

                    executePrepare();
                    Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS,Object.class);
                    Method mainMethod = klass.getMethod("main",String[].class);
                    mainMethod.invoke(null,(Object) args);
             }
        }
    }
}

在LauncherMapperHelper中,会设置LauncherMapper为启动函数。

public static void setupLauncherInfo(JobConf launcherConf,String jobId,String actionId,Path actionDir,String recoveryId,Configuration actionConf,String prepareXML) throws IOException,HadoopAccessorException {
        launcherConf.setMapperClass(LauncherMapper.class);
}

在 JavaActionExecutor 中有 org.apache.hadoop.mapred.JobClient

import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs,Context context,WorkflowAction action) throws ActionExecutorException {
            jobClient = createJobClient(context,launcherJobConf);
            LauncherMapperHelper.setupLauncherInfo(launcherJobConf,jobId,actionId,actionDir,recoveryId,prepareXML);

            // Set the launcher Main Class
            LauncherMapperHelper.setupMainClass(launcherJobConf,actionXml)); 
            LauncherMapperHelper.setupMainArguments(launcherJobConf,args);
            ......
  
            runningJob = jobClient.submitJob(launcherJobConf);  // 这里进行了提交
}      

综上所述,旧版本 LauncherMapper 实现了一个 import org.apache.hadoop.mapred.Mapper;,具体是org.apache.hadoop.mapred.JobClient 负责与hadoop交互

0x3 新版本Yarn Application Master

新版本的Oozie是和Yarn深度绑定的,所以我们需要先介绍Yarn。

3. 1 YARN简介

YARN 是 Hadoop 2.0 中的资源管理系统,它的基本设计思想是将 MRv1 中的 JobTracker拆分成了两个独立的服务:一个全局的资源管理器 ResourceManager 和每个应用程序特有的ApplicationMaster。 其中 ResourceManager 负责整个系统的资源管理和分配, 而 ApplicationMaster负责单个应用程序的管理。

YARN 总体上仍然是 Master/Slave 结构,在整个资源管理框架中,ResourceManager 为Master,NodeManager 为 Slave,ResourceManager 负责对各个 NodeManager 上的资源进行统一管理和调度。

当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster, 它负责向 ResourceManager 申请资源,并要求 NodeManager 启动可以占用一定资源的任务。 由于不同的ApplicationMaster 被分布到不同的节点上,因此它们之间不会相互影响。

3.2 ApplicationMaster

用户提交的每个应用程序均包含一个 AM,主要功能包括:

  • 与 RM 调度器协商以获取资源(用 Container 表示);
  • 将得到的任务进一步分配给内部的任务;
  • 与 NM 通信以启动 / 停止任务;
  • 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

当用户向 YARN 中提交一个应用程序后, YARN 将分两个阶段运行该应用程序 :

  • 第一个阶段是启动 ApplicationMaster ;
  • 第二个阶段是由 ApplicationMaster 创建应用程序, 为它申请资源, 并监控它的整个运行过程, 直到运行完成。

工作流程分为以下几个步骤:

  1. 用 户 向 YARN 中 提 交 应 用 程 序, 其 中 包 括 ApplicationMaster 程 序、 启 动ApplicationMaster 的命令、 用户程序等。
  2. ResourceManager 为 该 应 用程 序 分 配 第 一 个 Container, 并 与 对应 的 NodeManager 通信,要求它在这个 Container 中启动应用程序的 ApplicationMaster。
  3. ApplicationMaster 首 先 向 ResourceManager 注 册, 这 样 用 户 可 以 直 接 通 过ResourceManage 查看应用程序的运行状态, 然后它将为各个任务申请资源, 并监控它的运行状态, 直到运行结束, 即重复步骤 4~7。
  4. ApplicationMaster 采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源。
  5. 一旦 ApplicationMaster 申请到资源后, 便与对应的 NodeManager 通信, 要求它启动任务。
  6. NodeManager 为任务设置好运行环境(包括环境变量、 JAR 包、 二进制程序等) 后, 将任务启动命令写到一个脚本中, 并通过运行该脚本启动任务。
  7. 各个任务通过某个 RPC 协议向 ApplicationMaster 汇报自己的状态和进度, 以让 ApplicationMaster 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
  8. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。

3.3 LauncherAM

LauncherAM就是Oozie的ApplicationMaster实现。LauncherAM.main就是Yarn调用之处。

public class LauncherAM {
  
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well,but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);
        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                  LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),new AMRMCallBackHandler(),new HdfsOperations(new SequenceFileWriterFactory()),new LocalFsOperations(),new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),new LauncherAMCallbackNotifierFactory(),new LauncherSecurityManager(),sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),launcherConf);
                    launcher.run();
                    return null;
            }
        });
    }  
}

launcher.run主要完成

通过registerWithRM调用AMRMClientAsync来注册到Resource Manager

  • executePrepare / setupMainConfiguration 完成初始化,准备和配置
  • runActionMain会根据配置调用具体的main函数,比如HiveMain
    • Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS,null);
    • Method mainMethod = klass.getMethod("main",String[].class);
    • mainMethod.invoke(null,(Object) mainArgs);
  • 调用uploadActionDataToHDFS同步HDFS
  • 调用unregisterWithRM从RM解绑
  • 调用LauncherAMCallbackNotifier.notifyURL通知Oozie

具体代码如下:

public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
        registerWithRM(amrmCallBackHandler);
        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf,AMRMTokenIdentifier.KIND_NAME);

        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // 会根据配置调用具体的main函数,比如HiveMain
                return null;
            }
        });
    } 
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS,actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf,actionData);
        } finally {
            try {
                unregisterWithRM(actionResult,errorHolder.getErrorMessage());
            } finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}

但是你会发现,对比之前所说的ApplicationMaster应该实现的功能,LauncherAM 做得恁少了点,这是个疑问! 我们在后续研究中会为大家揭开这个秘密。

0x4 Hive on Yarn

上文提到,runActionMain会根据配置调用具体的main函数。我们假设是hive action,则对应的是HiveMain。

Hive job的入口函数是在HIVE_MAIN_CLASS_NAME配置的。

public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

	  @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // 这里配置了 HiveMain
        return classes;
    }  
}

HiveMain后续调用如下

HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);

最后调用 org.apache.hadoop.hive.cli.CliDriver 完成了hive操作,大致有:

  • 设定参数;
  • 如果有脚本,则设定脚本路径;
  • 如果有之前的yarn child jobs,杀掉;
  • 执行hive;
  • 写log;

具体如下:

public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class,args);
    }
  
   @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();
        List<String> arguments = new ArrayList<String>();

        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        //setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();

            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        } 

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf,HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }
        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile,HIVE_JOB_IDS_PATTERNS,"Hive");
        }
    }  
}

因此我们能看到,Oozie ApplicationMaster 在被Yarn调用之后,就是通过org.apache.hadoop.hive.cli.CliDriver 给Hive发送命令让其执行,没有什么再和ResourceManager / NodeManager 交互的过程,这真的很奇怪。这个秘密要由下面的Tez来解答。

0x5 Tez计算框架

Tez是Apache开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。

Tez有以下特点:

  • Apache二级开源项目
  • 运行在YARN之上
  • 适用于DAG(有向图)应用(同Impala、Dremel和Drill一样,可用于替换Hive/Pig等)

可以看到,Tez也是和Yarn深度绑定的。

5.1 DAGAppMaster

首先我们就找到了Tez对应的Application Master,即Tez DAG Application Master

public class DAGAppMaster extends AbstractService {
  public String submitDAGToAppMaster(DAGPlan dagPlan,Map<String,LocalResource> additionalResources) throws TezException {
      startDAG(dagPlan,additionalResources);
    }
  }  
}

我们能看到提交Application Master代码。

public class TezYarnClient extends FrameworkClient {
  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
      throws YarnException,IOException,TezException {
   	ApplicationId appId= yarnClient.submitApplication(appSubmissionContext);
    ApplicationReport appReport = getApplicationReport(appId);
    return appId;
  }
}

这里是建立Application Master context 代码,设置了Application Maste类和Container。

  public static ApplicationSubmissionContext createApplicationSubmissionContext(
      ApplicationId appId,DAG dag,String amName,AMConfiguration amConfig,LocalResource> tezJarResources,Credentials sessionCreds,boolean tezLrsAsArchive,TezApiVersionInfo apiVersionInfo,ServicePluginsDescriptor servicePluginsDescriptor,JavaOptsChecker javaOptsChecker)
      throws IOException,YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(),capability);
    vargs.add(amOpts);

    // 这里设置了 Application Master
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // 这里设置了命令行参数 
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // 设置了container
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(amLocalResources,environment,vargsFinal,serviceData,securityTokens,acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records
        .newRecord(ApplicationSubmissionContext.class);

    appContext.setAMContainerSpec(amContainer);

    return appContext;
}

5.2 与Resource Manager交互

这里只摘要部分代码,能看到Tez实现了与Yarn Resource Manager交互

YarnTaskSchedulerService实现了AMRMClientAsync.CallbackHandler,其功能是处理由Resource Manager收到的消息,其实现了方法

import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler
                             implements AMRMClientAsync.CallbackHandler {
  @Override
  public void onContainersAllocated(List<Container> containers) {
      if (!shouldReuseContainers) {
        List<Container> modifiableContainerList = Lists.newLinkedList(containers);
        assignedContainers = assignNewlyAllocatedContainers(
            modifiableContainerList);
      } 
    }
    // upcall to app must be outside locks
    informAppAboutAssignments(assignedContainers);
  }

  @Override
  public void onContainersCompleted(List<ContainerStatus> statuses) {
    synchronized (this) {
      for(ContainerStatus containerStatus : statuses) {
        ContainerId completedId = containerStatus.getContainerId();
        HeldContainer delayedContainer = heldContainers.get(completedId);

        Object task = releasedContainers.remove(completedId);
        appContainerStatus.put(task,containerStatus);
        continue;
       }

        // not found in released containers. check currently allocated containers
        // no need to release this container as the RM has already completed it
        task = unAssignContainer(completedId,false);
        if (delayedContainer != null) {
          heldContainers.remove(completedId);
          Resources.subtract(allocatedResources,delayedContainer.getContainer().getResource());
        } 
        if(task != null) {
          // completion of a container we have allocated currently
          // an allocated container completed. notify app. This will cause attempt to get killed
          appContainerStatus.put(task,containerStatus);
          continue;
        }
      }
    }

    // upcall to app must be outside locks
    for (Entry<Object,ContainerStatus> entry : appContainerStatus.entrySet()) {
      getContext().containerCompleted(entry.getKey(),entry.getValue());
    }
  }
}
  • onContainersAllocated : 当有新的Container 可以使用。这里时启动container 的代码。
  • onContainersCompleted 是Container 运行结束。 在onContainersCompleted 中,如果是失败的Container,我们需要重新申请并启动Container,成功的将做记录既可以。

由此我们可以看到,Oozie是一个甩手掌柜,他只管启动Hive,具体后续如何与RM交互,则完全由Tez搞定。这就解答了之前我们所有疑惑

最后总结下新流程:

  1. Oozie提交LauncherAM到Yarn;
  2. LauncherAM运行HiveMain,其调用CliDriver.main给Hive提交任务;
  3. Hive on Tez,所以Tez准备DAGAppMaster;
  4. Yarn与Tez交互:Tez提交DAGAppMaster到Yarn,Tez解析运行Hive命令;
  5. Hive运行结束后,调用回调 url 通知Oozie;

原谅我用这种办法画图,因为我最讨厌看到一篇好文,结果发现图没了......

+---------+                       +----------+                       +-----------+
|         | 1-submit LauncherAM   |          | 2.CliDriver.main      |           |  
|         |---------------------->| HiveMain |---------------------> |           |
|         |                       |          |                       |           |--+
| [Oozie] |                       |  [Yarn]  |                       |   [Hive]  |  | 3.Run 
|         |                       |          |                       |           |  | Hive     
|         | 5-notifyURL of Oozie  |          | 4-submit DAGAppMaster |           |<-+
|         |<----------------------|          | <-------------------->|    Tez    |
|         |                       |          |                       |           |
+---------+                       +----------+                       +-----------+

0x6 Java on Yarn

下面我们看看如果Oozie执行一个Java程序,是如何进行的。

Java程序的主执行函数是 JavaMain,这个就简单多了,就是直接调用用户的Java主函数。

public class JavaMain extends LauncherMain {
    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

   /**
    * @param args Invoked from LauncherAM:run()
    * @throws Exception in case of error when running the application
    */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class,args);
    }

    @Override
    protected void run(String[] args) throws Exception {

        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf,TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf,SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS,Object.class);
        Method mainMethod = klass.getMethod("main",String[].class);
        mainMethod.invoke(null,(Object) args);
    }
}

0x7 Yarn job 执行结束

7.1 检查任务机制

前面提到,ActionExecutor.start是异步的,还需要检查Action执行状态来推进流程,oozie通过两种方式来检查任务是否完成。

  • 回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie
  • 轮询:在任务执行回调失败的情况下,无论任何原因,都支持以轮询的方式进行查询。

oozie提供这两种方式来控制任务。

7.2 回调机制

LauncherAM 在用户程序执行完成之后,会做如下调用,以通知Oozie。这就用到了“回调”机制。

LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);

Oozie的CallbackServlet会响应这个调用。可以看到,DagEngine.processCallback是Oozie处理程序结束之处。

public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request,HttpServletResponse response) throws ServletException,IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);

        String actionId = callbackService.getActionId(queryString);

        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();

        dagEngine.processCallback(actionId,callbackService.getExternalStatus(queryString),null);
        }
    }
}

DagEngine.processCallback主要是使用CompletedActionXCommand来进行。可以看到这个命令是放到 CallableQueueService 的 queue中,所以下面我们需要介绍 CallableQueueService

 public void processCallback(String actionId,String externalStatus,Properties actionData)
          throws DagEngineException {
      XCallable<Void> command = new CompletedActionXCommand(actionId,externalStatus,actionData,HIGH_PRIORITY);
      if (!Services.get().get(CallableQueueService.class).queue(command)) {
          LOG.warn(XLog.OPS,"queue is full or system is in SAFEMODE,ignoring callback");
      }
}

7.3 异步执行

7.3.1 CallableQueueService

Oozie 使用 CallableQueueService 来异步执行操作;

public class CallableQueueService implements Service,Instrumentable {
    private final Map<String,AtomicInteger> activeCallables = new HashMap<String,AtomicInteger>();
    private final Map<String,Date> uniqueCallables = new ConcurrentHashMap<String,Date>();
    private final ConcurrentHashMap<String,Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor; 
  
    public void init(Services services) {
          queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,MAX_CALLABLE_WAITTIME_MS,TimeUnit.MILLISECONDS,queueSize) {
                @Override
                protected boolean eligibleToPoll(QueueElement<?> element) {
                    if (element != null) {
                        CallableWrapper wrapper = (CallableWrapper) element;
                        if (element.getElement() != null) {
                            return callableReachMaxConcurrency(wrapper.getElement());
                        }
                    }
                    return false;
                }
            };  
    }
}

特点:

  • 加入执行队列的任务可能是可以立即被吊起的,也可能是未来某个时间才触发的。
  • 执行线程池根据 任务的执行时间和任务的优先级别来选取任务吊起。
  • 执行线程池的任务队列大小可配置,当到达队列最大值,线程池将不再接收任务。

7.3.3 PriorityDelayQueue

线程池选取的队列是oozie自定义的队列 PriorityDelayQueue:

特点:

根据队列中元素的延时时间以及其执行优先级出队列:

实现策略:

PriorityDelayQueue 中为每个优先级别的任务设置一个 延时队列 DelayQueue
因为使用的是jdk自带的延时队列 DelayQueue,可以保证的是如果任务在该队列中的延时时间满足条件,我们
通过poll()方法即可得到满足延时条件的任务,如果 poll()得到的是null,说明该队列的中任务没有满足时间条件的任务。

如何编排多个优先级的队列:
每次从PriorityDelayQueue去选取任务,都优先从最高优先级的队列来poll出任务,如果最高的优先级队列中没有满足条件的任务,则次优先级队列poll出任务,如果仍未获取
将按照队列优先等级以此类推。
饿死现象:假如高优先级中的任务在每次获取的时候都满足条件,这样容易将低优先级的队列中满足条件的任务活活饿死,为了防止这种情况的产生,在每次选取任务之前,遍历
低优先级队列任务,如果任务早已经满足出队列条件,如果超时时间超过了我们设定的最大值,我们会为这个任务提高优先级,将这个任务优先级加一,添加到上个优先级队列中进行
排队。

7.3.3 PollablePriorityDelayQueue

特点:

在从队列中选取任务的时候,先判断满足时间的任务是否满足并发等限制,如果满足再从队列中取出,而不是像PriorityDelayQueue那样,先取出如果不满足并发等限制,再将该任务重新放置回去。

任务类型:

使用线程池异步执行任务,任务和任务之间是无序的,针对具体的业务场景,可能执行的单元是需要串序执行的。oozie中封装了 CompositeCallable 和 一般的 XCallable的任务类型,前者是XCallable的一个集合,它能保证的是这个集合里面的XCallable是顺序执行的。

7.4 跳转下一个操作

CompletedActionXCommand 当Workflow command结束时候会执行,且只执行一次。对于程序结束,会在异步队列中加入一个 ActionCheckXCommand。

public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
           .....
        } else {    // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change,not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(),getPriority(),-1));
            }
        }
        return null;
    }  
}

异步调用到ActionCheckXCommand,其主要作用是:

  • 如果有重试机制,则做相应配置
  • 调用 executor.check(context,wfAction); 来检查环境信息
  • 更新数据库中的任务信息
  • 因为已经结束了,所以用ActionEndXCommand来执行结束
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false; // 如果有重试机制,则做相应配置
            if (wfAction.getRetries() > 0) {
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob,wfAction,isRetry,isUserRetry);
          
            executor.check(context,wfAction); // 检查环境信息

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK,wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery> (WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,wfJob));
        }
        finally {
                // 更新数据库中的任务信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null,null);
                if (generateEvent && EventHandlerService.isEnabled()) {
                    generateEvent(wfAction,wfJob.getUser());
                }
                if (execSynchronous) {
                    // 用ActionEndXCommand来执行结束
                    new ActionEndXCommand(wfAction.getId(),wfAction.getType()).call();
                }
        }
        return null;
    }
}

调用到 JavaActionExecutor.check

  • 根据配置信息建立 yarnClient = createYarnClient(context,jobConf);
  • 获取程序报告信息 ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
  • 获取程序数据 Map<String,String> actionData = LauncherHelper.getActionData(actionFs,jobConf);
  • 设置各种信息
@Override
public void check(Context context,WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context,actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context,jobConf); // 根据配置信息建立
        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context,action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // 获取程序报告信息
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        } 
        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String,jobConf);   // 获取程序数据
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED,null);
                }
            }

            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
             }

           // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
             }

            // 设置各种信息
            context.setExecutionData(appStatus.toString(),null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED,PropertiesUtils.stringToProperties(actionData
                            .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                }
                else {
                    context.setExecutionData(SUCCEEDED,null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs,context);
            }
            else {
                ......
                context.setExecutionData(FAILED_KILLED,null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}

ActionEndXCommand会进行结束和跳转:

  • 调用Executor来完成结束操作 executor.end(context,wfAction);
  • 更新数据库的job信息 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete
  • 用 SignalXCommand 来进行跳转,进行下一个Action的执行
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {

        Configuration conf = wfJob.getWorkflowInstance().getConf();

        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES,executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL,executor.getRetryInterval());
        }

        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob,isUserRetry);
        try {
          
            executor.end(context,wfAction); // 调用Executor来完成结束操作

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context,wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(),wfAction.getId(),slaStatus,SlaAppType.WORKFLOW_ACTION);
                    if(slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }
            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance,wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,wfJob));
        }
        finally {
            try { 
                // 更新数据库的job信息
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList,null);
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction,wfJob.getUser());
            }
            new SignalXCommand(jobId,actionId).call(); // 进行跳转,进行下一个Action的执行
        }
        return null;
    }  
}

0xFF 参考

大数据之Oozie——源码分析(一)程序入口

什么是Oozie——大数据任务调度框架

Oozie基础小结

【原创】大数据基础之Oozie(1)简介、源代码解析

【原创】大叔经验分享(6)Oozie如何查看提交到Yarn上的任务日志

Oozie和Azkaban的技术选型和对比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任务调度阻塞及内存优化方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读5.3k次,点赞10次,收藏39次。本章详细写了mysql的安装,环境的搭建以及安装时常见的问题和解决办法。_mysql安装及配置超详细教程
文章浏览阅读1.8k次,点赞50次,收藏31次。本篇文章讲解Spark编程基础这门课程的期末大作业,主要围绕Hadoop基本操作、RDD编程、SparkSQL和SparkStreaming编程展开。_直接将第4题的计算结果保存到/user/root/lisi目录中lisipi文件里。
文章浏览阅读7.8k次,点赞9次,收藏34次。ES查询常用语法目录1. ElasticSearch之查询返回结果各字段含义2. match 查询3. term查询4. terms 查询5. range 范围6. 布尔查询6.1 filter加快查询效率的原因7. boosting query(提高查询)8. dis_max(最佳匹配查询)9. 分页10. 聚合查询【内含实际的demo】_es查询语法
文章浏览阅读928次,点赞27次,收藏18次。
文章浏览阅读1.1k次,点赞24次,收藏24次。作用描述分布式协调和一致性协调多个节点的活动,确保一致性和顺序。实现一致性、领导选举、集群管理等功能,确保系统的稳定和可靠性。高可用性和容错性Zookeeper是高可用的分布式系统,通过多个节点提供服务,容忍节点故障并自动进行主从切换。作为其他分布式系统的高可用组件,提供稳定的分布式协调和管理服务,保证系统的连续可用性。配置管理和动态更新作为配置中心,集中管理和分发配置信息。通过订阅机制,实现对配置的动态更新,以适应系统的变化和需求的变化。分布式锁和并发控制。
文章浏览阅读1.5k次,点赞26次,收藏29次。为贯彻执行集团数字化转型的需要,该知识库将公示集团组织内各产研团队不同角色成员的职务“职级”岗位的评定标准;
文章浏览阅读1.2k次,点赞26次,收藏28次。在安装Hadoop之前,需要进行以下准备工作:确认操作系统:Hadoop可以运行在多种操作系统上,包括Linux、Windows和Mac OS等。选择适合你的操作系统,并确保操作系统版本符合Hadoop的要求。安装Java环境:Hadoop是基于Java开发的,因此需要先安装和配置Java环境。确保已经安装了符合Hadoop版本要求的Java Development Kit (JDK),并设置好JAVA_HOME环境变量。确认硬件要求:Hadoop是一个分布式系统,因此需要多台计算机组成集群。
文章浏览阅读974次,点赞19次,收藏24次。# 基于大数据的K-means广告效果分析毕业设计 基于大数据的K-means广告效果分析。
文章浏览阅读1.7k次,点赞6次,收藏10次。Hadoop入门理论
文章浏览阅读1.3w次,点赞28次,收藏232次。通过博客和文献调研整理的一些农业病虫害数据集与算法。_病虫害数据集
文章浏览阅读699次,点赞22次,收藏7次。ZooKeeper使用的是Zab(ZooKeeper Atomic Broadcast)协议,其选举过程基于一种名为Fast Leader Election(FLE)的算法进行。:每个参与选举的ZooKeeper服务器称为一个“Follower”或“Candidate”,它们都有一个唯一的标识ID(通常是一个整数),并且都知道集群中其他服务器的ID。总之,ZooKeeper的选举机制确保了在任何时刻集群中只有一个Leader存在,并通过过半原则保证了即使部分服务器宕机也能维持高可用性和一致性。
文章浏览阅读10w+次,点赞62次,收藏73次。informatica 9.x是一款好用且功能强大的数据集成平台,主要进行各类数据库的管理操作,是使用相当广泛的一款ETL工具(注: ETL就是用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)到目的端的过程)。本文主要为大家图文详细介绍Windows10下informatica powercenter 9.6.1安装与配置步骤。文章到这里就结束了,本人是在虚拟机中装了一套win10然后在此基础上测试安装的这些软件,因为工作学习要分开嘛哈哈哈。!!!!!_informatica客户端安装教程
文章浏览阅读7.8w次,点赞245次,收藏2.9k次。111个Python数据分析实战项目,代码已跑通,数据可下载_python数据分析项目案例
文章浏览阅读1.9k次,点赞61次,收藏64次。TDH企业级一站式大数据基础平台致力于帮助企业更全面、更便捷、更智能、更安全的加速数字化转型。通过数年时间的打磨创新,已帮助数千家行业客户利用大数据平台构建核心商业系统,加速商业创新。为了让大数据技术得到更广泛的使用与应用从而创造更高的价值,依托于TDH强大的技术底座,星环科技推出TDH社区版(Transwarp Data Hub Community Edition)版本,致力于为企业用户、高校师生、科研机构以及其他专业开发人员提供更轻量、更简单、更易用的数据分析开发环境,轻松应对各类人员数据分析需求。_星环tdh没有hive
文章浏览阅读836次,点赞21次,收藏19次。
文章浏览阅读1k次,点赞21次,收藏15次。主要介绍ETL相关工作的一些概念和需求点
文章浏览阅读1.4k次。本文以Android、java为开发技术,实现了一个基于Android的博物馆线上导览系统 app。基于Android的博物馆线上导览系统 app的主要使用者分为管理员和用户,app端:首页、菜谱信息、甜品信息、交流论坛、我的,管理员:首页、个人中心、用户管理、菜谱信息管理、菜谱分类管理、甜品信息管理、甜品分类管理、宣传广告管理、交流论坛、系统管理等功能。通过这些功能模块的设计,基本上实现了整个博物馆线上导览的过程。
文章浏览阅读897次,点赞19次,收藏26次。1.背景介绍在当今的数字时代,数据已经成为企业和组织中最宝贵的资源之一。随着互联网、移动互联网和物联网等技术的发展,数据的产生和收集速度也急剧增加。这些数据包括结构化数据(如数据库、 spreadsheet 等)和非结构化数据(如文本、图像、音频、视频等)。这些数据为企业和组织提供了更多的信息和见解,从而帮助他们做出更明智的决策。业务智能(Business Intelligence,BI)...
文章浏览阅读932次,点赞22次,收藏16次。也就是说,一个类应该对自己需要耦合或调用的类知道的最少,类与类之间的关系越密切,耦合度越大,那么类的变化对其耦合的类的影响也会越大,这也是我们面向对象设计的核心原则:低耦合,高内聚。优秀的架构和产品都是一步一步迭代出来的,用户量的不断增大,业务的扩展进行不断地迭代升级,最终演化成优秀的架构。其根本思想是强调了类的松耦合,类之间的耦合越弱,越有利于复用,一个处在弱耦合的类被修改,不会波及有关系的类。缓存,从操作系统到浏览器,从数据库到消息队列,从应用软件到操作系统,从操作系统到CPU,无处不在。
文章浏览阅读937次,点赞22次,收藏23次。大数据可视化是关于数据视觉表现形式的科学技术研究[9],将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。将数据直观地展现出来,以帮助人们理解数据,同时找出包含在海量数据中的规律或者信息,更多的为态势监控和综合决策服务。数据可视化是大数据生态链的最后一公里,也是用户最直接感知数据的环节。数据可视化系统并不是为了展示用户的已知的数据之间的规律,而是为了帮助用户通过认知数据,有新的发现,发现这些数据所反映的实质。大数据可视化的实施是一系列数据的转换过程。