--- /dev/null 2017-08-12 14:14:48.000000000 +0100 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingProcessor.java 2017-08-12 14:14:48.000000000 +0100 @@ -0,0 +1,307 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A buffering BodyProcessor. When subscribed, accumulates ( buffers ) a given + * amount ( in bytes ) of a subscriptions data before pushing it to a downstream + * processor. + */ +class BufferingProcessor implements HttpResponse.BodyProcessor +{ + /** The downstream consumer of the data. */ + private final HttpResponse.BodyProcessor downstreamProcessor; + /** The amount of data to be accumulate before pushing downstream. */ + private final int bufferSize; + + /** The subscription, created lazily. */ + private volatile Flow.Subscription subscription; + /** The downstream subscription, created lazily. */ + private volatile DownstreamSubscription downstreamSubscription; + + /** Must be held when accessing the internal buffers. */ + private final Object buffersLock = new Object(); + /** The internal buffers holding the buffered data. */ + private ArrayList internalBuffers; + /** The actual accumulated remaining bytes in internalBuffers. */ + private int accumulatedBytes; + + /** State of the buffering processor: + * 1) [UNSUBSCRIBED] when initially created + * 2) [ACTIVE] when subscribed and can receive data + * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) + */ + static final int UNSUBSCRIBED = 0x01; + static final int ACTIVE = 0x02; + static final int ERROR = 0x04; + static final int CANCELLED = 0x08; + static final int COMPLETE = 0x10; + + private volatile int state; + + BufferingProcessor(HttpResponse.BodyProcessor downstreamProcessor, + int bufferSize) { + this.downstreamProcessor = downstreamProcessor; + this.bufferSize = bufferSize; + synchronized (buffersLock) { + internalBuffers = new ArrayList<>(); + } + state = UNSUBSCRIBED; + } + + /** Returns the number of bytes remaining in the given buffers. */ + private static final int remaining(List buffers) { + return buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + } + + /** + * Tells whether, or not, there is at least a sufficient number of bytes + * accumulated in the internal buffers. If the processor is COMPLETE, and + * has some buffered data, then there is always enough ( to pass downstream ). + */ + private final boolean hasEnoughAccumulatedBytes() { + assert Thread.holdsLock(buffersLock); + return accumulatedBytes >= bufferSize + || (state == COMPLETE && accumulatedBytes > 0); + } + + /** + * Returns a new List containing exactly the amount of data as + * required before pushing downstream. The amount of data may be less than + * required ( bufferSize ), in the case where the processor is COMPLETE. + */ + private List fromInternalBuffers() { + assert Thread.holdsLock(buffersLock); + int leftToFill = bufferSize; + assert (state == ACTIVE || state == CANCELLED) + ? accumulatedBytes >= leftToFill : true; + List dsts = new ArrayList<>(); + + ListIterator itr = internalBuffers.listIterator(); + while (itr.hasNext()) { + ByteBuffer b = itr.next(); + if (b.remaining() <= leftToFill) { + itr.remove(); + if (b.position() != 0) + b = b.slice(); // ensure position = 0 when propagated + dsts.add(b); + leftToFill -= b.remaining(); + accumulatedBytes -= b.remaining(); + if (leftToFill == 0) + break; + } else { + int prevLimit = b.limit(); + b.limit(b.position() + leftToFill); + ByteBuffer slice = b.slice(); + dsts.add(slice); + b.limit(prevLimit); + b.position(b.position() + leftToFill); + accumulatedBytes -= leftToFill; + leftToFill = 0; + break; + } + } + assert (state == ACTIVE || state == CANCELLED) + ? leftToFill == 0 : state == COMPLETE; + assert (state == ACTIVE || state == CANCELLED) + ? remaining(dsts) == bufferSize : state == COMPLETE; + assert accumulatedBytes >= 0; + assert dsts.stream().filter(b -> b.position() != 0).count() == 0; + assert dsts.stream().filter(b -> !b.isReadOnly()).count() == 0; + return dsts; + } + + /** Subscription that is passed to the downstream processor. */ + private class DownstreamSubscription implements Flow.Subscription { + private final ThreadLocal tl = ThreadLocal.withInitial(() -> Boolean.FALSE); + private final AtomicInteger callCount = new AtomicInteger(); + private final AtomicBoolean cancelled = new AtomicBoolean(); // false + private final Object demandLock = new Object(); + private long demand; + + @Override + public void request(long n) { + if (n <= 0L) { + onError(new IllegalArgumentException( + "non-positive subscription request")); + return; + } + if (cancelled.get()) + return; + + synchronized (demandLock) { + long prev = demand; + demand += n; + if (demand < prev) // saturate + demand = Long.MAX_VALUE; + } + pushDemanded(); + } + + void pushDemanded() { + if (tl.get()) + return; // avoid recursion + tl.set(Boolean.TRUE); + try { + pushDemanded0(); + } finally { + tl.set(Boolean.FALSE); + } + } + + private void pushDemanded0() { + if (callCount.getAndIncrement() > 0) + return; // the executor of pushDemanded1 can do the work. + pushDemanded1(); + } + + private void pushDemanded1() { + try { + do { + while (true) { + List item; + synchronized (buffersLock) { + if (cancelled.get()) + return; + if (!hasEnoughAccumulatedBytes()) + break; + synchronized (demandLock) { + if (demand < 1) + break; + item = fromInternalBuffers(); + demand--; + } + } + assert item != null; + + downstreamProcessor.onNext(item); + } + if (cancelled.get()) + return; + + // complete only if all data consumed + boolean complete; + synchronized (buffersLock) { + complete = state == COMPLETE && internalBuffers.isEmpty(); + } + if (complete) { + downstreamProcessor.onComplete(); + return; + } + } while (callCount.decrementAndGet() > 0); + } finally { + callCount.set(0); // reset in case of a exception / cancellation + } + synchronized (demandLock) { + if (demand > 0) { // request more upstream data + subscription.request(1); + } + } + } + + @Override + public void cancel() { + if (cancelled.compareAndExchange(false, true)) + return; // already cancelled + + state = CANCELLED; // set CANCELLED state of upstream subscriber + subscription.cancel(); // cancel upstream subscription + } + + boolean hasDemand() { + synchronized (demandLock) { + return demand > 0; + } + } + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); + if (this.subscription != null) { + subscription.cancel(); + return; + } + + assert state == UNSUBSCRIBED; + state = ACTIVE; + this.subscription = subscription; + downstreamSubscription = new DownstreamSubscription(); + downstreamProcessor.onSubscribe(downstreamSubscription); + } + + @Override + public void onNext(List item) { + Objects.requireNonNull(item); + //assert accumulatedBytes < bufferSize; + assert downstreamSubscription.hasDemand(); + + if (state == CANCELLED) + return; + + if (state != ACTIVE) + throw new InternalError("onNext on inactive subscriber"); + + synchronized (buffersLock) { + item.forEach(bb -> { + internalBuffers.add(bb.asReadOnlyBuffer()); + accumulatedBytes += bb.remaining(); + }); + } + + downstreamSubscription.pushDemanded(); + } + + @Override + public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); + assert state == ACTIVE; + state = ERROR; + downstreamProcessor.onError(throwable); + } + + @Override + public void onComplete() { + assert state == ACTIVE; + state = COMPLETE; + downstreamSubscription.pushDemanded(); + } + + @Override + public CompletionStage getBody() { + return downstreamProcessor.getBody(); + } +}