1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.http.HttpResponse.BodyHandler;
  31 import java.net.http.HttpResponse.BodySubscriber;
  32 import java.nio.ByteBuffer;
  33 import java.util.Objects;
  34 import java.util.concurrent.CompletableFuture;
  35 import java.util.LinkedList;
  36 import java.util.List;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.Executor;
  39 import java.util.concurrent.Flow;
  40 import jdk.internal.net.http.common.Demand;
  41 import jdk.internal.net.http.common.Log;
  42 import jdk.internal.net.http.common.FlowTube;
  43 import jdk.internal.net.http.common.Logger;
  44 import jdk.internal.net.http.common.SequentialScheduler;
  45 import jdk.internal.net.http.common.MinimalFuture;
  46 import jdk.internal.net.http.common.Utils;
  47 import static java.net.http.HttpClient.Version.HTTP_1_1;
  48 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
  49 
  50 /**
  51  * Encapsulates one HTTP/1.1 request/response exchange.
  52  */
  53 class Http1Exchange<T> extends ExchangeImpl<T> {
  54 
  55     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  56     final HttpRequestImpl request; // main request
  57     final Http1Request requestAction;
  58     private volatile Http1Response<T> response;
  59     final HttpConnection connection;
  60     final HttpClientImpl client;
  61     final Executor executor;
  62     private final Http1AsyncReceiver asyncReceiver;
  63 
  64     /** Records a possible cancellation raised before any operation
  65      * has been initiated, or an error received while sending the request. */
  66     private Throwable failed;
  67     private final List<CompletableFuture<?>> operations; // used for cancel
  68 
  69     /** Must be held when operating on any internal state or data. */
  70     private final Object lock = new Object();
  71 
  72     /** Holds the outgoing data, either the headers or a request body part. Or
  73      * an error from the request body publisher. At most there can be ~2 pieces
  74      * of outgoing data ( onComplete|onError can be invoked without demand ).*/
  75     final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
  76 
  77     /** The write publisher, responsible for writing the complete request ( both
  78      * headers and body ( if any ). */
  79     private final Http1Publisher writePublisher = new Http1Publisher();
  80 
  81     /** Completed when the header have been published, or there is an error */
  82     private final CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
  83      /** Completed when the body has been published, or there is an error */
  84     private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
  85 
  86     /** The subscriber to the request's body published. Maybe null. */
  87     private volatile Http1BodySubscriber bodySubscriber;
  88 
  89     enum State { INITIAL,
  90                  HEADERS,
  91                  BODY,
  92                  ERROR,          // terminal state
  93                  COMPLETING,
  94                  COMPLETED }     // terminal state
  95 
  96     private State state = State.INITIAL;
  97 
  98     /** A carrier for either data or an error. Used to carry data, and communicate
  99      * errors from the request ( both headers and body ) to the exchange. */
 100     static class DataPair {
 101         Throwable throwable;
 102         List<ByteBuffer> data;
 103         DataPair(List<ByteBuffer> data, Throwable throwable){
 104             this.data = data;
 105             this.throwable = throwable;
 106         }
 107         @Override
 108         public String toString() {
 109             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
 110         }
 111     }
 112 
 113     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
 114      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
 115      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
 116      * fixed length bodies, respectively. */
 117     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
 118         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
 119         private volatile Flow.Subscription subscription;
 120         volatile boolean complete;
 121         private final Logger debug;
 122         Http1BodySubscriber(Logger debug) {
 123             assert debug != null;
 124             this.debug = debug;
 125         }
 126 
 127         /** Final sentinel in the stream of request body. */
 128         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
 129 
 130         final void request(long n) {
 131             if (debug.on())
 132                 debug.log("Http1BodySubscriber requesting %d, from %s",
 133                           n, subscription);
 134             subscription.request(n);
 135         }
 136 
 137         /** A current-state message suitable for inclusion in an exception detail message. */
 138         abstract String currentStateMessage();
 139 
 140         final boolean isSubscribed() {
 141             return subscription != null;
 142         }
 143 
 144         final void setSubscription(Flow.Subscription subscription) {
 145             this.subscription = subscription;
 146             whenSubscribed.complete(subscription);
 147         }
 148 
 149         final void cancelSubscription() {
 150             try {
 151                 subscription.cancel();
 152             } catch(Throwable t) {
 153                 String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
 154                 if (debug.on()) debug.log("%s: %s", msg, t);
 155                 Log.logError("{0}: {1}", msg, (Object)t);
 156             }
 157         }
 158 
 159         static Http1BodySubscriber completeSubscriber(Logger debug) {
 160             return new Http1BodySubscriber(debug) {
 161                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
 162                 @Override public void onNext(ByteBuffer item) { error(); }
 163                 @Override public void onError(Throwable throwable) { error(); }
 164                 @Override public void onComplete() { error(); }
 165                 @Override String currentStateMessage() { return null; }
 166                 private void error() {
 167                     throw new InternalError("should not reach here");
 168                 }
 169             };
 170         }
 171     }
 172 
 173     @Override
 174     public String toString() {
 175         return "HTTP/1.1 " + request.toString();
 176     }
 177 
 178     HttpRequestImpl request() {
 179         return request;
 180     }
 181 
 182     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
 183         throws IOException
 184     {
 185         super(exchange);
 186         this.request = exchange.request();
 187         this.client = exchange.client();
 188         this.executor = exchange.executor();
 189         this.operations = new LinkedList<>();
 190         operations.add(headersSentCF);
 191         operations.add(bodySentCF);
 192         if (connection != null) {
 193             this.connection = connection;
 194         } else {
 195             InetSocketAddress addr = request.getAddress();
 196             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
 197         }
 198         this.requestAction = new Http1Request(request, this);
 199         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
 200     }
 201 
 202     @Override
 203     HttpConnection connection() {
 204         return connection;
 205     }
 206 
 207     private void connectFlows(HttpConnection connection) {
 208         FlowTube tube =  connection.getConnectionFlow();
 209         if (debug.on()) debug.log("%s connecting flows", tube);
 210 
 211         // Connect the flow to our Http1TubeSubscriber:
 212         //   asyncReceiver.subscriber().
 213         tube.connectFlows(writePublisher,
 214                           asyncReceiver.subscriber());
 215     }
 216 
 217     @Override
 218     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 219         // create the response before sending the request headers, so that
 220         // the response can set the appropriate receivers.
 221         if (debug.on()) debug.log("Sending headers only");
 222         // If the first attempt to read something triggers EOF, or
 223         // IOException("channel reset by peer"), we're going to retry.
 224         // Instruct the asyncReceiver to throw ConnectionExpiredException
 225         // to force a retry.
 226         asyncReceiver.setRetryOnError(true);
 227         if (response == null) {
 228             response = new Http1Response<>(connection, this, asyncReceiver);
 229         }
 230 
 231         if (debug.on()) debug.log("response created in advance");
 232 
 233         CompletableFuture<Void> connectCF;
 234         if (!connection.connected()) {
 235             if (debug.on()) debug.log("initiating connect async");
 236             connectCF = connection.connectAsync(exchange)
 237                     .thenCompose(unused -> connection.finishConnect());
 238             Throwable cancelled;
 239             synchronized (lock) {
 240                 if ((cancelled = failed) == null) {
 241                     operations.add(connectCF);
 242                 }
 243             }
 244             if (cancelled != null) {
 245                 if (client.isSelectorThread()) {
 246                     executor.execute(() ->
 247                         connectCF.completeExceptionally(cancelled));
 248                 } else {
 249                     connectCF.completeExceptionally(cancelled);
 250                 }
 251             }
 252         } else {
 253             connectCF = new MinimalFuture<>();
 254             connectCF.complete(null);
 255         }
 256 
 257         return connectCF
 258                 .thenCompose(unused -> {
 259                     CompletableFuture<Void> cf = new MinimalFuture<>();
 260                     try {
 261                         asyncReceiver.whenFinished.whenComplete((r,t) -> {
 262                             if (t != null) {
 263                                 if (debug.on())
 264                                     debug.log("asyncReceiver finished (failed=%s)", (Object)t);
 265                                 if (!headersSentCF.isDone())
 266                                     headersSentCF.completeAsync(() -> this, executor);
 267                             }
 268                         });
 269                         connectFlows(connection);
 270 
 271                         if (debug.on()) debug.log("requestAction.headers");
 272                         List<ByteBuffer> data = requestAction.headers();
 273                         synchronized (lock) {
 274                             state = State.HEADERS;
 275                         }
 276                         if (debug.on()) debug.log("setting outgoing with headers");
 277                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
 278                         appendToOutgoing(data);
 279                         cf.complete(null);
 280                         return cf;
 281                     } catch (Throwable t) {
 282                         if (debug.on()) debug.log("Failed to send headers: %s", t);
 283                         headersSentCF.completeExceptionally(t);
 284                         bodySentCF.completeExceptionally(t);
 285                         connection.close();
 286                         cf.completeExceptionally(t);
 287                         return cf;
 288                     } })
 289                 .thenCompose(unused -> headersSentCF);
 290     }
 291 
 292     private void cancelIfFailed(Flow.Subscription s) {
 293         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
 294             if (debug.on())
 295                 debug.log("asyncReceiver finished (failed=%s)", (Object)t);
 296             if (t != null) {
 297                 s.cancel();
 298                 // Don't complete exceptionally here as 't'
 299                 // might not be the right exception: it will
 300                 // not have been decorated yet.
 301                 // t is an exception raised by the read side,
 302                 // an EOFException or Broken Pipe...
 303                 // We are cancelling the BodyPublisher subscription
 304                 // and completing bodySentCF to allow the next step
 305                 // to flow and call readHeaderAsync, which will
 306                 // get the right exception from the asyncReceiver.
 307                 bodySentCF.complete(this);
 308             }
 309         }, executor);
 310     }
 311 
 312     @Override
 313     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 314         assert headersSentCF.isDone();
 315         if (debug.on()) debug.log("sendBodyAsync");
 316         try {
 317             bodySubscriber = requestAction.continueRequest();
 318             if (debug.on()) debug.log("bodySubscriber is %s",
 319                     bodySubscriber == null ? null : bodySubscriber.getClass());
 320             if (bodySubscriber == null) {
 321                 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
 322                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
 323             } else {
 324                 // start
 325                 bodySubscriber.whenSubscribed
 326                         .thenAccept((s) -> cancelIfFailed(s))
 327                         .thenAccept((s) -> requestMoreBody());
 328             }
 329         } catch (Throwable t) {
 330             cancelImpl(t);
 331             bodySentCF.completeExceptionally(t);
 332         }
 333         return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
 334     }
 335 
 336     @Override
 337     CompletableFuture<Response> getResponseAsync(Executor executor) {
 338         if (debug.on()) debug.log("reading headers");
 339         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
 340         Throwable cause;
 341         synchronized (lock) {
 342             operations.add(cf);
 343             cause = failed;
 344             failed = null;
 345         }
 346 
 347         if (cause != null) {
 348             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
 349                             + "\n\tCompleting exceptionally with {2}\n",
 350                          request.uri(),
 351                          request.timeout().isPresent() ?
 352                             // calling duration.toMillis() can throw an exception.
 353                             // this is just debugging, we don't care if it overflows.
 354                             (request.timeout().get().getSeconds() * 1000
 355                              + request.timeout().get().getNano() / 1000000) : -1,
 356                          cause);
 357             boolean acknowledged = cf.completeExceptionally(cause);
 358             if (debug.on())
 359                 debug.log(acknowledged ? ("completed response with " + cause)
 360                           : ("response already completed, ignoring " + cause));
 361         }
 362         return Utils.wrapForDebug(debug, "getResponseAsync", cf);
 363     }
 364 
 365     @Override
 366     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
 367                                        boolean returnConnectionToPool,
 368                                        Executor executor)
 369     {
 370         BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
 371                                                                   response.responseHeaders(),
 372                                                                   HTTP_1_1));
 373         CompletableFuture<T> bodyCF = response.readBody(bs,
 374                                                         returnConnectionToPool,
 375                                                         executor);
 376         return bodyCF;
 377     }
 378 
 379     @Override
 380     CompletableFuture<Void> ignoreBody() {
 381         return response.ignoreBody(executor);
 382     }
 383 
 384     ByteBuffer drainLeftOverBytes() {
 385         synchronized (lock) {
 386             asyncReceiver.stop();
 387             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
 388         }
 389     }
 390 
 391     void released() {
 392         Http1Response<T> resp = this.response;
 393         if (resp != null) resp.completed();
 394         asyncReceiver.clear();
 395     }
 396 
 397     void completed() {
 398         Http1Response<T> resp = this.response;
 399         if (resp != null) resp.completed();
 400     }
 401 
 402     /**
 403      * Cancel checks to see if request and responseAsync finished already.
 404      * If not it closes the connection and completes all pending operations
 405      */
 406     @Override
 407     void cancel() {
 408         cancelImpl(new IOException("Request cancelled"));
 409     }
 410 
 411     /**
 412      * Cancel checks to see if request and responseAsync finished already.
 413      * If not it closes the connection and completes all pending operations
 414      */
 415     @Override
 416     void cancel(IOException cause) {
 417         cancelImpl(cause);
 418     }
 419 
 420     private void cancelImpl(Throwable cause) {
 421         LinkedList<CompletableFuture<?>> toComplete = null;
 422         int count = 0;
 423         Throwable error;
 424         synchronized (lock) {
 425             if ((error = failed) == null) {
 426                 failed = error = cause;
 427             }
 428             if (debug.on()) {
 429                 debug.log(request.uri() + ": " + error);
 430             }
 431             if (requestAction != null && requestAction.finished()
 432                     && response != null && response.finished()) {
 433                 return;
 434             }
 435             writePublisher.writeScheduler.stop();
 436             if (operations.isEmpty()) {
 437                 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
 438                                 + "\n\tCan''t cancel yet with {2}",
 439                              request.uri(),
 440                              request.timeout().isPresent() ?
 441                                 // calling duration.toMillis() can throw an exception.
 442                                 // this is just debugging, we don't care if it overflows.
 443                                 (request.timeout().get().getSeconds() * 1000
 444                                  + request.timeout().get().getNano() / 1000000) : -1,
 445                              cause);
 446             } else {
 447                 for (CompletableFuture<?> cf : operations) {
 448                     if (!cf.isDone()) {
 449                         if (toComplete == null) toComplete = new LinkedList<>();
 450                         toComplete.add(cf);
 451                         count++;
 452                     }
 453                 }
 454                 operations.clear();
 455             }
 456         }
 457         try {
 458             Log.logError("Http1Exchange.cancel: count=" + count);
 459             if (toComplete != null) {
 460                 // We might be in the selector thread in case of timeout, when
 461                 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
 462                 // There may or may not be other places that reach here
 463                 // from the SelectorManager thread, so just make sure we
 464                 // don't complete any CF from within the selector manager
 465                 // thread.
 466                 Executor exec = client.isSelectorThread()
 467                         ? executor
 468                         : this::runInline;
 469                 Throwable x = error;
 470                 while (!toComplete.isEmpty()) {
 471                     CompletableFuture<?> cf = toComplete.poll();
 472                     exec.execute(() -> {
 473                         if (cf.completeExceptionally(x)) {
 474                             if (debug.on())
 475                                 debug.log("%s: completed cf with %s", request.uri(), x);
 476                         }
 477                     });
 478                 }
 479             }
 480         } finally {
 481             connection.close();
 482         }
 483     }
 484 
 485     private void runInline(Runnable run) {
 486         assert !client.isSelectorThread();
 487         run.run();
 488     }
 489 
 490     /** Returns true if this exchange was canceled. */
 491     boolean isCanceled() {
 492         synchronized (lock) {
 493             return failed != null;
 494         }
 495     }
 496 
 497     /** Returns the cause for which this exchange was canceled, if available. */
 498     Throwable getCancelCause() {
 499         synchronized (lock) {
 500             return failed;
 501         }
 502     }
 503 
 504     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
 505     void appendToOutgoing(Throwable throwable) {
 506         appendToOutgoing(new DataPair(null, throwable));
 507     }
 508 
 509     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
 510     void appendToOutgoing(List<ByteBuffer> item) {
 511         appendToOutgoing(new DataPair(item, null));
 512     }
 513 
 514     private void appendToOutgoing(DataPair dp) {
 515         if (debug.on()) debug.log("appending to outgoing " + dp);
 516         outgoing.add(dp);
 517         writePublisher.writeScheduler.runOrSchedule();
 518     }
 519 
 520     /** Tells whether, or not, there is any outgoing data that can be published,
 521      * or if there is an error. */
 522     private boolean hasOutgoing() {
 523         return !outgoing.isEmpty();
 524     }
 525 
 526     private void requestMoreBody() {
 527         try {
 528             if (debug.on()) debug.log("requesting more request body from the subscriber");
 529             bodySubscriber.request(1);
 530         } catch (Throwable t) {
 531             if (debug.on()) debug.log("Subscription::request failed", t);
 532             cancelImpl(t);
 533             bodySentCF.completeExceptionally(t);
 534         }
 535     }
 536 
 537     // Invoked only by the publisher
 538     // ALL tasks should execute off the Selector-Manager thread
 539     /** Returns the next portion of the HTTP request, or the error. */
 540     private DataPair getOutgoing() {
 541         final Executor exec = client.theExecutor();
 542         final DataPair dp = outgoing.pollFirst();
 543 
 544         if (writePublisher.cancelled) {
 545             if (debug.on()) debug.log("cancelling upstream publisher");
 546             if (bodySubscriber != null) {
 547                 exec.execute(bodySubscriber::cancelSubscription);
 548             } else if (debug.on()) {
 549                 debug.log("bodySubscriber is null");
 550             }
 551             headersSentCF.completeAsync(() -> this, exec);
 552             bodySentCF.completeAsync(() -> this, exec);
 553             return null;
 554         }
 555 
 556         if (dp == null)  // publisher has not published anything yet
 557             return null;
 558 
 559         if (dp.throwable != null) {
 560             synchronized (lock) {
 561                 state = State.ERROR;
 562             }
 563             exec.execute(() -> {
 564                 headersSentCF.completeExceptionally(dp.throwable);
 565                 bodySentCF.completeExceptionally(dp.throwable);
 566                 connection.close();
 567             });
 568             return dp;
 569         }
 570 
 571         switch (state) {
 572             case HEADERS:
 573                 synchronized (lock) {
 574                     state = State.BODY;
 575                 }
 576                 // completeAsync, since dependent tasks should run in another thread
 577                 if (debug.on()) debug.log("initiating completion of headersSentCF");
 578                 headersSentCF.completeAsync(() -> this, exec);
 579                 break;
 580             case BODY:
 581                 if (dp.data == Http1BodySubscriber.COMPLETED) {
 582                     synchronized (lock) {
 583                         state = State.COMPLETING;
 584                     }
 585                     if (debug.on()) debug.log("initiating completion of bodySentCF");
 586                     bodySentCF.completeAsync(() -> this, exec);
 587                 } else {
 588                     exec.execute(this::requestMoreBody);
 589                 }
 590                 break;
 591             case INITIAL:
 592             case ERROR:
 593             case COMPLETING:
 594             case COMPLETED:
 595             default:
 596                 assert false : "Unexpected state:" + state;
 597         }
 598 
 599         return dp;
 600     }
 601 
 602     /** A Publisher of HTTP/1.1 headers and request body. */
 603     final class Http1Publisher implements FlowTube.TubePublisher {
 604 
 605         final Logger debug = Utils.getDebugLogger(this::dbgString);
 606         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 607         volatile boolean cancelled;
 608         final Http1WriteSubscription subscription = new Http1WriteSubscription();
 609         final Demand demand = new Demand();
 610         final SequentialScheduler writeScheduler =
 611                 SequentialScheduler.synchronizedScheduler(new WriteTask());
 612 
 613         @Override
 614         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 615             assert state == State.INITIAL;
 616             Objects.requireNonNull(s);
 617             assert subscriber == null;
 618 
 619             subscriber = s;
 620             if (debug.on()) debug.log("got subscriber: %s", s);
 621             s.onSubscribe(subscription);
 622         }
 623 
 624         volatile String dbgTag;
 625         String dbgString() {
 626             String tag = dbgTag;
 627             Object flow = connection.getConnectionFlow();
 628             if (tag == null && flow != null) {
 629                 dbgTag = tag = "Http1Publisher(" + flow + ")";
 630             } else if (tag == null) {
 631                 tag = "Http1Publisher(?)";
 632             }
 633             return tag;
 634         }
 635 
 636         final class WriteTask implements Runnable {
 637             @Override
 638             public void run() {
 639                 assert state != State.COMPLETED : "Unexpected state:" + state;
 640                 if (debug.on()) debug.log("WriteTask");
 641 
 642                 if (cancelled) {
 643                     if (debug.on()) debug.log("handling cancellation");
 644                     writeScheduler.stop();
 645                     getOutgoing();
 646                     return;
 647                 }
 648 
 649                 if (subscriber == null) {
 650                     if (debug.on()) debug.log("no subscriber yet");
 651                     return;
 652                 }
 653                 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
 654                 while (hasOutgoing() && demand.tryDecrement()) {
 655                     DataPair dp = getOutgoing();
 656                     if (dp == null)
 657                         break;
 658 
 659                     if (dp.throwable != null) {
 660                         if (debug.on()) debug.log("onError");
 661                         // Do not call the subscriber's onError, it is not required.
 662                         writeScheduler.stop();
 663                     } else {
 664                         List<ByteBuffer> data = dp.data;
 665                         if (data == Http1BodySubscriber.COMPLETED) {
 666                             synchronized (lock) {
 667                                 assert state == State.COMPLETING : "Unexpected state:" + state;
 668                                 state = State.COMPLETED;
 669                             }
 670                             if (debug.on())
 671                                 debug.log("completed, stopping %s", writeScheduler);
 672                             writeScheduler.stop();
 673                             // Do nothing more. Just do not publish anything further.
 674                             // The next Subscriber will eventually take over.
 675 
 676                         } else {
 677                             if (debug.on())
 678                                 debug.log("onNext with " + Utils.remaining(data) + " bytes");
 679                             subscriber.onNext(data);
 680                         }
 681                     }
 682                 }
 683             }
 684         }
 685 
 686         final class Http1WriteSubscription implements Flow.Subscription {
 687 
 688             @Override
 689             public void request(long n) {
 690                 if (cancelled)
 691                     return;  //no-op
 692                 demand.increase(n);
 693                 if (debug.on())
 694                     debug.log("subscription request(%d), demand=%s", n, demand);
 695                 writeScheduler.runOrSchedule(client.theExecutor());
 696             }
 697 
 698             @Override
 699             public void cancel() {
 700                 if (debug.on()) debug.log("subscription cancelled");
 701                 if (cancelled)
 702                     return;  //no-op
 703                 cancelled = true;
 704                 writeScheduler.runOrSchedule(client.theExecutor());
 705             }
 706         }
 707     }
 708 
 709     String dbgString() {
 710         return "Http1Exchange";
 711     }
 712 }