如何基于任务权重设计 Executor 服务
我需要帮助来设计自己的 ExecutorService;如果已有现成的功能可用,也希望知道如何直接使用。
比方说,我的总计算能力上限为 10。我会为每个任务分配一个权重(2、4 或 6)。提交的任务应按权重调度运行,使同时运行的任务权重之和不超过 10。例如:同时运行 5 个权重为 2 的任务,或者权重为 2、2、6 的三个任务,或者权重为 4、6 的两个任务。
解决方法
也许下面这个实现正是您需要的:
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
 * A {@link Runnable} that additionally reports a weightage.
 *
 * <p>Both methods have defaults, so an implementation only overrides what it
 * needs: {@link #run()} does nothing and {@link #getWeightage()} reports
 * {@code 0} unless overridden.
 */
interface WeightageTask extends Runnable {

    /**
     * Shared sentinel instance that keeps both defaults (no-op {@code run},
     * weightage {@code 0}). Consumers recognise it by identity ({@code ==}),
     * not by its weightage.
     */
    WeightageTask POISON = new WeightageTask() {
    };

    /**
     * Reports the weightage of this task.
     *
     * @return the weightage; {@code 0} unless overridden
     */
    default int getWeightage() {
        return 0;
    }

    /** Does nothing unless overridden. */
    @Override
    default void run() {
        // intentionally empty
    }
}
/**
 * A small {@link ExecutorService} that accepts {@link WeightageTask}s and runs
 * them on a fixed set of worker threads created at construction time.
 *
 * <p>Tasks are kept in one FIFO queue per weightage. The non-empty queues are
 * served round-robin, so a burst of tasks of one weightage cannot starve tasks
 * of another weightage.
 *
 * <p>NOTE: the per-weightage thread allocation only determines how many worker
 * threads exist in total, how they are named, and which weightages are
 * accepted. Workers are not bound to a weightage: any worker may run a task of
 * any accepted weightage.
 *
 * <p>All queue state is guarded by the monitor of {@code taskQueues}.
 * The {@code submit}/{@code invokeAll}/{@code invokeAny} family is not
 * implemented and throws {@link UnsupportedOperationException}.
 */
public class WeightageExecutorService implements ExecutorService {

    /** Set once shutdown was requested; written under the {@code taskQueues} monitor. */
    private volatile boolean shutdown;

    /** All worker threads; started in the constructor and never replaced. */
    private final Thread[] threads;

    /** Total number of worker threads (sum of all allocations). */
    private final int numberOfThreads;

    /** Round-robin ring of the currently non-empty per-weightage queues; also the lock object. */
    private final Queue<Queue<WeightageTask>> taskQueues;

    /** Per-weightage FIFO queues, created lazily on first use. */
    private final Map<Integer, Queue<WeightageTask>> taskQueueByWeightage;

    /** Weightage to thread count, as configured through the {@link Builder}. */
    private final Map<Integer, Integer> threadCountAllocation;

    /**
     * Creates the service and immediately starts all worker threads.
     *
     * @param builder the builder carrying the weightage-to-thread-count allocation
     */
    protected WeightageExecutorService(Builder builder) {
        taskQueues = new LinkedList<>();
        taskQueueByWeightage = new HashMap<>();
        // Defensive copy: later changes to the builder must not affect a running service.
        threadCountAllocation = new HashMap<>(builder.threadCountAllocation);
        numberOfThreads = threadCountAllocation.values()
                .stream()
                .reduce(0, Integer::sum);
        threads = new Thread[numberOfThreads];
        int threadIndex = 0;
        for (Map.Entry<Integer, Integer> allocation : threadCountAllocation.entrySet()) {
            final int weightage = allocation.getKey();
            final int threadCount = allocation.getValue();
            for (int i = 0; i < threadCount; ++i) {
                threads[threadIndex] = new Thread(
                        this::runWorker,
                        "weightage-thread-" + weightage + "-" + (i + 1));
                ++threadIndex;
            }
        }
        // Start only after every field is assigned so workers see a fully built service.
        for (Thread thread : threads) {
            thread.start();
        }
    }

    /**
     * Worker loop: runs tasks until the service is shut down and drained, a
     * {@link WeightageTask#POISON} marker is taken, or the thread is interrupted.
     */
    private void runWorker() {
        while (true) {
            final WeightageTask task;
            try {
                task = takeTask();
            } catch (InterruptedException e) {
                // Interrupted while idle (shutdownNow): restore the flag and stop.
                Thread.currentThread().interrupt();
                return;
            }
            if (task == null || task == WeightageTask.POISON) {
                return;
            }
            try {
                task.run();
            } catch (Throwable e) {
                // A failing task must not kill the worker; report and keep serving.
                e.printStackTrace();
            }
        }
    }

    /**
     * Queues a task for execution.
     *
     * @param task the task to run; {@link WeightageTask#POISON} is accepted and
     *             makes the worker that takes it exit
     * @throws NullPointerException       if {@code task} is {@code null}
     * @throws IllegalArgumentException   if no threads were allocated for the task's weightage
     * @throws RejectedExecutionException if the service has been shut down
     */
    public void execute(WeightageTask task) {
        Objects.requireNonNull(task, "task");
        final Integer weightage = task.getWeightage();
        if (task != WeightageTask.POISON && !threadCountAllocation.containsKey(weightage)) {
            throw new IllegalArgumentException("there is no allocated thread for weightage: " + weightage);
        }
        synchronized (taskQueues) {
            if (shutdown) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            final Queue<WeightageTask> taskQueue = taskQueueByWeightage
                    .computeIfAbsent(weightage, k -> new LinkedList<>());
            // A queue is in the ring exactly while it is non-empty.
            final boolean addToRing = taskQueue.isEmpty();
            taskQueue.offer(task);
            if (addToRing) {
                taskQueues.offer(taskQueue);
            }
            taskQueues.notifyAll();
        }
    }

    /**
     * Starts an orderly shutdown: already queued tasks are still executed, new
     * tasks are rejected, and workers exit once the queues are empty.
     */
    @Override
    public void shutdown() {
        synchronized (taskQueues) {
            shutdown = true;
            // Wake idle workers so they can observe the flag and exit.
            taskQueues.notifyAll();
        }
    }

    /**
     * Stops the service as fast as possible: removes all queued tasks,
     * interrupts every worker, and rejects new tasks.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        final List<Runnable> pending = new ArrayList<>();
        synchronized (taskQueues) {
            shutdown = true;
            for (Queue<WeightageTask> taskQueue : taskQueues) {
                for (WeightageTask task : taskQueue) {
                    if (task != WeightageTask.POISON) {
                        pending.add(task);
                    }
                }
                taskQueue.clear();
            }
            taskQueues.clear();
            taskQueues.notifyAll();
        }
        for (Thread thread : threads) {
            thread.interrupt();
        }
        return pending;
    }

    /** @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} was called */
    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    /** @return {@code true} if shutdown was requested and every worker thread has finished */
    @Override
    public boolean isTerminated() {
        if (!shutdown) {
            return false;
        }
        for (Thread thread : threads) {
            if (thread.isAlive()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Takes the next task, blocking while there is nothing to do.
     *
     * @return the next task, or {@code null} if the service is shut down and no task is left
     * @throws InterruptedException if the calling worker is interrupted while waiting
     */
    private WeightageTask takeTask() throws InterruptedException {
        synchronized (taskQueues) {
            while (taskQueues.isEmpty()) {
                if (shutdown) {
                    return null;
                }
                taskQueues.wait();
            }
            final Queue<WeightageTask> taskQueue = taskQueues.poll();
            final WeightageTask task = taskQueue.poll();
            // Re-append the queue at the tail so the other weightages get their turn first.
            if (!taskQueue.isEmpty()) {
                taskQueues.offer(taskQueue);
            }
            return task;
        }
    }

    /** @return a new, empty {@link Builder} */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Queues a command for execution; it must be a {@link WeightageTask}.
     *
     * @param command the task to run
     * @throws ClassCastException         if {@code command} is not a {@link WeightageTask}
     * @throws NullPointerException       if {@code command} is {@code null}
     * @throws IllegalArgumentException   if no threads were allocated for the task's weightage
     * @throws RejectedExecutionException if the service has been shut down
     */
    @Override
    public void execute(Runnable command) {
        execute((WeightageTask) command);
    }

    /**
     * Waits until all workers have finished after a shutdown request, or the
     * timeout elapses, or the caller is interrupted.
     *
     * @param timeout the maximum time to wait
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if the service terminated, {@code false} on timeout
     * @throws InterruptedException if the caller is interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        final long budgetNanos = unit.toNanos(timeout);
        final long start = System.nanoTime();
        for (Thread thread : threads) {
            // Compute the remainder relative to start to stay overflow-safe for huge timeouts.
            final long remaining = budgetNanos - (System.nanoTime() - start);
            if (remaining <= 0) {
                break;
            }
            TimeUnit.NANOSECONDS.timedJoin(thread, remaining);
        }
        return isTerminated();
    }

    /** Not implemented. */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public Future<?> submit(Runnable task) {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Not implemented. */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        throw new UnsupportedOperationException("let implement by yourself");
    }

    /** Collects the weightage-to-thread-count allocation for a {@link WeightageExecutorService}. */
    public static class Builder {

        private final Map<Integer, Integer> threadCountAllocation = new HashMap<>();

        /**
         * Allocates worker threads for a weightage; a later call for the same
         * weightage replaces the earlier one.
         *
         * @param weightage   the task weightage to accept
         * @param threadCount the number of worker threads to create for it; must not be negative
         * @return this builder
         * @throws IllegalArgumentException if {@code threadCount} is negative
         */
        public Builder allocateThreads(int weightage, int threadCount) {
            if (threadCount < 0) {
                throw new IllegalArgumentException("threadCount must not be negative: " + threadCount);
            }
            this.threadCountAllocation.put(weightage, threadCount);
            return this;
        }

        /** @return a new, already running {@link WeightageExecutorService} */
        public WeightageExecutorService build() {
            return new WeightageExecutorService(this);
        }
    }

    /**
     * Demo: runs one task of weightage 2 and one of weightage 3, then shuts
     * down. The two tasks may run on different threads, so the order of the
     * printed words is not guaranteed.
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = WeightageExecutorService.builder()
                .allocateThreads(2, 4)
                .allocateThreads(3, 6)
                .build();
        executorService.execute(new WeightageTask() {
            @Override
            public void run() {
                System.out.print("Hello ");
            }

            @Override
            public int getWeightage() {
                return 2;
            }
        });
        executorService.execute(new WeightageTask() {
            @Override
            public void run() {
                System.out.print("World ");
            }

            @Override
            public int getWeightage() {
                return 3;
            }
        });
        // Orderly shutdown still runs the queued tasks, so no sleep is needed.
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。