1 /*
   2  * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.EOFException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.List;
  32 import java.util.concurrent.CompletableFuture;
  33 import java.util.concurrent.CompletionStage;
  34 import java.util.concurrent.Executor;
  35 import java.util.concurrent.Flow;
  36 import java.util.concurrent.atomic.AtomicBoolean;
  37 import java.util.concurrent.atomic.AtomicLong;
  38 import java.util.function.Consumer;
  39 import java.util.function.Function;
  40 import java.net.http.HttpHeaders;
  41 import java.net.http.HttpResponse;
  42 import jdk.internal.net.http.ResponseContent.BodyParser;
  43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser;
  44 import jdk.internal.net.http.common.Log;
  45 import jdk.internal.net.http.common.Logger;
  46 import jdk.internal.net.http.common.MinimalFuture;
  47 import jdk.internal.net.http.common.Utils;
  48 import static java.net.http.HttpClient.Version.HTTP_1_1;
  49 import static java.net.http.HttpResponse.BodySubscribers.discarding;
  50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
  51 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED;
  52 
  53 /**
  54  * Handles a HTTP/1.1 response (headers + body).
  55  * There can be more than one of these per Http exchange.
  56  */
  57 class Http1Response<T> {
  58 
  59     private volatile ResponseContent content;
  60     private final HttpRequestImpl request;
  61     private Response response;
  62     private final HttpConnection connection;
  63     private HttpHeaders headers;
  64     private int responseCode;
  65     private final Http1Exchange<T> exchange;
  66     private boolean return2Cache; // return connection to cache when finished
  67     private final HeadersReader headersReader; // used to read the headers
  68     private final BodyReader bodyReader; // used to read the body
  69     private final Http1AsyncReceiver asyncReceiver;
  70     private volatile EOFException eof;
  71     private volatile BodyParser bodyParser;
  72     // max number of bytes of (fixed length) body to ignore on redirect
  73     private final static int MAX_IGNORE = 1024;
  74 
  75     // Revisit: can we get rid of this?
  76     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
  77     private volatile State readProgress = State.INITIAL;
  78 
  79     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  80     final static AtomicLong responseCount = new AtomicLong();
  81     final long id = responseCount.incrementAndGet();
  82     private Http1HeaderParser hd;
  83 
  84     Http1Response(HttpConnection conn,
  85                   Http1Exchange<T> exchange,
  86                   Http1AsyncReceiver asyncReceiver) {
  87         this.readProgress = State.INITIAL;
  88         this.request = exchange.request();
  89         this.exchange = exchange;
  90         this.connection = conn;
  91         this.asyncReceiver = asyncReceiver;
  92         headersReader = new HeadersReader(this::advance);
  93         bodyReader = new BodyReader(this::advance);
  94 
  95         hd = new Http1HeaderParser();
  96         readProgress = State.READING_HEADERS;
  97         headersReader.start(hd);
  98         asyncReceiver.subscribe(headersReader);
  99     }
 100 
 101     String dbgTag;
 102     private String dbgString() {
 103         String dbg = dbgTag;
 104         if (dbg == null) {
 105             String cdbg = connection.dbgTag;
 106             if (cdbg != null) {
 107                 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
 108             } else {
 109                 dbg = "Http1Response(id=" + id + ")";
 110             }
 111         }
 112         return dbg;
 113     }
 114 
 115     // The ClientRefCountTracker is used to track the state
 116     // of a pending operation. Altough there usually is a single
 117     // point where the operation starts, it may terminate at
 118     // different places.
 119     private final class ClientRefCountTracker {
 120         final HttpClientImpl client = connection.client();
 121         // state & 0x01 != 0 => acquire called
 122         // state & 0x02 != 0 => tryRelease called
 123         byte state;
 124 
 125         public synchronized void acquire() {
 126             if (state == 0) {
 127                 // increment the reference count on the HttpClientImpl
 128                 // to prevent the SelectorManager thread from exiting
 129                 // until our operation is complete.
 130                 if (debug.on())
 131                     debug.log("Operation started: incrementing ref count for %s", client);
 132                 client.reference();
 133                 state = 0x01;
 134             } else {
 135                 if (debug.on())
 136                     debug.log("Operation ref count for %s is already %s",
 137                               client, ((state & 0x2) == 0x2) ? "released." : "incremented!" );
 138                 assert (state & 0x01) == 0 : "reference count already incremented";
 139             }
 140         }
 141 
 142         public synchronized void tryRelease() {
 143             if (state == 0x01) {
 144                 // decrement the reference count on the HttpClientImpl
 145                 // to allow the SelectorManager thread to exit if no
 146                 // other operation is pending and the facade is no
 147                 // longer referenced.
 148                 if (debug.on())
 149                     debug.log("Operation finished: decrementing ref count for %s", client);
 150                 client.unreference();
 151             } else if (state == 0) {
 152                 if (debug.on())
 153                     debug.log("Operation finished: releasing ref count for %s", client);
 154             } else if ((state & 0x02) == 0x02) {
 155                 if (debug.on())
 156                     debug.log("ref count for %s already released", client);
 157             }
 158             state |= 0x02;
 159         }
 160     }
 161 
 162     private volatile boolean firstTimeAround = true;
 163 
 164     public CompletableFuture<Response> readHeadersAsync(Executor executor) {
 165         if (debug.on())
 166             debug.log("Reading Headers: (remaining: "
 167                       + asyncReceiver.remaining() +") "  + readProgress);
 168 
 169         if (firstTimeAround) {
 170             if (debug.on()) debug.log("First time around");
 171             firstTimeAround = false;
 172         } else {
 173             // with expect continue we will resume reading headers + body.
 174             asyncReceiver.unsubscribe(bodyReader);
 175             bodyReader.reset();
 176 
 177             hd = new Http1HeaderParser();
 178             readProgress = State.READING_HEADERS;
 179             headersReader.reset();
 180             headersReader.start(hd);
 181             asyncReceiver.subscribe(headersReader);
 182         }
 183 
 184         CompletableFuture<State> cf = headersReader.completion();
 185         assert cf != null : "parsing not started";
 186         if (debug.on()) {
 187             debug.log("headersReader is %s",
 188                     cf == null ? "not yet started"
 189                             : cf.isDone() ? "already completed"
 190                             : "not yet completed");
 191         }
 192 
 193         Function<State, Response> lambda = (State completed) -> {
 194                 assert completed == State.READING_HEADERS;
 195                 if (debug.on())
 196                     debug.log("Reading Headers: creating Response object;"
 197                               + " state is now " + readProgress);
 198                 asyncReceiver.unsubscribe(headersReader);
 199                 responseCode = hd.responseCode();
 200                 headers = hd.headers();
 201 
 202                 response = new Response(request,
 203                                         exchange.getExchange(),
 204                                         headers,
 205                                         connection,
 206                                         responseCode,
 207                                         HTTP_1_1);
 208 
 209                 if (Log.headers()) {
 210                     StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 211                     Log.dumpHeaders(sb, "    ", headers);
 212                     Log.logHeaders(sb.toString());
 213                 }
 214 
 215                 return response;
 216             };
 217 
 218         if (executor != null) {
 219             return cf.thenApplyAsync(lambda, executor);
 220         } else {
 221             return cf.thenApply(lambda);
 222         }
 223     }
 224 
 225     private boolean finished;
 226 
 227     synchronized void completed() {
 228         finished = true;
 229     }
 230 
 231     synchronized boolean finished() {
 232         return finished;
 233     }
 234 
 235     /**
 236      * Return known fixed content length or -1 if chunked, or -2 if no content-length
 237      * information in which case, connection termination delimits the response body
 238      */
 239     long fixupContentLen(long clen) {
 240         if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {
 241             return 0L;
 242         }
 243         if (clen == -1L) {
 244             if (headers.firstValue("Transfer-encoding").orElse("")
 245                        .equalsIgnoreCase("chunked")) {
 246                 return -1L;
 247             }
 248             if (responseCode == 101) {
 249                 // this is a h2c or websocket upgrade, contentlength must be zero
 250                 return 0L;
 251             }
 252             return -2L;
 253         }
 254         return clen;
 255     }
 256 
 257     /**
 258      * Read up to MAX_IGNORE bytes discarding
 259      */
 260     public CompletableFuture<Void> ignoreBody(Executor executor) {
 261         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 262         if (clen == -1 || clen > MAX_IGNORE) {
 263             connection.close();
 264             return MinimalFuture.completedFuture(null); // not treating as error
 265         } else {
 266             return readBody(discarding(), true, executor);
 267         }
 268     }
 269 
 270     // Used for those response codes that have no body associated
 271     public void nullBody(HttpResponse<T> resp, Throwable t) {
 272         if (t != null) connection.close();
 273         else {
 274             return2Cache = !request.isWebSocket();
 275             onFinished();
 276         }
 277     }
 278 
 279     static final Flow.Subscription NOP = new Flow.Subscription() {
 280         @Override
 281         public void request(long n) { }
 282         public void cancel() { }
 283     };
 284 
 285     /**
 286      * The Http1AsyncReceiver ensures that all calls to
 287      * the subscriber, including onSubscribe, occur sequentially.
 288      * There could however be some race conditions that could happen
 289      * in case of unexpected errors thrown at unexpected places, which
 290      * may cause onError to be called multiple times.
 291      * The Http1BodySubscriber will ensure that the user subscriber
 292      * is actually completed only once - and only after it is
 293      * subscribed.
 294      * @param <U> The type of response.
 295      */
 296     final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> {
 297         final HttpResponse.BodySubscriber<U> userSubscriber;
 298         final AtomicBoolean completed = new AtomicBoolean();
 299         volatile Throwable withError;
 300         volatile boolean subscribed;
 301         Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
 302             this.userSubscriber = userSubscriber;
 303         }
 304 
 305         // propagate the error to the user subscriber, even if not
 306         // subscribed yet.
 307         private void propagateError(Throwable t) {
 308             assert t != null;
 309             try {
 310                 // if unsubscribed at this point, it will not
 311                 // get subscribed later - so do it now and
 312                 // propagate the error
 313                 if (subscribed == false) {
 314                     subscribed = true;
 315                     userSubscriber.onSubscribe(NOP);
 316                 }
 317             } finally  {
 318                 // if onError throws then there is nothing to do
 319                 // here: let the caller deal with it by logging
 320                 // and closing the connection.
 321                 userSubscriber.onError(t);
 322             }
 323         }
 324 
 325         // complete the subscriber, either normally or exceptionally
 326         // ensure that the subscriber is completed only once.
 327         private void complete(Throwable t) {
 328             if (completed.compareAndSet(false, true)) {
 329                 t  = withError = Utils.getCompletionCause(t);
 330                 if (t == null) {
 331                     assert subscribed;
 332                     try {
 333                         userSubscriber.onComplete();
 334                     } catch (Throwable x) {
 335                         // Simply propagate the error by calling
 336                         // onError on the user subscriber, and let the
 337                         // connection be reused since we should have received
 338                         // and parsed all the bytes when we reach here.
 339                         // If onError throws in turn, then we will simply
 340                         // let that new exception flow up to the caller
 341                         // and let it deal with it.
 342                         // (i.e: log and close the connection)
 343                         // Note that rethrowing here could introduce a
 344                         // race that might cause the next send() operation to
 345                         // fail as the connection has already been put back
 346                         // into the cache when we reach here.
 347                         propagateError(t = withError = Utils.getCompletionCause(x));
 348                     }
 349                 } else {
 350                     propagateError(t);
 351                 }
 352             }
 353         }
 354 
 355         @Override
 356         public CompletionStage<U> getBody() {
 357             return userSubscriber.getBody();
 358         }
 359         @Override
 360         public void onSubscribe(Flow.Subscription subscription) {
 361             if (!subscribed) {
 362                 subscribed = true;
 363                 userSubscriber.onSubscribe(subscription);
 364             } else {
 365                 // could be already subscribed and completed
 366                 // if an unexpected error occurred before the actual
 367                 // subscription - though that's not supposed
 368                 // happen.
 369                 assert completed.get();
 370             }
 371         }
 372         @Override
 373         public void onNext(List<ByteBuffer> item) {
 374             assert !completed.get();
 375             userSubscriber.onNext(item);
 376         }
 377         @Override
 378         public void onError(Throwable throwable) {
 379             complete(throwable);
 380         }
 381         @Override
 382         public void onComplete() {
 383             complete(null);
 384         }
 385     }
 386 
 387     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
 388                                          boolean return2Cache,
 389                                          Executor executor) {
 390         this.return2Cache = return2Cache;
 391         final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
 392 
 393         final CompletableFuture<U> cf = new MinimalFuture<>();
 394 
 395         long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
 396         final long clen = fixupContentLen(clen0);
 397 
 398         // expect-continue reads headers and body twice.
 399         // if we reach here, we must reset the headersReader state.
 400         asyncReceiver.unsubscribe(headersReader);
 401         headersReader.reset();
 402         ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
 403 
 404         // We need to keep hold on the client facade until the
 405         // tracker has been incremented.
 406         connection.client().reference();
 407         executor.execute(() -> {
 408             try {
 409                 content = new ResponseContent(
 410                         connection, clen, headers, subscriber,
 411                         this::onFinished
 412                 );
 413                 if (cf.isCompletedExceptionally()) {
 414                     // if an error occurs during subscription
 415                     connection.close();
 416                     return;
 417                 }
 418                 // increment the reference count on the HttpClientImpl
 419                 // to prevent the SelectorManager thread from exiting until
 420                 // the body is fully read.
 421                 refCountTracker.acquire();
 422                 bodyParser = content.getBodyParser(
 423                     (t) -> {
 424                         try {
 425                             if (t != null) {
 426                                 try {
 427                                     subscriber.onError(t);
 428                                 } finally {
 429                                     cf.completeExceptionally(t);
 430                                 }
 431                             }
 432                         } finally {
 433                             bodyReader.onComplete(t);
 434                             if (t != null) {
 435                                 connection.close();
 436                             }
 437                         }
 438                     });
 439                 bodyReader.start(bodyParser);
 440                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
 441                 asyncReceiver.subscribe(bodyReader);
 442                 assert bodyReaderCF != null : "parsing not started";
 443                 // Make sure to keep a reference to asyncReceiver from
 444                 // within this
 445                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
 446                     t = Utils.getCompletionCause(t);
 447                     try {
 448                         if (t == null) {
 449                             if (debug.on()) debug.log("Finished reading body: " + s);
 450                             assert s == State.READING_BODY;
 451                         }
 452                         if (t != null) {
 453                             subscriber.onError(t);
 454                             cf.completeExceptionally(t);
 455                         }
 456                     } catch (Throwable x) {
 457                         // not supposed to happen
 458                         asyncReceiver.onReadError(x);
 459                     } finally {
 460                         // we're done: release the ref count for
 461                         // the current operation.
 462                         refCountTracker.tryRelease();
 463                     }
 464                 });
 465                 connection.addTrailingOperation(trailingOp);
 466             } catch (Throwable t) {
 467                if (debug.on()) debug.log("Failed reading body: " + t);
 468                 try {
 469                     subscriber.onError(t);
 470                     cf.completeExceptionally(t);
 471                 } finally {
 472                     asyncReceiver.onReadError(t);
 473                 }
 474             } finally {
 475                 connection.client().unreference();
 476             }
 477         });
 478         try {
 479             p.getBody().whenComplete((U u, Throwable t) -> {
 480                 if (t == null)
 481                     cf.complete(u);
 482                 else
 483                     cf.completeExceptionally(t);
 484             });
 485         } catch (Throwable t) {
 486             cf.completeExceptionally(t);
 487             asyncReceiver.setRetryOnError(false);
 488             asyncReceiver.onReadError(t);
 489         }
 490 
 491         return cf.whenComplete((s,t) -> {
 492             if (t != null) {
 493                 // If an exception occurred, release the
 494                 // ref count for the current operation, as
 495                 // it may never be triggered otherwise
 496                 // (BodySubscriber ofInputStream)
 497                 // If there was no exception then the
 498                 // ref count will be/have been released when
 499                 // the last byte of the response is/was received
 500                 refCountTracker.tryRelease();
 501             }
 502         });
 503     }
 504 
 505 
 506     private void onFinished() {
 507         asyncReceiver.clear();
 508         if (return2Cache) {
 509             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
 510             // TODO: need to do something here?
 511             // connection.setAsyncCallbacks(null, null, null);
 512 
 513             // don't return the connection to the cache if EOF happened.
 514             if (debug.on())
 515                 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool");
 516             connection.closeOrReturnToCache(eof == null ? headers : null);
 517         }
 518     }
 519 
 520     HttpHeaders responseHeaders() {
 521         return headers;
 522     }
 523 
 524     int responseCode() {
 525         return responseCode;
 526     }
 527 
 528 // ================ Support for plugging into Http1Receiver   =================
 529 // ============================================================================
 530 
 531     // Callback: Error receiver: Consumer of Throwable.
 532     void onReadError(Throwable t) {
 533         Log.logError(t);
 534         Receiver<?> receiver = receiver(readProgress);
 535         if (t instanceof EOFException) {
 536             debug.log(Level.DEBUG, "onReadError: received EOF");
 537             eof = (EOFException) t;
 538         }
 539         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
 540         debug.log(Level.DEBUG, () -> "onReadError: cf is "
 541                 + (cf == null  ? "null"
 542                 : (cf.isDone() ? "already completed"
 543                                : "not yet completed")));
 544         if (cf != null) {
 545             cf.completeExceptionally(t);
 546         } else {
 547             debug.log(Level.DEBUG, "onReadError", t);
 548         }
 549         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
 550         connection.close();
 551     }
 552 
 553     // ========================================================================
 554 
 555     private State advance(State previous) {
 556         assert readProgress == previous;
 557         switch(previous) {
 558             case READING_HEADERS:
 559                 asyncReceiver.unsubscribe(headersReader);
 560                 return readProgress = State.READING_BODY;
 561             case READING_BODY:
 562                 asyncReceiver.unsubscribe(bodyReader);
 563                 return readProgress = State.DONE;
 564             default:
 565                 throw new InternalError("can't advance from " + previous);
 566         }
 567     }
 568 
 569     Receiver<?> receiver(State state) {
 570         switch(state) {
 571             case READING_HEADERS: return headersReader;
 572             case READING_BODY: return bodyReader;
 573             default: return null;
 574         }
 575 
 576     }
 577 
 578     static abstract class Receiver<T>
 579             implements Http1AsyncReceiver.Http1AsyncDelegate {
 580         abstract void start(T parser);
 581         abstract CompletableFuture<State> completion();
 582         // accepts a buffer from upstream.
 583         // this should be implemented as a simple call to
 584         // accept(ref, parser, cf)
 585         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
 586         public abstract void onReadError(Throwable t);
 587         // handle a byte buffer received from upstream.
 588         // this method should set the value of Http1Response.buffer
 589         // to ref.get() before beginning parsing.
 590         abstract void handle(ByteBuffer buf, T parser,
 591                              CompletableFuture<State> cf);
 592         // resets this objects state so that it can be reused later on
 593         // typically puts the reference to parser and completion to null
 594         abstract void reset();
 595 
 596         // accepts a byte buffer received from upstream
 597         // returns true if the buffer is fully parsed and more data can
 598         // be accepted, false otherwise.
 599         final boolean accept(ByteBuffer buf, T parser,
 600                 CompletableFuture<State> cf) {
 601             if (cf == null || parser == null || cf.isDone()) return false;
 602             handle(buf, parser, cf);
 603             return !cf.isDone();
 604         }
 605         public abstract void onSubscribe(AbstractSubscription s);
 606         public abstract AbstractSubscription subscription();
 607 
 608     }
 609 
 610     // Invoked with each new ByteBuffer when reading headers...
 611     final class HeadersReader extends Receiver<Http1HeaderParser> {
 612         final Consumer<State> onComplete;
 613         volatile Http1HeaderParser parser;
 614         volatile CompletableFuture<State> cf;
 615         volatile long count; // bytes parsed (for debug)
 616         volatile AbstractSubscription subscription;
 617 
 618         HeadersReader(Consumer<State> onComplete) {
 619             this.onComplete = onComplete;
 620         }
 621 
 622         @Override
 623         public AbstractSubscription subscription() {
 624             return subscription;
 625         }
 626 
 627         @Override
 628         public void onSubscribe(AbstractSubscription s) {
 629             this.subscription = s;
 630             s.request(1);
 631         }
 632 
 633         @Override
 634         void reset() {
 635             cf = null;
 636             parser = null;
 637             count = 0;
 638             subscription = null;
 639         }
 640 
 641         // Revisit: do we need to support restarting?
 642         @Override
 643         final void start(Http1HeaderParser hp) {
 644             count = 0;
 645             cf = new MinimalFuture<>();
 646             parser = hp;
 647         }
 648 
 649         @Override
 650         CompletableFuture<State> completion() {
 651             return cf;
 652         }
 653 
 654         @Override
 655         public final boolean tryAsyncReceive(ByteBuffer ref) {
 656             boolean hasDemand = subscription.demand().tryDecrement();
 657             assert hasDemand;
 658             boolean needsMore = accept(ref, parser, cf);
 659             if (needsMore) subscription.request(1);
 660             return needsMore;
 661         }
 662 
 663         @Override
 664         public final void onReadError(Throwable t) {
 665             t = wrapWithExtraDetail(t, parser::currentStateMessage);
 666             Http1Response.this.onReadError(t);
 667         }
 668 
 669         @Override
 670         final void handle(ByteBuffer b,
 671                           Http1HeaderParser parser,
 672                           CompletableFuture<State> cf) {
 673             assert cf != null : "parsing not started";
 674             assert parser != null : "no parser";
 675             try {
 676                 count += b.remaining();
 677                 if (debug.on())
 678                     debug.log("Sending " + b.remaining() + "/" + b.capacity()
 679                               + " bytes to header parser");
 680                 if (parser.parse(b)) {
 681                     count -= b.remaining();
 682                     if (debug.on())
 683                         debug.log("Parsing headers completed. bytes=" + count);
 684                     onComplete.accept(State.READING_HEADERS);
 685                     cf.complete(State.READING_HEADERS);
 686                 }
 687             } catch (Throwable t) {
 688                 if (debug.on())
 689                     debug.log("Header parser failed to handle buffer: " + t);
 690                 cf.completeExceptionally(t);
 691             }
 692         }
 693 
 694         @Override
 695         public void close(Throwable error) {
 696             // if there's no error nothing to do: the cf should/will
 697             // be completed.
 698             if (error != null) {
 699                 CompletableFuture<State> cf = this.cf;
 700                 if (cf != null) {
 701                     if (debug.on())
 702                         debug.log("close: completing header parser CF with " + error);
 703                     cf.completeExceptionally(error);
 704                 }
 705             }
 706         }
 707     }
 708 
 709     // Invoked with each new ByteBuffer when reading bodies...
 710     final class BodyReader extends Receiver<BodyParser> {
 711         final Consumer<State> onComplete;
 712         volatile BodyParser parser;
 713         volatile CompletableFuture<State> cf;
 714         volatile AbstractSubscription subscription;
 715         BodyReader(Consumer<State> onComplete) {
 716             this.onComplete = onComplete;
 717         }
 718 
 719         @Override
 720         void reset() {
 721             parser = null;
 722             cf = null;
 723             subscription = null;
 724         }
 725 
 726         // Revisit: do we need to support restarting?
 727         @Override
 728         final void start(BodyParser parser) {
 729             cf = new MinimalFuture<>();
 730             this.parser = parser;
 731         }
 732 
 733         @Override
 734         CompletableFuture<State> completion() {
 735             return cf;
 736         }
 737 
 738         @Override
 739         public final boolean tryAsyncReceive(ByteBuffer b) {
 740             return accept(b, parser, cf);
 741         }
 742 
 743         @Override
 744         public final void onReadError(Throwable t) {
 745             if (t instanceof EOFException && bodyParser != null &&
 746                     bodyParser instanceof UnknownLengthBodyParser) {
 747                 ((UnknownLengthBodyParser)bodyParser).complete();
 748                 return;
 749             }
 750             t = wrapWithExtraDetail(t, parser::currentStateMessage);
 751             Http1Response.this.onReadError(t);
 752         }
 753 
 754         @Override
 755         public AbstractSubscription subscription() {
 756             return subscription;
 757         }
 758 
 759         @Override
 760         public void onSubscribe(AbstractSubscription s) {
 761             this.subscription = s;
 762             try {
 763                 parser.onSubscribe(s);
 764             } catch (Throwable t) {
 765                 cf.completeExceptionally(t);
 766                 throw t;
 767             }
 768         }
 769 
 770         @Override
 771         final void handle(ByteBuffer b,
 772                           BodyParser parser,
 773                           CompletableFuture<State> cf) {
 774             assert cf != null : "parsing not started";
 775             assert parser != null : "no parser";
 776             try {
 777                 if (debug.on())
 778                     debug.log("Sending " + b.remaining() + "/" + b.capacity()
 779                               + " bytes to body parser");
 780                 parser.accept(b);
 781             } catch (Throwable t) {
 782                 if (debug.on())
 783                     debug.log("Body parser failed to handle buffer: " + t);
 784                 if (!cf.isDone()) {
 785                     cf.completeExceptionally(t);
 786                 }
 787             }
 788         }
 789 
 790         final void onComplete(Throwable closedExceptionally) {
 791             if (cf.isDone()) return;
 792             if (closedExceptionally != null) {
 793                 cf.completeExceptionally(closedExceptionally);
 794             } else {
 795                 onComplete.accept(State.READING_BODY);
 796                 cf.complete(State.READING_BODY);
 797             }
 798         }
 799 
 800         @Override
 801         public final void close(Throwable error) {
 802             CompletableFuture<State> cf = this.cf;
 803             if (cf != null && !cf.isDone()) {
 804                 // we want to make sure dependent actions are triggered
 805                 // in order to make sure the client reference count
 806                 // is decremented
 807                 if (error != null) {
 808                     if (debug.on())
 809                         debug.log("close: completing body parser CF with " + error);
 810                     cf.completeExceptionally(error);
 811                 } else {
 812                     if (debug.on())
 813                         debug.log("close: completing body parser CF");
 814                     cf.complete(State.READING_BODY);
 815                 }
 816             }
 817         }
 818 
 819         @Override
 820         public String toString() {
 821             return super.toString() + "/parser=" + String.valueOf(parser);
 822         }
 823     }
 824 }