1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea with assistance from members of JCP JSR-166
  30  * Expert Group and released to the public domain, as explained at
  31  * http://creativecommons.org/publicdomain/zero/1.0/
  32  * Other contributors include Andrew Wright, Jeffrey Hayes,
  33  * Pat Fisher, Mike Judd.
  34  */
  35 
  36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  37 
  38 import java.util.concurrent.ArrayBlockingQueue;
  39 import java.util.concurrent.Callable;
  40 import java.util.concurrent.CompletionService;
  41 import java.util.concurrent.CountDownLatch;
  42 import java.util.concurrent.ExecutionException;
  43 import java.util.concurrent.ExecutorCompletionService;
  44 import java.util.concurrent.ExecutorService;
  45 import java.util.concurrent.Future;
  46 import java.util.concurrent.FutureTask;
  47 import java.util.concurrent.RunnableFuture;
  48 import java.util.concurrent.ThreadPoolExecutor;
  49 import java.util.concurrent.TimeUnit;
  50 import java.util.concurrent.atomic.AtomicBoolean;
  51 
  52 import junit.framework.Test;
  53 import junit.framework.TestSuite;
  54 
  55 public class ExecutorCompletionServiceTest extends JSR166TestCase {
  56     public static void main(String[] args) {
  57         main(suite(), args);
  58     }
  59     public static Test suite() {
  60         return new TestSuite(ExecutorCompletionServiceTest.class);
  61     }
  62 
  63     /**
  64      * new ExecutorCompletionService(null) throws NullPointerException
  65      */
  66     public void testConstructorNPE() {
  67         try {
  68             new ExecutorCompletionService(null);
  69             shouldThrow();
  70         } catch (NullPointerException success) {}
  71     }
  72 
  73     /**
  74      * new ExecutorCompletionService(e, null) throws NullPointerException
  75      */
  76     public void testConstructorNPE2() {
  77         try {
  78             new ExecutorCompletionService(cachedThreadPool, null);
  79             shouldThrow();
  80         } catch (NullPointerException success) {}
  81     }
  82 
  83     /**
  84      * ecs.submit(null) throws NullPointerException
  85      */
  86     public void testSubmitNullCallable() {
  87         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
  88         try {
  89             cs.submit((Callable) null);
  90             shouldThrow();
  91         } catch (NullPointerException success) {}
  92     }
  93 
  94     /**
  95      * ecs.submit(null, val) throws NullPointerException
  96      */
  97     public void testSubmitNullRunnable() {
  98         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
  99         try {
 100             cs.submit((Runnable) null, Boolean.TRUE);
 101             shouldThrow();
 102         } catch (NullPointerException success) {}
 103     }
 104 
 105     /**
 106      * A taken submitted task is completed
 107      */
 108     public void testTake()
 109         throws InterruptedException, ExecutionException {
 110         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 111         cs.submit(new StringTask());
 112         Future f = cs.take();
 113         assertTrue(f.isDone());
 114         assertSame(TEST_STRING, f.get());
 115     }
 116 
 117     /**
 118      * Take returns the same future object returned by submit
 119      */
 120     public void testTake2() throws InterruptedException {
 121         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 122         Future f1 = cs.submit(new StringTask());
 123         Future f2 = cs.take();
 124         assertSame(f1, f2);
 125     }
 126 
 127     /**
 128      * poll returns non-null when the returned task is completed
 129      */
 130     public void testPoll1()
 131         throws InterruptedException, ExecutionException {
 132         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 133         assertNull(cs.poll());
 134         cs.submit(new StringTask());
 135 
 136         long startTime = System.nanoTime();
 137         Future f;
 138         while ((f = cs.poll()) == null) {
 139             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
 140                 fail("timed out");
 141             Thread.yield();
 142         }
 143         assertTrue(f.isDone());
 144         assertSame(TEST_STRING, f.get());
 145     }
 146 
 147     /**
 148      * timed poll returns non-null when the returned task is completed
 149      */
 150     public void testPoll2()
 151         throws InterruptedException, ExecutionException {
 152         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 153         assertNull(cs.poll());
 154         cs.submit(new StringTask());
 155 
 156         long startTime = System.nanoTime();
 157         Future f;
 158         while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
 159             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
 160                 fail("timed out");
 161             Thread.yield();
 162         }
 163         assertTrue(f.isDone());
 164         assertSame(TEST_STRING, f.get());
 165     }
 166 
 167     /**
 168      * poll returns null before the returned task is completed
 169      */
 170     public void testPollReturnsNull()
 171         throws InterruptedException, ExecutionException {
 172         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 173         final CountDownLatch proceed = new CountDownLatch(1);
 174         cs.submit(new Callable() { public String call() throws Exception {
 175             await(proceed);
 176             return TEST_STRING;
 177         }});
 178         assertNull(cs.poll());
 179         assertNull(cs.poll(0L, MILLISECONDS));
 180         assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
 181         long startTime = System.nanoTime();
 182         assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
 183         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 184         proceed.countDown();
 185         assertSame(TEST_STRING, cs.take().get());
 186     }
 187 
 188     /**
 189      * successful and failed tasks are both returned
 190      */
 191     public void testTaskAssortment()
 192         throws InterruptedException, ExecutionException {
 193         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 194         ArithmeticException ex = new ArithmeticException();
 195         for (int i = 0; i < 2; i++) {
 196             cs.submit(new StringTask());
 197             cs.submit(callableThrowing(ex));
 198             cs.submit(runnableThrowing(ex), null);
 199         }
 200         int normalCompletions = 0;
 201         int exceptionalCompletions = 0;
 202         for (int i = 0; i < 3 * 2; i++) {
 203             try {
 204                 if (cs.take().get() == TEST_STRING)
 205                     normalCompletions++;
 206             }
 207             catch (ExecutionException expected) {
 208                 assertTrue(expected.getCause() instanceof ArithmeticException);
 209                 exceptionalCompletions++;
 210             }
 211         }
 212         assertEquals(2 * 1, normalCompletions);
 213         assertEquals(2 * 2, exceptionalCompletions);
 214         assertNull(cs.poll());
 215     }
 216 
 217     /**
 218      * Submitting to underlying AES that overrides newTaskFor(Callable)
 219      * returns and eventually runs Future returned by newTaskFor.
 220      */
 221     public void testNewTaskForCallable() throws InterruptedException {
 222         final AtomicBoolean done = new AtomicBoolean(false);
 223         class MyCallableFuture<V> extends FutureTask<V> {
 224             MyCallableFuture(Callable<V> c) { super(c); }
 225             @Override protected void done() { done.set(true); }
 226         }
 227         final ExecutorService e =
 228             new ThreadPoolExecutor(1, 1,
 229                                    30L, TimeUnit.SECONDS,
 230                                    new ArrayBlockingQueue<Runnable>(1)) {
 231                 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
 232                     return new MyCallableFuture<T>(c);
 233                 }};
 234         CompletionService<String> cs = new ExecutorCompletionService<>(e);
 235         try (PoolCleaner cleaner = cleaner(e)) {
 236             assertNull(cs.poll());
 237             Callable<String> c = new StringTask();
 238             Future f1 = cs.submit(c);
 239             assertTrue("submit must return MyCallableFuture",
 240                        f1 instanceof MyCallableFuture);
 241             Future f2 = cs.take();
 242             assertSame("submit and take must return same objects", f1, f2);
 243             assertTrue("completed task must have set done", done.get());
 244         }
 245     }
 246 
 247     /**
 248      * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
 249      * returns and eventually runs Future returned by newTaskFor.
 250      */
 251     public void testNewTaskForRunnable() throws InterruptedException {
 252         final AtomicBoolean done = new AtomicBoolean(false);
 253         class MyRunnableFuture<V> extends FutureTask<V> {
 254             MyRunnableFuture(Runnable t, V r) { super(t, r); }
 255             @Override protected void done() { done.set(true); }
 256         }
 257         final ExecutorService e =
 258             new ThreadPoolExecutor(1, 1,
 259                                    30L, TimeUnit.SECONDS,
 260                                    new ArrayBlockingQueue<Runnable>(1)) {
 261                 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
 262                     return new MyRunnableFuture<T>(t, r);
 263                 }};
 264         CompletionService<String> cs = new ExecutorCompletionService<>(e);
 265         try (PoolCleaner cleaner = cleaner(e)) {
 266             assertNull(cs.poll());
 267             Runnable r = new NoOpRunnable();
 268             Future f1 = cs.submit(r, null);
 269             assertTrue("submit must return MyRunnableFuture",
 270                        f1 instanceof MyRunnableFuture);
 271             Future f2 = cs.take();
 272             assertSame("submit and take must return same objects", f1, f2);
 273             assertTrue("completed task must have set done", done.get());
 274         }
 275     }
 276 
 277 }