1 /* 2 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicLong; 38 import java.util.function.Consumer; 39 import java.util.function.Function; 40 import java.net.http.HttpHeaders; 41 import java.net.http.HttpResponse; 42 import jdk.internal.net.http.ResponseContent.BodyParser; 43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; 44 import jdk.internal.net.http.common.Log; 45 import jdk.internal.net.http.common.Logger; 46 import jdk.internal.net.http.common.MinimalFuture; 47 import jdk.internal.net.http.common.Utils; 48 import static java.net.http.HttpClient.Version.HTTP_1_1; 49 import static java.net.http.HttpResponse.BodySubscribers.discarding; 50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 51 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; 52 53 /** 54 * Handles a HTTP/1.1 response (headers + body). 55 * There can be more than one of these per Http exchange. 56 */ 57 class Http1Response<T> { 58 59 private volatile ResponseContent content; 60 private final HttpRequestImpl request; 61 private Response response; 62 private final HttpConnection connection; 63 private HttpHeaders headers; 64 private int responseCode; 65 private final Http1Exchange<T> exchange; 66 private boolean return2Cache; // return connection to cache when finished 67 private final HeadersReader headersReader; // used to read the headers 68 private final BodyReader bodyReader; // used to read the body 69 private final Http1AsyncReceiver asyncReceiver; 70 private volatile EOFException eof; 71 private volatile BodyParser bodyParser; 72 // max number of bytes of (fixed length) body to ignore on redirect 73 private final static int MAX_IGNORE = 1024; 74 75 // Revisit: can we get rid of this? 76 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 77 private volatile State readProgress = State.INITIAL; 78 79 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 80 final static AtomicLong responseCount = new AtomicLong(); 81 final long id = responseCount.incrementAndGet(); 82 private Http1HeaderParser hd; 83 84 Http1Response(HttpConnection conn, 85 Http1Exchange<T> exchange, 86 Http1AsyncReceiver asyncReceiver) { 87 this.readProgress = State.INITIAL; 88 this.request = exchange.request(); 89 this.exchange = exchange; 90 this.connection = conn; 91 this.asyncReceiver = asyncReceiver; 92 headersReader = new HeadersReader(this::advance); 93 bodyReader = new BodyReader(this::advance); 94 95 hd = new Http1HeaderParser(); 96 readProgress = State.READING_HEADERS; 97 headersReader.start(hd); 98 asyncReceiver.subscribe(headersReader); 99 } 100 101 String dbgTag; 102 private String dbgString() { 103 String dbg = dbgTag; 104 if (dbg == null) { 105 String cdbg = connection.dbgTag; 106 if (cdbg != null) { 107 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; 108 } else { 109 dbg = "Http1Response(id=" + id + ")"; 110 } 111 } 112 return dbg; 113 } 114 115 // The ClientRefCountTracker is used to track the state 116 // of a pending operation. Altough there usually is a single 117 // point where the operation starts, it may terminate at 118 // different places. 119 private final class ClientRefCountTracker { 120 final HttpClientImpl client = connection.client(); 121 // state & 0x01 != 0 => acquire called 122 // state & 0x02 != 0 => tryRelease called 123 byte state; 124 125 public synchronized void acquire() { 126 if (state == 0) { 127 // increment the reference count on the HttpClientImpl 128 // to prevent the SelectorManager thread from exiting 129 // until our operation is complete. 130 if (debug.on()) 131 debug.log("Operation started: incrementing ref count for %s", client); 132 client.reference(); 133 state = 0x01; 134 } else { 135 if (debug.on()) 136 debug.log("Operation ref count for %s is already %s", 137 client, ((state & 0x2) == 0x2) ? "released." : "incremented!" ); 138 assert (state & 0x01) == 0 : "reference count already incremented"; 139 } 140 } 141 142 public synchronized void tryRelease() { 143 if (state == 0x01) { 144 // decrement the reference count on the HttpClientImpl 145 // to allow the SelectorManager thread to exit if no 146 // other operation is pending and the facade is no 147 // longer referenced. 148 if (debug.on()) 149 debug.log("Operation finished: decrementing ref count for %s", client); 150 client.unreference(); 151 } else if (state == 0) { 152 if (debug.on()) 153 debug.log("Operation finished: releasing ref count for %s", client); 154 } else if ((state & 0x02) == 0x02) { 155 if (debug.on()) 156 debug.log("ref count for %s already released", client); 157 } 158 state |= 0x02; 159 } 160 } 161 162 private volatile boolean firstTimeAround = true; 163 164 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 165 if (debug.on()) 166 debug.log("Reading Headers: (remaining: " 167 + asyncReceiver.remaining() +") " + readProgress); 168 169 if (firstTimeAround) { 170 if (debug.on()) debug.log("First time around"); 171 firstTimeAround = false; 172 } else { 173 // with expect continue we will resume reading headers + body. 174 asyncReceiver.unsubscribe(bodyReader); 175 bodyReader.reset(); 176 177 hd = new Http1HeaderParser(); 178 readProgress = State.READING_HEADERS; 179 headersReader.reset(); 180 headersReader.start(hd); 181 asyncReceiver.subscribe(headersReader); 182 } 183 184 CompletableFuture<State> cf = headersReader.completion(); 185 assert cf != null : "parsing not started"; 186 if (debug.on()) { 187 debug.log("headersReader is %s", 188 cf == null ? "not yet started" 189 : cf.isDone() ? "already completed" 190 : "not yet completed"); 191 } 192 193 Function<State, Response> lambda = (State completed) -> { 194 assert completed == State.READING_HEADERS; 195 if (debug.on()) 196 debug.log("Reading Headers: creating Response object;" 197 + " state is now " + readProgress); 198 asyncReceiver.unsubscribe(headersReader); 199 responseCode = hd.responseCode(); 200 headers = hd.headers(); 201 202 response = new Response(request, 203 exchange.getExchange(), 204 headers, 205 connection, 206 responseCode, 207 HTTP_1_1); 208 209 if (Log.headers()) { 210 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 211 Log.dumpHeaders(sb, " ", headers); 212 Log.logHeaders(sb.toString()); 213 } 214 215 return response; 216 }; 217 218 if (executor != null) { 219 return cf.thenApplyAsync(lambda, executor); 220 } else { 221 return cf.thenApply(lambda); 222 } 223 } 224 225 private boolean finished; 226 227 synchronized void completed() { 228 finished = true; 229 } 230 231 synchronized boolean finished() { 232 return finished; 233 } 234 235 /** 236 * Return known fixed content length or -1 if chunked, or -2 if no content-length 237 * information in which case, connection termination delimits the response body 238 */ 239 long fixupContentLen(long clen) { 240 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { 241 return 0L; 242 } 243 if (clen == -1L) { 244 if (headers.firstValue("Transfer-encoding").orElse("") 245 .equalsIgnoreCase("chunked")) { 246 return -1L; 247 } 248 if (responseCode == 101) { 249 // this is a h2c or websocket upgrade, contentlength must be zero 250 return 0L; 251 } 252 return -2L; 253 } 254 return clen; 255 } 256 257 /** 258 * Read up to MAX_IGNORE bytes discarding 259 */ 260 public CompletableFuture<Void> ignoreBody(Executor executor) { 261 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 262 if (clen == -1 || clen > MAX_IGNORE) { 263 connection.close(); 264 return MinimalFuture.completedFuture(null); // not treating as error 265 } else { 266 return readBody(discarding(), true, executor); 267 } 268 } 269 270 // Used for those response codes that have no body associated 271 public void nullBody(HttpResponse<T> resp, Throwable t) { 272 if (t != null) connection.close(); 273 else { 274 return2Cache = !request.isWebSocket(); 275 onFinished(); 276 } 277 } 278 279 static final Flow.Subscription NOP = new Flow.Subscription() { 280 @Override 281 public void request(long n) { } 282 public void cancel() { } 283 }; 284 285 /** 286 * The Http1AsyncReceiver ensures that all calls to 287 * the subscriber, including onSubscribe, occur sequentially. 288 * There could however be some race conditions that could happen 289 * in case of unexpected errors thrown at unexpected places, which 290 * may cause onError to be called multiple times. 291 * The Http1BodySubscriber will ensure that the user subscriber 292 * is actually completed only once - and only after it is 293 * subscribed. 294 * @param <U> The type of response. 295 */ 296 final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> { 297 final HttpResponse.BodySubscriber<U> userSubscriber; 298 final AtomicBoolean completed = new AtomicBoolean(); 299 volatile Throwable withError; 300 volatile boolean subscribed; 301 Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) { 302 this.userSubscriber = userSubscriber; 303 } 304 305 // propagate the error to the user subscriber, even if not 306 // subscribed yet. 307 private void propagateError(Throwable t) { 308 assert t != null; 309 try { 310 // if unsubscribed at this point, it will not 311 // get subscribed later - so do it now and 312 // propagate the error 313 if (subscribed == false) { 314 subscribed = true; 315 userSubscriber.onSubscribe(NOP); 316 } 317 } finally { 318 // if onError throws then there is nothing to do 319 // here: let the caller deal with it by logging 320 // and closing the connection. 321 userSubscriber.onError(t); 322 } 323 } 324 325 // complete the subscriber, either normally or exceptionally 326 // ensure that the subscriber is completed only once. 327 private void complete(Throwable t) { 328 if (completed.compareAndSet(false, true)) { 329 t = withError = Utils.getCompletionCause(t); 330 if (t == null) { 331 assert subscribed; 332 try { 333 userSubscriber.onComplete(); 334 } catch (Throwable x) { 335 // Simply propagate the error by calling 336 // onError on the user subscriber, and let the 337 // connection be reused since we should have received 338 // and parsed all the bytes when we reach here. 339 // If onError throws in turn, then we will simply 340 // let that new exception flow up to the caller 341 // and let it deal with it. 342 // (i.e: log and close the connection) 343 // Note that rethrowing here could introduce a 344 // race that might cause the next send() operation to 345 // fail as the connection has already been put back 346 // into the cache when we reach here. 347 propagateError(t = withError = Utils.getCompletionCause(x)); 348 } 349 } else { 350 propagateError(t); 351 } 352 } 353 } 354 355 @Override 356 public CompletionStage<U> getBody() { 357 return userSubscriber.getBody(); 358 } 359 @Override 360 public void onSubscribe(Flow.Subscription subscription) { 361 if (!subscribed) { 362 subscribed = true; 363 userSubscriber.onSubscribe(subscription); 364 } else { 365 // could be already subscribed and completed 366 // if an unexpected error occurred before the actual 367 // subscription - though that's not supposed 368 // happen. 369 assert completed.get(); 370 } 371 } 372 @Override 373 public void onNext(List<ByteBuffer> item) { 374 assert !completed.get(); 375 userSubscriber.onNext(item); 376 } 377 @Override 378 public void onError(Throwable throwable) { 379 complete(throwable); 380 } 381 @Override 382 public void onComplete() { 383 complete(null); 384 } 385 } 386 387 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 388 boolean return2Cache, 389 Executor executor) { 390 this.return2Cache = return2Cache; 391 final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p); 392 393 final CompletableFuture<U> cf = new MinimalFuture<>(); 394 395 long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L); 396 final long clen = fixupContentLen(clen0); 397 398 // expect-continue reads headers and body twice. 399 // if we reach here, we must reset the headersReader state. 400 asyncReceiver.unsubscribe(headersReader); 401 headersReader.reset(); 402 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); 403 404 // We need to keep hold on the client facade until the 405 // tracker has been incremented. 406 connection.client().reference(); 407 executor.execute(() -> { 408 try { 409 content = new ResponseContent( 410 connection, clen, headers, subscriber, 411 this::onFinished 412 ); 413 if (cf.isCompletedExceptionally()) { 414 // if an error occurs during subscription 415 connection.close(); 416 return; 417 } 418 // increment the reference count on the HttpClientImpl 419 // to prevent the SelectorManager thread from exiting until 420 // the body is fully read. 421 refCountTracker.acquire(); 422 bodyParser = content.getBodyParser( 423 (t) -> { 424 try { 425 if (t != null) { 426 try { 427 subscriber.onError(t); 428 } finally { 429 cf.completeExceptionally(t); 430 } 431 } 432 } finally { 433 bodyReader.onComplete(t); 434 if (t != null) { 435 connection.close(); 436 } 437 } 438 }); 439 bodyReader.start(bodyParser); 440 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 441 asyncReceiver.subscribe(bodyReader); 442 assert bodyReaderCF != null : "parsing not started"; 443 // Make sure to keep a reference to asyncReceiver from 444 // within this 445 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 446 t = Utils.getCompletionCause(t); 447 try { 448 if (t == null) { 449 if (debug.on()) debug.log("Finished reading body: " + s); 450 assert s == State.READING_BODY; 451 } 452 if (t != null) { 453 subscriber.onError(t); 454 cf.completeExceptionally(t); 455 } 456 } catch (Throwable x) { 457 // not supposed to happen 458 asyncReceiver.onReadError(x); 459 } finally { 460 // we're done: release the ref count for 461 // the current operation. 462 refCountTracker.tryRelease(); 463 } 464 }); 465 connection.addTrailingOperation(trailingOp); 466 } catch (Throwable t) { 467 if (debug.on()) debug.log("Failed reading body: " + t); 468 try { 469 subscriber.onError(t); 470 cf.completeExceptionally(t); 471 } finally { 472 asyncReceiver.onReadError(t); 473 } 474 } finally { 475 connection.client().unreference(); 476 } 477 }); 478 try { 479 p.getBody().whenComplete((U u, Throwable t) -> { 480 if (t == null) 481 cf.complete(u); 482 else 483 cf.completeExceptionally(t); 484 }); 485 } catch (Throwable t) { 486 cf.completeExceptionally(t); 487 asyncReceiver.setRetryOnError(false); 488 asyncReceiver.onReadError(t); 489 } 490 491 return cf.whenComplete((s,t) -> { 492 if (t != null) { 493 // If an exception occurred, release the 494 // ref count for the current operation, as 495 // it may never be triggered otherwise 496 // (BodySubscriber ofInputStream) 497 // If there was no exception then the 498 // ref count will be/have been released when 499 // the last byte of the response is/was received 500 refCountTracker.tryRelease(); 501 } 502 }); 503 } 504 505 506 private void onFinished() { 507 asyncReceiver.clear(); 508 if (return2Cache) { 509 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 510 // TODO: need to do something here? 511 // connection.setAsyncCallbacks(null, null, null); 512 513 // don't return the connection to the cache if EOF happened. 514 if (debug.on()) 515 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); 516 connection.closeOrReturnToCache(eof == null ? headers : null); 517 } 518 } 519 520 HttpHeaders responseHeaders() { 521 return headers; 522 } 523 524 int responseCode() { 525 return responseCode; 526 } 527 528 // ================ Support for plugging into Http1Receiver ================= 529 // ============================================================================ 530 531 // Callback: Error receiver: Consumer of Throwable. 532 void onReadError(Throwable t) { 533 Log.logError(t); 534 Receiver<?> receiver = receiver(readProgress); 535 if (t instanceof EOFException) { 536 debug.log(Level.DEBUG, "onReadError: received EOF"); 537 eof = (EOFException) t; 538 } 539 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 540 debug.log(Level.DEBUG, () -> "onReadError: cf is " 541 + (cf == null ? "null" 542 : (cf.isDone() ? "already completed" 543 : "not yet completed"))); 544 if (cf != null) { 545 cf.completeExceptionally(t); 546 } else { 547 debug.log(Level.DEBUG, "onReadError", t); 548 } 549 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 550 connection.close(); 551 } 552 553 // ======================================================================== 554 555 private State advance(State previous) { 556 assert readProgress == previous; 557 switch(previous) { 558 case READING_HEADERS: 559 asyncReceiver.unsubscribe(headersReader); 560 return readProgress = State.READING_BODY; 561 case READING_BODY: 562 asyncReceiver.unsubscribe(bodyReader); 563 return readProgress = State.DONE; 564 default: 565 throw new InternalError("can't advance from " + previous); 566 } 567 } 568 569 Receiver<?> receiver(State state) { 570 switch(state) { 571 case READING_HEADERS: return headersReader; 572 case READING_BODY: return bodyReader; 573 default: return null; 574 } 575 576 } 577 578 static abstract class Receiver<T> 579 implements Http1AsyncReceiver.Http1AsyncDelegate { 580 abstract void start(T parser); 581 abstract CompletableFuture<State> completion(); 582 // accepts a buffer from upstream. 583 // this should be implemented as a simple call to 584 // accept(ref, parser, cf) 585 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 586 public abstract void onReadError(Throwable t); 587 // handle a byte buffer received from upstream. 588 // this method should set the value of Http1Response.buffer 589 // to ref.get() before beginning parsing. 590 abstract void handle(ByteBuffer buf, T parser, 591 CompletableFuture<State> cf); 592 // resets this objects state so that it can be reused later on 593 // typically puts the reference to parser and completion to null 594 abstract void reset(); 595 596 // accepts a byte buffer received from upstream 597 // returns true if the buffer is fully parsed and more data can 598 // be accepted, false otherwise. 599 final boolean accept(ByteBuffer buf, T parser, 600 CompletableFuture<State> cf) { 601 if (cf == null || parser == null || cf.isDone()) return false; 602 handle(buf, parser, cf); 603 return !cf.isDone(); 604 } 605 public abstract void onSubscribe(AbstractSubscription s); 606 public abstract AbstractSubscription subscription(); 607 608 } 609 610 // Invoked with each new ByteBuffer when reading headers... 611 final class HeadersReader extends Receiver<Http1HeaderParser> { 612 final Consumer<State> onComplete; 613 volatile Http1HeaderParser parser; 614 volatile CompletableFuture<State> cf; 615 volatile long count; // bytes parsed (for debug) 616 volatile AbstractSubscription subscription; 617 618 HeadersReader(Consumer<State> onComplete) { 619 this.onComplete = onComplete; 620 } 621 622 @Override 623 public AbstractSubscription subscription() { 624 return subscription; 625 } 626 627 @Override 628 public void onSubscribe(AbstractSubscription s) { 629 this.subscription = s; 630 s.request(1); 631 } 632 633 @Override 634 void reset() { 635 cf = null; 636 parser = null; 637 count = 0; 638 subscription = null; 639 } 640 641 // Revisit: do we need to support restarting? 642 @Override 643 final void start(Http1HeaderParser hp) { 644 count = 0; 645 cf = new MinimalFuture<>(); 646 parser = hp; 647 } 648 649 @Override 650 CompletableFuture<State> completion() { 651 return cf; 652 } 653 654 @Override 655 public final boolean tryAsyncReceive(ByteBuffer ref) { 656 boolean hasDemand = subscription.demand().tryDecrement(); 657 assert hasDemand; 658 boolean needsMore = accept(ref, parser, cf); 659 if (needsMore) subscription.request(1); 660 return needsMore; 661 } 662 663 @Override 664 public final void onReadError(Throwable t) { 665 t = wrapWithExtraDetail(t, parser::currentStateMessage); 666 Http1Response.this.onReadError(t); 667 } 668 669 @Override 670 final void handle(ByteBuffer b, 671 Http1HeaderParser parser, 672 CompletableFuture<State> cf) { 673 assert cf != null : "parsing not started"; 674 assert parser != null : "no parser"; 675 try { 676 count += b.remaining(); 677 if (debug.on()) 678 debug.log("Sending " + b.remaining() + "/" + b.capacity() 679 + " bytes to header parser"); 680 if (parser.parse(b)) { 681 count -= b.remaining(); 682 if (debug.on()) 683 debug.log("Parsing headers completed. bytes=" + count); 684 onComplete.accept(State.READING_HEADERS); 685 cf.complete(State.READING_HEADERS); 686 } 687 } catch (Throwable t) { 688 if (debug.on()) 689 debug.log("Header parser failed to handle buffer: " + t); 690 cf.completeExceptionally(t); 691 } 692 } 693 694 @Override 695 public void close(Throwable error) { 696 // if there's no error nothing to do: the cf should/will 697 // be completed. 698 if (error != null) { 699 CompletableFuture<State> cf = this.cf; 700 if (cf != null) { 701 if (debug.on()) 702 debug.log("close: completing header parser CF with " + error); 703 cf.completeExceptionally(error); 704 } 705 } 706 } 707 } 708 709 // Invoked with each new ByteBuffer when reading bodies... 710 final class BodyReader extends Receiver<BodyParser> { 711 final Consumer<State> onComplete; 712 volatile BodyParser parser; 713 volatile CompletableFuture<State> cf; 714 volatile AbstractSubscription subscription; 715 BodyReader(Consumer<State> onComplete) { 716 this.onComplete = onComplete; 717 } 718 719 @Override 720 void reset() { 721 parser = null; 722 cf = null; 723 subscription = null; 724 } 725 726 // Revisit: do we need to support restarting? 727 @Override 728 final void start(BodyParser parser) { 729 cf = new MinimalFuture<>(); 730 this.parser = parser; 731 } 732 733 @Override 734 CompletableFuture<State> completion() { 735 return cf; 736 } 737 738 @Override 739 public final boolean tryAsyncReceive(ByteBuffer b) { 740 return accept(b, parser, cf); 741 } 742 743 @Override 744 public final void onReadError(Throwable t) { 745 if (t instanceof EOFException && bodyParser != null && 746 bodyParser instanceof UnknownLengthBodyParser) { 747 ((UnknownLengthBodyParser)bodyParser).complete(); 748 return; 749 } 750 t = wrapWithExtraDetail(t, parser::currentStateMessage); 751 Http1Response.this.onReadError(t); 752 } 753 754 @Override 755 public AbstractSubscription subscription() { 756 return subscription; 757 } 758 759 @Override 760 public void onSubscribe(AbstractSubscription s) { 761 this.subscription = s; 762 try { 763 parser.onSubscribe(s); 764 } catch (Throwable t) { 765 cf.completeExceptionally(t); 766 throw t; 767 } 768 } 769 770 @Override 771 final void handle(ByteBuffer b, 772 BodyParser parser, 773 CompletableFuture<State> cf) { 774 assert cf != null : "parsing not started"; 775 assert parser != null : "no parser"; 776 try { 777 if (debug.on()) 778 debug.log("Sending " + b.remaining() + "/" + b.capacity() 779 + " bytes to body parser"); 780 parser.accept(b); 781 } catch (Throwable t) { 782 if (debug.on()) 783 debug.log("Body parser failed to handle buffer: " + t); 784 if (!cf.isDone()) { 785 cf.completeExceptionally(t); 786 } 787 } 788 } 789 790 final void onComplete(Throwable closedExceptionally) { 791 if (cf.isDone()) return; 792 if (closedExceptionally != null) { 793 cf.completeExceptionally(closedExceptionally); 794 } else { 795 onComplete.accept(State.READING_BODY); 796 cf.complete(State.READING_BODY); 797 } 798 } 799 800 @Override 801 public final void close(Throwable error) { 802 CompletableFuture<State> cf = this.cf; 803 if (cf != null && !cf.isDone()) { 804 // we want to make sure dependent actions are triggered 805 // in order to make sure the client reference count 806 // is decremented 807 if (error != null) { 808 if (debug.on()) 809 debug.log("close: completing body parser CF with " + error); 810 cf.completeExceptionally(error); 811 } else { 812 if (debug.on()) 813 debug.log("close: completing body parser CF"); 814 cf.complete(State.READING_BODY); 815 } 816 } 817 } 818 819 @Override 820 public String toString() { 821 return super.toString() + "/parser=" + String.valueOf(parser); 822 } 823 } 824 }