--- /dev/null 2017-10-28 22:49:55.551349757 -0700 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java 2017-11-30 04:05:42.521102857 -0800 @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.incubator.http; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.internal.common.SequentialScheduler; +import jdk.incubator.http.internal.common.Utils; + +/** + * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given + * amount ( in bytes ) of a publisher's data before pushing it to a downstream + * subscriber. + */ +class BufferingSubscriber implements HttpResponse.BodySubscriber +{ + /** The downstream consumer of the data. */ + private final HttpResponse.BodySubscriber downstreamSubscriber; + /** The amount of data to be accumulate before pushing downstream. */ + private final int bufferSize; + + /** The subscription, created lazily. */ + private volatile Flow.Subscription subscription; + /** The downstream subscription, created lazily. */ + private volatile DownstreamSubscription downstreamSubscription; + + /** Must be held when accessing the internal buffers. */ + private final Object buffersLock = new Object(); + /** The internal buffers holding the buffered data. */ + private ArrayList internalBuffers; + /** The actual accumulated remaining bytes in internalBuffers. */ + private int accumulatedBytes; + + /** Holds the Throwable from upstream's onError. */ + private volatile Throwable throwable; + + /** State of the buffering subscriber: + * 1) [UNSUBSCRIBED] when initially created + * 2) [ACTIVE] when subscribed and can receive data + * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) + */ + static final int UNSUBSCRIBED = 0x01; + static final int ACTIVE = 0x02; + static final int ERROR = 0x04; + static final int CANCELLED = 0x08; + static final int COMPLETE = 0x10; + + private volatile int state; + + BufferingSubscriber(HttpResponse.BodySubscriber downstreamSubscriber, + int bufferSize) { + this.downstreamSubscriber = downstreamSubscriber; + this.bufferSize = bufferSize; + synchronized (buffersLock) { + internalBuffers = new ArrayList<>(); + } + state = UNSUBSCRIBED; + } + + /** Returns the number of bytes remaining in the given buffers. */ + private static final int remaining(List buffers) { + return buffers.stream().mapToInt(ByteBuffer::remaining).sum(); + } + + /** + * Tells whether, or not, there is at least a sufficient number of bytes + * accumulated in the internal buffers. If the subscriber is COMPLETE, and + * has some buffered data, then there is always enough ( to pass downstream ). + */ + private final boolean hasEnoughAccumulatedBytes() { + assert Thread.holdsLock(buffersLock); + return accumulatedBytes >= bufferSize + || (state == COMPLETE && accumulatedBytes > 0); + } + + /** + * Returns a new, unmodifiable, List containing exactly the + * amount of data as required before pushing downstream. The amount of data + * may be less than required ( bufferSize ), in the case where the subscriber + * is COMPLETE. + */ + private List fromInternalBuffers() { + assert Thread.holdsLock(buffersLock); + int leftToFill = bufferSize; + int state = this.state; + assert (state == ACTIVE || state == CANCELLED) + ? accumulatedBytes >= leftToFill : true; + List dsts = new ArrayList<>(); + + ListIterator itr = internalBuffers.listIterator(); + while (itr.hasNext()) { + ByteBuffer b = itr.next(); + if (b.remaining() <= leftToFill) { + itr.remove(); + if (b.position() != 0) + b = b.slice(); // ensure position = 0 when propagated + dsts.add(b); + leftToFill -= b.remaining(); + accumulatedBytes -= b.remaining(); + if (leftToFill == 0) + break; + } else { + int prevLimit = b.limit(); + b.limit(b.position() + leftToFill); + ByteBuffer slice = b.slice(); + dsts.add(slice); + b.limit(prevLimit); + b.position(b.position() + leftToFill); + accumulatedBytes -= leftToFill; + leftToFill = 0; + break; + } + } + assert (state == ACTIVE || state == CANCELLED) + ? leftToFill == 0 : state == COMPLETE; + assert (state == ACTIVE || state == CANCELLED) + ? remaining(dsts) == bufferSize : state == COMPLETE; + assert accumulatedBytes >= 0; + assert dsts.stream().noneMatch(b -> b.position() != 0); + return Collections.unmodifiableList(dsts); + } + + /** Subscription that is passed to the downstream subscriber. */ + private class DownstreamSubscription implements Flow.Subscription { + private final AtomicBoolean cancelled = new AtomicBoolean(); // false + private final Demand demand = new Demand(); + private volatile boolean illegalArg; + + @Override + public void request(long n) { + if (cancelled.get() || illegalArg) { + return; + } + if (n <= 0L) { + // pass the "bad" value upstream so the Publisher can deal with + // it appropriately, i.e. invoke onError + illegalArg = true; + subscription.request(n); + return; + } + + demand.increase(n); + + pushDemanded(); + } + + private final SequentialScheduler pushDemandedScheduler = + new SequentialScheduler(new PushDemandedTask()); + + void pushDemanded() { + if (cancelled.get()) + return; + pushDemandedScheduler.runOrSchedule(); + } + + class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask { + @Override + public void run() { + try { + Throwable t = throwable; + if (t != null) { + downstreamSubscriber.onError(t); + pushDemandedScheduler.stop(); // stop the demand scheduler + return; + } + + while (true) { + List item; + synchronized (buffersLock) { + if (cancelled.get()) + return; + if (!hasEnoughAccumulatedBytes()) + break; + if (!demand.tryDecrement()) + break; + item = fromInternalBuffers(); + } + assert item != null; + + downstreamSubscriber.onNext(item); + } + if (cancelled.get()) + return; + + // complete only if all data consumed + boolean complete; + synchronized (buffersLock) { + complete = state == COMPLETE && internalBuffers.isEmpty(); + } + if (complete) { + downstreamSubscriber.onComplete(); + pushDemandedScheduler.stop(); // stop the demand scheduler + return; + } + } catch (Throwable t) { + cancel(); // cancel if there is any error + throw t; + } + + boolean requestMore = false; + synchronized (buffersLock) { + if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) { + // request more upstream data + requestMore = true; + } + } + if (requestMore) + subscription.request(1); + } + } + + @Override + public void cancel() { + if (cancelled.compareAndExchange(false, true)) + return; // already cancelled + + state = CANCELLED; // set CANCELLED state of upstream subscriber + subscription.cancel(); // cancel upstream subscription + pushDemandedScheduler.stop(); // stop the demand scheduler + } + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); + if (this.subscription != null) { + subscription.cancel(); + return; + } + + int s = this.state; + assert s == UNSUBSCRIBED; + state = ACTIVE; + this.subscription = subscription; + downstreamSubscription = new DownstreamSubscription(); + downstreamSubscriber.onSubscribe(downstreamSubscription); + } + + @Override + public void onNext(List item) { + Objects.requireNonNull(item); + + int s = state; + if (s == CANCELLED) + return; + + if (s != ACTIVE) + throw new InternalError("onNext on inactive subscriber"); + + synchronized (buffersLock) { + accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item); + } + + downstreamSubscription.pushDemanded(); + } + + @Override + public void onError(Throwable incomingThrowable) { + Objects.requireNonNull(incomingThrowable); + int s = state; + assert s == ACTIVE : "Expected ACTIVE, got:" + s; + state = ERROR; + Throwable t = this.throwable; + assert t == null : "Expected null, got:" + t; + this.throwable = incomingThrowable; + downstreamSubscription.pushDemanded(); + } + + @Override + public void onComplete() { + int s = state; + assert s == ACTIVE : "Expected ACTIVE, got:" + s; + state = COMPLETE; + downstreamSubscription.pushDemanded(); + } + + @Override + public CompletionStage getBody() { + return downstreamSubscriber.getBody(); + } +}