/*
 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.io.InputStream;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;

/**
 * Holder for the {@code BodySubscriber} (and one {@code MultiSubscriber})
 * implementations defined in this file. Each nested class consumes the
 * {@code List<ByteBuffer>} items of a response body and exposes the outcome
 * through the future returned by its {@code getBody()} method.
 */
class ResponseSubscribers {

    /**
     * Hands every received chunk to a {@code Consumer<Optional<byte[]>>}.
     * Each buffer of each item is copied into a fresh byte array and passed
     * as {@code Optional.of(bytes)}; completion is signalled to the consumer
     * with {@code Optional.empty()}. Items are requested one at a time.
     */
    static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> {
        private final Consumer<Optional<byte[]>> consumer;
        private Flow.Subscription subscription;
        private final CompletableFuture<Void> result = new MinimalFuture<>();
        // Guards against a second onSubscribe: only the first one is accepted.
        private final AtomicBoolean subscribed = new AtomicBoolean();

        ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
            this.consumer = consumer;
        }

        /** Returns the future completed by {@code onComplete}/{@code onError}. */
        @Override
        public CompletionStage<Void> getBody() {
            return result;
        }

        /** Accepts the first subscription and requests one item; cancels any later one. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!subscribed.compareAndSet(false, true)) {
                subscription.cancel();
            } else {
                this.subscription = subscription;
                subscription.request(1);
            }
        }

        /** Copies each buffer's remaining bytes to the consumer, then requests one more item. */
        @Override
        public void onNext(List<ByteBuffer> items) {
            for (ByteBuffer item : items) {
                byte[] buf = new byte[item.remaining()];
                item.get(buf);
                consumer.accept(Optional.of(buf));
            }
            subscription.request(1);
        }

        /** Completes the body future exceptionally; the consumer is not notified. */
        @Override
        public void onError(Throwable throwable) {
            result.completeExceptionally(throwable);
        }

        /** Signals end of data to the consumer with an empty Optional, then completes the future. */
        @Override
        public void onComplete() {
            consumer.accept(Optional.empty());
            result.complete(null);
        }

    }

    /**
     * Writes the response body to a file opened with the supplied
     * {@code OpenOption}s and completes with the file's {@code Path}.
     * The file is opened in {@code onSubscribe} inside
     * {@code AccessController.doPrivileged} using the context provided via
     * {@link #setAccessControlContext}.
     */
    static class PathSubscriber implements HttpResponse.BodySubscriber<Path> {

        private final Path file;
        private final CompletableFuture<Path> result = new MinimalFuture<>();

        private volatile Flow.Subscription subscription;
        private volatile FileChannel out;
        private volatile AccessControlContext acc;
        private final OpenOption[] options;

        PathSubscriber(Path file, OpenOption... options) {
            this.file = file;
            this.options = options;
        }

        /** Sets the context under which the file will be opened. */
        void setAccessControlContext(AccessControlContext acc) {
            this.acc = acc;
        }

        /**
         * Opens the target file and requests the first item. If opening
         * fails, the body future is completed exceptionally and the
         * subscription is cancelled.
         *
         * @throws InternalError if a security manager is installed but no
         *         access control context has been set
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (System.getSecurityManager() != null && acc == null)
                throw new InternalError(
                        "Unexpected null acc when security manager has been installed");

            this.subscription = subscription;
            try {
                PrivilegedExceptionAction<FileChannel> pa =
                        () -> FileChannel.open(file, options);
                out = AccessController.doPrivileged(pa, acc);
            } catch (PrivilegedActionException pae) {
                Throwable t = pae.getCause() != null ? pae.getCause() : pae;
                result.completeExceptionally(t);
                subscription.cancel();
                return;
            }
            subscription.request(1);
        }

        /**
         * Writes the received buffers to the file and requests one more item.
         * On an {@code IOException} the channel is closed, the subscription
         * is cancelled, the body future fails, and no further demand is
         * signalled.
         */
        @Override
        public void onNext(List<ByteBuffer> items) {
            try {
                ByteBuffer[] buffers = items.toArray(Utils.EMPTY_BB_ARRAY);
                // A single gathering write is not required to drain every
                // buffer, so keep writing until nothing remains.
                do {
                    out.write(buffers);
                } while (hasRemaining(buffers));
            } catch (IOException ex) {
                Utils.close(out);
                subscription.cancel();
                result.completeExceptionally(ex);
                // The subscription was just cancelled: do not request more.
                return;
            }
            subscription.request(1);
        }

        /** Returns true if at least one of the given buffers still has bytes to write. */
        private static boolean hasRemaining(ByteBuffer[] buffers) {
            for (ByteBuffer buffer : buffers) {
                if (buffer.hasRemaining()) {
                    return true;
                }
            }
            return false;
        }

        /** Fails the body future and closes the output channel. */
        @Override
        public void onError(Throwable e) {
            result.completeExceptionally(e);
            Utils.close(out);
        }

        /** Closes the output channel, then completes the body future with the file path. */
        @Override
        public void onComplete() {
            Utils.close(out);
            result.complete(file);
        }

        @Override
        public CompletionStage<Path> getBody() {
            return result;
        }
    }

    /**
     * Accumulates the whole body in memory and, on completion, joins it into
     * a single byte array that is converted to the result by the
     * {@code finisher} function.
     */
    static class ByteArraySubscriber<T> implements HttpResponse.BodySubscriber<T> {
        private final Function<byte[], T> finisher;
        private final CompletableFuture<T> result = new MinimalFuture<>();
        private final List<ByteBuffer> received = new ArrayList<>();

        private volatile Flow.Subscription subscription;

        ByteArraySubscriber(Function<byte[],T> finisher) {
            this.finisher = finisher;
        }

        /** Accepts the first subscription with unbounded demand; cancels any later one. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (this.subscription != null) {
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
            // We can handle whatever you've got
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            // incoming buffers are allocated by http client internally,
            // and won't be used anywhere except this place.
            // So it's free simply to store them for further processing.
            assert Utils.hasRemaining(items);
            Utils.accumulateBuffers(received, items);
        }

        /** Discards everything accumulated so far and fails the body future. */
        @Override
        public void onError(Throwable throwable) {
            received.clear();
            result.completeExceptionally(throwable);
        }

        /**
         * Copies the remaining bytes of every buffer, in order, into one
         * array sized by {@code Utils.remaining(bytes, Integer.MAX_VALUE)}.
         */
        private static byte[] join(List<ByteBuffer> bytes) {
            int size = Utils.remaining(bytes, Integer.MAX_VALUE);
            byte[] res = new byte[size];
            int from = 0;
            for (ByteBuffer b : bytes) {
                int l = b.remaining();
                b.get(res, from, l);
                from += l;
            }
            return res;
        }

        /**
         * Joins the accumulated buffers, applies the finisher and completes
         * the body future. Any failure while joining or finishing completes
         * the future exceptionally instead of leaving it pending, and the
         * accumulated buffers are released in either case.
         */
        @Override
        public void onComplete() {
            try {
                result.complete(finisher.apply(join(received)));
            } catch (Throwable t) {
                result.completeExceptionally(t);
            } finally {
                received.clear();
            }
        }

        @Override
        public CompletionStage<T> getBody() {
            return result;
        }
    }

    /**
     * An InputStream built on top of the Flow API.
     * Items delivered through {@code onNext} are placed in a bounded queue
     * and drained by the {@code read} methods, which block while the queue
     * is empty.
     */
    static class HttpResponseInputStream extends InputStream
        implements HttpResponse.BodySubscriber<InputStream>
    {
        static final boolean DEBUG = Utils.DEBUG;
        static final int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer

        // An immutable ByteBuffer sentinel to mark that the last byte was received.
        private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
        private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
        private static final System.Logger DEBUG_LOGGER =
                Utils.getDebugLogger("HttpResponseInputStream"::toString, DEBUG);

        // A queue of yet unprocessed ByteBuffers received from the flow API.
        private final BlockingQueue<List<ByteBuffer>> buffers;
        private volatile Flow.Subscription subscription;
        private volatile boolean closed;
        private volatile Throwable failed;
        private volatile Iterator<ByteBuffer> currentListItr;
        private volatile ByteBuffer currentBuffer;
        private final AtomicBoolean subscribed = new AtomicBoolean();

        HttpResponseInputStream() {
            this(MAX_BUFFERS_IN_QUEUE);
        }

        /**
         * @param maxBuffers queue capacity for data items; a non-positive
         *        value falls back to {@code MAX_BUFFERS_IN_QUEUE}
         */
        HttpResponseInputStream(int maxBuffers) {
            int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
            // 1 additional slot needed for LAST_LIST added by onComplete
            this.buffers = new ArrayBlockingQueue<>(capacity + 1);
        }

        @Override
        public CompletionStage<InputStream> getBody() {
            // Returns the stream immediately, before the
            // response body is received.
            // This makes it possible for sendAsync().get().body()
            // to complete before the response body is received.
            return CompletableFuture.completedStage(this);
        }

        // Returns the current byte buffer to read from.
        // If the current buffer has no remaining data, this method will take the
        // next buffer from the buffers queue, possibly blocking until
        // a new buffer is made available through the Flow API, or the
        // end of the flow has been reached.
        // An interrupt received while waiting does not abort the wait, but
        // the thread's interrupt status is restored before returning or
        // throwing, so that the caller can still observe it.
        private ByteBuffer current() throws IOException {
            boolean interrupted = false;
            try {
                while (currentBuffer == null || !currentBuffer.hasRemaining()) {
                    // Check whether the stream is closed or exhausted
                    if (closed || failed != null) {
                        throw new IOException("closed", failed);
                    }
                    if (currentBuffer == LAST_BUFFER) break;

                    try {
                        if (currentListItr == null || !currentListItr.hasNext()) {
                            // Take a new list of buffers from the queue, blocking
                            // if none is available yet...

                            DEBUG_LOGGER.log(Level.DEBUG, "Taking list of Buffers");
                            List<ByteBuffer> lb = buffers.take();
                            currentListItr = lb.iterator();
                            DEBUG_LOGGER.log(Level.DEBUG, "List of Buffers Taken");

                            // Check whether an exception was encountered upstream
                            if (closed || failed != null)
                                throw new IOException("closed", failed);

                            // Check whether we're done.
                            if (lb == LAST_LIST) {
                                currentListItr = null;
                                currentBuffer = LAST_BUFFER;
                                break;
                            }

                            // Request another upstream item ( list of buffers )
                            Flow.Subscription s = subscription;
                            if (s != null) {
                                DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
                                s.request(1);
                            }
                            assert currentListItr != null;
                            if (lb.isEmpty()) continue;
                        }
                        assert currentListItr != null;
                        assert currentListItr.hasNext();
                        DEBUG_LOGGER.log(Level.DEBUG, "Next Buffer");
                        currentBuffer = currentListItr.next();
                    } catch (InterruptedException ex) {
                        // Keep waiting, but remember the interrupt so it is
                        // not lost.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
            return currentBuffer;
        }

        /**
         * Reads up to {@code len} bytes from the current buffer, blocking
         * until data is available.
         *
         * @return the number of bytes read, {@code 0} if {@code len} is zero,
         *         or {@code -1} once the end of the body has been reached
         * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
         *         denote a valid range of {@code bytes}
         * @throws IOException if the stream is closed or the upstream failed
         */
        @Override
        public int read(byte[] bytes, int off, int len) throws IOException {
            Objects.checkFromIndexSize(off, len, bytes.length);
            // Per the InputStream contract a zero-length read returns 0
            // without blocking and without consuming anything.
            if (len == 0) {
                return 0;
            }

            // get the buffer to read from, possibly blocking if
            // none is available
            ByteBuffer buffer;
            if ((buffer = current()) == LAST_BUFFER) return -1;

            // don't attempt to read more than what is available
            // in the current buffer.
            int read = Math.min(buffer.remaining(), len);
            assert read > 0 && read <= buffer.remaining();

            // buffer.get() will do the boundary check for us.
            buffer.get(bytes, off, read);
            return read;
        }

        /** Reads one byte as an unsigned value, or returns {@code -1} at end of body. */
        @Override
        public int read() throws IOException {
            ByteBuffer buffer;
            if ((buffer = current()) == LAST_BUFFER) return -1;
            return buffer.get() & 0xFF;
        }

        /**
         * Accepts the first subscription unless the stream is already
         * closed, and requests as many items as the queue can hold minus the
         * slot reserved for the end marker (at least one). Any failure closes
         * the stream and is reported through {@code onError}.
         */
        @Override
        public void onSubscribe(Flow.Subscription s) {
            try {
                if (!subscribed.compareAndSet(false, true)) {
                    s.cancel();
                } else {
                    // check whether the stream is already closed.
                    // if so, we should cancel the subscription
                    // immediately.
                    boolean alreadyClosed;
                    synchronized (this) {
                        alreadyClosed = this.closed;
                        if (!alreadyClosed) {
                            this.subscription = s;
                        }
                    }
                    if (alreadyClosed) {
                        s.cancel();
                        return;
                    }
                    assert buffers.remainingCapacity() > 1; // should contain at least 2
                    DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
                            + Math.max(1, buffers.remainingCapacity() - 1));
                    s.request(Math.max(1, buffers.remainingCapacity() - 1));
                }
            } catch (Throwable t) {
                failed = t;
                try {
                    close();
                } catch (IOException x) {
                    // OK
                } finally {
                    onError(t);
                }
            }
        }

        /**
         * Queues the received item for the reader. If the queue is full the
         * stream is closed and the failure is reported through
         * {@code onError}.
         */
        @Override
        public void onNext(List<ByteBuffer> t) {
            Objects.requireNonNull(t);
            try {
                DEBUG_LOGGER.log(Level.DEBUG, "next item received");
                if (!buffers.offer(t)) {
                    throw new IllegalStateException("queue is full");
                }
                DEBUG_LOGGER.log(Level.DEBUG, "item offered");
            } catch (Throwable ex) {
                failed = ex;
                try {
                    close();
                } catch (IOException ex1) {
                    // OK
                } finally {
                    onError(ex);
                }
            }
        }

        @Override
        public void onError(Throwable thrwbl) {
            subscription = null;
            failed = Objects.requireNonNull(thrwbl);
            // The client process that reads the input stream might
            // be blocked in queue.take().
            // Tries to offer LAST_LIST to the queue. If the queue is
            // full we don't care if we can't insert this buffer, as
            // the client can't be blocked in queue.take() in that case.
            // Adding LAST_LIST to the queue is harmless, as the client
            // should find failed != null before handling LAST_LIST.
            buffers.offer(LAST_LIST);
        }

        /** Drops the subscription and queues the end marker for the reader. */
        @Override
        public void onComplete() {
            subscription = null;
            onNext(LAST_LIST);
        }

        /**
         * Marks the stream closed, cancels the subscription if one is still
         * active, and offers the end marker to the queue. Calling it again
         * has no effect.
         */
        @Override
        public void close() throws IOException {
            Flow.Subscription s;
            synchronized (this) {
                if (closed) return;
                closed = true;
                s = subscription;
                subscription = null;
            }
            // s will be null if already completed
            try {
                if (s != null) {
                    s.cancel();
                }
            } finally {
                buffers.offer(LAST_LIST);
                super.close();
            }
        }

    }

    /**
     * A {@code MultiSubscriber} that records one future per request or push
     * promise in a {@code MultiMapResult}, completing each future when the
     * matching response or error arrives.
     */
    static class MultiSubscriberImpl<V>
        implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
    {
        private final MultiMapResult<V> results;
        private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
        private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler;
        private final boolean completion; // aggregate completes on last PP received or overall completion

        MultiSubscriberImpl(
                Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler,
                Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
            this.results = new MultiMapResult<>(new ConcurrentHashMap<>());
            this.requestHandler = requestHandler;
            this.pushHandler = pushHandler;
            this.completion = completion;
        }

        /** Registers a pending future for the request and returns the handler chosen by {@code requestHandler}. */
        @Override
        public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) {
            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
            results.put(request, cf);
            return requestHandler.apply(request);
        }

        /** Registers a pending future for the push promise and returns whatever {@code pushHandler} yields. */
        @Override
        public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) {
            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
            results.put(push, cf);
            return pushHandler.apply(push);
        }

        /** Completes the future registered for the response's request. */
        @Override
        public void onResponse(HttpResponse<V> response) {
            CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
            cf.complete(response);
        }

        /** Fails the future registered for the given request. */
        @Override
        public void onError(HttpRequest request, Throwable t) {
            CompletableFuture<HttpResponse<V>> cf = results.get(request);
            cf.completeExceptionally(t);
        }

        /**
         * Returns a future yielding the result map, derived from
         * {@code onComplete} when the {@code completion} flag is set and from
         * {@code onFinalPushPromise} otherwise.
         */
        @Override
        public CompletableFuture<MultiMapResult<V>> completion(
                CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
            if (completion)
                return onComplete.thenApply((ignored)-> results);
            else
                return onFinalPushPromise.thenApply((ignored) -> results);
        }
    }

    /**
     * Currently this consumes all of the data and ignores it.
     * On completion the body future yields the value held by the supplied
     * {@code Optional}, or {@code null} if it is empty.
     */
    static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {

        private final CompletableFuture<T> cf = new MinimalFuture<>();
        private final Optional<T> result;
        private final AtomicBoolean subscribed = new AtomicBoolean();

        NullSubscriber(Optional<T> result) {
            this.result = result;
        }

        /** Accepts the first subscription with unbounded demand; cancels any later one. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!subscribed.compareAndSet(false, true)) {
                subscription.cancel();
            } else {
                subscription.request(Long.MAX_VALUE);
            }
        }

        /** Discards the item after a null check. */
        @Override
        public void onNext(List<ByteBuffer> items) {
            Objects.requireNonNull(items);
        }

        @Override
        public void onError(Throwable throwable) {
            cf.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            cf.complete(result.orElse(null));
        }

        @Override
        public CompletionStage<T> getBody() {
            return cf;
        }
    }

    /**
     * An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}.
     * Every signal is forwarded to the wrapped subscriber; on completion the
     * body future is completed with the value {@code finisher} computes from
     * that subscriber.
     */
    static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
        implements HttpResponse.BodySubscriber<R>
    {
        private final CompletableFuture<R> cf = new MinimalFuture<>();
        private final S subscriber;
        private final Function<S,R> finisher;
        private volatile Subscription subscription;

        SubscriberAdapter(S subscriber, Function<S,R> finisher) {
            this.subscriber = Objects.requireNonNull(subscriber);
            this.finisher = Objects.requireNonNull(finisher);
        }

        /** Forwards the first subscription to the wrapped subscriber; cancels any later one. */
        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription);
            if (this.subscription != null) {
                subscription.cancel();
            } else {
                this.subscription = subscription;
                subscriber.onSubscribe(subscription);
            }
        }

        /**
         * Forwards the item. If the wrapped subscriber throws, the
         * subscription is cancelled and the failure is routed to
         * {@code onError}.
         */
        @Override
        public void onNext(List<ByteBuffer> item) {
            Objects.requireNonNull(item);
            try {
                subscriber.onNext(item);
            } catch (Throwable throwable) {
                subscription.cancel();
                onError(throwable);
            }
        }

        /** Forwards the error, then fails the body future even if the wrapped subscriber throws. */
        @Override
        public void onError(Throwable throwable) {
            Objects.requireNonNull(throwable);
            try {
                subscriber.onError(throwable);
            } finally {
                cf.completeExceptionally(throwable);
            }
        }

        /**
         * Forwards completion, then completes the body future with the
         * finisher's result, or exceptionally if the finisher throws.
         */
        @Override
        public void onComplete() {
            try {
                subscriber.onComplete();
            } finally {
                try {
                    cf.complete(finisher.apply(subscriber));
                } catch (Throwable throwable) {
                    cf.completeExceptionally(throwable);
                }
            }
        }

        @Override
        public CompletionStage<R> getBody() {
            return cf;
        }
    }
}