1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.nio.ByteBuffer; 25 import java.util.ArrayList; 26 import java.util.List; 27 import java.util.concurrent.CompletionStage; 28 import java.util.concurrent.CountDownLatch; 29 import java.util.concurrent.CyclicBarrier; 30 import java.util.concurrent.ExecutorService; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.Flow.Subscription; 33 import java.util.concurrent.Phaser; 34 import java.util.concurrent.SubmissionPublisher; 35 import java.util.function.IntSupplier; 36 import java.util.stream.IntStream; 37 import jdk.incubator.http.HttpResponse.BodySubscriber; 38 import org.testng.annotations.DataProvider; 39 import org.testng.annotations.Test; 40 import static java.lang.Long.MAX_VALUE; 41 import static java.lang.Long.MIN_VALUE; 42 import static java.lang.System.out; 43 import static java.nio.ByteBuffer.wrap; 44 import static java.util.concurrent.TimeUnit.SECONDS; 45 import static jdk.incubator.http.HttpResponse.BodySubscriber.buffering; 46 import static org.testng.Assert.*; 47 48 /* 49 * @test 50 * @summary Test for HttpResponse.BodySubscriber.buffering() onError/onComplete 51 * @run testng/othervm BufferingSubscriberErrorCompleteTest 52 */ 53 54 public class BufferingSubscriberErrorCompleteTest { 55 56 @DataProvider(name = "illegalDemand") 57 public Object[][] illegalDemand() { 58 return new Object[][]{ 59 {0L}, {-1L}, {-5L}, {-100L}, {-101L}, {-100_001L}, {MIN_VALUE} 60 }; 61 } 62 63 @Test(dataProvider = "illegalDemand") 64 public void illegalRequest(long demand) throws Exception { 65 ExecutorService executor = Executors.newFixedThreadPool(1); 66 SubmissionPublisher<List<ByteBuffer>> publisher = 67 new SubmissionPublisher<>(executor, 1); 68 69 Phaser gate = new Phaser(2); // single onSubscribe and onError 70 ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate); 71 BodySubscriber subscriber = buffering(exposingSubscriber, 1); 72 publisher.subscribe(subscriber); 73 gate.arriveAndAwaitAdvance(); 74 75 Subscription s = exposingSubscriber.subscription; 76 int previous = exposingSubscriber.onErrorInvocations; 77 s.request(demand); 78 gate.arriveAndAwaitAdvance(); 79 80 assertEquals(previous + 1, exposingSubscriber.onErrorInvocations); 81 assertTrue(exposingSubscriber.throwable instanceof IllegalArgumentException, 82 "Expected IAE, got:" + exposingSubscriber.throwable); 83 84 furtherCancelsRequestsShouldBeNoOp(s); 85 assertEquals(exposingSubscriber.onErrorInvocations, 1); 86 executor.shutdown(); 87 } 88 89 90 @DataProvider(name = "bufferAndItemSizes") 91 public Object[][] bufferAndItemSizes() { 92 List<Object[]> values = new ArrayList<>(); 93 94 for (int bufferSize : new int[] { 1, 5, 10, 100, 1000 }) 95 for (int items : new int[] { 0, 1, 2, 5, 9, 10, 11, 15, 29, 99 }) 96 values.add(new Object[] { bufferSize, items }); 97 98 return values.stream().toArray(Object[][]::new); 99 } 100 101 @Test(dataProvider = "bufferAndItemSizes") 102 public void onErrorFromPublisher(int bufferSize, 103 int numberOfItems) 104 throws Exception 105 { 106 ExecutorService executor = Executors.newFixedThreadPool(1); 107 SubmissionPublisher<List<ByteBuffer>> publisher = 108 new SubmissionPublisher<>(executor, 1); 109 110 // onSubscribe + onError + this thread 111 Phaser gate = new Phaser(3); 112 ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate); 113 BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize); 114 publisher.subscribe(subscriber); 115 116 List<ByteBuffer> item = List.of(wrap(new byte[] { 1 })); 117 IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item)); 118 Throwable t = new Throwable("a message from me to me"); 119 publisher.closeExceptionally(t); 120 121 gate.arriveAndAwaitAdvance(); 122 123 Subscription s = exposingSubscriber.subscription; 124 125 assertEquals(exposingSubscriber.onErrorInvocations, 1); 126 assertEquals(exposingSubscriber.onCompleteInvocations, 0); 127 assertEquals(exposingSubscriber.throwable, t); 128 assertEquals(exposingSubscriber.throwable.getMessage(), 129 "a message from me to me"); 130 131 furtherCancelsRequestsShouldBeNoOp(s); 132 assertEquals(exposingSubscriber.onErrorInvocations, 1); 133 assertEquals(exposingSubscriber.onCompleteInvocations, 0); 134 executor.shutdown(); 135 } 136 137 @Test(dataProvider = "bufferAndItemSizes") 138 public void onCompleteFromPublisher(int bufferSize, 139 int numberOfItems) 140 throws Exception 141 { 142 ExecutorService executor = Executors.newFixedThreadPool(1); 143 SubmissionPublisher<List<ByteBuffer>> publisher = 144 new SubmissionPublisher<>(executor, 1); 145 146 // onSubscribe + onComplete + this thread 147 Phaser gate = new Phaser(3); 148 ExposingSubscriber exposingSubscriber = new ExposingSubscriber(gate); 149 BodySubscriber subscriber = buffering(exposingSubscriber, bufferSize); 150 publisher.subscribe(subscriber); 151 152 List<ByteBuffer> item = List.of(wrap(new byte[] { 1 })); 153 IntStream.range(0, numberOfItems).forEach(x -> publisher.submit(item)); 154 publisher.close(); 155 156 gate.arriveAndAwaitAdvance(); 157 158 Subscription s = exposingSubscriber.subscription; 159 160 assertEquals(exposingSubscriber.onErrorInvocations, 0); 161 assertEquals(exposingSubscriber.onCompleteInvocations, 1); 162 assertEquals(exposingSubscriber.throwable, null); 163 164 furtherCancelsRequestsShouldBeNoOp(s); 165 assertEquals(exposingSubscriber.onErrorInvocations, 0); 166 assertEquals(exposingSubscriber.onCompleteInvocations, 1); 167 assertEquals(exposingSubscriber.throwable, null); 168 executor.shutdown(); 169 } 170 171 static class ExposingSubscriber implements BodySubscriber<Void> { 172 final Phaser gate; 173 volatile Subscription subscription; 174 volatile int onNextInvocations; 175 volatile int onErrorInvocations; 176 volatile int onCompleteInvocations; 177 volatile Throwable throwable; 178 179 ExposingSubscriber(Phaser gate) { 180 this.gate = gate; 181 } 182 183 @Override 184 public void onSubscribe(Subscription subscription) { 185 //out.println("onSubscribe " + subscription); 186 this.subscription = subscription; 187 subscription.request(MAX_VALUE); 188 gate.arrive(); 189 } 190 191 @Override 192 public void onNext(List<ByteBuffer> item) { 193 //out.println("onNext " + item); 194 onNextInvocations++; 195 } 196 197 @Override 198 public void onError(Throwable throwable) { 199 //out.println("onError " + throwable); 200 this.throwable = throwable; 201 onErrorInvocations++; 202 gate.arrive(); 203 } 204 205 @Override 206 public void onComplete() { 207 //out.println("onComplete "); 208 onCompleteInvocations++; 209 gate.arrive(); 210 } 211 212 @Override 213 public CompletionStage<Void> getBody() { 214 throw new UnsupportedOperationException("getBody is unsupported"); 215 } 216 } 217 218 static void furtherCancelsRequestsShouldBeNoOp(Subscription s) { 219 s.cancel(); s.request(1); 220 s.cancel(); s.request(100); s.cancel(); 221 s.cancel(); s.request(MAX_VALUE); s.cancel(); s.cancel(); 222 s.cancel(); s.cancel(); s.cancel(); s.cancel(); 223 s.request(MAX_VALUE); s.request(MAX_VALUE); s.request(MAX_VALUE); 224 s.request(-1); s.request(-100); s.request(MIN_VALUE); 225 } 226 }