--- old/test/java/net/httpclient/HttpInputStreamTest.java 2017-08-12 14:14:42.000000000 +0100 +++ new/test/java/net/httpclient/HttpInputStreamTest.java 2017-08-12 14:14:42.000000000 +0100 @@ -32,6 +32,8 @@ import jdk.incubator.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -40,6 +42,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.stream.Stream; +import static java.lang.System.err; /* * @test @@ -61,7 +64,7 @@ public static class HttpInputStreamHandler implements HttpResponse.BodyHandler { - public static final int MAX_BUFFERS_IN_QUEUE = 1; + public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer private final int maxBuffers; @@ -86,14 +89,16 @@ implements HttpResponse.BodyProcessor { // An immutable ByteBuffer sentinel to mark that the last byte was received. - private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); + private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); + private static final List LAST_LIST = List.of(LAST_BUFFER); // A queue of yet unprocessed ByteBuffers received from the flow API. - private final BlockingQueue buffers; + private final BlockingQueue> buffers; private volatile Flow.Subscription subscription; private volatile boolean closed; private volatile Throwable failed; - private volatile ByteBuffer current; + private volatile List currentList; + private volatile ByteBuffer currentBuffer; HttpResponseInputStream() { this(MAX_BUFFERS_IN_QUEUE); @@ -119,40 +124,50 @@ // a new buffer is made available through the Flow API, or the // end of the flow is reached. private ByteBuffer current() throws IOException { - while (current == null || !current.hasRemaining()) { - // Check whether the stream is claused or exhausted + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + // Check whether the stream is closed or exhausted if (closed || failed != null) { throw new IOException("closed", failed); } - if (current == LAST) break; + if (currentBuffer == LAST_BUFFER) break; try { - // Take a new buffer from the queue, blocking - // if none is available yet... - if (DEBUG) System.err.println("Taking Buffer"); - current = buffers.take(); - if (DEBUG) System.err.println("Buffer Taken"); - - // Check whether some exception was encountered - // upstream - if (closed || failed != null) { - throw new IOException("closed", failed); + if (currentList == null || currentList.isEmpty()) { + // Take a new list of buffers from the queue, blocking + // if none is available yet... + + if (DEBUG) err.println("Taking list of Buffers"); + List lb = buffers.take(); + if (DEBUG) err.println("List of Buffers Taken"); + + // Check whether an exception was encountered upstream + if (closed || failed != null) + throw new IOException("closed", failed); + + // Check whether we're done. + if (lb == LAST_LIST) { + currentList = LAST_LIST; + currentBuffer = LAST_BUFFER; + break; + } + + currentList = new ArrayList<>(lb); // TODO: lb is immutable + + // Request another upstream item ( list of buffers ) + Flow.Subscription s = subscription; + if (s != null) + s.request(1); } - - // Check whether we're done. - if (current == LAST) break; - - // Inform the producer that it can start sending - // us a new buffer - Flow.Subscription s = subscription; - if (s != null) s.request(1); - + assert currentList != null; + assert !currentList.isEmpty(); + if (DEBUG) err.println("Next Buffer"); + currentBuffer = currentList.remove(0); } catch (InterruptedException ex) { // continue } } - assert current == LAST || current.hasRemaining(); - return current; + assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); + return currentBuffer; } @Override @@ -160,7 +175,7 @@ // get the buffer to read from, possibly blocking if // none is available ByteBuffer buffer; - if ((buffer = current()) == LAST) return -1; + if ((buffer = current()) == LAST_BUFFER) return -1; // don't attempt to read more than what is available // in the current buffer. @@ -175,7 +190,7 @@ @Override public int read() throws IOException { ByteBuffer buffer; - if ((buffer = current()) == LAST) return -1; + if ((buffer = current()) == LAST_BUFFER) return -1; return buffer.get() & 0xFF; } @@ -186,11 +201,11 @@ } @Override - public synchronized void onNext(ByteBuffer t) { + public synchronized void onNext(List t) { try { - if (DEBUG) System.err.println("next buffer received"); + if (DEBUG) err.println("next item received"); buffers.put(t); - if (DEBUG) System.err.println("buffered offered"); + if (DEBUG) err.println("item offered"); } catch (Exception ex) { failed = ex; try { @@ -209,7 +224,7 @@ @Override public synchronized void onComplete() { subscription = null; - onNext(LAST); + onNext(LAST_LIST); } @Override @@ -275,7 +290,7 @@ CompletableFuture> handle = client.sendAsync(request, new HttpInputStreamHandler()); - if (DEBUG) System.err.println("Request sent"); + if (DEBUG) err.println("Request sent"); HttpResponse pending = handle.get(); @@ -301,8 +316,8 @@ char[] buff = new char[32]; int off=0, n=0; - if (DEBUG) System.err.println("Start receiving response body"); - if (DEBUG) System.err.println("Charset: " + charset.get()); + if (DEBUG) err.println("Start receiving response body"); + if (DEBUG) err.println("Charset: " + charset.get()); // Start consuming the InputStream as the data arrives. // Will block until there is something to read...