java线程池之CompletionService

2018-01-13

 

Future的局限

 

在使用线程池批量并行执行任务时,有时需要获取任务的返回值,此时一般可以使用Future实现异步接收,关于Future的相关使用以及实现原理在前一篇文章中分析过(如需详细了解,请点击这里)。但在批量等待获取返回结果时,有些局限,下面先看一个真实的场景。

 

在对一个页面进行并行渲染时,一般的做法是把页面分成多个模块,每个模块作为一个任务单独提交到线程池中进行并行渲染。在主线程中,需要接受每个模块渲染的结果,对页面进行渲染。如果直接使用Future,这时需要遍历每个模块渲染任务对应的Future,调用其get方法阻塞获取渲染结果,此时会存在多次阻塞,遍历伪代码如下:

 

for(Future oneFuture:futureList){Object obj = oneFuture.get(); // oneFuture.get() 可能会存在多次阻塞//省略使用obj渲染渲染页面代码。}

 

 

最理想的方式是futureList是一个排序后的列表,排前面的都是已经执行完成的,这时可以达到最高的并发效果(下一次循环时,有可能刚好下一个任务执行完成,主线程又可以继续执行自己的业务逻辑)。那要如何对这个列表排序呢?庆幸的是我们不必自己排序,java提供了现成的API: CompletionService,直接使用即可。

 

CompletionService的使用方法

 

CompletionService是一个接口类,目前其唯一实现类是ExecutorCompletionService。他的主要功能就是在批量提交的任务中,优先获取已经完成的任务(可以简单的理解为对任务执行完成的先后顺序进行排序)。具体使用方式如下:

 

public class CompletionServiceTest { public static void main(String[] args) throws Exception{//创建线程池Executor executor = Executors.newCachedThreadPool();Callable<String> temp = null;//创建任务列表Collection<Callable<String>> tasks = new ArrayList<>();for (int i=0;i<10;i++){temp = new PrintTask(i+"");tasks.add(temp);}solve(executor,tasks); } public static void solve(Executor e, Collection<Callable<String>> solvers)throws InterruptedException {//构建CompletionServiceCompletionService<String> ecs = new ExecutorCompletionService<String>(e);int n = solvers.size();List<Future<String>> futures = new ArrayList<Future<String>>(n);try {for (Callable<String> s : solvers) futures.add(ecs.submit(s));//提交多个任务 //获取任务执行结果for (int i = 0; i < n; ++i) { try {Future<String> future = ecs.take();//阻塞优先获取已经完成的任务FutureString r = future.get();//这步不会阻塞了System.out.println(r); } catch (ExecutionException ignore) {}}}finally {for (Future<String> f : futures) //判断是否需要取消任务 f.cancel(true);} }} class PrintTask implements Callable<String>{ private String taskname; public PrintTask(String taskname) {this.taskname = taskname; } @Override public String call() throws Exception {System.out.println("任务"+taskname+":执行中");Thread.sleep(Integer.parseInt(taskname)*1000);return "任务"+taskname+":执行完成;"; }}

 

 

本示例中通过CompletionService的take方法,每次获取到的都是最先完成任务对应的Future,由于任务已经完成,后面调用Future的get方法,就不会产生阻塞。这是如何实现的呢?下面开始对ExecutorCompletionService的实现原理进行分析。

 

ExecutorCompletionService的实现原理

 

ExecutorCompletionService在其内部维护了一个阻塞队列BlockingQueue,每当有任务执行完成后,都会放入这个队列。ExecutorCompletionService的take方法本质上是调用的BlockingQueue的take方法,如果队列中没有完成的任务,就阻塞;如果队列中有多个完成的任务,由于BlockingQueue(默认是LinkedBlockingQueue)是FIFO队列,每次take取出的都是优先完成的任务。这就是对ExecutorCompletionService实现原理简述,下面来来具体的实现。

 

构造方法

ExecutorCompletionService的构造方法有两个,一个是使用默认的LinkedBlockingQueue作为完成任务的存放队列,另一是使用传入的BlockingQueue 参数作为完成任务的存放队列:

//默认使用LinkedBlockingQueue作为存放队列public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } //使用自定义的BlockingQueue作为存放队列public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}

 

提交任务submit方法

ExecutorCompletionService中有两个版本的submit方法,一个是Callable类型的参数;另一个是Runnable类型的参数。两个方法的实现基本相同,只是由于Runnable的run方法没有返回值,本质上差异就是需要把Runnable对象使用适配器模式封装成Callable对象(这里的适配器为RunnableAdapter,关于更多适配器模式的理解可以点击这里)。这里只对参数为Callable类型的submit方法进行讲解:

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();//把Callable对象封装成FutureTask对象RunnableFuture<V> f = newTaskFor(task);//把FutureTask对象封装成QueueingFuture对象,并提交任务到线程池executor.execute(new QueueingFuture(f));return f;}

 

可以看到submit方法,本质上上只是先把Callable对象封装成FutureTask对象,在封装成QueueingFuture对象,然后提交到线程池中。相比于线程池的submit方法,该方法只多了一步:把把FutureTask对象封装成QueueingFuture对象。再来看下内部类QueueingFuture的实现,非常简单:

private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); }private final Future<V> task; }

 

QueueingFuture基础自FutureTask,重新了父类的done()方法(该方法在FutureTask中是空的)。done()方法的重新也很简单,只是往阻塞队列中添加该任务。回想下这个done()方式是在什么时候调用的?FutureTask的run方法在任务执行完成后会调用finishCompletion方法,finishCompletion方法末尾调用了done()方法。换句话说,让任务执行完成时会把自身放入ExecutorCompletionService维护的“已完成阻塞队列”:存在于这个队列中的任务都是已经完成的。

 

take方法

掉用ExecutorCompletionService的take方法,本质上调用的是“已完成阻塞队列”的take方法:

public Future<V> take() throws InterruptedException {return completionQueue.take();}

 

另外ExecutorCompletionService还提供了非阻塞的poll方法,以及延时阻塞的poll方法,本质上也是直接调用阻塞队列的对应poll方法:

public Future<V> poll() {return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}

 

这两个方法,可以放到一个while循环中,当没有任务执行完成时,主线程可以做一些其他事情(或者sleep一会儿),防止线程阻塞,过一会儿再继续poll 以使并行执行效果最大化。

 

总结

 

CompletionService的主要作用就是优先返回线程池中已经执行完成的任务,尽量减少主线程的阻塞时间,是并行最大化。本质上通过维护一个“已完成的阻塞队列”实现。

 

 

 

javaExecutorCompletionServiceCompletionService
最新更新:

第七城市

栏目导航(关闭)