1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.util.ArrayList;
  30 import java.util.List;
  31 import java.util.ListIterator;
  32 import java.util.Objects;
  33 import java.util.concurrent.CompletionStage;
  34 import java.util.concurrent.Flow;
  35 import java.util.concurrent.atomic.AtomicBoolean;
  36 import java.util.concurrent.atomic.AtomicInteger;
  37 
  38 /**
  39  * A buffering BodyProcessor. When subscribed, accumulates ( buffers ) a given
  40  * amount ( in bytes ) of a subscriptions data before pushing it to a downstream
  41  * processor.
  42  */
  43 class BufferingProcessor<T> implements HttpResponse.BodyProcessor<T>
  44 {
  45     /** The downstream consumer of the data. */
  46     private final HttpResponse.BodyProcessor<T> downstreamProcessor;
  47     /** The amount of data to be accumulate before pushing downstream. */
  48     private final int bufferSize;
  49 
  50     /** The subscription, created lazily. */
  51     private volatile Flow.Subscription subscription;
  52     /** The downstream subscription, created lazily. */
  53     private volatile DownstreamSubscription downstreamSubscription;
  54 
  55     /** Must be held when accessing the internal buffers. */
  56     private final Object buffersLock = new Object();
  57     /** The internal buffers holding the buffered data. */
  58     private ArrayList<ByteBuffer> internalBuffers;
  59     /** The actual accumulated remaining bytes in internalBuffers. */
  60     private int accumulatedBytes;
  61 
  62     /** State of the buffering processor:
  63      *  1) [UNSUBSCRIBED] when initially created
  64      *  2) [ACTIVE] when subscribed and can receive data
  65      *  3) [ERROR | CANCELLED | COMPLETE] (terminal state)
  66      */
  67     static final int UNSUBSCRIBED = 0x01;
  68     static final int ACTIVE       = 0x02;
  69     static final int ERROR        = 0x04;
  70     static final int CANCELLED    = 0x08;
  71     static final int COMPLETE     = 0x10;
  72 
  73     private volatile int state;
  74 
  75     BufferingProcessor(HttpResponse.BodyProcessor<T> downstreamProcessor,
  76                        int bufferSize) {
  77         this.downstreamProcessor = downstreamProcessor;
  78         this.bufferSize = bufferSize;
  79         synchronized (buffersLock) {
  80             internalBuffers = new ArrayList<>();
  81         }
  82         state = UNSUBSCRIBED;
  83     }
  84 
  85     /** Returns the number of bytes remaining in the given buffers. */
  86     private static final int remaining(List<ByteBuffer> buffers) {
  87         return buffers.stream().mapToInt(ByteBuffer::remaining).sum();
  88     }
  89 
  90     /**
  91      * Tells whether, or not, there is at least a sufficient number of bytes
  92      * accumulated in the internal buffers. If the processor is COMPLETE, and
  93      * has some buffered data, then there is always enough ( to pass downstream ).
  94      */
  95     private final boolean hasEnoughAccumulatedBytes() {
  96         assert Thread.holdsLock(buffersLock);
  97         return accumulatedBytes >= bufferSize
  98                 || (state == COMPLETE && accumulatedBytes > 0);
  99     }
 100 
 101     /**
 102      * Returns a new List<ByteBuffer> containing exactly the amount of data as
 103      * required before pushing downstream. The amount of data may be less than
 104      * required ( bufferSize ), in the case where the processor is COMPLETE.
 105      */
 106     private List<ByteBuffer> fromInternalBuffers() {
 107         assert Thread.holdsLock(buffersLock);
 108         int leftToFill = bufferSize;
 109         assert (state == ACTIVE || state == CANCELLED)
 110                 ? accumulatedBytes >= leftToFill : true;
 111         List<ByteBuffer> dsts = new ArrayList<>();
 112 
 113         ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
 114         while (itr.hasNext()) {
 115             ByteBuffer b = itr.next();
 116             if (b.remaining() <= leftToFill) {
 117                 itr.remove();
 118                 if (b.position() != 0)
 119                     b = b.slice();  // ensure position = 0 when propagated
 120                 dsts.add(b);
 121                 leftToFill -= b.remaining();
 122                 accumulatedBytes -= b.remaining();
 123                 if (leftToFill == 0)
 124                     break;
 125             } else {
 126                 int prevLimit = b.limit();
 127                 b.limit(b.position() + leftToFill);
 128                 ByteBuffer slice = b.slice();
 129                 dsts.add(slice);
 130                 b.limit(prevLimit);
 131                 b.position(b.position() + leftToFill);
 132                 accumulatedBytes -= leftToFill;
 133                 leftToFill = 0;
 134                 break;
 135             }
 136         }
 137         assert (state == ACTIVE || state == CANCELLED)
 138                 ? leftToFill == 0 : state == COMPLETE;
 139         assert (state == ACTIVE || state == CANCELLED)
 140                 ? remaining(dsts) == bufferSize : state == COMPLETE;
 141         assert accumulatedBytes >= 0;
 142         assert dsts.stream().filter(b -> b.position() != 0).count() == 0;
 143         assert dsts.stream().filter(b -> !b.isReadOnly()).count() == 0;
 144         return dsts;
 145     }
 146 
 147     /** Subscription that is passed to the downstream processor. */
 148     private class DownstreamSubscription implements Flow.Subscription {
 149         private final ThreadLocal<Boolean> tl = ThreadLocal.withInitial(() -> Boolean.FALSE);
 150         private final AtomicInteger callCount = new AtomicInteger();
 151         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
 152         private final Object demandLock = new Object();
 153         private long demand;
 154 
 155         @Override
 156         public void request(long n) {
 157             if (n <= 0L) {
 158                 onError(new IllegalArgumentException(
 159                         "non-positive subscription request"));
 160                 return;
 161             }
 162             if (cancelled.get())
 163                 return;
 164 
 165             synchronized (demandLock) {
 166                 long prev = demand;
 167                 demand += n;
 168                 if (demand < prev)  // saturate
 169                     demand = Long.MAX_VALUE;
 170             }
 171             pushDemanded();
 172         }
 173 
 174         void pushDemanded() {
 175             if (tl.get())
 176                 return;  // avoid recursion
 177             tl.set(Boolean.TRUE);
 178             try {
 179                 pushDemanded0();
 180             } finally {
 181                 tl.set(Boolean.FALSE);
 182             }
 183         }
 184 
 185         private void pushDemanded0() {
 186             if (callCount.getAndIncrement() > 0)
 187                 return;  // the executor of pushDemanded1 can do the work.
 188             pushDemanded1();
 189         }
 190 
 191         private void pushDemanded1() {
 192             try {
 193                 do {
 194                     while (true) {
 195                         List<ByteBuffer> item;
 196                         synchronized (buffersLock) {
 197                             if (cancelled.get())
 198                                 return;
 199                             if (!hasEnoughAccumulatedBytes())
 200                                 break;
 201                             synchronized (demandLock) {
 202                                 if (demand < 1)
 203                                     break;
 204                                 item = fromInternalBuffers();
 205                                 demand--;
 206                             }
 207                         }
 208                         assert item != null;
 209 
 210                         downstreamProcessor.onNext(item);
 211                     }
 212                     if (cancelled.get())
 213                         return;
 214 
 215                     // complete only if all data consumed
 216                     boolean complete;
 217                     synchronized (buffersLock) {
 218                         complete = state == COMPLETE && internalBuffers.isEmpty();
 219                     }
 220                     if (complete) {
 221                         downstreamProcessor.onComplete();
 222                         return;
 223                     }
 224                 } while (callCount.decrementAndGet() > 0);
 225             } finally {
 226                 callCount.set(0);  // reset in case of a exception / cancellation
 227             }
 228             synchronized (demandLock) {
 229                 if (demand > 0) {  // request more upstream data
 230                     subscription.request(1);
 231                 }
 232             }
 233         }
 234 
 235         @Override
 236         public void cancel() {
 237             if (cancelled.compareAndExchange(false, true))
 238                 return;  // already cancelled
 239 
 240             state = CANCELLED;  // set CANCELLED state of upstream subscriber
 241             subscription.cancel();  // cancel upstream subscription
 242         }
 243 
 244         boolean hasDemand() {
 245             synchronized (demandLock) {
 246                 return demand > 0;
 247             }
 248         }
 249     }
 250 
 251     @Override
 252     public void onSubscribe(Flow.Subscription subscription) {
 253         Objects.requireNonNull(subscription);
 254         if (this.subscription != null) {
 255             subscription.cancel();
 256             return;
 257         }
 258 
 259         assert state == UNSUBSCRIBED;
 260         state = ACTIVE;
 261         this.subscription = subscription;
 262         downstreamSubscription = new DownstreamSubscription();
 263         downstreamProcessor.onSubscribe(downstreamSubscription);
 264     }
 265 
 266     @Override
 267     public void onNext(List<ByteBuffer> item) {
 268         Objects.requireNonNull(item);
 269         //assert accumulatedBytes < bufferSize;
 270         assert downstreamSubscription.hasDemand();
 271 
 272         if (state == CANCELLED)
 273             return;
 274 
 275         if (state != ACTIVE)
 276             throw new InternalError("onNext on inactive subscriber");
 277 
 278         synchronized (buffersLock) {
 279             item.forEach(bb -> {
 280                 internalBuffers.add(bb.asReadOnlyBuffer());
 281                 accumulatedBytes += bb.remaining();
 282             });
 283         }
 284 
 285         downstreamSubscription.pushDemanded();
 286     }
 287 
 288     @Override
 289     public void onError(Throwable throwable) {
 290         Objects.requireNonNull(throwable);
 291         assert state == ACTIVE;
 292         state = ERROR;
 293         downstreamProcessor.onError(throwable);
 294     }
 295 
 296     @Override
 297     public void onComplete() {
 298         assert state == ACTIVE;
 299         state = COMPLETE;
 300         downstreamSubscription.pushDemanded();
 301     }
 302 
 303     @Override
 304     public CompletionStage<T> getBody() {
 305         return downstreamProcessor.getBody();
 306     }
 307 }