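How to execute a directed acyclic graph of tasks in parallel
I have a list of tasks [Task-A, Task-B, Task-C, Task-D, ...].
A task can optionally depend on other tasks.
For example:
Task A depends on 3 tasks: B, C and D
Task B depends on 2 tasks: C and E
and so on.
It is basically a directed acyclic graph, and a task should run only after the tasks it depends on have finished, whether successfully or with an exception. For example, tasks C and E are independent and should run first, then task B and task D, and finally Task-A. That should be the order of execution.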
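The ordering described above can be sketched as a grouping of tasks into "waves", where every task in a wave depends only on tasks from earlier waves. This is only a minimal illustration: the class `ExecutionWaves`, its methods, and the use of plain task names in a `Map` are assumptions made for the example and are not part of the question's code. It also assumes every task appears as a key of the map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: groups tasks into waves that could each run in parallel. */
final class ExecutionWaves {

    /** @param deps maps every task name to the names of the tasks it depends on */
    static List<List<String>> compute(Map<String, List<String>> deps) {
        // Number of dependencies of each task that have not finished yet.
        Map<String, Integer> remaining = new LinkedHashMap<>();
        deps.forEach((task, taskDeps) -> remaining.put(task, taskDeps.size()));

        List<List<String>> waves = new ArrayList<>();
        while (!remaining.isEmpty()) {
            // Every task without a pending dependency can start now.
            List<String> wave = new ArrayList<>();
            remaining.forEach((task, count) -> {
                if (count == 0) {
                    wave.add(task);
                }
            });
            if (wave.isEmpty()) {
                throw new IllegalStateException("Cycle detected among " + remaining.keySet());
            }
            wave.forEach(remaining::remove);
            // Finishing this wave satisfies some dependencies of the tasks still waiting.
            for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
                int satisfied = 0;
                for (String dep : deps.get(entry.getKey())) {
                    if (wave.contains(dep)) {
                        satisfied++;
                    }
                }
                entry.setValue(entry.getValue() - satisfied);
            }
            waves.add(wave);
        }
        return waves;
    }

    /** The graph from the question: A depends on B, C, D and B depends on C, E. */
    static Map<String, List<String>> exampleGraph() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("A", List.of("B", "C", "D"));
        deps.put("B", List.of("C", "E"));
        deps.put("C", List.of());
        deps.put("D", List.of());
        deps.put("E", List.of());
        return deps;
    }

    public static void main(String[] args) {
        System.out.println(compute(exampleGraph())); // prints [[C, D, E], [B], [A]]
    }
}
```

Note that D has no dependencies, so this grouping places it in the first wave together with C and E. The order given in the question (C and E, then B and D, then A) is also valid, because D only has to finish before A starts.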
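Task data model:
import java.util.List;
import lombok.Data;

@Data // Lombok: generates getters, setters, equals, hashCode and toString
public class Task {
    private String name;
    private List<Task> dependentTasks; // the tasks this task depends on

    public void run() {
        // business logic
    }
}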
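Answers
The other answer works fine, but it is too complicated.
A simpler way is to execute Kahn's algorithm in parallel.
The key is to run, in parallel, every task whose dependencies have all been executed.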
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DependencyManager {
    // taskId -> ids of the tasks it depends on
    private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
    // taskId -> ids of the tasks that depend on it
    private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
    private final AtomicInteger _numTasksExecuted = new AtomicInteger(0);
    private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

    private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
        return () -> {
            try {
                Thread.sleep(2000); // A task takes 2 seconds to finish.
                dependencyManager.taskCompleted(taskId);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * In case a vertex is disconnected from the rest of the graph.
     * @param taskId The task id
     */
    public void addVertex(String taskId) {
        _dependencies.putIfAbsent(taskId, new ArrayList<>());
        _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
        _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
        _numDependenciesExecuted.putIfAbsent(taskId, 0);
    }

    private void addEdge(String dependentTaskId, String dependeeTaskId) {
        _dependencies.get(dependentTaskId).add(dependeeTaskId);
        _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
    }

    public void addDependency(String dependentTaskId, String dependeeTaskId) {
        addVertex(dependentTaskId);
        addVertex(dependeeTaskId);
        addEdge(dependentTaskId, dependeeTaskId);
    }

    private void taskCompleted(String taskId) {
        System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
        // Use the value returned by the atomic update, so exactly one thread observes the final count.
        int numTasksExecuted = _numTasksExecuted.incrementAndGet();
        _reverseDependencies.get(taskId).forEach(nextTaskId -> {
            // merge() increments and returns the counter atomically. Reading it in a separate step
            // would let two threads both see the final value and submit the same task twice.
            int numDependenciesExecuted = _numDependenciesExecuted.merge(nextTaskId, 1, Integer::sum);
            int numDependencies = _dependencies.get(nextTaskId).size();
            if (numDependenciesExecuted == numDependencies) {
                // All dependencies have been executed, so we can submit this task to the thread pool.
                _executorService.submit(_tasks.get(nextTaskId));
            }
        });
        if (numTasksExecuted == _tasks.size()) {
            topoSortCompleted();
        }
    }

    private void topoSortCompleted() {
        System.out.println("Topo sort complete!!");
        // Every task has finished, so an orderly shutdown is enough.
        _executorService.shutdown();
    }

    public void executeTopoSort() {
        System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
        // Start with the tasks that have no dependencies; the rest are submitted as they become ready.
        _dependencies.forEach((taskId, dependencies) -> {
            if (dependencies.isEmpty()) {
                _executorService.submit(_tasks.get(taskId));
            }
        });
    }
}

public class TestParallelTopoSort {
    public static void main(String[] args) {
        DependencyManager dependencyManager = new DependencyManager();
        dependencyManager.addDependency("8", "5");
        dependencyManager.addDependency("7", "6");
        dependencyManager.addDependency("6", "3");
        dependencyManager.addDependency("6", "4");
        dependencyManager.addDependency("5", "1");
        dependencyManager.addDependency("5", "2");
        dependencyManager.addDependency("5", "3");
        dependencyManager.addDependency("4", "1");
        dependencyManager.executeTopoSort();
        // Parallel version takes 8 seconds to execute.
        // Serial version would have taken 16 seconds.
    }
}
The directed acyclic graph constructed in this example looks like this: (the image is not preserved in this copy)
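The "8 seconds versus 16 seconds" figures in the code comments can be checked with a little arithmetic: with enough threads, the run time is bounded below by the longest dependency chain (here 1 → 4 → 6 → 7, four tasks of 2 seconds each), while a serial run pays for all 8 tasks. The sketch below computes both numbers. The class `CriticalPath` and its methods are hypothetical names introduced only for this illustration, and the code assumes the graph is acyclic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: estimates serial and parallel run time for the example graph. */
final class CriticalPath {

    /** Number of tasks on the longest dependency chain that ends at the given task. */
    static int depth(String task, Map<String, List<String>> deps, Map<String, Integer> memo) {
        Integer cached = memo.get(task);
        if (cached != null) {
            return cached;
        }
        int longest = 0;
        // Tasks that are not keys of the map have no dependencies.
        for (String dep : deps.getOrDefault(task, List.of())) {
            longest = Math.max(longest, depth(dep, deps, memo));
        }
        memo.put(task, longest + 1);
        return longest + 1;
    }

    static int longestChain(Map<String, List<String>> deps) {
        Map<String, Integer> memo = new HashMap<>();
        int longest = 0;
        for (String task : allTasks(deps)) {
            longest = Math.max(longest, depth(task, deps, memo));
        }
        return longest;
    }

    /** Every task mentioned in the graph, either as a dependent or as a dependency. */
    static Set<String> allTasks(Map<String, List<String>> deps) {
        Set<String> tasks = new TreeSet<>(deps.keySet());
        deps.values().forEach(tasks::addAll);
        return tasks;
    }

    /** Same edges as the addDependency calls in TestParallelTopoSort. */
    static Map<String, List<String>> exampleGraph() {
        return Map.of(
                "8", List.of("5"),
                "7", List.of("6"),
                "6", List.of("3", "4"),
                "5", List.of("1", "2", "3"),
                "4", List.of("1"));
    }

    public static void main(String[] args) {
        int secondsPerTask = 2;
        Map<String, List<String>> deps = exampleGraph();
        System.out.println("Parallel: " + longestChain(deps) * secondsPerTask + " s"); // Parallel: 8 s
        System.out.println("Serial: " + allTasks(deps).size() * secondsPerTask + " s"); // Serial: 16 s
    }
}
```

The parallel figure is a lower bound that holds only while the pool has at least as many threads as there are tasks ready at the same time; the 16-thread pool in the answer is large enough for this graph.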
We can create a DAG in which each vertex of the graph is one of the tasks.
After that, we can compute its topological sorted order.
We can then decorate the Task class with a priority field and run a ThreadPoolExecutor with a PriorityBlockingQueue, which compares Tasks using the priority field.
The final trick is to override run() so that it first waits for all the tasks it depends on to finish.
Since each task waits indefinitely for its dependencies to finish, we cannot let the thread pool be completely occupied by tasks that come later in the topological order (those that depend on others); the thread pool would be stuck forever.
To avoid this, we just need to assign priorities to the tasks according to the topological order.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Testing {
    private static Callable<Void> getCallable(String taskId) {
        return () -> {
            System.out.println(String.format("Task %s result", taskId));
            Thread.sleep(100);
            return null;
        };
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<Void> taskA = getCallable("A");
        Callable<Void> taskB = getCallable("B");
        Callable<Void> taskC = getCallable("C");
        Callable<Void> taskD = getCallable("D");
        Callable<Void> taskE = getCallable("E");
        PrioritizedFutureTask<Void> pfTaskA = new PrioritizedFutureTask<>(taskA);
        PrioritizedFutureTask<Void> pfTaskB = new PrioritizedFutureTask<>(taskB);
        PrioritizedFutureTask<Void> pfTaskC = new PrioritizedFutureTask<>(taskC);
        PrioritizedFutureTask<Void> pfTaskD = new PrioritizedFutureTask<>(taskD);
        PrioritizedFutureTask<Void> pfTaskE = new PrioritizedFutureTask<>(taskE);
        // Create a DAG graph.
        pfTaskB.addDependency(pfTaskC).addDependency(pfTaskE);
        pfTaskA.addDependency(pfTaskB).addDependency(pfTaskC).addDependency(pfTaskD);
        // Now that we have a graph, we can just get its topological sorted order.
        List<PrioritizedFutureTask<Void>> topological_sort = new ArrayList<>();
        topological_sort.add(pfTaskE);
        topological_sort.add(pfTaskC);
        topological_sort.add(pfTaskB);
        topological_sort.add(pfTaskD);
        topological_sort.add(pfTaskA);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(1, new CustomRunnableComparator()));
        // It's important to insert the tasks in the topological sorted order,
        // otherwise it's possible that the thread pool will be stuck forever.
        for (int i = 0; i < topological_sort.size(); i++) {
            PrioritizedFutureTask<Void> pfTask = topological_sort.get(i);
            // The lower the priority, the sooner it will run.
            pfTask.setPriority(i);
            executor.execute(pfTask);
        }
        // Wait for the final task, then release the pool threads so the JVM can exit.
        pfTaskA.get();
        executor.shutdown();
    }
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<?>> {
    private Integer _priority = 0;
    private final List<PrioritizedFutureTask<?>> _dependencies = new ArrayList<>();

    public PrioritizedFutureTask(Callable<T> callable) {
        super(callable);
    }

    public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
        this(callable);
        _priority = priority;
    }

    public Integer getPriority() {
        return _priority;
    }

    public PrioritizedFutureTask<T> setPriority(Integer priority) {
        _priority = priority;
        return this;
    }

    public PrioritizedFutureTask<T> addDependency(PrioritizedFutureTask<?> dep) {
        this._dependencies.add(dep);
        return this;
    }

    @Override
    public void run() {
        // Block until every dependency has finished, successfully or with an exception.
        for (PrioritizedFutureTask<?> dep : _dependencies) {
            try {
                dep.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        super.run();
    }

    @Override
    public int compareTo(PrioritizedFutureTask<?> other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return getPriority().compareTo(other.getPriority());
    }
}

class CustomRunnableComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        return ((PrioritizedFutureTask<?>) task1).compareTo((PrioritizedFutureTask<?>) task2);
    }
}
Output (as reported; tasks with no dependency relation between them, such as B and D, may print in a different relative order):
Task E result
Task C result
Task B result
Task D result
Task A result
PS: Here is a tested and simple topological sort implementation in Python, which you can easily port to Java.
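The answer's code writes the topological order by hand. As a rough illustration of how that list could be computed instead, here is a minimal depth-first topological sort in Java. It is not the Python implementation the answer refers to; the class `TopoSort`, its methods, and the use of task names in a `Map` are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: depth-first topological sort over task names. */
final class TopoSort {

    /**
     * @param deps maps a task name to the names of the tasks it depends on
     * @return an order in which every task appears after all of its dependencies
     */
    static List<String> sort(Map<String, List<String>> deps) {
        List<String> order = new ArrayList<>();
        Set<String> done = new HashSet<>();
        Set<String> inProgress = new HashSet<>();
        for (String task : deps.keySet()) {
            visit(task, deps, done, inProgress, order);
        }
        return order;
    }

    private static void visit(String task, Map<String, List<String>> deps,
                              Set<String> done, Set<String> inProgress, List<String> order) {
        if (done.contains(task)) {
            return;
        }
        // Reaching a task that is still being visited means the graph has a cycle.
        if (!inProgress.add(task)) {
            throw new IllegalArgumentException("Cycle detected at task " + task);
        }
        for (String dep : deps.getOrDefault(task, List.of())) {
            visit(dep, deps, done, inProgress, order);
        }
        inProgress.remove(task);
        done.add(task);
        // A task is appended only after all of its dependencies have been appended.
        order.add(task);
    }

    /** The graph from the answer: A depends on B, C, D and B depends on C, E. */
    static Map<String, List<String>> exampleGraph() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("A", List.of("B", "C", "D"));
        deps.put("B", List.of("C", "E"));
        return deps;
    }

    public static void main(String[] args) {
        System.out.println(sort(exampleGraph())); // prints [C, E, B, D, A]
    }
}
```

The result differs from the hand-written order E, C, B, D, A only in the relative position of C and E, which do not depend on each other, so either order would work when assigning priorities.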