1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.util.Collections;
  30 import java.util.LinkedList;
  31 import java.util.List;
  32 import java.util.concurrent.CompletionStage;
  33 import java.util.concurrent.Flow;
  34 import java.util.concurrent.atomic.AtomicBoolean;
  35 import jdk.incubator.http.HttpResponse.BodyHandler;
  36 import jdk.incubator.http.HttpResponse.BodyProcessor;
  37 
  38 /**
  39  * A buffering processor whose subscriber runs in the 
  40  */
  41 class BufferingProcessor<T> implements HttpResponse.BodyProcessor<T> {
  42 
  43     final HttpResponse.BodyProcessor<T> downstreamSubscriber;
  44     volatile Subscription<T> downstreamSubscription;
  45     volatile Flow.Subscription upstreamSubscription;
  46     final List<ByteBuffer> buffers;
  47     volatile long bufCount = 0; // number of bytes in buffers received
  48     long upstreamDemand = 0; // unfulfilled demand to upstream
  49     final AtomicBoolean completed, error;
  50     final long bufsize;
  51 
  52     BufferingProcessor(HttpResponse.BodyProcessor<T> downstreamSubscriber, long bufsize) {
  53         this.downstreamSubscriber = downstreamSubscriber;
  54         this.buffers = Collections.synchronizedList(new LinkedList<>());
  55         this.completed = new AtomicBoolean(false);
  56         this.error = new AtomicBoolean(false);
  57         this.bufsize = bufsize;
  58     }
  59 
  60     private synchronized void requestUpstream() {
  61         if (upstreamDemand < 1) {
  62             upstreamSubscription.request(1);
  63             upstreamDemand++;
  64         }
  65     }
  66     
  67     private synchronized void reduceUpstream() {
  68         upstreamDemand--;
  69     }
  70     
  71     /**
  72      * Return a List<ByteBuffer> containing n bytes. Will return null if fewer than n are
  73      * available. Returned Buffers will normally contain exactly n bytes except
  74      * if subscription is complete and may contain fewer than n in that case.
  75      *
  76      * @param n
  77      * @return
  78      */
  79     List<ByteBuffer> getBuffersOf(long n) {
  80         boolean complete = completed.get();
  81         long l = bufCount;
  82         if (n > l && !complete) {
  83             requestUpstream();
  84             return null;
  85         }
  86         LinkedList<ByteBuffer> list = new LinkedList<>();
  87         long nn = n;
  88         while (nn > 0) {
  89             if (buffers.isEmpty() && complete) {
  90                 break;
  91             }
  92             ByteBuffer buf = buffers.remove(0);
  93             int buflen = buf.remaining();
  94             if (buflen > nn) {
  95                 // need to copy
  96                 ByteBuffer buf1 = getNBytesFrom((int) nn, buf);
  97                 buffers.add(0, buf.slice());
  98                 buf = buf1;
  99             } 
 100             nn -= buf.remaining();
 101             list.add(buf);
 102             l -= buf.remaining();
 103             bufCount = l;
 104         }
 105         requestUpstream();
 106         return list;
 107     }
 108 
 109     static ByteBuffer getNBytesFrom(int n, ByteBuffer buf) {
 110         byte[] bb = new byte[n];
 111         buf.get(bb);
 112         return ByteBuffer.wrap(bb);
 113     }
 114 
 115     @Override
 116     public void onSubscribe(Flow.Subscription subscription) {
 117         this.upstreamSubscription = subscription;
 118         this.downstreamSubscription = new Subscription<>(subscription, bufsize, downstreamSubscriber);
 119         downstreamSubscriber.onSubscribe(downstreamSubscription);
 120         subscription.request(1);
 121     }
 122 
 123     static List<ByteBuffer> asReadOnly(List<ByteBuffer> list) {
 124         LinkedList<ByteBuffer> l = new LinkedList<>();
 125         for (ByteBuffer buf : list) {
 126             ByteBuffer ro = buf.asReadOnlyBuffer();
 127             l.add(ro);
 128         }
 129         return l;
 130     }
 131 
 132     static long remaining(List<ByteBuffer> items) {
 133         long r = 0;
 134         for (ByteBuffer buf : items) {
 135             r += buf.remaining();
 136         }
 137         return r;
 138     }
 139 
 140     @Override
 141     public void onNext(List<ByteBuffer> items) {
 142         reduceUpstream();
 143         assert upstreamDemand >= 0;
 144         // each subscriber gets a read only view of the source buffer
 145         items = asReadOnly(items);
 146         if (completed.get() || error.get()) {
 147             throw new IllegalStateException();
 148         }
 149         buffers.addAll(items);
 150         bufCount += remaining(items);
 151         downstreamSubscription.processRequests();
 152     }
 153 
 154     @Override
 155     public void onError(Throwable throwable) {
 156         if (completed.get() || error.get()) {
 157             throw new IllegalStateException();
 158         }
 159         error.set(true);
 160         upstreamSubscription.cancel();
 161     }
 162 
 163     @Override
 164     public void onComplete() {
 165         if (completed.get() || error.get()) {
 166             throw new IllegalStateException();
 167         }
 168         completed.set(true);
 169         downstreamSubscription.processRequests();
 170     }
 171 
 172     boolean complete() {
 173         return completed.get() && buffers.isEmpty();
 174     }
 175 
 176     @Override
 177     public CompletionStage<T> getBody() {
 178         return downstreamSubscriber.getBody();
 179     }
 180     
 181     class Subscription<T> implements Flow.Subscription {
 182         final Flow.Subscription parent;
 183         long requests = 0;
 184         final long bufsize;
 185         final HttpResponse.BodyProcessor<T> downstreamSubscriber;
 186         int callLevel;
 187         volatile boolean closed;
 188         
 189         Subscription(Flow.Subscription parent, long bufsize, HttpResponse.BodyProcessor<T> downstreamSubscriber) {
 190             this.parent = parent;
 191             this.downstreamSubscriber = downstreamSubscriber;
 192             this.closed = false;
 193             this.bufsize = bufsize;
 194         }
 195         
 196         @Override
 197         public void request(long n) {
 198             if (n <= 0)
 199                 throw new IllegalArgumentException("n<=0");
 200             
 201             if (closed) {
 202                 return;
 203             }
 204             addRequests(n);
 205             processRequests();
 206         }
 207         
 208         synchronized void addRequests(long n) {
 209             requests += n;
 210         }
 211         
 212         /**
 213          * work function which drives as much data as allowed from the publisher
 214          * to the user subscriber. 
 215          */
 216         synchronized void processRequests() {
 217             if (closed) {
 218                 throw new IllegalStateException("subscription closed");
 219             }
 220 
 221             if (++callLevel > 1) {
 222                 callLevel--;
 223                 return;
 224             }
 225 
 226             try {
 227                 while (requests > 0) {
 228                     List<ByteBuffer> bufs = getBuffersOf(bufsize);
 229                     if (bufs == null) {
 230                         return;
 231                     }
 232                     // either bufsize bytes available or we are complete
 233                     requests--;
 234                     long c = getSize(bufs);
 235                     if (c != 0) {
 236                         downstreamSubscriber.onNext(bufs);
 237                     }
 238                     if (c != bufsize) {
 239                         assert complete();
 240                         downstreamSubscriber.onComplete();
 241                         closed = true;
 242                         break;
 243                     }
 244                 }
 245             } catch (Throwable t) {
 246                 cancel();
 247             } finally {
 248                 callLevel--;
 249             }
 250         }
 251        
 252         /**
 253          * cancels this subscription and the underlying parent subscription.
 254          */
 255         @Override
 256         public void cancel() {
 257             if (closed)
 258                 throw new IllegalStateException("subscription closed");
 259             closed = true;
 260             onError(new Throwable("wha"));
 261         }        
 262     }
 263 
 264     static long getSize(List<ByteBuffer> bufs) {
 265         long c = 0;
 266         for (ByteBuffer buf : bufs) {
 267             c += buf.remaining();
 268         }
 269         return c;
 270     }
 271 }