34 */
35
36 package java.util.concurrent;
37 import java.util.*;
38
39 /**
40 * Provides default implementations of {@link ExecutorService}
41 * execution methods. This class implements the <tt>submit</tt>,
42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
44 * to the {@link FutureTask} class provided in this package. For example,
45 * the implementation of <tt>submit(Runnable)</tt> creates an
46 * associated <tt>RunnableFuture</tt> that is executed and
47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
48 * to return <tt>RunnableFuture</tt> implementations other than
49 * <tt>FutureTask</tt>.
50 *
51 * <p> <b>Extension example</b>. Here is a sketch of a class
52 * that customizes {@link ThreadPoolExecutor} to use
53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
54 * <pre>
55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
56 *
57 * static class CustomTask<V> implements RunnableFuture<V> {...}
58 *
59 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60 * return new CustomTask<V>(c);
61 * }
62 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63 * return new CustomTask<V>(r, v);
64 * }
65 * // ... add constructors, etc.
66 * }
67 * </pre>
68 * @since 1.5
69 * @author Doug Lea
70 */
71 public abstract class AbstractExecutorService implements ExecutorService {
72
73 /**
74 * Returns a <tt>RunnableFuture</tt> for the given runnable and default
75 * value.
76 *
77 * @param runnable the runnable task being wrapped
78 * @param value the default value for the returned future
79 * @return a <tt>RunnableFuture</tt> which when run will run the
80 * underlying runnable and which, as a <tt>Future</tt>, will yield
81 * the given value as its result and provide for cancellation of
82 * the underlying task.
83 * @since 1.6
84 */
85 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
86 return new FutureTask<T>(runnable, value);
87 }
89 /**
90 * Returns a <tt>RunnableFuture</tt> for the given callable task.
91 *
92 * @param callable the callable task being wrapped
93 * @return a <tt>RunnableFuture</tt> which when run will call the
94 * underlying callable and which, as a <tt>Future</tt>, will yield
95 * the callable's result as its result and provide for
96 * cancellation of the underlying task.
97 * @since 1.6
98 */
99 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
100 return new FutureTask<T>(callable);
101 }
102
103 /**
104 * @throws RejectedExecutionException {@inheritDoc}
105 * @throws NullPointerException {@inheritDoc}
106 */
107 public Future<?> submit(Runnable task) {
108 if (task == null) throw new NullPointerException();
109 RunnableFuture<Object> ftask = newTaskFor(task, null);
110 execute(ftask);
111 return ftask;
112 }
113
114 /**
115 * @throws RejectedExecutionException {@inheritDoc}
116 * @throws NullPointerException {@inheritDoc}
117 */
118 public <T> Future<T> submit(Runnable task, T result) {
119 if (task == null) throw new NullPointerException();
120 RunnableFuture<T> ftask = newTaskFor(task, result);
121 execute(ftask);
122 return ftask;
123 }
124
125 /**
126 * @throws RejectedExecutionException {@inheritDoc}
127 * @throws NullPointerException {@inheritDoc}
128 */
129 public <T> Future<T> submit(Callable<T> task) {
141 throws InterruptedException, ExecutionException, TimeoutException {
142 if (tasks == null)
143 throw new NullPointerException();
144 int ntasks = tasks.size();
145 if (ntasks == 0)
146 throw new IllegalArgumentException();
147 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
148 ExecutorCompletionService<T> ecs =
149 new ExecutorCompletionService<T>(this);
150
151 // For efficiency, especially in executors with limited
152 // parallelism, check to see if previously submitted tasks are
153 // done before submitting more of them. This interleaving
154 // plus the exception mechanics account for messiness of main
155 // loop.
156
157 try {
158 // Record exceptions so that if we fail to obtain any
159 // result, we can throw the last exception we got.
160 ExecutionException ee = null;
161 long lastTime = (timed)? System.nanoTime() : 0;
162 Iterator<? extends Callable<T>> it = tasks.iterator();
163
164 // Start one task for sure; the rest incrementally
165 futures.add(ecs.submit(it.next()));
166 --ntasks;
167 int active = 1;
168
169 for (;;) {
170 Future<T> f = ecs.poll();
171 if (f == null) {
172 if (ntasks > 0) {
173 --ntasks;
174 futures.add(ecs.submit(it.next()));
175 ++active;
176 }
177 else if (active == 0)
178 break;
179 else if (timed) {
180 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
181 if (f == null)
182 throw new TimeoutException();
183 long now = System.nanoTime();
184 nanos -= now - lastTime;
185 lastTime = now;
186 }
187 else
188 f = ecs.take();
189 }
190 if (f != null) {
191 --active;
192 try {
193 return f.get();
194 } catch (InterruptedException ie) {
195 throw ie;
196 } catch (ExecutionException eex) {
197 ee = eex;
198 } catch (RuntimeException rex) {
199 ee = new ExecutionException(rex);
200 }
201 }
202 }
203
204 if (ee == null)
205 ee = new ExecutionException();
206 throw ee;
207
208 } finally {
209 for (Future<T> f : futures)
210 f.cancel(true);
211 }
212 }
213
214 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
215 throws InterruptedException, ExecutionException {
|
34 */
35
36 package java.util.concurrent;
37 import java.util.*;
38
39 /**
40 * Provides default implementations of {@link ExecutorService}
41 * execution methods. This class implements the <tt>submit</tt>,
42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
44 * to the {@link FutureTask} class provided in this package. For example,
45 * the implementation of <tt>submit(Runnable)</tt> creates an
46 * associated <tt>RunnableFuture</tt> that is executed and
47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
48 * to return <tt>RunnableFuture</tt> implementations other than
49 * <tt>FutureTask</tt>.
50 *
51 * <p> <b>Extension example</b>. Here is a sketch of a class
52 * that customizes {@link ThreadPoolExecutor} to use
53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
54 * <pre> {@code
55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
56 *
57 * static class CustomTask<V> implements RunnableFuture<V> {...}
58 *
59 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60 * return new CustomTask<V>(c);
61 * }
62 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63 * return new CustomTask<V>(r, v);
64 * }
65 * // ... add constructors, etc.
66 * }}</pre>
67 *
68 * @since 1.5
69 * @author Doug Lea
70 */
71 public abstract class AbstractExecutorService implements ExecutorService {
72
73 /**
74 * Returns a <tt>RunnableFuture</tt> for the given runnable and default
75 * value.
76 *
77 * @param runnable the runnable task being wrapped
78 * @param value the default value for the returned future
79 * @return a <tt>RunnableFuture</tt> which when run will run the
80 * underlying runnable and which, as a <tt>Future</tt>, will yield
81 * the given value as its result and provide for cancellation of
82 * the underlying task.
83 * @since 1.6
84 */
85 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
86 return new FutureTask<T>(runnable, value);
87 }
89 /**
90 * Returns a <tt>RunnableFuture</tt> for the given callable task.
91 *
92 * @param callable the callable task being wrapped
93 * @return a <tt>RunnableFuture</tt> which when run will call the
94 * underlying callable and which, as a <tt>Future</tt>, will yield
95 * the callable's result as its result and provide for
96 * cancellation of the underlying task.
97 * @since 1.6
98 */
99 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
100 return new FutureTask<T>(callable);
101 }
102
103 /**
104 * @throws RejectedExecutionException {@inheritDoc}
105 * @throws NullPointerException {@inheritDoc}
106 */
107 public Future<?> submit(Runnable task) {
108 if (task == null) throw new NullPointerException();
109 RunnableFuture<Void> ftask = newTaskFor(task, null);
110 execute(ftask);
111 return ftask;
112 }
113
114 /**
115 * @throws RejectedExecutionException {@inheritDoc}
116 * @throws NullPointerException {@inheritDoc}
117 */
118 public <T> Future<T> submit(Runnable task, T result) {
119 if (task == null) throw new NullPointerException();
120 RunnableFuture<T> ftask = newTaskFor(task, result);
121 execute(ftask);
122 return ftask;
123 }
124
125 /**
126 * @throws RejectedExecutionException {@inheritDoc}
127 * @throws NullPointerException {@inheritDoc}
128 */
129 public <T> Future<T> submit(Callable<T> task) {
141 throws InterruptedException, ExecutionException, TimeoutException {
142 if (tasks == null)
143 throw new NullPointerException();
144 int ntasks = tasks.size();
145 if (ntasks == 0)
146 throw new IllegalArgumentException();
147 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
148 ExecutorCompletionService<T> ecs =
149 new ExecutorCompletionService<T>(this);
150
151 // For efficiency, especially in executors with limited
152 // parallelism, check to see if previously submitted tasks are
153 // done before submitting more of them. This interleaving
154 // plus the exception mechanics account for messiness of main
155 // loop.
156
157 try {
158 // Record exceptions so that if we fail to obtain any
159 // result, we can throw the last exception we got.
160 ExecutionException ee = null;
161 long lastTime = timed ? System.nanoTime() : 0;
162 Iterator<? extends Callable<T>> it = tasks.iterator();
163
164 // Start one task for sure; the rest incrementally
165 futures.add(ecs.submit(it.next()));
166 --ntasks;
167 int active = 1;
168
169 for (;;) {
170 Future<T> f = ecs.poll();
171 if (f == null) {
172 if (ntasks > 0) {
173 --ntasks;
174 futures.add(ecs.submit(it.next()));
175 ++active;
176 }
177 else if (active == 0)
178 break;
179 else if (timed) {
180 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
181 if (f == null)
182 throw new TimeoutException();
183 long now = System.nanoTime();
184 nanos -= now - lastTime;
185 lastTime = now;
186 }
187 else
188 f = ecs.take();
189 }
190 if (f != null) {
191 --active;
192 try {
193 return f.get();
194 } catch (ExecutionException eex) {
195 ee = eex;
196 } catch (RuntimeException rex) {
197 ee = new ExecutionException(rex);
198 }
199 }
200 }
201
202 if (ee == null)
203 ee = new ExecutionException();
204 throw ee;
205
206 } finally {
207 for (Future<T> f : futures)
208 f.cancel(true);
209 }
210 }
211
212 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
213 throws InterruptedException, ExecutionException {
|