--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java 2017-08-16 17:16:48.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java 2017-08-16 17:16:48.000000000 +0100 @@ -196,7 +196,7 @@ * This same method is called to try and resume output after a blocking * handshaking operation has completed. */ - private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { + private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread. try { ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs); @@ -230,6 +230,9 @@ closeExceptionally(t); errorHandler.accept(t); } + // We always return true: either all the data was sent, or + // an exception happened and we have closed the queue. + return true; } // Connecting at this level means the initial handshake has completed. --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java 2017-08-16 17:16:49.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java 2017-08-16 17:16:49.000000000 +0100 @@ -231,7 +231,7 @@ assert false; } - void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { + boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { try { ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs); while (Utils.remaining(bufs) > 0) { @@ -239,13 +239,14 @@ if (n == 0) { delayCallback.setDelayed(refs); client.registerEvent(new WriteEvent()); - return; + return false; } } ByteBufferReference.clear(refs); } catch (IOException e) { shutdown(); } + return true; } @Override --- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java 2017-08-16 17:16:50.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java 2017-08-16 17:16:50.000000000 +0100 @@ -37,6 +37,22 @@ import java.util.function.BiConsumer; public class AsyncWriteQueue implements Closeable { + + @FunctionalInterface + public static interface AsyncConsumer { + /** + * Takes an array of buffer reference and attempt to send the data + * downstream. If not all the data can be sent, then push back + * to the source queue by calling {@code source.setDelayed(buffers)} + * and return false. If all the data was successfully sent downstream + * then returns true. + * @param buffers An array of ButeBufferReference containing data + * to send downstream. + * @param source This AsyncWriteQueue. + * @return true if all the data could be sent downstream, false otherwise. + */ + boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); + } private static final int IDLE = 0; // nobody is flushing from the queue private static final int FLUSHING = 1; // there is the only thread flushing from the queue @@ -51,7 +67,7 @@ private final AtomicInteger state = new AtomicInteger(IDLE); private final Deque queue = new ConcurrentLinkedDeque<>(); - private final BiConsumer consumeAction; + private final AsyncConsumer consumeAction; // Queue may be processed in two modes: // 1. if(!doFullDrain) - invoke callback on each chunk @@ -60,11 +76,11 @@ private ByteBufferReference[] delayedElement = null; - public AsyncWriteQueue(BiConsumer consumeAction) { + public AsyncWriteQueue(AsyncConsumer consumeAction) { this(consumeAction, true); } - public AsyncWriteQueue(BiConsumer consumeAction, boolean doFullDrain) { + public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { this.consumeAction = consumeAction; this.doFullDrain = doFullDrain; } @@ -156,8 +172,7 @@ } while(true) { while (element != null) { - consumeAction.accept(element, this); - if (state.get() == DELAYED) { + if (!consumeAction.trySend(element, this)) { return; } element = drain(); --- old/test/java/net/httpclient/http2/FixedThreadPoolTest.java 2017-08-16 17:16:51.000000000 +0100 +++ new/test/java/net/httpclient/http2/FixedThreadPoolTest.java 2017-08-16 17:16:51.000000000 +0100 @@ -23,8 +23,7 @@ /* * @test - * @bug 8087112 - * @key intermittent + * @bug 8087112 8177935 * @library /lib/testlibrary server * @build jdk.testlibrary.SimpleSSLContext * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common