--- /dev/null 2017-08-12 14:14:50.000000000 +0100 +++ new/test/java/net/httpclient/BufferingProcessorTest.java 2017-08-12 14:14:49.000000000 +0100 @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.SubmissionPublisher; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodyProcessor; +import jdk.test.lib.RandomFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import static java.lang.Long.MAX_VALUE; +import static java.lang.System.out; +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.testng.Assert.*; + +/* + * @test + * @library /test/lib + * @build jdk.test.lib.RandomFactory + * @bug 8184285 + * @summary Direct test for HttpResponse.BodyProcessor.buffering() API + * @run testng BufferingProcessorTest + * @key randomness + */ + +public class BufferingProcessorTest { + + static final Random random = RandomFactory.getRandom(); + + @DataProvider(name = "negatives") + public Object[][] negatives() { + return new Object[][] { { 0 }, { -1 }, { -1000 } }; + } + + @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) + public void processorThrowsIAE(int bufferSize) { + BodyProcessor bp = BodyProcessor.asByteArray(); + BodyProcessor.buffering(bp, bufferSize); + } + + @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) + public void handlerThrowsIAE(int bufferSize) { + BodyHandler bp = BodyHandler.asByteArray(); + BodyHandler.buffering(bp, bufferSize); + } + + // --- + + @DataProvider(name = "config") + public Object[][] config() { + return new Object[][] { + // numSubscribers delayMillis numBuffers bufferSize maxBufferSize minBufferSize + { 1, 0, 1, 1, 2, 1 }, + { 1, 0, 1, 10, 1000, 1 }, + { 1, 10, 1, 10, 1000, 1 }, + { 1, 0, 1, 1000, 1000, 1 }, + { 1, 0, 10, 1000, 1000, 1 }, + { 1, 0, 1000, 10 , 1000, 50 }, + { 1, 100, 1, 1000 * 4, 1000, 1 }, + { 100, 0, 1000, 1, 2, 1 }, + { 3, 0, 4, 5006, 1000, 1 }, + { 20, 0, 100, 4888, 1000, 100 }, + { 16, 10, 1000, 50 , 1000, 100 }, + }; + } + + @Test(dataProvider = "config") + public void test(int numSubscribers, + int delayMillis, + int numBuffers, + int bufferSize, + int maxBufferSize, + int minbufferSize) { + for (long perRequestAmount : new long[] { 1L, MAX_VALUE }) + test(numSubscribers, + delayMillis, + numBuffers, + bufferSize, + maxBufferSize, + minbufferSize, + perRequestAmount); + } + + public void test(int numSubscribers, + int delayMillis, + int numBuffers, + int bufferSize, + int maxBufferSize, + int minBuferSize, + long requestAmount) { + ExecutorService executor = Executors.newFixedThreadPool(5); + try { + SubmissionPublisher> publisher = + new SubmissionPublisher<>(executor, 1); + CompletableFuture cf = sink(publisher, + numSubscribers, + delayMillis, + numBuffers * bufferSize, + requestAmount, + maxBufferSize, + minBuferSize); + source(publisher, numBuffers, bufferSize); + cf.join(); + out.println("OK"); + } finally { + executor.shutdown(); + } + } + + static int accumulatedDataSize(List bufs) { + return bufs.stream().mapToInt(ByteBuffer::remaining).sum(); + } + + /** Returns a new BB with its contents set to monotonically increasing + * values, staring at the given start index and wrapping every 100. */ + static ByteBuffer allocateBuffer(int size, int startIdx) { + ByteBuffer b = ByteBuffer.allocate(size); + for (int i=0; i { + final int delayMillis; + final int bufferSize; + final int expectedTotalSize; + final long requestAmount; + final CompletableFuture completion; + final Executor delayedExecutor; + volatile Flow.Subscription subscription; + + TestProcessor(int bufferSize, + int delayMillis, + int expectedTotalSize, + long requestAmount) { + this.bufferSize = bufferSize; + this.completion = new CompletableFuture<>(); + this.delayMillis = delayMillis; + this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS); + this.expectedTotalSize = expectedTotalSize; + this.requestAmount = requestAmount; + } + + /** + * Example of a factory method which would decorate a buffering processor + * to create a new processor dependent on buffering capability. + * + * The integer type parameter simulates the body just by counting the + * number of bytes in the body. + */ + static BodyProcessor createSubscriber(int bufferSize, + int delay, + int expectedTotalSize, + long requestAmount) { + TestProcessor s = new TestProcessor(bufferSize, + delay, + expectedTotalSize, + requestAmount); + return BodyProcessor.buffering(s, bufferSize); + } + + private void requestMore() { subscription.request(requestAmount); } + + @Override + public void onSubscribe(Subscription subscription) { + assertNull(this.subscription); + this.subscription = subscription; + if (delayMillis > 0) + delayedExecutor.execute(this::requestMore); + else + requestMore(); + } + + volatile int wrongSizes; + volatile int totalBytesReceived; + volatile int onNextInvocations; + volatile int lastSeenSize = -1; + volatile boolean noMoreOnNext; // false + volatile int index; // 0 + + @Override + public void onNext(List items) { + long sz = accumulatedDataSize(items); + onNextInvocations++; + assertNotEquals(sz, 0L, "Unexpected empty buffers"); + items.stream().forEach(b -> assertEquals(b.position(), 0)); + assertFalse(noMoreOnNext); + + if (sz != bufferSize) { + String msg = sz + ", should be less than bufferSize, " + bufferSize; + assertTrue(sz < bufferSize, msg); + assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); + noMoreOnNext = true; + wrongSizes++; + } else { + assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); + } + + // Ensure expected contents + for (ByteBuffer b : items) { + while (b.hasRemaining()) { + assertEquals(b.get(), (byte) (index % 100)); + index++; + } + } + + totalBytesReceived += sz; + assertEquals(totalBytesReceived, index ); + if (delayMillis > 0) + delayedExecutor.execute(this::requestMore); + else + requestMore(); + } + + @Override + public void onError(Throwable throwable) { + completion.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + if (wrongSizes > 1) { // allow just the final item to be smaller + String msg = "Wrong sizes. Expected no more than 1. [" + this + "]"; + completion.completeExceptionally(new Throwable(msg)); + } + if (totalBytesReceived != expectedTotalSize) { + String msg = "Wrong number of bytes. [" + this + "]"; + completion.completeExceptionally(new Throwable(msg)); + } else { + completion.complete(totalBytesReceived); + } + } + + @Override + public CompletionStage getBody() { return completion; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(super.toString()); + sb.append(", bufferSize=").append(bufferSize); + sb.append(", onNextInvocations=").append(onNextInvocations); + sb.append(", totalBytesReceived=").append(totalBytesReceived); + sb.append(", expectedTotalSize=").append(expectedTotalSize); + sb.append(", requestAmount=").append(requestAmount); + sb.append(", lastSeenSize=").append(lastSeenSize); + sb.append(", wrongSizes=").append(wrongSizes); + sb.append(", index=").append(index); + return sb.toString(); + } + } + + /** + * Publishes data, through the given publisher, using the main thread. + * + * Note: The executor supplied when creating the SubmissionPublisher provides + * the threads for executing the Subscribers. + * + * @param publisher the publisher + * @param numBuffers the number of buffers to send ( before splitting in two ) + * @param bufferSize the total size of the data to send ( before splitting in two ) + */ + static void source(SubmissionPublisher> publisher, + int numBuffers, + int bufferSize) { + out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize); + int index = 0; + for (int i=0; i sink(SubmissionPublisher> publisher, + int numSubscribers, + int delayMillis, + int expectedTotalSize, + long requestAmount, + int maxBufferSize, + int minBufferSize) { + int[] bufferSizes = new int[numSubscribers]; + CompletableFuture[] cfs = new CompletableFuture[numSubscribers]; + for (int i=0; i sub = TestProcessor.createSubscriber(bufferSize, + delayMillis, + expectedTotalSize, + requestAmount); + cfs[i] = sub.getBody().toCompletableFuture(); + publisher.subscribe(sub); + } + out.printf("Number of subscribers: %d\n", numSubscribers); + out.printf("Each subscriber reads data with buffer sizes:"); + out.printf("%s bytes\n", Arrays.toString(bufferSizes)); + out.printf("Subscription delay is %d msec\n", delayMillis); + out.printf("Request amount is %d items\n", requestAmount); + return CompletableFuture.allOf(cfs); + } + + // --- + + // TODO: Add a test for cancel + + // --- + + /* Main entry point for standalone testing of the main functional test. */ + public static void main(String... args) { + BufferingProcessorTest t = new BufferingProcessorTest(); + for (Object[] objs : t.config()) + t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]); + } +}