Print this page
Split |
Close |
Expand all |
Collapse all |
--- old/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java
+++ new/src/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java
1 1 /*
2 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 3 *
4 4 * This code is free software; you can redistribute it and/or modify it
5 5 * under the terms of the GNU General Public License version 2 only, as
6 6 * published by the Free Software Foundation. Oracle designates this
7 7 * particular file as subject to the "Classpath" exception as provided
8 8 * by Oracle in the LICENSE file that accompanied this code.
9 9 *
10 10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 13 * version 2 for more details (a copy is included in the LICENSE file that
14 14 * accompanied this code).
15 15 *
16 16 * You should have received a copy of the GNU General Public License version
17 17 * 2 along with this work; if not, write to the Free Software Foundation,
18 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 19 *
20 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 21 * or visit www.oracle.com if you need additional information or have any
22 22 * questions.
23 23 */
24 24
25 25 /*
26 26 * This file is available under and governed by the GNU General Public
27 27 * License version 2 only, as published by the Free Software Foundation.
28 28 * However, the following notice accompanied the original version of this
29 29 * file:
30 30 *
31 31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 32 * Expert Group and released to the public domain, as explained at
33 33 * http://creativecommons.org/licenses/publicdomain
34 34 */
35 35
36 36 package java.util.concurrent;
37 37 import java.util.concurrent.atomic.*;
38 38 import java.util.concurrent.locks.*;
39 39 import java.util.*;
40 40
41 41 /**
42 42 * A {@link ThreadPoolExecutor} that can additionally schedule
43 43 * commands to run after a given delay, or to execute
44 44 * periodically. This class is preferable to {@link java.util.Timer}
45 45 * when multiple worker threads are needed, or when the additional
46 46 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
47 47 * this class extends) are required.
48 48 *
49 49 * <p>Delayed tasks execute no sooner than they are enabled, but
50 50 * without any real-time guarantees about when, after they are
51 51 * enabled, they will commence. Tasks scheduled for exactly the same
52 52 * execution time are enabled in first-in-first-out (FIFO) order of
53 53 * submission.
54 54 *
↓ open down ↓ |
54 lines elided |
↑ open up ↑ |
55 55 * <p>When a submitted task is cancelled before it is run, execution
56 56 * is suppressed. By default, such a cancelled task is not
57 57 * automatically removed from the work queue until its delay
58 58 * elapses. While this enables further inspection and monitoring, it
59 59 * may also cause unbounded retention of cancelled tasks. To avoid
60 60 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
61 61 * causes tasks to be immediately removed from the work queue at
62 62 * time of cancellation.
63 63 *
64 64 * <p>Successive executions of a task scheduled via
65 - * <code>scheduleAtFixedRate</code> or
66 - * <code>scheduleWithFixedDelay</code> do not overlap. While different
65 + * {@code scheduleAtFixedRate} or
66 + * {@code scheduleWithFixedDelay} do not overlap. While different
67 67 * executions may be performed by different threads, the effects of
68 68 * prior executions <a
69 69 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
70 70 * those of subsequent ones.
71 71 *
72 72 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
73 73 * of the inherited tuning methods are not useful for it. In
74 74 * particular, because it acts as a fixed-sized pool using
75 75 * {@code corePoolSize} threads and an unbounded queue, adjustments
76 76 * to {@code maximumPoolSize} have no useful effect. Additionally, it
77 77 * is almost never a good idea to set {@code corePoolSize} to zero or
78 78 * use {@code allowCoreThreadTimeOut} because this may leave the pool
79 79 * without threads to handle tasks once they become eligible to run.
80 80 *
81 81 * <p><b>Extension notes:</b> This class overrides the
82 82 * {@link ThreadPoolExecutor#execute execute} and
83 83 * {@link AbstractExecutorService#submit(Runnable) submit}
84 84 * methods to generate internal {@link ScheduledFuture} objects to
85 85 * control per-task delays and scheduling. To preserve
86 86 * functionality, any further overrides of these methods in
87 87 * subclasses must invoke superclass versions, which effectively
88 88 * disables additional task customization. However, this class
89 89 * provides alternative protected extension method
90 90 * {@code decorateTask} (one version each for {@code Runnable} and
91 91 * {@code Callable}) that can be used to customize the concrete task
92 92 * types used to execute commands entered via {@code execute},
93 93 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
94 94 * and {@code scheduleWithFixedDelay}. By default, a
95 95 * {@code ScheduledThreadPoolExecutor} uses a task type extending
96 96 * {@link FutureTask}. However, this may be modified or replaced using
97 97 * subclasses of the form:
98 98 *
99 99 * <pre> {@code
100 100 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
101 101 *
102 102 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
103 103 *
104 104 * protected <V> RunnableScheduledFuture<V> decorateTask(
105 105 * Runnable r, RunnableScheduledFuture<V> task) {
106 106 * return new CustomTask<V>(r, task);
107 107 * }
108 108 *
109 109 * protected <V> RunnableScheduledFuture<V> decorateTask(
110 110 * Callable<V> c, RunnableScheduledFuture<V> task) {
111 111 * return new CustomTask<V>(c, task);
112 112 * }
113 113 * // ... add constructors, etc.
114 114 * }}</pre>
115 115 *
116 116 * @since 1.5
117 117 * @author Doug Lea
118 118 */
119 119 public class ScheduledThreadPoolExecutor
120 120 extends ThreadPoolExecutor
121 121 implements ScheduledExecutorService {
122 122
123 123 /*
124 124 * This class specializes ThreadPoolExecutor implementation by
125 125 *
126 126 * 1. Using a custom task type, ScheduledFutureTask for
127 127 * tasks, even those that don't require scheduling (i.e.,
128 128 * those submitted using ExecutorService execute, not
129 129 * ScheduledExecutorService methods) which are treated as
130 130 * delayed tasks with a delay of zero.
131 131 *
132 132 * 2. Using a custom queue (DelayedWorkQueue), a variant of
133 133 * unbounded DelayQueue. The lack of capacity constraint and
134 134 * the fact that corePoolSize and maximumPoolSize are
135 135 * effectively identical simplifies some execution mechanics
136 136 * (see delayedExecute) compared to ThreadPoolExecutor.
137 137 *
138 138 * 3. Supporting optional run-after-shutdown parameters, which
139 139 * leads to overrides of shutdown methods to remove and cancel
140 140 * tasks that should NOT be run after shutdown, as well as
141 141 * different recheck logic when task (re)submission overlaps
142 142 * with a shutdown.
143 143 *
144 144 * 4. Task decoration methods to allow interception and
145 145 * instrumentation, which are needed because subclasses cannot
146 146 * otherwise override submit methods to get this effect. These
147 147 * don't have any impact on pool control logic though.
148 148 */
149 149
150 150 /**
151 151 * False if should cancel/suppress periodic tasks on shutdown.
152 152 */
153 153 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
154 154
155 155 /**
156 156 * False if should cancel non-periodic tasks on shutdown.
157 157 */
158 158 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
159 159
160 160 /**
161 161 * True if ScheduledFutureTask.cancel should remove from queue
162 162 */
163 163 private volatile boolean removeOnCancel = false;
164 164
165 165 /**
166 166 * Sequence number to break scheduling ties, and in turn to
167 167 * guarantee FIFO order among tied entries.
168 168 */
169 169 private static final AtomicLong sequencer = new AtomicLong(0);
170 170
171 171 /**
172 172 * Returns current nanosecond time.
173 173 */
174 174 final long now() {
175 175 return System.nanoTime();
176 176 }
177 177
178 178 private class ScheduledFutureTask<V>
179 179 extends FutureTask<V> implements RunnableScheduledFuture<V> {
180 180
181 181 /** Sequence number to break ties FIFO */
182 182 private final long sequenceNumber;
183 183
184 184 /** The time the task is enabled to execute in nanoTime units */
185 185 private long time;
186 186
187 187 /**
188 188 * Period in nanoseconds for repeating tasks. A positive
189 189 * value indicates fixed-rate execution. A negative value
190 190 * indicates fixed-delay execution. A value of 0 indicates a
191 191 * non-repeating task.
192 192 */
193 193 private final long period;
194 194
195 195 /** The actual task to be re-enqueued by reExecutePeriodic */
196 196 RunnableScheduledFuture<V> outerTask = this;
197 197
198 198 /**
199 199 * Index into delay queue, to support faster cancellation.
200 200 */
201 201 int heapIndex;
202 202
203 203 /**
204 204 * Creates a one-shot action with given nanoTime-based trigger time.
205 205 */
206 206 ScheduledFutureTask(Runnable r, V result, long ns) {
207 207 super(r, result);
208 208 this.time = ns;
209 209 this.period = 0;
210 210 this.sequenceNumber = sequencer.getAndIncrement();
211 211 }
212 212
213 213 /**
214 214 * Creates a periodic action with given nano time and period.
215 215 */
216 216 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
217 217 super(r, result);
218 218 this.time = ns;
219 219 this.period = period;
220 220 this.sequenceNumber = sequencer.getAndIncrement();
221 221 }
222 222
223 223 /**
224 224 * Creates a one-shot action with given nanoTime-based trigger.
225 225 */
226 226 ScheduledFutureTask(Callable<V> callable, long ns) {
227 227 super(callable);
228 228 this.time = ns;
229 229 this.period = 0;
230 230 this.sequenceNumber = sequencer.getAndIncrement();
231 231 }
232 232
233 233 public long getDelay(TimeUnit unit) {
234 234 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
235 235 }
236 236
237 237 public int compareTo(Delayed other) {
238 238 if (other == this) // compare zero ONLY if same object
239 239 return 0;
240 240 if (other instanceof ScheduledFutureTask) {
241 241 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
242 242 long diff = time - x.time;
243 243 if (diff < 0)
244 244 return -1;
245 245 else if (diff > 0)
246 246 return 1;
247 247 else if (sequenceNumber < x.sequenceNumber)
248 248 return -1;
249 249 else
250 250 return 1;
251 251 }
252 252 long d = (getDelay(TimeUnit.NANOSECONDS) -
253 253 other.getDelay(TimeUnit.NANOSECONDS));
254 254 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
255 255 }
256 256
257 257 /**
258 258 * Returns true if this is a periodic (not a one-shot) action.
259 259 *
260 260 * @return true if periodic
261 261 */
262 262 public boolean isPeriodic() {
263 263 return period != 0;
264 264 }
265 265
266 266 /**
267 267 * Sets the next time to run for a periodic task.
268 268 */
269 269 private void setNextRunTime() {
270 270 long p = period;
271 271 if (p > 0)
272 272 time += p;
273 273 else
274 274 time = triggerTime(-p);
275 275 }
276 276
277 277 public boolean cancel(boolean mayInterruptIfRunning) {
278 278 boolean cancelled = super.cancel(mayInterruptIfRunning);
279 279 if (cancelled && removeOnCancel && heapIndex >= 0)
280 280 remove(this);
281 281 return cancelled;
282 282 }
283 283
284 284 /**
285 285 * Overrides FutureTask version so as to reset/requeue if periodic.
286 286 */
287 287 public void run() {
288 288 boolean periodic = isPeriodic();
289 289 if (!canRunInCurrentRunState(periodic))
290 290 cancel(false);
291 291 else if (!periodic)
292 292 ScheduledFutureTask.super.run();
293 293 else if (ScheduledFutureTask.super.runAndReset()) {
294 294 setNextRunTime();
295 295 reExecutePeriodic(outerTask);
296 296 }
297 297 }
298 298 }
299 299
300 300 /**
301 301 * Returns true if can run a task given current run state
302 302 * and run-after-shutdown parameters.
303 303 *
304 304 * @param periodic true if this task periodic, false if delayed
305 305 */
306 306 boolean canRunInCurrentRunState(boolean periodic) {
307 307 return isRunningOrShutdown(periodic ?
308 308 continueExistingPeriodicTasksAfterShutdown :
309 309 executeExistingDelayedTasksAfterShutdown);
310 310 }
311 311
312 312 /**
313 313 * Main execution method for delayed or periodic tasks. If pool
314 314 * is shut down, rejects the task. Otherwise adds task to queue
315 315 * and starts a thread, if necessary, to run it. (We cannot
316 316 * prestart the thread to run the task because the task (probably)
317 317 * shouldn't be run yet,) If the pool is shut down while the task
318 318 * is being added, cancel and remove it if required by state and
319 319 * run-after-shutdown parameters.
320 320 *
321 321 * @param task the task
322 322 */
323 323 private void delayedExecute(RunnableScheduledFuture<?> task) {
324 324 if (isShutdown())
325 325 reject(task);
326 326 else {
327 327 super.getQueue().add(task);
328 328 if (isShutdown() &&
329 329 !canRunInCurrentRunState(task.isPeriodic()) &&
330 330 remove(task))
331 331 task.cancel(false);
332 332 else
333 333 prestartCoreThread();
334 334 }
335 335 }
336 336
337 337 /**
338 338 * Requeues a periodic task unless current run state precludes it.
339 339 * Same idea as delayedExecute except drops task rather than rejecting.
340 340 *
341 341 * @param task the task
342 342 */
343 343 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
344 344 if (canRunInCurrentRunState(true)) {
345 345 super.getQueue().add(task);
346 346 if (!canRunInCurrentRunState(true) && remove(task))
347 347 task.cancel(false);
348 348 else
349 349 prestartCoreThread();
350 350 }
351 351 }
352 352
↓ open down ↓ |
276 lines elided |
↑ open up ↑ |
353 353 /**
354 354 * Cancels and clears the queue of all tasks that should not be run
355 355 * due to shutdown policy. Invoked within super.shutdown.
356 356 */
357 357 @Override void onShutdown() {
358 358 BlockingQueue<Runnable> q = super.getQueue();
359 359 boolean keepDelayed =
360 360 getExecuteExistingDelayedTasksAfterShutdownPolicy();
361 361 boolean keepPeriodic =
362 362 getContinueExistingPeriodicTasksAfterShutdownPolicy();
363 - if (!keepDelayed && !keepPeriodic)
363 + if (!keepDelayed && !keepPeriodic) {
364 + for (Object e : q.toArray())
365 + if (e instanceof RunnableScheduledFuture<?>)
366 + ((RunnableScheduledFuture<?>) e).cancel(false);
364 367 q.clear();
368 + }
365 369 else {
366 370 // Traverse snapshot to avoid iterator exceptions
367 371 for (Object e : q.toArray()) {
368 372 if (e instanceof RunnableScheduledFuture) {
369 373 RunnableScheduledFuture<?> t =
370 374 (RunnableScheduledFuture<?>)e;
371 375 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
372 376 t.isCancelled()) { // also remove if already cancelled
373 377 if (q.remove(t))
374 378 t.cancel(false);
375 379 }
376 380 }
377 381 }
378 382 }
379 383 tryTerminate();
380 384 }
381 385
382 386 /**
383 387 * Modifies or replaces the task used to execute a runnable.
384 388 * This method can be used to override the concrete
385 389 * class used for managing internal tasks.
386 390 * The default implementation simply returns the given task.
387 391 *
388 392 * @param runnable the submitted Runnable
389 393 * @param task the task created to execute the runnable
390 394 * @return a task that can execute the runnable
391 395 * @since 1.6
392 396 */
393 397 protected <V> RunnableScheduledFuture<V> decorateTask(
394 398 Runnable runnable, RunnableScheduledFuture<V> task) {
395 399 return task;
396 400 }
397 401
398 402 /**
399 403 * Modifies or replaces the task used to execute a callable.
400 404 * This method can be used to override the concrete
401 405 * class used for managing internal tasks.
402 406 * The default implementation simply returns the given task.
403 407 *
404 408 * @param callable the submitted Callable
405 409 * @param task the task created to execute the callable
406 410 * @return a task that can execute the callable
407 411 * @since 1.6
408 412 */
409 413 protected <V> RunnableScheduledFuture<V> decorateTask(
410 414 Callable<V> callable, RunnableScheduledFuture<V> task) {
411 415 return task;
412 416 }
413 417
414 418 /**
415 419 * Creates a new {@code ScheduledThreadPoolExecutor} with the
416 420 * given core pool size.
417 421 *
418 422 * @param corePoolSize the number of threads to keep in the pool, even
419 423 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
420 424 * @throws IllegalArgumentException if {@code corePoolSize < 0}
421 425 */
422 426 public ScheduledThreadPoolExecutor(int corePoolSize) {
423 427 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
424 428 new DelayedWorkQueue());
425 429 }
426 430
427 431 /**
428 432 * Creates a new {@code ScheduledThreadPoolExecutor} with the
↓ open down ↓ |
54 lines elided |
↑ open up ↑ |
429 433 * given initial parameters.
430 434 *
431 435 * @param corePoolSize the number of threads to keep in the pool, even
432 436 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
433 437 * @param threadFactory the factory to use when the executor
434 438 * creates a new thread
435 439 * @throws IllegalArgumentException if {@code corePoolSize < 0}
436 440 * @throws NullPointerException if {@code threadFactory} is null
437 441 */
438 442 public ScheduledThreadPoolExecutor(int corePoolSize,
439 - ThreadFactory threadFactory) {
443 + ThreadFactory threadFactory) {
440 444 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
441 445 new DelayedWorkQueue(), threadFactory);
442 446 }
443 447
444 448 /**
445 449 * Creates a new ScheduledThreadPoolExecutor with the given
446 450 * initial parameters.
447 451 *
448 452 * @param corePoolSize the number of threads to keep in the pool, even
449 453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
450 454 * @param handler the handler to use when execution is blocked
451 455 * because the thread bounds and queue capacities are reached
452 456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
453 457 * @throws NullPointerException if {@code handler} is null
454 458 */
455 459 public ScheduledThreadPoolExecutor(int corePoolSize,
456 - RejectedExecutionHandler handler) {
460 + RejectedExecutionHandler handler) {
457 461 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
458 462 new DelayedWorkQueue(), handler);
459 463 }
460 464
461 465 /**
462 466 * Creates a new ScheduledThreadPoolExecutor with the given
463 467 * initial parameters.
464 468 *
465 469 * @param corePoolSize the number of threads to keep in the pool, even
466 470 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
467 471 * @param threadFactory the factory to use when the executor
468 472 * creates a new thread
469 473 * @param handler the handler to use when execution is blocked
470 474 * because the thread bounds and queue capacities are reached
471 475 * @throws IllegalArgumentException if {@code corePoolSize < 0}
472 476 * @throws NullPointerException if {@code threadFactory} or
473 477 * {@code handler} is null
474 478 */
475 479 public ScheduledThreadPoolExecutor(int corePoolSize,
476 - ThreadFactory threadFactory,
477 - RejectedExecutionHandler handler) {
480 + ThreadFactory threadFactory,
481 + RejectedExecutionHandler handler) {
478 482 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
479 483 new DelayedWorkQueue(), threadFactory, handler);
480 484 }
481 485
482 486 /**
483 487 * Returns the trigger time of a delayed action.
484 488 */
485 489 private long triggerTime(long delay, TimeUnit unit) {
486 490 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
487 491 }
488 492
489 493 /**
490 494 * Returns the trigger time of a delayed action.
491 495 */
492 496 long triggerTime(long delay) {
493 497 return now() +
494 498 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
495 499 }
496 500
497 501 /**
498 502 * Constrains the values of all delays in the queue to be within
499 503 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
500 504 * This may occur if a task is eligible to be dequeued, but has
501 505 * not yet been, while some other task is added with a delay of
502 506 * Long.MAX_VALUE.
503 507 */
504 508 private long overflowFree(long delay) {
505 509 Delayed head = (Delayed) super.getQueue().peek();
506 510 if (head != null) {
507 511 long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
508 512 if (headDelay < 0 && (delay - headDelay < 0))
509 513 delay = Long.MAX_VALUE + headDelay;
510 514 }
511 515 return delay;
512 516 }
513 517
514 518 /**
515 519 * @throws RejectedExecutionException {@inheritDoc}
516 520 * @throws NullPointerException {@inheritDoc}
517 521 */
518 522 public ScheduledFuture<?> schedule(Runnable command,
519 523 long delay,
520 524 TimeUnit unit) {
521 525 if (command == null || unit == null)
522 526 throw new NullPointerException();
523 527 RunnableScheduledFuture<?> t = decorateTask(command,
524 528 new ScheduledFutureTask<Void>(command, null,
525 529 triggerTime(delay, unit)));
526 530 delayedExecute(t);
527 531 return t;
528 532 }
529 533
530 534 /**
531 535 * @throws RejectedExecutionException {@inheritDoc}
532 536 * @throws NullPointerException {@inheritDoc}
533 537 */
534 538 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
535 539 long delay,
536 540 TimeUnit unit) {
537 541 if (callable == null || unit == null)
538 542 throw new NullPointerException();
539 543 RunnableScheduledFuture<V> t = decorateTask(callable,
540 544 new ScheduledFutureTask<V>(callable,
541 545 triggerTime(delay, unit)));
542 546 delayedExecute(t);
543 547 return t;
544 548 }
545 549
546 550 /**
547 551 * @throws RejectedExecutionException {@inheritDoc}
548 552 * @throws NullPointerException {@inheritDoc}
549 553 * @throws IllegalArgumentException {@inheritDoc}
550 554 */
551 555 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
552 556 long initialDelay,
553 557 long period,
554 558 TimeUnit unit) {
555 559 if (command == null || unit == null)
556 560 throw new NullPointerException();
557 561 if (period <= 0)
558 562 throw new IllegalArgumentException();
559 563 ScheduledFutureTask<Void> sft =
560 564 new ScheduledFutureTask<Void>(command,
561 565 null,
562 566 triggerTime(initialDelay, unit),
563 567 unit.toNanos(period));
564 568 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
565 569 sft.outerTask = t;
566 570 delayedExecute(t);
567 571 return t;
568 572 }
569 573
570 574 /**
571 575 * @throws RejectedExecutionException {@inheritDoc}
572 576 * @throws NullPointerException {@inheritDoc}
573 577 * @throws IllegalArgumentException {@inheritDoc}
574 578 */
575 579 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
576 580 long initialDelay,
577 581 long delay,
578 582 TimeUnit unit) {
579 583 if (command == null || unit == null)
580 584 throw new NullPointerException();
581 585 if (delay <= 0)
582 586 throw new IllegalArgumentException();
583 587 ScheduledFutureTask<Void> sft =
584 588 new ScheduledFutureTask<Void>(command,
585 589 null,
586 590 triggerTime(initialDelay, unit),
587 591 unit.toNanos(-delay));
588 592 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
589 593 sft.outerTask = t;
590 594 delayedExecute(t);
591 595 return t;
592 596 }
593 597
594 598 /**
595 599 * Executes {@code command} with zero required delay.
596 600 * This has effect equivalent to
597 601 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
598 602 * Note that inspections of the queue and of the list returned by
599 603 * {@code shutdownNow} will access the zero-delayed
600 604 * {@link ScheduledFuture}, not the {@code command} itself.
601 605 *
602 606 * <p>A consequence of the use of {@code ScheduledFuture} objects is
603 607 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
604 608 * called with a null second {@code Throwable} argument, even if the
605 609 * {@code command} terminated abruptly. Instead, the {@code Throwable}
606 610 * thrown by such a task can be obtained via {@link Future#get}.
607 611 *
608 612 * @throws RejectedExecutionException at discretion of
609 613 * {@code RejectedExecutionHandler}, if the task
610 614 * cannot be accepted for execution because the
611 615 * executor has been shut down
612 616 * @throws NullPointerException {@inheritDoc}
613 617 */
614 618 public void execute(Runnable command) {
615 619 schedule(command, 0, TimeUnit.NANOSECONDS);
616 620 }
617 621
618 622 // Override AbstractExecutorService methods
619 623
620 624 /**
621 625 * @throws RejectedExecutionException {@inheritDoc}
622 626 * @throws NullPointerException {@inheritDoc}
623 627 */
624 628 public Future<?> submit(Runnable task) {
625 629 return schedule(task, 0, TimeUnit.NANOSECONDS);
626 630 }
627 631
628 632 /**
629 633 * @throws RejectedExecutionException {@inheritDoc}
630 634 * @throws NullPointerException {@inheritDoc}
631 635 */
632 636 public <T> Future<T> submit(Runnable task, T result) {
633 637 return schedule(Executors.callable(task, result),
634 638 0, TimeUnit.NANOSECONDS);
635 639 }
636 640
637 641 /**
638 642 * @throws RejectedExecutionException {@inheritDoc}
639 643 * @throws NullPointerException {@inheritDoc}
640 644 */
641 645 public <T> Future<T> submit(Callable<T> task) {
642 646 return schedule(task, 0, TimeUnit.NANOSECONDS);
643 647 }
644 648
645 649 /**
646 650 * Sets the policy on whether to continue executing existing
647 651 * periodic tasks even when this executor has been {@code shutdown}.
648 652 * In this case, these tasks will only terminate upon
649 653 * {@code shutdownNow} or after setting the policy to
650 654 * {@code false} when already shutdown.
651 655 * This value is by default {@code false}.
652 656 *
653 657 * @param value if {@code true}, continue after shutdown, else don't.
654 658 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
655 659 */
656 660 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
657 661 continueExistingPeriodicTasksAfterShutdown = value;
658 662 if (!value && isShutdown())
659 663 onShutdown();
660 664 }
661 665
662 666 /**
663 667 * Gets the policy on whether to continue executing existing
664 668 * periodic tasks even when this executor has been {@code shutdown}.
665 669 * In this case, these tasks will only terminate upon
666 670 * {@code shutdownNow} or after setting the policy to
667 671 * {@code false} when already shutdown.
668 672 * This value is by default {@code false}.
669 673 *
670 674 * @return {@code true} if will continue after shutdown
671 675 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
672 676 */
673 677 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
674 678 return continueExistingPeriodicTasksAfterShutdown;
675 679 }
676 680
677 681 /**
678 682 * Sets the policy on whether to execute existing delayed
679 683 * tasks even when this executor has been {@code shutdown}.
680 684 * In this case, these tasks will only terminate upon
681 685 * {@code shutdownNow}, or after setting the policy to
682 686 * {@code false} when already shutdown.
683 687 * This value is by default {@code true}.
684 688 *
685 689 * @param value if {@code true}, execute after shutdown, else don't.
686 690 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
687 691 */
688 692 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
689 693 executeExistingDelayedTasksAfterShutdown = value;
690 694 if (!value && isShutdown())
691 695 onShutdown();
692 696 }
693 697
694 698 /**
695 699 * Gets the policy on whether to execute existing delayed
696 700 * tasks even when this executor has been {@code shutdown}.
697 701 * In this case, these tasks will only terminate upon
698 702 * {@code shutdownNow}, or after setting the policy to
699 703 * {@code false} when already shutdown.
700 704 * This value is by default {@code true}.
701 705 *
702 706 * @return {@code true} if will execute after shutdown
703 707 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
704 708 */
705 709 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
706 710 return executeExistingDelayedTasksAfterShutdown;
707 711 }
708 712
709 713 /**
710 714 * Sets the policy on whether cancelled tasks should be immediately
711 715 * removed from the work queue at time of cancellation. This value is
712 716 * by default {@code false}.
713 717 *
714 718 * @param value if {@code true}, remove on cancellation, else don't
715 719 * @see #getRemoveOnCancelPolicy
716 720 * @since 1.7
717 721 */
718 722 public void setRemoveOnCancelPolicy(boolean value) {
719 723 removeOnCancel = value;
720 724 }
721 725
722 726 /**
723 727 * Gets the policy on whether cancelled tasks should be immediately
724 728 * removed from the work queue at time of cancellation. This value is
725 729 * by default {@code false}.
726 730 *
727 731 * @return {@code true} if cancelled tasks are immediately removed
728 732 * from the queue
729 733 * @see #setRemoveOnCancelPolicy
730 734 * @since 1.7
731 735 */
732 736 public boolean getRemoveOnCancelPolicy() {
733 737 return removeOnCancel;
734 738 }
735 739
736 740 /**
737 741 * Initiates an orderly shutdown in which previously submitted
738 742 * tasks are executed, but no new tasks will be accepted.
739 743 * Invocation has no additional effect if already shut down.
740 744 *
741 745 * <p>This method does not wait for previously submitted tasks to
742 746 * complete execution. Use {@link #awaitTermination awaitTermination}
743 747 * to do that.
744 748 *
745 749 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
746 750 * has been set {@code false}, existing delayed tasks whose delays
747 751 * have not yet elapsed are cancelled. And unless the {@code
748 752 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
749 753 * {@code true}, future executions of existing periodic tasks will
750 754 * be cancelled.
751 755 *
752 756 * @throws SecurityException {@inheritDoc}
753 757 */
754 758 public void shutdown() {
755 759 super.shutdown();
756 760 }
757 761
758 762 /**
759 763 * Attempts to stop all actively executing tasks, halts the
760 764 * processing of waiting tasks, and returns a list of the tasks
761 765 * that were awaiting execution.
762 766 *
763 767 * <p>This method does not wait for actively executing tasks to
764 768 * terminate. Use {@link #awaitTermination awaitTermination} to
765 769 * do that.
766 770 *
767 771 * <p>There are no guarantees beyond best-effort attempts to stop
768 772 * processing actively executing tasks. This implementation
769 773 * cancels tasks via {@link Thread#interrupt}, so any task that
770 774 * fails to respond to interrupts may never terminate.
771 775 *
772 776 * @return list of tasks that never commenced execution.
773 777 * Each element of this list is a {@link ScheduledFuture},
774 778 * including those tasks submitted using {@code execute},
775 779 * which are for scheduling purposes used as the basis of a
776 780 * zero-delay {@code ScheduledFuture}.
777 781 * @throws SecurityException {@inheritDoc}
778 782 */
779 783 public List<Runnable> shutdownNow() {
780 784 return super.shutdownNow();
781 785 }
782 786
783 787 /**
784 788 * Returns the task queue used by this executor. Each element of
785 789 * this queue is a {@link ScheduledFuture}, including those
786 790 * tasks submitted using {@code execute} which are for scheduling
787 791 * purposes used as the basis of a zero-delay
788 792 * {@code ScheduledFuture}. Iteration over this queue is
789 793 * <em>not</em> guaranteed to traverse tasks in the order in
790 794 * which they will execute.
791 795 *
792 796 * @return the task queue
793 797 */
794 798 public BlockingQueue<Runnable> getQueue() {
795 799 return super.getQueue();
796 800 }
797 801
798 802 /**
799 803 * Specialized delay queue. To mesh with TPE declarations, this
800 804 * class must be declared as a BlockingQueue<Runnable> even though
801 805 * it can only hold RunnableScheduledFutures.
802 806 */
803 807 static class DelayedWorkQueue extends AbstractQueue<Runnable>
804 808 implements BlockingQueue<Runnable> {
805 809
806 810 /*
807 811 * A DelayedWorkQueue is based on a heap-based data structure
808 812 * like those in DelayQueue and PriorityQueue, except that
809 813 * every ScheduledFutureTask also records its index into the
810 814 * heap array. This eliminates the need to find a task upon
811 815 * cancellation, greatly speeding up removal (down from O(n)
812 816 * to O(log n)), and reducing garbage retention that would
813 817 * otherwise occur by waiting for the element to rise to top
814 818 * before clearing. But because the queue may also hold
815 819 * RunnableScheduledFutures that are not ScheduledFutureTasks,
816 820 * we are not guaranteed to have such indices available, in
817 821 * which case we fall back to linear search. (We expect that
818 822 * most tasks will not be decorated, and that the faster cases
819 823 * will be much more common.)
820 824 *
821 825 * All heap operations must record index changes -- mainly
822 826 * within siftUp and siftDown. Upon removal, a task's
823 827 * heapIndex is set to -1. Note that ScheduledFutureTasks can
824 828 * appear at most once in the queue (this need not be true for
825 829 * other kinds of tasks or work queues), so are uniquely
826 830 * identified by heapIndex.
827 831 */
828 832
829 833 private static final int INITIAL_CAPACITY = 16;
830 834 private RunnableScheduledFuture[] queue =
831 835 new RunnableScheduledFuture[INITIAL_CAPACITY];
832 836 private final ReentrantLock lock = new ReentrantLock();
833 837 private int size = 0;
834 838
835 839 /**
836 840 * Thread designated to wait for the task at the head of the
837 841 * queue. This variant of the Leader-Follower pattern
838 842 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
839 843 * minimize unnecessary timed waiting. When a thread becomes
840 844 * the leader, it waits only for the next delay to elapse, but
841 845 * other threads await indefinitely. The leader thread must
842 846 * signal some other thread before returning from take() or
843 847 * poll(...), unless some other thread becomes leader in the
844 848 * interim. Whenever the head of the queue is replaced with a
845 849 * task with an earlier expiration time, the leader field is
846 850 * invalidated by being reset to null, and some waiting
847 851 * thread, but not necessarily the current leader, is
848 852 * signalled. So waiting threads must be prepared to acquire
849 853 * and lose leadership while waiting.
850 854 */
851 855 private Thread leader = null;
852 856
853 857 /**
854 858 * Condition signalled when a newer task becomes available at the
855 859 * head of the queue or a new thread may need to become leader.
856 860 */
857 861 private final Condition available = lock.newCondition();
858 862
859 863 /**
860 864 * Set f's heapIndex if it is a ScheduledFutureTask.
861 865 */
862 866 private void setIndex(RunnableScheduledFuture f, int idx) {
863 867 if (f instanceof ScheduledFutureTask)
864 868 ((ScheduledFutureTask)f).heapIndex = idx;
865 869 }
866 870
867 871 /**
868 872 * Sift element added at bottom up to its heap-ordered spot.
869 873 * Call only when holding lock.
870 874 */
871 875 private void siftUp(int k, RunnableScheduledFuture key) {
872 876 while (k > 0) {
873 877 int parent = (k - 1) >>> 1;
874 878 RunnableScheduledFuture e = queue[parent];
875 879 if (key.compareTo(e) >= 0)
876 880 break;
877 881 queue[k] = e;
878 882 setIndex(e, k);
879 883 k = parent;
880 884 }
881 885 queue[k] = key;
882 886 setIndex(key, k);
883 887 }
884 888
885 889 /**
886 890 * Sift element added at top down to its heap-ordered spot.
887 891 * Call only when holding lock.
888 892 */
889 893 private void siftDown(int k, RunnableScheduledFuture key) {
890 894 int half = size >>> 1;
891 895 while (k < half) {
892 896 int child = (k << 1) + 1;
893 897 RunnableScheduledFuture c = queue[child];
894 898 int right = child + 1;
895 899 if (right < size && c.compareTo(queue[right]) > 0)
896 900 c = queue[child = right];
897 901 if (key.compareTo(c) <= 0)
898 902 break;
899 903 queue[k] = c;
900 904 setIndex(c, k);
901 905 k = child;
902 906 }
903 907 queue[k] = key;
904 908 setIndex(key, k);
905 909 }
906 910
907 911 /**
908 912 * Resize the heap array. Call only when holding lock.
909 913 */
910 914 private void grow() {
911 915 int oldCapacity = queue.length;
912 916 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
913 917 if (newCapacity < 0) // overflow
914 918 newCapacity = Integer.MAX_VALUE;
915 919 queue = Arrays.copyOf(queue, newCapacity);
916 920 }
917 921
918 922 /**
919 923 * Find index of given object, or -1 if absent
920 924 */
921 925 private int indexOf(Object x) {
922 926 if (x != null) {
923 927 if (x instanceof ScheduledFutureTask) {
924 928 int i = ((ScheduledFutureTask) x).heapIndex;
925 929 // Sanity check; x could conceivably be a
926 930 // ScheduledFutureTask from some other pool.
927 931 if (i >= 0 && i < size && queue[i] == x)
928 932 return i;
929 933 } else {
930 934 for (int i = 0; i < size; i++)
931 935 if (x.equals(queue[i]))
932 936 return i;
933 937 }
934 938 }
935 939 return -1;
936 940 }
937 941
938 942 public boolean contains(Object x) {
939 943 final ReentrantLock lock = this.lock;
940 944 lock.lock();
941 945 try {
942 946 return indexOf(x) != -1;
943 947 } finally {
944 948 lock.unlock();
945 949 }
946 950 }
947 951
948 952 public boolean remove(Object x) {
949 953 final ReentrantLock lock = this.lock;
950 954 lock.lock();
951 955 try {
952 956 int i = indexOf(x);
953 957 if (i < 0)
954 958 return false;
955 959
956 960 setIndex(queue[i], -1);
957 961 int s = --size;
958 962 RunnableScheduledFuture replacement = queue[s];
959 963 queue[s] = null;
960 964 if (s != i) {
961 965 siftDown(i, replacement);
962 966 if (queue[i] == replacement)
963 967 siftUp(i, replacement);
964 968 }
965 969 return true;
966 970 } finally {
967 971 lock.unlock();
968 972 }
969 973 }
970 974
971 975 public int size() {
972 976 final ReentrantLock lock = this.lock;
973 977 lock.lock();
974 978 try {
975 979 return size;
976 980 } finally {
977 981 lock.unlock();
978 982 }
979 983 }
980 984
981 985 public boolean isEmpty() {
982 986 return size() == 0;
983 987 }
984 988
985 989 public int remainingCapacity() {
986 990 return Integer.MAX_VALUE;
987 991 }
988 992
989 993 public RunnableScheduledFuture peek() {
990 994 final ReentrantLock lock = this.lock;
991 995 lock.lock();
992 996 try {
993 997 return queue[0];
994 998 } finally {
995 999 lock.unlock();
996 1000 }
997 1001 }
998 1002
999 1003 public boolean offer(Runnable x) {
1000 1004 if (x == null)
1001 1005 throw new NullPointerException();
1002 1006 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
1003 1007 final ReentrantLock lock = this.lock;
1004 1008 lock.lock();
1005 1009 try {
1006 1010 int i = size;
1007 1011 if (i >= queue.length)
1008 1012 grow();
1009 1013 size = i + 1;
1010 1014 if (i == 0) {
1011 1015 queue[0] = e;
1012 1016 setIndex(e, 0);
1013 1017 } else {
1014 1018 siftUp(i, e);
1015 1019 }
1016 1020 if (queue[0] == e) {
1017 1021 leader = null;
1018 1022 available.signal();
1019 1023 }
1020 1024 } finally {
1021 1025 lock.unlock();
1022 1026 }
1023 1027 return true;
1024 1028 }
1025 1029
1026 1030 public void put(Runnable e) {
1027 1031 offer(e);
1028 1032 }
1029 1033
1030 1034 public boolean add(Runnable e) {
1031 1035 return offer(e);
1032 1036 }
1033 1037
1034 1038 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1035 1039 return offer(e);
1036 1040 }
1037 1041
1038 1042 /**
1039 1043 * Performs common bookkeeping for poll and take: Replaces
1040 1044 * first element with last and sifts it down. Call only when
1041 1045 * holding lock.
1042 1046 * @param f the task to remove and return
1043 1047 */
1044 1048 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1045 1049 int s = --size;
1046 1050 RunnableScheduledFuture x = queue[s];
1047 1051 queue[s] = null;
1048 1052 if (s != 0)
1049 1053 siftDown(0, x);
1050 1054 setIndex(f, -1);
1051 1055 return f;
1052 1056 }
1053 1057
1054 1058 public RunnableScheduledFuture poll() {
1055 1059 final ReentrantLock lock = this.lock;
1056 1060 lock.lock();
1057 1061 try {
1058 1062 RunnableScheduledFuture first = queue[0];
1059 1063 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1060 1064 return null;
1061 1065 else
1062 1066 return finishPoll(first);
1063 1067 } finally {
1064 1068 lock.unlock();
1065 1069 }
1066 1070 }
1067 1071
1068 1072 public RunnableScheduledFuture take() throws InterruptedException {
1069 1073 final ReentrantLock lock = this.lock;
1070 1074 lock.lockInterruptibly();
1071 1075 try {
1072 1076 for (;;) {
1073 1077 RunnableScheduledFuture first = queue[0];
1074 1078 if (first == null)
1075 1079 available.await();
1076 1080 else {
1077 1081 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1078 1082 if (delay <= 0)
1079 1083 return finishPoll(first);
1080 1084 else if (leader != null)
1081 1085 available.await();
1082 1086 else {
1083 1087 Thread thisThread = Thread.currentThread();
1084 1088 leader = thisThread;
1085 1089 try {
1086 1090 available.awaitNanos(delay);
1087 1091 } finally {
1088 1092 if (leader == thisThread)
1089 1093 leader = null;
1090 1094 }
1091 1095 }
1092 1096 }
1093 1097 }
1094 1098 } finally {
1095 1099 if (leader == null && queue[0] != null)
1096 1100 available.signal();
1097 1101 lock.unlock();
1098 1102 }
1099 1103 }
1100 1104
1101 1105 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1102 1106 throws InterruptedException {
1103 1107 long nanos = unit.toNanos(timeout);
1104 1108 final ReentrantLock lock = this.lock;
1105 1109 lock.lockInterruptibly();
1106 1110 try {
1107 1111 for (;;) {
1108 1112 RunnableScheduledFuture first = queue[0];
1109 1113 if (first == null) {
1110 1114 if (nanos <= 0)
1111 1115 return null;
1112 1116 else
1113 1117 nanos = available.awaitNanos(nanos);
1114 1118 } else {
1115 1119 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1116 1120 if (delay <= 0)
1117 1121 return finishPoll(first);
1118 1122 if (nanos <= 0)
1119 1123 return null;
1120 1124 if (nanos < delay || leader != null)
1121 1125 nanos = available.awaitNanos(nanos);
1122 1126 else {
1123 1127 Thread thisThread = Thread.currentThread();
1124 1128 leader = thisThread;
1125 1129 try {
1126 1130 long timeLeft = available.awaitNanos(delay);
1127 1131 nanos -= delay - timeLeft;
1128 1132 } finally {
1129 1133 if (leader == thisThread)
1130 1134 leader = null;
1131 1135 }
1132 1136 }
1133 1137 }
1134 1138 }
1135 1139 } finally {
1136 1140 if (leader == null && queue[0] != null)
1137 1141 available.signal();
1138 1142 lock.unlock();
1139 1143 }
1140 1144 }
1141 1145
1142 1146 public void clear() {
1143 1147 final ReentrantLock lock = this.lock;
1144 1148 lock.lock();
1145 1149 try {
1146 1150 for (int i = 0; i < size; i++) {
1147 1151 RunnableScheduledFuture t = queue[i];
1148 1152 if (t != null) {
1149 1153 queue[i] = null;
1150 1154 setIndex(t, -1);
1151 1155 }
1152 1156 }
1153 1157 size = 0;
1154 1158 } finally {
1155 1159 lock.unlock();
1156 1160 }
1157 1161 }
1158 1162
1159 1163 /**
1160 1164 * Return and remove first element only if it is expired.
1161 1165 * Used only by drainTo. Call only when holding lock.
1162 1166 */
1163 1167 private RunnableScheduledFuture pollExpired() {
1164 1168 RunnableScheduledFuture first = queue[0];
1165 1169 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1166 1170 return null;
1167 1171 return finishPoll(first);
1168 1172 }
1169 1173
1170 1174 public int drainTo(Collection<? super Runnable> c) {
1171 1175 if (c == null)
1172 1176 throw new NullPointerException();
1173 1177 if (c == this)
1174 1178 throw new IllegalArgumentException();
1175 1179 final ReentrantLock lock = this.lock;
1176 1180 lock.lock();
1177 1181 try {
1178 1182 RunnableScheduledFuture first;
1179 1183 int n = 0;
1180 1184 while ((first = pollExpired()) != null) {
1181 1185 c.add(first);
1182 1186 ++n;
1183 1187 }
1184 1188 return n;
1185 1189 } finally {
1186 1190 lock.unlock();
1187 1191 }
1188 1192 }
1189 1193
1190 1194 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1191 1195 if (c == null)
1192 1196 throw new NullPointerException();
1193 1197 if (c == this)
1194 1198 throw new IllegalArgumentException();
1195 1199 if (maxElements <= 0)
1196 1200 return 0;
1197 1201 final ReentrantLock lock = this.lock;
1198 1202 lock.lock();
1199 1203 try {
1200 1204 RunnableScheduledFuture first;
1201 1205 int n = 0;
1202 1206 while (n < maxElements && (first = pollExpired()) != null) {
1203 1207 c.add(first);
1204 1208 ++n;
1205 1209 }
1206 1210 return n;
1207 1211 } finally {
1208 1212 lock.unlock();
1209 1213 }
1210 1214 }
1211 1215
1212 1216 public Object[] toArray() {
1213 1217 final ReentrantLock lock = this.lock;
1214 1218 lock.lock();
1215 1219 try {
1216 1220 return Arrays.copyOf(queue, size, Object[].class);
1217 1221 } finally {
1218 1222 lock.unlock();
1219 1223 }
1220 1224 }
1221 1225
1222 1226 @SuppressWarnings("unchecked")
1223 1227 public <T> T[] toArray(T[] a) {
1224 1228 final ReentrantLock lock = this.lock;
1225 1229 lock.lock();
1226 1230 try {
1227 1231 if (a.length < size)
1228 1232 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1229 1233 System.arraycopy(queue, 0, a, 0, size);
1230 1234 if (a.length > size)
1231 1235 a[size] = null;
1232 1236 return a;
1233 1237 } finally {
1234 1238 lock.unlock();
1235 1239 }
1236 1240 }
1237 1241
1238 1242 public Iterator<Runnable> iterator() {
1239 1243 return new Itr(Arrays.copyOf(queue, size));
1240 1244 }
1241 1245
1242 1246 /**
1243 1247 * Snapshot iterator that works off copy of underlying q array.
1244 1248 */
1245 1249 private class Itr implements Iterator<Runnable> {
1246 1250 final RunnableScheduledFuture[] array;
1247 1251 int cursor = 0; // index of next element to return
1248 1252 int lastRet = -1; // index of last element, or -1 if no such
1249 1253
1250 1254 Itr(RunnableScheduledFuture[] array) {
1251 1255 this.array = array;
1252 1256 }
1253 1257
1254 1258 public boolean hasNext() {
1255 1259 return cursor < array.length;
1256 1260 }
1257 1261
1258 1262 public Runnable next() {
1259 1263 if (cursor >= array.length)
1260 1264 throw new NoSuchElementException();
1261 1265 lastRet = cursor;
1262 1266 return array[cursor++];
1263 1267 }
1264 1268
1265 1269 public void remove() {
1266 1270 if (lastRet < 0)
1267 1271 throw new IllegalStateException();
1268 1272 DelayedWorkQueue.this.remove(array[lastRet]);
1269 1273 lastRet = -1;
1270 1274 }
1271 1275 }
1272 1276 }
1273 1277 }
↓ open down ↓ |
786 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX