src/share/classes/java/util/concurrent/AbstractExecutorService.java

Print this page




  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.*;
  38 
  39 /**
  40  * Provides default implementations of {@link ExecutorService}
  41  * execution methods. This class implements the <tt>submit</tt>,
  42  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
  43  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
  44  * to the {@link FutureTask} class provided in this package.  For example,
  45  * the implementation of <tt>submit(Runnable)</tt> creates an
  46  * associated <tt>RunnableFuture</tt> that is executed and
  47  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
  48  * to return <tt>RunnableFuture</tt> implementations other than
  49  * <tt>FutureTask</tt>.
  50  *
  51  * <p> <b>Extension example</b>. Here is a sketch of a class
  52  * that customizes {@link ThreadPoolExecutor} to use
  53  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
  54  * <pre>
  55  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  56  *
  57  *   static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
  58  *
  59  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
  60  *       return new CustomTask&lt;V&gt;(c);
  61  *   }
  62  *   protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
  63  *       return new CustomTask&lt;V&gt;(r, v);
  64  *   }
  65  *   // ... add constructors, etc.
  66  * }
  67  * </pre>
  68  * @since 1.5
  69  * @author Doug Lea
  70  */
  71 public abstract class AbstractExecutorService implements ExecutorService {
  72 
  73     /**
  74      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  75      * value.
  76      *
  77      * @param runnable the runnable task being wrapped
  78      * @param value the default value for the returned future
  79      * @return a <tt>RunnableFuture</tt> which when run will run the
  80      * underlying runnable and which, as a <tt>Future</tt>, will yield
  81      * the given value as its result and provide for cancellation of
  82      * the underlying task.
  83      * @since 1.6
  84      */
  85     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  86         return new FutureTask<T>(runnable, value);
  87     }


  89     /**
  90      * Returns a <tt>RunnableFuture</tt> for the given callable task.
  91      *
  92      * @param callable the callable task being wrapped
  93      * @return a <tt>RunnableFuture</tt> which when run will call the
  94      * underlying callable and which, as a <tt>Future</tt>, will yield
  95      * the callable's result as its result and provide for
  96      * cancellation of the underlying task.
  97      * @since 1.6
  98      */
  99     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 100         return new FutureTask<T>(callable);
 101     }
 102 
 103     /**
 104      * @throws RejectedExecutionException {@inheritDoc}
 105      * @throws NullPointerException       {@inheritDoc}
 106      */
 107     public Future<?> submit(Runnable task) {
 108         if (task == null) throw new NullPointerException();
 109         RunnableFuture<Object> ftask = newTaskFor(task, null);
 110         execute(ftask);
 111         return ftask;
 112     }
 113 
 114     /**
 115      * @throws RejectedExecutionException {@inheritDoc}
 116      * @throws NullPointerException       {@inheritDoc}
 117      */
 118     public <T> Future<T> submit(Runnable task, T result) {
 119         if (task == null) throw new NullPointerException();
 120         RunnableFuture<T> ftask = newTaskFor(task, result);
 121         execute(ftask);
 122         return ftask;
 123     }
 124 
 125     /**
 126      * @throws RejectedExecutionException {@inheritDoc}
 127      * @throws NullPointerException       {@inheritDoc}
 128      */
 129     public <T> Future<T> submit(Callable<T> task) {


 141         throws InterruptedException, ExecutionException, TimeoutException {
 142         if (tasks == null)
 143             throw new NullPointerException();
 144         int ntasks = tasks.size();
 145         if (ntasks == 0)
 146             throw new IllegalArgumentException();
 147         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 148         ExecutorCompletionService<T> ecs =
 149             new ExecutorCompletionService<T>(this);
 150 
 151         // For efficiency, especially in executors with limited
 152         // parallelism, check to see if previously submitted tasks are
 153         // done before submitting more of them. This interleaving
 154         // plus the exception mechanics account for messiness of main
 155         // loop.
 156 
 157         try {
 158             // Record exceptions so that if we fail to obtain any
 159             // result, we can throw the last exception we got.
 160             ExecutionException ee = null;
 161             long lastTime = (timed)? System.nanoTime() : 0;
 162             Iterator<? extends Callable<T>> it = tasks.iterator();
 163 
 164             // Start one task for sure; the rest incrementally
 165             futures.add(ecs.submit(it.next()));
 166             --ntasks;
 167             int active = 1;
 168 
 169             for (;;) {
 170                 Future<T> f = ecs.poll();
 171                 if (f == null) {
 172                     if (ntasks > 0) {
 173                         --ntasks;
 174                         futures.add(ecs.submit(it.next()));
 175                         ++active;
 176                     }
 177                     else if (active == 0)
 178                         break;
 179                     else if (timed) {
 180                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 181                         if (f == null)
 182                             throw new TimeoutException();
 183                         long now = System.nanoTime();
 184                         nanos -= now - lastTime;
 185                         lastTime = now;
 186                     }
 187                     else
 188                         f = ecs.take();
 189                 }
 190                 if (f != null) {
 191                     --active;
 192                     try {
 193                         return f.get();
 194                     } catch (InterruptedException ie) {
 195                         throw ie;
 196                     } catch (ExecutionException eex) {
 197                         ee = eex;
 198                     } catch (RuntimeException rex) {
 199                         ee = new ExecutionException(rex);
 200                     }
 201                 }
 202             }
 203 
 204             if (ee == null)
 205                 ee = new ExecutionException();
 206             throw ee;
 207 
 208         } finally {
 209             for (Future<T> f : futures)
 210                 f.cancel(true);
 211         }
 212     }
 213 
 214     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 215         throws InterruptedException, ExecutionException {




  34  */
  35 
  36 package java.util.concurrent;
  37 import java.util.*;
  38 
  39 /**
  40  * Provides default implementations of {@link ExecutorService}
  41  * execution methods. This class implements the <tt>submit</tt>,
  42  * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
  43  * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
  44  * to the {@link FutureTask} class provided in this package.  For example,
  45  * the implementation of <tt>submit(Runnable)</tt> creates an
  46  * associated <tt>RunnableFuture</tt> that is executed and
  47  * returned. Subclasses may override the <tt>newTaskFor</tt> methods
  48  * to return <tt>RunnableFuture</tt> implementations other than
  49  * <tt>FutureTask</tt>.
  50  *
  51  * <p> <b>Extension example</b>. Here is a sketch of a class
  52  * that customizes {@link ThreadPoolExecutor} to use
  53  * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
  54  *  <pre> {@code
  55  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
  56  *
  57  *   static class CustomTask<V> implements RunnableFuture<V> {...}
  58  *
  59  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
  60  *       return new CustomTask<V>(c);
  61  *   }
  62  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
  63  *       return new CustomTask<V>(r, v);
  64  *   }
  65  *   // ... add constructors, etc.
  66  * }}</pre>
  67  *
  68  * @since 1.5
  69  * @author Doug Lea
  70  */
  71 public abstract class AbstractExecutorService implements ExecutorService {
  72 
  73     /**
  74      * Returns a <tt>RunnableFuture</tt> for the given runnable and default
  75      * value.
  76      *
  77      * @param runnable the runnable task being wrapped
  78      * @param value the default value for the returned future
  79      * @return a <tt>RunnableFuture</tt> which when run will run the
  80      * underlying runnable and which, as a <tt>Future</tt>, will yield
  81      * the given value as its result and provide for cancellation of
  82      * the underlying task.
  83      * @since 1.6
  84      */
  85     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  86         return new FutureTask<T>(runnable, value);
  87     }


  89     /**
  90      * Returns a <tt>RunnableFuture</tt> for the given callable task.
  91      *
  92      * @param callable the callable task being wrapped
  93      * @return a <tt>RunnableFuture</tt> which when run will call the
  94      * underlying callable and which, as a <tt>Future</tt>, will yield
  95      * the callable's result as its result and provide for
  96      * cancellation of the underlying task.
  97      * @since 1.6
  98      */
  99     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
 100         return new FutureTask<T>(callable);
 101     }
 102 
 103     /**
 104      * @throws RejectedExecutionException {@inheritDoc}
 105      * @throws NullPointerException       {@inheritDoc}
 106      */
 107     public Future<?> submit(Runnable task) {
 108         if (task == null) throw new NullPointerException();
 109         RunnableFuture<Void> ftask = newTaskFor(task, null);
 110         execute(ftask);
 111         return ftask;
 112     }
 113 
 114     /**
 115      * @throws RejectedExecutionException {@inheritDoc}
 116      * @throws NullPointerException       {@inheritDoc}
 117      */
 118     public <T> Future<T> submit(Runnable task, T result) {
 119         if (task == null) throw new NullPointerException();
 120         RunnableFuture<T> ftask = newTaskFor(task, result);
 121         execute(ftask);
 122         return ftask;
 123     }
 124 
 125     /**
 126      * @throws RejectedExecutionException {@inheritDoc}
 127      * @throws NullPointerException       {@inheritDoc}
 128      */
 129     public <T> Future<T> submit(Callable<T> task) {


 141         throws InterruptedException, ExecutionException, TimeoutException {
 142         if (tasks == null)
 143             throw new NullPointerException();
 144         int ntasks = tasks.size();
 145         if (ntasks == 0)
 146             throw new IllegalArgumentException();
 147         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
 148         ExecutorCompletionService<T> ecs =
 149             new ExecutorCompletionService<T>(this);
 150 
 151         // For efficiency, especially in executors with limited
 152         // parallelism, check to see if previously submitted tasks are
 153         // done before submitting more of them. This interleaving
 154         // plus the exception mechanics account for messiness of main
 155         // loop.
 156 
 157         try {
 158             // Record exceptions so that if we fail to obtain any
 159             // result, we can throw the last exception we got.
 160             ExecutionException ee = null;
 161             long lastTime = timed ? System.nanoTime() : 0;
 162             Iterator<? extends Callable<T>> it = tasks.iterator();
 163 
 164             // Start one task for sure; the rest incrementally
 165             futures.add(ecs.submit(it.next()));
 166             --ntasks;
 167             int active = 1;
 168 
 169             for (;;) {
 170                 Future<T> f = ecs.poll();
 171                 if (f == null) {
 172                     if (ntasks > 0) {
 173                         --ntasks;
 174                         futures.add(ecs.submit(it.next()));
 175                         ++active;
 176                     }
 177                     else if (active == 0)
 178                         break;
 179                     else if (timed) {
 180                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 181                         if (f == null)
 182                             throw new TimeoutException();
 183                         long now = System.nanoTime();
 184                         nanos -= now - lastTime;
 185                         lastTime = now;
 186                     }
 187                     else
 188                         f = ecs.take();
 189                 }
 190                 if (f != null) {
 191                     --active;
 192                     try {
 193                         return f.get();


 194                     } catch (ExecutionException eex) {
 195                         ee = eex;
 196                     } catch (RuntimeException rex) {
 197                         ee = new ExecutionException(rex);
 198                     }
 199                 }
 200             }
 201 
 202             if (ee == null)
 203                 ee = new ExecutionException();
 204             throw ee;
 205 
 206         } finally {
 207             for (Future<T> f : futures)
 208                 f.cancel(true);
 209         }
 210     }
 211 
 212     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 213         throws InterruptedException, ExecutionException {