1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  39 
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Iterator;
  43 import java.util.List;
  44 
  45 /**
  46  * Provides default implementations of {@link ExecutorService}
  47  * execution methods. This class implements the {@code submit},
  48  * {@code invokeAny} and {@code invokeAll} methods using a
  49  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
  50  * to the {@link FutureTask} class provided in this package.  For example,
  51  * the implementation of {@code submit(Runnable)} creates an
  52  * associated {@code RunnableFuture} that is executed and
  53  * returned. Subclasses may override the {@code newTaskFor} methods
  54  * to return {@code RunnableFuture} implementations other than
  55  * {@code FutureTask}.
  56  *
  57  * <p><b>Extension example</b>. Here is a sketch of a class
  58  * that customizes {@link ThreadPoolExecutor} to use
  59  * a {@code CustomTask} class instead of the default {@code FutureTask}:
  60  * <pre> {@code
  61  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  62  *
  63  *   static class CustomTask<V> implements RunnableFuture<V> {...}
  64  *
  65  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
  66  *       return new CustomTask<V>(c);
  67  *   }
  68  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
  69  *       return new CustomTask<V>(r, v);
  70  *   }
  71  *   // ... add constructors, etc.
  72  * }}</pre>
  73  *
  74  * @since 1.5
  75  * @author Doug Lea
  76  */
  77 public abstract class AbstractExecutorService implements ExecutorService {
  78 
  79     /**
  80      * Constructor for subclasses to call.
  81      */
  82     public AbstractExecutorService() {}
  83 
  84     /**
  85      * Returns a {@code RunnableFuture} for the given runnable and default
  86      * value.
  87      *
  88      * @param runnable the runnable task being wrapped
  89      * @param value the default value for the returned future
  90      * @param <T> the type of the given value
  91      * @return a {@code RunnableFuture} which, when run, will run the
  92      * underlying runnable and which, as a {@code Future}, will yield
  93      * the given value as its result and provide for cancellation of
  94      * the underlying task
  95      * @since 1.6
  96      */
  97     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  98         return new FutureTask<T>(runnable, value);
  99     }
 100 
 101     /**
 102      * Returns a {@code RunnableFuture} for the given callable task.
 103      *
 104      * @param callable the callable task being wrapped
 105      * @param <T> the type of the callable's result
 106      * @return a {@code RunnableFuture} which, when run, will call the
 107      * underlying callable and which, as a {@code Future}, will yield
 108      * the callable's result as its result and provide for
 109      * cancellation of the underlying task
 110      * @since 1.6
 111      */
 112     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 113         return new FutureTask<T>(callable);
 114     }
 115 
 116     /**
 117      * @throws RejectedExecutionException {@inheritDoc}
 118      * @throws NullPointerException       {@inheritDoc}
 119      */
 120     public Future<?> submit(Runnable task) {
 121         if (task == null) throw new NullPointerException();
 122         RunnableFuture<Void> ftask = newTaskFor(task, null);
 123         execute(ftask);
 124         return ftask;
 125     }
 126 
 127     /**
 128      * @throws RejectedExecutionException {@inheritDoc}
 129      * @throws NullPointerException       {@inheritDoc}
 130      */
 131     public <T> Future<T> submit(Runnable task, T result) {
 132         if (task == null) throw new NullPointerException();
 133         RunnableFuture<T> ftask = newTaskFor(task, result);
 134         execute(ftask);
 135         return ftask;
 136     }
 137 
 138     /**
 139      * @throws RejectedExecutionException {@inheritDoc}
 140      * @throws NullPointerException       {@inheritDoc}
 141      */
 142     public <T> Future<T> submit(Callable<T> task) {
 143         if (task == null) throw new NullPointerException();
 144         RunnableFuture<T> ftask = newTaskFor(task);
 145         execute(ftask);
 146         return ftask;
 147     }
 148 
 149     /**
 150      * the main mechanics of invokeAny.
 151      */
 152     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 153                               boolean timed, long nanos)
 154         throws InterruptedException, ExecutionException, TimeoutException {
 155         if (tasks == null)
 156             throw new NullPointerException();
 157         int ntasks = tasks.size();
 158         if (ntasks == 0)
 159             throw new IllegalArgumentException();
 160         ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
 161         ExecutorCompletionService<T> ecs =
 162             new ExecutorCompletionService<T>(this);
 163 
 164         // For efficiency, especially in executors with limited
 165         // parallelism, check to see if previously submitted tasks are
 166         // done before submitting more of them. This interleaving
 167         // plus the exception mechanics account for messiness of main
 168         // loop.
 169 
 170         try {
 171             // Record exceptions so that if we fail to obtain any
 172             // result, we can throw the last exception we got.
 173             ExecutionException ee = null;
 174             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 175             Iterator<? extends Callable<T>> it = tasks.iterator();
 176 
 177             // Start one task for sure; the rest incrementally
 178             futures.add(ecs.submit(it.next()));
 179             --ntasks;
 180             int active = 1;
 181 
 182             for (;;) {
 183                 Future<T> f = ecs.poll();
 184                 if (f == null) {
 185                     if (ntasks > 0) {
 186                         --ntasks;
 187                         futures.add(ecs.submit(it.next()));
 188                         ++active;
 189                     }
 190                     else if (active == 0)
 191                         break;
 192                     else if (timed) {
 193                         f = ecs.poll(nanos, NANOSECONDS);
 194                         if (f == null)
 195                             throw new TimeoutException();
 196                         nanos = deadline - System.nanoTime();
 197                     }
 198                     else
 199                         f = ecs.take();
 200                 }
 201                 if (f != null) {
 202                     --active;
 203                     try {
 204                         return f.get();
 205                     } catch (ExecutionException eex) {
 206                         ee = eex;
 207                     } catch (RuntimeException rex) {
 208                         ee = new ExecutionException(rex);
 209                     }
 210                 }
 211             }
 212 
 213             if (ee == null)
 214                 ee = new ExecutionException();
 215             throw ee;
 216 
 217         } finally {
 218             cancelAll(futures);
 219         }
 220     }
 221 
 222     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 223         throws InterruptedException, ExecutionException {
 224         try {
 225             return doInvokeAny(tasks, false, 0);
 226         } catch (TimeoutException cannotHappen) {
 227             assert false;
 228             return null;
 229         }
 230     }
 231 
 232     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 233                            long timeout, TimeUnit unit)
 234         throws InterruptedException, ExecutionException, TimeoutException {
 235         return doInvokeAny(tasks, true, unit.toNanos(timeout));
 236     }
 237 
 238     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 239         throws InterruptedException {
 240         if (tasks == null)
 241             throw new NullPointerException();
 242         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
 243         try {
 244             for (Callable<T> t : tasks) {
 245                 RunnableFuture<T> f = newTaskFor(t);
 246                 futures.add(f);
 247                 execute(f);
 248             }
 249             for (int i = 0, size = futures.size(); i < size; i++) {
 250                 Future<T> f = futures.get(i);
 251                 if (!f.isDone()) {
 252                     try { f.get(); }
 253                     catch (CancellationException | ExecutionException ignore) {}
 254                 }
 255             }
 256             return futures;
 257         } catch (Throwable t) {
 258             cancelAll(futures);
 259             throw t;
 260         }
 261     }
 262 
 263     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 264                                          long timeout, TimeUnit unit)
 265         throws InterruptedException {
 266         if (tasks == null)
 267             throw new NullPointerException();
 268         final long nanos = unit.toNanos(timeout);
 269         final long deadline = System.nanoTime() + nanos;
 270         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
 271         int j = 0;
 272         timedOut: try {
 273             for (Callable<T> t : tasks)
 274                 futures.add(newTaskFor(t));
 275 
 276             final int size = futures.size();
 277 
 278             // Interleave time checks and calls to execute in case
 279             // executor doesn't have any/much parallelism.
 280             for (int i = 0; i < size; i++) {
 281                 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
 282                     break timedOut;
 283                 execute((Runnable)futures.get(i));
 284             }
 285 
 286             for (; j < size; j++) {
 287                 Future<T> f = futures.get(j);
 288                 if (!f.isDone()) {
 289                     try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
 290                     catch (CancellationException | ExecutionException ignore) {}
 291                     catch (TimeoutException timedOut) {
 292                         break timedOut;
 293                     }
 294                 }
 295             }
 296             return futures;
 297         } catch (Throwable t) {
 298             cancelAll(futures);
 299             throw t;
 300         }
 301         // Timed out before all the tasks could be completed; cancel remaining
 302         cancelAll(futures, j);
 303         return futures;
 304     }
 305 
 306     private static <T> void cancelAll(ArrayList<Future<T>> futures) {
 307         cancelAll(futures, 0);
 308     }
 309 
 310     /** Cancels all futures with index at least j. */
 311     private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
 312         for (int size = futures.size(); j < size; j++)
 313             futures.get(j).cancel(true);
 314     }
 315 }