1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.io.InputStream; 30 import java.lang.System.Logger.Level; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.OpenOption; 34 import java.nio.file.Path; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedActionException; 38 import java.security.PrivilegedExceptionAction; 39 import java.util.ArrayList; 40 import java.util.Iterator; 41 import java.util.List; 42 import java.util.Objects; 43 import java.util.Optional; 44 import java.util.concurrent.ArrayBlockingQueue; 45 import java.util.concurrent.BlockingQueue; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.CompletionStage; 48 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 import java.util.function.Consumer; 52 import java.util.function.Function; 53 import jdk.incubator.http.internal.common.MinimalFuture; 54 import jdk.incubator.http.internal.common.Utils; 55 56 class ResponseSubscribers { 57 58 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { 59 private final Consumer<Optional<byte[]>> consumer; 60 private Flow.Subscription subscription; 61 private final CompletableFuture<Void> result = new MinimalFuture<>(); 62 private final AtomicBoolean subscribed = new AtomicBoolean(); 63 64 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { 65 this.consumer = consumer; 66 } 67 68 @Override 69 public CompletionStage<Void> getBody() { 70 return result; 71 } 72 73 @Override 74 public void onSubscribe(Flow.Subscription subscription) { 75 if (!subscribed.compareAndSet(false, true)) { 76 subscription.cancel(); 77 } else { 78 this.subscription = subscription; 79 subscription.request(1); 80 } 81 } 82 83 @Override 84 public void onNext(List<ByteBuffer> items) { 85 for (ByteBuffer item : items) { 86 byte[] buf = new byte[item.remaining()]; 87 item.get(buf); 88 consumer.accept(Optional.of(buf)); 89 } 90 subscription.request(1); 91 } 92 93 @Override 94 public void onError(Throwable throwable) { 95 result.completeExceptionally(throwable); 96 } 97 98 @Override 99 public void onComplete() { 100 consumer.accept(Optional.empty()); 101 result.complete(null); 102 } 103 104 } 105 106 static class PathSubscriber implements HttpResponse.BodySubscriber<Path> { 107 108 private final Path file; 109 private final CompletableFuture<Path> result = new MinimalFuture<>(); 110 111 private volatile Flow.Subscription subscription; 112 private volatile FileChannel out; 113 private volatile AccessControlContext acc; 114 private final OpenOption[] options; 115 116 PathSubscriber(Path file, OpenOption... options) { 117 this.file = file; 118 this.options = options; 119 } 120 121 void setAccessControlContext(AccessControlContext acc) { 122 this.acc = acc; 123 } 124 125 @Override 126 public void onSubscribe(Flow.Subscription subscription) { 127 if (System.getSecurityManager() != null && acc == null) 128 throw new InternalError( 129 "Unexpected null acc when security manager has been installed"); 130 131 this.subscription = subscription; 132 try { 133 PrivilegedExceptionAction<FileChannel> pa = 134 () -> FileChannel.open(file, options); 135 out = AccessController.doPrivileged(pa, acc); 136 } catch (PrivilegedActionException pae) { 137 Throwable t = pae.getCause() != null ? pae.getCause() : pae; 138 result.completeExceptionally(t); 139 subscription.cancel(); 140 return; 141 } 142 subscription.request(1); 143 } 144 145 @Override 146 public void onNext(List<ByteBuffer> items) { 147 try { 148 out.write(items.toArray(Utils.EMPTY_BB_ARRAY)); 149 } catch (IOException ex) { 150 Utils.close(out); 151 subscription.cancel(); 152 result.completeExceptionally(ex); 153 } 154 subscription.request(1); 155 } 156 157 @Override 158 public void onError(Throwable e) { 159 result.completeExceptionally(e); 160 Utils.close(out); 161 } 162 163 @Override 164 public void onComplete() { 165 Utils.close(out); 166 result.complete(file); 167 } 168 169 @Override 170 public CompletionStage<Path> getBody() { 171 return result; 172 } 173 } 174 175 static class ByteArraySubscriber<T> implements HttpResponse.BodySubscriber<T> { 176 private final Function<byte[], T> finisher; 177 private final CompletableFuture<T> result = new MinimalFuture<>(); 178 private final List<ByteBuffer> received = new ArrayList<>(); 179 180 private volatile Flow.Subscription subscription; 181 182 ByteArraySubscriber(Function<byte[],T> finisher) { 183 this.finisher = finisher; 184 } 185 186 @Override 187 public void onSubscribe(Flow.Subscription subscription) { 188 if (this.subscription != null) { 189 subscription.cancel(); 190 return; 191 } 192 this.subscription = subscription; 193 // We can handle whatever you've got 194 subscription.request(Long.MAX_VALUE); 195 } 196 197 @Override 198 public void onNext(List<ByteBuffer> items) { 199 // incoming buffers are allocated by http client internally, 200 // and won't be used anywhere except this place. 201 // So it's free simply to store them for further processing. 202 assert Utils.hasRemaining(items); 203 Utils.accumulateBuffers(received, items); 204 } 205 206 @Override 207 public void onError(Throwable throwable) { 208 received.clear(); 209 result.completeExceptionally(throwable); 210 } 211 212 static private byte[] join(List<ByteBuffer> bytes) { 213 int size = Utils.remaining(bytes, Integer.MAX_VALUE); 214 byte[] res = new byte[size]; 215 int from = 0; 216 for (ByteBuffer b : bytes) { 217 int l = b.remaining(); 218 b.get(res, from, l); 219 from += l; 220 } 221 return res; 222 } 223 224 @Override 225 public void onComplete() { 226 try { 227 result.complete(finisher.apply(join(received))); 228 received.clear(); 229 } catch (IllegalArgumentException e) { 230 result.completeExceptionally(e); 231 } 232 } 233 234 @Override 235 public CompletionStage<T> getBody() { 236 return result; 237 } 238 } 239 240 /** 241 * An InputStream built on top of the Flow API. 242 */ 243 static class HttpResponseInputStream extends InputStream 244 implements HttpResponse.BodySubscriber<InputStream> 245 { 246 final static boolean DEBUG = Utils.DEBUG; 247 final static int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer 248 249 // An immutable ByteBuffer sentinel to mark that the last byte was received. 250 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); 251 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); 252 private static final System.Logger DEBUG_LOGGER = 253 Utils.getDebugLogger("HttpResponseInputStream"::toString, DEBUG); 254 255 // A queue of yet unprocessed ByteBuffers received from the flow API. 256 private final BlockingQueue<List<ByteBuffer>> buffers; 257 private volatile Flow.Subscription subscription; 258 private volatile boolean closed; 259 private volatile Throwable failed; 260 private volatile Iterator<ByteBuffer> currentListItr; 261 private volatile ByteBuffer currentBuffer; 262 private final AtomicBoolean subscribed = new AtomicBoolean(); 263 264 HttpResponseInputStream() { 265 this(MAX_BUFFERS_IN_QUEUE); 266 } 267 268 HttpResponseInputStream(int maxBuffers) { 269 int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers); 270 // 1 additional slot needed for LAST_LIST added by onComplete 271 this.buffers = new ArrayBlockingQueue<>(capacity + 1); 272 } 273 274 @Override 275 public CompletionStage<InputStream> getBody() { 276 // Returns the stream immediately, before the 277 // response body is received. 278 // This makes it possible for senAsync().get().body() 279 // to complete before the response body is received. 280 return CompletableFuture.completedStage(this); 281 } 282 283 // Returns the current byte buffer to read from. 284 // If the current buffer has no remaining data, this method will take the 285 // next buffer from the buffers queue, possibly blocking until 286 // a new buffer is made available through the Flow API, or the 287 // end of the flow has been reached. 288 private ByteBuffer current() throws IOException { 289 while (currentBuffer == null || !currentBuffer.hasRemaining()) { 290 // Check whether the stream is closed or exhausted 291 if (closed || failed != null) { 292 throw new IOException("closed", failed); 293 } 294 if (currentBuffer == LAST_BUFFER) break; 295 296 try { 297 if (currentListItr == null || !currentListItr.hasNext()) { 298 // Take a new list of buffers from the queue, blocking 299 // if none is available yet... 300 301 DEBUG_LOGGER.log(Level.DEBUG, "Taking list of Buffers"); 302 List<ByteBuffer> lb = buffers.take(); 303 currentListItr = lb.iterator(); 304 DEBUG_LOGGER.log(Level.DEBUG, "List of Buffers Taken"); 305 306 // Check whether an exception was encountered upstream 307 if (closed || failed != null) 308 throw new IOException("closed", failed); 309 310 // Check whether we're done. 311 if (lb == LAST_LIST) { 312 currentListItr = null; 313 currentBuffer = LAST_BUFFER; 314 break; 315 } 316 317 // Request another upstream item ( list of buffers ) 318 Flow.Subscription s = subscription; 319 if (s != null) { 320 DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1"); 321 s.request(1); 322 } 323 assert currentListItr != null; 324 if (lb.isEmpty()) continue; 325 } 326 assert currentListItr != null; 327 assert currentListItr.hasNext(); 328 DEBUG_LOGGER.log(Level.DEBUG, "Next Buffer"); 329 currentBuffer = currentListItr.next(); 330 } catch (InterruptedException ex) { 331 // continue 332 } 333 } 334 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); 335 return currentBuffer; 336 } 337 338 @Override 339 public int read(byte[] bytes, int off, int len) throws IOException { 340 // get the buffer to read from, possibly blocking if 341 // none is available 342 ByteBuffer buffer; 343 if ((buffer = current()) == LAST_BUFFER) return -1; 344 345 // don't attempt to read more than what is available 346 // in the current buffer. 347 int read = Math.min(buffer.remaining(), len); 348 assert read > 0 && read <= buffer.remaining(); 349 350 // buffer.get() will do the boundary check for us. 351 buffer.get(bytes, off, read); 352 return read; 353 } 354 355 @Override 356 public int read() throws IOException { 357 ByteBuffer buffer; 358 if ((buffer = current()) == LAST_BUFFER) return -1; 359 return buffer.get() & 0xFF; 360 } 361 362 @Override 363 public void onSubscribe(Flow.Subscription s) { 364 try { 365 if (!subscribed.compareAndSet(false, true)) { 366 s.cancel(); 367 } else { 368 // check whether the stream is already closed. 369 // if so, we should cancel the subscription 370 // immediately. 371 boolean closed; 372 synchronized (this) { 373 closed = this.closed; 374 if (!closed) { 375 this.subscription = s; 376 } 377 } 378 if (closed) { 379 s.cancel(); 380 return; 381 } 382 assert buffers.remainingCapacity() > 1; // should contain at least 2 383 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " 384 + Math.max(1, buffers.remainingCapacity() - 1)); 385 s.request(Math.max(1, buffers.remainingCapacity() - 1)); 386 } 387 } catch (Throwable t) { 388 failed = t; 389 try { 390 close(); 391 } catch (IOException x) { 392 // OK 393 } finally { 394 onError(t); 395 } 396 } 397 } 398 399 @Override 400 public void onNext(List<ByteBuffer> t) { 401 Objects.requireNonNull(t); 402 try { 403 DEBUG_LOGGER.log(Level.DEBUG, "next item received"); 404 if (!buffers.offer(t)) { 405 throw new IllegalStateException("queue is full"); 406 } 407 DEBUG_LOGGER.log(Level.DEBUG, "item offered"); 408 } catch (Throwable ex) { 409 failed = ex; 410 try { 411 close(); 412 } catch (IOException ex1) { 413 // OK 414 } finally { 415 onError(ex); 416 } 417 } 418 } 419 420 @Override 421 public void onError(Throwable thrwbl) { 422 subscription = null; 423 failed = Objects.requireNonNull(thrwbl); 424 // The client process that reads the input stream might 425 // be blocked in queue.take(). 426 // Tries to offer LAST_LIST to the queue. If the queue is 427 // full we don't care if we can't insert this buffer, as 428 // the client can't be blocked in queue.take() in that case. 429 // Adding LAST_LIST to the queue is harmless, as the client 430 // should find failed != null before handling LAST_LIST. 431 buffers.offer(LAST_LIST); 432 } 433 434 @Override 435 public void onComplete() { 436 subscription = null; 437 onNext(LAST_LIST); 438 } 439 440 @Override 441 public void close() throws IOException { 442 Flow.Subscription s; 443 synchronized (this) { 444 if (closed) return; 445 closed = true; 446 s = subscription; 447 subscription = null; 448 } 449 // s will be null if already completed 450 try { 451 if (s != null) { 452 s.cancel(); 453 } 454 } finally { 455 buffers.offer(LAST_LIST); 456 super.close(); 457 } 458 } 459 460 } 461 462 static class MultiSubscriberImpl<V> 463 implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V> 464 { 465 private final MultiMapResult<V> results; 466 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; 467 private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler; 468 private final boolean completion; // aggregate completes on last PP received or overall completion 469 470 MultiSubscriberImpl( 471 Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler, 472 Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { 473 this.results = new MultiMapResult<>(new ConcurrentHashMap<>()); 474 this.requestHandler = requestHandler; 475 this.pushHandler = pushHandler; 476 this.completion = completion; 477 } 478 479 @Override 480 public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) { 481 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); 482 results.put(request, cf); 483 return requestHandler.apply(request); 484 } 485 486 @Override 487 public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) { 488 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); 489 results.put(push, cf); 490 return pushHandler.apply(push); 491 } 492 493 @Override 494 public void onResponse(HttpResponse<V> response) { 495 CompletableFuture<HttpResponse<V>> cf = results.get(response.request()); 496 cf.complete(response); 497 } 498 499 @Override 500 public void onError(HttpRequest request, Throwable t) { 501 CompletableFuture<HttpResponse<V>> cf = results.get(request); 502 cf.completeExceptionally(t); 503 } 504 505 @Override 506 public CompletableFuture<MultiMapResult<V>> completion( 507 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { 508 if (completion) 509 return onComplete.thenApply((ignored)-> results); 510 else 511 return onFinalPushPromise.thenApply((ignored) -> results); 512 } 513 } 514 515 /** 516 * Currently this consumes all of the data and ignores it 517 */ 518 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { 519 520 private final CompletableFuture<T> cf = new MinimalFuture<>(); 521 private final Optional<T> result; 522 private final AtomicBoolean subscribed = new AtomicBoolean(); 523 524 NullSubscriber(Optional<T> result) { 525 this.result = result; 526 } 527 528 @Override 529 public void onSubscribe(Flow.Subscription subscription) { 530 if (!subscribed.compareAndSet(false, true)) { 531 subscription.cancel(); 532 } else { 533 subscription.request(Long.MAX_VALUE); 534 } 535 } 536 537 @Override 538 public void onNext(List<ByteBuffer> items) { 539 Objects.requireNonNull(items); 540 } 541 542 @Override 543 public void onError(Throwable throwable) { 544 cf.completeExceptionally(throwable); 545 } 546 547 @Override 548 public void onComplete() { 549 if (result.isPresent()) { 550 cf.complete(result.get()); 551 } else { 552 cf.complete(null); 553 } 554 } 555 556 @Override 557 public CompletionStage<T> getBody() { 558 return cf; 559 } 560 } 561 }