1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.http.HttpResponse.BodyHandler;
  31 import java.net.http.HttpResponse.BodySubscriber;
  32 import java.nio.ByteBuffer;
  33 import java.util.Objects;
  34 import java.util.concurrent.CompletableFuture;
  35 import java.util.LinkedList;
  36 import java.util.List;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.Executor;
  39 import java.util.concurrent.Flow;
  40 import jdk.internal.net.http.common.Demand;
  41 import jdk.internal.net.http.common.Log;
  42 import jdk.internal.net.http.common.FlowTube;
  43 import jdk.internal.net.http.common.Logger;
  44 import jdk.internal.net.http.common.SequentialScheduler;
  45 import jdk.internal.net.http.common.MinimalFuture;
  46 import jdk.internal.net.http.common.Utils;
  47 import static java.net.http.HttpClient.Version.HTTP_1_1;
  48 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
  49 
  50 /**
  51  * Encapsulates one HTTP/1.1 request/response exchange.
  52  */
  53 class Http1Exchange<T> extends ExchangeImpl<T> {
  54 
  55     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  56     final HttpRequestImpl request; // main request
  57     final Http1Request requestAction;
  58     private volatile Http1Response<T> response;
  59     final HttpConnection connection;
  60     final HttpClientImpl client;
  61     final Executor executor;
  62     private final Http1AsyncReceiver asyncReceiver;
  63 
  64     /** Records a possible cancellation raised before any operation
  65      * has been initiated, or an error received while sending the request. */
  66     private Throwable failed;
  67     private final List<CompletableFuture<?>> operations; // used for cancel
  68 
  69     /** Must be held when operating on any internal state or data. */
  70     private final Object lock = new Object();
  71 
  72     /** Holds the outgoing data, either the headers or a request body part. Or
  73      * an error from the request body publisher. At most there can be ~2 pieces
  74      * of outgoing data ( onComplete|onError can be invoked without demand ).*/
  75     final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
  76 
  77     /** The write publisher, responsible for writing the complete request ( both
  78      * headers and body ( if any ). */
  79     private final Http1Publisher writePublisher = new Http1Publisher();
  80 
  81     /** Completed when the header have been published, or there is an error */
  82     private final CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
  83      /** Completed when the body has been published, or there is an error */
  84     private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
  85 
  86     /** The subscriber to the request's body published. Maybe null. */
  87     private volatile Http1BodySubscriber bodySubscriber;
  88 
  89     enum State { INITIAL,
  90                  HEADERS,
  91                  BODY,
  92                  ERROR,          // terminal state
  93                  COMPLETING,
  94                  COMPLETED }     // terminal state
  95 
  96     private State state = State.INITIAL;
  97 
  98     /** A carrier for either data or an error. Used to carry data, and communicate
  99      * errors from the request ( both headers and body ) to the exchange. */
 100     static class DataPair {
 101         Throwable throwable;
 102         List<ByteBuffer> data;
 103         DataPair(List<ByteBuffer> data, Throwable throwable){
 104             this.data = data;
 105             this.throwable = throwable;
 106         }
 107         @Override
 108         public String toString() {
 109             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
 110         }
 111     }
 112 
 113     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
 114      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
 115      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
 116      * fixed length bodies, respectively. */
 117     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
 118         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
 119         private volatile Flow.Subscription subscription;
 120         volatile boolean complete;
 121         private final Logger debug;
 122         Http1BodySubscriber(Logger debug) {
 123             assert debug != null;
 124             this.debug = debug;
 125         }
 126 
 127         /** Final sentinel in the stream of request body. */
 128         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
 129 
 130         final void request(long n) {
 131             if (debug.on())
 132                 debug.log("Http1BodySubscriber requesting %d, from %s",
 133                           n, subscription);
 134             subscription.request(n);
 135         }
 136 
 137         /** A current-state message suitable for inclusion in an exception detail message. */
 138         abstract String currentStateMessage();
 139 
 140         final boolean isSubscribed() {
 141             return subscription != null;
 142         }
 143 
 144         final void setSubscription(Flow.Subscription subscription) {
 145             this.subscription = subscription;
 146             whenSubscribed.complete(subscription);
 147         }
 148 
 149         final void cancelSubscription() {
 150             try {
 151                 subscription.cancel();
 152             } catch(Throwable t) {
 153                 String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
 154                 if (debug.on()) debug.log("%s: %s", msg, t);
 155                 Log.logError("{0}: {1}", msg, (Object)t);
 156             }
 157         }
 158 
 159         static Http1BodySubscriber completeSubscriber(Logger debug) {
 160             return new Http1BodySubscriber(debug) {
 161                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
 162                 @Override public void onNext(ByteBuffer item) { error(); }
 163                 @Override public void onError(Throwable throwable) { error(); }
 164                 @Override public void onComplete() { error(); }
 165                 @Override String currentStateMessage() { return null; }
 166                 private void error() {
 167                     throw new InternalError("should not reach here");
 168                 }
 169             };
 170         }
 171     }
 172 
 173     @Override
 174     public String toString() {
 175         return "HTTP/1.1 " + request.toString();
 176     }
 177 
 178     HttpRequestImpl request() {
 179         return request;
 180     }
 181 
 182     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
 183         throws IOException
 184     {
 185         super(exchange);
 186         this.request = exchange.request();
 187         this.client = exchange.client();
 188         this.executor = exchange.executor();
 189         this.operations = new LinkedList<>();
 190         operations.add(headersSentCF);
 191         operations.add(bodySentCF);
 192         if (connection != null) {
 193             this.connection = connection;
 194         } else {
 195             InetSocketAddress addr = request.getAddress();
 196             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
 197         }
 198         this.requestAction = new Http1Request(request, this);
 199         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
 200     }
 201 
 202     @Override
 203     HttpConnection connection() {
 204         return connection;
 205     }
 206 
 207     private void connectFlows(HttpConnection connection) {
 208         FlowTube tube =  connection.getConnectionFlow();
 209         if (debug.on()) debug.log("%s connecting flows", tube);
 210 
 211         // Connect the flow to our Http1TubeSubscriber:
 212         //   asyncReceiver.subscriber().
 213         tube.connectFlows(writePublisher,
 214                           asyncReceiver.subscriber());
 215     }
 216 
 217     @Override
 218     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 219         // create the response before sending the request headers, so that
 220         // the response can set the appropriate receivers.
 221         if (debug.on()) debug.log("Sending headers only");
 222         // If the first attempt to read something triggers EOF, or
 223         // IOException("channel reset by peer"), we're going to retry.
 224         // Instruct the asyncReceiver to throw ConnectionExpiredException
 225         // to force a retry.
 226         asyncReceiver.setRetryOnError(true);
 227         if (response == null) {
 228             response = new Http1Response<>(connection, this, asyncReceiver);
 229         }
 230 
 231         if (debug.on()) debug.log("response created in advance");
 232 
 233         CompletableFuture<Void> connectCF;
 234         if (!connection.connected()) {
 235             if (debug.on()) debug.log("initiating connect async");
 236             connectCF = connection.connectAsync();
 237             Throwable cancelled;
 238             synchronized (lock) {
 239                 if ((cancelled = failed) == null) {
 240                     operations.add(connectCF);
 241                 }
 242             }
 243             if (cancelled != null) {
 244                 if (client.isSelectorThread()) {
 245                     executor.execute(() ->
 246                         connectCF.completeExceptionally(cancelled));
 247                 } else {
 248                     connectCF.completeExceptionally(cancelled);
 249                 }
 250             }
 251         } else {
 252             connectCF = new MinimalFuture<>();
 253             connectCF.complete(null);
 254         }
 255 
 256         return connectCF
 257                 .thenCompose(unused -> {
 258                     CompletableFuture<Void> cf = new MinimalFuture<>();
 259                     try {
 260                         asyncReceiver.whenFinished.whenComplete((r,t) -> {
 261                             if (t != null) {
 262                                 if (debug.on())
 263                                     debug.log("asyncReceiver finished (failed=%s)", (Object)t);
 264                                 if (!headersSentCF.isDone())
 265                                     headersSentCF.completeAsync(() -> this, executor);
 266                             }
 267                         });
 268                         connectFlows(connection);
 269 
 270                         if (debug.on()) debug.log("requestAction.headers");
 271                         List<ByteBuffer> data = requestAction.headers();
 272                         synchronized (lock) {
 273                             state = State.HEADERS;
 274                         }
 275                         if (debug.on()) debug.log("setting outgoing with headers");
 276                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
 277                         appendToOutgoing(data);
 278                         cf.complete(null);
 279                         return cf;
 280                     } catch (Throwable t) {
 281                         if (debug.on()) debug.log("Failed to send headers: %s", t);
 282                         headersSentCF.completeExceptionally(t);
 283                         bodySentCF.completeExceptionally(t);
 284                         connection.close();
 285                         cf.completeExceptionally(t);
 286                         return cf;
 287                     } })
 288                 .thenCompose(unused -> headersSentCF);
 289     }
 290 
 291     private void cancelIfFailed(Flow.Subscription s) {
 292         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
 293             if (debug.on())
 294                 debug.log("asyncReceiver finished (failed=%s)", (Object)t);
 295             if (t != null) {
 296                 s.cancel();
 297                 // Don't complete exceptionally here as 't'
 298                 // might not be the right exception: it will
 299                 // not have been decorated yet.
 300                 // t is an exception raised by the read side,
 301                 // an EOFException or Broken Pipe...
 302                 // We are cancelling the BodyPublisher subscription
 303                 // and completing bodySentCF to allow the next step
 304                 // to flow and call readHeaderAsync, which will
 305                 // get the right exception from the asyncReceiver.
 306                 bodySentCF.complete(this);
 307             }
 308         }, executor);
 309     }
 310 
 311     @Override
 312     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 313         assert headersSentCF.isDone();
 314         if (debug.on()) debug.log("sendBodyAsync");
 315         try {
 316             bodySubscriber = requestAction.continueRequest();
 317             if (debug.on()) debug.log("bodySubscriber is %s",
 318                     bodySubscriber == null ? null : bodySubscriber.getClass());
 319             if (bodySubscriber == null) {
 320                 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
 321                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
 322             } else {
 323                 // start
 324                 bodySubscriber.whenSubscribed
 325                         .thenAccept((s) -> cancelIfFailed(s))
 326                         .thenAccept((s) -> requestMoreBody());
 327             }
 328         } catch (Throwable t) {
 329             cancelImpl(t);
 330             bodySentCF.completeExceptionally(t);
 331         }
 332         return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
 333     }
 334 
 335     @Override
 336     CompletableFuture<Response> getResponseAsync(Executor executor) {
 337         if (debug.on()) debug.log("reading headers");
 338         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
 339         Throwable cause;
 340         synchronized (lock) {
 341             operations.add(cf);
 342             cause = failed;
 343             failed = null;
 344         }
 345 
 346         if (cause != null) {
 347             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
 348                             + "\n\tCompleting exceptionally with {2}\n",
 349                          request.uri(),
 350                          request.timeout().isPresent() ?
 351                             // calling duration.toMillis() can throw an exception.
 352                             // this is just debugging, we don't care if it overflows.
 353                             (request.timeout().get().getSeconds() * 1000
 354                              + request.timeout().get().getNano() / 1000000) : -1,
 355                          cause);
 356             boolean acknowledged = cf.completeExceptionally(cause);
 357             if (debug.on())
 358                 debug.log(acknowledged ? ("completed response with " + cause)
 359                           : ("response already completed, ignoring " + cause));
 360         }
 361         return Utils.wrapForDebug(debug, "getResponseAsync", cf);
 362     }
 363 
 364     @Override
 365     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
 366                                        boolean returnConnectionToPool,
 367                                        Executor executor)
 368     {
 369         BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
 370                                                                   response.responseHeaders(),
 371                                                                   HTTP_1_1));
 372         CompletableFuture<T> bodyCF = response.readBody(bs,
 373                                                         returnConnectionToPool,
 374                                                         executor);
 375         return bodyCF;
 376     }
 377 
 378     @Override
 379     CompletableFuture<Void> ignoreBody() {
 380         return response.ignoreBody(executor);
 381     }
 382 
 383     ByteBuffer drainLeftOverBytes() {
 384         synchronized (lock) {
 385             asyncReceiver.stop();
 386             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
 387         }
 388     }
 389 
 390     void released() {
 391         Http1Response<T> resp = this.response;
 392         if (resp != null) resp.completed();
 393         asyncReceiver.clear();
 394     }
 395 
 396     void completed() {
 397         Http1Response<T> resp = this.response;
 398         if (resp != null) resp.completed();
 399     }
 400 
 401     /**
 402      * Cancel checks to see if request and responseAsync finished already.
 403      * If not it closes the connection and completes all pending operations
 404      */
 405     @Override
 406     void cancel() {
 407         cancelImpl(new IOException("Request cancelled"));
 408     }
 409 
 410     /**
 411      * Cancel checks to see if request and responseAsync finished already.
 412      * If not it closes the connection and completes all pending operations
 413      */
 414     @Override
 415     void cancel(IOException cause) {
 416         cancelImpl(cause);
 417     }
 418 
 419     private void cancelImpl(Throwable cause) {
 420         LinkedList<CompletableFuture<?>> toComplete = null;
 421         int count = 0;
 422         Throwable error;
 423         synchronized (lock) {
 424             if ((error = failed) == null) {
 425                 failed = error = cause;
 426             }
 427             if (debug.on()) {
 428                 debug.log(request.uri() + ": " + error);
 429             }
 430             if (requestAction != null && requestAction.finished()
 431                     && response != null && response.finished()) {
 432                 return;
 433             }
 434             writePublisher.writeScheduler.stop();
 435             if (operations.isEmpty()) {
 436                 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
 437                                 + "\n\tCan''t cancel yet with {2}",
 438                              request.uri(),
 439                              request.timeout().isPresent() ?
 440                                 // calling duration.toMillis() can throw an exception.
 441                                 // this is just debugging, we don't care if it overflows.
 442                                 (request.timeout().get().getSeconds() * 1000
 443                                  + request.timeout().get().getNano() / 1000000) : -1,
 444                              cause);
 445             } else {
 446                 for (CompletableFuture<?> cf : operations) {
 447                     if (!cf.isDone()) {
 448                         if (toComplete == null) toComplete = new LinkedList<>();
 449                         toComplete.add(cf);
 450                         count++;
 451                     }
 452                 }
 453                 operations.clear();
 454             }
 455         }
 456         try {
 457             Log.logError("Http1Exchange.cancel: count=" + count);
 458             if (toComplete != null) {
 459                 // We might be in the selector thread in case of timeout, when
 460                 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
 461                 // There may or may not be other places that reach here
 462                 // from the SelectorManager thread, so just make sure we
 463                 // don't complete any CF from within the selector manager
 464                 // thread.
 465                 Executor exec = client.isSelectorThread()
 466                         ? executor
 467                         : this::runInline;
 468                 Throwable x = error;
 469                 while (!toComplete.isEmpty()) {
 470                     CompletableFuture<?> cf = toComplete.poll();
 471                     exec.execute(() -> {
 472                         if (cf.completeExceptionally(x)) {
 473                             if (debug.on())
 474                                 debug.log("%s: completed cf with %s", request.uri(), x);
 475                         }
 476                     });
 477                 }
 478             }
 479         } finally {
 480             connection.close();
 481         }
 482     }
 483 
 484     private void runInline(Runnable run) {
 485         assert !client.isSelectorThread();
 486         run.run();
 487     }
 488 
 489     /** Returns true if this exchange was canceled. */
 490     boolean isCanceled() {
 491         synchronized (lock) {
 492             return failed != null;
 493         }
 494     }
 495 
 496     /** Returns the cause for which this exchange was canceled, if available. */
 497     Throwable getCancelCause() {
 498         synchronized (lock) {
 499             return failed;
 500         }
 501     }
 502 
 503     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
 504     void appendToOutgoing(Throwable throwable) {
 505         appendToOutgoing(new DataPair(null, throwable));
 506     }
 507 
 508     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
 509     void appendToOutgoing(List<ByteBuffer> item) {
 510         appendToOutgoing(new DataPair(item, null));
 511     }
 512 
 513     private void appendToOutgoing(DataPair dp) {
 514         if (debug.on()) debug.log("appending to outgoing " + dp);
 515         outgoing.add(dp);
 516         writePublisher.writeScheduler.runOrSchedule();
 517     }
 518 
 519     /** Tells whether, or not, there is any outgoing data that can be published,
 520      * or if there is an error. */
 521     private boolean hasOutgoing() {
 522         return !outgoing.isEmpty();
 523     }
 524 
 525     private void requestMoreBody() {
 526         try {
 527             if (debug.on()) debug.log("requesting more request body from the subscriber");
 528             bodySubscriber.request(1);
 529         } catch (Throwable t) {
 530             if (debug.on()) debug.log("Subscription::request failed", t);
 531             cancelImpl(t);
 532             bodySentCF.completeExceptionally(t);
 533         }
 534     }
 535 
 536     // Invoked only by the publisher
 537     // ALL tasks should execute off the Selector-Manager thread
 538     /** Returns the next portion of the HTTP request, or the error. */
 539     private DataPair getOutgoing() {
 540         final Executor exec = client.theExecutor();
 541         final DataPair dp = outgoing.pollFirst();
 542 
 543         if (writePublisher.cancelled) {
 544             if (debug.on()) debug.log("cancelling upstream publisher");
 545             if (bodySubscriber != null) {
 546                 exec.execute(bodySubscriber::cancelSubscription);
 547             } else if (debug.on()) {
 548                 debug.log("bodySubscriber is null");
 549             }
 550             headersSentCF.completeAsync(() -> this, exec);
 551             bodySentCF.completeAsync(() -> this, exec);
 552             return null;
 553         }
 554 
 555         if (dp == null)  // publisher has not published anything yet
 556             return null;
 557 
 558         if (dp.throwable != null) {
 559             synchronized (lock) {
 560                 state = State.ERROR;
 561             }
 562             exec.execute(() -> {
 563                 headersSentCF.completeExceptionally(dp.throwable);
 564                 bodySentCF.completeExceptionally(dp.throwable);
 565                 connection.close();
 566             });
 567             return dp;
 568         }
 569 
 570         switch (state) {
 571             case HEADERS:
 572                 synchronized (lock) {
 573                     state = State.BODY;
 574                 }
 575                 // completeAsync, since dependent tasks should run in another thread
 576                 if (debug.on()) debug.log("initiating completion of headersSentCF");
 577                 headersSentCF.completeAsync(() -> this, exec);
 578                 break;
 579             case BODY:
 580                 if (dp.data == Http1BodySubscriber.COMPLETED) {
 581                     synchronized (lock) {
 582                         state = State.COMPLETING;
 583                     }
 584                     if (debug.on()) debug.log("initiating completion of bodySentCF");
 585                     bodySentCF.completeAsync(() -> this, exec);
 586                 } else {
 587                     exec.execute(this::requestMoreBody);
 588                 }
 589                 break;
 590             case INITIAL:
 591             case ERROR:
 592             case COMPLETING:
 593             case COMPLETED:
 594             default:
 595                 assert false : "Unexpected state:" + state;
 596         }
 597 
 598         return dp;
 599     }
 600 
 601     /** A Publisher of HTTP/1.1 headers and request body. */
 602     final class Http1Publisher implements FlowTube.TubePublisher {
 603 
 604         final Logger debug = Utils.getDebugLogger(this::dbgString);
 605         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 606         volatile boolean cancelled;
 607         final Http1WriteSubscription subscription = new Http1WriteSubscription();
 608         final Demand demand = new Demand();
 609         final SequentialScheduler writeScheduler =
 610                 SequentialScheduler.synchronizedScheduler(new WriteTask());
 611 
 612         @Override
 613         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 614             assert state == State.INITIAL;
 615             Objects.requireNonNull(s);
 616             assert subscriber == null;
 617 
 618             subscriber = s;
 619             if (debug.on()) debug.log("got subscriber: %s", s);
 620             s.onSubscribe(subscription);
 621         }
 622 
 623         volatile String dbgTag;
 624         String dbgString() {
 625             String tag = dbgTag;
 626             Object flow = connection.getConnectionFlow();
 627             if (tag == null && flow != null) {
 628                 dbgTag = tag = "Http1Publisher(" + flow + ")";
 629             } else if (tag == null) {
 630                 tag = "Http1Publisher(?)";
 631             }
 632             return tag;
 633         }
 634 
 635         final class WriteTask implements Runnable {
 636             @Override
 637             public void run() {
 638                 assert state != State.COMPLETED : "Unexpected state:" + state;
 639                 if (debug.on()) debug.log("WriteTask");
 640 
 641                 if (cancelled) {
 642                     if (debug.on()) debug.log("handling cancellation");
 643                     writeScheduler.stop();
 644                     getOutgoing();
 645                     return;
 646                 }
 647 
 648                 if (subscriber == null) {
 649                     if (debug.on()) debug.log("no subscriber yet");
 650                     return;
 651                 }
 652                 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
 653                 while (hasOutgoing() && demand.tryDecrement()) {
 654                     DataPair dp = getOutgoing();
 655                     if (dp == null)
 656                         break;
 657 
 658                     if (dp.throwable != null) {
 659                         if (debug.on()) debug.log("onError");
 660                         // Do not call the subscriber's onError, it is not required.
 661                         writeScheduler.stop();
 662                     } else {
 663                         List<ByteBuffer> data = dp.data;
 664                         if (data == Http1BodySubscriber.COMPLETED) {
 665                             synchronized (lock) {
 666                                 assert state == State.COMPLETING : "Unexpected state:" + state;
 667                                 state = State.COMPLETED;
 668                             }
 669                             if (debug.on())
 670                                 debug.log("completed, stopping %s", writeScheduler);
 671                             writeScheduler.stop();
 672                             // Do nothing more. Just do not publish anything further.
 673                             // The next Subscriber will eventually take over.
 674 
 675                         } else {
 676                             if (debug.on())
 677                                 debug.log("onNext with " + Utils.remaining(data) + " bytes");
 678                             subscriber.onNext(data);
 679                         }
 680                     }
 681                 }
 682             }
 683         }
 684 
 685         final class Http1WriteSubscription implements Flow.Subscription {
 686 
 687             @Override
 688             public void request(long n) {
 689                 if (cancelled)
 690                     return;  //no-op
 691                 demand.increase(n);
 692                 if (debug.on())
 693                     debug.log("subscription request(%d), demand=%s", n, demand);
 694                 writeScheduler.runOrSchedule(client.theExecutor());
 695             }
 696 
 697             @Override
 698             public void cancel() {
 699                 if (debug.on()) debug.log("subscription cancelled");
 700                 if (cancelled)
 701                     return;  //no-op
 702                 cancelled = true;
 703                 writeScheduler.runOrSchedule(client.theExecutor());
 704             }
 705         }
 706     }
 707 
 708     String dbgString() {
 709         return "Http1Exchange";
 710     }
 711 }