1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.  Oracle designates this
   7  * particular file as subject to the "Classpath" exception as provided
   8  * by Oracle in the LICENSE file that accompanied this code.
   9  *
  10  * This code is distributed in the hope that it will be useful, but WITHOUT
  11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  13  * version 2 for more details (a copy is included in the LICENSE file that
  14  * accompanied this code).
  15  *
  16  * You should have received a copy of the GNU General Public License version
  17  * 2 along with this work; if not, write to the Free Software Foundation,
  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.*;
  38 
  39 /**
  40  * Provides default implementations of {@link ExecutorService}
  41  * execution methods. This class implements the <tt>submit</tt>,
  42  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
  43  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
  44  * to the {@link FutureTask} class provided in this package.  For example,
  45  * the implementation of <tt>submit(Runnable)</tt> creates an
  46  * associated <tt>RunnableFuture</tt> that is executed and
  47  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
  48  * to return <tt>RunnableFuture</tt> implementations other than
  49  * <tt>FutureTask</tt>.
  50  *
  51  * <p> <b>Extension example</b>. Here is a sketch of a class
  52  * that customizes {@link ThreadPoolExecutor} to use
  53  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
  54  *  <pre> {@code
  55  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  56  *
  57  *   static class CustomTask<V> implements RunnableFuture<V> {...}
  58  *
  59  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
  60  *       return new CustomTask<V>(c);
  61  *   }
  62  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
  63  *       return new CustomTask<V>(r, v);
  64  *   }
  65  *   // ... add constructors, etc.
  66  * }}</pre>
  67  *
  68  * @since 1.5
  69  * @author Doug Lea
  70  */
  71 public abstract class AbstractExecutorService implements ExecutorService {
  72 
  73     /**
  74      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  75      * value.
  76      *
  77      * @param runnable the runnable task being wrapped
  78      * @param value the default value for the returned future
  79      * @return a <tt>RunnableFuture</tt> which when run will run the
  80      * underlying runnable and which, as a <tt>Future</tt>, will yield
  81      * the given value as its result and provide for cancellation of
  82      * the underlying task.
  83      * @since 1.6
  84      */
  85     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  86         return new FutureTask<T>(runnable, value);
  87     }
  88 
  89     /**
  90      * Returns a <tt>RunnableFuture</tt> for the given callable task.
  91      *
  92      * @param callable the callable task being wrapped
  93      * @return a <tt>RunnableFuture</tt> which when run will call the
  94      * underlying callable and which, as a <tt>Future</tt>, will yield
  95      * the callable's result as its result and provide for
  96      * cancellation of the underlying task.
  97      * @since 1.6
  98      */
  99     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 100         return new FutureTask<T>(callable);
 101     }
 102 
 103     /**
 104      * @throws RejectedExecutionException {@inheritDoc}
 105      * @throws NullPointerException       {@inheritDoc}
 106      */
 107     public Future<?> submit(Runnable task) {
 108         if (task == null) throw new NullPointerException();
 109         RunnableFuture<Void> ftask = newTaskFor(task, null);
 110         execute(ftask);
 111         return ftask;
 112     }
 113 
 114     /**
 115      * @throws RejectedExecutionException {@inheritDoc}
 116      * @throws NullPointerException       {@inheritDoc}
 117      */
 118     public <T> Future<T> submit(Runnable task, T result) {
 119         if (task == null) throw new NullPointerException();
 120         RunnableFuture<T> ftask = newTaskFor(task, result);
 121         execute(ftask);
 122         return ftask;
 123     }
 124 
 125     /**
 126      * @throws RejectedExecutionException {@inheritDoc}
 127      * @throws NullPointerException       {@inheritDoc}
 128      */
 129     public <T> Future<T> submit(Callable<T> task) {
 130         if (task == null) throw new NullPointerException();
 131         RunnableFuture<T> ftask = newTaskFor(task);
 132         execute(ftask);
 133         return ftask;
 134     }
 135 
 136     /**
 137      * the main mechanics of invokeAny.
 138      */
 139     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 140                             boolean timed, long nanos)
 141         throws InterruptedException, ExecutionException, TimeoutException {
 142         if (tasks == null)
 143             throw new NullPointerException();
 144         int ntasks = tasks.size();
 145         if (ntasks == 0)
 146             throw new IllegalArgumentException();
 147         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 148         ExecutorCompletionService<T> ecs =
 149             new ExecutorCompletionService<T>(this);
 150 
 151         // For efficiency, especially in executors with limited
 152         // parallelism, check to see if previously submitted tasks are
 153         // done before submitting more of them. This interleaving
 154         // plus the exception mechanics account for messiness of main
 155         // loop.
 156 
 157         try {
 158             // Record exceptions so that if we fail to obtain any
 159             // result, we can throw the last exception we got.
 160             ExecutionException ee = null;
 161             long lastTime = timed ? System.nanoTime() : 0;
 162             Iterator<? extends Callable<T>> it = tasks.iterator();
 163 
 164             // Start one task for sure; the rest incrementally
 165             futures.add(ecs.submit(it.next()));
 166             --ntasks;
 167             int active = 1;
 168 
 169             for (;;) {
 170                 Future<T> f = ecs.poll();
 171                 if (f == null) {
 172                     if (ntasks > 0) {
 173                         --ntasks;
 174                         futures.add(ecs.submit(it.next()));
 175                         ++active;
 176                     }
 177                     else if (active == 0)
 178                         break;
 179                     else if (timed) {
 180                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 181                         if (f == null)
 182                             throw new TimeoutException();
 183                         long now = System.nanoTime();
 184                         nanos -= now - lastTime;
 185                         lastTime = now;
 186                     }
 187                     else
 188                         f = ecs.take();
 189                 }
 190                 if (f != null) {
 191                     --active;
 192                     try {
 193                         return f.get();


 194                     } catch (ExecutionException eex) {
 195                         ee = eex;
 196                     } catch (RuntimeException rex) {
 197                         ee = new ExecutionException(rex);
 198                     }
 199                 }
 200             }
 201 
 202             if (ee == null)
 203                 ee = new ExecutionException();
 204             throw ee;
 205 
 206         } finally {
 207             for (Future<T> f : futures)
 208                 f.cancel(true);
 209         }
 210     }
 211 
 212     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 213         throws InterruptedException, ExecutionException {
 214         try {
 215             return doInvokeAny(tasks, false, 0);
 216         } catch (TimeoutException cannotHappen) {
 217             assert false;
 218             return null;
 219         }
 220     }
 221 
 222     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
 223                            long timeout, TimeUnit unit)
 224         throws InterruptedException, ExecutionException, TimeoutException {
 225         return doInvokeAny(tasks, true, unit.toNanos(timeout));
 226     }
 227 
 228     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 229         throws InterruptedException {
 230         if (tasks == null)
 231             throw new NullPointerException();
 232         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 233         boolean done = false;
 234         try {
 235             for (Callable<T> t : tasks) {
 236                 RunnableFuture<T> f = newTaskFor(t);
 237                 futures.add(f);
 238                 execute(f);
 239             }
 240             for (Future<T> f : futures) {
 241                 if (!f.isDone()) {
 242                     try {
 243                         f.get();
 244                     } catch (CancellationException ignore) {
 245                     } catch (ExecutionException ignore) {
 246                     }
 247                 }
 248             }
 249             done = true;
 250             return futures;
 251         } finally {
 252             if (!done)
 253                 for (Future<T> f : futures)
 254                     f.cancel(true);
 255         }
 256     }
 257 
 258     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
 259                                          long timeout, TimeUnit unit)
 260         throws InterruptedException {
 261         if (tasks == null || unit == null)
 262             throw new NullPointerException();
 263         long nanos = unit.toNanos(timeout);
 264         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
 265         boolean done = false;
 266         try {
 267             for (Callable<T> t : tasks)
 268                 futures.add(newTaskFor(t));
 269 
 270             long lastTime = System.nanoTime();
 271 
 272             // Interleave time checks and calls to execute in case
 273             // executor doesn't have any/much parallelism.
 274             Iterator<Future<T>> it = futures.iterator();
 275             while (it.hasNext()) {
 276                 execute((Runnable)(it.next()));
 277                 long now = System.nanoTime();
 278                 nanos -= now - lastTime;
 279                 lastTime = now;
 280                 if (nanos <= 0)
 281                     return futures;
 282             }
 283 
 284             for (Future<T> f : futures) {
 285                 if (!f.isDone()) {
 286                     if (nanos <= 0)
 287                         return futures;
 288                     try {
 289                         f.get(nanos, TimeUnit.NANOSECONDS);
 290                     } catch (CancellationException ignore) {
 291                     } catch (ExecutionException ignore) {
 292                     } catch (TimeoutException toe) {
 293                         return futures;
 294                     }
 295                     long now = System.nanoTime();
 296                     nanos -= now - lastTime;
 297                     lastTime = now;
 298                 }
 299             }
 300             done = true;
 301             return futures;
 302         } finally {
 303             if (!done)
 304                 for (Future<T> f : futures)
 305                     f.cancel(true);
 306         }
 307     }
 308 
 309 }
--- EOF ---