1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.InetSocketAddress;
  31 import jdk.incubator.http.HttpResponse.BodyHandler;
  32 import jdk.incubator.http.HttpResponse.BodySubscriber;
  33 import java.nio.ByteBuffer;
  34 import java.util.Objects;
  35 import java.util.concurrent.CompletableFuture;
  36 import java.util.LinkedList;
  37 import java.util.List;
  38 import java.util.concurrent.ConcurrentLinkedDeque;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.Flow;
  41 import jdk.incubator.http.internal.common.Demand;
  42 import jdk.incubator.http.internal.common.Log;
  43 import jdk.incubator.http.internal.common.FlowTube;
  44 import jdk.incubator.http.internal.common.SequentialScheduler;
  45 import jdk.incubator.http.internal.common.MinimalFuture;
  46 import jdk.incubator.http.internal.common.Utils;
  47 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
  48 
  49 /**
  50  * Encapsulates one HTTP/1.1 request/response exchange.
  51  */
  52 class Http1Exchange<T> extends ExchangeImpl<T> {
  53 
  54     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  55     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  56     private static final System.Logger DEBUG_LOGGER =
  57             Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);
  58 
  59     final HttpRequestImpl request; // main request
  60     final Http1Request requestAction;
  61     private volatile Http1Response<T> response;
  62     final HttpConnection connection;
  63     final HttpClientImpl client;
  64     final Executor executor;
  65     private final Http1AsyncReceiver asyncReceiver;
  66 
  67     /** Records a possible cancellation raised before any operation
  68      * has been initiated, or an error received while sending the request. */
  69     private Throwable failed;
  70     private final List<CompletableFuture<?>> operations; // used for cancel
  71 
  72     /** Must be held when operating on any internal state or data. */
  73     private final Object lock = new Object();
  74 
  75     /** Holds the outgoing data, either the headers or a request body part. Or
  76      * an error from the request body publisher. At most there can be ~2 pieces
  77      * of outgoing data ( onComplete|onError can be invoked without demand ).*/
  78     final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
  79 
  80     /** The write publisher, responsible for writing the complete request ( both
  81      * headers and body ( if any ). */
  82     private final Http1Publisher writePublisher = new Http1Publisher();
  83 
  84     /** Completed when the header have been published, or there is an error */
  85     private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
  86      /** Completed when the body has been published, or there is an error */
  87     private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
  88 
  89     /** The subscriber to the request's body published. Maybe null. */
  90     private volatile Http1BodySubscriber bodySubscriber;
  91 
  92     enum State { INITIAL,
  93                  HEADERS,
  94                  BODY,
  95                  ERROR,          // terminal state
  96                  COMPLETING,
  97                  COMPLETED }     // terminal state
  98 
  99     private State state = State.INITIAL;
 100 
 101     /** A carrier for either data or an error. Used to carry data, and communicate
 102      * errors from the request ( both headers and body ) to the exchange. */
 103     static class DataPair {
 104         Throwable throwable;
 105         List<ByteBuffer> data;
 106         DataPair(List<ByteBuffer> data, Throwable throwable){
 107             this.data = data;
 108             this.throwable = throwable;
 109         }
 110         @Override
 111         public String toString() {
 112             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
 113         }
 114     }
 115 
 116     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
 117      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
 118      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
 119      * fixed length bodies, respectively. */
 120     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
 121         protected volatile Flow.Subscription subscription;
 122         protected volatile boolean complete;
 123 
 124         /** Final sentinel in the stream of request body. */
 125         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
 126 
 127         void request(long n) {
 128             DEBUG_LOGGER.log(Level.DEBUG, () ->
 129                 "Http1BodySubscriber requesting " + n + ", from " + subscription);
 130             subscription.request(n);
 131         }
 132 
 133         static Http1BodySubscriber completeSubscriber() {
 134             return new Http1BodySubscriber() {
 135                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
 136                 @Override public void onNext(ByteBuffer item) { error(); }
 137                 @Override public void onError(Throwable throwable) { error(); }
 138                 @Override public void onComplete() { error(); }
 139                 private void error() {
 140                     throw new InternalError("should not reach here");
 141                 }
 142             };
 143         }
 144     }
 145 
 146     @Override
 147     public String toString() {
 148         return "HTTP/1.1 " + request.toString();
 149     }
 150 
 151     HttpRequestImpl request() {
 152         return request;
 153     }
 154 
 155     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
 156         throws IOException
 157     {
 158         super(exchange);
 159         this.request = exchange.request();
 160         this.client = exchange.client();
 161         this.executor = exchange.executor();
 162         this.operations = new LinkedList<>();
 163         operations.add(headersSentCF);
 164         operations.add(bodySentCF);
 165         if (connection != null) {
 166             this.connection = connection;
 167         } else {
 168             InetSocketAddress addr = request.getAddress();
 169             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
 170         }
 171         this.requestAction = new Http1Request(request, this);
 172         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
 173         asyncReceiver.subscribe(new InitialErrorReceiver());
 174     }
 175 
 176     /** An initial receiver that handles no data, but cancels the request if
 177      * it receives an error. Will be replaced when reading response body. */
 178     final class InitialErrorReceiver implements Http1AsyncReceiver.Http1AsyncDelegate {
 179         volatile AbstractSubscription s;
 180         @Override
 181         public boolean tryAsyncReceive(ByteBuffer ref) {
 182             return false;  // no data has been processed, leave it in the queue
 183         }
 184 
 185         @Override
 186         public void onReadError(Throwable ex) {
 187             cancelImpl(ex);
 188         }
 189 
 190         @Override
 191         public void onSubscribe(AbstractSubscription s) {
 192             this.s = s;
 193         }
 194 
 195         public AbstractSubscription subscription() {
 196             return s;
 197         }
 198     }
 199 
 200     @Override
 201     HttpConnection connection() {
 202         return connection;
 203     }
 204 
 205     private void connectFlows(HttpConnection connection) {
 206         FlowTube tube =  connection.getConnectionFlow();
 207         debug.log(Level.DEBUG, "%s connecting flows", tube);
 208 
 209         // Connect the flow to our Http1TubeSubscriber:
 210         //   asyncReceiver.subscriber().
 211         tube.connectFlows(writePublisher,
 212                           asyncReceiver.subscriber());
 213     }
 214 
 215     @Override
 216     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 217         // create the response before sending the request headers, so that
 218         // the response can set the appropriate receivers.
 219         debug.log(Level.DEBUG, "Sending headers only");
 220         if (response == null) {
 221             response = new Http1Response<>(connection, this, asyncReceiver);
 222         }
 223 
 224         debug.log(Level.DEBUG, "response created in advance");
 225         // If the first attempt to read something triggers EOF, or
 226         // IOException("channel reset by peer"), we're going to retry.
 227         // Instruct the asyncReceiver to throw ConnectionExpiredException
 228         // to force a retry.
 229         asyncReceiver.setRetryOnError(true);
 230 
 231         CompletableFuture<Void> connectCF;
 232         if (!connection.connected()) {
 233             debug.log(Level.DEBUG, "initiating connect async");
 234             connectCF = connection.connectAsync();
 235             synchronized (lock) {
 236                 operations.add(connectCF);
 237             }
 238         } else {
 239             connectCF = new MinimalFuture<>();
 240             connectCF.complete(null);
 241         }
 242 
 243         return connectCF
 244                 .thenCompose(unused -> {
 245                     CompletableFuture<Void> cf = new MinimalFuture<>();
 246                     try {
 247                         connectFlows(connection);
 248 
 249                         debug.log(Level.DEBUG, "requestAction.headers");
 250                         List<ByteBuffer> data = requestAction.headers();
 251                         synchronized (lock) {
 252                             state = State.HEADERS;
 253                         }
 254                         debug.log(Level.DEBUG, "setting outgoing with headers");
 255                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
 256                         appendToOutgoing(data);
 257                         cf.complete(null);
 258                         return cf;
 259                     } catch (Throwable t) {
 260                         debug.log(Level.DEBUG, "Failed to send headers: %s", t);
 261                         connection.close();
 262                         cf.completeExceptionally(t);
 263                         return cf;
 264                     } })
 265                 .thenCompose(unused -> headersSentCF);
 266     }
 267 
 268     @Override
 269     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 270         assert headersSentCF.isDone();
 271         try {
 272             bodySubscriber = requestAction.continueRequest();
 273             if (bodySubscriber == null) {
 274                 bodySubscriber = Http1BodySubscriber.completeSubscriber();
 275                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
 276             } else {
 277                 bodySubscriber.request(1);  // start
 278             }
 279         } catch (Throwable t) {
 280             connection.close();
 281             bodySentCF.completeExceptionally(t);
 282         }
 283         return bodySentCF;
 284     }
 285 
 286     @Override
 287     CompletableFuture<Response> getResponseAsync(Executor executor) {
 288         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
 289         Throwable cause;
 290         synchronized (lock) {
 291             operations.add(cf);
 292             cause = failed;
 293             failed = null;
 294         }
 295 
 296         if (cause != null) {
 297             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
 298                             + "\n\tCompleting exceptionally with {2}\n",
 299                          request.uri(),
 300                          request.timeout().isPresent() ?
 301                             // calling duration.toMillis() can throw an exception.
 302                             // this is just debugging, we don't care if it overflows.
 303                             (request.timeout().get().getSeconds() * 1000
 304                              + request.timeout().get().getNano() / 1000000) : -1,
 305                          cause);
 306             boolean acknowledged = cf.completeExceptionally(cause);
 307             debug.log(Level.DEBUG,
 308                       () -> acknowledged
 309                             ? ("completed response with " + cause)
 310                             : ("response already completed, ignoring " + cause));
 311         }
 312         return cf;
 313     }
 314 
 315     @Override
 316     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
 317                                        boolean returnConnectionToPool,
 318                                        Executor executor)
 319     {
 320         BodySubscriber<T> bs = handler.apply(response.responseCode(),
 321                                              response.responseHeaders());
 322         CompletableFuture<T> bodyCF = response.readBody(bs,
 323                                                         returnConnectionToPool,
 324                                                         executor);
 325         return bodyCF;
 326     }
 327 
 328     @Override
 329     CompletableFuture<Void> ignoreBody() {
 330         return response.ignoreBody(executor);
 331     }
 332 
 333     ByteBuffer drainLeftOverBytes() {
 334         synchronized (lock) {
 335             asyncReceiver.stop();
 336             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
 337         }
 338     }
 339 
 340     void released() {
 341         Http1Response<T> resp = this.response;
 342         if (resp != null) resp.completed();
 343         asyncReceiver.clear();
 344     }
 345 
 346     void completed() {
 347         Http1Response<T> resp = this.response;
 348         if (resp != null) resp.completed();
 349     }
 350 
 351     /**
 352      * Cancel checks to see if request and responseAsync finished already.
 353      * If not it closes the connection and completes all pending operations
 354      */
 355     @Override
 356     void cancel() {
 357         cancelImpl(new IOException("Request cancelled"));
 358     }
 359 
 360     /**
 361      * Cancel checks to see if request and responseAsync finished already.
 362      * If not it closes the connection and completes all pending operations
 363      */
 364     @Override
 365     void cancel(IOException cause) {
 366         cancelImpl(cause);
 367     }
 368 
 369     private void cancelImpl(Throwable cause) {
 370         LinkedList<CompletableFuture<?>> toComplete = null;
 371         int count = 0;
 372         synchronized (lock) {
 373             if (failed == null)
 374                 failed = cause;
 375             if (requestAction != null && requestAction.finished()
 376                     && response != null && response.finished()) {
 377                 return;
 378             }
 379             connection.close();   // TODO: ensure non-blocking if holding the lock
 380             writePublisher.writeScheduler.stop();
 381             if (operations.isEmpty()) {
 382                 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
 383                                 + "\n\tCan''t cancel yet with {2}",
 384                              request.uri(),
 385                              request.timeout().isPresent() ?
 386                                 // calling duration.toMillis() can throw an exception.
 387                                 // this is just debugging, we don't care if it overflows.
 388                                 (request.timeout().get().getSeconds() * 1000
 389                                  + request.timeout().get().getNano() / 1000000) : -1,
 390                              cause);
 391             } else {
 392                 for (CompletableFuture<?> cf : operations) {
 393                     if (!cf.isDone()) {
 394                         if (toComplete == null) toComplete = new LinkedList<>();
 395                         toComplete.add(cf);
 396                         count++;
 397                     }
 398                 }
 399                 operations.clear();
 400             }
 401         }
 402         Log.logError("Http1Exchange.cancel: count=" + count);
 403         if (toComplete != null) {
 404             // We might be in the selector thread in case of timeout, when
 405             // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
 406             // There may or may not be other places that reach here
 407             // from the SelectorManager thread, so just make sure we
 408             // don't complete any CF from within the selector manager
 409             // thread.
 410             Executor exec = client.isSelectorThread()
 411                             ? executor
 412                             : this::runInline;
 413             while (!toComplete.isEmpty()) {
 414                 CompletableFuture<?> cf = toComplete.poll();
 415                 exec.execute(() -> {
 416                     if (cf.completeExceptionally(cause)) {
 417                         debug.log(Level.DEBUG, "completed cf with %s",
 418                                  (Object) cause);
 419                     }
 420                 });
 421             }
 422         }
 423     }
 424 
 425     private void runInline(Runnable run) {
 426         assert !client.isSelectorThread();
 427         run.run();
 428     }
 429 
 430     /** Returns true if this exchange was canceled. */
 431     boolean isCanceled() {
 432         synchronized (lock) {
 433             return failed != null;
 434         }
 435     }
 436 
 437     /** Returns the cause for which this exchange was canceled, if available. */
 438     Throwable getCancelCause() {
 439         synchronized (lock) {
 440             return failed;
 441         }
 442     }
 443 
 444     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
 445     void appendToOutgoing(Throwable throwable) {
 446         appendToOutgoing(new DataPair(null, throwable));
 447     }
 448 
 449     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
 450     void appendToOutgoing(List<ByteBuffer> item) {
 451         appendToOutgoing(new DataPair(item, null));
 452     }
 453 
 454     private void appendToOutgoing(DataPair dp) {
 455         debug.log(Level.DEBUG, "appending to outgoing " + dp);
 456         outgoing.add(dp);
 457         writePublisher.writeScheduler.runOrSchedule();
 458     }
 459 
 460     /** Tells whether, or not, there is any outgoing data that can be published,
 461      * or if there is an error. */
 462     private boolean hasOutgoing() {
 463         return !outgoing.isEmpty();
 464     }
 465 
 466     // Invoked only by the publisher
 467     // ALL tasks should execute off the Selector-Manager thread
 468     /** Returns the next portion of the HTTP request, or the error. */
 469     private DataPair getOutgoing() {
 470         final Executor exec = client.theExecutor();
 471         final DataPair dp = outgoing.pollFirst();
 472 
 473         if (dp == null)  // publisher has not published anything yet
 474             return null;
 475 
 476         synchronized (lock) {
 477             if (dp.throwable != null) {
 478                 state = State.ERROR;
 479                 exec.execute(() -> {
 480                     connection.close();
 481                     headersSentCF.completeExceptionally(dp.throwable);
 482                     bodySentCF.completeExceptionally(dp.throwable);
 483                 });
 484                 return dp;
 485             }
 486 
 487             switch (state) {
 488                 case HEADERS:
 489                     state = State.BODY;
 490                     // completeAsync, since dependent tasks should run in another thread
 491                     debug.log(Level.DEBUG, "initiating completion of headersSentCF");
 492                     headersSentCF.completeAsync(() -> this, exec);
 493                     break;
 494                 case BODY:
 495                     if (dp.data == Http1BodySubscriber.COMPLETED) {
 496                         state = State.COMPLETING;
 497                         debug.log(Level.DEBUG, "initiating completion of bodySentCF");
 498                         bodySentCF.completeAsync(() -> this, exec);
 499                     } else {
 500                         debug.log(Level.DEBUG, "requesting more body from the subscriber");
 501                         exec.execute(() -> bodySubscriber.request(1));
 502                     }
 503                     break;
 504                 case INITIAL:
 505                 case ERROR:
 506                 case COMPLETING:
 507                 case COMPLETED:
 508                 default:
 509                     assert false : "Unexpected state:" + state;
 510             }
 511 
 512             return dp;
 513         }
 514     }
 515 
 516     /** A Publisher of HTTP/1.1 headers and request body. */
 517     final class Http1Publisher implements FlowTube.TubePublisher {
 518 
 519         final System.Logger  debug = Utils.getDebugLogger(this::dbgString);
 520         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 521         volatile boolean cancelled;
 522         final Http1WriteSubscription subscription = new Http1WriteSubscription();
 523         final Demand demand = new Demand();
 524         final SequentialScheduler writeScheduler =
 525                 SequentialScheduler.synchronizedScheduler(new WriteTask());
 526 
 527         @Override
 528         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 529             assert state == State.INITIAL;
 530             Objects.requireNonNull(s);
 531             assert subscriber == null;
 532 
 533             subscriber = s;
 534             debug.log(Level.DEBUG, "got subscriber: %s", s);
 535             s.onSubscribe(subscription);
 536         }
 537 
 538         volatile String dbgTag;
 539         String dbgString() {
 540             String tag = dbgTag;
 541             Object flow = connection.getConnectionFlow();
 542             if (tag == null && flow != null) {
 543                 dbgTag = tag = "Http1Publisher(" + flow + ")";
 544             } else if (tag == null) {
 545                 tag = "Http1Publisher(?)";
 546             }
 547             return tag;
 548         }
 549 
 550         final class WriteTask implements Runnable {
 551             @Override
 552             public void run() {
 553                 assert state != State.COMPLETED : "Unexpected state:" + state;
 554                 debug.log(Level.DEBUG, "WriteTask");
 555                 if (subscriber == null) {
 556                     debug.log(Level.DEBUG, "no subscriber yet");
 557                     return;
 558                 }
 559                 debug.log(Level.DEBUG, () -> "hasOutgoing = " + hasOutgoing());
 560                 if (hasOutgoing() && demand.tryDecrement()) {
 561                     DataPair dp = getOutgoing();
 562 
 563                     if (dp.throwable != null) {
 564                         debug.log(Level.DEBUG, "onError");
 565                         // Do not call the subscriber's onError, it is not required.
 566                         writeScheduler.stop();
 567                     } else {
 568                         List<ByteBuffer> data = dp.data;
 569                         if (data == Http1BodySubscriber.COMPLETED) {
 570                             synchronized (lock) {
 571                                 assert state == State.COMPLETING : "Unexpected state:" + state;
 572                                 state = State.COMPLETED;
 573                             }
 574                             debug.log(Level.DEBUG,
 575                                      "completed, stopping %s", writeScheduler);
 576                             writeScheduler.stop();
 577                             // Do nothing more. Just do not publish anything further.
 578                             // The next Subscriber will eventually take over.
 579 
 580                         } else {
 581                             debug.log(Level.DEBUG, () ->
 582                                     "onNext with " + Utils.remaining(data) + " bytes");
 583                             subscriber.onNext(data);
 584                         }
 585                     }
 586                 }
 587             }
 588         }
 589 
 590         final class Http1WriteSubscription implements Flow.Subscription {
 591 
 592             @Override
 593             public void request(long n) {
 594                 if (cancelled)
 595                     return;  //no-op
 596                 demand.increase(n);
 597                 debug.log(Level.DEBUG,
 598                         "subscription request(%d), demand=%s", n, demand);
 599                 writeScheduler.deferOrSchedule(client.theExecutor());
 600             }
 601 
 602             @Override
 603             public void cancel() {
 604                 debug.log(Level.DEBUG, "subscription cancelled");
 605                 if (cancelled)
 606                     return;  //no-op
 607                 cancelled = true;
 608                 writeScheduler.stop();
 609             }
 610         }
 611     }
 612 
 613     String dbgString() {
 614         return "Http1Exchange";
 615     }
 616 }