1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.nio.ByteBuffer;
  25 import java.util.List;
  26 import java.util.Random;
  27 import java.util.concurrent.CompletableFuture;
  28 import java.util.concurrent.CompletionStage;
  29 import java.util.concurrent.Executor;
  30 import java.util.concurrent.ExecutorService;
  31 import java.util.concurrent.Executors;
  32 import java.util.concurrent.Flow;
  33 import java.util.concurrent.Flow.Subscription;
  34 import java.util.concurrent.SubmissionPublisher;
  35 import jdk.incubator.http.HttpResponse.BodyHandler;
  36 import jdk.incubator.http.HttpResponse.BodySubscriber;
  37 import jdk.test.lib.RandomFactory;
  38 import org.testng.annotations.DataProvider;
  39 import org.testng.annotations.Test;
  40 import static java.lang.Long.MAX_VALUE;
  41 import static java.lang.Long.min;
  42 import static java.lang.System.out;
  43 import static java.util.concurrent.CompletableFuture.delayedExecutor;
  44 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  45 import static org.testng.Assert.*;
  46 
  47 /*
  48  * @test
  49  * @bug 8184285
  50  * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
  51  * @key randomness
  52  * @library /test/lib
  53  * @build jdk.test.lib.RandomFactory
  54  * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
  55  */
  56 
  57 public class BufferingSubscriberTest {
  58 
  59     // If we compute that a test will take less that 10s
  60     // we judge it acceptable
  61     static final long LOWER_THRESHOLD = 10_000; // 10 sec.
  62     // If we compute that a test will take more than 20 sec
  63     // we judge it problematic: we will try to adjust the
  64     // buffer sizes, and if we can't we will print a warning
  65     static final long UPPER_THRESHOLD = 20_000; // 20 sec.
  66 
  67     static final Random random = RandomFactory.getRandom();
  68     static final long start = System.nanoTime();
  69     static final String START = "start";
  70     static final String END   = "end  ";
  71     static long elapsed() { return (System.nanoTime() - start)/1000_000;}
  72     static void printStamp(String what, String fmt, Object... args) {
  73         long elapsed = elapsed();
  74         long sec = elapsed/1000;
  75         long ms  = elapsed % 1000;
  76         String time = sec > 0 ? sec + "sec " : "";
  77         time = time + ms + "ms";
  78         out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args));
  79     }
  80     @DataProvider(name = "negatives")
  81     public Object[][] negatives() {
  82         return new Object[][] {  { 0 }, { -1 }, { -1000 } };
  83     }
  84 
  85     @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
  86     public void subscriberThrowsIAE(int bufferSize) {
  87         printStamp(START, "subscriberThrowsIAE(%d)", bufferSize);
  88         try {
  89             BodySubscriber<?> bp = BodySubscriber.asByteArray();
  90             BodySubscriber.buffering(bp, bufferSize);
  91         } finally {
  92             printStamp(END, "subscriberThrowsIAE(%d)", bufferSize);
  93         }
  94     }
  95 
  96     @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
  97     public void handlerThrowsIAE(int bufferSize) {
  98         printStamp(START, "handlerThrowsIAE(%d)", bufferSize);
  99         try {
 100             BodyHandler<?> bp = BodyHandler.asByteArray();
 101             BodyHandler.buffering(bp, bufferSize);
 102         } finally {
 103             printStamp(END, "handlerThrowsIAE(%d)", bufferSize);
 104         }
 105     }
 106 
 107     // ---
 108 
 109     @DataProvider(name = "config")
 110     public Object[][] config() {
 111         return new Object[][] {
 112             // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
 113             { 1,              0,          1,         1,         2,            1   },
 114             { 1,              5,          1,         100,       2,            1   },
 115             { 1,              0,          1,         10,        1000,         1   },
 116             { 1,              10,         1,         10,        1000,         1   },
 117             { 1,              0,          1,         1000,      1000,         10  },
 118             { 1,              0,          10,        1000,      1000,         50  },
 119             { 1,              0,          1000,      10 ,       1000,         50  },
 120             { 1,              100,        1,         1000 * 4,  1000,         50  },
 121             { 100,            0,          1000,      1,         2,            1   },
 122             { 3,              0,          4,         5006,      1000,         50  },
 123             { 20,             0,          100,       4888,      1000,         100 },
 124             { 16,             10,         1000,      50 ,       1000,         100 },
 125         };
 126     }
 127 
 128     @Test(dataProvider = "config")
 129     public void test(int iterations,
 130                      int delayMillis,
 131                      int numBuffers,
 132                      int bufferSize,
 133                      int maxBufferSize,
 134                      int minbufferSize) {
 135         for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
 136             test(iterations,
 137                  delayMillis,
 138                  numBuffers,
 139                  bufferSize,
 140                  maxBufferSize,
 141                  minbufferSize,
 142                  perRequestAmount);
 143     }
 144 
 145     public void test(int iterations,
 146                      int delayMillis,
 147                      int numBuffers,
 148                      int bufferSize,
 149                      int maxBufferSize,
 150                      int minBufferSize,
 151                      long requestAmount) {
 152         ExecutorService executor = Executors.newFixedThreadPool(1);
 153         try {
 154             out.printf("Iterations %d\n", iterations);
 155             for (int i=0; i<iterations; i++ ) {
 156                 printStamp(START, "Iteration %d", i);
 157                 try {
 158                     SubmissionPublisher<List<ByteBuffer>> publisher =
 159                             new SubmissionPublisher<>(executor, 1);
 160                     CompletableFuture<?> cf = sink(publisher,
 161                             delayMillis,
 162                             numBuffers * bufferSize,
 163                             requestAmount,
 164                             maxBufferSize,
 165                             minBufferSize);
 166                     source(publisher, numBuffers, bufferSize);
 167                     publisher.close();
 168                     cf.join();
 169                 } finally {
 170                     printStamp(END, "Iteration %d\n", i);
 171                 }
 172             }
 173             out.println("OK");
 174         } finally {
 175             executor.shutdown();
 176         }
 177     }
 178 
 179     static int accumulatedDataSize(List<ByteBuffer> bufs) {
 180         return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
 181     }
 182 
 183     /** Returns a new BB with its contents set to monotonically increasing
 184      * values, staring at the given start index and wrapping every 100. */
 185     static ByteBuffer allocateBuffer(int size, int startIdx) {
 186         ByteBuffer b = ByteBuffer.allocate(size);
 187         for (int i=0; i<size; i++)
 188             b.put((byte)((startIdx + i) % 100));
 189         b.position(0);
 190         return b;
 191     }
 192 
 193     static class TestSubscriber implements BodySubscriber<Integer> {
 194         final int delayMillis;
 195         final int bufferSize;
 196         final int expectedTotalSize;
 197         final long requestAmount;
 198         final CompletableFuture<Integer> completion;
 199         final Executor delayedExecutor;
 200         volatile Flow.Subscription subscription;
 201 
 202         TestSubscriber(int bufferSize,
 203                        int delayMillis,
 204                        int expectedTotalSize,
 205                        long requestAmount) {
 206             this.bufferSize = bufferSize;
 207             this.completion = new CompletableFuture<>();
 208             this.delayMillis = delayMillis;
 209             this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
 210             this.expectedTotalSize = expectedTotalSize;
 211             this.requestAmount = requestAmount;
 212         }
 213 
 214         /**
 215          * Example of a factory method which would decorate a buffering
 216          * subscriber to create a new subscriber dependent on buffering capability.
 217          *
 218          * The integer type parameter simulates the body just by counting the
 219          * number of bytes in the body.
 220          */
 221         static BodySubscriber<Integer> createSubscriber(int bufferSize,
 222                                                         int delay,
 223                                                         int expectedTotalSize,
 224                                                         long requestAmount) {
 225             TestSubscriber s = new TestSubscriber(bufferSize,
 226                                                 delay,
 227                                                 expectedTotalSize,
 228                                                 requestAmount);
 229             return BodySubscriber.buffering(s, bufferSize);
 230         }
 231 
 232         private void requestMore() { subscription.request(requestAmount); }
 233 
 234         @Override
 235         public void onSubscribe(Subscription subscription) {
 236             assertNull(this.subscription);
 237             this.subscription = subscription;
 238             if (delayMillis > 0)
 239                 delayedExecutor.execute(this::requestMore);
 240             else
 241                 requestMore();
 242         }
 243 
 244         volatile int wrongSizes;
 245         volatile int totalBytesReceived;
 246         volatile int onNextInvocations;
 247         volatile int lastSeenSize = -1;
 248         volatile boolean noMoreOnNext; // false
 249         volatile int index; // 0
 250         volatile long count;
 251 
 252         @Override
 253         public void onNext(List<ByteBuffer> items) {
 254             long sz = accumulatedDataSize(items);
 255             boolean printStamp = delayMillis > 0
 256                             && requestAmount < Long.MAX_VALUE
 257                             && count % 20 == 0;
 258             if (printStamp) {
 259                 printStamp("stamp",  "count=%d sz=%d accumulated=%d",
 260                         count, sz, (totalBytesReceived + sz));
 261             }
 262             count++;
 263             onNextInvocations++;
 264             assertNotEquals(sz, 0L, "Unexpected empty buffers");
 265             items.stream().forEach(b -> assertEquals(b.position(), 0));
 266             assertFalse(noMoreOnNext);
 267 
 268             if (sz != bufferSize) {
 269                 String msg = sz + ", should be less than bufferSize, " + bufferSize;
 270                 assertTrue(sz < bufferSize, msg);
 271                 assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
 272                 noMoreOnNext = true;
 273                 wrongSizes++;
 274             } else {
 275                 assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
 276             }
 277 
 278             // Ensure expected contents
 279             for (ByteBuffer b : items) {
 280                 while (b.hasRemaining()) {
 281                     assertEquals(b.get(), (byte) (index % 100));
 282                     index++;
 283                 }
 284             }
 285 
 286             totalBytesReceived += sz;
 287             assertEquals(totalBytesReceived, index );
 288             if (delayMillis > 0)
 289                 delayedExecutor.execute(this::requestMore);
 290             else
 291                 requestMore();
 292         }
 293 
 294         @Override
 295         public void onError(Throwable throwable) {
 296             completion.completeExceptionally(throwable);
 297         }
 298 
 299         @Override
 300         public void onComplete() {
 301             if (wrongSizes > 1) { // allow just the final item to be smaller
 302                 String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
 303                 completion.completeExceptionally(new Throwable(msg));
 304             }
 305             if (totalBytesReceived != expectedTotalSize) {
 306                 String msg = "Wrong number of bytes. [" + this + "]";
 307                 completion.completeExceptionally(new Throwable(msg));
 308             } else {
 309                 completion.complete(totalBytesReceived);
 310             }
 311         }
 312 
 313         @Override
 314         public CompletionStage<Integer> getBody() { return completion; }
 315 
 316         @Override
 317         public String toString() {
 318             StringBuilder sb = new StringBuilder();
 319             sb.append(super.toString());
 320             sb.append(", bufferSize=").append(bufferSize);
 321             sb.append(", onNextInvocations=").append(onNextInvocations);
 322             sb.append(", totalBytesReceived=").append(totalBytesReceived);
 323             sb.append(", expectedTotalSize=").append(expectedTotalSize);
 324             sb.append(", requestAmount=").append(requestAmount);
 325             sb.append(", lastSeenSize=").append(lastSeenSize);
 326             sb.append(", wrongSizes=").append(wrongSizes);
 327             sb.append(", index=").append(index);
 328             return sb.toString();
 329         }
 330     }
 331 
 332     /**
 333      * Publishes data, through the given publisher, using the main thread.
 334      *
 335      * Note: The executor supplied when creating the SubmissionPublisher provides
 336      * the threads for executing the Subscribers.
 337      *
 338      * @param publisher the publisher
 339      * @param numBuffers the number of buffers to send ( before splitting in two )
 340      * @param bufferSize the total size of the data to send ( before splitting in two )
 341      */
 342     static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
 343                        int numBuffers,
 344                        int bufferSize) {
 345         printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize);
 346         int index = 0;
 347         for (int i=0; i<numBuffers; i++) {
 348             int chunkSize = random.nextInt(bufferSize);
 349             ByteBuffer buf1 = allocateBuffer(chunkSize, index);
 350             index += chunkSize;
 351             ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
 352             index += bufferSize - chunkSize;
 353             publisher.submit(List.of(buf1, buf2));
 354         }
 355         printStamp("source", "complete");
 356     }
 357 
 358     /**
 359      * Creates and subscribes Subscribers that receive data from the given
 360      * publisher.
 361      *
 362      * @param publisher the publisher
 363      * @param delayMillis time, in milliseconds, to delay the Subscription
 364      *                    requesting more bytes ( for simulating slow consumption )
 365      * @param expectedTotalSize the total number of bytes expected to be received
 366      *                          by the subscribers
 367      * @return a CompletableFuture which completes when the subscription is complete
 368      */
 369     static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
 370                                      int delayMillis,
 371                                      int expectedTotalSize,
 372                                      long requestAmount,
 373                                      int maxBufferSize,
 374                                      int minBufferSize) {
 375         int bufferSize = chooseBufferSize(maxBufferSize,
 376                                           minBufferSize,
 377                                           delayMillis,
 378                                           expectedTotalSize,
 379                                           requestAmount);
 380         assert bufferSize > 0;
 381         assert bufferSize >= minBufferSize;
 382         assert bufferSize <= maxBufferSize;
 383         BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
 384                                                                     delayMillis,
 385                                                                     expectedTotalSize,
 386                                                                     requestAmount);
 387         publisher.subscribe(sub);
 388         printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize);
 389         out.printf("Subscription delay is %d msec\n", delayMillis);
 390         long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount;
 391         out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000);
 392         out.printf("Request amount is %d items\n", requestAmount);
 393         return sub.getBody().toCompletableFuture();
 394     }
 395 
 396     static int chooseBufferSize(int maxBufferSize,
 397                                 int minBufferSize,
 398                                 int delaysMillis,
 399                                 int expectedTotalSize,
 400                                 long requestAmount) {
 401         assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
 402         int bufferSize = random.nextInt(maxBufferSize - minBufferSize)
 403                 + minBufferSize;
 404         if (requestAmount == Long.MAX_VALUE) return bufferSize;
 405         long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
 406                 / requestAmount;
 407         long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize)
 408                 / requestAmount;
 409         // if the maximum delay is < 10s just take a random number between min and max.
 410         if (maxDelay <= LOWER_THRESHOLD) {
 411             return bufferSize;
 412         }
 413         // if minimum delay is greater than 20s then print a warning and use max buffer.
 414         if (minDelay >= UPPER_THRESHOLD) {
 415             System.out.println("Warning: minimum delay is "
 416                     + minDelay/1000 + "sec " + minDelay%1000 + "ms");
 417             System.err.println("Warning: minimum delay is "
 418                     + minDelay/1000 + "sec " + minDelay%1000 + "ms");
 419             return maxBufferSize;
 420         }
 421         // maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD
 422         // try to pick up a buffer size that keeps the delay below the
 423         // UPPER_THRESHOLD
 424         while (minBufferSize < maxBufferSize) {
 425             bufferSize = random.nextInt(maxBufferSize - minBufferSize)
 426                     + minBufferSize;
 427             long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize)
 428                     / requestAmount;
 429             if (delay < UPPER_THRESHOLD) return bufferSize;
 430             minBufferSize++;
 431         }
 432         return minBufferSize;
 433     }
 434 
 435     // ---
 436 
 437     /* Main entry point for standalone testing of the main functional test. */
 438     public static void main(String... args) {
 439         BufferingSubscriberTest t = new BufferingSubscriberTest();
 440         for (Object[] objs : t.config())
 441             t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
 442     }
 443 }