1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.nio.ByteBuffer; 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.ListIterator; 32 import java.util.Objects; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Flow; 35 import java.util.concurrent.atomic.AtomicBoolean; 36 import java.util.concurrent.atomic.AtomicInteger; 37 38 /** 39 * A buffering BodyProcessor. When subscribed, accumulates ( buffers ) a given 40 * amount ( in bytes ) of a subscriptions data before pushing it to a downstream 41 * processor. 42 */ 43 class BufferingProcessor<T> implements HttpResponse.BodyProcessor<T> 44 { 45 /** The downstream consumer of the data. */ 46 private final HttpResponse.BodyProcessor<T> downstreamProcessor; 47 /** The amount of data to be accumulate before pushing downstream. */ 48 private final int bufferSize; 49 50 /** The subscription, created lazily. */ 51 private volatile Flow.Subscription subscription; 52 /** The downstream subscription, created lazily. */ 53 private volatile DownstreamSubscription downstreamSubscription; 54 55 /** Must be held when accessing the internal buffers. */ 56 private final Object buffersLock = new Object(); 57 /** The internal buffers holding the buffered data. */ 58 private ArrayList<ByteBuffer> internalBuffers; 59 /** The actual accumulated remaining bytes in internalBuffers. */ 60 private int accumulatedBytes; 61 62 /** State of the buffering processor: 63 * 1) [UNSUBSCRIBED] when initially created 64 * 2) [ACTIVE] when subscribed and can receive data 65 * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) 66 */ 67 static final int UNSUBSCRIBED = 0x01; 68 static final int ACTIVE = 0x02; 69 static final int ERROR = 0x04; 70 static final int CANCELLED = 0x08; 71 static final int COMPLETE = 0x10; 72 73 private volatile int state; 74 75 BufferingProcessor(HttpResponse.BodyProcessor<T> downstreamProcessor, 76 int bufferSize) { 77 this.downstreamProcessor = downstreamProcessor; 78 this.bufferSize = bufferSize; 79 synchronized (buffersLock) { 80 internalBuffers = new ArrayList<>(); 81 } 82 state = UNSUBSCRIBED; 83 } 84 85 /** Returns the number of bytes remaining in the given buffers. */ 86 private static final int remaining(List<ByteBuffer> buffers) { 87 return buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 88 } 89 90 /** 91 * Tells whether, or not, there is at least a sufficient number of bytes 92 * accumulated in the internal buffers. If the processor is COMPLETE, and 93 * has some buffered data, then there is always enough ( to pass downstream ). 94 */ 95 private final boolean hasEnoughAccumulatedBytes() { 96 assert Thread.holdsLock(buffersLock); 97 return accumulatedBytes >= bufferSize 98 || (state == COMPLETE && accumulatedBytes > 0); 99 } 100 101 /** 102 * Returns a new List<ByteBuffer> containing exactly the amount of data as 103 * required before pushing downstream. The amount of data may be less than 104 * required ( bufferSize ), in the case where the processor is COMPLETE. 105 */ 106 private List<ByteBuffer> fromInternalBuffers() { 107 assert Thread.holdsLock(buffersLock); 108 int leftToFill = bufferSize; 109 assert (state == ACTIVE || state == CANCELLED) 110 ? accumulatedBytes >= leftToFill : true; 111 List<ByteBuffer> dsts = new ArrayList<>(); 112 113 ListIterator<ByteBuffer> itr = internalBuffers.listIterator(); 114 while (itr.hasNext()) { 115 ByteBuffer b = itr.next(); 116 if (b.remaining() <= leftToFill) { 117 itr.remove(); 118 if (b.position() != 0) 119 b = b.slice(); // ensure position = 0 when propagated 120 dsts.add(b); 121 leftToFill -= b.remaining(); 122 accumulatedBytes -= b.remaining(); 123 if (leftToFill == 0) 124 break; 125 } else { 126 int prevLimit = b.limit(); 127 b.limit(b.position() + leftToFill); 128 ByteBuffer slice = b.slice(); 129 dsts.add(slice); 130 b.limit(prevLimit); 131 b.position(b.position() + leftToFill); 132 accumulatedBytes -= leftToFill; 133 leftToFill = 0; 134 break; 135 } 136 } 137 assert (state == ACTIVE || state == CANCELLED) 138 ? leftToFill == 0 : state == COMPLETE; 139 assert (state == ACTIVE || state == CANCELLED) 140 ? remaining(dsts) == bufferSize : state == COMPLETE; 141 assert accumulatedBytes >= 0; 142 assert dsts.stream().filter(b -> b.position() != 0).count() == 0; 143 assert dsts.stream().filter(b -> !b.isReadOnly()).count() == 0; 144 return dsts; 145 } 146 147 /** Subscription that is passed to the downstream processor. */ 148 private class DownstreamSubscription implements Flow.Subscription { 149 private final ThreadLocal<Boolean> tl = ThreadLocal.withInitial(() -> Boolean.FALSE); 150 private final AtomicInteger callCount = new AtomicInteger(); 151 private final AtomicBoolean cancelled = new AtomicBoolean(); // false 152 private final Object demandLock = new Object(); 153 private long demand; 154 155 @Override 156 public void request(long n) { 157 if (n <= 0L) { 158 onError(new IllegalArgumentException( 159 "non-positive subscription request")); 160 return; 161 } 162 if (cancelled.get()) 163 return; 164 165 synchronized (demandLock) { 166 long prev = demand; 167 demand += n; 168 if (demand < prev) // saturate 169 demand = Long.MAX_VALUE; 170 } 171 pushDemanded(); 172 } 173 174 void pushDemanded() { 175 if (tl.get()) 176 return; // avoid recursion 177 tl.set(Boolean.TRUE); 178 try { 179 pushDemanded0(); 180 } finally { 181 tl.set(Boolean.FALSE); 182 } 183 } 184 185 private void pushDemanded0() { 186 if (callCount.getAndIncrement() > 0) 187 return; // the executor of pushDemanded1 can do the work. 188 pushDemanded1(); 189 } 190 191 private void pushDemanded1() { 192 try { 193 do { 194 while (true) { 195 List<ByteBuffer> item; 196 synchronized (buffersLock) { 197 if (cancelled.get()) 198 return; 199 if (!hasEnoughAccumulatedBytes()) 200 break; 201 synchronized (demandLock) { 202 if (demand < 1) 203 break; 204 item = fromInternalBuffers(); 205 demand--; 206 } 207 } 208 assert item != null; 209 210 downstreamProcessor.onNext(item); 211 } 212 if (cancelled.get()) 213 return; 214 215 // complete only if all data consumed 216 boolean complete; 217 synchronized (buffersLock) { 218 complete = state == COMPLETE && internalBuffers.isEmpty(); 219 } 220 if (complete) { 221 downstreamProcessor.onComplete(); 222 return; 223 } 224 } while (callCount.decrementAndGet() > 0); 225 } finally { 226 callCount.set(0); // reset in case of a exception / cancellation 227 } 228 synchronized (demandLock) { 229 if (demand > 0) { // request more upstream data 230 subscription.request(1); 231 } 232 } 233 } 234 235 @Override 236 public void cancel() { 237 if (cancelled.compareAndExchange(false, true)) 238 return; // already cancelled 239 240 state = CANCELLED; // set CANCELLED state of upstream subscriber 241 subscription.cancel(); // cancel upstream subscription 242 } 243 244 boolean hasDemand() { 245 synchronized (demandLock) { 246 return demand > 0; 247 } 248 } 249 } 250 251 @Override 252 public void onSubscribe(Flow.Subscription subscription) { 253 Objects.requireNonNull(subscription); 254 if (this.subscription != null) { 255 subscription.cancel(); 256 return; 257 } 258 259 assert state == UNSUBSCRIBED; 260 state = ACTIVE; 261 this.subscription = subscription; 262 downstreamSubscription = new DownstreamSubscription(); 263 downstreamProcessor.onSubscribe(downstreamSubscription); 264 } 265 266 @Override 267 public void onNext(List<ByteBuffer> item) { 268 Objects.requireNonNull(item); 269 //assert accumulatedBytes < bufferSize; 270 assert downstreamSubscription.hasDemand(); 271 272 if (state == CANCELLED) 273 return; 274 275 if (state != ACTIVE) 276 throw new InternalError("onNext on inactive subscriber"); 277 278 synchronized (buffersLock) { 279 item.forEach(bb -> { 280 internalBuffers.add(bb.asReadOnlyBuffer()); 281 accumulatedBytes += bb.remaining(); 282 }); 283 } 284 285 downstreamSubscription.pushDemanded(); 286 } 287 288 @Override 289 public void onError(Throwable throwable) { 290 Objects.requireNonNull(throwable); 291 assert state == ACTIVE; 292 state = ERROR; 293 downstreamProcessor.onError(throwable); 294 } 295 296 @Override 297 public void onComplete() { 298 assert state == ACTIVE; 299 state = COMPLETE; 300 downstreamSubscription.pushDemanded(); 301 } 302 303 @Override 304 public CompletionStage<T> getBody() { 305 return downstreamProcessor.getBody(); 306 } 307 }