67 static void check(boolean condition) {
68 check(condition, "Assertion failure");
69 }
70
71 static long secondsElapsedSince(long startTime) {
72 return NANOSECONDS.toSeconds(System.nanoTime() - startTime);
73 }
74
75 static void awaitInterrupt(long timeoutSeconds) {
76 long startTime = System.nanoTime();
77 try {
78 Thread.sleep(SECONDS.toMillis(timeoutSeconds));
79 fail("timed out waiting for interrupt");
80 } catch (InterruptedException expected) {
81 check(secondsElapsedSince(startTime) < timeoutSeconds);
82 }
83 }
84
85 public static void main(String[] args) {
86 try {
87 testInvokeAll();
88 testInvokeAny();
89 testInvokeAny_cancellationInterrupt();
90 } catch (Throwable t) { unexpected(t); }
91
92 if (failed > 0)
93 throw new Error(
94 String.format("Passed = %d, failed = %d", passed, failed));
95 }
96
97 static final long timeoutSeconds = 10L; // timeout, in seconds, passed to the timed invokeAll/invokeAny calls and to the waits in this test
98
99 static void testInvokeAll() throws Throwable {
100 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
101 final int nThreads = rnd.nextInt(2, 7);
102 final boolean timed = rnd.nextBoolean();
103 final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
104 final AtomicLong count = new AtomicLong(0);
105 class Task implements Callable<Long> {
106 public Long call() throws Exception {
107 return count.incrementAndGet();
108 }
109 }
110
111 try {
112 final List<Task> tasks =
113 IntStream.range(0, nThreads)
114 .mapToObj(i -> new Task())
115 .collect(Collectors.toList());
116
117 List<Future<Long>> futures;
118 if (timed) {
119 long startTime = System.nanoTime();
120 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS);
121 check(secondsElapsedSince(startTime) < timeoutSeconds);
122 }
123 else
124 futures = pool.invokeAll(tasks);
125 check(futures.size() == tasks.size());
126 check(count.get() == tasks.size());
127
128 long gauss = 0;
129 for (Future<Long> future : futures) gauss += future.get();
130 check(gauss == (tasks.size()+1)*tasks.size()/2);
131
132 pool.shutdown();
133 check(pool.awaitTermination(10L, SECONDS));
134 } finally {
135 pool.shutdownNow();
136 }
137 }
138
139 static void testInvokeAny() throws Throwable {
140 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
141 final boolean timed = rnd.nextBoolean();
142 final ExecutorService pool = Executors.newSingleThreadExecutor();
143 final AtomicLong count = new AtomicLong(0);
144 final CountDownLatch invokeAnyDone = new CountDownLatch(1);
145 class Task implements Callable<Long> {
146 public Long call() throws Exception {
147 long x = count.incrementAndGet();
148 check(x <= 2);
149 if (x == 2) {
150 // wait for main thread to interrupt us ...
151 awaitInterrupt(timeoutSeconds);
152 // ... and then for invokeAny to return
153 check(invokeAnyDone.await(timeoutSeconds, SECONDS));
154 }
155 return x;
156 }
157 }
158
159 try {
160 final List<Task> tasks =
161 IntStream.range(0, rnd.nextInt(1, 7))
162 .mapToObj(i -> new Task())
163 .collect(Collectors.toList());
164
165 long val;
166 if (timed) {
167 long startTime = System.nanoTime();
168 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
169 check(secondsElapsedSince(startTime) < timeoutSeconds);
170 }
171 else
172 val = pool.invokeAny(tasks);
173 check(val == 1);
174 invokeAnyDone.countDown();
175
176 // inherent race between main thread interrupt and
177 // start of second task
178 check(count.get() == 1 || count.get() == 2);
179
180 pool.shutdown();
181 check(pool.awaitTermination(timeoutSeconds, SECONDS));
182 } finally {
183 pool.shutdownNow();
184 }
185 }
186
187 /**
188 * Every remaining running task is sent an interrupt for cancellation.
189 */
190 static void testInvokeAny_cancellationInterrupt() throws Throwable {
191 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
192 final int nThreads = rnd.nextInt(2, 7);
193 final boolean timed = rnd.nextBoolean();
194 final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
195 final AtomicLong count = new AtomicLong(0);
196 final AtomicLong interruptedCount = new AtomicLong(0);
197 final CyclicBarrier allStarted = new CyclicBarrier(nThreads);
198 class Task implements Callable<Long> {
199 public Long call() throws Exception {
200 allStarted.await();
201 long x = count.incrementAndGet();
202 if (x > 1)
203 // main thread will interrupt us
204 awaitInterrupt(timeoutSeconds);
205 return x;
206 }
207 }
208
209 try {
210 final List<Task> tasks =
211 IntStream.range(0, nThreads)
212 .mapToObj(i -> new Task())
213 .collect(Collectors.toList());
214
215 long val;
216 if (timed) {
217 long startTime = System.nanoTime();
218 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
219 check(secondsElapsedSince(startTime) < timeoutSeconds);
220 }
221 else
|
67 static void check(boolean condition) {
68 check(condition, "Assertion failure");
69 }
70
71 static long secondsElapsedSince(long startTime) {
72 return NANOSECONDS.toSeconds(System.nanoTime() - startTime);
73 }
74
75 static void awaitInterrupt(long timeoutSeconds) {
76 long startTime = System.nanoTime();
77 try {
78 Thread.sleep(SECONDS.toMillis(timeoutSeconds));
79 fail("timed out waiting for interrupt");
80 } catch (InterruptedException expected) {
81 check(secondsElapsedSince(startTime) < timeoutSeconds);
82 }
83 }
84
85 public static void main(String[] args) {
86 try {
87 for (int nThreads = 1; nThreads <= 6; ++nThreads) {
88 // untimed
89 testInvokeAll(nThreads, false);
90 testInvokeAny(nThreads, false);
91 testInvokeAny_cancellationInterrupt(nThreads, false);
92 // timed
93 testInvokeAll(nThreads, true);
94 testInvokeAny(nThreads, true);
95 testInvokeAny_cancellationInterrupt(nThreads, true);
96 }
97 } catch (Throwable t) { unexpected(t); }
98
99 if (failed > 0)
100 throw new Error(
101 String.format("Passed = %d, failed = %d", passed, failed));
102 }
103
104 static final long timeoutSeconds = 10L; // timeout, in seconds, passed to the timed invokeAll/invokeAny calls and to the waits in this test
105
106 static void testInvokeAll(int nThreads, boolean timed) throws Throwable {
107 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
108 final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
109 final AtomicLong count = new AtomicLong(0);
110 class Task implements Callable<Long> {
111 public Long call() throws Exception {
112 return count.incrementAndGet();
113 }
114 }
115
116 try {
117 final List<Task> tasks =
118 IntStream.range(0, nThreads)
119 .mapToObj(i -> new Task())
120 .collect(Collectors.toList());
121
122 List<Future<Long>> futures;
123 if (timed) {
124 long startTime = System.nanoTime();
125 futures = pool.invokeAll(tasks, timeoutSeconds, SECONDS);
126 check(secondsElapsedSince(startTime) < timeoutSeconds);
127 }
128 else
129 futures = pool.invokeAll(tasks);
130 check(futures.size() == tasks.size());
131 check(count.get() == tasks.size());
132
133 long gauss = 0;
134 for (Future<Long> future : futures) gauss += future.get();
135 check(gauss == (tasks.size()+1)*tasks.size()/2);
136
137 pool.shutdown();
138 check(pool.awaitTermination(10L, SECONDS));
139 } finally {
140 pool.shutdownNow();
141 }
142 }
143
144 static void testInvokeAny(int nThreads, boolean timed) throws Throwable {
145 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
146 final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
147 final AtomicLong count = new AtomicLong(0);
148 final CountDownLatch invokeAnyDone = new CountDownLatch(1);
149 class Task implements Callable<Long> {
150 public Long call() throws Exception {
151 long x = count.incrementAndGet();
152 if (x > 1) {
153 // wait for main thread to interrupt us ...
154 awaitInterrupt(timeoutSeconds);
155 // ... and then for invokeAny to return
156 check(invokeAnyDone.await(timeoutSeconds, SECONDS));
157 }
158 return x;
159 }
160 }
161
162 try {
163 final List<Task> tasks =
164 IntStream.range(0, rnd.nextInt(1, 7))
165 .mapToObj(i -> new Task())
166 .collect(Collectors.toList());
167
168 long val;
169 if (timed) {
170 long startTime = System.nanoTime();
171 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
172 check(secondsElapsedSince(startTime) < timeoutSeconds);
173 }
174 else
175 val = pool.invokeAny(tasks);
176 check(val == 1);
177 invokeAnyDone.countDown();
178
179 pool.shutdown();
180 check(pool.awaitTermination(timeoutSeconds, SECONDS));
181
182 long c = count.get();
183 check(c >= 1 && c <= tasks.size());
184
185 } finally {
186 pool.shutdownNow();
187 }
188 }
189
190 /**
191 * Every remaining running task is sent an interrupt for cancellation.
192 */
193 static void testInvokeAny_cancellationInterrupt(int nThreads, boolean timed) throws Throwable {
194 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
195 final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
196 final AtomicLong count = new AtomicLong(0);
197 final AtomicLong interruptedCount = new AtomicLong(0);
198 final CyclicBarrier allStarted = new CyclicBarrier(nThreads);
199 class Task implements Callable<Long> {
200 public Long call() throws Exception {
201 long x = count.incrementAndGet();
202 allStarted.await();
203 if (x > 1)
204 // main thread will interrupt us
205 awaitInterrupt(timeoutSeconds);
206 return x;
207 }
208 }
209
210 try {
211 final List<Task> tasks =
212 IntStream.range(0, nThreads)
213 .mapToObj(i -> new Task())
214 .collect(Collectors.toList());
215
216 long val;
217 if (timed) {
218 long startTime = System.nanoTime();
219 val = pool.invokeAny(tasks, timeoutSeconds, SECONDS);
220 check(secondsElapsedSince(startTime) < timeoutSeconds);
221 }
222 else
|