1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.nio.ByteBuffer;
  25 import java.util.ArrayList;
  26 import java.util.List;
  27 import java.util.concurrent.CompletionStage;
  28 import java.util.concurrent.CountDownLatch;
  29 import java.util.concurrent.CyclicBarrier;
  30 import java.util.concurrent.ExecutorService;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.Flow.Subscription;
  33 import java.util.concurrent.Phaser;
  34 import java.util.concurrent.SubmissionPublisher;
  35 import java.util.function.IntSupplier;
  36 import java.util.stream.IntStream;
  37 import jdk.incubator.http.HttpResponse.BodySubscriber;
  38 import org.testng.annotations.DataProvider;
  39 import org.testng.annotations.Test;
  40 import static java.lang.Long.MAX_VALUE;
  41 import static java.lang.Long.MIN_VALUE;
  42 import static java.lang.System.out;
  43 import static java.nio.ByteBuffer.wrap;
  44 import static java.util.concurrent.TimeUnit.SECONDS;
  45 import static jdk.incubator.http.HttpResponse.BodySubscriber.buffering;
  46 import static org.testng.Assert.*;
  47 
  48 /*
  49  * @test
  50  * @summary Test for HttpResponse.BodySubscriber.buffering() onError/onComplete
  51  * @run testng/othervm BufferingSubscriberErrorCompleteTest
  52  */
  53 
  54 public class BufferingSubscriberErrorCompleteTest {
  55 
  56     @DataProvider(name = "illegalDemand")
  57     public Object[][] illegalDemand() {
  58         return new Object[][]{
  59             {0L}, {-1L}, {-5L}, {-100L}, {-101L}, {-100_001L}, {MIN_VALUE}
  60         };
  61     }
  62 
  63     @Test(dataProvider = "illegalDemand")
  64     public void illegalRequest(long demand) throws Exception {
  65         ExecutorService executor = Executors.newFixedThreadPool(1);
  66         SubmissionPublisher<List<ByteBuffer>> publisher =
  67                 new SubmissionPublisher<>(executor, 1);
  68 
  69         Phaser gate = new Phaser(2);  // single onSubscribe and onError
  70         ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);
  71         BodySubscriber subscriber = buffering(exposingSubscriber, 1);
  72         publisher.subscribe(subscriber);
  73         gate.arriveAndAwaitAdvance();
  74 
  75         Subscription s = exposingSubscriber.subscription;
  76         int previous = exposingSubscriber.onErrorInvocations;
  77         s.request(demand);
  78         gate.arriveAndAwaitAdvance();
  79 
  80         assertEquals(previous + 1, exposingSubscriber.onErrorInvocations);
  81         assertTrue(exposingSubscriber.throwable instanceof IllegalArgumentException,
  82                 "Expected IAE, got:" + exposingSubscriber.throwable);
  83 
  84         furtherCancelsRequestsShouldBeNoOp(s);
  85         assertEquals(exposingSubscriber.onErrorInvocations, 1);
  86         executor.shutdown();
  87     }
  88 
  89 
  90     @DataProvider(name = "bufferAndItemSizes")
  91     public Object[][] bufferAndItemSizes() {
  92         List<Object[]> values = new ArrayList<>();
  93 
  94         for (int bufferSize : new int[] { 1, 5, 10, 100, 1000 })
  95             for (int items : new int[]  { 0, 1, 2, 5, 9, 10, 11, 15, 29, 99 })
  96                 values.add(new Object[] { bufferSize, items });
  97 
  98         return values.stream().toArray(Object[][]::new);
  99     }
 100 
 101     @Test(dataProvider = "bufferAndItemSizes")
 102     public void onErrorFromPublisher(int bufferSize,
 103                                      int numberOfItems)
 104         throws Exception
 105     {
 106         ExecutorService executor = Executors.newFixedThreadPool(1);
 107         SubmissionPublisher<List<ByteBuffer>> publisher =
 108                 new SubmissionPublisher<>(executor, 1);
 109 
 110         // onSubscribe + onError + this thread
 111         Phaser gate = new Phaser(3);
 112         ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);
 113         BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize);
 114         publisher.subscribe(subscriber);
 115 
 116         List<ByteBuffer> item = List.of(wrap(new byte[] { 1 }));
 117         IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item));
 118         Throwable t = new Throwable("a message from me to me");
 119         publisher.closeExceptionally(t);
 120 
 121         gate.arriveAndAwaitAdvance();
 122 
 123         Subscription s = exposingSubscriber.subscription;
 124 
 125         assertEquals(exposingSubscriber.onErrorInvocations, 1);
 126         assertEquals(exposingSubscriber.onCompleteInvocations, 0);
 127         assertEquals(exposingSubscriber.throwable, t);
 128         assertEquals(exposingSubscriber.throwable.getMessage(),
 129                      "a message from me to me");
 130 
 131         furtherCancelsRequestsShouldBeNoOp(s);
 132         assertEquals(exposingSubscriber.onErrorInvocations, 1);
 133         assertEquals(exposingSubscriber.onCompleteInvocations, 0);
 134         executor.shutdown();
 135     }
 136 
 137     @Test(dataProvider = "bufferAndItemSizes")
 138     public void onCompleteFromPublisher(int bufferSize,
 139                                         int numberOfItems)
 140         throws Exception
 141     {
 142         ExecutorService executor = Executors.newFixedThreadPool(1);
 143         SubmissionPublisher<List<ByteBuffer>> publisher =
 144                 new SubmissionPublisher<>(executor, 1);
 145 
 146         // onSubscribe + onComplete + this thread
 147         Phaser gate = new Phaser(3);
 148         ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate);
 149         BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize);
 150         publisher.subscribe(subscriber);
 151 
 152         List<ByteBuffer> item = List.of(wrap(new byte[] { 1 }));
 153         IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item));
 154         publisher.close();
 155 
 156         gate.arriveAndAwaitAdvance();
 157 
 158         Subscription s = exposingSubscriber.subscription;
 159 
 160         assertEquals(exposingSubscriber.onErrorInvocations, 0);
 161         assertEquals(exposingSubscriber.onCompleteInvocations, 1);
 162         assertEquals(exposingSubscriber.throwable, null);
 163 
 164         furtherCancelsRequestsShouldBeNoOp(s);
 165         assertEquals(exposingSubscriber.onErrorInvocations, 0);
 166         assertEquals(exposingSubscriber.onCompleteInvocations, 1);
 167         assertEquals(exposingSubscriber.throwable, null);
 168         executor.shutdown();
 169     }
 170 
 171     static class ExposingSubscriber implements BodySubscriber<Void> {
 172         final Phaser gate;
 173         volatile Subscription subscription;
 174         volatile int onNextInvocations;
 175         volatile int onErrorInvocations;
 176         volatile int onCompleteInvocations;
 177         volatile Throwable throwable;
 178 
 179         ExposingSubscriber(Phaser gate) {
 180             this.gate = gate;
 181         }
 182 
 183         @Override
 184         public void onSubscribe(Subscription subscription) {
 185             //out.println("onSubscribe " + subscription);
 186             this.subscription = subscription;
 187             subscription.request(MAX_VALUE);
 188             gate.arrive();
 189         }
 190 
 191         @Override
 192         public void onNext(List<ByteBuffer> item) {
 193             //out.println("onNext " + item);
 194             onNextInvocations++;
 195         }
 196 
 197         @Override
 198         public void onError(Throwable throwable) {
 199             //out.println("onError " + throwable);
 200             this.throwable = throwable;
 201             onErrorInvocations++;
 202             gate.arrive();
 203         }
 204 
 205         @Override
 206         public void onComplete() {
 207             //out.println("onComplete ");
 208             onCompleteInvocations++;
 209             gate.arrive();
 210         }
 211 
 212         @Override
 213         public CompletionStage<Void> getBody() {
 214             throw new UnsupportedOperationException("getBody is unsupported");
 215         }
 216     }
 217 
 218     static void furtherCancelsRequestsShouldBeNoOp(Subscription s) {
 219         s.cancel(); s.request(1);
 220         s.cancel(); s.request(100); s.cancel();
 221         s.cancel(); s.request(MAX_VALUE); s.cancel(); s.cancel();
 222         s.cancel(); s.cancel(); s.cancel(); s.cancel();
 223         s.request(MAX_VALUE); s.request(MAX_VALUE); s.request(MAX_VALUE);
 224         s.request(-1); s.request(-100); s.request(MIN_VALUE);
 225     }
 226 }