1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.*;
  38 
  39 /**
  40  * Provides default implementations of {@link ExecutorService}
  41  * execution methods. This class implements the <tt>submit</tt>,
  42  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
  43  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
  44  * to the {@link FutureTask} class provided in this package.  For example,
  45  * the implementation of <tt>submit(Runnable)</tt> creates an
  46  * associated <tt>RunnableFuture</tt> that is executed and
  47  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
  48  * to return <tt>RunnableFuture</tt> implementations other than
  49  * <tt>FutureTask</tt>.
  50  *
  51  * <p> <b>Extension example</b>. Here is a sketch of a class
  52  * that customizes {@link ThreadPoolExecutor} to use
  53  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
  54  * <pre>
  55  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  56  *
  57  *   static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
  58  *
  59  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
  60  *       return new CustomTask&lt;V&gt;(c);
  61  *   }
  62  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
  63  *       return new CustomTask&lt;V&gt;(r, v);
  64  *   }
  65  *   // ... add constructors, etc.
  66  * }
  67  * </pre>
  68  * @since 1.5
  69  * @author Doug Lea
  70  */
  71 public abstract class AbstractExecutorService implements ExecutorService {
  72 
  73     /**
  74      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  75      * value.
  76      *
  77      * @param runnable the runnable task being wrapped
  78      * @param value the default value for the returned future
  79      * @return a <tt>RunnableFuture</tt> which when run will run the
  80      * underlying runnable and which, as a <tt>Future</tt>, will yield
  81      * the given value as its result and provide for cancellation of
  82      * the underlying task.
  83      * @since 1.6
  84      */
  85     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  86         return new FutureTask<T>(runnable, value);
  87     }
  88 
  89     /**
  90      * Returns a <tt>RunnableFuture</tt> for the given callable task.
  91      *
  92      * @param callable the callable task being wrapped
  93      * @return a <tt>RunnableFuture</tt> which when run will call the
  94      * underlying callable and which, as a <tt>Future</tt>, will yield
  95      * the callable's result as its result and provide for
  96      * cancellation of the underlying task.
  97      * @since 1.6
  98      */
  99     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 100         return new FutureTask<T>(callable);
 101     }
 102 
 103     /**
 104      * @throws RejectedExecutionException {@inheritDoc}
 105      * @throws NullPointerException       {@inheritDoc}
 106      */
 107     public Future<?> submit(Runnable task) {
 108         if (task == null) throw new NullPointerException();
 109         RunnableFuture<Object> ftask = newTaskFor(task, null);
 110         execute(ftask);
 111         return ftask;
 112     }
 113 
 114     /**
 115      * @throws RejectedExecutionException {@inheritDoc}
 116      * @throws NullPointerException       {@inheritDoc}
 117      */
 118     public <T> Future<T> submit(Runnable task, T result) {
 119         if (task == null) throw new NullPointerException();
 120         RunnableFuture<T> ftask = newTaskFor(task, result);
 121         execute(ftask);
 122         return ftask;
 123     }
 124 
 125     /**
 126      * @throws RejectedExecutionException {@inheritDoc}
 127      * @throws NullPointerException       {@inheritDoc}
 128      */
 129     public <T> Future<T> submit(Callable<T> task) {
 130         if (task == null) throw new NullPointerException();
 131         RunnableFuture<T> ftask = newTaskFor(task);
 132         execute(ftask);
 133         return ftask;
 134     }
 135 
 136     /**
 137      * the main mechanics of invokeAny.
 138      */
 139     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 140                             boolean timed, long nanos)
 141         throws InterruptedException, ExecutionException, TimeoutException {
 142         if (tasks == null)
 143             throw new NullPointerException();
 144         int ntasks = tasks.size();
 145         if (ntasks == 0)
 146             throw new IllegalArgumentException();
 147         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 148         ExecutorCompletionService<T> ecs =
 149             new ExecutorCompletionService<T>(this);
 150 
 151         // For efficiency, especially in executors with limited
 152         // parallelism, check to see if previously submitted tasks are
 153         // done before submitting more of them. This interleaving
 154         // plus the exception mechanics account for messiness of main
 155         // loop.
 156 
 157         try {
 158             // Record exceptions so that if we fail to obtain any
 159             // result, we can throw the last exception we got.
 160             ExecutionException ee = null;
 161             long lastTime = (timed)? System.nanoTime() : 0;
 162             Iterator<? extends Callable<T>> it = tasks.iterator();
 163 
 164             // Start one task for sure; the rest incrementally
 165             futures.add(ecs.submit(it.next()));
 166             --ntasks;
 167             int active = 1;
 168 
 169             for (;;) {
 170                 Future<T> f = ecs.poll();
 171                 if (f == null) {
 172                     if (ntasks > 0) {
 173                         --ntasks;
 174                         futures.add(ecs.submit(it.next()));
 175                         ++active;
 176                     }
 177                     else if (active == 0)
 178                         break;
 179                     else if (timed) {
 180                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 181                         if (f == null)
 182                             throw new TimeoutException();
 183                         long now = System.nanoTime();
 184                         nanos -= now - lastTime;
 185                         lastTime = now;
 186                     }
 187                     else
 188                         f = ecs.take();
 189                 }
 190                 if (f != null) {
 191                     --active;
 192                     try {
 193                         return f.get();
 194                     } catch (InterruptedException ie) {
 195                         throw ie;
 196                     } catch (ExecutionException eex) {
 197                         ee = eex;
 198                     } catch (RuntimeException rex) {
 199                         ee = new ExecutionException(rex);
 200                     }
 201                 }
 202             }
 203 
 204             if (ee == null)
 205                 ee = new ExecutionException();
 206             throw ee;
 207 
 208         } finally {
 209             for (Future<T> f : futures)
 210                 f.cancel(true);
 211         }
 212     }
 213 
 214     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 215         throws InterruptedException, ExecutionException {
 216         try {
 217             return doInvokeAny(tasks, false, 0);
 218         } catch (TimeoutException cannotHappen) {
 219             assert false;
 220             return null;
 221         }
 222     }
 223 
 224     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 225                            long timeout, TimeUnit unit)
 226         throws InterruptedException, ExecutionException, TimeoutException {
 227         return doInvokeAny(tasks, true, unit.toNanos(timeout));
 228     }
 229 
 230     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 231         throws InterruptedException {
 232         if (tasks == null)
 233             throw new NullPointerException();
 234         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 235         boolean done = false;
 236         try {
 237             for (Callable<T> t : tasks) {
 238                 RunnableFuture<T> f = newTaskFor(t);
 239                 futures.add(f);
 240                 execute(f);
 241             }
 242             for (Future<T> f : futures) {
 243                 if (!f.isDone()) {
 244                     try {
 245                         f.get();
 246                     } catch (CancellationException ignore) {
 247                     } catch (ExecutionException ignore) {
 248                     }
 249                 }
 250             }
 251             done = true;
 252             return futures;
 253         } finally {
 254             if (!done)
 255                 for (Future<T> f : futures)
 256                     f.cancel(true);
 257         }
 258     }
 259 
 260     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 261                                          long timeout, TimeUnit unit)
 262         throws InterruptedException {
 263         if (tasks == null || unit == null)
 264             throw new NullPointerException();
 265         long nanos = unit.toNanos(timeout);
 266         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 267         boolean done = false;
 268         try {
 269             for (Callable<T> t : tasks)
 270                 futures.add(newTaskFor(t));
 271 
 272             long lastTime = System.nanoTime();
 273 
 274             // Interleave time checks and calls to execute in case
 275             // executor doesn't have any/much parallelism.
 276             Iterator<Future<T>> it = futures.iterator();
 277             while (it.hasNext()) {
 278                 execute((Runnable)(it.next()));
 279                 long now = System.nanoTime();
 280                 nanos -= now - lastTime;
 281                 lastTime = now;
 282                 if (nanos <= 0)
 283                     return futures;
 284             }
 285 
 286             for (Future<T> f : futures) {
 287                 if (!f.isDone()) {
 288                     if (nanos <= 0)
 289                         return futures;
 290                     try {
 291                         f.get(nanos, TimeUnit.NANOSECONDS);
 292                     } catch (CancellationException ignore) {
 293                     } catch (ExecutionException ignore) {
 294                     } catch (TimeoutException toe) {
 295                         return futures;
 296                     }
 297                     long now = System.nanoTime();
 298                     nanos -= now - lastTime;
 299                     lastTime = now;
 300                 }
 301             }
 302             done = true;
 303             return futures;
 304         } finally {
 305             if (!done)
 306                 for (Future<T> f : futures)
 307                     f.cancel(true);
 308         }
 309     }
 310 
 311 }