1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.io.InputStream; 30 import java.lang.System.Logger.Level; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.OpenOption; 34 import java.nio.file.Path; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedActionException; 38 import java.security.PrivilegedExceptionAction; 39 import java.util.ArrayList; 40 import java.util.Iterator; 41 import java.util.List; 42 import java.util.Objects; 43 import java.util.Optional; 44 import java.util.concurrent.ArrayBlockingQueue; 45 import java.util.concurrent.BlockingQueue; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.CompletionStage; 48 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 import java.util.function.Consumer; 52 import java.util.function.Function; 53 import jdk.incubator.http.internal.common.MinimalFuture; 54 import jdk.incubator.http.internal.common.Utils; 55 56 class ResponseSubscribers { 57 58 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { 59 private final Consumer<Optional<byte[]>> consumer; 60 private Flow.Subscription subscription; 61 private final CompletableFuture<Void> result = new MinimalFuture<>(); 62 private final AtomicBoolean subscribed = new AtomicBoolean(); 63 64 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { 65 this.consumer = consumer; 66 } 67 68 @Override 69 public CompletionStage<Void> getBody() { 70 return result; 71 } 72 73 @Override 74 public void onSubscribe(Flow.Subscription subscription) { 75 if (!subscribed.compareAndSet(false, true)) { 76 subscription.cancel(); 77 } else { 78 this.subscription = subscription; 79 subscription.request(1); 80 } 81 } 82 83 @Override 84 public void onNext(List<ByteBuffer> items) { 85 for (ByteBuffer item : items) { 86 byte[] buf = new byte[item.remaining()]; 87 item.get(buf); 88 consumer.accept(Optional.of(buf)); 89 } 90 subscription.request(1); 91 } 92 93 @Override 94 public void onError(Throwable throwable) { 95 result.completeExceptionally(throwable); 96 } 97 98 @Override 99 public void onComplete() { 100 consumer.accept(Optional.empty()); 101 result.complete(null); 102 } 103 104 } 105 106 static class PathSubscriber implements HttpResponse.BodySubscriber<Path> { 107 108 private final Path file; 109 private final CompletableFuture<Path> result = new MinimalFuture<>(); 110 111 private volatile Flow.Subscription subscription; 112 private volatile FileChannel out; 113 private volatile AccessControlContext acc; 114 private final OpenOption[] options; 115 116 PathSubscriber(Path file, OpenOption... options) { 117 this.file = file; 118 this.options = options; 119 } 120 121 void setAccessControlContext(AccessControlContext acc) { 122 this.acc = acc; 123 } 124 125 @Override 126 public void onSubscribe(Flow.Subscription subscription) { 127 if (System.getSecurityManager() != null && acc == null) 128 throw new InternalError( 129 "Unexpected null acc when security manager has been installed"); 130 131 this.subscription = subscription; 132 try { 133 PrivilegedExceptionAction<FileChannel> pa = 134 () -> FileChannel.open(file, options); 135 out = AccessController.doPrivileged(pa, acc); 136 } catch (PrivilegedActionException pae) { 137 Throwable t = pae.getCause() != null ? pae.getCause() : pae; 138 result.completeExceptionally(t); 139 subscription.cancel(); 140 return; 141 } 142 subscription.request(1); 143 } 144 145 @Override 146 public void onNext(List<ByteBuffer> items) { 147 try { 148 out.write(items.toArray(Utils.EMPTY_BB_ARRAY)); 149 } catch (IOException ex) { 150 Utils.close(out); 151 subscription.cancel(); 152 result.completeExceptionally(ex); 153 } 154 subscription.request(1); 155 } 156 157 @Override 158 public void onError(Throwable e) { 159 result.completeExceptionally(e); 160 Utils.close(out); 161 } 162 163 @Override 164 public void onComplete() { 165 Utils.close(out); 166 result.complete(file); 167 } 168 169 @Override 170 public CompletionStage<Path> getBody() { 171 return result; 172 } 173 } 174 175 static class ByteArraySubscriber<T> implements HttpResponse.BodySubscriber<T> { 176 private final Function<byte[], T> finisher; 177 private final CompletableFuture<T> result = new MinimalFuture<>(); 178 private final List<ByteBuffer> received = new ArrayList<>(); 179 180 private volatile Flow.Subscription subscription; 181 182 ByteArraySubscriber(Function<byte[],T> finisher) { 183 this.finisher = finisher; 184 } 185 186 @Override 187 public void onSubscribe(Flow.Subscription subscription) { 188 if (this.subscription != null) { 189 subscription.cancel(); 190 return; 191 } 192 this.subscription = subscription; 193 // We can handle whatever you've got 194 subscription.request(Long.MAX_VALUE); 195 } 196 197 @Override 198 public void onNext(List<ByteBuffer> items) { 199 // incoming buffers are allocated by http client internally, 200 // and won't be used anywhere except this place. 201 // So it's free simply to store them for further processing. 202 assert Utils.hasRemaining(items); 203 Utils.accumulateBuffers(received, items); 204 } 205 206 @Override 207 public void onError(Throwable throwable) { 208 received.clear(); 209 result.completeExceptionally(throwable); 210 } 211 212 static private byte[] join(List<ByteBuffer> bytes) { 213 int size = Utils.remaining(bytes, Integer.MAX_VALUE); 214 byte[] res = new byte[size]; 215 int from = 0; 216 for (ByteBuffer b : bytes) { 217 int l = b.remaining(); 218 b.get(res, from, l); 219 from += l; 220 } 221 return res; 222 } 223 224 @Override 225 public void onComplete() { 226 try { 227 result.complete(finisher.apply(join(received))); 228 received.clear(); 229 } catch (IllegalArgumentException e) { 230 result.completeExceptionally(e); 231 } 232 } 233 234 @Override 235 public CompletionStage<T> getBody() { 236 return result; 237 } 238 } 239 240 /** 241 * An InputStream built on top of the Flow API. 242 */ 243 static class HttpResponseInputStream extends InputStream 244 implements HttpResponse.BodySubscriber<InputStream> 245 { 246 final static boolean DEBUG = Utils.DEBUG; 247 final static int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer 248 249 // An immutable ByteBuffer sentinel to mark that the last byte was received. 250 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); 251 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); 252 private static final System.Logger DEBUG_LOGGER = 253 Utils.getDebugLogger("HttpResponseInputStream"::toString, DEBUG); 254 255 // A queue of yet unprocessed ByteBuffers received from the flow API. 256 private final BlockingQueue<List<ByteBuffer>> buffers; 257 private volatile Flow.Subscription subscription; 258 private volatile boolean closed; 259 private volatile Throwable failed; 260 private volatile Iterator<ByteBuffer> currentListItr; 261 private volatile ByteBuffer currentBuffer; 262 private final AtomicBoolean subscribed = new AtomicBoolean(); 263 264 HttpResponseInputStream() { 265 this(MAX_BUFFERS_IN_QUEUE); 266 } 267 268 HttpResponseInputStream(int maxBuffers) { 269 int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers); 270 // 1 additional slot needed for LAST_LIST added by onComplete 271 this.buffers = new ArrayBlockingQueue<>(capacity + 1); 272 } 273 274 @Override 275 public CompletionStage<InputStream> getBody() { 276 // Returns the stream immediately, before the 277 // response body is received. 278 // This makes it possible for senAsync().get().body() 279 // to complete before the response body is received. 280 return CompletableFuture.completedStage(this); 281 } 282 283 // Returns the current byte buffer to read from. 284 // If the current buffer has no remaining data, this method will take the 285 // next buffer from the buffers queue, possibly blocking until 286 // a new buffer is made available through the Flow API, or the 287 // end of the flow has been reached. 288 private ByteBuffer current() throws IOException { 289 while (currentBuffer == null || !currentBuffer.hasRemaining()) { 290 // Check whether the stream is closed or exhausted 291 if (closed || failed != null) { 292 throw new IOException("closed", failed); 293 } 294 if (currentBuffer == LAST_BUFFER) break; 295 296 try { 297 if (currentListItr == null || !currentListItr.hasNext()) { 298 // Take a new list of buffers from the queue, blocking 299 // if none is available yet... 300 301 DEBUG_LOGGER.log(Level.DEBUG, "Taking list of Buffers"); 302 List<ByteBuffer> lb = buffers.take(); 303 currentListItr = lb.iterator(); 304 DEBUG_LOGGER.log(Level.DEBUG, "List of Buffers Taken"); 305 306 // Check whether an exception was encountered upstream 307 if (closed || failed != null) 308 throw new IOException("closed", failed); 309 310 // Check whether we're done. 311 if (lb == LAST_LIST) { 312 currentListItr = null; 313 currentBuffer = LAST_BUFFER; 314 break; 315 } 316 317 // Request another upstream item ( list of buffers ) 318 Flow.Subscription s = subscription; 319 if (s != null) { 320 DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1"); 321 s.request(1); 322 } 323 } 324 assert currentListItr != null; 325 assert currentListItr.hasNext(); 326 DEBUG_LOGGER.log(Level.DEBUG, "Next Buffer"); 327 currentBuffer = currentListItr.next(); 328 } catch (InterruptedException ex) { 329 // continue 330 } 331 } 332 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); 333 return currentBuffer; 334 } 335 336 @Override 337 public int read(byte[] bytes, int off, int len) throws IOException { 338 // get the buffer to read from, possibly blocking if 339 // none is available 340 ByteBuffer buffer; 341 if ((buffer = current()) == LAST_BUFFER) return -1; 342 343 // don't attempt to read more than what is available 344 // in the current buffer. 345 int read = Math.min(buffer.remaining(), len); 346 assert read > 0 && read <= buffer.remaining(); 347 348 // buffer.get() will do the boundary check for us. 349 buffer.get(bytes, off, read); 350 return read; 351 } 352 353 @Override 354 public int read() throws IOException { 355 ByteBuffer buffer; 356 if ((buffer = current()) == LAST_BUFFER) return -1; 357 return buffer.get() & 0xFF; 358 } 359 360 @Override 361 public void onSubscribe(Flow.Subscription s) { 362 if (!subscribed.compareAndSet(false, true)) { 363 s.cancel(); 364 } else { 365 // check whether the stream is already closed. 366 // if so, we should cancel the subscription 367 // immediately. 368 boolean closed; 369 synchronized(this) { 370 closed = this.closed; 371 if (!closed) { 372 this.subscription = s; 373 } 374 } 375 if (closed) { 376 s.cancel(); 377 return; 378 } 379 assert buffers.remainingCapacity() > 1; // should contain at least 2 380 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " 381 + Math.max(1, buffers.remainingCapacity() - 1)); 382 s.request(Math.max(1, buffers.remainingCapacity() - 1)); 383 } 384 } 385 386 @Override 387 public void onNext(List<ByteBuffer> t) { 388 Objects.requireNonNull(t); 389 try { 390 DEBUG_LOGGER.log(Level.DEBUG, "next item received"); 391 if (!buffers.offer(t)) { 392 throw new IllegalStateException("queue is full"); 393 } 394 DEBUG_LOGGER.log(Level.DEBUG, "item offered"); 395 } catch (Exception ex) { 396 failed = ex; 397 try { 398 close(); 399 } catch (IOException ex1) { 400 // OK 401 } 402 } 403 } 404 405 @Override 406 public void onError(Throwable thrwbl) { 407 subscription = null; 408 failed = Objects.requireNonNull(thrwbl); 409 // The client process that reads the input stream might 410 // be blocked in queue.take(). 411 // Tries to offer LAST_LIST to the queue. If the queue is 412 // full we don't care if we can't insert this buffer, as 413 // the client can't be blocked in queue.take() in that case. 414 // Adding LAST_LIST to the queue is harmless, as the client 415 // should find failed != null before handling LAST_LIST. 416 buffers.offer(LAST_LIST); 417 } 418 419 @Override 420 public void onComplete() { 421 subscription = null; 422 onNext(LAST_LIST); 423 } 424 425 @Override 426 public void close() throws IOException { 427 Flow.Subscription s; 428 synchronized (this) { 429 if (closed) return; 430 closed = true; 431 s = subscription; 432 subscription = null; 433 } 434 // s will be null if already completed 435 if (s != null) { 436 s.cancel(); 437 } 438 super.close(); 439 } 440 441 } 442 443 static class MultiSubscriberImpl<V> 444 implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V> 445 { 446 private final MultiMapResult<V> results; 447 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; 448 private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler; 449 private final boolean completion; // aggregate completes on last PP received or overall completion 450 451 MultiSubscriberImpl( 452 Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler, 453 Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { 454 this.results = new MultiMapResult<>(new ConcurrentHashMap<>()); 455 this.requestHandler = requestHandler; 456 this.pushHandler = pushHandler; 457 this.completion = completion; 458 } 459 460 @Override 461 public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) { 462 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); 463 results.put(request, cf); 464 return requestHandler.apply(request); 465 } 466 467 @Override 468 public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) { 469 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); 470 results.put(push, cf); 471 return pushHandler.apply(push); 472 } 473 474 @Override 475 public void onResponse(HttpResponse<V> response) { 476 CompletableFuture<HttpResponse<V>> cf = results.get(response.request()); 477 cf.complete(response); 478 } 479 480 @Override 481 public void onError(HttpRequest request, Throwable t) { 482 CompletableFuture<HttpResponse<V>> cf = results.get(request); 483 cf.completeExceptionally(t); 484 } 485 486 @Override 487 public CompletableFuture<MultiMapResult<V>> completion( 488 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { 489 if (completion) 490 return onComplete.thenApply((ignored)-> results); 491 else 492 return onFinalPushPromise.thenApply((ignored) -> results); 493 } 494 } 495 496 /** 497 * Currently this consumes all of the data and ignores it 498 */ 499 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { 500 501 private final CompletableFuture<T> cf = new MinimalFuture<>(); 502 private final Optional<T> result; 503 private final AtomicBoolean subscribed = new AtomicBoolean(); 504 505 NullSubscriber(Optional<T> result) { 506 this.result = result; 507 } 508 509 @Override 510 public void onSubscribe(Flow.Subscription subscription) { 511 if (!subscribed.compareAndSet(false, true)) { 512 subscription.cancel(); 513 } else { 514 subscription.request(Long.MAX_VALUE); 515 } 516 } 517 518 @Override 519 public void onNext(List<ByteBuffer> items) { 520 Objects.requireNonNull(items); 521 } 522 523 @Override 524 public void onError(Throwable throwable) { 525 cf.completeExceptionally(throwable); 526 } 527 528 @Override 529 public void onComplete() { 530 if (result.isPresent()) { 531 cf.complete(result.get()); 532 } else { 533 cf.complete(null); 534 } 535 } 536 537 @Override 538 public CompletionStage<T> getBody() { 539 return cf; 540 } 541 } 542 }