< prev index next >

test/jdk/java/util/concurrent/ExecutorService/Invoke.java

Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2021-01
Reviewed-by: martin


  67     static void check(boolean condition) {
  68         check(condition, "Assertion failure");
  69     }
  70 
  71     static long secondsElapsedSince(long startTime) {
  72         return NANOSECONDS.toSeconds(System.nanoTime() - startTime);
  73     }
  74 
  75     static void awaitInterrupt(long timeoutSeconds) {
  76         long startTime = System.nanoTime();
  77         try {
  78             Thread.sleep(SECONDS.toMillis(timeoutSeconds));
  79             fail("timed out waiting for interrupt");
  80         } catch (InterruptedException expected) {
  81             check(secondsElapsedSince(startTime) < timeoutSeconds);
  82         }
  83     }
  84 
  85     public static void main(String[] args) {
  86         try {
  87             testInvokeAll();
  88             testInvokeAny();
  89             testInvokeAny_cancellationInterrupt();







  90         } catch (Throwable t) {  unexpected(t); }
  91 
  92         if (failed > 0)
  93             throw new Error(
  94                     String.format("Passed = %d, failed = %d", passed, failed));
  95     }
  96 
  97     static final long timeoutSeconds = 10L;
  98 
  99     static void testInvokeAll() throws Throwable {
 100         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 101         final int nThreads = rnd.nextInt(2, 7);
 102         final boolean timed = rnd.nextBoolean();
 103         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
 104         final AtomicLong count = new AtomicLong(0);
 105         class Task implements Callable<Long> {
 106             public Long call() throws Exception {
 107                 return count.incrementAndGet();
 108             }
 109         }
 110 
 111         try {
 112             final List<Task> tasks =
 113                 IntStream.range(0, nThreads)
 114                 .mapToObj(i -> new Task())
 115                 .collect(Collectors.toList());
 116 
 117             List<Future<Long>> futures;
 118             if (timed) {
 119                 long startTime = System.nanoTime();
 120                 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS);
 121                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 122             }
 123             else
 124                 futures = pool.invokeAll(tasks);
 125             check(futures.size() == tasks.size());
 126             check(count.get() == tasks.size());
 127 
 128             long gauss = 0;
 129             for (Future<Long> future : futures) gauss += future.get();
 130             check(gauss == (tasks.size()+1)*tasks.size()/2);
 131 
 132             pool.shutdown();
 133             check(pool.awaitTermination(10L, SECONDS));
 134         } finally {
 135             pool.shutdownNow();
 136         }
 137     }
 138 
 139     static void testInvokeAny() throws Throwable {
 140         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 141         final boolean timed = rnd.nextBoolean();
 142         final ExecutorService pool = Executors.newSingleThreadExecutor();
 143         final AtomicLong count = new AtomicLong(0);
 144         final CountDownLatch invokeAnyDone = new CountDownLatch(1);
 145         class Task implements Callable<Long> {
 146             public Long call() throws Exception {
 147                 long x = count.incrementAndGet();
 148                 check(x <= 2);
 149                 if (x == 2) {
 150                     // wait for main thread to interrupt us ...
 151                     awaitInterrupt(timeoutSeconds);
 152                     // ... and then for invokeAny to return
 153                     check(invokeAnyDone.await(timeoutSeconds, SECONDS));
 154                 }
 155                 return x;
 156             }
 157         }
 158 
 159         try {
 160             final List<Task> tasks =
 161                 IntStream.range(0, rnd.nextInt(1, 7))
 162                 .mapToObj(i -> new Task())
 163                 .collect(Collectors.toList());
 164 
 165             long val;
 166             if (timed) {
 167                 long startTime = System.nanoTime();
 168                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
 169                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 170             }
 171             else
 172                 val = pool.invokeAny(tasks);
 173             check(val == 1);
 174             invokeAnyDone.countDown();
 175 
 176             // inherent race between main thread interrupt and
 177             // start of second task
 178             check(count.get() == 1 || count.get() == 2);
 179 
 180             pool.shutdown();
 181             check(pool.awaitTermination(timeoutSeconds, SECONDS));




 182         } finally {
 183             pool.shutdownNow();
 184         }
 185     }
 186 
 187     /**
 188      * Every remaining running task is sent an interrupt for cancellation.
 189      */
 190     static void testInvokeAny_cancellationInterrupt() throws Throwable {
 191         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 192         final int nThreads = rnd.nextInt(2, 7);
 193         final boolean timed = rnd.nextBoolean();
 194         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
 195         final AtomicLong count = new AtomicLong(0);
 196         final AtomicLong interruptedCount = new AtomicLong(0);
 197         final CyclicBarrier allStarted = new CyclicBarrier(nThreads);
 198         class Task implements Callable<Long> {
 199             public Long call() throws Exception {
 200                 allStarted.await();
 201                 long x = count.incrementAndGet();

 202                 if (x > 1)
 203                     // main thread will interrupt us
 204                     awaitInterrupt(timeoutSeconds);
 205                 return x;
 206             }
 207         }
 208 
 209         try {
 210             final List<Task> tasks =
 211                 IntStream.range(0, nThreads)
 212                 .mapToObj(i -> new Task())
 213                 .collect(Collectors.toList());
 214 
 215             long val;
 216             if (timed) {
 217                 long startTime = System.nanoTime();
 218                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
 219                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 220             }
 221             else


  67     static void check(boolean condition) {
  68         check(condition, "Assertion failure");
  69     }
  70 
  71     static long secondsElapsedSince(long startTime) {
  72         return NANOSECONDS.toSeconds(System.nanoTime() - startTime);
  73     }
  74 
  75     static void awaitInterrupt(long timeoutSeconds) {
  76         long startTime = System.nanoTime();
  77         try {
  78             Thread.sleep(SECONDS.toMillis(timeoutSeconds));
  79             fail("timed out waiting for interrupt");
  80         } catch (InterruptedException expected) {
  81             check(secondsElapsedSince(startTime) < timeoutSeconds);
  82         }
  83     }
  84 
  85     public static void main(String[] args) {
  86         try {
  87             for (int nThreads = 1; nThreads <= 6; ++nThreads) {
  88                 // untimed
  89                 testInvokeAll(nThreads, false);
  90                 testInvokeAny(nThreads, false);
  91                 testInvokeAny_cancellationInterrupt(nThreads, false);
  92                 // timed
  93                 testInvokeAll(nThreads, true);
  94                 testInvokeAny(nThreads, true);
  95                 testInvokeAny_cancellationInterrupt(nThreads, true);
  96             }
  97         } catch (Throwable t) {  unexpected(t); }
  98 
  99         if (failed > 0)
 100             throw new Error(
 101                     String.format("Passed = %d, failed = %d", passed, failed));
 102     }
 103 
 104     static final long timeoutSeconds = 10L;
 105 
 106     static void testInvokeAll(int nThreads, boolean timed) throws Throwable {
 107         final ThreadLocalRandom rnd = ThreadLocalRandom.current();


 108         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
 109         final AtomicLong count = new AtomicLong(0);
 110         class Task implements Callable<Long> {
 111             public Long call() throws Exception {
 112                 return count.incrementAndGet();
 113             }
 114         }
 115 
 116         try {
 117             final List<Task> tasks =
 118                 IntStream.range(0, nThreads)
 119                 .mapToObj(i -> new Task())
 120                 .collect(Collectors.toList());
 121 
 122             List<Future<Long>> futures;
 123             if (timed) {
 124                 long startTime = System.nanoTime();
 125                 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS);
 126                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 127             }
 128             else
 129                 futures = pool.invokeAll(tasks);
 130             check(futures.size() == tasks.size());
 131             check(count.get() == tasks.size());
 132 
 133             long gauss = 0;
 134             for (Future<Long> future : futures) gauss += future.get();
 135             check(gauss == (tasks.size()+1)*tasks.size()/2);
 136 
 137             pool.shutdown();
 138             check(pool.awaitTermination(10L, SECONDS));
 139         } finally {
 140             pool.shutdownNow();
 141         }
 142     }
 143 
 144     static void testInvokeAny(int nThreads, boolean timed) throws Throwable {
 145         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
 146         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);

 147         final AtomicLong count = new AtomicLong(0);
 148         final CountDownLatch invokeAnyDone = new CountDownLatch(1);
 149         class Task implements Callable<Long> {
 150             public Long call() throws Exception {
 151                 long x = count.incrementAndGet();
 152                 if (x > 1) {

 153                     // wait for main thread to interrupt us ...
 154                     awaitInterrupt(timeoutSeconds);
 155                     // ... and then for invokeAny to return
 156                     check(invokeAnyDone.await(timeoutSeconds, SECONDS));
 157                 }
 158                 return x;
 159             }
 160         }
 161 
 162         try {
 163             final List<Task> tasks =
 164                 IntStream.range(0, rnd.nextInt(1, 7))
 165                 .mapToObj(i -> new Task())
 166                 .collect(Collectors.toList());
 167 
 168             long val;
 169             if (timed) {
 170                 long startTime = System.nanoTime();
 171                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
 172                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 173             }
 174             else
 175                 val = pool.invokeAny(tasks);
 176             check(val == 1);
 177             invokeAnyDone.countDown();
 178 




 179             pool.shutdown();
 180             check(pool.awaitTermination(timeoutSeconds, SECONDS));
 181 
 182             long c = count.get();
 183             check(c >= 1 && c <= tasks.size());
 184 
 185         } finally {
 186             pool.shutdownNow();
 187         }
 188     }
 189 
 190     /**
 191      * Every remaining running task is sent an interrupt for cancellation.
 192      */
 193     static void testInvokeAny_cancellationInterrupt(int nThreads, boolean timed) throws Throwable {
 194         final ThreadLocalRandom rnd = ThreadLocalRandom.current();


 195         final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
 196         final AtomicLong count = new AtomicLong(0);
 197         final AtomicLong interruptedCount = new AtomicLong(0);
 198         final CyclicBarrier allStarted = new CyclicBarrier(nThreads);
 199         class Task implements Callable<Long> {
 200             public Long call() throws Exception {

 201                 long x = count.incrementAndGet();
 202                 allStarted.await();
 203                 if (x > 1)
 204                     // main thread will interrupt us
 205                     awaitInterrupt(timeoutSeconds);
 206                 return x;
 207             }
 208         }
 209 
 210         try {
 211             final List<Task> tasks =
 212                 IntStream.range(0, nThreads)
 213                 .mapToObj(i -> new Task())
 214                 .collect(Collectors.toList());
 215 
 216             long val;
 217             if (timed) {
 218                 long startTime = System.nanoTime();
 219                 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
 220                 check(secondsElapsedSince(startTime) < timeoutSeconds);
 221             }
 222             else
< prev index next >