1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea with assistance from members of JCP JSR-166
  30  * Expert Group and released to the public domain, as explained at
  31  * http://creativecommons.org/publicdomain/zero/1.0/
  32  * Other contributors include Andrew Wright, Jeffrey Hayes,
  33  * Pat Fisher, Mike Judd.
  34  */
  35 
  36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  37 
  38 import java.util.concurrent.ArrayBlockingQueue;
  39 import java.util.concurrent.Callable;
  40 import java.util.concurrent.CompletionService;
  41 import java.util.concurrent.CountDownLatch;
  42 import java.util.concurrent.ExecutionException;
  43 import java.util.concurrent.ExecutorCompletionService;
  44 import java.util.concurrent.ExecutorService;
  45 import java.util.concurrent.Future;
  46 import java.util.concurrent.FutureTask;
  47 import java.util.concurrent.RunnableFuture;
  48 import java.util.concurrent.ThreadPoolExecutor;
  49 import java.util.concurrent.TimeUnit;
  50 import java.util.concurrent.atomic.AtomicBoolean;
  51 
  52 import junit.framework.Test;
  53 import junit.framework.TestSuite;
  54 
  55 public class ExecutorCompletionServiceTest extends JSR166TestCase {
  56     public static void main(String[] args) {
  57         main(suite(), args);
  58     }
  59     public static Test suite() {
  60         return new TestSuite(ExecutorCompletionServiceTest.class);
  61     }
  62 
  63     /**
  64      * new ExecutorCompletionService(null) throws NullPointerException
  65      */
  66     public void testConstructorNPE() {
  67         try {
  68             new ExecutorCompletionService(null);
  69             shouldThrow();
  70         } catch (NullPointerException success) {}
  71     }
  72 
  73     /**
  74      * new ExecutorCompletionService(e, null) throws NullPointerException
  75      */
  76     public void testConstructorNPE2() {
  77         try {
  78             new ExecutorCompletionService(cachedThreadPool, null);
  79             shouldThrow();
  80         } catch (NullPointerException success) {}
  81     }
  82 
  83     /**
  84      * ecs.submit(null) throws NullPointerException
  85      */
  86     public void testSubmitNullCallable() {
  87         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
  88         try {
  89             cs.submit((Callable) null);
  90             shouldThrow();
  91         } catch (NullPointerException success) {}
  92     }
  93 
  94     /**
  95      * ecs.submit(null, val) throws NullPointerException
  96      */
  97     public void testSubmitNullRunnable() {
  98         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
  99         try {
 100             cs.submit((Runnable) null, Boolean.TRUE);
 101             shouldThrow();
 102         } catch (NullPointerException success) {}
 103     }
 104 
 105     /**
 106      * A taken submitted task is completed
 107      */
 108     public void testTake() throws Exception {
 109         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 110         cs.submit(new StringTask());
 111         Future f = cs.take();
 112         assertTrue(f.isDone());
 113         assertSame(TEST_STRING, f.get());
 114     }
 115 
 116     /**
 117      * Take returns the same future object returned by submit
 118      */
 119     public void testTake2() throws InterruptedException {
 120         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 121         Future f1 = cs.submit(new StringTask());
 122         Future f2 = cs.take();
 123         assertSame(f1, f2);
 124     }
 125 
 126     /**
 127      * poll returns non-null when the returned task is completed
 128      */
 129     public void testPoll1() throws Exception {
 130         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 131         assertNull(cs.poll());
 132         cs.submit(new StringTask());
 133 
 134         long startTime = System.nanoTime();
 135         Future f;
 136         while ((f = cs.poll()) == null) {
 137             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
 138                 fail("timed out");
 139             Thread.yield();
 140         }
 141         assertTrue(f.isDone());
 142         assertSame(TEST_STRING, f.get());
 143     }
 144 
 145     /**
 146      * timed poll returns non-null when the returned task is completed
 147      */
 148     public void testPoll2() throws Exception {
 149         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 150         assertNull(cs.poll());
 151         cs.submit(new StringTask());
 152 
 153         long startTime = System.nanoTime();
 154         Future f;
 155         while ((f = cs.poll(timeoutMillis(), MILLISECONDS)) == null) {
 156             assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 157             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
 158                 fail("timed out");
 159             Thread.yield();
 160         }
 161         assertTrue(f.isDone());
 162         assertSame(TEST_STRING, f.get());
 163     }
 164 
 165     /**
 166      * poll returns null before the returned task is completed
 167      */
 168     public void testPollReturnsNullBeforeCompletion() throws Exception {
 169         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 170         final CountDownLatch proceed = new CountDownLatch(1);
 171         cs.submit(new Callable() { public String call() throws Exception {
 172             await(proceed);
 173             return TEST_STRING;
 174         }});
 175         assertNull(cs.poll());
 176         assertNull(cs.poll(0L, MILLISECONDS));
 177         assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
 178         long startTime = System.nanoTime();
 179         assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
 180         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
 181         proceed.countDown();
 182         assertSame(TEST_STRING, cs.take().get());
 183     }
 184 
 185     /**
 186      * successful and failed tasks are both returned
 187      */
 188     public void testTaskAssortment() throws Exception {
 189         CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
 190         ArithmeticException ex = new ArithmeticException();
 191         final int rounds = 2;
 192         for (int i = rounds; i--> 0; ) {
 193             cs.submit(new StringTask());
 194             cs.submit(callableThrowing(ex));
 195             cs.submit(runnableThrowing(ex), null);
 196         }
 197         int normalCompletions = 0;
 198         int exceptionalCompletions = 0;
 199         for (int i = 3 * rounds; i--> 0; ) {
 200             try {
 201                 assertSame(TEST_STRING, cs.take().get());
 202                 normalCompletions++;
 203             } catch (ExecutionException expected) {
 204                 assertSame(ex, expected.getCause());
 205                 exceptionalCompletions++;
 206             }
 207         }
 208         assertEquals(1 * rounds, normalCompletions);
 209         assertEquals(2 * rounds, exceptionalCompletions);
 210         assertNull(cs.poll());
 211     }
 212 
 213     /**
 214      * Submitting to underlying AES that overrides newTaskFor(Callable)
 215      * returns and eventually runs Future returned by newTaskFor.
 216      */
 217     public void testNewTaskForCallable() throws InterruptedException {
 218         final AtomicBoolean done = new AtomicBoolean(false);
 219         class MyCallableFuture<V> extends FutureTask<V> {
 220             MyCallableFuture(Callable<V> c) { super(c); }
 221             @Override protected void done() { done.set(true); }
 222         }
 223         final ExecutorService e =
 224             new ThreadPoolExecutor(1, 1,
 225                                    30L, TimeUnit.SECONDS,
 226                                    new ArrayBlockingQueue<Runnable>(1)) {
 227                 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
 228                     return new MyCallableFuture<T>(c);
 229                 }};
 230         CompletionService<String> cs = new ExecutorCompletionService<>(e);
 231         try (PoolCleaner cleaner = cleaner(e)) {
 232             assertNull(cs.poll());
 233             Callable<String> c = new StringTask();
 234             Future f1 = cs.submit(c);
 235             assertTrue("submit must return MyCallableFuture",
 236                        f1 instanceof MyCallableFuture);
 237             Future f2 = cs.take();
 238             assertSame("submit and take must return same objects", f1, f2);
 239             assertTrue("completed task must have set done", done.get());
 240         }
 241     }
 242 
 243     /**
 244      * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
 245      * returns and eventually runs Future returned by newTaskFor.
 246      */
 247     public void testNewTaskForRunnable() throws InterruptedException {
 248         final AtomicBoolean done = new AtomicBoolean(false);
 249         class MyRunnableFuture<V> extends FutureTask<V> {
 250             MyRunnableFuture(Runnable t, V r) { super(t, r); }
 251             @Override protected void done() { done.set(true); }
 252         }
 253         final ExecutorService e =
 254             new ThreadPoolExecutor(1, 1,
 255                                    30L, TimeUnit.SECONDS,
 256                                    new ArrayBlockingQueue<Runnable>(1)) {
 257                 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
 258                     return new MyRunnableFuture<T>(t, r);
 259                 }};
 260         CompletionService<String> cs = new ExecutorCompletionService<>(e);
 261         try (PoolCleaner cleaner = cleaner(e)) {
 262             assertNull(cs.poll());
 263             Runnable r = new NoOpRunnable();
 264             Future f1 = cs.submit(r, null);
 265             assertTrue("submit must return MyRunnableFuture",
 266                        f1 instanceof MyRunnableFuture);
 267             Future f2 = cs.take();
 268             assertSame("submit and take must return same objects", f1, f2);
 269             assertTrue("completed task must have set done", done.get());
 270         }
 271     }
 272 
 273 }