--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java 2017-08-12 14:14:32.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java 2017-08-12 14:14:31.000000000 +0100 @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import jdk.incubator.http.internal.common.Log; @@ -114,21 +115,19 @@ return clen; } - public CompletableFuture readBody( - HttpResponse.BodyProcessor p, - boolean return2Cache, - Executor executor) { - final BlockingPushPublisher publisher = new BlockingPushPublisher<>(); + public CompletableFuture readBody(HttpResponse.BodyProcessor p, + boolean return2Cache, + Executor executor) { + BlockingPushPublisher> publisher = new BlockingPushPublisher<>(); return readBody(p, return2Cache, publisher, executor); } - private CompletableFuture readBody( - HttpResponse.BodyProcessor p, - boolean return2Cache, - AbstractPushPublisher publisher, - Executor executor) { + private CompletableFuture readBody(HttpResponse.BodyProcessor p, + boolean return2Cache, + AbstractPushPublisher> publisher, + Executor executor) { this.return2Cache = return2Cache; - final jdk.incubator.http.HttpResponse.BodyProcessor pusher = p; + final HttpResponse.BodyProcessor pusher = p; final CompletableFuture cf = p.getBody().toCompletableFuture(); int clen0; --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java 2017-08-12 14:14:34.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java 2017-08-12 14:14:34.000000000 +0100 @@ -26,20 +26,17 @@ package jdk.incubator.http; import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URI; -import jdk.incubator.http.ResponseProcessors.MultiFile; import jdk.incubator.http.ResponseProcessors.MultiProcessorImpl; import static jdk.incubator.http.internal.common.Utils.unchecked; import static jdk.incubator.http.internal.common.Utils.charsetFrom; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.Map; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -397,16 +394,40 @@ public static BodyHandler asString() { return (status, headers) -> BodyProcessor.asString(charsetFrom(headers)); } + + /** + * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain + * BodyProcessor#buffering(BodyProcessor,int) buffering BodyProcessor} + * that buffers data before delivering it to the downstream processor. + * These {@code BodyProcessor} instances are created by calling + * {@linkplain BodyProcessor#buffering(BodyProcessor,int) + * BodyProcessor.buffering} with a processor obtained from the given + * downstream handler and the {@code bufferSize} parameter. + * + * @param downstreamHandler the downstream handler + * @param bufferSize the buffer size parameter passed to {@linkplain + * BodyProcessor#buffering(BodyProcessor,int) BodyProcessor.buffering} + * @return a body handler + * @throws IllegalArgumentException if {@code bufferSize <= 0} + */ + public static BodyHandler buffering(BodyHandler downstreamHandler, + int bufferSize) { + if (bufferSize <= 0) + throw new IllegalArgumentException("must be greater than 0"); + return (status, headers) -> BodyProcessor + .buffering(downstreamHandler.apply(status, headers), + bufferSize); + } } /** * A processor for response bodies. * {@Incubating} *

- * The object acts as a {@link Flow.Subscriber}<{@link ByteBuffer}> to - * the HTTP client implementation which publishes ByteBuffers containing the - * response body. The processor converts the incoming buffers of data to - * some user-defined object type {@code T}. + * The object acts as a {@link Flow.Subscriber}<{@link List}<{@link + * ByteBuffer}>> to the HTTP client implementation which publishes + * ByteBuffers containing the response body. The processor converts the + * incoming buffers of data to some user-defined object type {@code T}. *

* The {@link #getBody()} method returns a {@link CompletionStage}{@code } * that provides the response body object. The {@code CompletionStage} must @@ -419,7 +440,7 @@ * @param the response body type */ public interface BodyProcessor - extends Flow.Subscriber { + extends Flow.Subscriber> { /** * Returns a {@code CompletionStage} which when completed will return the @@ -530,6 +551,30 @@ public static BodyProcessor discard(U value) { return new ResponseProcessors.NullProcessor<>(Optional.ofNullable(value)); } + + /** + * Returns a {@code BodyProcessor} which buffers data before delivering + * it to the given downstream processor. The processor guarantees to + * deliver {@code buffersize} bytes of data to each invocation of the + * downstream's {@linkplain #onNext(Object) onNext} method, except for + * the final invocation, just before {@linkplain #onComplete() onComplete} + * is invoked. The final invocation of {@code onNext} may contain fewer + * than {@code buffersize} bytes. + *

+ * The returned processor delegates its {@link #getBody()} method to the + * downstream processor. + * + * @param downstream the downstream processor + * @param bufferSize the buffer size + * @return a buffering body processor + * @throws IllegalArgumentException if {@code bufferSize <= 0} + */ + public static BodyProcessor buffering(BodyProcessor downstream, + int bufferSize) { + if (bufferSize <= 0) + throw new IllegalArgumentException("must be greater than 0"); + return new BufferingProcessor(downstream, bufferSize); + } } /** --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java 2017-08-12 14:14:37.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java 2017-08-12 14:14:36.000000000 +0100 @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Optional; import java.util.function.Consumer; import jdk.incubator.http.internal.common.Utils; @@ -45,7 +46,7 @@ ByteBuffer buffer; //ByteBuffer lastBufferUsed; final ResponseHeaders headers; - private final Consumer> dataConsumer; + private final Consumer>> dataConsumer; private final Consumer errorConsumer; private final HttpClientImpl client; // this needs to run before we complete the body @@ -56,7 +57,7 @@ int contentLength, ResponseHeaders h, HttpResponse.BodyProcessor userProcessor, - Consumer> dataConsumer, + Consumer>> dataConsumer, Consumer errorConsumer, Runnable onFinished) { @@ -227,7 +228,7 @@ ByteBuffer b1 = readChunkedBuffer(); if (b1 != null) { if (b1.hasRemaining()) { - dataConsumer.accept(Optional.of(b1)); + dataConsumer.accept(Optional.of(List.of(b1))); } } else { onFinished.run(); @@ -258,7 +259,7 @@ Utils.copy(b, buffer, amount); remaining -= amount; buffer.flip(); - dataConsumer.accept(Optional.of(buffer)); + dataConsumer.accept(Optional.of(List.of(buffer))); } while (remaining > 0) { ByteBuffer buffer = connection.read(); @@ -271,7 +272,7 @@ throw new IOException("too many bytes read"); } remaining -= bytesread; - dataConsumer.accept(Optional.of(buffer)); + dataConsumer.accept(Optional.of(List.of(buffer))); } onFinished.run(); dataConsumer.accept(Optional.empty()); --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java 2017-08-12 14:14:38.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java 2017-08-12 14:14:38.000000000 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -72,10 +72,12 @@ } @Override - public void onNext(ByteBuffer item) { - byte[] buf = new byte[item.remaining()]; - item.get(buf); - consumer.accept(Optional.of(buf)); + public void onNext(List items) { + for (ByteBuffer item : items) { + byte[] buf = new byte[item.remaining()]; + item.get(buf); + consumer.accept(Optional.of(buf)); + } subscription.request(1); } @@ -120,9 +122,9 @@ } @Override - public void onNext(ByteBuffer item) { + public void onNext(List items) { try { - out.write(item); + out.write(items.toArray(new ByteBuffer[0])); } catch (IOException ex) { Utils.close(out); subscription.cancel(); @@ -172,13 +174,12 @@ } @Override - public void onNext(ByteBuffer item) { + public void onNext(List items) { // incoming buffers are allocated by http client internally, // and won't be used anywhere except this place. // So it's free simply to store them for further processing. - if(item.hasRemaining()) { - received.add(item); - } + assert Utils.remaining(items) > 0; // TODO: is this really necessary? + received.addAll(items); } @Override @@ -304,9 +305,8 @@ } @Override - public void onNext(ByteBuffer item) { - // TODO: check whether this should consume the buffer, as in: - item.position(item.limit()); + public void onNext(List items) { + // NO-OP } @Override --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java 2017-08-12 14:14:40.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java 2017-08-12 14:14:40.000000000 +0100 @@ -29,6 +29,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -44,6 +45,7 @@ import jdk.incubator.http.internal.common.*; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.DecodingCallback; +import static java.util.stream.Collectors.toList; /** * Http/2 Stream handling. @@ -122,7 +124,7 @@ volatile int responseCode; volatile Response response; volatile CompletableFuture responseCF; - final AbstractPushPublisher publisher; + final AbstractPushPublisher> publisher; final CompletableFuture requestBodyCF = new MinimalFuture<>(); /** True if END_STREAM has been seen in a frame received on this stream. */ @@ -200,11 +202,12 @@ // including the Pad Length and Padding fields if present int len = df.payloadLength(); ByteBufferReference[] buffers = df.getData(); - for (ByteBufferReference b : buffers) { - ByteBuffer buf = b.get(); - if (buf.hasRemaining()) { - publisher.acceptData(Optional.of(buf)); - } + List dsts = Arrays.stream(buffers) + .map(ByteBufferReference::get) + .filter(ByteBuffer::hasRemaining) + .collect(toList()); + if (!dsts.isEmpty()) { + publisher.acceptData(Optional.of(dsts)); } connection.windowUpdater.update(len); if (df.getFlag(DataFrame.END_STREAM)) { --- old/test/java/net/httpclient/HttpInputStreamTest.java 2017-08-12 14:14:42.000000000 +0100 +++ new/test/java/net/httpclient/HttpInputStreamTest.java 2017-08-12 14:14:42.000000000 +0100 @@ -32,6 +32,8 @@ import jdk.incubator.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -40,6 +42,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.stream.Stream; +import static java.lang.System.err; /* * @test @@ -61,7 +64,7 @@ public static class HttpInputStreamHandler implements HttpResponse.BodyHandler { - public static final int MAX_BUFFERS_IN_QUEUE = 1; + public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer private final int maxBuffers; @@ -86,14 +89,16 @@ implements HttpResponse.BodyProcessor { // An immutable ByteBuffer sentinel to mark that the last byte was received. - private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); + private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); + private static final List LAST_LIST = List.of(LAST_BUFFER); // A queue of yet unprocessed ByteBuffers received from the flow API. - private final BlockingQueue buffers; + private final BlockingQueue> buffers; private volatile Flow.Subscription subscription; private volatile boolean closed; private volatile Throwable failed; - private volatile ByteBuffer current; + private volatile List currentList; + private volatile ByteBuffer currentBuffer; HttpResponseInputStream() { this(MAX_BUFFERS_IN_QUEUE); @@ -119,40 +124,50 @@ // a new buffer is made available through the Flow API, or the // end of the flow is reached. private ByteBuffer current() throws IOException { - while (current == null || !current.hasRemaining()) { - // Check whether the stream is claused or exhausted + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + // Check whether the stream is closed or exhausted if (closed || failed != null) { throw new IOException("closed", failed); } - if (current == LAST) break; + if (currentBuffer == LAST_BUFFER) break; try { - // Take a new buffer from the queue, blocking - // if none is available yet... - if (DEBUG) System.err.println("Taking Buffer"); - current = buffers.take(); - if (DEBUG) System.err.println("Buffer Taken"); - - // Check whether some exception was encountered - // upstream - if (closed || failed != null) { - throw new IOException("closed", failed); + if (currentList == null || currentList.isEmpty()) { + // Take a new list of buffers from the queue, blocking + // if none is available yet... + + if (DEBUG) err.println("Taking list of Buffers"); + List lb = buffers.take(); + if (DEBUG) err.println("List of Buffers Taken"); + + // Check whether an exception was encountered upstream + if (closed || failed != null) + throw new IOException("closed", failed); + + // Check whether we're done. + if (lb == LAST_LIST) { + currentList = LAST_LIST; + currentBuffer = LAST_BUFFER; + break; + } + + currentList = new ArrayList<>(lb); // TODO: lb is immutable + + // Request another upstream item ( list of buffers ) + Flow.Subscription s = subscription; + if (s != null) + s.request(1); } - - // Check whether we're done. - if (current == LAST) break; - - // Inform the producer that it can start sending - // us a new buffer - Flow.Subscription s = subscription; - if (s != null) s.request(1); - + assert currentList != null; + assert !currentList.isEmpty(); + if (DEBUG) err.println("Next Buffer"); + currentBuffer = currentList.remove(0); } catch (InterruptedException ex) { // continue } } - assert current == LAST || current.hasRemaining(); - return current; + assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); + return currentBuffer; } @Override @@ -160,7 +175,7 @@ // get the buffer to read from, possibly blocking if // none is available ByteBuffer buffer; - if ((buffer = current()) == LAST) return -1; + if ((buffer = current()) == LAST_BUFFER) return -1; // don't attempt to read more than what is available // in the current buffer. @@ -175,7 +190,7 @@ @Override public int read() throws IOException { ByteBuffer buffer; - if ((buffer = current()) == LAST) return -1; + if ((buffer = current()) == LAST_BUFFER) return -1; return buffer.get() & 0xFF; } @@ -186,11 +201,11 @@ } @Override - public synchronized void onNext(ByteBuffer t) { + public synchronized void onNext(List t) { try { - if (DEBUG) System.err.println("next buffer received"); + if (DEBUG) err.println("next item received"); buffers.put(t); - if (DEBUG) System.err.println("buffered offered"); + if (DEBUG) err.println("item offered"); } catch (Exception ex) { failed = ex; try { @@ -209,7 +224,7 @@ @Override public synchronized void onComplete() { subscription = null; - onNext(LAST); + onNext(LAST_LIST); } @Override @@ -275,7 +290,7 @@ CompletableFuture> handle = client.sendAsync(request, new HttpInputStreamHandler()); - if (DEBUG) System.err.println("Request sent"); + if (DEBUG) err.println("Request sent"); HttpResponse pending = handle.get(); @@ -301,8 +316,8 @@ char[] buff = new char[32]; int off=0, n=0; - if (DEBUG) System.err.println("Start receiving response body"); - if (DEBUG) System.err.println("Charset: " + charset.get()); + if (DEBUG) err.println("Start receiving response body"); + if (DEBUG) err.println("Charset: " + charset.get()); // Start consuming the InputStream as the data arrives. // Will block until there is something to read... --- old/test/java/net/httpclient/RequestBodyTest.java 2017-08-12 14:14:44.000000000 +0100 +++ new/test/java/net/httpclient/RequestBodyTest.java 2017-08-12 14:14:44.000000000 +0100 @@ -42,6 +42,7 @@ import jdk.incubator.http.HttpClient; import jdk.incubator.http.HttpRequest; import jdk.incubator.http.HttpResponse; +import jdk.incubator.http.HttpResponse.BodyHandler; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -53,6 +54,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.function.Supplier; import javax.net.ssl.SSLContext; import jdk.test.lib.util.FileUtils; @@ -128,8 +130,9 @@ for (String file : new String[] { smallFilename, midSizedFilename }) for (RequestBody requestBodyType : RequestBody.values()) for (ResponseBody responseBodyType : ResponseBody.values()) - values.add(new Object[] - {uri, requestBodyType, responseBodyType, file, async}); + for (boolean bufferResponseBody : new boolean[] { false, true }) + values.add(new Object[] + {uri, requestBodyType, responseBodyType, file, async, bufferResponseBody}); return values.stream().toArray(Object[][]::new); } @@ -139,7 +142,8 @@ RequestBody requestBodyType, ResponseBody responseBodyType, String file, - boolean async) + boolean async, + boolean bufferResponseBody) throws Exception { Path filePath = Paths.get(fileroot + file); @@ -147,7 +151,7 @@ HttpRequest request = createRequest(uri, requestBodyType, filePath); - checkResponse(client, request, requestBodyType, responseBodyType, filePath, async); + checkResponse(client, request, requestBodyType, responseBodyType, filePath, async, bufferResponseBody); } static final int DEFAULT_OFFSET = 10; @@ -198,7 +202,8 @@ RequestBody requestBodyType, ResponseBody responseBodyType, Path file, - boolean async) + boolean async, + boolean bufferResponseBody) throws InterruptedException, IOException { String filename = file.toFile().getAbsolutePath(); @@ -215,43 +220,57 @@ switch (responseBodyType) { case BYTE_ARRAY: - HttpResponse bar = getResponse(client, request, asByteArray(), async); + BodyHandler bh = asByteArray(); + if (bufferResponseBody) bh = buffering(bh, 50); + HttpResponse bar = getResponse(client, request, bh, async); assertEquals(bar.statusCode(), 200); assertEquals(bar.body(), fileAsBytes); break; case BYTE_ARRAY_CONSUMER: ByteArrayOutputStream baos = new ByteArrayOutputStream(); - HttpResponse v = getResponse(client, request, - asByteArrayConsumer(o -> consumerBytes(o, baos) ), async); + Consumer> consumer = o -> consumerBytes(o, baos); + BodyHandler bh1 = asByteArrayConsumer(consumer); + if (bufferResponseBody) bh1 = buffering(bh1, 49); + HttpResponse v = getResponse(client, request, bh1, async); byte[] ba = baos.toByteArray(); assertEquals(v.statusCode(), 200); assertEquals(ba, fileAsBytes); break; case DISCARD: Object o = new Object(); - HttpResponse or = getResponse(client, request, discard(o), async); + BodyHandler bh2 = discard(o); + if (bufferResponseBody) bh2 = buffering(bh2, 51); + HttpResponse or = getResponse(client, request, bh2, async); assertEquals(or.statusCode(), 200); assertSame(or.body(), o); break; case FILE: - HttpResponse fr = getResponse(client, request, asFile(tempFile), async); + BodyHandler bh3 = asFile(tempFile); + if (bufferResponseBody) bh3 = buffering(bh3, 48); + HttpResponse fr = getResponse(client, request, bh3, async); assertEquals(fr.statusCode(), 200); assertEquals(Files.size(tempFile), fileAsString.length()); assertEquals(Files.readAllBytes(tempFile), fileAsBytes); break; case FILE_WITH_OPTION: - fr = getResponse(client, request, asFile(tempFile, CREATE_NEW, WRITE), async); + BodyHandler bh4 = asFile(tempFile, CREATE_NEW, WRITE); + if (bufferResponseBody) bh4 = buffering(bh4, 52); + fr = getResponse(client, request, bh4, async); assertEquals(fr.statusCode(), 200); assertEquals(Files.size(tempFile), fileAsString.length()); assertEquals(Files.readAllBytes(tempFile), fileAsBytes); break; case STRING: - HttpResponse sr = getResponse(client, request, asString(), async); + BodyHandler bh5 = asString(); + if(bufferResponseBody) bh5 = buffering(bh5, 47); + HttpResponse sr = getResponse(client, request, bh5, async); assertEquals(sr.statusCode(), 200); assertEquals(sr.body(), fileAsString); break; case STRING_WITH_CHARSET: - HttpResponse r = getResponse(client, request, asString(StandardCharsets.UTF_8), async); + BodyHandler bh6 = asString(StandardCharsets.UTF_8); + if (bufferResponseBody) bh6 = buffering(bh6, 53); + HttpResponse r = getResponse(client, request, bh6, async); assertEquals(r.statusCode(), 200); assertEquals(r.body(), fileAsString); break; --- old/test/java/net/httpclient/security/Security.java 2017-08-12 14:14:46.000000000 +0100 +++ new/test/java/net/httpclient/security/Security.java 2017-08-12 14:14:46.000000000 +0100 @@ -305,7 +305,7 @@ return stproc.getBody(); } @Override - public void onNext(ByteBuffer item) { + public void onNext(List item) { SecurityManager sm = System.getSecurityManager(); // should succeed. sm.checkPermission(new RuntimePermission("foobar")); --- /dev/null 2017-08-12 14:14:48.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingProcessor.java 2017-08-12 14:14:48.000000000 +0100 @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A buffering BodyProcessor. When subscribed, accumulates ( buffers ) a given + * amount ( in bytes ) of a subscriptions data before pushing it to a downstream + * processor. + */ +class BufferingProcessor implements HttpResponse.BodyProcessor +{ + /** The downstream consumer of the data. */ + private final HttpResponse.BodyProcessor downstreamProcessor; + /** The amount of data to be accumulate before pushing downstream. */ + private final int bufferSize; + + /** The subscription, created lazily. */ + private volatile Flow.Subscription subscription; + /** The downstream subscription, created lazily. */ + private volatile DownstreamSubscription downstreamSubscription; + + /** Must be held when accessing the internal buffers. */ + private final Object buffersLock = new Object(); + /** The internal buffers holding the buffered data. */ + private ArrayList internalBuffers; + /** The actual accumulated remaining bytes in internalBuffers. */ + private int accumulatedBytes; + + /** State of the buffering processor: + * 1) [UNSUBSCRIBED] when initially created + * 2) [ACTIVE] when subscribed and can receive data + * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) + */ + static final int UNSUBSCRIBED = 0x01; + static final int ACTIVE = 0x02; + static final int ERROR = 0x04; + static final int CANCELLED = 0x08; + static final int COMPLETE = 0x10; + + private volatile int state; + + BufferingProcessor(HttpResponse.BodyProcessor downstreamProcessor, + int bufferSize) { + this.downstreamProcessor = downstreamProcessor; + this.bufferSize = bufferSize; + synchronized (buffersLock) { + internalBuffers = new ArrayList<>(); + } + state = UNSUBSCRIBED; + } + + /** Returns the number of bytes remaining in the given buffers. */ + private static final int remaining(List buffers) { + return buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + } + + /** + * Tells whether, or not, there is at least a sufficient number of bytes + * accumulated in the internal buffers. If the processor is COMPLETE, and + * has some buffered data, then there is always enough ( to pass downstream ). + */ + private final boolean hasEnoughAccumulatedBytes() { + assert Thread.holdsLock(buffersLock); + return accumulatedBytes >= bufferSize + || (state == COMPLETE && accumulatedBytes > 0); + } + + /** + * Returns a new List containing exactly the amount of data as + * required before pushing downstream. The amount of data may be less than + * required ( bufferSize ), in the case where the processor is COMPLETE. + */ + private List fromInternalBuffers() { + assert Thread.holdsLock(buffersLock); + int leftToFill = bufferSize; + assert (state == ACTIVE || state == CANCELLED) + ? accumulatedBytes >= leftToFill : true; + List dsts = new ArrayList<>(); + + ListIterator itr = internalBuffers.listIterator(); + while (itr.hasNext()) { + ByteBuffer b = itr.next(); + if (b.remaining() <= leftToFill) { + itr.remove(); + if (b.position() != 0) + b = b.slice(); // ensure position = 0 when propagated + dsts.add(b); + leftToFill -= b.remaining(); + accumulatedBytes -= b.remaining(); + if (leftToFill == 0) + break; + } else { + int prevLimit = b.limit(); + b.limit(b.position() + leftToFill); + ByteBuffer slice = b.slice(); + dsts.add(slice); + b.limit(prevLimit); + b.position(b.position() + leftToFill); + accumulatedBytes -= leftToFill; + leftToFill = 0; + break; + } + } + assert (state == ACTIVE || state == CANCELLED) + ? leftToFill == 0 : state == COMPLETE; + assert (state == ACTIVE || state == CANCELLED) + ? remaining(dsts) == bufferSize : state == COMPLETE; + assert accumulatedBytes >= 0; + assert dsts.stream().filter(b -> b.position() != 0).count() == 0; + assert dsts.stream().filter(b -> !b.isReadOnly()).count() == 0; + return dsts; + } + + /** Subscription that is passed to the downstream processor. */ + private class DownstreamSubscription implements Flow.Subscription { + private final ThreadLocal tl = ThreadLocal.withInitial(() -> Boolean.FALSE); + private final AtomicInteger callCount = new AtomicInteger(); + private final AtomicBoolean cancelled = new AtomicBoolean(); // false + private final Object demandLock = new Object(); + private long demand; + + @Override + public void request(long n) { + if (n <= 0L) { + onError(new IllegalArgumentException( + "non-positive subscription request")); + return; + } + if (cancelled.get()) + return; + + synchronized (demandLock) { + long prev = demand; + demand += n; + if (demand < prev) // saturate + demand = Long.MAX_VALUE; + } + pushDemanded(); + } + + void pushDemanded() { + if (tl.get()) + return; // avoid recursion + tl.set(Boolean.TRUE); + try { + pushDemanded0(); + } finally { + tl.set(Boolean.FALSE); + } + } + + private void pushDemanded0() { + if (callCount.getAndIncrement() > 0) + return; // the executor of pushDemanded1 can do the work. + pushDemanded1(); + } + + private void pushDemanded1() { + try { + do { + while (true) { + List item; + synchronized (buffersLock) { + if (cancelled.get()) + return; + if (!hasEnoughAccumulatedBytes()) + break; + synchronized (demandLock) { + if (demand < 1) + break; + item = fromInternalBuffers(); + demand--; + } + } + assert item != null; + + downstreamProcessor.onNext(item); + } + if (cancelled.get()) + return; + + // complete only if all data consumed + boolean complete; + synchronized (buffersLock) { + complete = state == COMPLETE && internalBuffers.isEmpty(); + } + if (complete) { + downstreamProcessor.onComplete(); + return; + } + } while (callCount.decrementAndGet() > 0); + } finally { + callCount.set(0); // reset in case of a exception / cancellation + } + synchronized (demandLock) { + if (demand > 0) { // request more upstream data + subscription.request(1); + } + } + } + + @Override + public void cancel() { + if (cancelled.compareAndExchange(false, true)) + return; // already cancelled + + state = CANCELLED; // set CANCELLED state of upstream subscriber + subscription.cancel(); // cancel upstream subscription + } + + boolean hasDemand() { + synchronized (demandLock) { + return demand > 0; + } + } + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); + if (this.subscription != null) { + subscription.cancel(); + return; + } + + assert state == UNSUBSCRIBED; + state = ACTIVE; + this.subscription = subscription; + downstreamSubscription = new DownstreamSubscription(); + downstreamProcessor.onSubscribe(downstreamSubscription); + } + + @Override + public void onNext(List item) { + Objects.requireNonNull(item); + //assert accumulatedBytes < bufferSize; + assert downstreamSubscription.hasDemand(); + + if (state == CANCELLED) + return; + + if (state != ACTIVE) + throw new InternalError("onNext on inactive subscriber"); + + synchronized (buffersLock) { + item.forEach(bb -> { + internalBuffers.add(bb.asReadOnlyBuffer()); + accumulatedBytes += bb.remaining(); + }); + } + + downstreamSubscription.pushDemanded(); + } + + @Override + public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); + assert state == ACTIVE; + state = ERROR; + downstreamProcessor.onError(throwable); + } + + @Override + public void onComplete() { + assert state == ACTIVE; + state = COMPLETE; + downstreamSubscription.pushDemanded(); + } + + @Override + public CompletionStage getBody() { + return downstreamProcessor.getBody(); + } +} --- /dev/null 2017-08-12 14:14:50.000000000 +0100 +++ new/test/java/net/httpclient/BufferingProcessorTest.java 2017-08-12 14:14:49.000000000 +0100 @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.SubmissionPublisher; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodyProcessor; +import jdk.test.lib.RandomFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import static java.lang.Long.MAX_VALUE; +import static java.lang.System.out; +import static java.util.concurrent.CompletableFuture.delayedExecutor; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.testng.Assert.*; + +/* + * @test + * @library /test/lib + * @build jdk.test.lib.RandomFactory + * @bug 8184285 + * @summary Direct test for HttpResponse.BodyProcessor.buffering() API + * @run testng BufferingProcessorTest + * @key randomness + */ + +public class BufferingProcessorTest { + + static final Random random = RandomFactory.getRandom(); + + @DataProvider(name = "negatives") + public Object[][] negatives() { + return new Object[][] { { 0 }, { -1 }, { -1000 } }; + } + + @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) + public void processorThrowsIAE(int bufferSize) { + BodyProcessor bp = BodyProcessor.asByteArray(); + BodyProcessor.buffering(bp, bufferSize); + } + + @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class) + public void handlerThrowsIAE(int bufferSize) { + BodyHandler bp = BodyHandler.asByteArray(); + BodyHandler.buffering(bp, bufferSize); + } + + // --- + + @DataProvider(name = "config") + public Object[][] config() { + return new Object[][] { + // numSubscribers delayMillis numBuffers bufferSize maxBufferSize minBufferSize + { 1, 0, 1, 1, 2, 1 }, + { 1, 0, 1, 10, 1000, 1 }, + { 1, 10, 1, 10, 1000, 1 }, + { 1, 0, 1, 1000, 1000, 1 }, + { 1, 0, 10, 1000, 1000, 1 }, + { 1, 0, 1000, 10 , 1000, 50 }, + { 1, 100, 1, 1000 * 4, 1000, 1 }, + { 100, 0, 1000, 1, 2, 1 }, + { 3, 0, 4, 5006, 1000, 1 }, + { 20, 0, 100, 4888, 1000, 100 }, + { 16, 10, 1000, 50 , 1000, 100 }, + }; + } + + @Test(dataProvider = "config") + public void test(int numSubscribers, + int delayMillis, + int numBuffers, + int bufferSize, + int maxBufferSize, + int minbufferSize) { + for (long perRequestAmount : new long[] { 1L, MAX_VALUE }) + test(numSubscribers, + delayMillis, + numBuffers, + bufferSize, + maxBufferSize, + minbufferSize, + perRequestAmount); + } + + public void test(int numSubscribers, + int delayMillis, + int numBuffers, + int bufferSize, + int maxBufferSize, + int minBuferSize, + long requestAmount) { + ExecutorService executor = Executors.newFixedThreadPool(5); + try { + SubmissionPublisher> publisher = + new SubmissionPublisher<>(executor, 1); + CompletableFuture cf = sink(publisher, + numSubscribers, + delayMillis, + numBuffers * bufferSize, + requestAmount, + maxBufferSize, + minBuferSize); + source(publisher, numBuffers, bufferSize); + cf.join(); + out.println("OK"); + } finally { + executor.shutdown(); + } + } + + static int accumulatedDataSize(List bufs) { + return bufs.stream().mapToInt(ByteBuffer::remaining).sum(); + } + + /** Returns a new BB with its contents set to monotonically increasing + * values, staring at the given start index and wrapping every 100. */ + static ByteBuffer allocateBuffer(int size, int startIdx) { + ByteBuffer b = ByteBuffer.allocate(size); + for (int i=0; i { + final int delayMillis; + final int bufferSize; + final int expectedTotalSize; + final long requestAmount; + final CompletableFuture completion; + final Executor delayedExecutor; + volatile Flow.Subscription subscription; + + TestProcessor(int bufferSize, + int delayMillis, + int expectedTotalSize, + long requestAmount) { + this.bufferSize = bufferSize; + this.completion = new CompletableFuture<>(); + this.delayMillis = delayMillis; + this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS); + this.expectedTotalSize = expectedTotalSize; + this.requestAmount = requestAmount; + } + + /** + * Example of a factory method which would decorate a buffering processor + * to create a new processor dependent on buffering capability. + * + * The integer type parameter simulates the body just by counting the + * number of bytes in the body. + */ + static BodyProcessor createSubscriber(int bufferSize, + int delay, + int expectedTotalSize, + long requestAmount) { + TestProcessor s = new TestProcessor(bufferSize, + delay, + expectedTotalSize, + requestAmount); + return BodyProcessor.buffering(s, bufferSize); + } + + private void requestMore() { subscription.request(requestAmount); } + + @Override + public void onSubscribe(Subscription subscription) { + assertNull(this.subscription); + this.subscription = subscription; + if (delayMillis > 0) + delayedExecutor.execute(this::requestMore); + else + requestMore(); + } + + volatile int wrongSizes; + volatile int totalBytesReceived; + volatile int onNextInvocations; + volatile int lastSeenSize = -1; + volatile boolean noMoreOnNext; // false + volatile int index; // 0 + + @Override + public void onNext(List items) { + long sz = accumulatedDataSize(items); + onNextInvocations++; + assertNotEquals(sz, 0L, "Unexpected empty buffers"); + items.stream().forEach(b -> assertEquals(b.position(), 0)); + assertFalse(noMoreOnNext); + + if (sz != bufferSize) { + String msg = sz + ", should be less than bufferSize, " + bufferSize; + assertTrue(sz < bufferSize, msg); + assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); + noMoreOnNext = true; + wrongSizes++; + } else { + assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); + } + + // Ensure expected contents + for (ByteBuffer b : items) { + while (b.hasRemaining()) { + assertEquals(b.get(), (byte) (index % 100)); + index++; + } + } + + totalBytesReceived += sz; + assertEquals(totalBytesReceived, index ); + if (delayMillis > 0) + delayedExecutor.execute(this::requestMore); + else + requestMore(); + } + + @Override + public void onError(Throwable throwable) { + completion.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + if (wrongSizes > 1) { // allow just the final item to be smaller + String msg = "Wrong sizes. Expected no more than 1. [" + this + "]"; + completion.completeExceptionally(new Throwable(msg)); + } + if (totalBytesReceived != expectedTotalSize) { + String msg = "Wrong number of bytes. [" + this + "]"; + completion.completeExceptionally(new Throwable(msg)); + } else { + completion.complete(totalBytesReceived); + } + } + + @Override + public CompletionStage getBody() { return completion; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(super.toString()); + sb.append(", bufferSize=").append(bufferSize); + sb.append(", onNextInvocations=").append(onNextInvocations); + sb.append(", totalBytesReceived=").append(totalBytesReceived); + sb.append(", expectedTotalSize=").append(expectedTotalSize); + sb.append(", requestAmount=").append(requestAmount); + sb.append(", lastSeenSize=").append(lastSeenSize); + sb.append(", wrongSizes=").append(wrongSizes); + sb.append(", index=").append(index); + return sb.toString(); + } + } + + /** + * Publishes data, through the given publisher, using the main thread. + * + * Note: The executor supplied when creating the SubmissionPublisher provides + * the threads for executing the Subscribers. + * + * @param publisher the publisher + * @param numBuffers the number of buffers to send ( before splitting in two ) + * @param bufferSize the total size of the data to send ( before splitting in two ) + */ + static void source(SubmissionPublisher> publisher, + int numBuffers, + int bufferSize) { + out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize); + int index = 0; + for (int i=0; i sink(SubmissionPublisher> publisher, + int numSubscribers, + int delayMillis, + int expectedTotalSize, + long requestAmount, + int maxBufferSize, + int minBufferSize) { + int[] bufferSizes = new int[numSubscribers]; + CompletableFuture[] cfs = new CompletableFuture[numSubscribers]; + for (int i=0; i sub = TestProcessor.createSubscriber(bufferSize, + delayMillis, + expectedTotalSize, + requestAmount); + cfs[i] = sub.getBody().toCompletableFuture(); + publisher.subscribe(sub); + } + out.printf("Number of subscribers: %d\n", numSubscribers); + out.printf("Each subscriber reads data with buffer sizes:"); + out.printf("%s bytes\n", Arrays.toString(bufferSizes)); + out.printf("Subscription delay is %d msec\n", delayMillis); + out.printf("Request amount is %d items\n", requestAmount); + return CompletableFuture.allOf(cfs); + } + + // --- + + // TODO: Add a test for cancel + + // --- + + /* Main entry point for standalone testing of the main functional test. */ + public static void main(String... args) { + BufferingProcessorTest t = new BufferingProcessorTest(); + for (Object[] objs : t.config()) + t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]); + } +}