1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.nio.ByteBuffer; 29 import java.util.ArrayList; 30 import java.util.Collections; 31 import java.util.List; 32 import java.util.ListIterator; 33 import java.util.Objects; 34 import java.util.concurrent.CompletionStage; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import jdk.incubator.http.internal.common.Demand; 38 import jdk.incubator.http.internal.common.SequentialScheduler; 39 import jdk.incubator.http.internal.common.Utils; 40 41 /** 42 * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given 43 * amount ( in bytes ) of a publisher's data before pushing it to a downstream 44 * subscriber. 45 */ 46 class BufferingSubscriber<T> implements HttpResponse.BodySubscriber<T> 47 { 48 /** The downstream consumer of the data. */ 49 private final HttpResponse.BodySubscriber<T> downstreamSubscriber; 50 /** The amount of data to be accumulate before pushing downstream. */ 51 private final int bufferSize; 52 53 /** The subscription, created lazily. */ 54 private volatile Flow.Subscription subscription; 55 /** The downstream subscription, created lazily. */ 56 private volatile DownstreamSubscription downstreamSubscription; 57 58 /** Must be held when accessing the internal buffers. */ 59 private final Object buffersLock = new Object(); 60 /** The internal buffers holding the buffered data. */ 61 private ArrayList<ByteBuffer> internalBuffers; 62 /** The actual accumulated remaining bytes in internalBuffers. */ 63 private int accumulatedBytes; 64 65 /** Holds the Throwable from upstream's onError. */ 66 private volatile Throwable throwable; 67 68 /** State of the buffering subscriber: 69 * 1) [UNSUBSCRIBED] when initially created 70 * 2) [ACTIVE] when subscribed and can receive data 71 * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) 72 */ 73 static final int UNSUBSCRIBED = 0x01; 74 static final int ACTIVE = 0x02; 75 static final int ERROR = 0x04; 76 static final int CANCELLED = 0x08; 77 static final int COMPLETE = 0x10; 78 79 private volatile int state; 80 81 BufferingSubscriber(HttpResponse.BodySubscriber<T> downstreamSubscriber, 82 int bufferSize) { 83 this.downstreamSubscriber = downstreamSubscriber; 84 this.bufferSize = bufferSize; 85 synchronized (buffersLock) { 86 internalBuffers = new ArrayList<>(); 87 } 88 state = UNSUBSCRIBED; 89 } 90 91 /** Returns the number of bytes remaining in the given buffers. */ 92 private static final int remaining(List<ByteBuffer> buffers) { 93 return buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 94 } 95 96 /** 97 * Tells whether, or not, there is at least a sufficient number of bytes 98 * accumulated in the internal buffers. If the subscriber is COMPLETE, and 99 * has some buffered data, then there is always enough ( to pass downstream ). 100 */ 101 private final boolean hasEnoughAccumulatedBytes() { 102 assert Thread.holdsLock(buffersLock); 103 return accumulatedBytes >= bufferSize 104 || (state == COMPLETE && accumulatedBytes > 0); 105 } 106 107 /** 108 * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the 109 * amount of data as required before pushing downstream. The amount of data 110 * may be less than required ( bufferSize ), in the case where the subscriber 111 * is COMPLETE. 112 */ 113 private List<ByteBuffer> fromInternalBuffers() { 114 assert Thread.holdsLock(buffersLock); 115 int leftToFill = bufferSize; 116 int state = this.state; 117 assert (state == ACTIVE || state == CANCELLED) 118 ? accumulatedBytes >= leftToFill : true; 119 List<ByteBuffer> dsts = new ArrayList<>(); 120 121 ListIterator<ByteBuffer> itr = internalBuffers.listIterator(); 122 while (itr.hasNext()) { 123 ByteBuffer b = itr.next(); 124 if (b.remaining() <= leftToFill) { 125 itr.remove(); 126 if (b.position() != 0) 127 b = b.slice(); // ensure position = 0 when propagated 128 dsts.add(b); 129 leftToFill -= b.remaining(); 130 accumulatedBytes -= b.remaining(); 131 if (leftToFill == 0) 132 break; 133 } else { 134 int prevLimit = b.limit(); 135 b.limit(b.position() + leftToFill); 136 ByteBuffer slice = b.slice(); 137 dsts.add(slice); 138 b.limit(prevLimit); 139 b.position(b.position() + leftToFill); 140 accumulatedBytes -= leftToFill; 141 leftToFill = 0; 142 break; 143 } 144 } 145 assert (state == ACTIVE || state == CANCELLED) 146 ? leftToFill == 0 : state == COMPLETE; 147 assert (state == ACTIVE || state == CANCELLED) 148 ? remaining(dsts) == bufferSize : state == COMPLETE; 149 assert accumulatedBytes >= 0; 150 assert dsts.stream().noneMatch(b -> b.position() != 0); 151 return Collections.unmodifiableList(dsts); 152 } 153 154 /** Subscription that is passed to the downstream subscriber. */ 155 private class DownstreamSubscription implements Flow.Subscription { 156 private final AtomicBoolean cancelled = new AtomicBoolean(); // false 157 private final Demand demand = new Demand(); 158 private volatile boolean illegalArg; 159 160 @Override 161 public void request(long n) { 162 if (cancelled.get() || illegalArg) { 163 return; 164 } 165 if (n <= 0L) { 166 // pass the "bad" value upstream so the Publisher can deal with 167 // it appropriately, i.e. invoke onError 168 illegalArg = true; 169 subscription.request(n); 170 return; 171 } 172 173 demand.increase(n); 174 175 pushDemanded(); 176 } 177 178 private final SequentialScheduler pushDemandedScheduler = 179 new SequentialScheduler(new PushDemandedTask()); 180 181 void pushDemanded() { 182 if (cancelled.get()) 183 return; 184 pushDemandedScheduler.runOrSchedule(); 185 } 186 187 class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask { 188 @Override 189 public void run() { 190 try { 191 Throwable t = throwable; 192 if (t != null) { 193 downstreamSubscriber.onError(t); 194 pushDemandedScheduler.stop(); // stop the demand scheduler 195 return; 196 } 197 198 while (true) { 199 List<ByteBuffer> item; 200 synchronized (buffersLock) { 201 if (cancelled.get()) 202 return; 203 if (!hasEnoughAccumulatedBytes()) 204 break; 205 if (!demand.tryDecrement()) 206 break; 207 item = fromInternalBuffers(); 208 } 209 assert item != null; 210 211 downstreamSubscriber.onNext(item); 212 } 213 if (cancelled.get()) 214 return; 215 216 // complete only if all data consumed 217 boolean complete; 218 synchronized (buffersLock) { 219 complete = state == COMPLETE && internalBuffers.isEmpty(); 220 } 221 if (complete) { 222 downstreamSubscriber.onComplete(); 223 pushDemandedScheduler.stop(); // stop the demand scheduler 224 return; 225 } 226 } catch (Throwable t) { 227 cancel(); // cancel if there is any error 228 throw t; 229 } 230 231 boolean requestMore = false; 232 synchronized (buffersLock) { 233 if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) { 234 // request more upstream data 235 requestMore = true; 236 } 237 } 238 if (requestMore) 239 subscription.request(1); 240 } 241 } 242 243 @Override 244 public void cancel() { 245 if (cancelled.compareAndExchange(false, true)) 246 return; // already cancelled 247 248 state = CANCELLED; // set CANCELLED state of upstream subscriber 249 subscription.cancel(); // cancel upstream subscription 250 pushDemandedScheduler.stop(); // stop the demand scheduler 251 } 252 } 253 254 @Override 255 public void onSubscribe(Flow.Subscription subscription) { 256 Objects.requireNonNull(subscription); 257 if (this.subscription != null) { 258 subscription.cancel(); 259 return; 260 } 261 262 int s = this.state; 263 assert s == UNSUBSCRIBED; 264 state = ACTIVE; 265 this.subscription = subscription; 266 downstreamSubscription = new DownstreamSubscription(); 267 downstreamSubscriber.onSubscribe(downstreamSubscription); 268 } 269 270 @Override 271 public void onNext(List<ByteBuffer> item) { 272 Objects.requireNonNull(item); 273 274 int s = state; 275 if (s == CANCELLED) 276 return; 277 278 if (s != ACTIVE) 279 throw new InternalError("onNext on inactive subscriber"); 280 281 synchronized (buffersLock) { 282 accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item); 283 } 284 285 downstreamSubscription.pushDemanded(); 286 } 287 288 @Override 289 public void onError(Throwable incomingThrowable) { 290 Objects.requireNonNull(incomingThrowable); 291 int s = state; 292 assert s == ACTIVE : "Expected ACTIVE, got:" + s; 293 state = ERROR; 294 Throwable t = this.throwable; 295 assert t == null : "Expected null, got:" + t; 296 this.throwable = incomingThrowable; 297 downstreamSubscription.pushDemanded(); 298 } 299 300 @Override 301 public void onComplete() { 302 int s = state; 303 assert s == ACTIVE : "Expected ACTIVE, got:" + s; 304 state = COMPLETE; 305 downstreamSubscription.pushDemanded(); 306 } 307 308 @Override 309 public CompletionStage<T> getBody() { 310 return downstreamSubscriber.getBody(); 311 } 312 }