1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.concurrent.ArrayBlockingQueue; 39 import java.util.concurrent.Callable; 40 import java.util.concurrent.CompletionService; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.ExecutorCompletionService; 44 import java.util.concurrent.ExecutorService; 45 import java.util.concurrent.Future; 46 import java.util.concurrent.FutureTask; 47 import java.util.concurrent.RunnableFuture; 48 import java.util.concurrent.ThreadPoolExecutor; 49 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 52 import junit.framework.Test; 53 import junit.framework.TestSuite; 54 55 public class ExecutorCompletionServiceTest extends JSR166TestCase { 56 public static void main(String[] args) { 57 main(suite(), args); 58 } 59 public static Test suite() { 60 return new TestSuite(ExecutorCompletionServiceTest.class); 61 } 62 63 /** 64 * new ExecutorCompletionService(null) throws NullPointerException 65 */ 66 public void testConstructorNPE() { 67 try { 68 new ExecutorCompletionService(null); 69 shouldThrow(); 70 } catch (NullPointerException success) {} 71 } 72 73 /** 74 * new ExecutorCompletionService(e, null) throws NullPointerException 75 */ 76 public void testConstructorNPE2() { 77 try { 78 new ExecutorCompletionService(cachedThreadPool, null); 79 shouldThrow(); 80 } catch (NullPointerException success) {} 81 } 82 83 /** 84 * ecs.submit(null) throws NullPointerException 85 */ 86 public void testSubmitNullCallable() { 87 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 88 try { 89 cs.submit((Callable) null); 90 shouldThrow(); 91 } catch (NullPointerException success) {} 92 } 93 94 /** 95 * ecs.submit(null, val) throws NullPointerException 96 */ 97 public void testSubmitNullRunnable() { 98 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 99 try { 100 cs.submit((Runnable) null, Boolean.TRUE); 101 shouldThrow(); 102 } catch (NullPointerException success) {} 103 } 104 105 /** 106 * A taken submitted task is completed 107 */ 108 public void testTake() throws Exception { 109 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 110 cs.submit(new StringTask()); 111 Future f = cs.take(); 112 assertTrue(f.isDone()); 113 assertSame(TEST_STRING, f.get()); 114 } 115 116 /** 117 * Take returns the same future object returned by submit 118 */ 119 public void testTake2() throws InterruptedException { 120 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 121 Future f1 = cs.submit(new StringTask()); 122 Future f2 = cs.take(); 123 assertSame(f1, f2); 124 } 125 126 /** 127 * poll returns non-null when the returned task is completed 128 */ 129 public void testPoll1() throws Exception { 130 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 131 assertNull(cs.poll()); 132 cs.submit(new StringTask()); 133 134 long startTime = System.nanoTime(); 135 Future f; 136 while ((f = cs.poll()) == null) { 137 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 138 fail("timed out"); 139 Thread.yield(); 140 } 141 assertTrue(f.isDone()); 142 assertSame(TEST_STRING, f.get()); 143 } 144 145 /** 146 * timed poll returns non-null when the returned task is completed 147 */ 148 public void testPoll2() throws Exception { 149 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 150 assertNull(cs.poll()); 151 cs.submit(new StringTask()); 152 153 long startTime = System.nanoTime(); 154 Future f; 155 while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) { 156 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 157 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 158 fail("timed out"); 159 Thread.yield(); 160 } 161 assertTrue(f.isDone()); 162 assertSame(TEST_STRING, f.get()); 163 } 164 165 /** 166 * poll returns null before the returned task is completed 167 */ 168 public void testPollReturnsNullBeforeCompletion() throws Exception { 169 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 170 final CountDownLatch proceed = new CountDownLatch(1); 171 cs.submit(new Callable() { public String call() throws Exception { 172 await(proceed); 173 return TEST_STRING; 174 }}); 175 assertNull(cs.poll()); 176 assertNull(cs.poll(0L, MILLISECONDS)); 177 assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS)); 178 long startTime = System.nanoTime(); 179 assertNull(cs.poll(timeoutMillis(), MILLISECONDS)); 180 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 181 proceed.countDown(); 182 assertSame(TEST_STRING, cs.take().get()); 183 } 184 185 /** 186 * successful and failed tasks are both returned 187 */ 188 public void testTaskAssortment() throws Exception { 189 CompletionService cs = new ExecutorCompletionService(cachedThreadPool); 190 ArithmeticException ex = new ArithmeticException(); 191 final int rounds = 2; 192 for (int i = rounds; i--> 0; ) { 193 cs.submit(new StringTask()); 194 cs.submit(callableThrowing(ex)); 195 cs.submit(runnableThrowing(ex), null); 196 } 197 int normalCompletions = 0; 198 int exceptionalCompletions = 0; 199 for (int i = 3 * rounds; i--> 0; ) { 200 try { 201 assertSame(TEST_STRING, cs.take().get()); 202 normalCompletions++; 203 } catch (ExecutionException expected) { 204 assertSame(ex, expected.getCause()); 205 exceptionalCompletions++; 206 } 207 } 208 assertEquals(1 * rounds, normalCompletions); 209 assertEquals(2 * rounds, exceptionalCompletions); 210 assertNull(cs.poll()); 211 } 212 213 /** 214 * Submitting to underlying AES that overrides newTaskFor(Callable) 215 * returns and eventually runs Future returned by newTaskFor. 216 */ 217 public void testNewTaskForCallable() throws InterruptedException { 218 final AtomicBoolean done = new AtomicBoolean(false); 219 class MyCallableFuture<V> extends FutureTask<V> { 220 MyCallableFuture(Callable<V> c) { super(c); } 221 @Override protected void done() { done.set(true); } 222 } 223 final ExecutorService e = 224 new ThreadPoolExecutor(1, 1, 225 30L, TimeUnit.SECONDS, 226 new ArrayBlockingQueue<Runnable>(1)) { 227 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) { 228 return new MyCallableFuture<T>(c); 229 }}; 230 CompletionService<String> cs = new ExecutorCompletionService<>(e); 231 try (PoolCleaner cleaner = cleaner(e)) { 232 assertNull(cs.poll()); 233 Callable<String> c = new StringTask(); 234 Future f1 = cs.submit(c); 235 assertTrue("submit must return MyCallableFuture", 236 f1 instanceof MyCallableFuture); 237 Future f2 = cs.take(); 238 assertSame("submit and take must return same objects", f1, f2); 239 assertTrue("completed task must have set done", done.get()); 240 } 241 } 242 243 /** 244 * Submitting to underlying AES that overrides newTaskFor(Runnable,T) 245 * returns and eventually runs Future returned by newTaskFor. 246 */ 247 public void testNewTaskForRunnable() throws InterruptedException { 248 final AtomicBoolean done = new AtomicBoolean(false); 249 class MyRunnableFuture<V> extends FutureTask<V> { 250 MyRunnableFuture(Runnable t, V r) { super(t, r); } 251 @Override protected void done() { done.set(true); } 252 } 253 final ExecutorService e = 254 new ThreadPoolExecutor(1, 1, 255 30L, TimeUnit.SECONDS, 256 new ArrayBlockingQueue<Runnable>(1)) { 257 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) { 258 return new MyRunnableFuture<T>(t, r); 259 }}; 260 CompletionService<String> cs = new ExecutorCompletionService<>(e); 261 try (PoolCleaner cleaner = cleaner(e)) { 262 assertNull(cs.poll()); 263 Runnable r = new NoOpRunnable(); 264 Future f1 = cs.submit(r, null); 265 assertTrue("submit must return MyRunnableFuture", 266 f1 instanceof MyRunnableFuture); 267 Future f2 = cs.take(); 268 assertSame("submit and take must return same objects", f1, f2); 269 assertTrue("completed task must have set done", done.get()); 270 } 271 } 272 273 }