1 /*
   2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.EOFException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.List;
  32 import java.util.concurrent.CompletableFuture;
  33 import java.util.concurrent.CompletionStage;
  34 import java.util.concurrent.Executor;
  35 import java.util.concurrent.Flow;
  36 import java.util.concurrent.atomic.AtomicBoolean;
  37 import java.util.concurrent.atomic.AtomicLong;
  38 import java.util.function.Consumer;
  39 import java.util.function.Function;
  40 import java.net.http.HttpHeaders;
  41 import java.net.http.HttpResponse;
  42 import jdk.internal.net.http.ResponseContent.BodyParser;
  43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser;
  44 import jdk.internal.net.http.common.Log;
  45 import jdk.internal.net.http.common.Logger;
  46 import jdk.internal.net.http.common.MinimalFuture;
  47 import jdk.internal.net.http.common.Utils;
  48 import static java.net.http.HttpClient.Version.HTTP_1_1;
  49 import static java.net.http.HttpResponse.BodySubscribers.discarding;
  50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
  51 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED;
  52 
  53 /**
  54  * Handles a HTTP/1.1 response (headers + body).
  55  * There can be more than one of these per Http exchange.
  56  */
  57 class Http1Response<T> {
  58 
  59     private volatile ResponseContent content;
  60     private final HttpRequestImpl request;
  61     private Response response;
  62     private final HttpConnection connection;
  63     private HttpHeaders headers;
  64     private int responseCode;
  65     private final Http1Exchange<T> exchange;
  66     private boolean return2Cache; // return connection to cache when finished
  67     private final HeadersReader headersReader; // used to read the headers
  68     private final BodyReader bodyReader; // used to read the body
  69     private final Http1AsyncReceiver asyncReceiver;
  70     private volatile EOFException eof;
  71     private volatile BodyParser bodyParser;
  72     // max number of bytes of (fixed length) body to ignore on redirect
  73     private final static int MAX_IGNORE = 1024;
  74 
  75     // Revisit: can we get rid of this?
  76     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
  77     private volatile State readProgress = State.INITIAL;
  78 
  79     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  80     final static AtomicLong responseCount = new AtomicLong();
  81     final long id = responseCount.incrementAndGet();
  82     private Http1HeaderParser hd;
  83 
  84     Http1Response(HttpConnection conn,
  85                   Http1Exchange<T> exchange,
  86                   Http1AsyncReceiver asyncReceiver) {
  87         this.readProgress = State.INITIAL;
  88         this.request = exchange.request();
  89         this.exchange = exchange;
  90         this.connection = conn;
  91         this.asyncReceiver = asyncReceiver;
  92         headersReader = new HeadersReader(this::advance);
  93         bodyReader = new BodyReader(this::advance);
  94 
  95         hd = new Http1HeaderParser();
  96         readProgress = State.READING_HEADERS;
  97         headersReader.start(hd);
  98         asyncReceiver.subscribe(headersReader);
  99     }
 100 
 101     String dbgTag;
 102     private String dbgString() {
 103         String dbg = dbgTag;
 104         if (dbg == null) {
 105             String cdbg = connection.dbgTag;
 106             if (cdbg != null) {
 107                 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
 108             } else {
 109                 dbg = "Http1Response(id=" + id + ")";
 110             }
 111         }
 112         return dbg;
 113     }
 114 
 115     // The ClientRefCountTracker is used to track the state
 116     // of a pending operation. Altough there usually is a single
 117     // point where the operation starts, it may terminate at
 118     // different places.
 119     private final class ClientRefCountTracker {
 120         final HttpClientImpl client = connection.client();
 121         // state & 0x01 != 0 => acquire called
 122         // state & 0x02 != 0 => tryRelease called
 123         byte state;
 124 
 125         public synchronized void acquire() {
 126             if (state == 0) {
 127                 // increment the reference count on the HttpClientImpl
 128                 // to prevent the SelectorManager thread from exiting
 129                 // until our operation is complete.
 130                 if (debug.on())
 131                     debug.log("Operation started: incrementing ref count for %s", client);
 132                 client.reference();
 133                 state = 0x01;
 134             } else {
 135                 if (debug.on())
 136                     debug.log("Operation ref count for %s is already %s",
 137                               client, ((state & 0x2) == 0x2) ? "released." : "incremented!" );
 138                 assert (state & 0x01) == 0 : "reference count already incremented";
 139             }
 140         }
 141 
 142         public synchronized void tryRelease() {
 143             if (state == 0x01) {
 144                 // decrement the reference count on the HttpClientImpl
 145                 // to allow the SelectorManager thread to exit if no
 146                 // other operation is pending and the facade is no
 147                 // longer referenced.
 148                 if (debug.on())
 149                     debug.log("Operation finished: decrementing ref count for %s", client);
 150                 client.unreference();
 151             } else if (state == 0) {
 152                 if (debug.on())
 153                     debug.log("Operation finished: releasing ref count for %s", client);
 154             } else if ((state & 0x02) == 0x02) {
 155                 if (debug.on())
 156                     debug.log("ref count for %s already released", client);
 157             }
 158             state |= 0x02;
 159         }
 160     }
 161 
 162     private volatile boolean firstTimeAround = true;
 163 
 164     public CompletableFuture<Response> readHeadersAsync(Executor executor) {
 165         if (debug.on())
 166             debug.log("Reading Headers: (remaining: "
 167                       + asyncReceiver.remaining() +") "  + readProgress);
 168 
 169         if (firstTimeAround) {
 170             if (debug.on()) debug.log("First time around");
 171             firstTimeAround = false;
 172         } else {
 173             // with expect continue we will resume reading headers + body.
 174             asyncReceiver.unsubscribe(bodyReader);
 175             bodyReader.reset();
 176 
 177             hd = new Http1HeaderParser();
 178             readProgress = State.READING_HEADERS;
 179             headersReader.reset();
 180             headersReader.start(hd);
 181             asyncReceiver.subscribe(headersReader);
 182         }
 183 
 184         CompletableFuture<State> cf = headersReader.completion();
 185         assert cf != null : "parsing not started";
 186         if (debug.on()) {
 187             debug.log("headersReader is %s",
 188                     cf == null ? "not yet started"
 189                             : cf.isDone() ? "already completed"
 190                             : "not yet completed");
 191         }
 192 
 193         Function<State, Response> lambda = (State completed) -> {
 194                 assert completed == State.READING_HEADERS;
 195                 if (debug.on())
 196                     debug.log("Reading Headers: creating Response object;"
 197                               + " state is now " + readProgress);
 198                 asyncReceiver.unsubscribe(headersReader);
 199                 responseCode = hd.responseCode();
 200                 headers = hd.headers();
 201 
 202                 response = new Response(request,
 203                                         exchange.getExchange(),
 204                                         headers,
 205                                         connection,
 206                                         responseCode,
 207                                         HTTP_1_1);
 208 
 209                 if (Log.headers()) {
 210                     StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 211                     Log.dumpHeaders(sb, "    ", headers);
 212                     Log.logHeaders(sb.toString());
 213                 }
 214 
 215                 return response;
 216             };
 217 
 218         if (executor != null) {
 219             return cf.thenApplyAsync(lambda, executor);
 220         } else {
 221             return cf.thenApply(lambda);
 222         }
 223     }
 224 
 225     private boolean finished;
 226 
 227     synchronized void completed() {
 228         finished = true;
 229     }
 230 
 231     synchronized boolean finished() {
 232         return finished;
 233     }
 234 
 235     /**
 236      * Return known fixed content length or -1 if chunked, or -2 if no content-length
 237      * information in which case, connection termination delimits the response body
 238      */
 239     long fixupContentLen(long clen) {
 240         if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {
 241             return 0L;
 242         }
 243         if (clen == -1L) {
 244             if (headers.firstValue("Transfer-encoding").orElse("")
 245                        .equalsIgnoreCase("chunked")) {
 246                 return -1L;
 247             }
 248             if (responseCode == 101) {
 249                 // this is a h2c or websocket upgrade, contentlength must be zero
 250                 return 0L;
 251             }
 252             return -2L;
 253         }
 254         return clen;
 255     }
 256 
 257     /**
 258      * Read up to MAX_IGNORE bytes discarding
 259      */
 260     public CompletableFuture<Void> ignoreBody(Executor executor) {
 261         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 262         if (clen == -1 || clen > MAX_IGNORE) {
 263             connection.close();
 264             return MinimalFuture.completedFuture(null); // not treating as error
 265         } else {
 266             return readBody(discarding(), !request.isWebSocket(), executor);
 267         }
 268     }
 269 
 270     // Used for those response codes that have no body associated
 271     public void nullBody(HttpResponse<T> resp, Throwable t) {
 272         if (t != null) connection.close();
 273         else {
 274             return2Cache = !request.isWebSocket();
 275             onFinished();
 276         }
 277     }
 278 
 279     static final Flow.Subscription NOP = new Flow.Subscription() {
 280         @Override
 281         public void request(long n) { }
 282         public void cancel() { }
 283     };
 284 
 285     /**
 286      * The Http1AsyncReceiver ensures that all calls to
 287      * the subscriber, including onSubscribe, occur sequentially.
 288      * There could however be some race conditions that could happen
 289      * in case of unexpected errors thrown at unexpected places, which
 290      * may cause onError to be called multiple times.
 291      * The Http1BodySubscriber will ensure that the user subscriber
 292      * is actually completed only once - and only after it is
 293      * subscribed.
 294      * @param <U> The type of response.
 295      */
 296     final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> {
 297         final HttpResponse.BodySubscriber<U> userSubscriber;
 298         final AtomicBoolean completed = new AtomicBoolean();
 299         volatile Throwable withError;
 300         volatile boolean subscribed;
 301         Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
 302             this.userSubscriber = userSubscriber;
 303         }
 304 
 305         // propagate the error to the user subscriber, even if not
 306         // subscribed yet.
 307         private void propagateError(Throwable t) {
 308             assert t != null;
 309             try {
 310                 // if unsubscribed at this point, it will not
 311                 // get subscribed later - so do it now and
 312                 // propagate the error
 313                 if (subscribed == false) {
 314                     subscribed = true;
 315                     userSubscriber.onSubscribe(NOP);
 316                 }
 317             } finally  {
 318                 // if onError throws then there is nothing to do
 319                 // here: let the caller deal with it by logging
 320                 // and closing the connection.
 321                 userSubscriber.onError(t);
 322             }
 323         }
 324 
 325         // complete the subscriber, either normally or exceptionally
 326         // ensure that the subscriber is completed only once.
 327         private void complete(Throwable t) {
 328             if (completed.compareAndSet(false, true)) {
 329                 t  = withError = Utils.getCompletionCause(t);
 330                 if (t == null) {
 331                     assert subscribed;
 332                     try {
 333                         userSubscriber.onComplete();
 334                     } catch (Throwable x) {
 335                         // Simply propagate the error by calling
 336                         // onError on the user subscriber, and let the
 337                         // connection be reused since we should have received
 338                         // and parsed all the bytes when we reach here.
 339                         // If onError throws in turn, then we will simply
 340                         // let that new exception flow up to the caller
 341                         // and let it deal with it.
 342                         // (i.e: log and close the connection)
 343                         // Note that rethrowing here could introduce a
 344                         // race that might cause the next send() operation to
 345                         // fail as the connection has already been put back
 346                         // into the cache when we reach here.
 347                         propagateError(t = withError = Utils.getCompletionCause(x));
 348                     }
 349                 } else {
 350                     propagateError(t);
 351                 }
 352             }
 353         }
 354 
 355         @Override
 356         public CompletionStage<U> getBody() {
 357             return userSubscriber.getBody();
 358         }
 359         @Override
 360         public void onSubscribe(Flow.Subscription subscription) {
 361             if (!subscribed) {
 362                 subscribed = true;
 363                 userSubscriber.onSubscribe(subscription);
 364             } else {
 365                 // could be already subscribed and completed
 366                 // if an unexpected error occurred before the actual
 367                 // subscription - though that's not supposed
 368                 // happen.
 369                 assert completed.get();
 370             }
 371         }
 372         @Override
 373         public void onNext(List<ByteBuffer> item) {
 374             assert !completed.get();
 375             userSubscriber.onNext(item);
 376         }
 377         @Override
 378         public void onError(Throwable throwable) {
 379             complete(throwable);
 380         }
 381         @Override
 382         public void onComplete() {
 383             complete(null);
 384         }
 385     }
 386 
 387     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
 388                                          boolean return2Cache,
 389                                          Executor executor) {
 390         if (debug.on()) {
 391             debug.log("readBody: return2Cache: " + return2Cache);
 392             if (request.isWebSocket() && return2Cache && connection != null) {
 393                 debug.log("websocket connection will be returned to cache: "
 394                         + connection.getClass() + "/" + connection );
 395             }
 396         }
 397         assert !return2Cache || !request.isWebSocket();
 398         this.return2Cache = return2Cache;
 399         final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
 400 
 401         final CompletableFuture<U> cf = new MinimalFuture<>();
 402 
 403         long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
 404         final long clen = fixupContentLen(clen0);
 405 
 406         // expect-continue reads headers and body twice.
 407         // if we reach here, we must reset the headersReader state.
 408         asyncReceiver.unsubscribe(headersReader);
 409         headersReader.reset();
 410         ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
 411 
 412         // We need to keep hold on the client facade until the
 413         // tracker has been incremented.
 414         connection.client().reference();
 415         executor.execute(() -> {
 416             try {
 417                 content = new ResponseContent(
 418                         connection, clen, headers, subscriber,
 419                         this::onFinished
 420                 );
 421                 if (cf.isCompletedExceptionally()) {
 422                     // if an error occurs during subscription
 423                     connection.close();
 424                     return;
 425                 }
 426                 // increment the reference count on the HttpClientImpl
 427                 // to prevent the SelectorManager thread from exiting until
 428                 // the body is fully read.
 429                 refCountTracker.acquire();
 430                 bodyParser = content.getBodyParser(
 431                     (t) -> {
 432                         try {
 433                             if (t != null) {
 434                                 try {
 435                                     subscriber.onError(t);
 436                                 } finally {
 437                                     cf.completeExceptionally(t);
 438                                 }
 439                             }
 440                         } finally {
 441                             bodyReader.onComplete(t);
 442                             if (t != null) {
 443                                 connection.close();
 444                             }
 445                         }
 446                     });
 447                 bodyReader.start(bodyParser);
 448                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
 449                 asyncReceiver.subscribe(bodyReader);
 450                 assert bodyReaderCF != null : "parsing not started";
 451                 // Make sure to keep a reference to asyncReceiver from
 452                 // within this
 453                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
 454                     t = Utils.getCompletionCause(t);
 455                     try {
 456                         if (t == null) {
 457                             if (debug.on()) debug.log("Finished reading body: " + s);
 458                             assert s == State.READING_BODY;
 459                         }
 460                         if (t != null) {
 461                             subscriber.onError(t);
 462                             cf.completeExceptionally(t);
 463                         }
 464                     } catch (Throwable x) {
 465                         // not supposed to happen
 466                         asyncReceiver.onReadError(x);
 467                     } finally {
 468                         // we're done: release the ref count for
 469                         // the current operation.
 470                         refCountTracker.tryRelease();
 471                     }
 472                 });
 473                 connection.addTrailingOperation(trailingOp);
 474             } catch (Throwable t) {
 475                if (debug.on()) debug.log("Failed reading body: " + t);
 476                 try {
 477                     subscriber.onError(t);
 478                     cf.completeExceptionally(t);
 479                 } finally {
 480                     asyncReceiver.onReadError(t);
 481                 }
 482             } finally {
 483                 connection.client().unreference();
 484             }
 485         });
 486         try {
 487             p.getBody().whenComplete((U u, Throwable t) -> {
 488                 if (t == null)
 489                     cf.complete(u);
 490                 else
 491                     cf.completeExceptionally(t);
 492             });
 493         } catch (Throwable t) {
 494             cf.completeExceptionally(t);
 495             asyncReceiver.setRetryOnError(false);
 496             asyncReceiver.onReadError(t);
 497         }
 498 
 499         return cf.whenComplete((s,t) -> {
 500             if (t != null) {
 501                 // If an exception occurred, release the
 502                 // ref count for the current operation, as
 503                 // it may never be triggered otherwise
 504                 // (BodySubscriber ofInputStream)
 505                 // If there was no exception then the
 506                 // ref count will be/have been released when
 507                 // the last byte of the response is/was received
 508                 refCountTracker.tryRelease();
 509             }
 510         });
 511     }
 512 
 513 
 514     private void onFinished() {
 515         asyncReceiver.clear();
 516         if (return2Cache) {
 517             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
 518             // TODO: need to do something here?
 519             // connection.setAsyncCallbacks(null, null, null);
 520 
 521             // don't return the connection to the cache if EOF happened.
 522             if (debug.on())
 523                 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool");
 524             connection.closeOrReturnToCache(eof == null ? headers : null);
 525         }
 526     }
 527 
 528     HttpHeaders responseHeaders() {
 529         return headers;
 530     }
 531 
 532     int responseCode() {
 533         return responseCode;
 534     }
 535 
 536 // ================ Support for plugging into Http1Receiver   =================
 537 // ============================================================================
 538 
 539     // Callback: Error receiver: Consumer of Throwable.
 540     void onReadError(Throwable t) {
 541         Log.logError(t);
 542         Receiver<?> receiver = receiver(readProgress);
 543         if (t instanceof EOFException) {
 544             debug.log(Level.DEBUG, "onReadError: received EOF");
 545             eof = (EOFException) t;
 546         }
 547         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
 548         debug.log(Level.DEBUG, () -> "onReadError: cf is "
 549                 + (cf == null  ? "null"
 550                 : (cf.isDone() ? "already completed"
 551                                : "not yet completed")));
 552         if (cf != null) {
 553             cf.completeExceptionally(t);
 554         } else {
 555             debug.log(Level.DEBUG, "onReadError", t);
 556         }
 557         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
 558         connection.close();
 559     }
 560 
 561     // ========================================================================
 562 
 563     private State advance(State previous) {
 564         assert readProgress == previous;
 565         switch(previous) {
 566             case READING_HEADERS:
 567                 asyncReceiver.unsubscribe(headersReader);
 568                 return readProgress = State.READING_BODY;
 569             case READING_BODY:
 570                 asyncReceiver.unsubscribe(bodyReader);
 571                 return readProgress = State.DONE;
 572             default:
 573                 throw new InternalError("can't advance from " + previous);
 574         }
 575     }
 576 
 577     Receiver<?> receiver(State state) {
 578         switch(state) {
 579             case READING_HEADERS: return headersReader;
 580             case READING_BODY: return bodyReader;
 581             default: return null;
 582         }
 583 
 584     }
 585 
 586     static abstract class Receiver<T>
 587             implements Http1AsyncReceiver.Http1AsyncDelegate {
 588         abstract void start(T parser);
 589         abstract CompletableFuture<State> completion();
 590         // accepts a buffer from upstream.
 591         // this should be implemented as a simple call to
 592         // accept(ref, parser, cf)
 593         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
 594         public abstract void onReadError(Throwable t);
 595         // handle a byte buffer received from upstream.
 596         // this method should set the value of Http1Response.buffer
 597         // to ref.get() before beginning parsing.
 598         abstract void handle(ByteBuffer buf, T parser,
 599                              CompletableFuture<State> cf);
 600         // resets this objects state so that it can be reused later on
 601         // typically puts the reference to parser and completion to null
 602         abstract void reset();
 603 
 604         // accepts a byte buffer received from upstream
 605         // returns true if the buffer is fully parsed and more data can
 606         // be accepted, false otherwise.
 607         final boolean accept(ByteBuffer buf, T parser,
 608                 CompletableFuture<State> cf) {
 609             if (cf == null || parser == null || cf.isDone()) return false;
 610             handle(buf, parser, cf);
 611             return !cf.isDone();
 612         }
 613         public abstract void onSubscribe(AbstractSubscription s);
 614         public abstract AbstractSubscription subscription();
 615 
 616     }
 617 
 618     // Invoked with each new ByteBuffer when reading headers...
 619     final class HeadersReader extends Receiver<Http1HeaderParser> {
 620         final Consumer<State> onComplete;
 621         volatile Http1HeaderParser parser;
 622         volatile CompletableFuture<State> cf;
 623         volatile long count; // bytes parsed (for debug)
 624         volatile AbstractSubscription subscription;
 625 
 626         HeadersReader(Consumer<State> onComplete) {
 627             this.onComplete = onComplete;
 628         }
 629 
 630         @Override
 631         public AbstractSubscription subscription() {
 632             return subscription;
 633         }
 634 
 635         @Override
 636         public void onSubscribe(AbstractSubscription s) {
 637             this.subscription = s;
 638             s.request(1);
 639         }
 640 
 641         @Override
 642         void reset() {
 643             cf = null;
 644             parser = null;
 645             count = 0;
 646             subscription = null;
 647         }
 648 
 649         // Revisit: do we need to support restarting?
 650         @Override
 651         final void start(Http1HeaderParser hp) {
 652             count = 0;
 653             cf = new MinimalFuture<>();
 654             parser = hp;
 655         }
 656 
 657         @Override
 658         CompletableFuture<State> completion() {
 659             return cf;
 660         }
 661 
 662         @Override
 663         public final boolean tryAsyncReceive(ByteBuffer ref) {
 664             boolean hasDemand = subscription.demand().tryDecrement();
 665             assert hasDemand;
 666             boolean needsMore = accept(ref, parser, cf);
 667             if (needsMore) subscription.request(1);
 668             return needsMore;
 669         }
 670 
 671         @Override
 672         public final void onReadError(Throwable t) {
 673             t = wrapWithExtraDetail(t, parser::currentStateMessage);
 674             Http1Response.this.onReadError(t);
 675         }
 676 
 677         @Override
 678         final void handle(ByteBuffer b,
 679                           Http1HeaderParser parser,
 680                           CompletableFuture<State> cf) {
 681             assert cf != null : "parsing not started";
 682             assert parser != null : "no parser";
 683             try {
 684                 count += b.remaining();
 685                 if (debug.on())
 686                     debug.log("Sending " + b.remaining() + "/" + b.capacity()
 687                               + " bytes to header parser");
 688                 if (parser.parse(b)) {
 689                     count -= b.remaining();
 690                     if (debug.on())
 691                         debug.log("Parsing headers completed. bytes=" + count);
 692                     onComplete.accept(State.READING_HEADERS);
 693                     cf.complete(State.READING_HEADERS);
 694                 }
 695             } catch (Throwable t) {
 696                 if (debug.on())
 697                     debug.log("Header parser failed to handle buffer: " + t);
 698                 cf.completeExceptionally(t);
 699             }
 700         }
 701 
 702         @Override
 703         public void close(Throwable error) {
 704             // if there's no error nothing to do: the cf should/will
 705             // be completed.
 706             if (error != null) {
 707                 CompletableFuture<State> cf = this.cf;
 708                 if (cf != null) {
 709                     if (debug.on())
 710                         debug.log("close: completing header parser CF with " + error);
 711                     cf.completeExceptionally(error);
 712                 }
 713             }
 714         }
 715     }
 716 
 717     // Invoked with each new ByteBuffer when reading bodies...
 718     final class BodyReader extends Receiver<BodyParser> {
 719         final Consumer<State> onComplete;
 720         volatile BodyParser parser;
 721         volatile CompletableFuture<State> cf;
 722         volatile AbstractSubscription subscription;
 723         BodyReader(Consumer<State> onComplete) {
 724             this.onComplete = onComplete;
 725         }
 726 
 727         @Override
 728         void reset() {
 729             parser = null;
 730             cf = null;
 731             subscription = null;
 732         }
 733 
 734         // Revisit: do we need to support restarting?
 735         @Override
 736         final void start(BodyParser parser) {
 737             cf = new MinimalFuture<>();
 738             this.parser = parser;
 739         }
 740 
 741         @Override
 742         CompletableFuture<State> completion() {
 743             return cf;
 744         }
 745 
 746         @Override
 747         public final boolean tryAsyncReceive(ByteBuffer b) {
 748             return accept(b, parser, cf);
 749         }
 750 
 751         @Override
 752         public final void onReadError(Throwable t) {
 753             if (t instanceof EOFException && bodyParser != null &&
 754                     bodyParser instanceof UnknownLengthBodyParser) {
 755                 ((UnknownLengthBodyParser)bodyParser).complete();
 756                 return;
 757             }
 758             t = wrapWithExtraDetail(t, parser::currentStateMessage);
 759             Http1Response.this.onReadError(t);
 760         }
 761 
 762         @Override
 763         public AbstractSubscription subscription() {
 764             return subscription;
 765         }
 766 
 767         @Override
 768         public void onSubscribe(AbstractSubscription s) {
 769             this.subscription = s;
 770             try {
 771                 parser.onSubscribe(s);
 772             } catch (Throwable t) {
 773                 cf.completeExceptionally(t);
 774                 throw t;
 775             }
 776         }
 777 
 778         @Override
 779         final void handle(ByteBuffer b,
 780                           BodyParser parser,
 781                           CompletableFuture<State> cf) {
 782             assert cf != null : "parsing not started";
 783             assert parser != null : "no parser";
 784             try {
 785                 if (debug.on())
 786                     debug.log("Sending " + b.remaining() + "/" + b.capacity()
 787                               + " bytes to body parser");
 788                 parser.accept(b);
 789             } catch (Throwable t) {
 790                 if (debug.on())
 791                     debug.log("Body parser failed to handle buffer: " + t);
 792                 if (!cf.isDone()) {
 793                     cf.completeExceptionally(t);
 794                 }
 795             }
 796         }
 797 
 798         final void onComplete(Throwable closedExceptionally) {
 799             if (cf.isDone()) return;
 800             if (closedExceptionally != null) {
 801                 cf.completeExceptionally(closedExceptionally);
 802             } else {
 803                 onComplete.accept(State.READING_BODY);
 804                 cf.complete(State.READING_BODY);
 805             }
 806         }
 807 
 808         @Override
 809         public final void close(Throwable error) {
 810             CompletableFuture<State> cf = this.cf;
 811             if (cf != null && !cf.isDone()) {
 812                 // we want to make sure dependent actions are triggered
 813                 // in order to make sure the client reference count
 814                 // is decremented
 815                 if (error != null) {
 816                     if (debug.on())
 817                         debug.log("close: completing body parser CF with " + error);
 818                     cf.completeExceptionally(error);
 819                 } else {
 820                     if (debug.on())
 821                         debug.log("close: completing body parser CF");
 822                     cf.complete(State.READING_BODY);
 823                 }
 824             }
 825         }
 826 
 827         @Override
 828         public String toString() {
 829             return super.toString() + "/parser=" + String.valueOf(parser);
 830         }
 831     }
 832 }