/* * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; import jdk.incubator.http.HttpResponse.BodyHandler; import jdk.incubator.http.HttpResponse.BodyProcessor; import jdk.test.lib.RandomFactory; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static java.lang.Long.MAX_VALUE; import static java.lang.System.out; import static java.util.concurrent.CompletableFuture.delayedExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.testng.Assert.*; /* * @test * @library /test/lib * @build jdk.test.lib.RandomFactory * @bug 8184285 * @summary Direct test for HttpResponse.BodyProcessor.buffering() API * @run testng BufferingProcessorTest * @key randomness */ public class BufferingProcessorTest { static final Random random = RandomFactory.getRandom(); @DataProvider(name = "negatives") public Object[][] negatives() { return new Object[][] { { 0 }, { -1 }, { -1000 } }; } @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) public void processorThrowsIAE(int bufferSize) { BodyProcessor bp = BodyProcessor.asByteArray(); BodyProcessor.buffering(bp, bufferSize); } @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) public void handlerThrowsIAE(int bufferSize) { BodyHandler bp = BodyHandler.asByteArray(); BodyHandler.buffering(bp, bufferSize); } // --- @DataProvider(name = "config") public Object[][] config() { return new Object[][] { // numSubscribers delayMillis numBuffers bufferSize maxBufferSize minBufferSize { 1, 0, 1, 1, 2, 1 }, { 1, 0, 1, 10, 1000, 1 }, { 1, 10, 1, 10, 1000, 1 }, { 1, 0, 1, 1000, 1000, 1 }, { 1, 0, 10, 1000, 1000, 1 }, { 1, 0, 1000, 10 , 1000, 50 }, { 1, 100, 1, 1000 * 4, 1000, 1 }, { 100, 0, 1000, 1, 2, 1 }, { 3, 0, 4, 5006, 1000, 1 }, { 20, 0, 100, 4888, 1000, 100 }, { 16, 10, 1000, 50 , 1000, 100 }, }; } @Test(dataProvider = "config") public void test(int numSubscribers, int delayMillis, int numBuffers, int bufferSize, int maxBufferSize, int minbufferSize) { for (long perRequestAmount : new long[] { 1L, MAX_VALUE }) test(numSubscribers, delayMillis, numBuffers, bufferSize, maxBufferSize, minbufferSize, perRequestAmount); } public void test(int numSubscribers, int delayMillis, int numBuffers, int bufferSize, int maxBufferSize, int minBuferSize, long requestAmount) { ExecutorService executor = Executors.newFixedThreadPool(5); try { SubmissionPublisher> publisher = new SubmissionPublisher<>(executor, 1); CompletableFuture cf = sink(publisher, numSubscribers, delayMillis, numBuffers * bufferSize, requestAmount, maxBufferSize, minBuferSize); source(publisher, numBuffers, bufferSize); cf.join(); out.println("OK"); } finally { executor.shutdown(); } } static int accumulatedDataSize(List bufs) { return bufs.stream().mapToInt(ByteBuffer::remaining).sum(); } /** Returns a new BB with its contents set to monotonically increasing * values, staring at the given start index and wrapping every 100. */ static ByteBuffer allocateBuffer(int size, int startIdx) { ByteBuffer b = ByteBuffer.allocate(size); for (int i=0; i { final int delayMillis; final int bufferSize; final int expectedTotalSize; final long requestAmount; final CompletableFuture completion; final Executor delayedExecutor; volatile Flow.Subscription subscription; TestProcessor(int bufferSize, int delayMillis, int expectedTotalSize, long requestAmount) { this.bufferSize = bufferSize; this.completion = new CompletableFuture<>(); this.delayMillis = delayMillis; this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS); this.expectedTotalSize = expectedTotalSize; this.requestAmount = requestAmount; } /** * Example of a factory method which would decorate a buffering processor * to create a new processor dependent on buffering capability. * * The integer type parameter simulates the body just by counting the * number of bytes in the body. */ static BodyProcessor createSubscriber(int bufferSize, int delay, int expectedTotalSize, long requestAmount) { TestProcessor s = new TestProcessor(bufferSize, delay, expectedTotalSize, requestAmount); return BodyProcessor.buffering(s, bufferSize); } private void requestMore() { subscription.request(requestAmount); } @Override public void onSubscribe(Subscription subscription) { assertNull(this.subscription); this.subscription = subscription; if (delayMillis > 0) delayedExecutor.execute(this::requestMore); else requestMore(); } volatile int wrongSizes; volatile int totalBytesReceived; volatile int onNextInvocations; volatile int lastSeenSize = -1; volatile boolean noMoreOnNext; // false volatile int index; // 0 @Override public void onNext(List items) { long sz = accumulatedDataSize(items); onNextInvocations++; assertNotEquals(sz, 0L, "Unexpected empty buffers"); items.stream().forEach(b -> assertEquals(b.position(), 0)); assertFalse(noMoreOnNext); if (sz != bufferSize) { String msg = sz + ", should be less than bufferSize, " + bufferSize; assertTrue(sz < bufferSize, msg); assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); noMoreOnNext = true; wrongSizes++; } else { assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); } // Ensure expected contents for (ByteBuffer b : items) { while (b.hasRemaining()) { assertEquals(b.get(), (byte) (index % 100)); index++; } } totalBytesReceived += sz; assertEquals(totalBytesReceived, index ); if (delayMillis > 0) delayedExecutor.execute(this::requestMore); else requestMore(); } @Override public void onError(Throwable throwable) { completion.completeExceptionally(throwable); } @Override public void onComplete() { if (wrongSizes > 1) { // allow just the final item to be smaller String msg = "Wrong sizes. Expected no more than 1. [" + this + "]"; completion.completeExceptionally(new Throwable(msg)); } if (totalBytesReceived != expectedTotalSize) { String msg = "Wrong number of bytes. [" + this + "]"; completion.completeExceptionally(new Throwable(msg)); } else { completion.complete(totalBytesReceived); } } @Override public CompletionStage getBody() { return completion; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(super.toString()); sb.append(", bufferSize=").append(bufferSize); sb.append(", onNextInvocations=").append(onNextInvocations); sb.append(", totalBytesReceived=").append(totalBytesReceived); sb.append(", expectedTotalSize=").append(expectedTotalSize); sb.append(", requestAmount=").append(requestAmount); sb.append(", lastSeenSize=").append(lastSeenSize); sb.append(", wrongSizes=").append(wrongSizes); sb.append(", index=").append(index); return sb.toString(); } } /** * Publishes data, through the given publisher, using the main thread. * * Note: The executor supplied when creating the SubmissionPublisher provides * the threads for executing the Subscribers. * * @param publisher the publisher * @param numBuffers the number of buffers to send ( before splitting in two ) * @param bufferSize the total size of the data to send ( before splitting in two ) */ static void source(SubmissionPublisher> publisher, int numBuffers, int bufferSize) { out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize); int index = 0; for (int i=0; i sink(SubmissionPublisher> publisher, int numSubscribers, int delayMillis, int expectedTotalSize, long requestAmount, int maxBufferSize, int minBufferSize) { int[] bufferSizes = new int[numSubscribers]; CompletableFuture[] cfs = new CompletableFuture[numSubscribers]; for (int i=0; i sub = TestProcessor.createSubscriber(bufferSize, delayMillis, expectedTotalSize, requestAmount); cfs[i] = sub.getBody().toCompletableFuture(); publisher.subscribe(sub); } out.printf("Number of subscribers: %d\n", numSubscribers); out.printf("Each subscriber reads data with buffer sizes:"); out.printf("%s bytes\n", Arrays.toString(bufferSizes)); out.printf("Subscription delay is %d msec\n", delayMillis); out.printf("Request amount is %d items\n", requestAmount); return CompletableFuture.allOf(cfs); } // --- // TODO: Add a test for cancel // --- /* Main entry point for standalone testing of the main functional test. */ public static void main(String... args) { BufferingProcessorTest t = new BufferingProcessorTest(); for (Object[] objs : t.config()) t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]); } }