--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java 2017-08-16 17:16:50.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java 2017-08-16 17:16:50.000000000 +0100 @@ -37,6 +37,22 @@ import java.util.function.BiConsumer; public class AsyncWriteQueue implements Closeable { + + @FunctionalInterface + public static interface AsyncConsumer { + /** + * Takes an array of buffer reference and attempt to send the data + * downstream. If not all the data can be sent, then push back + * to the source queue by calling {@code source.setDelayed(buffers)} + * and return false. If all the data was successfully sent downstream + * then returns true. + * @param buffers An array of ButeBufferReference containing data + * to send downstream. + * @param source This AsyncWriteQueue. + * @return true if all the data could be sent downstream, false otherwise. + */ + boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); + } private static final int IDLE = 0; // nobody is flushing from the queue private static final int FLUSHING = 1; // there is the only thread flushing from the queue @@ -51,7 +67,7 @@ private final AtomicInteger state = new AtomicInteger(IDLE); private final Deque queue = new ConcurrentLinkedDeque<>(); - private final BiConsumer consumeAction; + private final AsyncConsumer consumeAction; // Queue may be processed in two modes: // 1. if(!doFullDrain) - invoke callback on each chunk @@ -60,11 +76,11 @@ private ByteBufferReference[] delayedElement = null; - public AsyncWriteQueue(BiConsumer consumeAction) { + public AsyncWriteQueue(AsyncConsumer consumeAction) { this(consumeAction, true); } - public AsyncWriteQueue(BiConsumer consumeAction, boolean doFullDrain) { + public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { this.consumeAction = consumeAction; this.doFullDrain = doFullDrain; } @@ -156,8 +172,7 @@ } while(true) { while (element != null) { - consumeAction.accept(element, this); - if (state.get() == DELAYED) { + if (!consumeAction.trySend(element, this)) { return; } element = drain();