1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicLong; 38 import java.util.function.Consumer; 39 import java.util.function.Function; 40 import java.net.http.HttpHeaders; 41 import java.net.http.HttpResponse; 42 import jdk.internal.net.http.ResponseContent.BodyParser; 43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; 44 import jdk.internal.net.http.common.Log; 45 import jdk.internal.net.http.common.Logger; 46 import jdk.internal.net.http.common.MinimalFuture; 47 import jdk.internal.net.http.common.Utils; 48 import static java.net.http.HttpClient.Version.HTTP_1_1; 49 import static java.net.http.HttpResponse.BodySubscribers.discarding; 50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 51 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; 52 53 /** 54 * Handles a HTTP/1.1 response (headers + body). 55 * There can be more than one of these per Http exchange. 56 */ 57 class Http1Response<T> { 58 59 private volatile ResponseContent content; 60 private final HttpRequestImpl request; 61 private Response response; 62 private final HttpConnection connection; 63 private HttpHeaders headers; 64 private int responseCode; 65 private final Http1Exchange<T> exchange; 66 private boolean return2Cache; // return connection to cache when finished 67 private final HeadersReader headersReader; // used to read the headers 68 private final BodyReader bodyReader; // used to read the body 69 private final Http1AsyncReceiver asyncReceiver; 70 private volatile EOFException eof; 71 private volatile BodyParser bodyParser; 72 // max number of bytes of (fixed length) body to ignore on redirect 73 private final static int MAX_IGNORE = 1024; 74 75 // Revisit: can we get rid of this? 76 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 77 private volatile State readProgress = State.INITIAL; 78 79 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 80 final static AtomicLong responseCount = new AtomicLong(); 81 final long id = responseCount.incrementAndGet(); 82 private Http1HeaderParser hd; 83 84 Http1Response(HttpConnection conn, 85 Http1Exchange<T> exchange, 86 Http1AsyncReceiver asyncReceiver) { 87 this.readProgress = State.INITIAL; 88 this.request = exchange.request(); 89 this.exchange = exchange; 90 this.connection = conn; 91 this.asyncReceiver = asyncReceiver; 92 headersReader = new HeadersReader(this::advance); 93 bodyReader = new BodyReader(this::advance); 94 95 hd = new Http1HeaderParser(); 96 readProgress = State.READING_HEADERS; 97 headersReader.start(hd); 98 asyncReceiver.subscribe(headersReader); 99 } 100 101 String dbgTag; 102 private String dbgString() { 103 String dbg = dbgTag; 104 if (dbg == null) { 105 String cdbg = connection.dbgTag; 106 if (cdbg != null) { 107 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; 108 } else { 109 dbg = "Http1Response(id=" + id + ")"; 110 } 111 } 112 return dbg; 113 } 114 115 // The ClientRefCountTracker is used to track the state 116 // of a pending operation. Altough there usually is a single 117 // point where the operation starts, it may terminate at 118 // different places. 119 private final class ClientRefCountTracker { 120 final HttpClientImpl client = connection.client(); 121 // state & 0x01 != 0 => acquire called 122 // state & 0x02 != 0 => tryRelease called 123 byte state; 124 125 public synchronized void acquire() { 126 if (state == 0) { 127 // increment the reference count on the HttpClientImpl 128 // to prevent the SelectorManager thread from exiting 129 // until our operation is complete. 130 if (debug.on()) 131 debug.log("Operation started: incrementing ref count for %s", client); 132 client.reference(); 133 state = 0x01; 134 } else { 135 if (debug.on()) 136 debug.log("Operation ref count for %s is already %s", 137 client, ((state & 0x2) == 0x2) ? "released." : "incremented!" ); 138 assert (state & 0x01) == 0 : "reference count already incremented"; 139 } 140 } 141 142 public synchronized void tryRelease() { 143 if (state == 0x01) { 144 // decrement the reference count on the HttpClientImpl 145 // to allow the SelectorManager thread to exit if no 146 // other operation is pending and the facade is no 147 // longer referenced. 148 if (debug.on()) 149 debug.log("Operation finished: decrementing ref count for %s", client); 150 client.unreference(); 151 } else if (state == 0) { 152 if (debug.on()) 153 debug.log("Operation finished: releasing ref count for %s", client); 154 } else if ((state & 0x02) == 0x02) { 155 if (debug.on()) 156 debug.log("ref count for %s already released", client); 157 } 158 state |= 0x02; 159 } 160 } 161 162 private volatile boolean firstTimeAround = true; 163 164 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 165 if (debug.on()) 166 debug.log("Reading Headers: (remaining: " 167 + asyncReceiver.remaining() +") " + readProgress); 168 169 if (firstTimeAround) { 170 if (debug.on()) debug.log("First time around"); 171 firstTimeAround = false; 172 } else { 173 // with expect continue we will resume reading headers + body. 174 asyncReceiver.unsubscribe(bodyReader); 175 bodyReader.reset(); 176 177 hd = new Http1HeaderParser(); 178 readProgress = State.READING_HEADERS; 179 headersReader.reset(); 180 headersReader.start(hd); 181 asyncReceiver.subscribe(headersReader); 182 } 183 184 CompletableFuture<State> cf = headersReader.completion(); 185 assert cf != null : "parsing not started"; 186 if (debug.on()) { 187 debug.log("headersReader is %s", 188 cf == null ? "not yet started" 189 : cf.isDone() ? "already completed" 190 : "not yet completed"); 191 } 192 193 Function<State, Response> lambda = (State completed) -> { 194 assert completed == State.READING_HEADERS; 195 if (debug.on()) 196 debug.log("Reading Headers: creating Response object;" 197 + " state is now " + readProgress); 198 asyncReceiver.unsubscribe(headersReader); 199 responseCode = hd.responseCode(); 200 headers = hd.headers(); 201 202 response = new Response(request, 203 exchange.getExchange(), 204 headers, 205 connection, 206 responseCode, 207 HTTP_1_1); 208 209 if (Log.headers()) { 210 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 211 Log.dumpHeaders(sb, " ", headers); 212 Log.logHeaders(sb.toString()); 213 } 214 215 return response; 216 }; 217 218 if (executor != null) { 219 return cf.thenApplyAsync(lambda, executor); 220 } else { 221 return cf.thenApply(lambda); 222 } 223 } 224 225 private boolean finished; 226 227 synchronized void completed() { 228 finished = true; 229 } 230 231 synchronized boolean finished() { 232 return finished; 233 } 234 235 /** 236 * Return known fixed content length or -1 if chunked, or -2 if no content-length 237 * information in which case, connection termination delimits the response body 238 */ 239 long fixupContentLen(long clen) { 240 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { 241 return 0L; 242 } 243 if (clen == -1L) { 244 if (headers.firstValue("Transfer-encoding").orElse("") 245 .equalsIgnoreCase("chunked")) { 246 return -1L; 247 } 248 if (responseCode == 101) { 249 // this is a h2c or websocket upgrade, contentlength must be zero 250 return 0L; 251 } 252 return -2L; 253 } 254 return clen; 255 } 256 257 /** 258 * Read up to MAX_IGNORE bytes discarding 259 */ 260 public CompletableFuture<Void> ignoreBody(Executor executor) { 261 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 262 if (clen == -1 || clen > MAX_IGNORE) { 263 connection.close(); 264 return MinimalFuture.completedFuture(null); // not treating as error 265 } else { 266 return readBody(discarding(), !request.isWebSocket(), executor); 267 } 268 } 269 270 // Used for those response codes that have no body associated 271 public void nullBody(HttpResponse<T> resp, Throwable t) { 272 if (t != null) connection.close(); 273 else { 274 return2Cache = !request.isWebSocket(); 275 onFinished(); 276 } 277 } 278 279 static final Flow.Subscription NOP = new Flow.Subscription() { 280 @Override 281 public void request(long n) { } 282 public void cancel() { } 283 }; 284 285 /** 286 * The Http1AsyncReceiver ensures that all calls to 287 * the subscriber, including onSubscribe, occur sequentially. 288 * There could however be some race conditions that could happen 289 * in case of unexpected errors thrown at unexpected places, which 290 * may cause onError to be called multiple times. 291 * The Http1BodySubscriber will ensure that the user subscriber 292 * is actually completed only once - and only after it is 293 * subscribed. 294 * @param <U> The type of response. 295 */ 296 final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> { 297 final HttpResponse.BodySubscriber<U> userSubscriber; 298 final AtomicBoolean completed = new AtomicBoolean(); 299 volatile Throwable withError; 300 volatile boolean subscribed; 301 Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) { 302 this.userSubscriber = userSubscriber; 303 } 304 305 // propagate the error to the user subscriber, even if not 306 // subscribed yet. 307 private void propagateError(Throwable t) { 308 assert t != null; 309 try { 310 // if unsubscribed at this point, it will not 311 // get subscribed later - so do it now and 312 // propagate the error 313 if (subscribed == false) { 314 subscribed = true; 315 userSubscriber.onSubscribe(NOP); 316 } 317 } finally { 318 // if onError throws then there is nothing to do 319 // here: let the caller deal with it by logging 320 // and closing the connection. 321 userSubscriber.onError(t); 322 } 323 } 324 325 // complete the subscriber, either normally or exceptionally 326 // ensure that the subscriber is completed only once. 327 private void complete(Throwable t) { 328 if (completed.compareAndSet(false, true)) { 329 t = withError = Utils.getCompletionCause(t); 330 if (t == null) { 331 assert subscribed; 332 try { 333 userSubscriber.onComplete(); 334 } catch (Throwable x) { 335 // Simply propagate the error by calling 336 // onError on the user subscriber, and let the 337 // connection be reused since we should have received 338 // and parsed all the bytes when we reach here. 339 // If onError throws in turn, then we will simply 340 // let that new exception flow up to the caller 341 // and let it deal with it. 342 // (i.e: log and close the connection) 343 // Note that rethrowing here could introduce a 344 // race that might cause the next send() operation to 345 // fail as the connection has already been put back 346 // into the cache when we reach here. 347 propagateError(t = withError = Utils.getCompletionCause(x)); 348 } 349 } else { 350 propagateError(t); 351 } 352 } 353 } 354 355 @Override 356 public CompletionStage<U> getBody() { 357 return userSubscriber.getBody(); 358 } 359 @Override 360 public void onSubscribe(Flow.Subscription subscription) { 361 if (!subscribed) { 362 subscribed = true; 363 userSubscriber.onSubscribe(subscription); 364 } else { 365 // could be already subscribed and completed 366 // if an unexpected error occurred before the actual 367 // subscription - though that's not supposed 368 // happen. 369 assert completed.get(); 370 } 371 } 372 @Override 373 public void onNext(List<ByteBuffer> item) { 374 assert !completed.get(); 375 userSubscriber.onNext(item); 376 } 377 @Override 378 public void onError(Throwable throwable) { 379 complete(throwable); 380 } 381 @Override 382 public void onComplete() { 383 complete(null); 384 } 385 } 386 387 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 388 boolean return2Cache, 389 Executor executor) { 390 if (debug.on()) { 391 debug.log("readBody: return2Cache: " + return2Cache); 392 if (request.isWebSocket() && return2Cache && connection != null) { 393 debug.log("websocket connection will be returned to cache: " 394 + connection.getClass() + "/" + connection ); 395 } 396 } 397 assert !return2Cache || !request.isWebSocket(); 398 this.return2Cache = return2Cache; 399 final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p); 400 401 final CompletableFuture<U> cf = new MinimalFuture<>(); 402 403 long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L); 404 final long clen = fixupContentLen(clen0); 405 406 // expect-continue reads headers and body twice. 407 // if we reach here, we must reset the headersReader state. 408 asyncReceiver.unsubscribe(headersReader); 409 headersReader.reset(); 410 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); 411 412 // We need to keep hold on the client facade until the 413 // tracker has been incremented. 414 connection.client().reference(); 415 executor.execute(() -> { 416 try { 417 content = new ResponseContent( 418 connection, clen, headers, subscriber, 419 this::onFinished 420 ); 421 if (cf.isCompletedExceptionally()) { 422 // if an error occurs during subscription 423 connection.close(); 424 return; 425 } 426 // increment the reference count on the HttpClientImpl 427 // to prevent the SelectorManager thread from exiting until 428 // the body is fully read. 429 refCountTracker.acquire(); 430 bodyParser = content.getBodyParser( 431 (t) -> { 432 try { 433 if (t != null) { 434 try { 435 subscriber.onError(t); 436 } finally { 437 cf.completeExceptionally(t); 438 } 439 } 440 } finally { 441 bodyReader.onComplete(t); 442 if (t != null) { 443 connection.close(); 444 } 445 } 446 }); 447 bodyReader.start(bodyParser); 448 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 449 asyncReceiver.subscribe(bodyReader); 450 assert bodyReaderCF != null : "parsing not started"; 451 // Make sure to keep a reference to asyncReceiver from 452 // within this 453 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 454 t = Utils.getCompletionCause(t); 455 try { 456 if (t == null) { 457 if (debug.on()) debug.log("Finished reading body: " + s); 458 assert s == State.READING_BODY; 459 } 460 if (t != null) { 461 subscriber.onError(t); 462 cf.completeExceptionally(t); 463 } 464 } catch (Throwable x) { 465 // not supposed to happen 466 asyncReceiver.onReadError(x); 467 } finally { 468 // we're done: release the ref count for 469 // the current operation. 470 refCountTracker.tryRelease(); 471 } 472 }); 473 connection.addTrailingOperation(trailingOp); 474 } catch (Throwable t) { 475 if (debug.on()) debug.log("Failed reading body: " + t); 476 try { 477 subscriber.onError(t); 478 cf.completeExceptionally(t); 479 } finally { 480 asyncReceiver.onReadError(t); 481 } 482 } finally { 483 connection.client().unreference(); 484 } 485 }); 486 try { 487 p.getBody().whenComplete((U u, Throwable t) -> { 488 if (t == null) 489 cf.complete(u); 490 else 491 cf.completeExceptionally(t); 492 }); 493 } catch (Throwable t) { 494 cf.completeExceptionally(t); 495 asyncReceiver.setRetryOnError(false); 496 asyncReceiver.onReadError(t); 497 } 498 499 return cf.whenComplete((s,t) -> { 500 if (t != null) { 501 // If an exception occurred, release the 502 // ref count for the current operation, as 503 // it may never be triggered otherwise 504 // (BodySubscriber ofInputStream) 505 // If there was no exception then the 506 // ref count will be/have been released when 507 // the last byte of the response is/was received 508 refCountTracker.tryRelease(); 509 } 510 }); 511 } 512 513 514 private void onFinished() { 515 asyncReceiver.clear(); 516 if (return2Cache) { 517 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 518 // TODO: need to do something here? 519 // connection.setAsyncCallbacks(null, null, null); 520 521 // don't return the connection to the cache if EOF happened. 522 if (debug.on()) 523 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); 524 connection.closeOrReturnToCache(eof == null ? headers : null); 525 } 526 } 527 528 HttpHeaders responseHeaders() { 529 return headers; 530 } 531 532 int responseCode() { 533 return responseCode; 534 } 535 536 // ================ Support for plugging into Http1Receiver ================= 537 // ============================================================================ 538 539 // Callback: Error receiver: Consumer of Throwable. 540 void onReadError(Throwable t) { 541 Log.logError(t); 542 Receiver<?> receiver = receiver(readProgress); 543 if (t instanceof EOFException) { 544 debug.log(Level.DEBUG, "onReadError: received EOF"); 545 eof = (EOFException) t; 546 } 547 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 548 debug.log(Level.DEBUG, () -> "onReadError: cf is " 549 + (cf == null ? "null" 550 : (cf.isDone() ? "already completed" 551 : "not yet completed"))); 552 if (cf != null) { 553 cf.completeExceptionally(t); 554 } else { 555 debug.log(Level.DEBUG, "onReadError", t); 556 } 557 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 558 connection.close(); 559 } 560 561 // ======================================================================== 562 563 private State advance(State previous) { 564 assert readProgress == previous; 565 switch(previous) { 566 case READING_HEADERS: 567 asyncReceiver.unsubscribe(headersReader); 568 return readProgress = State.READING_BODY; 569 case READING_BODY: 570 asyncReceiver.unsubscribe(bodyReader); 571 return readProgress = State.DONE; 572 default: 573 throw new InternalError("can't advance from " + previous); 574 } 575 } 576 577 Receiver<?> receiver(State state) { 578 switch(state) { 579 case READING_HEADERS: return headersReader; 580 case READING_BODY: return bodyReader; 581 default: return null; 582 } 583 584 } 585 586 static abstract class Receiver<T> 587 implements Http1AsyncReceiver.Http1AsyncDelegate { 588 abstract void start(T parser); 589 abstract CompletableFuture<State> completion(); 590 // accepts a buffer from upstream. 591 // this should be implemented as a simple call to 592 // accept(ref, parser, cf) 593 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 594 public abstract void onReadError(Throwable t); 595 // handle a byte buffer received from upstream. 596 // this method should set the value of Http1Response.buffer 597 // to ref.get() before beginning parsing. 598 abstract void handle(ByteBuffer buf, T parser, 599 CompletableFuture<State> cf); 600 // resets this objects state so that it can be reused later on 601 // typically puts the reference to parser and completion to null 602 abstract void reset(); 603 604 // accepts a byte buffer received from upstream 605 // returns true if the buffer is fully parsed and more data can 606 // be accepted, false otherwise. 607 final boolean accept(ByteBuffer buf, T parser, 608 CompletableFuture<State> cf) { 609 if (cf == null || parser == null || cf.isDone()) return false; 610 handle(buf, parser, cf); 611 return !cf.isDone(); 612 } 613 public abstract void onSubscribe(AbstractSubscription s); 614 public abstract AbstractSubscription subscription(); 615 616 } 617 618 // Invoked with each new ByteBuffer when reading headers... 619 final class HeadersReader extends Receiver<Http1HeaderParser> { 620 final Consumer<State> onComplete; 621 volatile Http1HeaderParser parser; 622 volatile CompletableFuture<State> cf; 623 volatile long count; // bytes parsed (for debug) 624 volatile AbstractSubscription subscription; 625 626 HeadersReader(Consumer<State> onComplete) { 627 this.onComplete = onComplete; 628 } 629 630 @Override 631 public AbstractSubscription subscription() { 632 return subscription; 633 } 634 635 @Override 636 public void onSubscribe(AbstractSubscription s) { 637 this.subscription = s; 638 s.request(1); 639 } 640 641 @Override 642 void reset() { 643 cf = null; 644 parser = null; 645 count = 0; 646 subscription = null; 647 } 648 649 // Revisit: do we need to support restarting? 650 @Override 651 final void start(Http1HeaderParser hp) { 652 count = 0; 653 cf = new MinimalFuture<>(); 654 parser = hp; 655 } 656 657 @Override 658 CompletableFuture<State> completion() { 659 return cf; 660 } 661 662 @Override 663 public final boolean tryAsyncReceive(ByteBuffer ref) { 664 boolean hasDemand = subscription.demand().tryDecrement(); 665 assert hasDemand; 666 boolean needsMore = accept(ref, parser, cf); 667 if (needsMore) subscription.request(1); 668 return needsMore; 669 } 670 671 @Override 672 public final void onReadError(Throwable t) { 673 t = wrapWithExtraDetail(t, parser::currentStateMessage); 674 Http1Response.this.onReadError(t); 675 } 676 677 @Override 678 final void handle(ByteBuffer b, 679 Http1HeaderParser parser, 680 CompletableFuture<State> cf) { 681 assert cf != null : "parsing not started"; 682 assert parser != null : "no parser"; 683 try { 684 count += b.remaining(); 685 if (debug.on()) 686 debug.log("Sending " + b.remaining() + "/" + b.capacity() 687 + " bytes to header parser"); 688 if (parser.parse(b)) { 689 count -= b.remaining(); 690 if (debug.on()) 691 debug.log("Parsing headers completed. bytes=" + count); 692 onComplete.accept(State.READING_HEADERS); 693 cf.complete(State.READING_HEADERS); 694 } 695 } catch (Throwable t) { 696 if (debug.on()) 697 debug.log("Header parser failed to handle buffer: " + t); 698 cf.completeExceptionally(t); 699 } 700 } 701 702 @Override 703 public void close(Throwable error) { 704 // if there's no error nothing to do: the cf should/will 705 // be completed. 706 if (error != null) { 707 CompletableFuture<State> cf = this.cf; 708 if (cf != null) { 709 if (debug.on()) 710 debug.log("close: completing header parser CF with " + error); 711 cf.completeExceptionally(error); 712 } 713 } 714 } 715 } 716 717 // Invoked with each new ByteBuffer when reading bodies... 718 final class BodyReader extends Receiver<BodyParser> { 719 final Consumer<State> onComplete; 720 volatile BodyParser parser; 721 volatile CompletableFuture<State> cf; 722 volatile AbstractSubscription subscription; 723 BodyReader(Consumer<State> onComplete) { 724 this.onComplete = onComplete; 725 } 726 727 @Override 728 void reset() { 729 parser = null; 730 cf = null; 731 subscription = null; 732 } 733 734 // Revisit: do we need to support restarting? 735 @Override 736 final void start(BodyParser parser) { 737 cf = new MinimalFuture<>(); 738 this.parser = parser; 739 } 740 741 @Override 742 CompletableFuture<State> completion() { 743 return cf; 744 } 745 746 @Override 747 public final boolean tryAsyncReceive(ByteBuffer b) { 748 return accept(b, parser, cf); 749 } 750 751 @Override 752 public final void onReadError(Throwable t) { 753 if (t instanceof EOFException && bodyParser != null && 754 bodyParser instanceof UnknownLengthBodyParser) { 755 ((UnknownLengthBodyParser)bodyParser).complete(); 756 return; 757 } 758 t = wrapWithExtraDetail(t, parser::currentStateMessage); 759 Http1Response.this.onReadError(t); 760 } 761 762 @Override 763 public AbstractSubscription subscription() { 764 return subscription; 765 } 766 767 @Override 768 public void onSubscribe(AbstractSubscription s) { 769 this.subscription = s; 770 try { 771 parser.onSubscribe(s); 772 } catch (Throwable t) { 773 cf.completeExceptionally(t); 774 throw t; 775 } 776 } 777 778 @Override 779 final void handle(ByteBuffer b, 780 BodyParser parser, 781 CompletableFuture<State> cf) { 782 assert cf != null : "parsing not started"; 783 assert parser != null : "no parser"; 784 try { 785 if (debug.on()) 786 debug.log("Sending " + b.remaining() + "/" + b.capacity() 787 + " bytes to body parser"); 788 parser.accept(b); 789 } catch (Throwable t) { 790 if (debug.on()) 791 debug.log("Body parser failed to handle buffer: " + t); 792 if (!cf.isDone()) { 793 cf.completeExceptionally(t); 794 } 795 } 796 } 797 798 final void onComplete(Throwable closedExceptionally) { 799 if (cf.isDone()) return; 800 if (closedExceptionally != null) { 801 cf.completeExceptionally(closedExceptionally); 802 } else { 803 onComplete.accept(State.READING_BODY); 804 cf.complete(State.READING_BODY); 805 } 806 } 807 808 @Override 809 public final void close(Throwable error) { 810 CompletableFuture<State> cf = this.cf; 811 if (cf != null && !cf.isDone()) { 812 // we want to make sure dependent actions are triggered 813 // in order to make sure the client reference count 814 // is decremented 815 if (error != null) { 816 if (debug.on()) 817 debug.log("close: completing body parser CF with " + error); 818 cf.completeExceptionally(error); 819 } else { 820 if (debug.on()) 821 debug.log("close: completing body parser CF"); 822 cf.complete(State.READING_BODY); 823 } 824 } 825 } 826 827 @Override 828 public String toString() { 829 return super.toString() + "/parser=" + String.valueOf(parser); 830 } 831 } 832 }