1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.concurrent.ArrayBlockingQueue; 39 import java.util.concurrent.Callable; 40 import java.util.concurrent.CompletionService; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.ExecutorCompletionService; 44 import java.util.concurrent.ExecutorService; 45 import java.util.concurrent.Future; 46 import java.util.concurrent.FutureTask; 47 import java.util.concurrent.RunnableFuture; 48 import java.util.concurrent.ThreadPoolExecutor; 49 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 52 import junit.framework.Test; 53 import junit.framework.TestSuite; 54 55 public class ExecutorCompletionServiceTest extends JSR166TestCase { 56 public static void main(String[] args) { 57 main(suite(), args); 58 } 59 public static Test suite() { 60 return new TestSuite(ExecutorCompletionServiceTest.class); 61 } 62 63 /** 64 * new ExecutorCompletionService(null) throws NullPointerException 65 */ 66 public void testConstructorNPE() { 67 try { 68 new ExecutorCompletionService(null); 69 shouldThrow(); 70 } catch (NullPointerException success) {} 71 } 72 73 /** 74 * new ExecutorCompletionService(e, null) throws NullPointerException 75 */ 76 public void testConstructorNPE2() { 77 try { 78 new ExecutorCompletionService(cachedThreadPool, null); 79 shouldThrow(); 80 } catch (NullPointerException success) {} 81 } 82 83 /** 84 * ecs.submit(null) throws NullPointerException 85 */ 86 public void testSubmitNullCallable() { 87 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 88 try { 89 cs.submit((Callable) null); 90 shouldThrow(); 91 } catch (NullPointerException success) {} 92 } 93 94 /** 95 * ecs.submit(null, val) throws NullPointerException 96 */ 97 public void testSubmitNullRunnable() { 98 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 99 try { 100 cs.submit((Runnable) null, Boolean.TRUE); 101 shouldThrow(); 102 } catch (NullPointerException success) {} 103 } 104 105 /** 106 * A taken submitted task is completed 107 */ 108 public void testTake() 109 throws InterruptedException, ExecutionException { 110 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 111 cs.submit(new StringTask()); 112 Future f = cs.take(); 113 assertTrue(f.isDone()); 114 assertSame(TEST_STRING, f.get()); 115 } 116 117 /** 118 * Take returns the same future object returned by submit 119 */ 120 public void testTake2() throws InterruptedException { 121 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 122 Future f1 = cs.submit(new StringTask()); 123 Future f2 = cs.take(); 124 assertSame(f1, f2); 125 } 126 127 /** 128 * poll returns non-null when the returned task is completed 129 */ 130 public void testPoll1() 131 throws InterruptedException, ExecutionException { 132 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 133 assertNull(cs.poll()); 134 cs.submit(new StringTask()); 135 136 long startTime = System.nanoTime(); 137 Future f; 138 while ((f = cs.poll()) == null) { 139 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 140 fail("timed out"); 141 Thread.yield(); 142 } 143 assertTrue(f.isDone()); 144 assertSame(TEST_STRING, f.get()); 145 } 146 147 /** 148 * timed poll returns non-null when the returned task is completed 149 */ 150 public void testPoll2() 151 throws InterruptedException, ExecutionException { 152 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 153 assertNull(cs.poll()); 154 cs.submit(new StringTask()); 155 156 long startTime = System.nanoTime(); 157 Future f; 158 while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) { 159 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 160 fail("timed out"); 161 Thread.yield(); 162 } 163 assertTrue(f.isDone()); 164 assertSame(TEST_STRING, f.get()); 165 } 166 167 /** 168 * poll returns null before the returned task is completed 169 */ 170 public void testPollReturnsNull() 171 throws InterruptedException, ExecutionException { 172 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 173 final CountDownLatch proceed = new CountDownLatch(1); 174 cs.submit(new Callable() { public String call() throws Exception { 175 await(proceed); 176 return TEST_STRING; 177 }}); 178 assertNull(cs.poll()); 179 assertNull(cs.poll(0L, MILLISECONDS)); 180 assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS)); 181 long startTime = System.nanoTime(); 182 assertNull(cs.poll(timeoutMillis(), MILLISECONDS)); 183 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 184 proceed.countDown(); 185 assertSame(TEST_STRING, cs.take().get()); 186 } 187 188 /** 189 * successful and failed tasks are both returned 190 */ 191 public void testTaskAssortment() 192 throws InterruptedException, ExecutionException { 193 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 194 ArithmeticException ex = new ArithmeticException(); 195 for (int i = 0; i < 2; i++) { 196 cs.submit(new StringTask()); 197 cs.submit(callableThrowing(ex)); 198 cs.submit(runnableThrowing(ex), null); 199 } 200 int normalCompletions = 0; 201 int exceptionalCompletions = 0; 202 for (int i = 0; i < 3 * 2; i++) { 203 try { 204 if (cs.take().get() == TEST_STRING) 205 normalCompletions++; 206 } 207 catch (ExecutionException expected) { 208 assertTrue(expected.getCause() instanceof ArithmeticException); 209 exceptionalCompletions++; 210 } 211 } 212 assertEquals(2 * 1, normalCompletions); 213 assertEquals(2 * 2, exceptionalCompletions); 214 assertNull(cs.poll()); 215 } 216 217 /** 218 * Submitting to underlying AES that overrides newTaskFor(Callable) 219 * returns and eventually runs Future returned by newTaskFor. 220 */ 221 public void testNewTaskForCallable() throws InterruptedException { 222 final AtomicBoolean done = new AtomicBoolean(false); 223 class MyCallableFuture<V> extends FutureTask<V> { 224 MyCallableFuture(Callable<V> c) { super(c); } 225 @Override protected void done() { done.set(true); } 226 } 227 final ExecutorService e = 228 new ThreadPoolExecutor(1, 1, 229 30L, TimeUnit.SECONDS, 230 new ArrayBlockingQueue<Runnable>(1)) { 231 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) { 232 return new MyCallableFuture<T>(c); 233 }}; 234 CompletionService<String> cs = new ExecutorCompletionService<>(e); 235 try (PoolCleaner cleaner = cleaner(e)) { 236 assertNull(cs.poll()); 237 Callable<String> c = new StringTask(); 238 Future f1 = cs.submit(c); 239 assertTrue("submit must return MyCallableFuture", 240 f1 instanceof MyCallableFuture); 241 Future f2 = cs.take(); 242 assertSame("submit and take must return same objects", f1, f2); 243 assertTrue("completed task must have set done", done.get()); 244 } 245 } 246 247 /** 248 * Submitting to underlying AES that overrides newTaskFor(Runnable,T) 249 * returns and eventually runs Future returned by newTaskFor. 250 */ 251 public void testNewTaskForRunnable() throws InterruptedException { 252 final AtomicBoolean done = new AtomicBoolean(false); 253 class MyRunnableFuture<V> extends FutureTask<V> { 254 MyRunnableFuture(Runnable t, V r) { super(t, r); } 255 @Override protected void done() { done.set(true); } 256 } 257 final ExecutorService e = 258 new ThreadPoolExecutor(1, 1, 259 30L, TimeUnit.SECONDS, 260 new ArrayBlockingQueue<Runnable>(1)) { 261 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) { 262 return new MyRunnableFuture<T>(t, r); 263 }}; 264 CompletionService<String> cs = new ExecutorCompletionService<>(e); 265 try (PoolCleaner cleaner = cleaner(e)) { 266 assertNull(cs.poll()); 267 Runnable r = new NoOpRunnable(); 268 Future f1 = cs.submit(r, null); 269 assertTrue("submit must return MyRunnableFuture", 270 f1 instanceof MyRunnableFuture); 271 Future f2 = cs.take(); 272 assertSame("submit and take must return same objects", f1, f2); 273 assertTrue("completed task must have set done", done.get()); 274 } 275 } 276 277 }