1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 /**
  39  * A {@link CompletionService} that uses a supplied {@link Executor}
  40  * to execute tasks.  This class arranges that submitted tasks are,
  41  * upon completion, placed on a queue accessible using {@code take}.
  42  * The class is lightweight enough to be suitable for transient use
  43  * when processing groups of tasks.
  44  *
  45  * <p>
  46  *
  47  * <b>Usage Examples.</b>
  48  *
  49  * Suppose you have a set of solvers for a certain problem, each
  50  * returning a value of some type {@code Result}, and would like to
  51  * run them concurrently, processing the results of each of them that
  52  * return a non-null value, in some method {@code use(Result r)}. You
  53  * could write this as:
  54  *
  55  * <pre> {@code
  56  * void solve(Executor e,
  57  *            Collection<Callable<Result>> solvers)
  58  *     throws InterruptedException, ExecutionException {
  59  *     CompletionService<Result> ecs
  60  *         = new ExecutorCompletionService<Result>(e);
  61  *     for (Callable<Result> s : solvers)
  62  *         ecs.submit(s);
  63  *     int n = solvers.size();
  64  *     for (int i = 0; i < n; ++i) {
  65  *         Result r = ecs.take().get();
  66  *         if (r != null)
  67  *             use(r);
  68  *     }
  69  * }}</pre>
  70  *
  71  * Suppose instead that you would like to use the first non-null result
  72  * of the set of tasks, ignoring any that encounter exceptions,
  73  * and cancelling all other tasks when the first one is ready:
  74  *
  75  * <pre> {@code
  76  * void solve(Executor e,
  77  *            Collection<Callable<Result>> solvers)
  78  *     throws InterruptedException {
  79  *     CompletionService<Result> ecs
  80  *         = new ExecutorCompletionService<Result>(e);
  81  *     int n = solvers.size();
  82  *     List<Future<Result>> futures
  83  *         = new ArrayList<Future<Result>>(n);
  84  *     Result result = null;
  85  *     try {
  86  *         for (Callable<Result> s : solvers)
  87  *             futures.add(ecs.submit(s));
  88  *         for (int i = 0; i < n; ++i) {
  89  *             try {
  90  *                 Result r = ecs.take().get();
  91  *                 if (r != null) {
  92  *                     result = r;
  93  *                     break;
  94  *                 }
  95  *             } catch (ExecutionException ignore) {}
  96  *         }
  97  *     }
  98  *     finally {
  99  *         for (Future<Result> f : futures)
 100  *             f.cancel(true);
 101  *     }
 102  *
 103  *     if (result != null)
 104  *         use(result);
 105  * }}</pre>
 106  */
 107 public class ExecutorCompletionService<V> implements CompletionService<V> {
 108     private final Executor executor;
 109     private final AbstractExecutorService aes;
 110     private final BlockingQueue<Future<V>> completionQueue;
 111 
 112     /**
 113      * FutureTask extension to enqueue upon completion
 114      */
 115     private class QueueingFuture extends FutureTask<Void> {
 116         QueueingFuture(RunnableFuture<V> task) {
 117             super(task, null);
 118             this.task = task;
 119         }
 120         protected void done() { completionQueue.add(task); }
 121         private final Future<V> task;
 122     }
 123 
 124     private RunnableFuture<V> newTaskFor(Callable<V> task) {
 125         if (aes == null)
 126             return new FutureTask<V>(task);
 127         else
 128             return aes.newTaskFor(task);
 129     }
 130 
 131     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
 132         if (aes == null)
 133             return new FutureTask<V>(task, result);
 134         else
 135             return aes.newTaskFor(task, result);
 136     }
 137 
 138     /**
 139      * Creates an ExecutorCompletionService using the supplied
 140      * executor for base task execution and a
 141      * {@link LinkedBlockingQueue} as a completion queue.
 142      *
 143      * @param executor the executor to use
 144      * @throws NullPointerException if executor is {@code null}
 145      */
 146     public ExecutorCompletionService(Executor executor) {
 147         if (executor == null)
 148             throw new NullPointerException();
 149         this.executor = executor;
 150         this.aes = (executor instanceof AbstractExecutorService) ?
 151             (AbstractExecutorService) executor : null;
 152         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
 153     }
 154 
 155     /**
 156      * Creates an ExecutorCompletionService using the supplied
 157      * executor for base task execution and the supplied queue as its
 158      * completion queue.
 159      *
 160      * @param executor the executor to use
 161      * @param completionQueue the queue to use as the completion queue
 162      *        normally one dedicated for use by this service. This
 163      *        queue is treated as unbounded -- failed attempted
 164      *        {@code Queue.add} operations for completed taskes cause
 165      *        them not to be retrievable.
 166      * @throws NullPointerException if executor or completionQueue are {@code null}
 167      */
 168     public ExecutorCompletionService(Executor executor,
 169                                      BlockingQueue<Future<V>> completionQueue) {
 170         if (executor == null || completionQueue == null)
 171             throw new NullPointerException();
 172         this.executor = executor;
 173         this.aes = (executor instanceof AbstractExecutorService) ?
 174             (AbstractExecutorService) executor : null;
 175         this.completionQueue = completionQueue;
 176     }
 177 
 178     public Future<V> submit(Callable<V> task) {
 179         if (task == null) throw new NullPointerException();
 180         RunnableFuture<V> f = newTaskFor(task);
 181         executor.execute(new QueueingFuture(f));
 182         return f;
 183     }
 184 
 185     public Future<V> submit(Runnable task, V result) {
 186         if (task == null) throw new NullPointerException();
 187         RunnableFuture<V> f = newTaskFor(task, result);
 188         executor.execute(new QueueingFuture(f));
 189         return f;
 190     }
 191 
 192     public Future<V> take() throws InterruptedException {
 193         return completionQueue.take();
 194     }
 195 
 196     public Future<V> poll() {
 197         return completionQueue.poll();
 198     }
 199 
 200     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
 201         return completionQueue.poll(timeout, unit);
 202     }
 203 
 204 }