1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 import java.util.*; 38 39 /** 40 * Provides default implementations of {@link ExecutorService} 41 * execution methods. This class implements the <tt>submit</tt>, 42 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a 43 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults 44 * to the {@link FutureTask} class provided in this package. For example, 45 * the implementation of <tt>submit(Runnable)</tt> creates an 46 * associated <tt>RunnableFuture</tt> that is executed and 47 * returned. Subclasses may override the <tt>newTaskFor</tt> methods 48 * to return <tt>RunnableFuture</tt> implementations other than 49 * <tt>FutureTask</tt>. 50 * 51 * <p> <b>Extension example</b>. Here is a sketch of a class 52 * that customizes {@link ThreadPoolExecutor} to use 53 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>: 54 * <pre> 55 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 56 * 57 * static class CustomTask<V> implements RunnableFuture<V> {...} 58 * 59 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 60 * return new CustomTask<V>(c); 61 * } 62 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 63 * return new CustomTask<V>(r, v); 64 * } 65 * // ... add constructors, etc. 66 * } 67 * </pre> 68 * @since 1.5 69 * @author Doug Lea 70 */ 71 public abstract class AbstractExecutorService implements ExecutorService { 72 73 /** 74 * Returns a <tt>RunnableFuture</tt> for the given runnable and default 75 * value. 76 * 77 * @param runnable the runnable task being wrapped 78 * @param value the default value for the returned future 79 * @return a <tt>RunnableFuture</tt> which when run will run the 80 * underlying runnable and which, as a <tt>Future</tt>, will yield 81 * the given value as its result and provide for cancellation of 82 * the underlying task. 83 * @since 1.6 84 */ 85 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 86 return new FutureTask<T>(runnable, value); 87 } 88 89 /** 90 * Returns a <tt>RunnableFuture</tt> for the given callable task. 91 * 92 * @param callable the callable task being wrapped 93 * @return a <tt>RunnableFuture</tt> which when run will call the 94 * underlying callable and which, as a <tt>Future</tt>, will yield 95 * the callable's result as its result and provide for 96 * cancellation of the underlying task. 97 * @since 1.6 98 */ 99 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 100 return new FutureTask<T>(callable); 101 } 102 103 /** 104 * @throws RejectedExecutionException {@inheritDoc} 105 * @throws NullPointerException {@inheritDoc} 106 */ 107 public Future<?> submit(Runnable task) { 108 if (task == null) throw new NullPointerException(); 109 RunnableFuture<Object> ftask = newTaskFor(task, null); 110 execute(ftask); 111 return ftask; 112 } 113 114 /** 115 * @throws RejectedExecutionException {@inheritDoc} 116 * @throws NullPointerException {@inheritDoc} 117 */ 118 public <T> Future<T> submit(Runnable task, T result) { 119 if (task == null) throw new NullPointerException(); 120 RunnableFuture<T> ftask = newTaskFor(task, result); 121 execute(ftask); 122 return ftask; 123 } 124 125 /** 126 * @throws RejectedExecutionException {@inheritDoc} 127 * @throws NullPointerException {@inheritDoc} 128 */ 129 public <T> Future<T> submit(Callable<T> task) { 130 if (task == null) throw new NullPointerException(); 131 RunnableFuture<T> ftask = newTaskFor(task); 132 execute(ftask); 133 return ftask; 134 } 135 136 /** 137 * the main mechanics of invokeAny. 138 */ 139 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 140 boolean timed, long nanos) 141 throws InterruptedException, ExecutionException, TimeoutException { 142 if (tasks == null) 143 throw new NullPointerException(); 144 int ntasks = tasks.size(); 145 if (ntasks == 0) 146 throw new IllegalArgumentException(); 147 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 148 ExecutorCompletionService<T> ecs = 149 new ExecutorCompletionService<T>(this); 150 151 // For efficiency, especially in executors with limited 152 // parallelism, check to see if previously submitted tasks are 153 // done before submitting more of them. This interleaving 154 // plus the exception mechanics account for messiness of main 155 // loop. 156 157 try { 158 // Record exceptions so that if we fail to obtain any 159 // result, we can throw the last exception we got. 160 ExecutionException ee = null; 161 long lastTime = (timed)? System.nanoTime() : 0; 162 Iterator<? extends Callable<T>> it = tasks.iterator(); 163 164 // Start one task for sure; the rest incrementally 165 futures.add(ecs.submit(it.next())); 166 --ntasks; 167 int active = 1; 168 169 for (;;) { 170 Future<T> f = ecs.poll(); 171 if (f == null) { 172 if (ntasks > 0) { 173 --ntasks; 174 futures.add(ecs.submit(it.next())); 175 ++active; 176 } 177 else if (active == 0) 178 break; 179 else if (timed) { 180 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 181 if (f == null) 182 throw new TimeoutException(); 183 long now = System.nanoTime(); 184 nanos -= now - lastTime; 185 lastTime = now; 186 } 187 else 188 f = ecs.take(); 189 } 190 if (f != null) { 191 --active; 192 try { 193 return f.get(); 194 } catch (InterruptedException ie) { 195 throw ie; 196 } catch (ExecutionException eex) { 197 ee = eex; 198 } catch (RuntimeException rex) { 199 ee = new ExecutionException(rex); 200 } 201 } 202 } 203 204 if (ee == null) 205 ee = new ExecutionException(); 206 throw ee; 207 208 } finally { 209 for (Future<T> f : futures) 210 f.cancel(true); 211 } 212 } 213 214 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 215 throws InterruptedException, ExecutionException { 216 try { 217 return doInvokeAny(tasks, false, 0); 218 } catch (TimeoutException cannotHappen) { 219 assert false; 220 return null; 221 } 222 } 223 224 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 225 long timeout, TimeUnit unit) 226 throws InterruptedException, ExecutionException, TimeoutException { 227 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 228 } 229 230 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 231 throws InterruptedException { 232 if (tasks == null) 233 throw new NullPointerException(); 234 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 235 boolean done = false; 236 try { 237 for (Callable<T> t : tasks) { 238 RunnableFuture<T> f = newTaskFor(t); 239 futures.add(f); 240 execute(f); 241 } 242 for (Future<T> f : futures) { 243 if (!f.isDone()) { 244 try { 245 f.get(); 246 } catch (CancellationException ignore) { 247 } catch (ExecutionException ignore) { 248 } 249 } 250 } 251 done = true; 252 return futures; 253 } finally { 254 if (!done) 255 for (Future<T> f : futures) 256 f.cancel(true); 257 } 258 } 259 260 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 261 long timeout, TimeUnit unit) 262 throws InterruptedException { 263 if (tasks == null || unit == null) 264 throw new NullPointerException(); 265 long nanos = unit.toNanos(timeout); 266 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 267 boolean done = false; 268 try { 269 for (Callable<T> t : tasks) 270 futures.add(newTaskFor(t)); 271 272 long lastTime = System.nanoTime(); 273 274 // Interleave time checks and calls to execute in case 275 // executor doesn't have any/much parallelism. 276 Iterator<Future<T>> it = futures.iterator(); 277 while (it.hasNext()) { 278 execute((Runnable)(it.next())); 279 long now = System.nanoTime(); 280 nanos -= now - lastTime; 281 lastTime = now; 282 if (nanos <= 0) 283 return futures; 284 } 285 286 for (Future<T> f : futures) { 287 if (!f.isDone()) { 288 if (nanos <= 0) 289 return futures; 290 try { 291 f.get(nanos, TimeUnit.NANOSECONDS); 292 } catch (CancellationException ignore) { 293 } catch (ExecutionException ignore) { 294 } catch (TimeoutException toe) { 295 return futures; 296 } 297 long now = System.nanoTime(); 298 nanos -= now - lastTime; 299 lastTime = now; 300 } 301 } 302 done = true; 303 return futures; 304 } finally { 305 if (!done) 306 for (Future<T> f : futures) 307 f.cancel(true); 308 } 309 } 310 311 } --- EOF ---