/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.HttpResponse.BodyProcessor;

/**
 * A buffering processor that sits between an upstream publisher and a
 * downstream {@code BodyProcessor}. Buffers received from upstream are
 * accumulated and handed to the downstream processor in chunks of exactly
 * {@code bufsize} bytes; only the final chunk, delivered once upstream has
 * completed, may be shorter.
 *
 * <p>Locking: {@link #buffers} and {@link #bufCount} are only modified while
 * holding the monitor of {@code buffers}; {@link #upstreamDemand} is guarded
 * by the monitor of this object; delivery to the downstream processor is
 * serialized by the monitor of the {@link Subscription}.
 */
class BufferingProcessor<T> implements HttpResponse.BodyProcessor<T> {

    final HttpResponse.BodyProcessor<T> downstreamSubscriber;
    volatile Subscription<T> downstreamSubscription;
    volatile Flow.Subscription upstreamSubscription;
    final List<ByteBuffer> buffers;
    volatile long bufCount = 0; // number of bytes in buffers received
    long upstreamDemand = 0; // unfulfilled demand to upstream
    final AtomicBoolean completed, error;
    final long bufsize;

    /**
     * Creates a processor that re-chunks the body for the given downstream
     * processor.
     *
     * @param downstreamSubscriber the processor that receives the re-chunked data
     * @param bufsize the number of bytes passed downstream per {@code onNext}
     */
    BufferingProcessor(HttpResponse.BodyProcessor<T> downstreamSubscriber, long bufsize) {
        this.downstreamSubscriber = downstreamSubscriber;
        this.buffers = Collections.synchronizedList(new LinkedList<>());
        this.completed = new AtomicBoolean(false);
        this.error = new AtomicBoolean(false);
        this.bufsize = bufsize;
    }

    /**
     * Requests one more item from upstream unless a request is already
     * outstanding.
     */
    private void requestUpstream() {
        synchronized (this) {
            if (upstreamDemand >= 1) {
                return;
            }
            // Count the request before issuing it, so that an onNext delivered
            // from inside request(1) already sees the demand it is fulfilling.
            upstreamDemand++;
        }
        upstreamSubscription.request(1);
    }

    /**
     * Records that one outstanding upstream request has been fulfilled.
     */
    private synchronized void reduceUpstream() {
        upstreamDemand--;
        assert upstreamDemand >= 0;
    }

    /**
     * Returns a list of buffers containing n bytes, or null if fewer than n
     * bytes are available and upstream has not completed. The returned buffers
     * contain exactly n bytes, except once upstream has completed, in which
     * case they may contain fewer (possibly none). Upstream is asked for more
     * data in either case.
     *
     * @param n the number of bytes wanted
     * @return the buffers holding the bytes, or null if not enough data is
     *         buffered yet
     */
    List<ByteBuffer> getBuffersOf(long n) {
        List<ByteBuffer> result = null;
        // The check and the drain must be atomic with respect to onNext,
        // otherwise bytes added concurrently are lost from bufCount.
        synchronized (buffers) {
            boolean complete = completed.get();
            if (n <= bufCount || complete) {
                result = new ArrayList<>();
                long needed = n;
                while (needed > 0) {
                    if (buffers.isEmpty() && complete) {
                        break;
                    }
                    ByteBuffer buf = buffers.remove(0);
                    if (buf.remaining() > needed) {
                        // Split: copy out the head and put the rest back first in line.
                        ByteBuffer head = getNBytesFrom((int) needed, buf);
                        buffers.add(0, buf.slice());
                        buf = head;
                    }
                    int taken = buf.remaining();
                    needed -= taken;
                    bufCount -= taken;
                    result.add(buf);
                }
            }
        }
        // Called outside the buffers monitor: it invokes the upstream subscription.
        requestUpstream();
        return result;
    }

    /**
     * Copies the next n bytes of buf into a new heap buffer, advancing the
     * position of buf by n.
     *
     * @param n the number of bytes to copy; must not exceed buf.remaining()
     * @param buf the buffer to read from
     * @return a new buffer wrapping the copied bytes
     */
    static ByteBuffer getNBytesFrom(int n, ByteBuffer buf) {
        byte[] bb = new byte[n];
        buf.get(bb);
        return ByteBuffer.wrap(bb);
    }

    /**
     * Stores the upstream subscription, hands the downstream processor its own
     * subscription and requests the first item from upstream.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstreamSubscription = subscription;
        this.downstreamSubscription = new Subscription<>(subscription, bufsize, downstreamSubscriber);
        downstreamSubscriber.onSubscribe(downstreamSubscription);
        // Go through requestUpstream() so this request is counted in
        // upstreamDemand and is not issued twice when the downstream
        // processor already triggered one from its onSubscribe.
        requestUpstream();
    }

    /**
     * Returns a new list holding a read-only view of each buffer in the list.
     */
    static List<ByteBuffer> asReadOnly(List<ByteBuffer> list) {
        List<ByteBuffer> views = new ArrayList<>(list.size());
        for (ByteBuffer buf : list) {
            views.add(buf.asReadOnlyBuffer());
        }
        return views;
    }

    /**
     * Returns the total number of bytes remaining in the given buffers.
     */
    static long remaining(List<ByteBuffer> items) {
        long r = 0;
        for (ByteBuffer buf : items) {
            r += buf.remaining();
        }
        return r;
    }

    /**
     * Buffers read-only views of the received items and drives delivery to the
     * downstream processor.
     *
     * @throws IllegalStateException if upstream has already completed or
     *         signalled an error
     */
    @Override
    public void onNext(List<ByteBuffer> items) {
        reduceUpstream();
        if (completed.get() || error.get()) {
            throw new IllegalStateException();
        }
        Subscription<T> downstream = downstreamSubscription;
        if (downstream.closed) {
            // Downstream cancelled or failed; data still in flight is dropped.
            return;
        }
        // each subscriber gets a read only view of the source buffer
        List<ByteBuffer> views = asReadOnly(items);
        synchronized (buffers) {
            buffers.addAll(views);
            bufCount += remaining(views);
        }
        downstream.processRequests();
    }

    /**
     * Records the upstream error, cancels the upstream subscription and
     * forwards the error to the downstream processor.
     *
     * @throws IllegalStateException if upstream has already completed or
     *         signalled an error
     */
    @Override
    public void onError(Throwable throwable) {
        if (completed.get() || !error.compareAndSet(false, true)) {
            throw new IllegalStateException();
        }
        upstreamSubscription.cancel();
        downstreamSubscription.signalError(throwable);
    }

    /**
     * Records upstream completion and drives delivery so that the remaining
     * buffered data and the completion signal can be passed downstream.
     *
     * @throws IllegalStateException if upstream has already completed or
     *         signalled an error
     */
    @Override
    public void onComplete() {
        if (error.get() || !completed.compareAndSet(false, true)) {
            throw new IllegalStateException();
        }
        downstreamSubscription.processRequests();
    }

    /**
     * Returns true if upstream has completed and no buffered data is left.
     */
    boolean complete() {
        return completed.get() && buffers.isEmpty();
    }

    @Override
    public CompletionStage<T> getBody() {
        return downstreamSubscriber.getBody();
    }

    /**
     * The subscription handed to the downstream processor. It tracks the
     * downstream demand and delivers one chunk per requested item.
     */
    class Subscription<T> implements Flow.Subscription {
        final Flow.Subscription parent;
        long requests = 0;
        final long bufsize;
        final HttpResponse.BodyProcessor<T> downstreamSubscriber;
        int callLevel;
        volatile boolean closed;

        Subscription(Flow.Subscription parent, long bufsize, HttpResponse.BodyProcessor<T> downstreamSubscriber) {
            this.parent = parent;
            this.downstreamSubscriber = downstreamSubscriber;
            this.closed = false;
            this.bufsize = bufsize;
        }

        /**
         * Adds n to the downstream demand and delivers whatever is available.
         * Has no effect once this subscription is closed.
         *
         * @throws IllegalArgumentException if n is not positive
         */
        @Override
        public void request(long n) {
            if (n <= 0) {
                throw new IllegalArgumentException("n<=0");
            }
            if (closed) {
                return;
            }
            addRequests(n);
            processRequests();
        }

        /**
         * Adds n to the outstanding demand, saturating at Long.MAX_VALUE so
         * that repeated large requests cannot overflow to a negative count.
         */
        synchronized void addRequests(long n) {
            long sum = requests + n;
            requests = (sum < 0) ? Long.MAX_VALUE : sum;
        }

        /**
         * work function which drives as much data as allowed from the publisher
         * to the user subscriber. Does nothing if this subscription is closed.
         */
        synchronized void processRequests() {
            if (closed) {
                return;
            }
            if (callLevel > 0) {
                // Nested call from the thread that is already running the loop
                // below; that loop re-reads 'requests' on every iteration.
                return;
            }
            callLevel++;
            try {
                while (requests > 0 && !closed) {
                    List<ByteBuffer> bufs = getBuffersOf(bufsize);
                    if (bufs == null || closed) {
                        return;
                    }
                    // either bufsize bytes available or we are complete
                    requests--;
                    long c = getSize(bufs);
                    if (c != 0) {
                        downstreamSubscriber.onNext(bufs);
                    }
                    if (c != bufsize) {
                        assert complete();
                        if (closed) {
                            // closed from within onNext; no further signals
                            return;
                        }
                        // Close first so that a failure in onComplete is not
                        // followed by a second terminal signal.
                        closed = true;
                        downstreamSubscriber.onComplete();
                        break;
                    }
                }
            } catch (Throwable t) {
                if (!closed) {
                    parent.cancel();
                    try {
                        signalError(t);
                    } catch (Throwable ignored) {
                        // The downstream processor failed while being told about
                        // its own failure; there is nobody left to report to.
                    }
                }
                // If already closed, the terminal signal has been sent (or the
                // subscription was cancelled), so nothing more may be signalled.
            } finally {
                callLevel--;
            }
        }

        /**
         * Closes this subscription and passes the given error to the
         * downstream processor, unless the subscription is already closed.
         * Synchronized so that it cannot overlap with a delivery in progress
         * on another thread.
         */
        synchronized void signalError(Throwable cause) {
            if (closed) {
                return;
            }
            closed = true;
            downstreamSubscriber.onError(cause);
        }

        /**
         * cancels this subscription and the underlying parent subscription.
         * Calling it on a closed subscription has no effect.
         */
        @Override
        public void cancel() {
            if (closed) {
                return;
            }
            closed = true;
            parent.cancel();
        }
    }

    /**
     * Returns the total number of bytes remaining in the given buffers.
     */
    static long getSize(List<ByteBuffer> bufs) {
        return remaining(bufs);
    }
}