1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.nio.ByteBuffer; 25 import java.util.List; 26 import java.util.Random; 27 import java.util.concurrent.CompletableFuture; 28 import java.util.concurrent.CompletionStage; 29 import java.util.concurrent.Executor; 30 import java.util.concurrent.ExecutorService; 31 import java.util.concurrent.Executors; 32 import java.util.concurrent.Flow; 33 import java.util.concurrent.Flow.Subscription; 34 import java.util.concurrent.SubmissionPublisher; 35 import jdk.incubator.http.HttpResponse.BodyHandler; 36 import jdk.incubator.http.HttpResponse.BodySubscriber; 37 import jdk.test.lib.RandomFactory; 38 import org.testng.annotations.DataProvider; 39 import org.testng.annotations.Test; 40 import static java.lang.Long.MAX_VALUE; 41 import static java.lang.Long.min; 42 import static java.lang.System.out; 43 import static java.util.concurrent.CompletableFuture.delayedExecutor; 44 import static java.util.concurrent.TimeUnit.MILLISECONDS; 45 import static org.testng.Assert.*; 46 47 /* 48 * @test 49 * @bug 8184285 50 * @summary Direct test for HttpResponse.BodySubscriber.buffering() API 51 * @key randomness 52 * @library /test/lib 53 * @build jdk.test.lib.RandomFactory 54 * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest 55 */ 56 57 public class BufferingSubscriberTest { 58 59 // If we compute that a test will take less that 10s 60 // we judge it acceptable 61 static final long LOWER_THRESHOLD = 10_000; // 10 sec. 62 // If we compute that a test will take more than 20 sec 63 // we judge it problematic: we will try to adjust the 64 // buffer sizes, and if we can't we will print a warning 65 static final long UPPER_THRESHOLD = 20_000; // 20 sec. 66 67 static final Random random = RandomFactory.getRandom(); 68 static final long start = System.nanoTime(); 69 static final String START = "start"; 70 static final String END = "end "; 71 static long elapsed() { return (System.nanoTime() - start)/1000_000;} 72 static void printStamp(String what, String fmt, Object... args) { 73 long elapsed = elapsed(); 74 long sec = elapsed/1000; 75 long ms = elapsed % 1000; 76 String time = sec > 0 ? sec + "sec " : ""; 77 time = time + ms + "ms"; 78 out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args)); 79 } 80 @DataProvider(name = "negatives") 81 public Object[][] negatives() { 82 return new Object[][] { { 0 }, { -1 }, { -1000 } }; 83 } 84 85 @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) 86 public void subscriberThrowsIAE(int bufferSize) { 87 printStamp(START, "subscriberThrowsIAE(%d)", bufferSize); 88 try { 89 BodySubscriber<?> bp = BodySubscriber.asByteArray(); 90 BodySubscriber.buffering(bp, bufferSize); 91 } finally { 92 printStamp(END, "subscriberThrowsIAE(%d)", bufferSize); 93 } 94 } 95 96 @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) 97 public void handlerThrowsIAE(int bufferSize) { 98 printStamp(START, "handlerThrowsIAE(%d)", bufferSize); 99 try { 100 BodyHandler<?> bp = BodyHandler.asByteArray(); 101 BodyHandler.buffering(bp, bufferSize); 102 } finally { 103 printStamp(END, "handlerThrowsIAE(%d)", bufferSize); 104 } 105 } 106 107 // --- 108 109 @DataProvider(name = "config") 110 public Object[][] config() { 111 return new Object[][] { 112 // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize 113 { 1, 0, 1, 1, 2, 1 }, 114 { 1, 5, 1, 100, 2, 1 }, 115 { 1, 0, 1, 10, 1000, 1 }, 116 { 1, 10, 1, 10, 1000, 1 }, 117 { 1, 0, 1, 1000, 1000, 10 }, 118 { 1, 0, 10, 1000, 1000, 50 }, 119 { 1, 0, 1000, 10 , 1000, 50 }, 120 { 1, 100, 1, 1000 * 4, 1000, 50 }, 121 { 100, 0, 1000, 1, 2, 1 }, 122 { 3, 0, 4, 5006, 1000, 50 }, 123 { 20, 0, 100, 4888, 1000, 100 }, 124 { 16, 10, 1000, 50 , 1000, 100 }, 125 }; 126 } 127 128 @Test(dataProvider = "config") 129 public void test(int iterations, 130 int delayMillis, 131 int numBuffers, 132 int bufferSize, 133 int maxBufferSize, 134 int minbufferSize) { 135 for (long perRequestAmount : new long[] { 1L, MAX_VALUE }) 136 test(iterations, 137 delayMillis, 138 numBuffers, 139 bufferSize, 140 maxBufferSize, 141 minbufferSize, 142 perRequestAmount); 143 } 144 145 public void test(int iterations, 146 int delayMillis, 147 int numBuffers, 148 int bufferSize, 149 int maxBufferSize, 150 int minBufferSize, 151 long requestAmount) { 152 ExecutorService executor = Executors.newFixedThreadPool(1); 153 try { 154 out.printf("Iterations %d\n", iterations); 155 for (int i=0; i<iterations; i++ ) { 156 printStamp(START, "Iteration %d", i); 157 try { 158 SubmissionPublisher<List<ByteBuffer>> publisher = 159 new SubmissionPublisher<>(executor, 1); 160 CompletableFuture<?> cf = sink(publisher, 161 delayMillis, 162 numBuffers * bufferSize, 163 requestAmount, 164 maxBufferSize, 165 minBufferSize); 166 source(publisher, numBuffers, bufferSize); 167 publisher.close(); 168 cf.join(); 169 } finally { 170 printStamp(END, "Iteration %d\n", i); 171 } 172 } 173 out.println("OK"); 174 } finally { 175 executor.shutdown(); 176 } 177 } 178 179 static int accumulatedDataSize(List<ByteBuffer> bufs) { 180 return bufs.stream().mapToInt(ByteBuffer::remaining).sum(); 181 } 182 183 /** Returns a new BB with its contents set to monotonically increasing 184 * values, staring at the given start index and wrapping every 100. */ 185 static ByteBuffer allocateBuffer(int size, int startIdx) { 186 ByteBuffer b = ByteBuffer.allocate(size); 187 for (int i=0; i<size; i++) 188 b.put((byte)((startIdx + i) % 100)); 189 b.position(0); 190 return b; 191 } 192 193 static class TestSubscriber implements BodySubscriber<Integer> { 194 final int delayMillis; 195 final int bufferSize; 196 final int expectedTotalSize; 197 final long requestAmount; 198 final CompletableFuture<Integer> completion; 199 final Executor delayedExecutor; 200 volatile Flow.Subscription subscription; 201 202 TestSubscriber(int bufferSize, 203 int delayMillis, 204 int expectedTotalSize, 205 long requestAmount) { 206 this.bufferSize = bufferSize; 207 this.completion = new CompletableFuture<>(); 208 this.delayMillis = delayMillis; 209 this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS); 210 this.expectedTotalSize = expectedTotalSize; 211 this.requestAmount = requestAmount; 212 } 213 214 /** 215 * Example of a factory method which would decorate a buffering 216 * subscriber to create a new subscriber dependent on buffering capability. 217 * 218 * The integer type parameter simulates the body just by counting the 219 * number of bytes in the body. 220 */ 221 static BodySubscriber<Integer> createSubscriber(int bufferSize, 222 int delay, 223 int expectedTotalSize, 224 long requestAmount) { 225 TestSubscriber s = new TestSubscriber(bufferSize, 226 delay, 227 expectedTotalSize, 228 requestAmount); 229 return BodySubscriber.buffering(s, bufferSize); 230 } 231 232 private void requestMore() { subscription.request(requestAmount); } 233 234 @Override 235 public void onSubscribe(Subscription subscription) { 236 assertNull(this.subscription); 237 this.subscription = subscription; 238 if (delayMillis > 0) 239 delayedExecutor.execute(this::requestMore); 240 else 241 requestMore(); 242 } 243 244 volatile int wrongSizes; 245 volatile int totalBytesReceived; 246 volatile int onNextInvocations; 247 volatile int lastSeenSize = -1; 248 volatile boolean noMoreOnNext; // false 249 volatile int index; // 0 250 volatile long count; 251 252 @Override 253 public void onNext(List<ByteBuffer> items) { 254 long sz = accumulatedDataSize(items); 255 boolean printStamp = delayMillis > 0 256 && requestAmount < Long.MAX_VALUE 257 && count % 20 == 0; 258 if (printStamp) { 259 printStamp("stamp", "count=%d sz=%d accumulated=%d", 260 count, sz, (totalBytesReceived + sz)); 261 } 262 count++; 263 onNextInvocations++; 264 assertNotEquals(sz, 0L, "Unexpected empty buffers"); 265 items.stream().forEach(b -> assertEquals(b.position(), 0)); 266 assertFalse(noMoreOnNext); 267 268 if (sz != bufferSize) { 269 String msg = sz + ", should be less than bufferSize, " + bufferSize; 270 assertTrue(sz < bufferSize, msg); 271 assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); 272 noMoreOnNext = true; 273 wrongSizes++; 274 } else { 275 assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); 276 } 277 278 // Ensure expected contents 279 for (ByteBuffer b : items) { 280 while (b.hasRemaining()) { 281 assertEquals(b.get(), (byte) (index % 100)); 282 index++; 283 } 284 } 285 286 totalBytesReceived += sz; 287 assertEquals(totalBytesReceived, index ); 288 if (delayMillis > 0) 289 delayedExecutor.execute(this::requestMore); 290 else 291 requestMore(); 292 } 293 294 @Override 295 public void onError(Throwable throwable) { 296 completion.completeExceptionally(throwable); 297 } 298 299 @Override 300 public void onComplete() { 301 if (wrongSizes > 1) { // allow just the final item to be smaller 302 String msg = "Wrong sizes. Expected no more than 1. [" + this + "]"; 303 completion.completeExceptionally(new Throwable(msg)); 304 } 305 if (totalBytesReceived != expectedTotalSize) { 306 String msg = "Wrong number of bytes. [" + this + "]"; 307 completion.completeExceptionally(new Throwable(msg)); 308 } else { 309 completion.complete(totalBytesReceived); 310 } 311 } 312 313 @Override 314 public CompletionStage<Integer> getBody() { return completion; } 315 316 @Override 317 public String toString() { 318 StringBuilder sb = new StringBuilder(); 319 sb.append(super.toString()); 320 sb.append(", bufferSize=").append(bufferSize); 321 sb.append(", onNextInvocations=").append(onNextInvocations); 322 sb.append(", totalBytesReceived=").append(totalBytesReceived); 323 sb.append(", expectedTotalSize=").append(expectedTotalSize); 324 sb.append(", requestAmount=").append(requestAmount); 325 sb.append(", lastSeenSize=").append(lastSeenSize); 326 sb.append(", wrongSizes=").append(wrongSizes); 327 sb.append(", index=").append(index); 328 return sb.toString(); 329 } 330 } 331 332 /** 333 * Publishes data, through the given publisher, using the main thread. 334 * 335 * Note: The executor supplied when creating the SubmissionPublisher provides 336 * the threads for executing the Subscribers. 337 * 338 * @param publisher the publisher 339 * @param numBuffers the number of buffers to send ( before splitting in two ) 340 * @param bufferSize the total size of the data to send ( before splitting in two ) 341 */ 342 static void source(SubmissionPublisher<List<ByteBuffer>> publisher, 343 int numBuffers, 344 int bufferSize) { 345 printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize); 346 int index = 0; 347 for (int i=0; i<numBuffers; i++) { 348 int chunkSize = random.nextInt(bufferSize); 349 ByteBuffer buf1 = allocateBuffer(chunkSize, index); 350 index += chunkSize; 351 ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index); 352 index += bufferSize - chunkSize; 353 publisher.submit(List.of(buf1, buf2)); 354 } 355 printStamp("source", "complete"); 356 } 357 358 /** 359 * Creates and subscribes Subscribers that receive data from the given 360 * publisher. 361 * 362 * @param publisher the publisher 363 * @param delayMillis time, in milliseconds, to delay the Subscription 364 * requesting more bytes ( for simulating slow consumption ) 365 * @param expectedTotalSize the total number of bytes expected to be received 366 * by the subscribers 367 * @return a CompletableFuture which completes when the subscription is complete 368 */ 369 static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher, 370 int delayMillis, 371 int expectedTotalSize, 372 long requestAmount, 373 int maxBufferSize, 374 int minBufferSize) { 375 int bufferSize = chooseBufferSize(maxBufferSize, 376 minBufferSize, 377 delayMillis, 378 expectedTotalSize, 379 requestAmount); 380 assert bufferSize > 0; 381 assert bufferSize >= minBufferSize; 382 assert bufferSize <= maxBufferSize; 383 BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize, 384 delayMillis, 385 expectedTotalSize, 386 requestAmount); 387 publisher.subscribe(sub); 388 printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize); 389 out.printf("Subscription delay is %d msec\n", delayMillis); 390 long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount; 391 out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000); 392 out.printf("Request amount is %d items\n", requestAmount); 393 return sub.getBody().toCompletableFuture(); 394 } 395 396 static int chooseBufferSize(int maxBufferSize, 397 int minBufferSize, 398 int delaysMillis, 399 int expectedTotalSize, 400 long requestAmount) { 401 assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0; 402 int bufferSize = random.nextInt(maxBufferSize - minBufferSize) 403 + minBufferSize; 404 if (requestAmount == Long.MAX_VALUE) return bufferSize; 405 long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize) 406 / requestAmount; 407 long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize) 408 / requestAmount; 409 // if the maximum delay is < 10s just take a random number between min and max. 410 if (maxDelay <= LOWER_THRESHOLD) { 411 return bufferSize; 412 } 413 // if minimum delay is greater than 20s then print a warning and use max buffer. 414 if (minDelay >= UPPER_THRESHOLD) { 415 System.out.println("Warning: minimum delay is " 416 + minDelay/1000 + "sec " + minDelay%1000 + "ms"); 417 System.err.println("Warning: minimum delay is " 418 + minDelay/1000 + "sec " + minDelay%1000 + "ms"); 419 return maxBufferSize; 420 } 421 // maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD 422 // try to pick up a buffer size that keeps the delay below the 423 // UPPER_THRESHOLD 424 while (minBufferSize < maxBufferSize) { 425 bufferSize = random.nextInt(maxBufferSize - minBufferSize) 426 + minBufferSize; 427 long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize) 428 / requestAmount; 429 if (delay < UPPER_THRESHOLD) return bufferSize; 430 minBufferSize++; 431 } 432 return minBufferSize; 433 } 434 435 // --- 436 437 /* Main entry point for standalone testing of the main functional test. */ 438 public static void main(String... args) { 439 BufferingSubscriberTest t = new BufferingSubscriberTest(); 440 for (Object[] objs : t.config()) 441 t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]); 442 } 443 }