1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 import static java.util.concurrent.TimeUnit.NANOSECONDS; 38 import static java.util.concurrent.TimeUnit.SECONDS; 39 40 import java.util.ArrayList; 41 import java.util.Collection; 42 import java.util.Collections; 43 import java.util.HashSet; 44 import java.util.List; 45 import java.util.concurrent.BlockingQueue; 46 import java.util.concurrent.Callable; 47 import java.util.concurrent.CancellationException; 48 import java.util.concurrent.CountDownLatch; 49 import java.util.concurrent.ExecutionException; 50 import java.util.concurrent.ExecutorService; 51 import java.util.concurrent.Future; 52 import java.util.concurrent.RejectedExecutionException; 53 import java.util.concurrent.ScheduledFuture; 54 import java.util.concurrent.ScheduledThreadPoolExecutor; 55 import java.util.concurrent.ThreadFactory; 56 import java.util.concurrent.ThreadLocalRandom; 57 import java.util.concurrent.ThreadPoolExecutor; 58 import java.util.concurrent.atomic.AtomicBoolean; 59 import java.util.concurrent.atomic.AtomicInteger; 60 import java.util.concurrent.atomic.AtomicLong; 61 import java.util.stream.Stream; 62 63 import junit.framework.Test; 64 import junit.framework.TestSuite; 65 66 public class ScheduledExecutorTest extends JSR166TestCase { 67 public static void main(String[] args) { 68 main(suite(), args); 69 } 70 public static Test suite() { 71 return new TestSuite(ScheduledExecutorTest.class); 72 } 73 74 /** 75 * execute successfully executes a runnable 76 */ 77 public void testExecute() throws InterruptedException { 78 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 79 try (PoolCleaner cleaner = cleaner(p)) { 80 final CountDownLatch done = new CountDownLatch(1); 81 final Runnable task = new CheckedRunnable() { 82 public void realRun() { done.countDown(); }}; 83 p.execute(task); 84 await(done); 85 } 86 } 87 88 /** 89 * delayed schedule of callable successfully executes after delay 90 */ 91 public void testSchedule1() throws Exception { 92 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 93 try (PoolCleaner cleaner = cleaner(p)) { 94 final long startTime = System.nanoTime(); 95 final CountDownLatch done = new CountDownLatch(1); 96 Callable task = new CheckedCallable<Boolean>() { 97 public Boolean realCall() { 98 done.countDown(); 99 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 100 return Boolean.TRUE; 101 }}; 102 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); 103 assertSame(Boolean.TRUE, f.get()); 104 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 105 assertEquals(0L, done.getCount()); 106 } 107 } 108 109 /** 110 * delayed schedule of runnable successfully executes after delay 111 */ 112 public void testSchedule3() throws Exception { 113 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 114 try (PoolCleaner cleaner = cleaner(p)) { 115 final long startTime = System.nanoTime(); 116 final CountDownLatch done = new CountDownLatch(1); 117 Runnable task = new CheckedRunnable() { 118 public void realRun() { 119 done.countDown(); 120 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 121 }}; 122 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); 123 await(done); 124 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS)); 125 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 126 } 127 } 128 129 /** 130 * scheduleAtFixedRate executes runnable after given initial delay 131 */ 132 public void testSchedule4() throws Exception { 133 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 134 try (PoolCleaner cleaner = cleaner(p)) { 135 final long startTime = System.nanoTime(); 136 final CountDownLatch done = new CountDownLatch(1); 137 Runnable task = new CheckedRunnable() { 138 public void realRun() { 139 done.countDown(); 140 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 141 }}; 142 ScheduledFuture f = 143 p.scheduleAtFixedRate(task, timeoutMillis(), 144 LONG_DELAY_MS, MILLISECONDS); 145 await(done); 146 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 147 f.cancel(true); 148 } 149 } 150 151 /** 152 * scheduleWithFixedDelay executes runnable after given initial delay 153 */ 154 public void testSchedule5() throws Exception { 155 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 156 try (PoolCleaner cleaner = cleaner(p)) { 157 final long startTime = System.nanoTime(); 158 final CountDownLatch done = new CountDownLatch(1); 159 Runnable task = new CheckedRunnable() { 160 public void realRun() { 161 done.countDown(); 162 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 163 }}; 164 ScheduledFuture f = 165 p.scheduleWithFixedDelay(task, timeoutMillis(), 166 LONG_DELAY_MS, MILLISECONDS); 167 await(done); 168 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 169 f.cancel(true); 170 } 171 } 172 173 static class RunnableCounter implements Runnable { 174 AtomicInteger count = new AtomicInteger(0); 175 public void run() { count.getAndIncrement(); } 176 } 177 178 /** 179 * scheduleAtFixedRate executes series of tasks at given rate. 180 * Eventually, it must hold that: 181 * cycles - 1 <= elapsedMillis/delay < cycles 182 */ 183 public void testFixedRateSequence() throws InterruptedException { 184 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 185 try (PoolCleaner cleaner = cleaner(p)) { 186 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { 187 final long startTime = System.nanoTime(); 188 final int cycles = 8; 189 final CountDownLatch done = new CountDownLatch(cycles); 190 final Runnable task = new CheckedRunnable() { 191 public void realRun() { done.countDown(); }}; 192 final ScheduledFuture periodicTask = 193 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS); 194 final int totalDelayMillis = (cycles - 1) * delay; 195 await(done, totalDelayMillis + LONG_DELAY_MS); 196 periodicTask.cancel(true); 197 final long elapsedMillis = millisElapsedSince(startTime); 198 assertTrue(elapsedMillis >= totalDelayMillis); 199 if (elapsedMillis <= cycles * delay) 200 return; 201 // else retry with longer delay 202 } 203 fail("unexpected execution rate"); 204 } 205 } 206 207 /** 208 * scheduleWithFixedDelay executes series of tasks with given period. 209 * Eventually, it must hold that each task starts at least delay and at 210 * most 2 * delay after the termination of the previous task. 211 */ 212 public void testFixedDelaySequence() throws InterruptedException { 213 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 214 try (PoolCleaner cleaner = cleaner(p)) { 215 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { 216 final long startTime = System.nanoTime(); 217 final AtomicLong previous = new AtomicLong(startTime); 218 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false); 219 final int cycles = 8; 220 final CountDownLatch done = new CountDownLatch(cycles); 221 final int d = delay; 222 final Runnable task = new CheckedRunnable() { 223 public void realRun() { 224 long now = System.nanoTime(); 225 long elapsedMillis 226 = NANOSECONDS.toMillis(now - previous.get()); 227 if (done.getCount() == cycles) { // first execution 228 if (elapsedMillis >= d) 229 tryLongerDelay.set(true); 230 } else { 231 assertTrue(elapsedMillis >= d); 232 if (elapsedMillis >= 2 * d) 233 tryLongerDelay.set(true); 234 } 235 previous.set(now); 236 done.countDown(); 237 }}; 238 final ScheduledFuture periodicTask = 239 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS); 240 final int totalDelayMillis = (cycles - 1) * delay; 241 await(done, totalDelayMillis + cycles * LONG_DELAY_MS); 242 periodicTask.cancel(true); 243 final long elapsedMillis = millisElapsedSince(startTime); 244 assertTrue(elapsedMillis >= totalDelayMillis); 245 if (!tryLongerDelay.get()) 246 return; 247 // else retry with longer delay 248 } 249 fail("unexpected execution rate"); 250 } 251 } 252 253 /** 254 * Submitting null tasks throws NullPointerException 255 */ 256 public void testNullTaskSubmission() { 257 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 258 try (PoolCleaner cleaner = cleaner(p)) { 259 assertNullTaskSubmissionThrowsNullPointerException(p); 260 } 261 } 262 263 /** 264 * Submitted tasks are rejected when shutdown 265 */ 266 public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException { 267 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 268 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 269 final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize()); 270 final CountDownLatch done = new CountDownLatch(1); 271 final Runnable r = () -> { 272 threadsStarted.countDown(); 273 for (;;) { 274 try { 275 done.await(); 276 return; 277 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 278 }}; 279 final Callable<Boolean> c = () -> { 280 threadsStarted.countDown(); 281 for (;;) { 282 try { 283 done.await(); 284 return Boolean.TRUE; 285 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 286 }}; 287 288 try (PoolCleaner cleaner = cleaner(p, done)) { 289 for (int i = p.getCorePoolSize(); i--> 0; ) { 290 switch (rnd.nextInt(4)) { 291 case 0: p.execute(r); break; 292 case 1: assertFalse(p.submit(r).isDone()); break; 293 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break; 294 case 3: assertFalse(p.submit(c).isDone()); break; 295 } 296 } 297 298 // ScheduledThreadPoolExecutor has an unbounded queue, so never saturated. 299 await(threadsStarted); 300 301 if (rnd.nextBoolean()) 302 p.shutdownNow(); 303 else 304 p.shutdown(); 305 // Pool is shutdown, but not yet terminated 306 assertTaskSubmissionsAreRejected(p); 307 assertFalse(p.isTerminated()); 308 309 done.countDown(); // release blocking tasks 310 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 311 312 assertTaskSubmissionsAreRejected(p); 313 } 314 assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount()); 315 } 316 317 /** 318 * getActiveCount increases but doesn't overestimate, when a 319 * thread becomes active 320 */ 321 public void testGetActiveCount() throws InterruptedException { 322 final CountDownLatch done = new CountDownLatch(1); 323 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2); 324 try (PoolCleaner cleaner = cleaner(p, done)) { 325 final CountDownLatch threadStarted = new CountDownLatch(1); 326 assertEquals(0, p.getActiveCount()); 327 p.execute(new CheckedRunnable() { 328 public void realRun() throws InterruptedException { 329 threadStarted.countDown(); 330 assertEquals(1, p.getActiveCount()); 331 await(done); 332 }}); 333 await(threadStarted); 334 assertEquals(1, p.getActiveCount()); 335 } 336 } 337 338 /** 339 * getCompletedTaskCount increases, but doesn't overestimate, 340 * when tasks complete 341 */ 342 public void testGetCompletedTaskCount() throws InterruptedException { 343 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(2); 344 try (PoolCleaner cleaner = cleaner(p)) { 345 final CountDownLatch threadStarted = new CountDownLatch(1); 346 final CountDownLatch threadProceed = new CountDownLatch(1); 347 final CountDownLatch threadDone = new CountDownLatch(1); 348 assertEquals(0, p.getCompletedTaskCount()); 349 p.execute(new CheckedRunnable() { 350 public void realRun() throws InterruptedException { 351 threadStarted.countDown(); 352 assertEquals(0, p.getCompletedTaskCount()); 353 await(threadProceed); 354 threadDone.countDown(); 355 }}); 356 await(threadStarted); 357 assertEquals(0, p.getCompletedTaskCount()); 358 threadProceed.countDown(); 359 await(threadDone); 360 long startTime = System.nanoTime(); 361 while (p.getCompletedTaskCount() != 1) { 362 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 363 fail("timed out"); 364 Thread.yield(); 365 } 366 } 367 } 368 369 /** 370 * getCorePoolSize returns size given in constructor if not otherwise set 371 */ 372 public void testGetCorePoolSize() throws InterruptedException { 373 ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 374 try (PoolCleaner cleaner = cleaner(p)) { 375 assertEquals(1, p.getCorePoolSize()); 376 } 377 } 378 379 /** 380 * getLargestPoolSize increases, but doesn't overestimate, when 381 * multiple threads active 382 */ 383 public void testGetLargestPoolSize() throws InterruptedException { 384 final int THREADS = 3; 385 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(THREADS); 386 final CountDownLatch threadsStarted = new CountDownLatch(THREADS); 387 final CountDownLatch done = new CountDownLatch(1); 388 try (PoolCleaner cleaner = cleaner(p, done)) { 389 assertEquals(0, p.getLargestPoolSize()); 390 for (int i = 0; i < THREADS; i++) 391 p.execute(new CheckedRunnable() { 392 public void realRun() throws InterruptedException { 393 threadsStarted.countDown(); 394 await(done); 395 assertEquals(THREADS, p.getLargestPoolSize()); 396 }}); 397 await(threadsStarted); 398 assertEquals(THREADS, p.getLargestPoolSize()); 399 } 400 assertEquals(THREADS, p.getLargestPoolSize()); 401 } 402 403 /** 404 * getPoolSize increases, but doesn't overestimate, when threads 405 * become active 406 */ 407 public void testGetPoolSize() throws InterruptedException { 408 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 409 final CountDownLatch threadStarted = new CountDownLatch(1); 410 final CountDownLatch done = new CountDownLatch(1); 411 try (PoolCleaner cleaner = cleaner(p, done)) { 412 assertEquals(0, p.getPoolSize()); 413 p.execute(new CheckedRunnable() { 414 public void realRun() throws InterruptedException { 415 threadStarted.countDown(); 416 assertEquals(1, p.getPoolSize()); 417 await(done); 418 }}); 419 await(threadStarted); 420 assertEquals(1, p.getPoolSize()); 421 } 422 } 423 424 /** 425 * getTaskCount increases, but doesn't overestimate, when tasks 426 * submitted 427 */ 428 public void testGetTaskCount() throws InterruptedException { 429 final int TASKS = 3; 430 final CountDownLatch done = new CountDownLatch(1); 431 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 432 try (PoolCleaner cleaner = cleaner(p, done)) { 433 final CountDownLatch threadStarted = new CountDownLatch(1); 434 assertEquals(0, p.getTaskCount()); 435 assertEquals(0, p.getCompletedTaskCount()); 436 p.execute(new CheckedRunnable() { 437 public void realRun() throws InterruptedException { 438 threadStarted.countDown(); 439 await(done); 440 }}); 441 await(threadStarted); 442 assertEquals(1, p.getTaskCount()); 443 assertEquals(0, p.getCompletedTaskCount()); 444 for (int i = 0; i < TASKS; i++) { 445 assertEquals(1 + i, p.getTaskCount()); 446 p.execute(new CheckedRunnable() { 447 public void realRun() throws InterruptedException { 448 threadStarted.countDown(); 449 assertEquals(1 + TASKS, p.getTaskCount()); 450 await(done); 451 }}); 452 } 453 assertEquals(1 + TASKS, p.getTaskCount()); 454 assertEquals(0, p.getCompletedTaskCount()); 455 } 456 assertEquals(1 + TASKS, p.getTaskCount()); 457 assertEquals(1 + TASKS, p.getCompletedTaskCount()); 458 } 459 460 /** 461 * getThreadFactory returns factory in constructor if not set 462 */ 463 public void testGetThreadFactory() throws InterruptedException { 464 final ThreadFactory threadFactory = new SimpleThreadFactory(); 465 final ScheduledThreadPoolExecutor p = 466 new ScheduledThreadPoolExecutor(1, threadFactory); 467 try (PoolCleaner cleaner = cleaner(p)) { 468 assertSame(threadFactory, p.getThreadFactory()); 469 } 470 } 471 472 /** 473 * setThreadFactory sets the thread factory returned by getThreadFactory 474 */ 475 public void testSetThreadFactory() throws InterruptedException { 476 ThreadFactory threadFactory = new SimpleThreadFactory(); 477 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 478 try (PoolCleaner cleaner = cleaner(p)) { 479 p.setThreadFactory(threadFactory); 480 assertSame(threadFactory, p.getThreadFactory()); 481 } 482 } 483 484 /** 485 * setThreadFactory(null) throws NPE 486 */ 487 public void testSetThreadFactoryNull() throws InterruptedException { 488 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 489 try (PoolCleaner cleaner = cleaner(p)) { 490 try { 491 p.setThreadFactory(null); 492 shouldThrow(); 493 } catch (NullPointerException success) {} 494 } 495 } 496 497 /** 498 * The default rejected execution handler is AbortPolicy. 499 */ 500 public void testDefaultRejectedExecutionHandler() { 501 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 502 try (PoolCleaner cleaner = cleaner(p)) { 503 assertTrue(p.getRejectedExecutionHandler() 504 instanceof ThreadPoolExecutor.AbortPolicy); 505 } 506 } 507 508 /** 509 * isShutdown is false before shutdown, true after 510 */ 511 public void testIsShutdown() { 512 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 513 assertFalse(p.isShutdown()); 514 try (PoolCleaner cleaner = cleaner(p)) { 515 try { 516 p.shutdown(); 517 assertTrue(p.isShutdown()); 518 } catch (SecurityException ok) {} 519 } 520 } 521 522 /** 523 * isTerminated is false before termination, true after 524 */ 525 public void testIsTerminated() throws InterruptedException { 526 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 527 try (PoolCleaner cleaner = cleaner(p)) { 528 final CountDownLatch threadStarted = new CountDownLatch(1); 529 final CountDownLatch done = new CountDownLatch(1); 530 assertFalse(p.isTerminated()); 531 p.execute(new CheckedRunnable() { 532 public void realRun() throws InterruptedException { 533 assertFalse(p.isTerminated()); 534 threadStarted.countDown(); 535 await(done); 536 }}); 537 await(threadStarted); 538 assertFalse(p.isTerminating()); 539 done.countDown(); 540 try { p.shutdown(); } catch (SecurityException ok) { return; } 541 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 542 assertTrue(p.isTerminated()); 543 } 544 } 545 546 /** 547 * isTerminating is not true when running or when terminated 548 */ 549 public void testIsTerminating() throws InterruptedException { 550 final ThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 551 final CountDownLatch threadStarted = new CountDownLatch(1); 552 final CountDownLatch done = new CountDownLatch(1); 553 try (PoolCleaner cleaner = cleaner(p)) { 554 assertFalse(p.isTerminating()); 555 p.execute(new CheckedRunnable() { 556 public void realRun() throws InterruptedException { 557 assertFalse(p.isTerminating()); 558 threadStarted.countDown(); 559 await(done); 560 }}); 561 await(threadStarted); 562 assertFalse(p.isTerminating()); 563 done.countDown(); 564 try { p.shutdown(); } catch (SecurityException ok) { return; } 565 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 566 assertTrue(p.isTerminated()); 567 assertFalse(p.isTerminating()); 568 } 569 } 570 571 /** 572 * getQueue returns the work queue, which contains queued tasks 573 */ 574 public void testGetQueue() throws InterruptedException { 575 final CountDownLatch done = new CountDownLatch(1); 576 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 577 try (PoolCleaner cleaner = cleaner(p, done)) { 578 final CountDownLatch threadStarted = new CountDownLatch(1); 579 ScheduledFuture[] tasks = new ScheduledFuture[5]; 580 for (int i = 0; i < tasks.length; i++) { 581 Runnable r = new CheckedRunnable() { 582 public void realRun() throws InterruptedException { 583 threadStarted.countDown(); 584 await(done); 585 }}; 586 tasks[i] = p.schedule(r, 1, MILLISECONDS); 587 } 588 await(threadStarted); 589 BlockingQueue<Runnable> q = p.getQueue(); 590 assertTrue(q.contains(tasks[tasks.length - 1])); 591 assertFalse(q.contains(tasks[0])); 592 } 593 } 594 595 /** 596 * remove(task) removes queued task, and fails to remove active task 597 */ 598 public void testRemove() throws InterruptedException { 599 final CountDownLatch done = new CountDownLatch(1); 600 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 601 try (PoolCleaner cleaner = cleaner(p, done)) { 602 ScheduledFuture[] tasks = new ScheduledFuture[5]; 603 final CountDownLatch threadStarted = new CountDownLatch(1); 604 for (int i = 0; i < tasks.length; i++) { 605 Runnable r = new CheckedRunnable() { 606 public void realRun() throws InterruptedException { 607 threadStarted.countDown(); 608 await(done); 609 }}; 610 tasks[i] = p.schedule(r, 1, MILLISECONDS); 611 } 612 await(threadStarted); 613 BlockingQueue<Runnable> q = p.getQueue(); 614 assertFalse(p.remove((Runnable)tasks[0])); 615 assertTrue(q.contains((Runnable)tasks[4])); 616 assertTrue(q.contains((Runnable)tasks[3])); 617 assertTrue(p.remove((Runnable)tasks[4])); 618 assertFalse(p.remove((Runnable)tasks[4])); 619 assertFalse(q.contains((Runnable)tasks[4])); 620 assertTrue(q.contains((Runnable)tasks[3])); 621 assertTrue(p.remove((Runnable)tasks[3])); 622 assertFalse(q.contains((Runnable)tasks[3])); 623 } 624 } 625 626 /** 627 * purge eventually removes cancelled tasks from the queue 628 */ 629 public void testPurge() throws InterruptedException { 630 final ScheduledFuture[] tasks = new ScheduledFuture[5]; 631 final Runnable releaser = new Runnable() { public void run() { 632 for (ScheduledFuture task : tasks) 633 if (task != null) task.cancel(true); }}; 634 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 635 try (PoolCleaner cleaner = cleaner(p, releaser)) { 636 for (int i = 0; i < tasks.length; i++) 637 tasks[i] = p.schedule(new SmallPossiblyInterruptedRunnable(), 638 LONG_DELAY_MS, MILLISECONDS); 639 int max = tasks.length; 640 if (tasks[4].cancel(true)) --max; 641 if (tasks[3].cancel(true)) --max; 642 // There must eventually be an interference-free point at 643 // which purge will not fail. (At worst, when queue is empty.) 644 long startTime = System.nanoTime(); 645 do { 646 p.purge(); 647 long count = p.getTaskCount(); 648 if (count == max) 649 return; 650 } while (millisElapsedSince(startTime) < LONG_DELAY_MS); 651 fail("Purge failed to remove cancelled tasks"); 652 } 653 } 654 655 /** 656 * shutdownNow returns a list containing tasks that were not run, 657 * and those tasks are drained from the queue 658 */ 659 public void testShutdownNow() throws InterruptedException { 660 final int poolSize = 2; 661 final int count = 5; 662 final AtomicInteger ran = new AtomicInteger(0); 663 final ScheduledThreadPoolExecutor p = 664 new ScheduledThreadPoolExecutor(poolSize); 665 final CountDownLatch threadsStarted = new CountDownLatch(poolSize); 666 Runnable waiter = new CheckedRunnable() { public void realRun() { 667 threadsStarted.countDown(); 668 try { 669 MILLISECONDS.sleep(2 * LONG_DELAY_MS); 670 } catch (InterruptedException success) {} 671 ran.getAndIncrement(); 672 }}; 673 for (int i = 0; i < count; i++) 674 p.execute(waiter); 675 await(threadsStarted); 676 assertEquals(poolSize, p.getActiveCount()); 677 assertEquals(0, p.getCompletedTaskCount()); 678 final List<Runnable> queuedTasks; 679 try { 680 queuedTasks = p.shutdownNow(); 681 } catch (SecurityException ok) { 682 return; // Allowed in case test doesn't have privs 683 } 684 assertTrue(p.isShutdown()); 685 assertTrue(p.getQueue().isEmpty()); 686 assertEquals(count - poolSize, queuedTasks.size()); 687 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 688 assertTrue(p.isTerminated()); 689 assertEquals(poolSize, ran.get()); 690 assertEquals(poolSize, p.getCompletedTaskCount()); 691 } 692 693 /** 694 * shutdownNow returns a list containing tasks that were not run, 695 * and those tasks are drained from the queue 696 */ 697 public void testShutdownNow_delayedTasks() throws InterruptedException { 698 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 699 List<ScheduledFuture> tasks = new ArrayList<>(); 700 for (int i = 0; i < 3; i++) { 701 Runnable r = new NoOpRunnable(); 702 tasks.add(p.schedule(r, 9, SECONDS)); 703 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS)); 704 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS)); 705 } 706 if (testImplementationDetails) 707 assertEquals(new HashSet(tasks), new HashSet(p.getQueue())); 708 final List<Runnable> queuedTasks; 709 try { 710 queuedTasks = p.shutdownNow(); 711 } catch (SecurityException ok) { 712 return; // Allowed in case test doesn't have privs 713 } 714 assertTrue(p.isShutdown()); 715 assertTrue(p.getQueue().isEmpty()); 716 if (testImplementationDetails) 717 assertEquals(new HashSet(tasks), new HashSet(queuedTasks)); 718 assertEquals(tasks.size(), queuedTasks.size()); 719 for (ScheduledFuture task : tasks) { 720 assertFalse(task.isDone()); 721 assertFalse(task.isCancelled()); 722 } 723 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 724 assertTrue(p.isTerminated()); 725 } 726 727 /** 728 * By default, periodic tasks are cancelled at shutdown. 729 * By default, delayed tasks keep running after shutdown. 730 * Check that changing the default values work: 731 * - setExecuteExistingDelayedTasksAfterShutdownPolicy 732 * - setContinueExistingPeriodicTasksAfterShutdownPolicy 733 */ 734 @SuppressWarnings("FutureReturnValueIgnored") 735 public void testShutdown_cancellation() throws Exception { 736 final int poolSize = 4; 737 final ScheduledThreadPoolExecutor p 738 = new ScheduledThreadPoolExecutor(poolSize); 739 final BlockingQueue<Runnable> q = p.getQueue(); 740 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 741 final long delay = rnd.nextInt(2); 742 final int rounds = rnd.nextInt(1, 3); 743 final boolean effectiveDelayedPolicy; 744 final boolean effectivePeriodicPolicy; 745 final boolean effectiveRemovePolicy; 746 747 if (rnd.nextBoolean()) 748 p.setExecuteExistingDelayedTasksAfterShutdownPolicy( 749 effectiveDelayedPolicy = rnd.nextBoolean()); 750 else 751 effectiveDelayedPolicy = true; 752 assertEquals(effectiveDelayedPolicy, 753 p.getExecuteExistingDelayedTasksAfterShutdownPolicy()); 754 755 if (rnd.nextBoolean()) 756 p.setContinueExistingPeriodicTasksAfterShutdownPolicy( 757 effectivePeriodicPolicy = rnd.nextBoolean()); 758 else 759 effectivePeriodicPolicy = false; 760 assertEquals(effectivePeriodicPolicy, 761 p.getContinueExistingPeriodicTasksAfterShutdownPolicy()); 762 763 if (rnd.nextBoolean()) 764 p.setRemoveOnCancelPolicy( 765 effectiveRemovePolicy = rnd.nextBoolean()); 766 else 767 effectiveRemovePolicy = false; 768 assertEquals(effectiveRemovePolicy, 769 p.getRemoveOnCancelPolicy()); 770 771 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean(); 772 773 // Strategy: Wedge the pool with one wave of "blocker" tasks, 774 // then add a second wave that waits in the queue until unblocked. 775 final AtomicInteger ran = new AtomicInteger(0); 776 final CountDownLatch poolBlocked = new CountDownLatch(poolSize); 777 final CountDownLatch unblock = new CountDownLatch(1); 778 final RuntimeException exception = new RuntimeException(); 779 780 class Task implements Runnable { 781 public void run() { 782 try { 783 ran.getAndIncrement(); 784 poolBlocked.countDown(); 785 await(unblock); 786 } catch (Throwable fail) { threadUnexpectedException(fail); } 787 } 788 } 789 790 class PeriodicTask extends Task { 791 PeriodicTask(int rounds) { this.rounds = rounds; } 792 int rounds; 793 public void run() { 794 if (--rounds == 0) super.run(); 795 // throw exception to surely terminate this periodic task, 796 // but in a separate execution and in a detectable way. 797 if (rounds == -1) throw exception; 798 } 799 } 800 801 Runnable task = new Task(); 802 803 List<Future<?>> immediates = new ArrayList<>(); 804 List<Future<?>> delayeds = new ArrayList<>(); 805 List<Future<?>> periodics = new ArrayList<>(); 806 807 immediates.add(p.submit(task)); 808 delayeds.add(p.schedule(task, delay, MILLISECONDS)); 809 periodics.add(p.scheduleAtFixedRate( 810 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 811 periodics.add(p.scheduleWithFixedDelay( 812 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 813 814 await(poolBlocked); 815 816 assertEquals(poolSize, ran.get()); 817 assertEquals(poolSize, p.getActiveCount()); 818 assertTrue(q.isEmpty()); 819 820 // Add second wave of tasks. 821 immediates.add(p.submit(task)); 822 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS)); 823 periodics.add(p.scheduleAtFixedRate( 824 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 825 periodics.add(p.scheduleWithFixedDelay( 826 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 827 828 assertEquals(poolSize, q.size()); 829 assertEquals(poolSize, ran.get()); 830 831 immediates.forEach( 832 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L)); 833 834 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream) 835 .forEach(f -> assertFalse(f.isDone())); 836 837 try { p.shutdown(); } catch (SecurityException ok) { return; } 838 assertTrue(p.isShutdown()); 839 assertTrue(p.isTerminating()); 840 assertFalse(p.isTerminated()); 841 842 if (rnd.nextBoolean()) 843 assertThrows( 844 RejectedExecutionException.class, 845 () -> p.submit(task), 846 () -> p.schedule(task, 1, SECONDS), 847 () -> p.scheduleAtFixedRate( 848 new PeriodicTask(1), 1, 1, SECONDS), 849 () -> p.scheduleWithFixedDelay( 850 new PeriodicTask(2), 1, 1, SECONDS)); 851 852 assertTrue(q.contains(immediates.get(1))); 853 assertTrue(!effectiveDelayedPolicy 854 ^ q.contains(delayeds.get(1))); 855 assertTrue(!effectivePeriodicPolicy 856 ^ q.containsAll(periodics.subList(2, 4))); 857 858 immediates.forEach(f -> assertFalse(f.isDone())); 859 860 assertFalse(delayeds.get(0).isDone()); 861 if (effectiveDelayedPolicy) 862 assertFalse(delayeds.get(1).isDone()); 863 else 864 assertTrue(delayeds.get(1).isCancelled()); 865 866 if (effectivePeriodicPolicy) 867 periodics.forEach( 868 f -> { 869 assertFalse(f.isDone()); 870 if (!periodicTasksContinue) { 871 assertTrue(f.cancel(false)); 872 assertTrue(f.isCancelled()); 873 } 874 }); 875 else { 876 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone())); 877 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled())); 878 } 879 880 unblock.countDown(); // Release all pool threads 881 882 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 883 assertFalse(p.isTerminating()); 884 assertTrue(p.isTerminated()); 885 886 assertTrue(q.isEmpty()); 887 888 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream) 889 .forEach(f -> assertTrue(f.isDone())); 890 891 for (Future<?> f : immediates) assertNull(f.get()); 892 893 assertNull(delayeds.get(0).get()); 894 if (effectiveDelayedPolicy) 895 assertNull(delayeds.get(1).get()); 896 else 897 assertTrue(delayeds.get(1).isCancelled()); 898 899 if (periodicTasksContinue) 900 periodics.forEach( 901 f -> { 902 try { f.get(); } 903 catch (ExecutionException success) { 904 assertSame(exception, success.getCause()); 905 } 906 catch (Throwable fail) { threadUnexpectedException(fail); } 907 }); 908 else 909 periodics.forEach(f -> assertTrue(f.isCancelled())); 910 911 assertEquals(poolSize + 1 912 + (effectiveDelayedPolicy ? 1 : 0) 913 + (periodicTasksContinue ? 2 : 0), 914 ran.get()); 915 } 916 917 /** 918 * completed submit of callable returns result 919 */ 920 public void testSubmitCallable() throws Exception { 921 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 922 try (PoolCleaner cleaner = cleaner(e)) { 923 Future<String> future = e.submit(new StringTask()); 924 String result = future.get(); 925 assertSame(TEST_STRING, result); 926 } 927 } 928 929 /** 930 * completed submit of runnable returns successfully 931 */ 932 public void testSubmitRunnable() throws Exception { 933 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 934 try (PoolCleaner cleaner = cleaner(e)) { 935 Future<?> future = e.submit(new NoOpRunnable()); 936 future.get(); 937 assertTrue(future.isDone()); 938 } 939 } 940 941 /** 942 * completed submit of (runnable, result) returns result 943 */ 944 public void testSubmitRunnable2() throws Exception { 945 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 946 try (PoolCleaner cleaner = cleaner(e)) { 947 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING); 948 String result = future.get(); 949 assertSame(TEST_STRING, result); 950 } 951 } 952 953 /** 954 * invokeAny(null) throws NullPointerException 955 */ 956 public void testInvokeAny1() throws Exception { 957 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 958 try (PoolCleaner cleaner = cleaner(e)) { 959 try { 960 e.invokeAny(null); 961 shouldThrow(); 962 } catch (NullPointerException success) {} 963 } 964 } 965 966 /** 967 * invokeAny(empty collection) throws IllegalArgumentException 968 */ 969 public void testInvokeAny2() throws Exception { 970 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 971 try (PoolCleaner cleaner = cleaner(e)) { 972 try { 973 e.invokeAny(new ArrayList<Callable<String>>()); 974 shouldThrow(); 975 } catch (IllegalArgumentException success) {} 976 } 977 } 978 979 /** 980 * invokeAny(c) throws NullPointerException if c has null elements 981 */ 982 public void testInvokeAny3() throws Exception { 983 CountDownLatch latch = new CountDownLatch(1); 984 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 985 try (PoolCleaner cleaner = cleaner(e)) { 986 List<Callable<String>> l = new ArrayList<>(); 987 l.add(latchAwaitingStringTask(latch)); 988 l.add(null); 989 try { 990 e.invokeAny(l); 991 shouldThrow(); 992 } catch (NullPointerException success) {} 993 latch.countDown(); 994 } 995 } 996 997 /** 998 * invokeAny(c) throws ExecutionException if no task completes 999 */ 1000 public void testInvokeAny4() throws Exception { 1001 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1002 try (PoolCleaner cleaner = cleaner(e)) { 1003 List<Callable<String>> l = new ArrayList<>(); 1004 l.add(new NPETask()); 1005 try { 1006 e.invokeAny(l); 1007 shouldThrow(); 1008 } catch (ExecutionException success) { 1009 assertTrue(success.getCause() instanceof NullPointerException); 1010 } 1011 } 1012 } 1013 1014 /** 1015 * invokeAny(c) returns result of some task 1016 */ 1017 public void testInvokeAny5() throws Exception { 1018 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1019 try (PoolCleaner cleaner = cleaner(e)) { 1020 List<Callable<String>> l = new ArrayList<>(); 1021 l.add(new StringTask()); 1022 l.add(new StringTask()); 1023 String result = e.invokeAny(l); 1024 assertSame(TEST_STRING, result); 1025 } 1026 } 1027 1028 /** 1029 * invokeAll(null) throws NPE 1030 */ 1031 public void testInvokeAll1() throws Exception { 1032 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1033 try (PoolCleaner cleaner = cleaner(e)) { 1034 try { 1035 e.invokeAll(null); 1036 shouldThrow(); 1037 } catch (NullPointerException success) {} 1038 } 1039 } 1040 1041 /** 1042 * invokeAll(empty collection) returns empty list 1043 */ 1044 public void testInvokeAll2() throws Exception { 1045 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1046 final Collection<Callable<String>> emptyCollection 1047 = Collections.emptyList(); 1048 try (PoolCleaner cleaner = cleaner(e)) { 1049 List<Future<String>> r = e.invokeAll(emptyCollection); 1050 assertTrue(r.isEmpty()); 1051 } 1052 } 1053 1054 /** 1055 * invokeAll(c) throws NPE if c has null elements 1056 */ 1057 public void testInvokeAll3() throws Exception { 1058 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1059 try (PoolCleaner cleaner = cleaner(e)) { 1060 List<Callable<String>> l = new ArrayList<>(); 1061 l.add(new StringTask()); 1062 l.add(null); 1063 try { 1064 e.invokeAll(l); 1065 shouldThrow(); 1066 } catch (NullPointerException success) {} 1067 } 1068 } 1069 1070 /** 1071 * get of invokeAll(c) throws exception on failed task 1072 */ 1073 public void testInvokeAll4() throws Exception { 1074 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1075 try (PoolCleaner cleaner = cleaner(e)) { 1076 List<Callable<String>> l = new ArrayList<>(); 1077 l.add(new NPETask()); 1078 List<Future<String>> futures = e.invokeAll(l); 1079 assertEquals(1, futures.size()); 1080 try { 1081 futures.get(0).get(); 1082 shouldThrow(); 1083 } catch (ExecutionException success) { 1084 assertTrue(success.getCause() instanceof NullPointerException); 1085 } 1086 } 1087 } 1088 1089 /** 1090 * invokeAll(c) returns results of all completed tasks 1091 */ 1092 public void testInvokeAll5() throws Exception { 1093 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1094 try (PoolCleaner cleaner = cleaner(e)) { 1095 List<Callable<String>> l = new ArrayList<>(); 1096 l.add(new StringTask()); 1097 l.add(new StringTask()); 1098 List<Future<String>> futures = e.invokeAll(l); 1099 assertEquals(2, futures.size()); 1100 for (Future<String> future : futures) 1101 assertSame(TEST_STRING, future.get()); 1102 } 1103 } 1104 1105 /** 1106 * timed invokeAny(null) throws NPE 1107 */ 1108 public void testTimedInvokeAny1() throws Exception { 1109 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1110 try (PoolCleaner cleaner = cleaner(e)) { 1111 try { 1112 e.invokeAny(null, randomTimeout(), randomTimeUnit()); 1113 shouldThrow(); 1114 } catch (NullPointerException success) {} 1115 } 1116 } 1117 1118 /** 1119 * timed invokeAny(,,null) throws NullPointerException 1120 */ 1121 public void testTimedInvokeAnyNullTimeUnit() throws Exception { 1122 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1123 try (PoolCleaner cleaner = cleaner(e)) { 1124 List<Callable<String>> l = new ArrayList<>(); 1125 l.add(new StringTask()); 1126 try { 1127 e.invokeAny(l, randomTimeout(), null); 1128 shouldThrow(); 1129 } catch (NullPointerException success) {} 1130 } 1131 } 1132 1133 /** 1134 * timed invokeAny(empty collection) throws IllegalArgumentException 1135 */ 1136 public void testTimedInvokeAny2() throws Exception { 1137 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1138 final Collection<Callable<String>> emptyCollection 1139 = Collections.emptyList(); 1140 try (PoolCleaner cleaner = cleaner(e)) { 1141 try { 1142 e.invokeAny(emptyCollection, randomTimeout(), randomTimeUnit()); 1143 shouldThrow(); 1144 } catch (IllegalArgumentException success) {} 1145 } 1146 } 1147 1148 /** 1149 * timed invokeAny(c) throws NPE if c has null elements 1150 */ 1151 public void testTimedInvokeAny3() throws Exception { 1152 CountDownLatch latch = new CountDownLatch(1); 1153 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1154 try (PoolCleaner cleaner = cleaner(e)) { 1155 List<Callable<String>> l = new ArrayList<>(); 1156 l.add(latchAwaitingStringTask(latch)); 1157 l.add(null); 1158 try { 1159 e.invokeAny(l, randomTimeout(), randomTimeUnit()); 1160 shouldThrow(); 1161 } catch (NullPointerException success) {} 1162 latch.countDown(); 1163 } 1164 } 1165 1166 /** 1167 * timed invokeAny(c) throws ExecutionException if no task completes 1168 */ 1169 public void testTimedInvokeAny4() throws Exception { 1170 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1171 try (PoolCleaner cleaner = cleaner(e)) { 1172 long startTime = System.nanoTime(); 1173 List<Callable<String>> l = new ArrayList<>(); 1174 l.add(new NPETask()); 1175 try { 1176 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1177 shouldThrow(); 1178 } catch (ExecutionException success) { 1179 assertTrue(success.getCause() instanceof NullPointerException); 1180 } 1181 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1182 } 1183 } 1184 1185 /** 1186 * timed invokeAny(c) returns result of some task 1187 */ 1188 public void testTimedInvokeAny5() throws Exception { 1189 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1190 try (PoolCleaner cleaner = cleaner(e)) { 1191 long startTime = System.nanoTime(); 1192 List<Callable<String>> l = new ArrayList<>(); 1193 l.add(new StringTask()); 1194 l.add(new StringTask()); 1195 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1196 assertSame(TEST_STRING, result); 1197 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1198 } 1199 } 1200 1201 /** 1202 * timed invokeAll(null) throws NPE 1203 */ 1204 public void testTimedInvokeAll1() throws Exception { 1205 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1206 try (PoolCleaner cleaner = cleaner(e)) { 1207 try { 1208 e.invokeAll(null, randomTimeout(), randomTimeUnit()); 1209 shouldThrow(); 1210 } catch (NullPointerException success) {} 1211 } 1212 } 1213 1214 /** 1215 * timed invokeAll(,,null) throws NPE 1216 */ 1217 public void testTimedInvokeAllNullTimeUnit() throws Exception { 1218 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1219 try (PoolCleaner cleaner = cleaner(e)) { 1220 List<Callable<String>> l = new ArrayList<>(); 1221 l.add(new StringTask()); 1222 try { 1223 e.invokeAll(l, randomTimeout(), null); 1224 shouldThrow(); 1225 } catch (NullPointerException success) {} 1226 } 1227 } 1228 1229 /** 1230 * timed invokeAll(empty collection) returns empty list 1231 */ 1232 public void testTimedInvokeAll2() throws Exception { 1233 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1234 final Collection<Callable<String>> emptyCollection 1235 = Collections.emptyList(); 1236 try (PoolCleaner cleaner = cleaner(e)) { 1237 List<Future<String>> r = 1238 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit()); 1239 assertTrue(r.isEmpty()); 1240 } 1241 } 1242 1243 /** 1244 * timed invokeAll(c) throws NPE if c has null elements 1245 */ 1246 public void testTimedInvokeAll3() throws Exception { 1247 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1248 try (PoolCleaner cleaner = cleaner(e)) { 1249 List<Callable<String>> l = new ArrayList<>(); 1250 l.add(new StringTask()); 1251 l.add(null); 1252 try { 1253 e.invokeAll(l, randomTimeout(), randomTimeUnit()); 1254 shouldThrow(); 1255 } catch (NullPointerException success) {} 1256 } 1257 } 1258 1259 /** 1260 * get of element of invokeAll(c) throws exception on failed task 1261 */ 1262 public void testTimedInvokeAll4() throws Exception { 1263 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1264 try (PoolCleaner cleaner = cleaner(e)) { 1265 List<Callable<String>> l = new ArrayList<>(); 1266 l.add(new NPETask()); 1267 List<Future<String>> futures = 1268 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS); 1269 assertEquals(1, futures.size()); 1270 try { 1271 futures.get(0).get(); 1272 shouldThrow(); 1273 } catch (ExecutionException success) { 1274 assertTrue(success.getCause() instanceof NullPointerException); 1275 } 1276 } 1277 } 1278 1279 /** 1280 * timed invokeAll(c) returns results of all completed tasks 1281 */ 1282 public void testTimedInvokeAll5() throws Exception { 1283 final ExecutorService e = new ScheduledThreadPoolExecutor(2); 1284 try (PoolCleaner cleaner = cleaner(e)) { 1285 List<Callable<String>> l = new ArrayList<>(); 1286 l.add(new StringTask()); 1287 l.add(new StringTask()); 1288 List<Future<String>> futures = 1289 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS); 1290 assertEquals(2, futures.size()); 1291 for (Future<String> future : futures) 1292 assertSame(TEST_STRING, future.get()); 1293 } 1294 } 1295 1296 /** 1297 * timed invokeAll(c) cancels tasks not completed by timeout 1298 */ 1299 public void testTimedInvokeAll6() throws Exception { 1300 for (long timeout = timeoutMillis();;) { 1301 final CountDownLatch done = new CountDownLatch(1); 1302 final Callable<String> waiter = new CheckedCallable<String>() { 1303 public String realCall() { 1304 try { done.await(LONG_DELAY_MS, MILLISECONDS); } 1305 catch (InterruptedException ok) {} 1306 return "1"; }}; 1307 final ExecutorService p = new ScheduledThreadPoolExecutor(2); 1308 try (PoolCleaner cleaner = cleaner(p, done)) { 1309 List<Callable<String>> tasks = new ArrayList<>(); 1310 tasks.add(new StringTask("0")); 1311 tasks.add(waiter); 1312 tasks.add(new StringTask("2")); 1313 long startTime = System.nanoTime(); 1314 List<Future<String>> futures = 1315 p.invokeAll(tasks, timeout, MILLISECONDS); 1316 assertEquals(tasks.size(), futures.size()); 1317 assertTrue(millisElapsedSince(startTime) >= timeout); 1318 for (Future future : futures) 1319 assertTrue(future.isDone()); 1320 assertTrue(futures.get(1).isCancelled()); 1321 try { 1322 assertEquals("0", futures.get(0).get()); 1323 assertEquals("2", futures.get(2).get()); 1324 break; 1325 } catch (CancellationException retryWithLongerTimeout) { 1326 timeout *= 2; 1327 if (timeout >= LONG_DELAY_MS / 2) 1328 fail("expected exactly one task to be cancelled"); 1329 } 1330 } 1331 } 1332 } 1333 1334 /** 1335 * A fixed delay task with overflowing period should not prevent a 1336 * one-shot task from executing. 1337 * https://bugs.openjdk.java.net/browse/JDK-8051859 1338 */ 1339 @SuppressWarnings("FutureReturnValueIgnored") 1340 public void testScheduleWithFixedDelay_overflow() throws Exception { 1341 final CountDownLatch delayedDone = new CountDownLatch(1); 1342 final CountDownLatch immediateDone = new CountDownLatch(1); 1343 final ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1); 1344 try (PoolCleaner cleaner = cleaner(p)) { 1345 final Runnable delayed = () -> { 1346 delayedDone.countDown(); 1347 p.submit(() -> immediateDone.countDown()); 1348 }; 1349 p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS); 1350 await(delayedDone); 1351 await(immediateDone); 1352 } 1353 } 1354 1355 }