Parallel execution of a directed acyclic graph

How to execute a directed acyclic graph of tasks in parallel

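I have a list of tasks [Task-A, Task-B, Task-C, Task-D, ...].
A task can optionally depend on other tasks.
For example:
Task-A depends on 3 tasks: B, C and D.
Task-B depends on 2 tasks: C and E.
And so on.

It is basically a directed acyclic graph: a task must run only after the tasks it depends on have finished, either successfully or exceptionally. For example, tasks C and E are independent and should run first, then Task-B and Task-D, and finally Task-A. That should be the order of execution.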
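To make that ordering concrete, here is a minimal Java sketch that groups the example tasks into waves: every task in a wave depends only on tasks from earlier waves, so all tasks in one wave can run in parallel. It assumes the graph consists of just the five tasks A to E named above, and the names ExecutionWaves and waves are hypothetical. Because D has no dependencies in this example, it already lands in the first wave together with C and E.

```java
import java.util.*;

// Groups tasks into "waves": every task in a wave depends only on tasks from earlier
// waves, so all tasks inside one wave can run in parallel.
class ExecutionWaves {

    // dependsOn: task name -> names of the tasks it depends on (every task must be a key).
    static List<List<String>> waves(Map<String, List<String>> dependsOn) {
        Set<String> placed = new HashSet<>();                      // tasks already assigned to a wave
        Set<String> remaining = new TreeSet<>(dependsOn.keySet()); // sorted only for stable output
        List<List<String>> result = new ArrayList<>();
        while (!remaining.isEmpty()) {
            List<String> wave = new ArrayList<>();
            for (String task : remaining) {
                if (placed.containsAll(dependsOn.get(task))) {
                    wave.add(task); // all dependencies finished in earlier waves
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalArgumentException("cycle among " + remaining);
            }
            placed.addAll(wave);
            remaining.removeAll(wave);
            result.add(wave);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new HashMap<>();
        dependsOn.put("A", List.of("B", "C", "D"));
        dependsOn.put("B", List.of("C", "E"));
        dependsOn.put("C", List.of());
        dependsOn.put("D", List.of());
        dependsOn.put("E", List.of());
        System.out.println(waves(dependsOn)); // [[C, D, E], [B], [A]]
    }
}
```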

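Task data model:

import java.util.List;
import lombok.Data;

@Data // Lombok generates getters, setters, equals/hashCode and toString
public class Task {
    private String name;
    private List<Task> dependentTasks; // the tasks this task depends on

    public void run() {
        // business logic
    }
}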
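Since scheduling only needs task names and edges, it can help to flatten this object model into a name-based adjacency map first. A minimal sketch, assuming a plain SimpleTask class with the same two fields in place of the Lombok-annotated Task, and assuming task names are unique (SimpleTask, dependsOn, TaskGraphs and toAdjacency are hypothetical names):

```java
import java.util.*;

// Plain stand-in for the question's Task (assumption: same name/dependentTasks fields, no Lombok).
class SimpleTask {
    final String name;
    final List<SimpleTask> dependentTasks = new ArrayList<>();

    SimpleTask(String name) {
        this.name = name;
    }

    // Hypothetical convenience method: records the tasks this one depends on.
    SimpleTask dependsOn(SimpleTask... tasks) {
        dependentTasks.addAll(Arrays.asList(tasks));
        return this;
    }
}

class TaskGraphs {

    // Walks the object graph from the given roots and returns
    // task name -> names of the tasks it depends on. Each task is visited once.
    static Map<String, List<String>> toAdjacency(List<SimpleTask> roots) {
        Map<String, List<String>> adjacency = new LinkedHashMap<>();
        Deque<SimpleTask> toVisit = new ArrayDeque<>(roots);
        while (!toVisit.isEmpty()) {
            SimpleTask task = toVisit.pop();
            if (adjacency.containsKey(task.name)) {
                continue; // already visited via another path
            }
            List<String> deps = new ArrayList<>();
            for (SimpleTask dep : task.dependentTasks) {
                deps.add(dep.name);
                toVisit.push(dep);
            }
            adjacency.put(task.name, deps);
        }
        return adjacency;
    }

    public static void main(String[] args) {
        SimpleTask c = new SimpleTask("C");
        SimpleTask d = new SimpleTask("D");
        SimpleTask e = new SimpleTask("E");
        SimpleTask b = new SimpleTask("B").dependsOn(c, e);
        SimpleTask a = new SimpleTask("A").dependsOn(b, c, d);
        System.out.println(toAdjacency(List.of(a)));
    }
}
```

Each (task, dependency) pair in the resulting map is one edge of the DAG, and a task with an empty list has no dependencies and can start immediately.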

Solution

The other answer works fine, but it is too complicated.

A simpler approach is to run Kahn's algorithm in parallel.

The key is to execute, in parallel, every task whose dependencies have all completed.
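For reference, this is what sequential Kahn's algorithm looks like on an eight-task graph (8 depends on 5; 7 on 6; 6 on 3 and 4; 5 on 1, 2 and 3; 4 on 1): keep a count of unfinished dependencies per task, start with the tasks whose count is zero, and each time a task finishes, decrement the counts of the tasks that depend on it. A parallel version replaces the ready queue with a thread pool, submitting a task as soon as its count drops to zero. The names KahnTopoSort and order are illustrative, and this sketch assumes every task appears as a key in the map.

```java
import java.util.*;

// Sequential Kahn's algorithm: count unfinished dependencies per task, start with the
// tasks whose count is zero, and decrement the counts of dependents as tasks finish.
class KahnTopoSort {

    // dependsOn: task -> tasks it depends on (every task must appear as a key).
    static List<String> order(Map<String, List<String>> dependsOn) {
        Map<String, Integer> remainingDeps = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges
        for (String task : dependsOn.keySet()) {
            remainingDeps.put(task, dependsOn.get(task).size());
            dependents.putIfAbsent(task, new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> entry : dependsOn.entrySet()) {
            for (String dep : entry.getValue()) {
                dependents.get(dep).add(entry.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        remainingDeps.forEach((task, count) -> {
            if (count == 0) ready.add(task);
        });
        List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            result.add(task); // "run" the task; a parallel version would submit it to a pool
            for (String next : dependents.get(task)) {
                // merge() stores and returns the decremented count.
                if (remainingDeps.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next); // last dependency finished: next is now runnable
                }
            }
        }
        if (result.size() != dependsOn.size()) {
            throw new IllegalArgumentException("the graph contains a cycle");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new HashMap<>();
        dependsOn.put("1", List.of());
        dependsOn.put("2", List.of());
        dependsOn.put("3", List.of());
        dependsOn.put("4", List.of("1"));
        dependsOn.put("5", List.of("1", "2", "3"));
        dependsOn.put("6", List.of("3", "4"));
        dependsOn.put("7", List.of("6"));
        dependsOn.put("8", List.of("5"));
        System.out.println(order(dependsOn));
    }
}
```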

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


class DependencyManager {
    // taskId -> ids of the tasks it depends on
    private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
    // taskId -> ids of the tasks that depend on it
    private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
    // taskId -> how many of its dependencies have finished so far
    private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
    private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
    private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

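    private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
        return () -> {
            try {
                Thread.sleep(2000); // Simulated work: a task takes 2 seconds to finish.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            } finally {
                // Mark the task as done whether it succeeded or failed, so its dependents still run.
                dependencyManager.taskCompleted(taskId);
            }
        };
    }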

    /**
     * Registers a task. Call it directly for a task that has no dependencies and no
     * dependents, i.e. a vertex that is disconnected from the rest of the graph.
     * @param taskId The task id
     */
    public void addVertex(String taskId) {
        _dependencies.putIfAbsent(taskId, new ArrayList<>());
        _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
        _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
        _numDependenciesExecuted.putIfAbsent(taskId, 0);
    }

    private void addEdge(String dependentTaskId, String dependeeTaskId) {
        _dependencies.get(dependentTaskId).add(dependeeTaskId);
        _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
    }

    // dependentTaskId can only run after dependeeTaskId has finished.
    public void addDependency(String dependentTaskId, String dependeeTaskId) {
        addVertex(dependentTaskId);
        addVertex(dependeeTaskId);
        addEdge(dependentTaskId, dependeeTaskId);
    }

    private void taskCompleted(String taskId) {
        System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
        int numTasksExecuted = _numTasksExecuted.incrementAndGet();
        _reverseDependencies.get(taskId).forEach(nextTaskId -> {
            // compute() updates the counter atomically and returns the new value, so exactly one
            // thread sees the final count and submits the task. Reading it back with a separate
            // get() could let two dependencies finishing at the same time both submit it.
            int numDependenciesExecuted = _numDependenciesExecuted.compute(nextTaskId, (__, currValue) -> currValue + 1);
            int numDependencies = _dependencies.get(nextTaskId).size();
            if (numDependenciesExecuted == numDependencies) {
                // All dependencies have been executed, so we can submit this task to the thread pool.
                _executorService.submit(_tasks.get(nextTaskId));
            }
        });
        // Only the thread that completed the last task sees the full count.
        if (numTasksExecuted == _tasks.size()) {
            topoSortCompleted();
        }
    }

    private void topoSortCompleted() {
        System.out.println("Topo sort complete!!");
        // Every task has reported completion, so an orderly shutdown lets the pool threads exit.
        _executorService.shutdown();
    }

    public void executeTopoSort() {
        System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
        // Start with the tasks that have no dependencies; taskCompleted() submits the rest.
        _dependencies.forEach((taskId, dependencies) -> {
            if (dependencies.isEmpty()) {
                _executorService.submit(_tasks.get(taskId));
            }
        });
    }
}

public class TestParallelTopoSort {

    public static void main(String[] args) {
        DependencyManager dependencyManager = new DependencyManager();
        dependencyManager.addDependency("8", "5");
        dependencyManager.addDependency("7", "6");
        dependencyManager.addDependency("6", "3");
        dependencyManager.addDependency("6", "4");
        dependencyManager.addDependency("5", "1");
        dependencyManager.addDependency("5", "2");
        dependencyManager.addDependency("5", "3");
        dependencyManager.addDependency("4", "1");
        dependencyManager.executeTopoSort();
        // Parallel version takes 8 seconds to execute.
        // Serial version would have taken 16 seconds.
    }
}
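The timing comments in main can be checked by hand: with enough threads (the pool has 16 and at most 3 tasks are ever ready at once), the total running time equals the longest dependency chain, here 1 → 4 → 6 → 7, times 2 seconds. A small sketch, assuming every task takes exactly the same fixed time and ignoring scheduling overhead (CriticalPath, finishTime and makespan are hypothetical names):

```java
import java.util.*;

// Earliest finish time of each task, assuming every task takes the same fixed time and
// enough threads are available that no ready task ever waits for a free thread.
class CriticalPath {

    static long finishTime(String task, Map<String, List<String>> dependsOn,
                           long taskMillis, Map<String, Long> memo) {
        Long cached = memo.get(task);
        if (cached != null) {
            return cached;
        }
        long start = 0; // a task can start once its slowest dependency has finished
        for (String dep : dependsOn.get(task)) {
            start = Math.max(start, finishTime(dep, dependsOn, taskMillis, memo));
        }
        long finish = start + taskMillis;
        memo.put(task, finish);
        return finish;
    }

    // Total parallel running time = latest finish time over all tasks.
    static long makespan(Map<String, List<String>> dependsOn, long taskMillis) {
        Map<String, Long> memo = new HashMap<>();
        long latest = 0;
        for (String task : dependsOn.keySet()) {
            latest = Math.max(latest, finishTime(task, dependsOn, taskMillis, memo));
        }
        return latest;
    }

    public static void main(String[] args) {
        // Edges from TestParallelTopoSort.main: task -> tasks it depends on.
        Map<String, List<String>> dependsOn = new HashMap<>();
        dependsOn.put("1", List.of());
        dependsOn.put("2", List.of());
        dependsOn.put("3", List.of());
        dependsOn.put("4", List.of("1"));
        dependsOn.put("5", List.of("1", "2", "3"));
        dependsOn.put("6", List.of("3", "4"));
        dependsOn.put("7", List.of("6"));
        dependsOn.put("8", List.of("5"));
        System.out.println("parallel: " + makespan(dependsOn, 2000) + " ms"); // parallel: 8000 ms
        System.out.println("serial: " + dependsOn.size() * 2000L + " ms");     // serial: 16000 ms
    }
}
```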

The directed acyclic graph built in this example looks like this:

[Figure: Directed Acyclic Graph]
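DependencyManager relies on the graph really being acyclic. If a cycle slipped in, the tasks on it would never see all of their dependencies complete, so they would never be submitted, _numTasksExecuted would never reach _tasks.size(), and the non-daemon pool threads would keep the JVM running. A minimal pre-check using depth-first search with three states, assuming the same task → dependencies map shape as _dependencies (CycleCheck and hasCycle are hypothetical; DependencyManager does not expose its map, so you would run this on your edge list before calling addDependency):

```java
import java.util.*;

// Depth-first search with three states to detect a cycle before anything is scheduled.
class CycleCheck {

    private enum State { UNVISITED, IN_PROGRESS, DONE }

    // dependsOn: task -> tasks it depends on.
    static boolean hasCycle(Map<String, List<String>> dependsOn) {
        Map<String, State> state = new HashMap<>();
        for (String task : dependsOn.keySet()) {
            if (visit(task, dependsOn, state)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String task, Map<String, List<String>> dependsOn,
                                 Map<String, State> state) {
        State s = state.getOrDefault(task, State.UNVISITED);
        if (s == State.IN_PROGRESS) {
            return true;  // reached a task that is still on the current path: cycle
        }
        if (s == State.DONE) {
            return false; // already fully explored, no cycle through it
        }
        state.put(task, State.IN_PROGRESS);
        for (String dep : dependsOn.getOrDefault(task, List.of())) {
            if (visit(dep, dependsOn, state)) {
                return true;
            }
        }
        state.put(task, State.DONE);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new HashMap<>();
        dependsOn.put("1", List.of());
        dependsOn.put("4", List.of("1"));
        dependsOn.put("5", List.of("1"));
        dependsOn.put("8", List.of("5"));
        System.out.println(hasCycle(dependsOn)); // false
        dependsOn.put("1", List.of("8"));        // 1 -> 8 -> 5 -> 1
        System.out.println(hasCycle(dependsOn)); // true
    }
}
```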

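We can create a DAG in which each vertex is one of the tasks.
After that, we can compute its topological sort order.
Then we can decorate the Task class with a priority field and run a ThreadPoolExecutor backed by a PriorityBlockingQueue that compares Tasks by that priority field.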
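The queue mechanism can be seen in isolation: whatever order tasks are inserted in, a PriorityBlockingQueue built with a comparator hands them out lowest priority value first. A minimal sketch with a hypothetical PrioritizedName class standing in for a task that carries its index in the topological order:

```java
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;

// A PriorityBlockingQueue with a comparator hands out elements lowest priority value
// first, regardless of the order in which they were inserted.
class PriorityQueueDemo {

    // Hypothetical stand-in for a task that carries its index in the topological order.
    static final class PrioritizedName {
        final String name;
        final int priority;

        PrioritizedName(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    static List<String> drain(List<PrioritizedName> inserted) {
        PriorityBlockingQueue<PrioritizedName> queue = new PriorityBlockingQueue<>(
                11, Comparator.comparingInt((PrioritizedName p) -> p.priority));
        queue.addAll(inserted);
        List<String> out = new ArrayList<>();
        PrioritizedName next;
        while ((next = queue.poll()) != null) { // poll() returns null once the queue is empty
            out.add(next.name);
        }
        return out;
    }

    public static void main(String[] args) {
        // Topological order E, C, B, D, A gets priorities 0..4; insert them shuffled.
        List<PrioritizedName> shuffled = List.of(
                new PrioritizedName("A", 4), new PrioritizedName("D", 3), new PrioritizedName("B", 2),
                new PrioritizedName("C", 1), new PrioritizedName("E", 0));
        System.out.println(drain(shuffled)); // [E, C, B, D, A]
    }
}
```

One detail from the ThreadPoolExecutor documentation is worth keeping in mind: while fewer than corePoolSize threads are running, execute() starts a new thread for each task instead of queuing it, so the priority ordering only takes effect once all core threads are busy.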

The final trick is to override run() so that it first waits for all the tasks it depends on to finish.

Since each task waits indefinitely for its dependencies to finish, we cannot let the thread pool fill up completely with tasks that come later in the topological order (the ones that have to wait); the thread pool would be stuck forever.
To avoid this, we only need to assign priorities according to the topological order.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Testing {

    private static Callable<Void> getCallable(String taskId) {
        return () -> {
            System.out.println(String.format("Task %s result", taskId));
            Thread.sleep(100);
            return null;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(getCallable("A"));
        PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(getCallable("B"));
        PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(getCallable("C"));
        PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(getCallable("D"));
        PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(getCallable("E"));

        // Create a DAG graph.
        pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
        pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);

        // Now that we have a graph, we can just get its topological sort order.
        List<PrioritizedFutureTask<Void>> topologicalSort = new ArrayList<>();
        topologicalSort.add(pfTaskE);
        topologicalSort.add(pfTaskC);
        topologicalSort.add(pfTaskB);
        topologicalSort.add(pfTaskD);
        topologicalSort.add(pfTaskA);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));

        // It's important to insert the tasks in topological order, otherwise the thread pool may get stuck forever.
        for (int i = 0; i < topologicalSort.size(); i++) {
            PrioritizedFutureTask<Void> pfTask = topologicalSort.get(i);
            pfTask.setPriority(i); // The lower the priority, the sooner it will run.
            executor.execute(pfTask);
        }

        // Stop accepting new tasks and wait for the submitted ones, so the JVM can exit.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<?>> {
    private Integer _priority = 0;
    // Tasks that must finish (successfully or not) before this one runs.
    private final List<PrioritizedFutureTask<?>> _dependencies = new ArrayList<>();

    public PrioritizedFutureTask(Callable<T> callable) {
        super(callable);
    }

    public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
        this(callable);
        _priority = priority;
    }

    public Integer getPriority() {
        return _priority;
    }

    public PrioritizedFutureTask<T> setPriority(Integer priority) {
        _priority = priority;
        return this;
    }

    public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask<?> dep) {
        _dependencies.add(dep);
        return this;
    }

    @Override
    public void run() {
        // Block until every dependency is done; a failed dependency does not stop this task.
        for (PrioritizedFutureTask<?> dep : _dependencies) {
            try {
                dep.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        super.run();
    }

    @Override
    public int compareTo(PrioritizedFutureTask<?> other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return getPriority().compareTo(other.getPriority());
    }
}

class CustomRunnableComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        return ((PrioritizedFutureTask<?>) task1).compareTo((PrioritizedFutureTask<?>) task2);
    }
}

Output:

Task E result
Task C result
Task B result
Task D result
Task A result
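The deadlock risk described above comes from pool threads blocking in get() while they wait for their dependencies. A non-blocking alternative, which is not part of either answer, is to chain CompletableFutures: each task is scheduled only once the futures of its dependencies have completed, so it never occupies a thread while waiting, and a pool smaller than the number of tasks is fine. A minimal sketch on the question's five-task example; CompletableFutureDag, run and futureFor are hypothetical names, log.add(task) stands in for the real task body, and the graph is assumed to be acyclic:

```java
import java.util.*;
import java.util.concurrent.*;

// Non-blocking DAG execution with CompletableFuture: a task is scheduled only after the
// futures of all its dependencies have completed, so no pool thread waits in get().
class CompletableFutureDag {

    // dependsOn: task -> tasks it depends on (assumed acyclic, every task is a key).
    // Returns the order in which the tasks actually ran.
    static List<String> run(Map<String, List<String>> dependsOn, ExecutorService pool) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        for (String task : dependsOn.keySet()) {
            futureFor(task, dependsOn, pool, futures, log);
        }
        // Wait for every task, ignoring failures, before returning the log.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
                .handle((ignored, ex) -> null)
                .join();
        return log;
    }

    // Builds (once per task) the future that runs the task after its dependencies.
    private static CompletableFuture<Void> futureFor(String task, Map<String, List<String>> dependsOn,
            ExecutorService pool, Map<String, CompletableFuture<Void>> futures, List<String> log) {
        CompletableFuture<Void> existing = futures.get(task);
        if (existing != null) {
            return existing;
        }
        List<CompletableFuture<Void>> deps = new ArrayList<>();
        for (String dep : dependsOn.get(task)) {
            deps.add(futureFor(dep, dependsOn, pool, futures, log));
        }
        // allOf completes once every dependency has completed, normally or exceptionally;
        // handle(...) turns a failure into a normal result so this task still runs.
        CompletableFuture<Void> future = CompletableFuture.allOf(deps.toArray(new CompletableFuture[0]))
                .handle((ignored, ex) -> null)
                .thenRunAsync(() -> log.add(task), pool); // stand-in for the real task body
        futures.put(task, future);
        return future;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new HashMap<>();
        dependsOn.put("A", List.of("B", "C", "D"));
        dependsOn.put("B", List.of("C", "E"));
        dependsOn.put("C", List.of());
        dependsOn.put("D", List.of());
        dependsOn.put("E", List.of());
        ExecutorService pool = Executors.newFixedThreadPool(2); // fewer threads than tasks is fine
        try {
            System.out.println(run(dependsOn, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```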

PS: Here is a tested, simple Python implementation of topological sort that you can easily port to Java.

