/* * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import jdk.incubator.http.internal.common.Demand; import jdk.incubator.http.internal.common.SequentialScheduler; import jdk.incubator.http.internal.common.Utils; /** * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given * amount ( in bytes ) of a publisher's data before pushing it to a downstream * subscriber. */ class BufferingSubscriber implements HttpResponse.BodySubscriber { /** The downstream consumer of the data. */ private final HttpResponse.BodySubscriber downstreamSubscriber; /** The amount of data to be accumulate before pushing downstream. */ private final int bufferSize; /** The subscription, created lazily. */ private volatile Flow.Subscription subscription; /** The downstream subscription, created lazily. */ private volatile DownstreamSubscription downstreamSubscription; /** Must be held when accessing the internal buffers. */ private final Object buffersLock = new Object(); /** The internal buffers holding the buffered data. */ private ArrayList internalBuffers; /** The actual accumulated remaining bytes in internalBuffers. */ private int accumulatedBytes; /** Holds the Throwable from upstream's onError. */ private volatile Throwable throwable; /** State of the buffering subscriber: * 1) [UNSUBSCRIBED] when initially created * 2) [ACTIVE] when subscribed and can receive data * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) */ static final int UNSUBSCRIBED = 0x01; static final int ACTIVE = 0x02; static final int ERROR = 0x04; static final int CANCELLED = 0x08; static final int COMPLETE = 0x10; private volatile int state; BufferingSubscriber(HttpResponse.BodySubscriber downstreamSubscriber, int bufferSize) { this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber); this.bufferSize = bufferSize; synchronized (buffersLock) { internalBuffers = new ArrayList<>(); } state = UNSUBSCRIBED; } /** Returns the number of bytes remaining in the given buffers. */ private static final long remaining(List buffers) { return buffers.stream().mapToLong(ByteBuffer::remaining).sum(); } /** * Tells whether, or not, there is at least a sufficient number of bytes * accumulated in the internal buffers. If the subscriber is COMPLETE, and * has some buffered data, then there is always enough ( to pass downstream ). */ private final boolean hasEnoughAccumulatedBytes() { assert Thread.holdsLock(buffersLock); return accumulatedBytes >= bufferSize || (state == COMPLETE && accumulatedBytes > 0); } /** * Returns a new, unmodifiable, List containing exactly the * amount of data as required before pushing downstream. The amount of data * may be less than required ( bufferSize ), in the case where the subscriber * is COMPLETE. */ private List fromInternalBuffers() { assert Thread.holdsLock(buffersLock); int leftToFill = bufferSize; int state = this.state; assert (state == ACTIVE || state == CANCELLED) ? accumulatedBytes >= leftToFill : true; List dsts = new ArrayList<>(); ListIterator itr = internalBuffers.listIterator(); while (itr.hasNext()) { ByteBuffer b = itr.next(); if (b.remaining() <= leftToFill) { itr.remove(); if (b.position() != 0) b = b.slice(); // ensure position = 0 when propagated dsts.add(b); leftToFill -= b.remaining(); accumulatedBytes -= b.remaining(); if (leftToFill == 0) break; } else { int prevLimit = b.limit(); b.limit(b.position() + leftToFill); ByteBuffer slice = b.slice(); dsts.add(slice); b.limit(prevLimit); b.position(b.position() + leftToFill); accumulatedBytes -= leftToFill; leftToFill = 0; break; } } assert (state == ACTIVE || state == CANCELLED) ? leftToFill == 0 : state == COMPLETE; assert (state == ACTIVE || state == CANCELLED) ? remaining(dsts) == bufferSize : state == COMPLETE; assert accumulatedBytes >= 0; assert dsts.stream().noneMatch(b -> b.position() != 0); return Collections.unmodifiableList(dsts); } /** Subscription that is passed to the downstream subscriber. */ private class DownstreamSubscription implements Flow.Subscription { private final AtomicBoolean cancelled = new AtomicBoolean(); // false private final Demand demand = new Demand(); private volatile boolean illegalArg; @Override public void request(long n) { if (cancelled.get() || illegalArg) { return; } if (n <= 0L) { // pass the "bad" value upstream so the Publisher can deal with // it appropriately, i.e. invoke onError illegalArg = true; subscription.request(n); return; } demand.increase(n); pushDemanded(); } private final SequentialScheduler pushDemandedScheduler = new SequentialScheduler(new PushDemandedTask()); void pushDemanded() { if (cancelled.get()) return; pushDemandedScheduler.runOrSchedule(); } class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask { @Override public void run() { try { Throwable t = throwable; if (t != null) { pushDemandedScheduler.stop(); // stop the demand scheduler downstreamSubscriber.onError(t); return; } while (true) { List item; synchronized (buffersLock) { if (cancelled.get()) return; if (!hasEnoughAccumulatedBytes()) break; if (!demand.tryDecrement()) break; item = fromInternalBuffers(); } assert item != null; downstreamSubscriber.onNext(item); } if (cancelled.get()) return; // complete only if all data consumed boolean complete; synchronized (buffersLock) { complete = state == COMPLETE && internalBuffers.isEmpty(); } if (complete) { assert internalBuffers.isEmpty(); pushDemandedScheduler.stop(); // stop the demand scheduler downstreamSubscriber.onComplete(); return; } } catch (Throwable t) { cancel(); // cancel if there is any error throw t; } boolean requestMore = false; synchronized (buffersLock) { if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) { // request more upstream data requestMore = true; } } if (requestMore) subscription.request(1); } } @Override public void cancel() { if (cancelled.compareAndExchange(false, true)) return; // already cancelled state = CANCELLED; // set CANCELLED state of upstream subscriber subscription.cancel(); // cancel upstream subscription pushDemandedScheduler.stop(); // stop the demand scheduler } } @Override public void onSubscribe(Flow.Subscription subscription) { Objects.requireNonNull(subscription); if (this.subscription != null) { subscription.cancel(); return; } int s = this.state; assert s == UNSUBSCRIBED; state = ACTIVE; this.subscription = subscription; downstreamSubscription = new DownstreamSubscription(); downstreamSubscriber.onSubscribe(downstreamSubscription); } @Override public void onNext(List item) { Objects.requireNonNull(item); int s = state; if (s == CANCELLED) return; if (s != ACTIVE) throw new InternalError("onNext on inactive subscriber"); synchronized (buffersLock) { accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item); } downstreamSubscription.pushDemanded(); } @Override public void onError(Throwable incomingThrowable) { Objects.requireNonNull(incomingThrowable); int s = state; assert s == ACTIVE : "Expected ACTIVE, got:" + s; state = ERROR; Throwable t = this.throwable; assert t == null : "Expected null, got:" + t; this.throwable = incomingThrowable; downstreamSubscription.pushDemanded(); } @Override public void onComplete() { int s = state; assert s == ACTIVE : "Expected ACTIVE, got:" + s; state = COMPLETE; downstreamSubscription.pushDemanded(); } @Override public CompletionStage getBody() { return downstreamSubscriber.getBody(); } }