1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
  39 
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.Iterator;
  43 import java.util.List;
  44 
  45 /**
  46  * Provides default implementations of {@link ExecutorService}
  47  * execution methods. This class implements the {@code submit},
  48  * {@code invokeAny} and {@code invokeAll} methods using a
  49  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
  50  * to the {@link FutureTask} class provided in this package.  For example,
  51  * the implementation of {@code submit(Runnable)} creates an
  52  * associated {@code RunnableFuture} that is executed and
  53  * returned. Subclasses may override the {@code newTaskFor} methods
  54  * to return {@code RunnableFuture} implementations other than
  55  * {@code FutureTask}.
  56  *
  57  * <p><b>Extension example</b>. Here is a sketch of a class
  58  * that customizes {@link ThreadPoolExecutor} to use
  59  * a {@code CustomTask} class instead of the default {@code FutureTask}:
  60  * <pre> {@code
  61  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  62  *
  63  *   static class CustomTask<V> implements RunnableFuture<V> {...}
  64  *
  65  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
  66  *       return new CustomTask<V>(c);
  67  *   }
  68  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
  69  *       return new CustomTask<V>(r, v);
  70  *   }
  71  *   // ... add constructors, etc.
  72  * }}</pre>
  73  *
  74  * @since 1.5
  75  * @author Doug Lea
  76  */
  77 public abstract class AbstractExecutorService implements ExecutorService {
  78 
  79     /**
  80      * Returns a {@code RunnableFuture} for the given runnable and default
  81      * value.
  82      *
  83      * @param runnable the runnable task being wrapped
  84      * @param value the default value for the returned future
  85      * @param <T> the type of the given value
  86      * @return a {@code RunnableFuture} which, when run, will run the
  87      * underlying runnable and which, as a {@code Future}, will yield
  88      * the given value as its result and provide for cancellation of
  89      * the underlying task
  90      * @since 1.6
  91      */
  92     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  93         return new FutureTask<T>(runnable, value);
  94     }
  95 
  96     /**
  97      * Returns a {@code RunnableFuture} for the given callable task.
  98      *
  99      * @param callable the callable task being wrapped
 100      * @param <T> the type of the callable's result
 101      * @return a {@code RunnableFuture} which, when run, will call the
 102      * underlying callable and which, as a {@code Future}, will yield
 103      * the callable's result as its result and provide for
 104      * cancellation of the underlying task
 105      * @since 1.6
 106      */
 107     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 108         return new FutureTask<T>(callable);
 109     }
 110 
 111     /**
 112      * @throws RejectedExecutionException {@inheritDoc}
 113      * @throws NullPointerException       {@inheritDoc}
 114      */
 115     public Future<?> submit(Runnable task) {
 116         if (task == null) throw new NullPointerException();
 117         RunnableFuture<Void> ftask = newTaskFor(task, null);
 118         execute(ftask);
 119         return ftask;
 120     }
 121 
 122     /**
 123      * @throws RejectedExecutionException {@inheritDoc}
 124      * @throws NullPointerException       {@inheritDoc}
 125      */
 126     public <T> Future<T> submit(Runnable task, T result) {
 127         if (task == null) throw new NullPointerException();
 128         RunnableFuture<T> ftask = newTaskFor(task, result);
 129         execute(ftask);
 130         return ftask;
 131     }
 132 
 133     /**
 134      * @throws RejectedExecutionException {@inheritDoc}
 135      * @throws NullPointerException       {@inheritDoc}
 136      */
 137     public <T> Future<T> submit(Callable<T> task) {
 138         if (task == null) throw new NullPointerException();
 139         RunnableFuture<T> ftask = newTaskFor(task);
 140         execute(ftask);
 141         return ftask;
 142     }
 143 
 144     /**
 145      * the main mechanics of invokeAny.
 146      */
 147     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 148                               boolean timed, long nanos)
 149         throws InterruptedException, ExecutionException, TimeoutException {
 150         if (tasks == null)
 151             throw new NullPointerException();
 152         int ntasks = tasks.size();
 153         if (ntasks == 0)
 154             throw new IllegalArgumentException();
 155         ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
 156         ExecutorCompletionService<T> ecs =
 157             new ExecutorCompletionService<T>(this);
 158 
 159         // For efficiency, especially in executors with limited
 160         // parallelism, check to see if previously submitted tasks are
 161         // done before submitting more of them. This interleaving
 162         // plus the exception mechanics account for messiness of main
 163         // loop.
 164 
 165         try {
 166             // Record exceptions so that if we fail to obtain any
 167             // result, we can throw the last exception we got.
 168             ExecutionException ee = null;
 169             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 170             Iterator<? extends Callable<T>> it = tasks.iterator();
 171 
 172             // Start one task for sure; the rest incrementally
 173             futures.add(ecs.submit(it.next()));
 174             --ntasks;
 175             int active = 1;
 176 
 177             for (;;) {
 178                 Future<T> f = ecs.poll();
 179                 if (f == null) {
 180                     if (ntasks > 0) {
 181                         --ntasks;
 182                         futures.add(ecs.submit(it.next()));
 183                         ++active;
 184                     }
 185                     else if (active == 0)
 186                         break;
 187                     else if (timed) {
 188                         f = ecs.poll(nanos, NANOSECONDS);
 189                         if (f == null)
 190                             throw new TimeoutException();
 191                         nanos = deadline - System.nanoTime();
 192                     }
 193                     else
 194                         f = ecs.take();
 195                 }
 196                 if (f != null) {
 197                     --active;
 198                     try {
 199                         return f.get();
 200                     } catch (ExecutionException eex) {
 201                         ee = eex;
 202                     } catch (RuntimeException rex) {
 203                         ee = new ExecutionException(rex);
 204                     }
 205                 }
 206             }
 207 
 208             if (ee == null)
 209                 ee = new ExecutionException();
 210             throw ee;
 211 
 212         } finally {
 213             cancelAll(futures);
 214         }
 215     }
 216 
 217     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 218         throws InterruptedException, ExecutionException {
 219         try {
 220             return doInvokeAny(tasks, false, 0);
 221         } catch (TimeoutException cannotHappen) {
 222             assert false;
 223             return null;
 224         }
 225     }
 226 
 227     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 228                            long timeout, TimeUnit unit)
 229         throws InterruptedException, ExecutionException, TimeoutException {
 230         return doInvokeAny(tasks, true, unit.toNanos(timeout));
 231     }
 232 
 233     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 234         throws InterruptedException {
 235         if (tasks == null)
 236             throw new NullPointerException();
 237         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
 238         try {
 239             for (Callable<T> t : tasks) {
 240                 RunnableFuture<T> f = newTaskFor(t);
 241                 futures.add(f);
 242                 execute(f);
 243             }
 244             for (int i = 0, size = futures.size(); i < size; i++) {
 245                 Future<T> f = futures.get(i);
 246                 if (!f.isDone()) {
 247                     try { f.get(); }
 248                     catch (CancellationException | ExecutionException ignore) {}
 249                 }
 250             }
 251             return futures;
 252         } catch (Throwable t) {
 253             cancelAll(futures);
 254             throw t;
 255         }
 256     }
 257 
 258     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 259                                          long timeout, TimeUnit unit)
 260         throws InterruptedException {
 261         if (tasks == null)
 262             throw new NullPointerException();
 263         final long nanos = unit.toNanos(timeout);
 264         final long deadline = System.nanoTime() + nanos;
 265         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
 266         int j = 0;
 267         timedOut: try {
 268             for (Callable<T> t : tasks)
 269                 futures.add(newTaskFor(t));
 270 
 271             final int size = futures.size();
 272 
 273             // Interleave time checks and calls to execute in case
 274             // executor doesn't have any/much parallelism.
 275             for (int i = 0; i < size; i++) {
 276                 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
 277                     break timedOut;
 278                 execute((Runnable)futures.get(i));
 279             }
 280 
 281             for (; j < size; j++) {
 282                 Future<T> f = futures.get(j);
 283                 if (!f.isDone()) {
 284                     try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
 285                     catch (CancellationException | ExecutionException ignore) {}
 286                     catch (TimeoutException timedOut) {
 287                         break timedOut;
 288                     }
 289                 }
 290             }
 291             return futures;
 292         } catch (Throwable t) {
 293             cancelAll(futures);
 294             throw t;
 295         }
 296         // Timed out before all the tasks could be completed; cancel remaining
 297         cancelAll(futures, j);
 298         return futures;
 299     }
 300 
 301     private static <T> void cancelAll(ArrayList<Future<T>> futures) {
 302         cancelAll(futures, 0);
 303     }
 304 
 305     /** Cancels all futures with index at least j. */
 306     private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
 307         for (int size = futures.size(); j < size; j++)
 308             futures.get(j).cancel(true);
 309     }
 310 }