88 try {
89 cs.submit((Callable) null);
90 shouldThrow();
91 } catch (NullPointerException success) {}
92 }
93
94 /**
95 * ecs.submit(null, val) throws NullPointerException
96 */
97 public void testSubmitNullRunnable() {
98 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
99 try {
100 cs.submit((Runnable) null, Boolean.TRUE);
101 shouldThrow();
102 } catch (NullPointerException success) {}
103 }
104
105 /**
106 * A taken submitted task is completed
107 */
108 public void testTake()
109 throws InterruptedException, ExecutionException {
110 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
111 cs.submit(new StringTask());
112 Future f = cs.take();
113 assertTrue(f.isDone());
114 assertSame(TEST_STRING, f.get());
115 }
116
117 /**
118 * Take returns the same future object returned by submit
119 */
120 public void testTake2() throws InterruptedException {
121 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
122 Future f1 = cs.submit(new StringTask());
123 Future f2 = cs.take();
124 assertSame(f1, f2);
125 }
126
127 /**
128 * poll returns non-null when the returned task is completed
129 */
130 public void testPoll1()
131 throws InterruptedException, ExecutionException {
132 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
133 assertNull(cs.poll());
134 cs.submit(new StringTask());
135
136 long startTime = System.nanoTime();
137 Future f;
138 while ((f = cs.poll()) == null) {
139 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
140 fail("timed out");
141 Thread.yield();
142 }
143 assertTrue(f.isDone());
144 assertSame(TEST_STRING, f.get());
145 }
146
147 /**
148 * timed poll returns non-null when the returned task is completed
149 */
150 public void testPoll2()
151 throws InterruptedException, ExecutionException {
152 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
153 assertNull(cs.poll());
154 cs.submit(new StringTask());
155
156 long startTime = System.nanoTime();
157 Future f;
158 while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
159 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
160 fail("timed out");
161 Thread.yield();
162 }
163 assertTrue(f.isDone());
164 assertSame(TEST_STRING, f.get());
165 }
166
167 /**
168 * poll returns null before the returned task is completed
169 */
170 public void testPollReturnsNull()
171 throws InterruptedException, ExecutionException {
172 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
173 final CountDownLatch proceed = new CountDownLatch(1);
174 cs.submit(new Callable() { public String call() throws Exception {
175 await(proceed);
176 return TEST_STRING;
177 }});
178 assertNull(cs.poll());
179 assertNull(cs.poll(0L, MILLISECONDS));
180 assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
181 long startTime = System.nanoTime();
182 assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
183 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
184 proceed.countDown();
185 assertSame(TEST_STRING, cs.take().get());
186 }
187
188 /**
189 * successful and failed tasks are both returned
190 */
191 public void testTaskAssortment()
192 throws InterruptedException, ExecutionException {
193 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
194 ArithmeticException ex = new ArithmeticException();
195 for (int i = 0; i < 2; i++) {
196 cs.submit(new StringTask());
197 cs.submit(callableThrowing(ex));
198 cs.submit(runnableThrowing(ex), null);
199 }
200 int normalCompletions = 0;
201 int exceptionalCompletions = 0;
202 for (int i = 0; i < 3 * 2; i++) {
203 try {
204 if (cs.take().get() == TEST_STRING)
205 normalCompletions++;
206 }
207 catch (ExecutionException expected) {
208 assertTrue(expected.getCause() instanceof ArithmeticException);
209 exceptionalCompletions++;
210 }
211 }
212 assertEquals(2 * 1, normalCompletions);
213 assertEquals(2 * 2, exceptionalCompletions);
214 assertNull(cs.poll());
215 }
216
217 /**
218 * Submitting to underlying AES that overrides newTaskFor(Callable)
219 * returns and eventually runs Future returned by newTaskFor.
220 */
221 public void testNewTaskForCallable() throws InterruptedException {
222 final AtomicBoolean done = new AtomicBoolean(false);
223 class MyCallableFuture<V> extends FutureTask<V> {
224 MyCallableFuture(Callable<V> c) { super(c); }
225 @Override protected void done() { done.set(true); }
226 }
227 final ExecutorService e =
228 new ThreadPoolExecutor(1, 1,
229 30L, TimeUnit.SECONDS,
230 new ArrayBlockingQueue<Runnable>(1)) {
231 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
232 return new MyCallableFuture<T>(c);
233 }};
|
88 try {
89 cs.submit((Callable) null);
90 shouldThrow();
91 } catch (NullPointerException success) {}
92 }
93
94 /**
95 * ecs.submit(null, val) throws NullPointerException
96 */
97 public void testSubmitNullRunnable() {
98 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
99 try {
100 cs.submit((Runnable) null, Boolean.TRUE);
101 shouldThrow();
102 } catch (NullPointerException success) {}
103 }
104
105 /**
106 * A taken submitted task is completed
107 */
108 public void testTake() throws Exception {
109 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
110 cs.submit(new StringTask());
111 Future f = cs.take();
112 assertTrue(f.isDone());
113 assertSame(TEST_STRING, f.get());
114 }
115
116 /**
117 * Take returns the same future object returned by submit
118 */
119 public void testTake2() throws InterruptedException {
120 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
121 Future f1 = cs.submit(new StringTask());
122 Future f2 = cs.take();
123 assertSame(f1, f2);
124 }
125
126 /**
127 * poll returns non-null when the returned task is completed
128 */
129 public void testPoll1() throws Exception {
130 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
131 assertNull(cs.poll());
132 cs.submit(new StringTask());
133
134 long startTime = System.nanoTime();
135 Future f;
136 while ((f = cs.poll()) == null) {
137 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
138 fail("timed out");
139 Thread.yield();
140 }
141 assertTrue(f.isDone());
142 assertSame(TEST_STRING, f.get());
143 }
144
145 /**
146 * timed poll returns non-null when the returned task is completed
147 */
148 public void testPoll2() throws Exception {
149 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
150 assertNull(cs.poll());
151 cs.submit(new StringTask());
152
153 long startTime = System.nanoTime();
154 Future f;
155 while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) {
156 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
157 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
158 fail("timed out");
159 Thread.yield();
160 }
161 assertTrue(f.isDone());
162 assertSame(TEST_STRING, f.get());
163 }
164
165 /**
166 * poll returns null before the returned task is completed
167 */
168 public void testPollReturnsNullBeforeCompletion() throws Exception {
169 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
170 final CountDownLatch proceed = new CountDownLatch(1);
171 cs.submit(new Callable() { public String call() throws Exception {
172 await(proceed);
173 return TEST_STRING;
174 }});
175 assertNull(cs.poll());
176 assertNull(cs.poll(0L, MILLISECONDS));
177 assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
178 long startTime = System.nanoTime();
179 assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
180 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
181 proceed.countDown();
182 assertSame(TEST_STRING, cs.take().get());
183 }
184
185 /**
186 * successful and failed tasks are both returned
187 */
188 public void testTaskAssortment() throws Exception {
189 CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
190 ArithmeticException ex = new ArithmeticException();
191 final int rounds = 2;
192 for (int i = rounds; i--> 0; ) {
193 cs.submit(new StringTask());
194 cs.submit(callableThrowing(ex));
195 cs.submit(runnableThrowing(ex), null);
196 }
197 int normalCompletions = 0;
198 int exceptionalCompletions = 0;
199 for (int i = 3 * rounds; i--> 0; ) {
200 try {
201 assertSame(TEST_STRING, cs.take().get());
202 normalCompletions++;
203 } catch (ExecutionException expected) {
204 assertSame(ex, expected.getCause());
205 exceptionalCompletions++;
206 }
207 }
208 assertEquals(1 * rounds, normalCompletions);
209 assertEquals(2 * rounds, exceptionalCompletions);
210 assertNull(cs.poll());
211 }
212
213 /**
214 * Submitting to underlying AES that overrides newTaskFor(Callable)
215 * returns and eventually runs Future returned by newTaskFor.
216 */
217 public void testNewTaskForCallable() throws InterruptedException {
218 final AtomicBoolean done = new AtomicBoolean(false);
219 class MyCallableFuture<V> extends FutureTask<V> {
220 MyCallableFuture(Callable<V> c) { super(c); }
221 @Override protected void done() { done.set(true); }
222 }
223 final ExecutorService e =
224 new ThreadPoolExecutor(1, 1,
225 30L, TimeUnit.SECONDS,
226 new ArrayBlockingQueue<Runnable>(1)) {
227 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
228 return new MyCallableFuture<T>(c);
229 }};
|