1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/licenses/publicdomain 34 */ 35 36 package java.util.concurrent; 37 import java.util.concurrent.atomic.*; 38 import java.util.concurrent.locks.*; 39 import java.util.*; 40 41 /** 42 * A {@link ThreadPoolExecutor} that can additionally schedule 43 * commands to run after a given delay, or to execute 44 * periodically. 
This class is preferable to {@link java.util.Timer} 45 * when multiple worker threads are needed, or when the additional 46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which 47 * this class extends) are required. 48 * 49 * <p>Delayed tasks execute no sooner than they are enabled, but 50 * without any real-time guarantees about when, after they are 51 * enabled, they will commence. Tasks scheduled for exactly the same 52 * execution time are enabled in first-in-first-out (FIFO) order of 53 * submission. 54 * 55 * <p>When a submitted task is cancelled before it is run, execution 56 * is suppressed. By default, such a cancelled task is not 57 * automatically removed from the work queue until its delay 58 * elapses. While this enables further inspection and monitoring, it 59 * may also cause unbounded retention of cancelled tasks. To avoid 60 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which 61 * causes tasks to be immediately removed from the work queue at 62 * time of cancellation. 63 * 64 * <p>Successive executions of a task scheduled via 65 * <code>scheduleAtFixedRate</code> or 66 * <code>scheduleWithFixedDelay</code> do not overlap. While different 67 * executions may be performed by different threads, the effects of 68 * prior executions <a 69 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 70 * those of subsequent ones. 71 * 72 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 73 * of the inherited tuning methods are not useful for it. In 74 * particular, because it acts as a fixed-sized pool using 75 * {@code corePoolSize} threads and an unbounded queue, adjustments 76 * to {@code maximumPoolSize} have no useful effect. Additionally, it 77 * is almost never a good idea to set {@code corePoolSize} to zero or 78 * use {@code allowCoreThreadTimeOut} because this may leave the pool 79 * without threads to handle tasks once they become eligible to run. 
80 * 81 * <p><b>Extension notes:</b> This class overrides the 82 * {@link ThreadPoolExecutor#execute execute} and 83 * {@link AbstractExecutorService#submit(Runnable) submit} 84 * methods to generate internal {@link ScheduledFuture} objects to 85 * control per-task delays and scheduling. To preserve 86 * functionality, any further overrides of these methods in 87 * subclasses must invoke superclass versions, which effectively 88 * disables additional task customization. However, this class 89 * provides alternative protected extension method 90 * {@code decorateTask} (one version each for {@code Runnable} and 91 * {@code Callable}) that can be used to customize the concrete task 92 * types used to execute commands entered via {@code execute}, 93 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 94 * and {@code scheduleWithFixedDelay}. By default, a 95 * {@code ScheduledThreadPoolExecutor} uses a task type extending 96 * {@link FutureTask}. However, this may be modified or replaced using 97 * subclasses of the form: 98 * 99 * <pre> {@code 100 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 101 * 102 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 103 * 104 * protected <V> RunnableScheduledFuture<V> decorateTask( 105 * Runnable r, RunnableScheduledFuture<V> task) { 106 * return new CustomTask<V>(r, task); 107 * } 108 * 109 * protected <V> RunnableScheduledFuture<V> decorateTask( 110 * Callable<V> c, RunnableScheduledFuture<V> task) { 111 * return new CustomTask<V>(c, task); 112 * } 113 * // ... add constructors, etc. 114 * }}</pre> 115 * 116 * @since 1.5 117 * @author Doug Lea 118 */ 119 public class ScheduledThreadPoolExecutor 120 extends ThreadPoolExecutor 121 implements ScheduledExecutorService { 122 123 /* 124 * This class specializes ThreadPoolExecutor implementation by 125 * 126 * 1. 
Using a custom task type, ScheduledFutureTask for 127 * tasks, even those that don't require scheduling (i.e., 128 * those submitted using ExecutorService execute, not 129 * ScheduledExecutorService methods) which are treated as 130 * delayed tasks with a delay of zero. 131 * 132 * 2. Using a custom queue (DelayedWorkQueue), a variant of 133 * unbounded DelayQueue. The lack of capacity constraint and 134 * the fact that corePoolSize and maximumPoolSize are 135 * effectively identical simplifies some execution mechanics 136 * (see delayedExecute) compared to ThreadPoolExecutor. 137 * 138 * 3. Supporting optional run-after-shutdown parameters, which 139 * leads to overrides of shutdown methods to remove and cancel 140 * tasks that should NOT be run after shutdown, as well as 141 * different recheck logic when task (re)submission overlaps 142 * with a shutdown. 143 * 144 * 4. Task decoration methods to allow interception and 145 * instrumentation, which are needed because subclasses cannot 146 * otherwise override submit methods to get this effect. These 147 * don't have any impact on pool control logic though. 148 */ 149 150 /** 151 * False if should cancel/suppress periodic tasks on shutdown. 152 */ 153 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 154 155 /** 156 * False if should cancel non-periodic tasks on shutdown. 157 */ 158 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 159 160 /** 161 * True if ScheduledFutureTask.cancel should remove from queue 162 */ 163 private volatile boolean removeOnCancel = false; 164 165 /** 166 * Sequence number to break scheduling ties, and in turn to 167 * guarantee FIFO order among tied entries. 168 */ 169 private static final AtomicLong sequencer = new AtomicLong(0); 170 171 /** 172 * Returns current nanosecond time. 
173 */ 174 final long now() { 175 return System.nanoTime(); 176 } 177 178 private class ScheduledFutureTask<V> 179 extends FutureTask<V> implements RunnableScheduledFuture<V> { 180 181 /** Sequence number to break ties FIFO */ 182 private final long sequenceNumber; 183 184 /** The time the task is enabled to execute in nanoTime units */ 185 private long time; 186 187 /** 188 * Period in nanoseconds for repeating tasks. A positive 189 * value indicates fixed-rate execution. A negative value 190 * indicates fixed-delay execution. A value of 0 indicates a 191 * non-repeating task. 192 */ 193 private final long period; 194 195 /** The actual task to be re-enqueued by reExecutePeriodic */ 196 RunnableScheduledFuture<V> outerTask = this; 197 198 /** 199 * Index into delay queue, to support faster cancellation. 200 */ 201 int heapIndex; 202 203 /** 204 * Creates a one-shot action with given nanoTime-based trigger time. 205 */ 206 ScheduledFutureTask(Runnable r, V result, long ns) { 207 super(r, result); 208 this.time = ns; 209 this.period = 0; 210 this.sequenceNumber = sequencer.getAndIncrement(); 211 } 212 213 /** 214 * Creates a periodic action with given nano time and period. 215 */ 216 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 217 super(r, result); 218 this.time = ns; 219 this.period = period; 220 this.sequenceNumber = sequencer.getAndIncrement(); 221 } 222 223 /** 224 * Creates a one-shot action with given nanoTime-based trigger. 
225 */ 226 ScheduledFutureTask(Callable<V> callable, long ns) { 227 super(callable); 228 this.time = ns; 229 this.period = 0; 230 this.sequenceNumber = sequencer.getAndIncrement(); 231 } 232 233 public long getDelay(TimeUnit unit) { 234 return unit.convert(time - now(), TimeUnit.NANOSECONDS); 235 } 236 237 public int compareTo(Delayed other) { 238 if (other == this) // compare zero ONLY if same object 239 return 0; 240 if (other instanceof ScheduledFutureTask) { 241 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 242 long diff = time - x.time; 243 if (diff < 0) 244 return -1; 245 else if (diff > 0) 246 return 1; 247 else if (sequenceNumber < x.sequenceNumber) 248 return -1; 249 else 250 return 1; 251 } 252 long d = (getDelay(TimeUnit.NANOSECONDS) - 253 other.getDelay(TimeUnit.NANOSECONDS)); 254 return (d == 0) ? 0 : ((d < 0) ? -1 : 1); 255 } 256 257 /** 258 * Returns true if this is a periodic (not a one-shot) action. 259 * 260 * @return true if periodic 261 */ 262 public boolean isPeriodic() { 263 return period != 0; 264 } 265 266 /** 267 * Sets the next time to run for a periodic task. 268 */ 269 private void setNextRunTime() { 270 long p = period; 271 if (p > 0) 272 time += p; 273 else 274 time = triggerTime(-p); 275 } 276 277 public boolean cancel(boolean mayInterruptIfRunning) { 278 boolean cancelled = super.cancel(mayInterruptIfRunning); 279 if (cancelled && removeOnCancel && heapIndex >= 0) 280 remove(this); 281 return cancelled; 282 } 283 284 /** 285 * Overrides FutureTask version so as to reset/requeue if periodic. 286 */ 287 public void run() { 288 boolean periodic = isPeriodic(); 289 if (!canRunInCurrentRunState(periodic)) 290 cancel(false); 291 else if (!periodic) 292 ScheduledFutureTask.super.run(); 293 else if (ScheduledFutureTask.super.runAndReset()) { 294 setNextRunTime(); 295 reExecutePeriodic(outerTask); 296 } 297 } 298 } 299 300 /** 301 * Returns true if can run a task given current run state 302 * and run-after-shutdown parameters. 
303 * 304 * @param periodic true if this task periodic, false if delayed 305 */ 306 boolean canRunInCurrentRunState(boolean periodic) { 307 return isRunningOrShutdown(periodic ? 308 continueExistingPeriodicTasksAfterShutdown : 309 executeExistingDelayedTasksAfterShutdown); 310 } 311 312 /** 313 * Main execution method for delayed or periodic tasks. If pool 314 * is shut down, rejects the task. Otherwise adds task to queue 315 * and starts a thread, if necessary, to run it. (We cannot 316 * prestart the thread to run the task because the task (probably) 317 * shouldn't be run yet,) If the pool is shut down while the task 318 * is being added, cancel and remove it if required by state and 319 * run-after-shutdown parameters. 320 * 321 * @param task the task 322 */ 323 private void delayedExecute(RunnableScheduledFuture<?> task) { 324 if (isShutdown()) 325 reject(task); 326 else { 327 super.getQueue().add(task); 328 if (isShutdown() && 329 !canRunInCurrentRunState(task.isPeriodic()) && 330 remove(task)) 331 task.cancel(false); 332 else 333 prestartCoreThread(); 334 } 335 } 336 337 /** 338 * Requeues a periodic task unless current run state precludes it. 339 * Same idea as delayedExecute except drops task rather than rejecting. 340 * 341 * @param task the task 342 */ 343 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 344 if (canRunInCurrentRunState(true)) { 345 super.getQueue().add(task); 346 if (!canRunInCurrentRunState(true) && remove(task)) 347 task.cancel(false); 348 else 349 prestartCoreThread(); 350 } 351 } 352 353 /** 354 * Cancels and clears the queue of all tasks that should not be run 355 * due to shutdown policy. Invoked within super.shutdown. 
356 */ 357 @Override void onShutdown() { 358 BlockingQueue<Runnable> q = super.getQueue(); 359 boolean keepDelayed = 360 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 361 boolean keepPeriodic = 362 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 363 if (!keepDelayed && !keepPeriodic) 364 q.clear(); 365 else { 366 // Traverse snapshot to avoid iterator exceptions 367 for (Object e : q.toArray()) { 368 if (e instanceof RunnableScheduledFuture) { 369 RunnableScheduledFuture<?> t = 370 (RunnableScheduledFuture<?>)e; 371 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 372 t.isCancelled()) { // also remove if already cancelled 373 if (q.remove(t)) 374 t.cancel(false); 375 } 376 } 377 } 378 } 379 tryTerminate(); 380 } 381 382 /** 383 * Modifies or replaces the task used to execute a runnable. 384 * This method can be used to override the concrete 385 * class used for managing internal tasks. 386 * The default implementation simply returns the given task. 387 * 388 * @param runnable the submitted Runnable 389 * @param task the task created to execute the runnable 390 * @return a task that can execute the runnable 391 * @since 1.6 392 */ 393 protected <V> RunnableScheduledFuture<V> decorateTask( 394 Runnable runnable, RunnableScheduledFuture<V> task) { 395 return task; 396 } 397 398 /** 399 * Modifies or replaces the task used to execute a callable. 400 * This method can be used to override the concrete 401 * class used for managing internal tasks. 402 * The default implementation simply returns the given task. 403 * 404 * @param callable the submitted Callable 405 * @param task the task created to execute the callable 406 * @return a task that can execute the callable 407 * @since 1.6 408 */ 409 protected <V> RunnableScheduledFuture<V> decorateTask( 410 Callable<V> callable, RunnableScheduledFuture<V> task) { 411 return task; 412 } 413 414 /** 415 * Creates a new {@code ScheduledThreadPoolExecutor} with the 416 * given core pool size. 
417 * 418 * @param corePoolSize the number of threads to keep in the pool, even 419 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 420 * @throws IllegalArgumentException if {@code corePoolSize < 0} 421 */ 422 public ScheduledThreadPoolExecutor(int corePoolSize) { 423 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 424 new DelayedWorkQueue()); 425 } 426 427 /** 428 * Creates a new {@code ScheduledThreadPoolExecutor} with the 429 * given initial parameters. 430 * 431 * @param corePoolSize the number of threads to keep in the pool, even 432 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 433 * @param threadFactory the factory to use when the executor 434 * creates a new thread 435 * @throws IllegalArgumentException if {@code corePoolSize < 0} 436 * @throws NullPointerException if {@code threadFactory} is null 437 */ 438 public ScheduledThreadPoolExecutor(int corePoolSize, 439 ThreadFactory threadFactory) { 440 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 441 new DelayedWorkQueue(), threadFactory); 442 } 443 444 /** 445 * Creates a new ScheduledThreadPoolExecutor with the given 446 * initial parameters. 447 * 448 * @param corePoolSize the number of threads to keep in the pool, even 449 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 450 * @param handler the handler to use when execution is blocked 451 * because the thread bounds and queue capacities are reached 452 * @throws IllegalArgumentException if {@code corePoolSize < 0} 453 * @throws NullPointerException if {@code handler} is null 454 */ 455 public ScheduledThreadPoolExecutor(int corePoolSize, 456 RejectedExecutionHandler handler) { 457 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 458 new DelayedWorkQueue(), handler); 459 } 460 461 /** 462 * Creates a new ScheduledThreadPoolExecutor with the given 463 * initial parameters. 
464 * 465 * @param corePoolSize the number of threads to keep in the pool, even 466 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 467 * @param threadFactory the factory to use when the executor 468 * creates a new thread 469 * @param handler the handler to use when execution is blocked 470 * because the thread bounds and queue capacities are reached 471 * @throws IllegalArgumentException if {@code corePoolSize < 0} 472 * @throws NullPointerException if {@code threadFactory} or 473 * {@code handler} is null 474 */ 475 public ScheduledThreadPoolExecutor(int corePoolSize, 476 ThreadFactory threadFactory, 477 RejectedExecutionHandler handler) { 478 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, 479 new DelayedWorkQueue(), threadFactory, handler); 480 } 481 482 /** 483 * Returns the trigger time of a delayed action. 484 */ 485 private long triggerTime(long delay, TimeUnit unit) { 486 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 487 } 488 489 /** 490 * Returns the trigger time of a delayed action. 491 */ 492 long triggerTime(long delay) { 493 return now() + 494 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 495 } 496 497 /** 498 * Constrains the values of all delays in the queue to be within 499 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 500 * This may occur if a task is eligible to be dequeued, but has 501 * not yet been, while some other task is added with a delay of 502 * Long.MAX_VALUE. 
503 */ 504 private long overflowFree(long delay) { 505 Delayed head = (Delayed) super.getQueue().peek(); 506 if (head != null) { 507 long headDelay = head.getDelay(TimeUnit.NANOSECONDS); 508 if (headDelay < 0 && (delay - headDelay < 0)) 509 delay = Long.MAX_VALUE + headDelay; 510 } 511 return delay; 512 } 513 514 /** 515 * @throws RejectedExecutionException {@inheritDoc} 516 * @throws NullPointerException {@inheritDoc} 517 */ 518 public ScheduledFuture<?> schedule(Runnable command, 519 long delay, 520 TimeUnit unit) { 521 if (command == null || unit == null) 522 throw new NullPointerException(); 523 RunnableScheduledFuture<?> t = decorateTask(command, 524 new ScheduledFutureTask<Void>(command, null, 525 triggerTime(delay, unit))); 526 delayedExecute(t); 527 return t; 528 } 529 530 /** 531 * @throws RejectedExecutionException {@inheritDoc} 532 * @throws NullPointerException {@inheritDoc} 533 */ 534 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 535 long delay, 536 TimeUnit unit) { 537 if (callable == null || unit == null) 538 throw new NullPointerException(); 539 RunnableScheduledFuture<V> t = decorateTask(callable, 540 new ScheduledFutureTask<V>(callable, 541 triggerTime(delay, unit))); 542 delayedExecute(t); 543 return t; 544 } 545 546 /** 547 * @throws RejectedExecutionException {@inheritDoc} 548 * @throws NullPointerException {@inheritDoc} 549 * @throws IllegalArgumentException {@inheritDoc} 550 */ 551 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 552 long initialDelay, 553 long period, 554 TimeUnit unit) { 555 if (command == null || unit == null) 556 throw new NullPointerException(); 557 if (period <= 0) 558 throw new IllegalArgumentException(); 559 ScheduledFutureTask<Void> sft = 560 new ScheduledFutureTask<Void>(command, 561 null, 562 triggerTime(initialDelay, unit), 563 unit.toNanos(period)); 564 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 565 sft.outerTask = t; 566 delayedExecute(t); 567 return t; 568 } 569 
570 /** 571 * @throws RejectedExecutionException {@inheritDoc} 572 * @throws NullPointerException {@inheritDoc} 573 * @throws IllegalArgumentException {@inheritDoc} 574 */ 575 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 576 long initialDelay, 577 long delay, 578 TimeUnit unit) { 579 if (command == null || unit == null) 580 throw new NullPointerException(); 581 if (delay <= 0) 582 throw new IllegalArgumentException(); 583 ScheduledFutureTask<Void> sft = 584 new ScheduledFutureTask<Void>(command, 585 null, 586 triggerTime(initialDelay, unit), 587 unit.toNanos(-delay)); 588 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 589 sft.outerTask = t; 590 delayedExecute(t); 591 return t; 592 } 593 594 /** 595 * Executes {@code command} with zero required delay. 596 * This has effect equivalent to 597 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 598 * Note that inspections of the queue and of the list returned by 599 * {@code shutdownNow} will access the zero-delayed 600 * {@link ScheduledFuture}, not the {@code command} itself. 601 * 602 * <p>A consequence of the use of {@code ScheduledFuture} objects is 603 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 604 * called with a null second {@code Throwable} argument, even if the 605 * {@code command} terminated abruptly. Instead, the {@code Throwable} 606 * thrown by such a task can be obtained via {@link Future#get}. 
607 * 608 * @throws RejectedExecutionException at discretion of 609 * {@code RejectedExecutionHandler}, if the task 610 * cannot be accepted for execution because the 611 * executor has been shut down 612 * @throws NullPointerException {@inheritDoc} 613 */ 614 public void execute(Runnable command) { 615 schedule(command, 0, TimeUnit.NANOSECONDS); 616 } 617 618 // Override AbstractExecutorService methods 619 620 /** 621 * @throws RejectedExecutionException {@inheritDoc} 622 * @throws NullPointerException {@inheritDoc} 623 */ 624 public Future<?> submit(Runnable task) { 625 return schedule(task, 0, TimeUnit.NANOSECONDS); 626 } 627 628 /** 629 * @throws RejectedExecutionException {@inheritDoc} 630 * @throws NullPointerException {@inheritDoc} 631 */ 632 public <T> Future<T> submit(Runnable task, T result) { 633 return schedule(Executors.callable(task, result), 634 0, TimeUnit.NANOSECONDS); 635 } 636 637 /** 638 * @throws RejectedExecutionException {@inheritDoc} 639 * @throws NullPointerException {@inheritDoc} 640 */ 641 public <T> Future<T> submit(Callable<T> task) { 642 return schedule(task, 0, TimeUnit.NANOSECONDS); 643 } 644 645 /** 646 * Sets the policy on whether to continue executing existing 647 * periodic tasks even when this executor has been {@code shutdown}. 648 * In this case, these tasks will only terminate upon 649 * {@code shutdownNow} or after setting the policy to 650 * {@code false} when already shutdown. 651 * This value is by default {@code false}. 652 * 653 * @param value if {@code true}, continue after shutdown, else don't. 654 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 655 */ 656 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 657 continueExistingPeriodicTasksAfterShutdown = value; 658 if (!value && isShutdown()) 659 onShutdown(); 660 } 661 662 /** 663 * Gets the policy on whether to continue executing existing 664 * periodic tasks even when this executor has been {@code shutdown}. 
665 * In this case, these tasks will only terminate upon 666 * {@code shutdownNow} or after setting the policy to 667 * {@code false} when already shutdown. 668 * This value is by default {@code false}. 669 * 670 * @return {@code true} if will continue after shutdown 671 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 672 */ 673 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 674 return continueExistingPeriodicTasksAfterShutdown; 675 } 676 677 /** 678 * Sets the policy on whether to execute existing delayed 679 * tasks even when this executor has been {@code shutdown}. 680 * In this case, these tasks will only terminate upon 681 * {@code shutdownNow}, or after setting the policy to 682 * {@code false} when already shutdown. 683 * This value is by default {@code true}. 684 * 685 * @param value if {@code true}, execute after shutdown, else don't. 686 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 687 */ 688 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 689 executeExistingDelayedTasksAfterShutdown = value; 690 if (!value && isShutdown()) 691 onShutdown(); 692 } 693 694 /** 695 * Gets the policy on whether to execute existing delayed 696 * tasks even when this executor has been {@code shutdown}. 697 * In this case, these tasks will only terminate upon 698 * {@code shutdownNow}, or after setting the policy to 699 * {@code false} when already shutdown. 700 * This value is by default {@code true}. 701 * 702 * @return {@code true} if will execute after shutdown 703 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 704 */ 705 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 706 return executeExistingDelayedTasksAfterShutdown; 707 } 708 709 /** 710 * Sets the policy on whether cancelled tasks should be immediately 711 * removed from the work queue at time of cancellation. This value is 712 * by default {@code false}. 
713 * 714 * @param value if {@code true}, remove on cancellation, else don't 715 * @see #getRemoveOnCancelPolicy 716 * @since 1.7 717 */ 718 public void setRemoveOnCancelPolicy(boolean value) { 719 removeOnCancel = value; 720 } 721 722 /** 723 * Gets the policy on whether cancelled tasks should be immediately 724 * removed from the work queue at time of cancellation. This value is 725 * by default {@code false}. 726 * 727 * @return {@code true} if cancelled tasks are immediately removed 728 * from the queue 729 * @see #setRemoveOnCancelPolicy 730 * @since 1.7 731 */ 732 public boolean getRemoveOnCancelPolicy() { 733 return removeOnCancel; 734 } 735 736 /** 737 * Initiates an orderly shutdown in which previously submitted 738 * tasks are executed, but no new tasks will be accepted. 739 * Invocation has no additional effect if already shut down. 740 * 741 * <p>This method does not wait for previously submitted tasks to 742 * complete execution. Use {@link #awaitTermination awaitTermination} 743 * to do that. 744 * 745 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 746 * has been set {@code false}, existing delayed tasks whose delays 747 * have not yet elapsed are cancelled. And unless the {@code 748 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 749 * {@code true}, future executions of existing periodic tasks will 750 * be cancelled. 751 * 752 * @throws SecurityException {@inheritDoc} 753 */ 754 public void shutdown() { 755 super.shutdown(); 756 } 757 758 /** 759 * Attempts to stop all actively executing tasks, halts the 760 * processing of waiting tasks, and returns a list of the tasks 761 * that were awaiting execution. 762 * 763 * <p>This method does not wait for actively executing tasks to 764 * terminate. Use {@link #awaitTermination awaitTermination} to 765 * do that. 766 * 767 * <p>There are no guarantees beyond best-effort attempts to stop 768 * processing actively executing tasks. 
This implementation 769 * cancels tasks via {@link Thread#interrupt}, so any task that 770 * fails to respond to interrupts may never terminate. 771 * 772 * @return list of tasks that never commenced execution. 773 * Each element of this list is a {@link ScheduledFuture}, 774 * including those tasks submitted using {@code execute}, 775 * which are for scheduling purposes used as the basis of a 776 * zero-delay {@code ScheduledFuture}. 777 * @throws SecurityException {@inheritDoc} 778 */ 779 public List<Runnable> shutdownNow() { 780 return super.shutdownNow(); 781 } 782 783 /** 784 * Returns the task queue used by this executor. Each element of 785 * this queue is a {@link ScheduledFuture}, including those 786 * tasks submitted using {@code execute} which are for scheduling 787 * purposes used as the basis of a zero-delay 788 * {@code ScheduledFuture}. Iteration over this queue is 789 * <em>not</em> guaranteed to traverse tasks in the order in 790 * which they will execute. 791 * 792 * @return the task queue 793 */ 794 public BlockingQueue<Runnable> getQueue() { 795 return super.getQueue(); 796 } 797 798 /** 799 * Specialized delay queue. To mesh with TPE declarations, this 800 * class must be declared as a BlockingQueue<Runnable> even though 801 * it can only hold RunnableScheduledFutures. 802 */ 803 static class DelayedWorkQueue extends AbstractQueue<Runnable> 804 implements BlockingQueue<Runnable> { 805 806 /* 807 * A DelayedWorkQueue is based on a heap-based data structure 808 * like those in DelayQueue and PriorityQueue, except that 809 * every ScheduledFutureTask also records its index into the 810 * heap array. This eliminates the need to find a task upon 811 * cancellation, greatly speeding up removal (down from O(n) 812 * to O(log n)), and reducing garbage retention that would 813 * otherwise occur by waiting for the element to rise to top 814 * before clearing. 
But because the queue may also hold 815 * RunnableScheduledFutures that are not ScheduledFutureTasks, 816 * we are not guaranteed to have such indices available, in 817 * which case we fall back to linear search. (We expect that 818 * most tasks will not be decorated, and that the faster cases 819 * will be much more common.) 820 * 821 * All heap operations must record index changes -- mainly 822 * within siftUp and siftDown. Upon removal, a task's 823 * heapIndex is set to -1. Note that ScheduledFutureTasks can 824 * appear at most once in the queue (this need not be true for 825 * other kinds of tasks or work queues), so are uniquely 826 * identified by heapIndex. 827 */ 828 829 private static final int INITIAL_CAPACITY = 16; 830 private RunnableScheduledFuture[] queue = 831 new RunnableScheduledFuture[INITIAL_CAPACITY]; 832 private final ReentrantLock lock = new ReentrantLock(); 833 private int size = 0; 834 835 /** 836 * Thread designated to wait for the task at the head of the 837 * queue. This variant of the Leader-Follower pattern 838 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 839 * minimize unnecessary timed waiting. When a thread becomes 840 * the leader, it waits only for the next delay to elapse, but 841 * other threads await indefinitely. The leader thread must 842 * signal some other thread before returning from take() or 843 * poll(...), unless some other thread becomes leader in the 844 * interim. Whenever the head of the queue is replaced with a 845 * task with an earlier expiration time, the leader field is 846 * invalidated by being reset to null, and some waiting 847 * thread, but not necessarily the current leader, is 848 * signalled. So waiting threads must be prepared to acquire 849 * and lose leadership while waiting. 850 */ 851 private Thread leader = null; 852 853 /** 854 * Condition signalled when a newer task becomes available at the 855 * head of the queue or a new thread may need to become leader. 
856 */ 857 private final Condition available = lock.newCondition(); 858 859 /** 860 * Set f's heapIndex if it is a ScheduledFutureTask. 861 */ 862 private void setIndex(RunnableScheduledFuture f, int idx) { 863 if (f instanceof ScheduledFutureTask) 864 ((ScheduledFutureTask)f).heapIndex = idx; 865 } 866 867 /** 868 * Sift element added at bottom up to its heap-ordered spot. 869 * Call only when holding lock. 870 */ 871 private void siftUp(int k, RunnableScheduledFuture key) { 872 while (k > 0) { 873 int parent = (k - 1) >>> 1; 874 RunnableScheduledFuture e = queue[parent]; 875 if (key.compareTo(e) >= 0) 876 break; 877 queue[k] = e; 878 setIndex(e, k); 879 k = parent; 880 } 881 queue[k] = key; 882 setIndex(key, k); 883 } 884 885 /** 886 * Sift element added at top down to its heap-ordered spot. 887 * Call only when holding lock. 888 */ 889 private void siftDown(int k, RunnableScheduledFuture key) { 890 int half = size >>> 1; 891 while (k < half) { 892 int child = (k << 1) + 1; 893 RunnableScheduledFuture c = queue[child]; 894 int right = child + 1; 895 if (right < size && c.compareTo(queue[right]) > 0) 896 c = queue[child = right]; 897 if (key.compareTo(c) <= 0) 898 break; 899 queue[k] = c; 900 setIndex(c, k); 901 k = child; 902 } 903 queue[k] = key; 904 setIndex(key, k); 905 } 906 907 /** 908 * Resize the heap array. Call only when holding lock. 909 */ 910 private void grow() { 911 int oldCapacity = queue.length; 912 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 913 if (newCapacity < 0) // overflow 914 newCapacity = Integer.MAX_VALUE; 915 queue = Arrays.copyOf(queue, newCapacity); 916 } 917 918 /** 919 * Find index of given object, or -1 if absent 920 */ 921 private int indexOf(Object x) { 922 if (x != null) { 923 if (x instanceof ScheduledFutureTask) { 924 int i = ((ScheduledFutureTask) x).heapIndex; 925 // Sanity check; x could conceivably be a 926 // ScheduledFutureTask from some other pool. 
927 if (i >= 0 && i < size && queue[i] == x) 928 return i; 929 } else { 930 for (int i = 0; i < size; i++) 931 if (x.equals(queue[i])) 932 return i; 933 } 934 } 935 return -1; 936 } 937 938 public boolean contains(Object x) { 939 final ReentrantLock lock = this.lock; 940 lock.lock(); 941 try { 942 return indexOf(x) != -1; 943 } finally { 944 lock.unlock(); 945 } 946 } 947 948 public boolean remove(Object x) { 949 final ReentrantLock lock = this.lock; 950 lock.lock(); 951 try { 952 int i = indexOf(x); 953 if (i < 0) 954 return false; 955 956 setIndex(queue[i], -1); 957 int s = --size; 958 RunnableScheduledFuture replacement = queue[s]; 959 queue[s] = null; 960 if (s != i) { 961 siftDown(i, replacement); 962 if (queue[i] == replacement) 963 siftUp(i, replacement); 964 } 965 return true; 966 } finally { 967 lock.unlock(); 968 } 969 } 970 971 public int size() { 972 final ReentrantLock lock = this.lock; 973 lock.lock(); 974 try { 975 return size; 976 } finally { 977 lock.unlock(); 978 } 979 } 980 981 public boolean isEmpty() { 982 return size() == 0; 983 } 984 985 public int remainingCapacity() { 986 return Integer.MAX_VALUE; 987 } 988 989 public RunnableScheduledFuture peek() { 990 final ReentrantLock lock = this.lock; 991 lock.lock(); 992 try { 993 return queue[0]; 994 } finally { 995 lock.unlock(); 996 } 997 } 998 999 public boolean offer(Runnable x) { 1000 if (x == null) 1001 throw new NullPointerException(); 1002 RunnableScheduledFuture e = (RunnableScheduledFuture)x; 1003 final ReentrantLock lock = this.lock; 1004 lock.lock(); 1005 try { 1006 int i = size; 1007 if (i >= queue.length) 1008 grow(); 1009 size = i + 1; 1010 if (i == 0) { 1011 queue[0] = e; 1012 setIndex(e, 0); 1013 } else { 1014 siftUp(i, e); 1015 } 1016 if (queue[0] == e) { 1017 leader = null; 1018 available.signal(); 1019 } 1020 } finally { 1021 lock.unlock(); 1022 } 1023 return true; 1024 } 1025 1026 public void put(Runnable e) { 1027 offer(e); 1028 } 1029 1030 public boolean add(Runnable e) { 
1031 return offer(e); 1032 } 1033 1034 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1035 return offer(e); 1036 } 1037 1038 /** 1039 * Performs common bookkeeping for poll and take: Replaces 1040 * first element with last and sifts it down. Call only when 1041 * holding lock. 1042 * @param f the task to remove and return 1043 */ 1044 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { 1045 int s = --size; 1046 RunnableScheduledFuture x = queue[s]; 1047 queue[s] = null; 1048 if (s != 0) 1049 siftDown(0, x); 1050 setIndex(f, -1); 1051 return f; 1052 } 1053 1054 public RunnableScheduledFuture poll() { 1055 final ReentrantLock lock = this.lock; 1056 lock.lock(); 1057 try { 1058 RunnableScheduledFuture first = queue[0]; 1059 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 1060 return null; 1061 else 1062 return finishPoll(first); 1063 } finally { 1064 lock.unlock(); 1065 } 1066 } 1067 1068 public RunnableScheduledFuture take() throws InterruptedException { 1069 final ReentrantLock lock = this.lock; 1070 lock.lockInterruptibly(); 1071 try { 1072 for (;;) { 1073 RunnableScheduledFuture first = queue[0]; 1074 if (first == null) 1075 available.await(); 1076 else { 1077 long delay = first.getDelay(TimeUnit.NANOSECONDS); 1078 if (delay <= 0) 1079 return finishPoll(first); 1080 else if (leader != null) 1081 available.await(); 1082 else { 1083 Thread thisThread = Thread.currentThread(); 1084 leader = thisThread; 1085 try { 1086 available.awaitNanos(delay); 1087 } finally { 1088 if (leader == thisThread) 1089 leader = null; 1090 } 1091 } 1092 } 1093 } 1094 } finally { 1095 if (leader == null && queue[0] != null) 1096 available.signal(); 1097 lock.unlock(); 1098 } 1099 } 1100 1101 public RunnableScheduledFuture poll(long timeout, TimeUnit unit) 1102 throws InterruptedException { 1103 long nanos = unit.toNanos(timeout); 1104 final ReentrantLock lock = this.lock; 1105 lock.lockInterruptibly(); 1106 try { 1107 for (;;) { 1108 
RunnableScheduledFuture first = queue[0]; 1109 if (first == null) { 1110 if (nanos <= 0) 1111 return null; 1112 else 1113 nanos = available.awaitNanos(nanos); 1114 } else { 1115 long delay = first.getDelay(TimeUnit.NANOSECONDS); 1116 if (delay <= 0) 1117 return finishPoll(first); 1118 if (nanos <= 0) 1119 return null; 1120 if (nanos < delay || leader != null) 1121 nanos = available.awaitNanos(nanos); 1122 else { 1123 Thread thisThread = Thread.currentThread(); 1124 leader = thisThread; 1125 try { 1126 long timeLeft = available.awaitNanos(delay); 1127 nanos -= delay - timeLeft; 1128 } finally { 1129 if (leader == thisThread) 1130 leader = null; 1131 } 1132 } 1133 } 1134 } 1135 } finally { 1136 if (leader == null && queue[0] != null) 1137 available.signal(); 1138 lock.unlock(); 1139 } 1140 } 1141 1142 public void clear() { 1143 final ReentrantLock lock = this.lock; 1144 lock.lock(); 1145 try { 1146 for (int i = 0; i < size; i++) { 1147 RunnableScheduledFuture t = queue[i]; 1148 if (t != null) { 1149 queue[i] = null; 1150 setIndex(t, -1); 1151 } 1152 } 1153 size = 0; 1154 } finally { 1155 lock.unlock(); 1156 } 1157 } 1158 1159 /** 1160 * Return and remove first element only if it is expired. 1161 * Used only by drainTo. Call only when holding lock. 1162 */ 1163 private RunnableScheduledFuture pollExpired() { 1164 RunnableScheduledFuture first = queue[0]; 1165 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) 1166 return null; 1167 return finishPoll(first); 1168 } 1169 1170 public int drainTo(Collection<? super Runnable> c) { 1171 if (c == null) 1172 throw new NullPointerException(); 1173 if (c == this) 1174 throw new IllegalArgumentException(); 1175 final ReentrantLock lock = this.lock; 1176 lock.lock(); 1177 try { 1178 RunnableScheduledFuture first; 1179 int n = 0; 1180 while ((first = pollExpired()) != null) { 1181 c.add(first); 1182 ++n; 1183 } 1184 return n; 1185 } finally { 1186 lock.unlock(); 1187 } 1188 } 1189 1190 public int drainTo(Collection<? 
super Runnable> c, int maxElements) { 1191 if (c == null) 1192 throw new NullPointerException(); 1193 if (c == this) 1194 throw new IllegalArgumentException(); 1195 if (maxElements <= 0) 1196 return 0; 1197 final ReentrantLock lock = this.lock; 1198 lock.lock(); 1199 try { 1200 RunnableScheduledFuture first; 1201 int n = 0; 1202 while (n < maxElements && (first = pollExpired()) != null) { 1203 c.add(first); 1204 ++n; 1205 } 1206 return n; 1207 } finally { 1208 lock.unlock(); 1209 } 1210 } 1211 1212 public Object[] toArray() { 1213 final ReentrantLock lock = this.lock; 1214 lock.lock(); 1215 try { 1216 return Arrays.copyOf(queue, size, Object[].class); 1217 } finally { 1218 lock.unlock(); 1219 } 1220 } 1221 1222 @SuppressWarnings("unchecked") 1223 public <T> T[] toArray(T[] a) { 1224 final ReentrantLock lock = this.lock; 1225 lock.lock(); 1226 try { 1227 if (a.length < size) 1228 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1229 System.arraycopy(queue, 0, a, 0, size); 1230 if (a.length > size) 1231 a[size] = null; 1232 return a; 1233 } finally { 1234 lock.unlock(); 1235 } 1236 } 1237 1238 public Iterator<Runnable> iterator() { 1239 return new Itr(Arrays.copyOf(queue, size)); 1240 } 1241 1242 /** 1243 * Snapshot iterator that works off copy of underlying q array. 
*/
        private class Itr implements Iterator<Runnable> {
            /** Array of tasks to walk; iterator() passes in a copy of the heap. */
            final RunnableScheduledFuture[] array;
            /** Index of the element the next call to next() will return. */
            int cursor;
            /** Index of the element last returned, or -1 if there is none to remove. */
            int lastRet = -1;

            Itr(RunnableScheduledFuture[] array) {
                this.array = array;
            }

            public boolean hasNext() {
                return array.length > cursor;
            }

            public Runnable next() {
                final int i = cursor;
                if (i >= array.length)
                    throw new NoSuchElementException();
                cursor = i + 1;
                lastRet = i;
                return array[i];
            }

            public void remove() {
                final int last = lastRet;
                if (last < 0)
                    throw new IllegalStateException();
                // Removal goes through the live queue, not the snapshot.
                DelayedWorkQueue.this.remove(array[last]);
                lastRet = -1;
            }
        }
    }
}