1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.util.ArrayList;
  30 import java.util.Collections;
  31 import java.util.List;
  32 import java.util.ListIterator;
  33 import java.util.Objects;
  34 import java.util.concurrent.CompletionStage;
  35 import java.util.concurrent.Flow;
  36 import java.util.concurrent.atomic.AtomicBoolean;
  37 import jdk.incubator.http.internal.common.Demand;
  38 import jdk.incubator.http.internal.common.SequentialScheduler;
  39 import jdk.incubator.http.internal.common.Utils;
  40 
  41 /**
  42  * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
  43  * amount ( in bytes ) of a publisher's data before pushing it to a downstream
  44  * subscriber.
  45  */
  46 class BufferingSubscriber<T> implements HttpResponse.BodySubscriber<T>
  47 {
  48     /** The downstream consumer of the data. */
  49     private final HttpResponse.BodySubscriber<T> downstreamSubscriber;
  50     /** The amount of data to be accumulate before pushing downstream. */
  51     private final int bufferSize;
  52 
  53     /** The subscription, created lazily. */
  54     private volatile Flow.Subscription subscription;
  55     /** The downstream subscription, created lazily. */
  56     private volatile DownstreamSubscription downstreamSubscription;
  57 
  58     /** Must be held when accessing the internal buffers. */
  59     private final Object buffersLock = new Object();
  60     /** The internal buffers holding the buffered data. */
  61     private ArrayList<ByteBuffer> internalBuffers;
  62     /** The actual accumulated remaining bytes in internalBuffers. */
  63     private int accumulatedBytes;
  64 
  65     /** Holds the Throwable from upstream's onError. */
  66     private volatile Throwable throwable;
  67 
  68     /** State of the buffering subscriber:
  69      *  1) [UNSUBSCRIBED] when initially created
  70      *  2) [ACTIVE] when subscribed and can receive data
  71      *  3) [ERROR | CANCELLED | COMPLETE] (terminal state)
  72      */
  73     static final int UNSUBSCRIBED = 0x01;
  74     static final int ACTIVE       = 0x02;
  75     static final int ERROR        = 0x04;
  76     static final int CANCELLED    = 0x08;
  77     static final int COMPLETE     = 0x10;
  78 
  79     private volatile int state;
  80 
  81     BufferingSubscriber(HttpResponse.BodySubscriber<T> downstreamSubscriber,
  82                         int bufferSize) {
  83         this.downstreamSubscriber = downstreamSubscriber;
  84         this.bufferSize = bufferSize;
  85         synchronized (buffersLock) {
  86             internalBuffers = new ArrayList<>();
  87         }
  88         state = UNSUBSCRIBED;
  89     }
  90 
  91     /** Returns the number of bytes remaining in the given buffers. */
  92     private static final int remaining(List<ByteBuffer> buffers) {
  93         return buffers.stream().mapToInt(ByteBuffer::remaining).sum();
  94     }
  95 
  96     /**
  97      * Tells whether, or not, there is at least a sufficient number of bytes
  98      * accumulated in the internal buffers. If the subscriber is COMPLETE, and
  99      * has some buffered data, then there is always enough ( to pass downstream ).
 100      */
 101     private final boolean hasEnoughAccumulatedBytes() {
 102         assert Thread.holdsLock(buffersLock);
 103         return accumulatedBytes >= bufferSize
 104                 || (state == COMPLETE && accumulatedBytes > 0);
 105     }
 106 
 107     /**
 108      * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the
 109      * amount of data as required before pushing downstream. The amount of data
 110      * may be less than required ( bufferSize ), in the case where the subscriber
 111      * is COMPLETE.
 112      */
 113     private List<ByteBuffer> fromInternalBuffers() {
 114         assert Thread.holdsLock(buffersLock);
 115         int leftToFill = bufferSize;
 116         int state = this.state;
 117         assert (state == ACTIVE || state == CANCELLED)
 118                 ? accumulatedBytes >= leftToFill : true;
 119         List<ByteBuffer> dsts = new ArrayList<>();
 120 
 121         ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
 122         while (itr.hasNext()) {
 123             ByteBuffer b = itr.next();
 124             if (b.remaining() <= leftToFill) {
 125                 itr.remove();
 126                 if (b.position() != 0)
 127                     b = b.slice();  // ensure position = 0 when propagated
 128                 dsts.add(b);
 129                 leftToFill -= b.remaining();
 130                 accumulatedBytes -= b.remaining();
 131                 if (leftToFill == 0)
 132                     break;
 133             } else {
 134                 int prevLimit = b.limit();
 135                 b.limit(b.position() + leftToFill);
 136                 ByteBuffer slice = b.slice();
 137                 dsts.add(slice);
 138                 b.limit(prevLimit);
 139                 b.position(b.position() + leftToFill);
 140                 accumulatedBytes -= leftToFill;
 141                 leftToFill = 0;
 142                 break;
 143             }
 144         }
 145         assert (state == ACTIVE || state == CANCELLED)
 146                 ? leftToFill == 0 : state == COMPLETE;
 147         assert (state == ACTIVE || state == CANCELLED)
 148                 ? remaining(dsts) == bufferSize : state == COMPLETE;
 149         assert accumulatedBytes >= 0;
 150         assert dsts.stream().noneMatch(b -> b.position() != 0);
 151         return Collections.unmodifiableList(dsts);
 152     }
 153 
 154     /** Subscription that is passed to the downstream subscriber. */
 155     private class DownstreamSubscription implements Flow.Subscription {
 156         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
 157         private final Demand demand = new Demand();
 158         private volatile boolean illegalArg;
 159 
 160         @Override
 161         public void request(long n) {
 162             if (cancelled.get() || illegalArg) {
 163                 return;
 164             }
 165             if (n <= 0L) {
 166                 // pass the "bad" value upstream so the Publisher can deal with
 167                 // it appropriately, i.e. invoke onError
 168                 illegalArg = true;
 169                 subscription.request(n);
 170                 return;
 171             }
 172 
 173             demand.increase(n);
 174 
 175             pushDemanded();
 176         }
 177 
 178         private final SequentialScheduler pushDemandedScheduler =
 179                 new SequentialScheduler(new PushDemandedTask());
 180 
 181         void pushDemanded() {
 182             if (cancelled.get())
 183                 return;
 184             pushDemandedScheduler.runOrSchedule();
 185         }
 186 
 187         class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
 188             @Override
 189             public void run() {
 190                 try {
 191                     Throwable t = throwable;
 192                     if (t != null) {
 193                         downstreamSubscriber.onError(t);
 194                         pushDemandedScheduler.stop(); // stop the demand scheduler
 195                         return;
 196                     }
 197 
 198                     while (true) {
 199                         List<ByteBuffer> item;
 200                         synchronized (buffersLock) {
 201                             if (cancelled.get())
 202                                 return;
 203                             if (!hasEnoughAccumulatedBytes())
 204                                 break;
 205                             if (!demand.tryDecrement())
 206                                 break;
 207                             item = fromInternalBuffers();
 208                         }
 209                         assert item != null;
 210 
 211                         downstreamSubscriber.onNext(item);
 212                     }
 213                     if (cancelled.get())
 214                         return;
 215 
 216                     // complete only if all data consumed
 217                     boolean complete;
 218                     synchronized (buffersLock) {
 219                         complete = state == COMPLETE && internalBuffers.isEmpty();
 220                     }
 221                     if (complete) {
 222                         downstreamSubscriber.onComplete();
 223                         pushDemandedScheduler.stop(); // stop the demand scheduler
 224                         return;
 225                     }
 226                 } catch (Throwable t) {
 227                     cancel();  // cancel if there is any error
 228                     throw t;
 229                 }
 230 
 231                 boolean requestMore = false;
 232                 synchronized (buffersLock) {
 233                     if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
 234                         // request more upstream data
 235                         requestMore = true;
 236                     }
 237                 }
 238                 if (requestMore)
 239                     subscription.request(1);
 240             }
 241         }
 242 
 243         @Override
 244         public void cancel() {
 245             if (cancelled.compareAndExchange(false, true))
 246                 return;  // already cancelled
 247 
 248             state = CANCELLED;  // set CANCELLED state of upstream subscriber
 249             subscription.cancel();  // cancel upstream subscription
 250             pushDemandedScheduler.stop(); // stop the demand scheduler
 251         }
 252     }
 253 
 254     @Override
 255     public void onSubscribe(Flow.Subscription subscription) {
 256         Objects.requireNonNull(subscription);
 257         if (this.subscription != null) {
 258             subscription.cancel();
 259             return;
 260         }
 261 
 262         int s = this.state;
 263         assert s == UNSUBSCRIBED;
 264         state = ACTIVE;
 265         this.subscription = subscription;
 266         downstreamSubscription = new DownstreamSubscription();
 267         downstreamSubscriber.onSubscribe(downstreamSubscription);
 268     }
 269 
 270     @Override
 271     public void onNext(List<ByteBuffer> item) {
 272         Objects.requireNonNull(item);
 273 
 274         int s = state;
 275         if (s == CANCELLED)
 276             return;
 277 
 278         if (s != ACTIVE)
 279             throw new InternalError("onNext on inactive subscriber");
 280 
 281         synchronized (buffersLock) {
 282             accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item);
 283         }
 284 
 285         downstreamSubscription.pushDemanded();
 286     }
 287 
 288     @Override
 289     public void onError(Throwable incomingThrowable) {
 290         Objects.requireNonNull(incomingThrowable);
 291         int s = state;
 292         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
 293         state = ERROR;
 294         Throwable t = this.throwable;
 295         assert t == null : "Expected null, got:" + t;
 296         this.throwable = incomingThrowable;
 297         downstreamSubscription.pushDemanded();
 298     }
 299 
 300     @Override
 301     public void onComplete() {
 302         int s = state;
 303         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
 304         state = COMPLETE;
 305         downstreamSubscription.pushDemanded();
 306     }
 307 
 308     @Override
 309     public CompletionStage<T> getBody() {
 310         return downstreamSubscriber.getBody();
 311     }
 312 }