Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/AbstractExecutorService.java
+++ new/src/share/classes/java/util/concurrent/AbstractExecutorService.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37 import java.util.*;
38 38
39 39 /**
40 40 * Provides default implementations of {@link ExecutorService}
41 41 * execution methods. This class implements the <tt>submit</tt>,
42 42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
43 43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
↓ open down ↓ |
43 lines elided |
↑ open up ↑ |
44 44 * to the {@link FutureTask} class provided in this package. For example,
45 45 * the implementation of <tt>submit(Runnable)</tt> creates an
46 46 * associated <tt>RunnableFuture</tt> that is executed and
47 47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
48 48 * to return <tt>RunnableFuture</tt> implementations other than
49 49 * <tt>FutureTask</tt>.
50 50 *
51 51 * <p> <b>Extension example</b>. Here is a sketch of a class
52 52 * that customizes {@link ThreadPoolExecutor} to use
53 53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
54 - * <pre>
54 + * <pre> {@code
55 55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
56 56 *
57 - * static class CustomTask<V> implements RunnableFuture<V> {...}
57 + * static class CustomTask<V> implements RunnableFuture<V> {...}
58 58 *
59 - * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60 - * return new CustomTask<V>(c);
59 + * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
60 + * return new CustomTask<V>(c);
61 61 * }
62 - * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63 - * return new CustomTask<V>(r, v);
62 + * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
63 + * return new CustomTask<V>(r, v);
64 64 * }
65 65 * // ... add constructors, etc.
66 - * }
67 - * </pre>
66 + * }}</pre>
67 + *
68 68 * @since 1.5
69 69 * @author Doug Lea
70 70 */
71 71 public abstract class AbstractExecutorService implements ExecutorService {
72 72
73 73 /**
74 74 * Returns a <tt>RunnableFuture</tt> for the given runnable and default
75 75 * value.
76 76 *
77 77 * @param runnable the runnable task being wrapped
78 78 * @param value the default value for the returned future
79 79 * @return a <tt>RunnableFuture</tt> which when run will run the
80 80 * underlying runnable and which, as a <tt>Future</tt>, will yield
81 81 * the given value as its result and provide for cancellation of
82 82 * the underlying task.
83 83 * @since 1.6
84 84 */
85 85 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
86 86 return new FutureTask<T>(runnable, value);
87 87 }
88 88
89 89 /**
90 90 * Returns a <tt>RunnableFuture</tt> for the given callable task.
91 91 *
92 92 * @param callable the callable task being wrapped
93 93 * @return a <tt>RunnableFuture</tt> which when run will call the
94 94 * underlying callable and which, as a <tt>Future</tt>, will yield
95 95 * the callable's result as its result and provide for
96 96 * cancellation of the underlying task.
97 97 * @since 1.6
98 98 */
↓ open down ↓ |
21 lines elided |
↑ open up ↑ |
99 99 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
100 100 return new FutureTask<T>(callable);
101 101 }
102 102
103 103 /**
104 104 * @throws RejectedExecutionException {@inheritDoc}
105 105 * @throws NullPointerException {@inheritDoc}
106 106 */
107 107 public Future<?> submit(Runnable task) {
108 108 if (task == null) throw new NullPointerException();
109 - RunnableFuture<Object> ftask = newTaskFor(task, null);
109 + RunnableFuture<Void> ftask = newTaskFor(task, null);
110 110 execute(ftask);
111 111 return ftask;
112 112 }
113 113
114 114 /**
115 115 * @throws RejectedExecutionException {@inheritDoc}
116 116 * @throws NullPointerException {@inheritDoc}
117 117 */
118 118 public <T> Future<T> submit(Runnable task, T result) {
119 119 if (task == null) throw new NullPointerException();
120 120 RunnableFuture<T> ftask = newTaskFor(task, result);
121 121 execute(ftask);
122 122 return ftask;
123 123 }
124 124
125 125 /**
126 126 * @throws RejectedExecutionException {@inheritDoc}
127 127 * @throws NullPointerException {@inheritDoc}
128 128 */
129 129 public <T> Future<T> submit(Callable<T> task) {
130 130 if (task == null) throw new NullPointerException();
131 131 RunnableFuture<T> ftask = newTaskFor(task);
132 132 execute(ftask);
133 133 return ftask;
134 134 }
135 135
136 136 /**
137 137 * The main mechanics of invokeAny.
138 138 */
139 139 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
140 140 boolean timed, long nanos)
141 141 throws InterruptedException, ExecutionException, TimeoutException {
142 142 if (tasks == null)
143 143 throw new NullPointerException();
144 144 int ntasks = tasks.size();
145 145 if (ntasks == 0)
146 146 throw new IllegalArgumentException();
147 147 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
148 148 ExecutorCompletionService<T> ecs =
149 149 new ExecutorCompletionService<T>(this);
150 150
↓ open down ↓ |
31 lines elided |
↑ open up ↑ |
151 151 // For efficiency, especially in executors with limited
152 152 // parallelism, check to see if previously submitted tasks are
153 153 // done before submitting more of them. This interleaving
154 154 // plus the exception mechanics account for messiness of main
155 155 // loop.
156 156
157 157 try {
158 158 // Record exceptions so that if we fail to obtain any
159 159 // result, we can throw the last exception we got.
160 160 ExecutionException ee = null;
161 - long lastTime = (timed)? System.nanoTime() : 0;
161 + long lastTime = timed ? System.nanoTime() : 0;
162 162 Iterator<? extends Callable<T>> it = tasks.iterator();
163 163
164 164 // Start one task for sure; the rest incrementally
165 165 futures.add(ecs.submit(it.next()));
166 166 --ntasks;
167 167 int active = 1;
168 168
169 169 for (;;) {
170 170 Future<T> f = ecs.poll();
171 171 if (f == null) {
172 172 if (ntasks > 0) {
173 173 --ntasks;
174 174 futures.add(ecs.submit(it.next()));
175 175 ++active;
176 176 }
177 177 else if (active == 0)
178 178 break;
179 179 else if (timed) {
180 180 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
181 181 if (f == null)
182 182 throw new TimeoutException();
183 183 long now = System.nanoTime();
↓ open down ↓ |
12 lines elided |
↑ open up ↑ |
184 184 nanos -= now - lastTime;
185 185 lastTime = now;
186 186 }
187 187 else
188 188 f = ecs.take();
189 189 }
190 190 if (f != null) {
191 191 --active;
192 192 try {
193 193 return f.get();
194 - } catch (InterruptedException ie) {
195 - throw ie;
196 194 } catch (ExecutionException eex) {
197 195 ee = eex;
198 196 } catch (RuntimeException rex) {
199 197 ee = new ExecutionException(rex);
200 198 }
201 199 }
202 200 }
203 201
204 202 if (ee == null)
205 203 ee = new ExecutionException();
206 204 throw ee;
207 205
208 206 } finally {
209 207 for (Future<T> f : futures)
210 208 f.cancel(true);
211 209 }
212 210 }
213 211
214 212 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
215 213 throws InterruptedException, ExecutionException {
216 214 try {
217 215 return doInvokeAny(tasks, false, 0);
218 216 } catch (TimeoutException cannotHappen) {
219 217 assert false;
220 218 return null;
221 219 }
222 220 }
223 221
224 222 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
225 223 long timeout, TimeUnit unit)
226 224 throws InterruptedException, ExecutionException, TimeoutException {
227 225 return doInvokeAny(tasks, true, unit.toNanos(timeout));
228 226 }
229 227
230 228 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
231 229 throws InterruptedException {
232 230 if (tasks == null)
233 231 throw new NullPointerException();
234 232 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
235 233 boolean done = false;
236 234 try {
237 235 for (Callable<T> t : tasks) {
238 236 RunnableFuture<T> f = newTaskFor(t);
239 237 futures.add(f);
240 238 execute(f);
241 239 }
242 240 for (Future<T> f : futures) {
243 241 if (!f.isDone()) {
244 242 try {
245 243 f.get();
246 244 } catch (CancellationException ignore) {
247 245 } catch (ExecutionException ignore) {
248 246 }
249 247 }
250 248 }
251 249 done = true;
252 250 return futures;
253 251 } finally {
254 252 if (!done)
255 253 for (Future<T> f : futures)
256 254 f.cancel(true);
257 255 }
258 256 }
259 257
260 258 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
261 259 long timeout, TimeUnit unit)
262 260 throws InterruptedException {
263 261 if (tasks == null || unit == null)
264 262 throw new NullPointerException();
265 263 long nanos = unit.toNanos(timeout);
266 264 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
267 265 boolean done = false;
268 266 try {
269 267 for (Callable<T> t : tasks)
270 268 futures.add(newTaskFor(t));
271 269
272 270 long lastTime = System.nanoTime();
273 271
274 272 // Interleave time checks and calls to execute in case
275 273 // executor doesn't have any/much parallelism.
276 274 Iterator<Future<T>> it = futures.iterator();
277 275 while (it.hasNext()) {
278 276 execute((Runnable)(it.next()));
279 277 long now = System.nanoTime();
280 278 nanos -= now - lastTime;
281 279 lastTime = now;
282 280 if (nanos <= 0)
283 281 return futures;
284 282 }
285 283
286 284 for (Future<T> f : futures) {
287 285 if (!f.isDone()) {
288 286 if (nanos <= 0)
289 287 return futures;
290 288 try {
291 289 f.get(nanos, TimeUnit.NANOSECONDS);
292 290 } catch (CancellationException ignore) {
293 291 } catch (ExecutionException ignore) {
294 292 } catch (TimeoutException toe) {
295 293 return futures;
296 294 }
297 295 long now = System.nanoTime();
298 296 nanos -= now - lastTime;
299 297 lastTime = now;
300 298 }
301 299 }
302 300 done = true;
303 301 return futures;
304 302 } finally {
305 303 if (!done)
306 304 for (Future<T> f : futures)
307 305 f.cancel(true);
308 306 }
309 307 }
310 308
311 309 }
↓ open down ↓ |
106 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX