1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 import static java.util.concurrent.TimeUnit.NANOSECONDS; 38 import java.util.concurrent.atomic.AtomicLong; 39 import java.util.concurrent.locks.Condition; 40 import java.util.concurrent.locks.ReentrantLock; 41 import java.util.*; 42 43 /** 44 * A {@link ThreadPoolExecutor} that can additionally schedule 45 * commands to run after a given delay, or to execute 46 * periodically. This class is preferable to {@link java.util.Timer} 47 * when multiple worker threads are needed, or when the additional 48 * flexibility or capabilities of {@link ThreadPoolExecutor} (which 49 * this class extends) are required. 50 * 51 * <p>Delayed tasks execute no sooner than they are enabled, but 52 * without any real-time guarantees about when, after they are 53 * enabled, they will commence. Tasks scheduled for exactly the same 54 * execution time are enabled in first-in-first-out (FIFO) order of 55 * submission. 56 * 57 * <p>When a submitted task is cancelled before it is run, execution 58 * is suppressed. By default, such a cancelled task is not 59 * automatically removed from the work queue until its delay 60 * elapses. While this enables further inspection and monitoring, it 61 * may also cause unbounded retention of cancelled tasks. To avoid 62 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which 63 * causes tasks to be immediately removed from the work queue at 64 * time of cancellation. 65 * 66 * <p>Successive executions of a task scheduled via 67 * {@code scheduleAtFixedRate} or 68 * {@code scheduleWithFixedDelay} do not overlap. While different 69 * executions may be performed by different threads, the effects of 70 * prior executions <a 71 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 72 * those of subsequent ones. 73 * 74 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 75 * of the inherited tuning methods are not useful for it. In 76 * particular, because it acts as a fixed-sized pool using 77 * {@code corePoolSize} threads and an unbounded queue, adjustments 78 * to {@code maximumPoolSize} have no useful effect. Additionally, it 79 * is almost never a good idea to set {@code corePoolSize} to zero or 80 * use {@code allowCoreThreadTimeOut} because this may leave the pool 81 * without threads to handle tasks once they become eligible to run. 82 * 83 * <p><b>Extension notes:</b> This class overrides the 84 * {@link ThreadPoolExecutor#execute execute} and 85 * {@link AbstractExecutorService#submit(Runnable) submit} 86 * methods to generate internal {@link ScheduledFuture} objects to 87 * control per-task delays and scheduling. To preserve 88 * functionality, any further overrides of these methods in 89 * subclasses must invoke superclass versions, which effectively 90 * disables additional task customization. However, this class 91 * provides alternative protected extension method 92 * {@code decorateTask} (one version each for {@code Runnable} and 93 * {@code Callable}) that can be used to customize the concrete task 94 * types used to execute commands entered via {@code execute}, 95 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 96 * and {@code scheduleWithFixedDelay}. By default, a 97 * {@code ScheduledThreadPoolExecutor} uses a task type extending 98 * {@link FutureTask}. However, this may be modified or replaced using 99 * subclasses of the form: 100 * 101 * <pre> {@code 102 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 103 * 104 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 105 * 106 * protected <V> RunnableScheduledFuture<V> decorateTask( 107 * Runnable r, RunnableScheduledFuture<V> task) { 108 * return new CustomTask<V>(r, task); 109 * } 110 * 111 * protected <V> RunnableScheduledFuture<V> decorateTask( 112 * Callable<V> c, RunnableScheduledFuture<V> task) { 113 * return new CustomTask<V>(c, task); 114 * } 115 * // ... add constructors, etc. 116 * }}</pre> 117 * 118 * @since 1.5 119 * @author Doug Lea 120 */ 121 public class ScheduledThreadPoolExecutor 122 extends ThreadPoolExecutor 123 implements ScheduledExecutorService { 124 125 /* 126 * This class specializes ThreadPoolExecutor implementation by 127 * 128 * 1. Using a custom task type, ScheduledFutureTask for 129 * tasks, even those that don't require scheduling (i.e., 130 * those submitted using ExecutorService execute, not 131 * ScheduledExecutorService methods) which are treated as 132 * delayed tasks with a delay of zero. 133 * 134 * 2. Using a custom queue (DelayedWorkQueue), a variant of 135 * unbounded DelayQueue. The lack of capacity constraint and 136 * the fact that corePoolSize and maximumPoolSize are 137 * effectively identical simplifies some execution mechanics 138 * (see delayedExecute) compared to ThreadPoolExecutor. 139 * 140 * 3. Supporting optional run-after-shutdown parameters, which 141 * leads to overrides of shutdown methods to remove and cancel 142 * tasks that should NOT be run after shutdown, as well as 143 * different recheck logic when task (re)submission overlaps 144 * with a shutdown. 145 * 146 * 4. Task decoration methods to allow interception and 147 * instrumentation, which are needed because subclasses cannot 148 * otherwise override submit methods to get this effect. These 149 * don't have any impact on pool control logic though. 150 */ 151 152 /** 153 * False if should cancel/suppress periodic tasks on shutdown. 154 */ 155 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 156 157 /** 158 * False if should cancel non-periodic tasks on shutdown. 159 */ 160 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 161 162 /** 163 * True if ScheduledFutureTask.cancel should remove from queue 164 */ 165 private volatile boolean removeOnCancel = false; 166 167 /** 168 * Sequence number to break scheduling ties, and in turn to 169 * guarantee FIFO order among tied entries. 170 */ 171 private static final AtomicLong sequencer = new AtomicLong(); 172 173 /** 174 * Returns current nanosecond time. 175 */ 176 final long now() { 177 return System.nanoTime(); 178 } 179 180 private class ScheduledFutureTask<V> 181 extends FutureTask<V> implements RunnableScheduledFuture<V> { 182 183 /** Sequence number to break ties FIFO */ 184 private final long sequenceNumber; 185 186 /** The time the task is enabled to execute in nanoTime units */ 187 private long time; 188 189 /** 190 * Period in nanoseconds for repeating tasks. A positive 191 * value indicates fixed-rate execution. A negative value 192 * indicates fixed-delay execution. A value of 0 indicates a 193 * non-repeating task. 194 */ 195 private final long period; 196 197 /** The actual task to be re-enqueued by reExecutePeriodic */ 198 RunnableScheduledFuture<V> outerTask = this; 199 200 /** 201 * Index into delay queue, to support faster cancellation. 202 */ 203 int heapIndex; 204 205 /** 206 * Creates a one-shot action with given nanoTime-based trigger time. 207 */ 208 ScheduledFutureTask(Runnable r, V result, long ns) { 209 super(r, result); 210 this.time = ns; 211 this.period = 0; 212 this.sequenceNumber = sequencer.getAndIncrement(); 213 } 214 215 /** 216 * Creates a periodic action with given nano time and period. 217 */ 218 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 219 super(r, result); 220 this.time = ns; 221 this.period = period; 222 this.sequenceNumber = sequencer.getAndIncrement(); 223 } 224 225 /** 226 * Creates a one-shot action with given nanoTime-based trigger. 227 */ 228 ScheduledFutureTask(Callable<V> callable, long ns) { 229 super(callable); 230 this.time = ns; 231 this.period = 0; 232 this.sequenceNumber = sequencer.getAndIncrement(); 233 } 234 235 public long getDelay(TimeUnit unit) { 236 return unit.convert(time - now(), NANOSECONDS); 237 } 238 239 public int compareTo(Delayed other) { 240 if (other == this) // compare zero ONLY if same object 241 return 0; 242 if (other instanceof ScheduledFutureTask) { 243 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 244 long diff = time - x.time; 245 if (diff < 0) 246 return -1; 247 else if (diff > 0) 248 return 1; 249 else if (sequenceNumber < x.sequenceNumber) 250 return -1; 251 else 252 return 1; 253 } 254 long d = (getDelay(NANOSECONDS) - 255 other.getDelay(NANOSECONDS)); 256 return (d == 0) ? 0 : ((d < 0) ? -1 : 1); 257 } 258 259 /** 260 * Returns true if this is a periodic (not a one-shot) action. 261 * 262 * @return true if periodic 263 */ 264 public boolean isPeriodic() { 265 return period != 0; 266 } 267 268 /** 269 * Sets the next time to run for a periodic task. 270 */ 271 private void setNextRunTime() { 272 long p = period; 273 if (p > 0) 274 time += p; 275 else 276 time = triggerTime(-p); 277 } 278 279 public boolean cancel(boolean mayInterruptIfRunning) { 280 boolean cancelled = super.cancel(mayInterruptIfRunning); 281 if (cancelled && removeOnCancel && heapIndex >= 0) 282 remove(this); 283 return cancelled; 284 } 285 286 /** 287 * Overrides FutureTask version so as to reset/requeue if periodic. 288 */ 289 public void run() { 290 boolean periodic = isPeriodic(); 291 if (!canRunInCurrentRunState(periodic)) 292 cancel(false); 293 else if (!periodic) 294 ScheduledFutureTask.super.run(); 295 else if (ScheduledFutureTask.super.runAndReset()) { 296 setNextRunTime(); 297 reExecutePeriodic(outerTask); 298 } 299 } 300 } 301 302 /** 303 * Returns true if can run a task given current run state 304 * and run-after-shutdown parameters. 305 * 306 * @param periodic true if this task periodic, false if delayed 307 */ 308 boolean canRunInCurrentRunState(boolean periodic) { 309 return isRunningOrShutdown(periodic ? 310 continueExistingPeriodicTasksAfterShutdown : 311 executeExistingDelayedTasksAfterShutdown); 312 } 313 314 /** 315 * Main execution method for delayed or periodic tasks. If pool 316 * is shut down, rejects the task. Otherwise adds task to queue 317 * and starts a thread, if necessary, to run it. (We cannot 318 * prestart the thread to run the task because the task (probably) 319 * shouldn't be run yet,) If the pool is shut down while the task 320 * is being added, cancel and remove it if required by state and 321 * run-after-shutdown parameters. 322 * 323 * @param task the task 324 */ 325 private void delayedExecute(RunnableScheduledFuture<?> task) { 326 if (isShutdown()) 327 reject(task); 328 else { 329 super.getQueue().add(task); 330 if (isShutdown() && 331 !canRunInCurrentRunState(task.isPeriodic()) && 332 remove(task)) 333 task.cancel(false); 334 else 335 ensurePrestart(); 336 } 337 } 338 339 /** 340 * Requeues a periodic task unless current run state precludes it. 341 * Same idea as delayedExecute except drops task rather than rejecting. 342 * 343 * @param task the task 344 */ 345 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 346 if (canRunInCurrentRunState(true)) { 347 super.getQueue().add(task); 348 if (!canRunInCurrentRunState(true) && remove(task)) 349 task.cancel(false); 350 else 351 ensurePrestart(); 352 } 353 } 354 355 /** 356 * Cancels and clears the queue of all tasks that should not be run 357 * due to shutdown policy. Invoked within super.shutdown. 358 */ 359 @Override void onShutdown() { 360 BlockingQueue<Runnable> q = super.getQueue(); 361 boolean keepDelayed = 362 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 363 boolean keepPeriodic = 364 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 365 if (!keepDelayed && !keepPeriodic) { 366 for (Object e : q.toArray()) 367 if (e instanceof RunnableScheduledFuture<?>) 368 ((RunnableScheduledFuture<?>) e).cancel(false); 369 q.clear(); 370 } 371 else { 372 // Traverse snapshot to avoid iterator exceptions 373 for (Object e : q.toArray()) { 374 if (e instanceof RunnableScheduledFuture) { 375 RunnableScheduledFuture<?> t = 376 (RunnableScheduledFuture<?>)e; 377 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 378 t.isCancelled()) { // also remove if already cancelled 379 if (q.remove(t)) 380 t.cancel(false); 381 } 382 } 383 } 384 } 385 tryTerminate(); 386 } 387 388 /** 389 * Modifies or replaces the task used to execute a runnable. 390 * This method can be used to override the concrete 391 * class used for managing internal tasks. 392 * The default implementation simply returns the given task. 393 * 394 * @param runnable the submitted Runnable 395 * @param task the task created to execute the runnable 396 * @return a task that can execute the runnable 397 * @since 1.6 398 */ 399 protected <V> RunnableScheduledFuture<V> decorateTask( 400 Runnable runnable, RunnableScheduledFuture<V> task) { 401 return task; 402 } 403 404 /** 405 * Modifies or replaces the task used to execute a callable. 406 * This method can be used to override the concrete 407 * class used for managing internal tasks. 408 * The default implementation simply returns the given task. 409 * 410 * @param callable the submitted Callable 411 * @param task the task created to execute the callable 412 * @return a task that can execute the callable 413 * @since 1.6 414 */ 415 protected <V> RunnableScheduledFuture<V> decorateTask( 416 Callable<V> callable, RunnableScheduledFuture<V> task) { 417 return task; 418 } 419 420 /** 421 * Creates a new {@code ScheduledThreadPoolExecutor} with the 422 * given core pool size. 423 * 424 * @param corePoolSize the number of threads to keep in the pool, even 425 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 426 * @throws IllegalArgumentException if {@code corePoolSize < 0} 427 */ 428 public ScheduledThreadPoolExecutor(int corePoolSize) { 429 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 430 new DelayedWorkQueue()); 431 } 432 433 /** 434 * Creates a new {@code ScheduledThreadPoolExecutor} with the 435 * given initial parameters. 436 * 437 * @param corePoolSize the number of threads to keep in the pool, even 438 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 439 * @param threadFactory the factory to use when the executor 440 * creates a new thread 441 * @throws IllegalArgumentException if {@code corePoolSize < 0} 442 * @throws NullPointerException if {@code threadFactory} is null 443 */ 444 public ScheduledThreadPoolExecutor(int corePoolSize, 445 ThreadFactory threadFactory) { 446 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 447 new DelayedWorkQueue(), threadFactory); 448 } 449 450 /** 451 * Creates a new ScheduledThreadPoolExecutor with the given 452 * initial parameters. 453 * 454 * @param corePoolSize the number of threads to keep in the pool, even 455 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 456 * @param handler the handler to use when execution is blocked 457 * because the thread bounds and queue capacities are reached 458 * @throws IllegalArgumentException if {@code corePoolSize < 0} 459 * @throws NullPointerException if {@code handler} is null 460 */ 461 public ScheduledThreadPoolExecutor(int corePoolSize, 462 RejectedExecutionHandler handler) { 463 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 464 new DelayedWorkQueue(), handler); 465 } 466 467 /** 468 * Creates a new ScheduledThreadPoolExecutor with the given 469 * initial parameters. 470 * 471 * @param corePoolSize the number of threads to keep in the pool, even 472 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 473 * @param threadFactory the factory to use when the executor 474 * creates a new thread 475 * @param handler the handler to use when execution is blocked 476 * because the thread bounds and queue capacities are reached 477 * @throws IllegalArgumentException if {@code corePoolSize < 0} 478 * @throws NullPointerException if {@code threadFactory} or 479 * {@code handler} is null 480 */ 481 public ScheduledThreadPoolExecutor(int corePoolSize, 482 ThreadFactory threadFactory, 483 RejectedExecutionHandler handler) { 484 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 485 new DelayedWorkQueue(), threadFactory, handler); 486 } 487 488 /** 489 * Returns the trigger time of a delayed action. 490 */ 491 private long triggerTime(long delay, TimeUnit unit) { 492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 493 } 494 495 /** 496 * Returns the trigger time of a delayed action. 497 */ 498 long triggerTime(long delay) { 499 return now() + 500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 501 } 502 503 /** 504 * Constrains the values of all delays in the queue to be within 505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 506 * This may occur if a task is eligible to be dequeued, but has 507 * not yet been, while some other task is added with a delay of 508 * Long.MAX_VALUE. 509 */ 510 private long overflowFree(long delay) { 511 Delayed head = (Delayed) super.getQueue().peek(); 512 if (head != null) { 513 long headDelay = head.getDelay(NANOSECONDS); 514 if (headDelay < 0 && (delay - headDelay < 0)) 515 delay = Long.MAX_VALUE + headDelay; 516 } 517 return delay; 518 } 519 520 /** 521 * @throws RejectedExecutionException {@inheritDoc} 522 * @throws NullPointerException {@inheritDoc} 523 */ 524 public ScheduledFuture<?> schedule(Runnable command, 525 long delay, 526 TimeUnit unit) { 527 if (command == null || unit == null) 528 throw new NullPointerException(); 529 RunnableScheduledFuture<?> t = decorateTask(command, 530 new ScheduledFutureTask<Void>(command, null, 531 triggerTime(delay, unit))); 532 delayedExecute(t); 533 return t; 534 } 535 536 /** 537 * @throws RejectedExecutionException {@inheritDoc} 538 * @throws NullPointerException {@inheritDoc} 539 */ 540 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 541 long delay, 542 TimeUnit unit) { 543 if (callable == null || unit == null) 544 throw new NullPointerException(); 545 RunnableScheduledFuture<V> t = decorateTask(callable, 546 new ScheduledFutureTask<V>(callable, 547 triggerTime(delay, unit))); 548 delayedExecute(t); 549 return t; 550 } 551 552 /** 553 * @throws RejectedExecutionException {@inheritDoc} 554 * @throws NullPointerException {@inheritDoc} 555 * @throws IllegalArgumentException {@inheritDoc} 556 */ 557 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 558 long initialDelay, 559 long period, 560 TimeUnit unit) { 561 if (command == null || unit == null) 562 throw new NullPointerException(); 563 if (period <= 0) 564 throw new IllegalArgumentException(); 565 ScheduledFutureTask<Void> sft = 566 new ScheduledFutureTask<Void>(command, 567 null, 568 triggerTime(initialDelay, unit), 569 unit.toNanos(period)); 570 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 571 sft.outerTask = t; 572 delayedExecute(t); 573 return t; 574 } 575 576 /** 577 * @throws RejectedExecutionException {@inheritDoc} 578 * @throws NullPointerException {@inheritDoc} 579 * @throws IllegalArgumentException {@inheritDoc} 580 */ 581 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 582 long initialDelay, 583 long delay, 584 TimeUnit unit) { 585 if (command == null || unit == null) 586 throw new NullPointerException(); 587 if (delay <= 0) 588 throw new IllegalArgumentException(); 589 ScheduledFutureTask<Void> sft = 590 new ScheduledFutureTask<Void>(command, 591 null, 592 triggerTime(initialDelay, unit), 593 unit.toNanos(-delay)); 594 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 595 sft.outerTask = t; 596 delayedExecute(t); 597 return t; 598 } 599 600 /** 601 * Executes {@code command} with zero required delay. 602 * This has effect equivalent to 603 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 604 * Note that inspections of the queue and of the list returned by 605 * {@code shutdownNow} will access the zero-delayed 606 * {@link ScheduledFuture}, not the {@code command} itself. 607 * 608 * <p>A consequence of the use of {@code ScheduledFuture} objects is 609 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 610 * called with a null second {@code Throwable} argument, even if the 611 * {@code command} terminated abruptly. Instead, the {@code Throwable} 612 * thrown by such a task can be obtained via {@link Future#get}. 613 * 614 * @throws RejectedExecutionException at discretion of 615 * {@code RejectedExecutionHandler}, if the task 616 * cannot be accepted for execution because the 617 * executor has been shut down 618 * @throws NullPointerException {@inheritDoc} 619 */ 620 public void execute(Runnable command) { 621 schedule(command, 0, NANOSECONDS); 622 } 623 624 // Override AbstractExecutorService methods 625 626 /** 627 * @throws RejectedExecutionException {@inheritDoc} 628 * @throws NullPointerException {@inheritDoc} 629 */ 630 public Future<?> submit(Runnable task) { 631 return schedule(task, 0, NANOSECONDS); 632 } 633 634 /** 635 * @throws RejectedExecutionException {@inheritDoc} 636 * @throws NullPointerException {@inheritDoc} 637 */ 638 public <T> Future<T> submit(Runnable task, T result) { 639 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 640 } 641 642 /** 643 * @throws RejectedExecutionException {@inheritDoc} 644 * @throws NullPointerException {@inheritDoc} 645 */ 646 public <T> Future<T> submit(Callable<T> task) { 647 return schedule(task, 0, NANOSECONDS); 648 } 649 650 /** 651 * Sets the policy on whether to continue executing existing 652 * periodic tasks even when this executor has been {@code shutdown}. 653 * In this case, these tasks will only terminate upon 654 * {@code shutdownNow} or after setting the policy to 655 * {@code false} when already shutdown. 656 * This value is by default {@code false}. 657 * 658 * @param value if {@code true}, continue after shutdown, else don't. 659 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 660 */ 661 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 662 continueExistingPeriodicTasksAfterShutdown = value; 663 if (!value && isShutdown()) 664 onShutdown(); 665 } 666 667 /** 668 * Gets the policy on whether to continue executing existing 669 * periodic tasks even when this executor has been {@code shutdown}. 670 * In this case, these tasks will only terminate upon 671 * {@code shutdownNow} or after setting the policy to 672 * {@code false} when already shutdown. 673 * This value is by default {@code false}. 674 * 675 * @return {@code true} if will continue after shutdown 676 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 677 */ 678 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 679 return continueExistingPeriodicTasksAfterShutdown; 680 } 681 682 /** 683 * Sets the policy on whether to execute existing delayed 684 * tasks even when this executor has been {@code shutdown}. 685 * In this case, these tasks will only terminate upon 686 * {@code shutdownNow}, or after setting the policy to 687 * {@code false} when already shutdown. 688 * This value is by default {@code true}. 689 * 690 * @param value if {@code true}, execute after shutdown, else don't. 691 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 692 */ 693 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 694 executeExistingDelayedTasksAfterShutdown = value; 695 if (!value && isShutdown()) 696 onShutdown(); 697 } 698 699 /** 700 * Gets the policy on whether to execute existing delayed 701 * tasks even when this executor has been {@code shutdown}. 702 * In this case, these tasks will only terminate upon 703 * {@code shutdownNow}, or after setting the policy to 704 * {@code false} when already shutdown. 705 * This value is by default {@code true}. 706 * 707 * @return {@code true} if will execute after shutdown 708 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 709 */ 710 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 711 return executeExistingDelayedTasksAfterShutdown; 712 } 713 714 /** 715 * Sets the policy on whether cancelled tasks should be immediately 716 * removed from the work queue at time of cancellation. This value is 717 * by default {@code false}. 718 * 719 * @param value if {@code true}, remove on cancellation, else don't 720 * @see #getRemoveOnCancelPolicy 721 * @since 1.7 722 */ 723 public void setRemoveOnCancelPolicy(boolean value) { 724 removeOnCancel = value; 725 } 726 727 /** 728 * Gets the policy on whether cancelled tasks should be immediately 729 * removed from the work queue at time of cancellation. This value is 730 * by default {@code false}. 731 * 732 * @return {@code true} if cancelled tasks are immediately removed 733 * from the queue 734 * @see #setRemoveOnCancelPolicy 735 * @since 1.7 736 */ 737 public boolean getRemoveOnCancelPolicy() { 738 return removeOnCancel; 739 } 740 741 /** 742 * Initiates an orderly shutdown in which previously submitted 743 * tasks are executed, but no new tasks will be accepted. 744 * Invocation has no additional effect if already shut down. 745 * 746 * <p>This method does not wait for previously submitted tasks to 747 * complete execution. Use {@link #awaitTermination awaitTermination} 748 * to do that. 749 * 750 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 751 * has been set {@code false}, existing delayed tasks whose delays 752 * have not yet elapsed are cancelled. And unless the {@code 753 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 754 * {@code true}, future executions of existing periodic tasks will 755 * be cancelled. 756 * 757 * @throws SecurityException {@inheritDoc} 758 */ 759 public void shutdown() { 760 super.shutdown(); 761 } 762 763 /** 764 * Attempts to stop all actively executing tasks, halts the 765 * processing of waiting tasks, and returns a list of the tasks 766 * that were awaiting execution. 767 * 768 * <p>This method does not wait for actively executing tasks to 769 * terminate. Use {@link #awaitTermination awaitTermination} to 770 * do that. 771 * 772 * <p>There are no guarantees beyond best-effort attempts to stop 773 * processing actively executing tasks. This implementation 774 * cancels tasks via {@link Thread#interrupt}, so any task that 775 * fails to respond to interrupts may never terminate. 776 * 777 * @return list of tasks that never commenced execution. 778 * Each element of this list is a {@link ScheduledFuture}, 779 * including those tasks submitted using {@code execute}, 780 * which are for scheduling purposes used as the basis of a 781 * zero-delay {@code ScheduledFuture}. 782 * @throws SecurityException {@inheritDoc} 783 */ 784 public List<Runnable> shutdownNow() { 785 return super.shutdownNow(); 786 } 787 788 /** 789 * Returns the task queue used by this executor. Each element of 790 * this queue is a {@link ScheduledFuture}, including those 791 * tasks submitted using {@code execute} which are for scheduling 792 * purposes used as the basis of a zero-delay 793 * {@code ScheduledFuture}. Iteration over this queue is 794 * <em>not</em> guaranteed to traverse tasks in the order in 795 * which they will execute. 796 * 797 * @return the task queue 798 */ 799 public BlockingQueue<Runnable> getQueue() { 800 return super.getQueue(); 801 } 802 803 /** 804 * Specialized delay queue. To mesh with TPE declarations, this 805 * class must be declared as a BlockingQueue<Runnable> even though 806 * it can only hold RunnableScheduledFutures. 807 */ 808 static class DelayedWorkQueue extends AbstractQueue<Runnable> 809 implements BlockingQueue<Runnable> { 810 811 /* 812 * A DelayedWorkQueue is based on a heap-based data structure 813 * like those in DelayQueue and PriorityQueue, except that 814 * every ScheduledFutureTask also records its index into the 815 * heap array. This eliminates the need to find a task upon 816 * cancellation, greatly speeding up removal (down from O(n) 817 * to O(log n)), and reducing garbage retention that would 818 * otherwise occur by waiting for the element to rise to top 819 * before clearing. But because the queue may also hold 820 * RunnableScheduledFutures that are not ScheduledFutureTasks, 821 * we are not guaranteed to have such indices available, in 822 * which case we fall back to linear search. (We expect that 823 * most tasks will not be decorated, and that the faster cases 824 * will be much more common.) 825 * 826 * All heap operations must record index changes -- mainly 827 * within siftUp and siftDown. Upon removal, a task's 828 * heapIndex is set to -1. Note that ScheduledFutureTasks can 829 * appear at most once in the queue (this need not be true for 830 * other kinds of tasks or work queues), so are uniquely 831 * identified by heapIndex. 832 */ 833 834 private static final int INITIAL_CAPACITY = 16; 835 private RunnableScheduledFuture<?>[] queue = 836 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 837 private final ReentrantLock lock = new ReentrantLock(); 838 private int size = 0; 839 840 /** 841 * Thread designated to wait for the task at the head of the 842 * queue. This variant of the Leader-Follower pattern 843 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 844 * minimize unnecessary timed waiting. When a thread becomes 845 * the leader, it waits only for the next delay to elapse, but 846 * other threads await indefinitely. The leader thread must 847 * signal some other thread before returning from take() or 848 * poll(...), unless some other thread becomes leader in the 849 * interim. Whenever the head of the queue is replaced with a 850 * task with an earlier expiration time, the leader field is 851 * invalidated by being reset to null, and some waiting 852 * thread, but not necessarily the current leader, is 853 * signalled. So waiting threads must be prepared to acquire 854 * and lose leadership while waiting. 855 */ 856 private Thread leader = null; 857 858 /** 859 * Condition signalled when a newer task becomes available at the 860 * head of the queue or a new thread may need to become leader. 861 */ 862 private final Condition available = lock.newCondition(); 863 864 /** 865 * Set f's heapIndex if it is a ScheduledFutureTask. 866 */ 867 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 868 if (f instanceof ScheduledFutureTask) 869 ((ScheduledFutureTask)f).heapIndex = idx; 870 } 871 872 /** 873 * Sift element added at bottom up to its heap-ordered spot. 874 * Call only when holding lock. 875 */ 876 private void siftUp(int k, RunnableScheduledFuture<?> key) { 877 while (k > 0) { 878 int parent = (k - 1) >>> 1; 879 RunnableScheduledFuture<?> e = queue[parent]; 880 if (key.compareTo(e) >= 0) 881 break; 882 queue[k] = e; 883 setIndex(e, k); 884 k = parent; 885 } 886 queue[k] = key; 887 setIndex(key, k); 888 } 889 890 /** 891 * Sift element added at top down to its heap-ordered spot. 892 * Call only when holding lock. 893 */ 894 private void siftDown(int k, RunnableScheduledFuture<?> key) { 895 int half = size >>> 1; 896 while (k < half) { 897 int child = (k << 1) + 1; 898 RunnableScheduledFuture<?> c = queue[child]; 899 int right = child + 1; 900 if (right < size && c.compareTo(queue[right]) > 0) 901 c = queue[child = right]; 902 if (key.compareTo(c) <= 0) 903 break; 904 queue[k] = c; 905 setIndex(c, k); 906 k = child; 907 } 908 queue[k] = key; 909 setIndex(key, k); 910 } 911 912 /** 913 * Resize the heap array. Call only when holding lock. 914 */ 915 private void grow() { 916 int oldCapacity = queue.length; 917 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 918 if (newCapacity < 0) // overflow 919 newCapacity = Integer.MAX_VALUE; 920 queue = Arrays.copyOf(queue, newCapacity); 921 } 922 923 /** 924 * Find index of given object, or -1 if absent 925 */ 926 private int indexOf(Object x) { 927 if (x != null) { 928 if (x instanceof ScheduledFutureTask) { 929 int i = ((ScheduledFutureTask) x).heapIndex; 930 // Sanity check; x could conceivably be a 931 // ScheduledFutureTask from some other pool. 932 if (i >= 0 && i < size && queue[i] == x) 933 return i; 934 } else { 935 for (int i = 0; i < size; i++) 936 if (x.equals(queue[i])) 937 return i; 938 } 939 } 940 return -1; 941 } 942 943 public boolean contains(Object x) { 944 final ReentrantLock lock = this.lock; 945 lock.lock(); 946 try { 947 return indexOf(x) != -1; 948 } finally { 949 lock.unlock(); 950 } 951 } 952 953 public boolean remove(Object x) { 954 final ReentrantLock lock = this.lock; 955 lock.lock(); 956 try { 957 int i = indexOf(x); 958 if (i < 0) 959 return false; 960 961 setIndex(queue[i], -1); 962 int s = --size; 963 RunnableScheduledFuture<?> replacement = queue[s]; 964 queue[s] = null; 965 if (s != i) { 966 siftDown(i, replacement); 967 if (queue[i] == replacement) 968 siftUp(i, replacement); 969 } 970 return true; 971 } finally { 972 lock.unlock(); 973 } 974 } 975 976 public int size() { 977 final ReentrantLock lock = this.lock; 978 lock.lock(); 979 try { 980 return size; 981 } finally { 982 lock.unlock(); 983 } 984 } 985 986 public boolean isEmpty() { 987 return size() == 0; 988 } 989 990 public int remainingCapacity() { 991 return Integer.MAX_VALUE; 992 } 993 994 public RunnableScheduledFuture<?> peek() { 995 final ReentrantLock lock = this.lock; 996 lock.lock(); 997 try { 998 return queue[0]; 999 } finally { 1000 lock.unlock(); 1001 } 1002 } 1003 1004 public boolean offer(Runnable x) { 1005 if (x == null) 1006 throw new NullPointerException(); 1007 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 1008 final ReentrantLock lock = this.lock; 1009 lock.lock(); 1010 try { 1011 int i = size; 1012 if (i >= queue.length) 1013 grow(); 1014 size = i + 1; 1015 if (i == 0) { 1016 queue[0] = e; 1017 setIndex(e, 0); 1018 } else { 1019 siftUp(i, e); 1020 } 1021 if (queue[0] == e) { 1022 leader = null; 1023 available.signal(); 1024 } 1025 } finally { 1026 lock.unlock(); 1027 } 1028 return true; 1029 } 1030 1031 public void put(Runnable e) { 1032 offer(e); 1033 } 1034 1035 public boolean add(Runnable e) { 1036 return offer(e); 1037 } 1038 1039 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1040 return offer(e); 1041 } 1042 1043 /** 1044 * Performs common bookkeeping for poll and take: Replaces 1045 * first element with last and sifts it down. Call only when 1046 * holding lock. 1047 * @param f the task to remove and return 1048 */ 1049 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1050 int s = --size; 1051 RunnableScheduledFuture<?> x = queue[s]; 1052 queue[s] = null; 1053 if (s != 0) 1054 siftDown(0, x); 1055 setIndex(f, -1); 1056 return f; 1057 } 1058 1059 public RunnableScheduledFuture<?> poll() { 1060 final ReentrantLock lock = this.lock; 1061 lock.lock(); 1062 try { 1063 RunnableScheduledFuture<?> first = queue[0]; 1064 if (first == null || first.getDelay(NANOSECONDS) > 0) 1065 return null; 1066 else 1067 return finishPoll(first); 1068 } finally { 1069 lock.unlock(); 1070 } 1071 } 1072 1073 public RunnableScheduledFuture<?> take() throws InterruptedException { 1074 final ReentrantLock lock = this.lock; 1075 lock.lockInterruptibly(); 1076 try { 1077 for (;;) { 1078 RunnableScheduledFuture<?> first = queue[0]; 1079 if (first == null) 1080 available.await(); 1081 else { 1082 long delay = first.getDelay(NANOSECONDS); 1083 if (delay <= 0) 1084 return finishPoll(first); 1085 else if (leader != null) 1086 available.await(); 1087 else { 1088 Thread thisThread = Thread.currentThread(); 1089 leader = thisThread; 1090 try { 1091 available.awaitNanos(delay); 1092 } finally { 1093 if (leader == thisThread) 1094 leader = null; 1095 } 1096 } 1097 } 1098 } 1099 } finally { 1100 if (leader == null && queue[0] != null) 1101 available.signal(); 1102 lock.unlock(); 1103 } 1104 } 1105 1106 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1107 throws InterruptedException { 1108 long nanos = unit.toNanos(timeout); 1109 final ReentrantLock lock = this.lock; 1110 lock.lockInterruptibly(); 1111 try { 1112 for (;;) { 1113 RunnableScheduledFuture<?> first = queue[0]; 1114 if (first == null) { 1115 if (nanos <= 0) 1116 return null; 1117 else 1118 nanos = available.awaitNanos(nanos); 1119 } else { 1120 long delay = first.getDelay(NANOSECONDS); 1121 if (delay <= 0) 1122 return finishPoll(first); 1123 if (nanos <= 0) 1124 return null; 1125 if (nanos < delay || leader != null) 1126 nanos = available.awaitNanos(nanos); 1127 else { 1128 Thread thisThread = Thread.currentThread(); 1129 leader = thisThread; 1130 try { 1131 long timeLeft = available.awaitNanos(delay); 1132 nanos -= delay - timeLeft; 1133 } finally { 1134 if (leader == thisThread) 1135 leader = null; 1136 } 1137 } 1138 } 1139 } 1140 } finally { 1141 if (leader == null && queue[0] != null) 1142 available.signal(); 1143 lock.unlock(); 1144 } 1145 } 1146 1147 public void clear() { 1148 final ReentrantLock lock = this.lock; 1149 lock.lock(); 1150 try { 1151 for (int i = 0; i < size; i++) { 1152 RunnableScheduledFuture<?> t = queue[i]; 1153 if (t != null) { 1154 queue[i] = null; 1155 setIndex(t, -1); 1156 } 1157 } 1158 size = 0; 1159 } finally { 1160 lock.unlock(); 1161 } 1162 } 1163 1164 /** 1165 * Return and remove first element only if it is expired. 1166 * Used only by drainTo. Call only when holding lock. 1167 */ 1168 private RunnableScheduledFuture<?> pollExpired() { 1169 // assert lock.isHeldByCurrentThread(); 1170 RunnableScheduledFuture<?> first = queue[0]; 1171 if (first == null || first.getDelay(NANOSECONDS) > 0) 1172 return null; 1173 return finishPoll(first); 1174 } 1175 1176 public int drainTo(Collection<? super Runnable> c) { 1177 if (c == null) 1178 throw new NullPointerException(); 1179 if (c == this) 1180 throw new IllegalArgumentException(); 1181 final ReentrantLock lock = this.lock; 1182 lock.lock(); 1183 try { 1184 RunnableScheduledFuture<?> first; 1185 int n = 0; 1186 while ((first = pollExpired()) != null) { 1187 c.add(first); 1188 ++n; 1189 } 1190 return n; 1191 } finally { 1192 lock.unlock(); 1193 } 1194 } 1195 1196 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1197 if (c == null) 1198 throw new NullPointerException(); 1199 if (c == this) 1200 throw new IllegalArgumentException(); 1201 if (maxElements <= 0) 1202 return 0; 1203 final ReentrantLock lock = this.lock; 1204 lock.lock(); 1205 try { 1206 RunnableScheduledFuture<?> first; 1207 int n = 0; 1208 while (n < maxElements && (first = pollExpired()) != null) { 1209 c.add(first); 1210 ++n; 1211 } 1212 return n; 1213 } finally { 1214 lock.unlock(); 1215 } 1216 } 1217 1218 public Object[] toArray() { 1219 final ReentrantLock lock = this.lock; 1220 lock.lock(); 1221 try { 1222 return Arrays.copyOf(queue, size, Object[].class); 1223 } finally { 1224 lock.unlock(); 1225 } 1226 } 1227 1228 @SuppressWarnings("unchecked") 1229 public <T> T[] toArray(T[] a) { 1230 final ReentrantLock lock = this.lock; 1231 lock.lock(); 1232 try { 1233 if (a.length < size) 1234 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1235 System.arraycopy(queue, 0, a, 0, size); 1236 if (a.length > size) 1237 a[size] = null; 1238 return a; 1239 } finally { 1240 lock.unlock(); 1241 } 1242 } 1243 1244 public Iterator<Runnable> iterator() { 1245 return new Itr(Arrays.copyOf(queue, size)); 1246 } 1247 1248 /** 1249 * Snapshot iterator that works off copy of underlying q array. 1250 */ 1251 private class Itr implements Iterator<Runnable> { 1252 final RunnableScheduledFuture[] array; 1253 int cursor = 0; // index of next element to return 1254 int lastRet = -1; // index of last element, or -1 if no such 1255 1256 Itr(RunnableScheduledFuture[] array) { 1257 this.array = array; 1258 } 1259 1260 public boolean hasNext() { 1261 return cursor < array.length; 1262 } 1263 1264 public Runnable next() { 1265 if (cursor >= array.length) 1266 throw new NoSuchElementException(); 1267 lastRet = cursor; 1268 return array[cursor++]; 1269 } 1270 1271 public void remove() { 1272 if (lastRet < 0) 1273 throw new IllegalStateException(); 1274 DelayedWorkQueue.this.remove(array[lastRet]); 1275 lastRet = -1; 1276 } 1277 } 1278 } 1279 }