1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.EOFException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.concurrent.CompletableFuture;
  32 import java.util.concurrent.CompletionStage;
  33 import java.util.concurrent.Executor;
  34 import java.util.function.BiConsumer;
  35 import java.util.function.Consumer;
  36 import java.util.function.Function;
  37 import jdk.incubator.http.ResponseContent.BodyParser;
  38 import jdk.incubator.http.internal.common.Log;
  39 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
  40 import jdk.incubator.http.internal.common.MinimalFuture;
  41 import jdk.incubator.http.internal.common.Utils;
  42 
  43 /**
  44  * Handles a HTTP/1.1 response (headers + body).
  45  * There can be more than one of these per Http exchange.
  46  */
  47 class Http1Response<T> {
  48 
  49     private volatile ResponseContent content;
  50     private final HttpRequestImpl request;
  51     private Response response;
  52     private final HttpConnection connection;
  53     private HttpHeaders headers;
  54     private int responseCode;
  55     private final Http1Exchange<T> exchange;
  56     private boolean return2Cache; // return connection to cache when finished
  57     private final HeadersReader headersReader; // used to read the headers
  58     private final BodyReader bodyReader; // used to read the body
  59     private final Http1AsyncReceiver asyncReceiver;
  60     private volatile EOFException eof;
  61     // max number of bytes of (fixed length) body to ignore on redirect
  62     private final static int MAX_IGNORE = 1024;
  63 
  64     // Revisit: can we get rid of this?
  65     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
  66     private volatile State readProgress = State.INITIAL;
  67     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  68     final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
  69 
  70 
  71     Http1Response(HttpConnection conn,
  72                   Http1Exchange<T> exchange,
  73                   Http1AsyncReceiver asyncReceiver) {
  74         this.readProgress = State.INITIAL;
  75         this.request = exchange.request();
  76         this.exchange = exchange;
  77         this.connection = conn;
  78         this.asyncReceiver = asyncReceiver;
  79         headersReader = new HeadersReader(this::advance);
  80         bodyReader = new BodyReader(this::advance);
  81     }
  82 
  83    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
  84         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
  85                 + asyncReceiver.remaining() +") "  + readProgress);
  86         // with expect continue we will resume reading headers + body.
  87         asyncReceiver.unsubscribe(bodyReader);
  88         bodyReader.reset();
  89         Http1HeaderParser hd = new Http1HeaderParser();
  90         readProgress = State.READING_HEADERS;
  91         headersReader.start(hd);
  92         asyncReceiver.subscribe(headersReader);
  93         CompletableFuture<State> cf = headersReader.completion();
  94         assert cf != null : "parsing not started";
  95 
  96         Function<State, Response> lambda = (State completed) -> {
  97                 assert completed == State.READING_HEADERS;
  98                 debug.log(Level.DEBUG, () ->
  99                             "Reading Headers: creating Response object;"
 100                             + " state is now " + readProgress);
 101                 asyncReceiver.unsubscribe(headersReader);
 102                 responseCode = hd.responseCode();
 103                 headers = hd.headers();
 104 
 105                 response = new Response(request,
 106                                         exchange.getExchange(),
 107                                         headers,
 108                                         responseCode,
 109                                         HTTP_1_1);
 110                 return response;
 111             };
 112 
 113         if (executor != null) {
 114             return cf.thenApplyAsync(lambda, executor);
 115         } else {
 116             return cf.thenApply(lambda);
 117         }
 118     }
 119 
 120     private boolean finished;
 121 
 122     synchronized void completed() {
 123         finished = true;
 124     }
 125 
 126     synchronized boolean finished() {
 127         return finished;
 128     }
 129 
 130     int fixupContentLen(int clen) {
 131         if (request.method().equalsIgnoreCase("HEAD")) {
 132             return 0;
 133         }
 134         if (clen == -1) {
 135             if (headers.firstValue("Transfer-encoding").orElse("")
 136                        .equalsIgnoreCase("chunked")) {
 137                 return -1;
 138             }
 139             return 0;
 140         }
 141         return clen;
 142     }
 143 
 144     /**
 145      * Read up to MAX_IGNORE bytes discarding
 146      */
 147     public CompletableFuture<Void> ignoreBody(Executor executor) {
 148         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 149         if (clen == -1 || clen > MAX_IGNORE) {
 150             connection.close();
 151             return MinimalFuture.completedFuture(null); // not treating as error
 152         } else {
 153             return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
 154         }
 155     }
 156 
 157     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
 158                                          boolean return2Cache,
 159                                          Executor executor) {
 160         this.return2Cache = return2Cache;
 161         final HttpResponse.BodySubscriber<U> pusher = p;
 162         final CompletionStage<U> bodyCF = p.getBody();
 163         final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
 164 
 165         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 166 
 167         final int clen = fixupContentLen(clen0);
 168 
 169         // expect-continue reads headers and body twice.
 170         // if we reach here, we must reset the headersReader state.
 171         asyncReceiver.unsubscribe(headersReader);
 172         headersReader.reset();
 173 
 174         executor.execute(() -> {
 175             try {
 176                 HttpClientImpl client = connection.client();
 177                 content = new ResponseContent(
 178                         connection, clen, headers, pusher,
 179                         this::onFinished
 180                 );
 181                 if (cf.isCompletedExceptionally()) {
 182                     // if an error occurs during subscription
 183                     connection.close();
 184                     return;
 185                 }
 186                 // increment the reference count on the HttpClientImpl
 187                 // to prevent the SelectorManager thread from exiting until
 188                 // the body is fully read.
 189                 client.reference();
 190                 bodyReader.start(content.getBodyParser(
 191                     (t) -> {
 192                         try {
 193                             if (t != null) {
 194                                 pusher.onError(t);
 195                                 connection.close();
 196                                 if (!cf.isDone())
 197                                     cf.completeExceptionally(t);
 198                             }
 199                         } finally {
 200                             // decrement the reference count on the HttpClientImpl
 201                             // to allow the SelectorManager thread to exit if no
 202                             // other operation is pending and the facade is no
 203                             // longer referenced.
 204                             client.unreference();
 205                             bodyReader.onComplete(t);
 206                         }
 207                     }));
 208                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
 209                 asyncReceiver.subscribe(bodyReader);
 210                 assert bodyReaderCF != null : "parsing not started";
 211                 // Make sure to keep a reference to asyncReceiver from
 212                 // within this
 213                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
 214                     t = Utils.getCompletionCause(t);
 215                     try {
 216                         if (t != null) {
 217                             debug.log(Level.DEBUG, () ->
 218                                     "Finished reading body: " + s);
 219                             assert s == State.READING_BODY;
 220                         }
 221                         if (t != null && !cf.isDone()) {
 222                             pusher.onError(t);
 223                             cf.completeExceptionally(t);
 224                         }
 225                     } catch (Throwable x) {
 226                         // not supposed to happen
 227                         asyncReceiver.onReadError(x);
 228                     }
 229                 });
 230                 connection.addTrailingOperation(trailingOp);
 231             } catch (Throwable t) {
 232                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
 233                 try {
 234                     if (!cf.isDone()) {
 235                         pusher.onError(t);
 236                         cf.completeExceptionally(t);
 237                     }
 238                 } finally {
 239                     asyncReceiver.onReadError(t);
 240                 }
 241             }
 242         });
 243         return cf;
 244     }
 245 
 246 
 247     private void onFinished() {
 248         asyncReceiver.clear();
 249         if (return2Cache) {
 250             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
 251             // TODO: need to do something here?
 252             // connection.setAsyncCallbacks(null, null, null);
 253 
 254             // don't return the connection to the cache if EOF happened.
 255             debug.log(Level.DEBUG, () -> connection.getConnectionFlow()
 256                                    + ": return to HTTP/1.1 pool");
 257             connection.closeOrReturnToCache(eof == null ? headers : null);
 258         }
 259     }
 260 
 261     HttpHeaders responseHeaders() {
 262         return headers;
 263     }
 264 
 265     int responseCode() {
 266         return responseCode;
 267     }
 268 
 269 // ================ Support for plugging into Http1Receiver   =================
 270 // ============================================================================
 271 
 272     // Callback: Error receiver: Consumer of Throwable.
 273     void onReadError(Throwable t) {
 274         Log.logError(t);
 275         Receiver<?> receiver = receiver(readProgress);
 276         if (t instanceof EOFException) {
 277             debug.log(Level.DEBUG, "onReadError: received EOF");
 278             eof = (EOFException) t;
 279         }
 280         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
 281         debug.log(Level.DEBUG, () -> "onReadError: cf is "
 282                 + (cf == null  ? "null"
 283                 : (cf.isDone() ? "already completed"
 284                                : "not yet completed")));
 285         if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
 286         else { debug.log(Level.DEBUG, "onReadError", t); }
 287         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
 288         connection.close();
 289     }
 290 
 291     // ========================================================================
 292 
 293     private State advance(State previous) {
 294         assert readProgress == previous;
 295         switch(previous) {
 296             case READING_HEADERS:
 297                 asyncReceiver.unsubscribe(headersReader);
 298                 return readProgress = State.READING_BODY;
 299             case READING_BODY:
 300                 asyncReceiver.unsubscribe(bodyReader);
 301                 return readProgress = State.DONE;
 302             default:
 303                 throw new InternalError("can't advance from " + previous);
 304         }
 305     }
 306 
 307     Receiver<?> receiver(State state) {
 308         switch(state) {
 309             case READING_HEADERS: return headersReader;
 310             case READING_BODY: return bodyReader;
 311             default: return null;
 312         }
 313 
 314     }
 315 
 316     static abstract class Receiver<T>
 317             implements Http1AsyncReceiver.Http1AsyncDelegate {
 318         abstract void start(T parser);
 319         abstract CompletableFuture<State> completion();
 320         // accepts a buffer from upstream.
 321         // this should be implemented as a simple call to
 322         // accept(ref, parser, cf)
 323         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
 324         public abstract void onReadError(Throwable t);
 325         // handle a byte buffer received from upstream.
 326         // this method should set the value of Http1Response.buffer
 327         // to ref.get() before beginning parsing.
 328         abstract void handle(ByteBuffer buf, T parser,
 329                              CompletableFuture<State> cf);
 330         // resets this objects state so that it can be reused later on
 331         // typically puts the reference to parser and completion to null
 332         abstract void reset();
 333 
 334         // accepts a byte buffer received from upstream
 335         // returns true if the buffer is fully parsed and more data can
 336         // be accepted, false otherwise.
 337         final boolean accept(ByteBuffer buf, T parser,
 338                 CompletableFuture<State> cf) {
 339             if (cf == null || parser == null || cf.isDone()) return false;
 340             handle(buf, parser, cf);
 341             return !cf.isDone();
 342         }
 343         public abstract void onSubscribe(AbstractSubscription s);
 344         public abstract AbstractSubscription subscription();
 345 
 346     }
 347 
 348     // Invoked with each new ByteBuffer when reading headers...
 349     final class HeadersReader extends Receiver<Http1HeaderParser> {
 350         final Consumer<State> onComplete;
 351         volatile Http1HeaderParser parser;
 352         volatile CompletableFuture<State> cf;
 353         volatile long count; // bytes parsed (for debug)
 354         volatile AbstractSubscription subscription;
 355 
 356         HeadersReader(Consumer<State> onComplete) {
 357             this.onComplete = onComplete;
 358         }
 359 
 360         @Override
 361         public AbstractSubscription subscription() {
 362             return subscription;
 363         }
 364 
 365         @Override
 366         public void onSubscribe(AbstractSubscription s) {
 367             this.subscription = s;
 368             s.request(1);
 369         }
 370 
 371         @Override
 372         void reset() {
 373             cf = null;
 374             parser = null;
 375             count = 0;
 376             subscription = null;
 377         }
 378 
 379         // Revisit: do we need to support restarting?
 380         @Override
 381         final void start(Http1HeaderParser hp) {
 382             count = 0;
 383             cf = new MinimalFuture<>();
 384             parser = hp;
 385         }
 386 
 387         @Override
 388         CompletableFuture<State> completion() {
 389             return cf;
 390         }
 391 
 392         @Override
 393         public final boolean tryAsyncReceive(ByteBuffer ref) {
 394             boolean hasDemand = subscription.demand().tryDecrement();
 395             assert hasDemand;
 396             boolean needsMore = accept(ref, parser, cf);
 397             if (needsMore) subscription.request(1);
 398             return needsMore;
 399         }
 400 
 401         @Override
 402         public final void onReadError(Throwable t) {
 403             Http1Response.this.onReadError(t);
 404         }
 405 
 406         @Override
 407         final void handle(ByteBuffer b,
 408                           Http1HeaderParser parser,
 409                           CompletableFuture<State> cf) {
 410             assert cf != null : "parsing not started";
 411             assert parser != null : "no parser";
 412             try {
 413                 count += b.remaining();
 414                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
 415                         + "/" + b.capacity() + " bytes to header parser");
 416                 if (parser.parse(b)) {
 417                     count -= b.remaining();
 418                     debug.log(Level.DEBUG, () ->
 419                             "Parsing headers completed. bytes=" + count);
 420                     onComplete.accept(State.READING_HEADERS);
 421                     cf.complete(State.READING_HEADERS);
 422                 }
 423             } catch (Throwable t) {
 424                 debug.log(Level.DEBUG,
 425                         () -> "Header parser failed to handle buffer: " + t);
 426                 cf.completeExceptionally(t);
 427             }
 428         }
 429     }
 430 
 431     // Invoked with each new ByteBuffer when reading bodies...
 432     final class BodyReader extends Receiver<BodyParser> {
 433         final Consumer<State> onComplete;
 434         volatile BodyParser parser;
 435         volatile CompletableFuture<State> cf;
 436         volatile AbstractSubscription subscription;
 437         BodyReader(Consumer<State> onComplete) {
 438             this.onComplete = onComplete;
 439         }
 440 
 441         @Override
 442         void reset() {
 443             parser = null;
 444             cf = null;
 445             subscription = null;
 446         }
 447 
 448         // Revisit: do we need to support restarting?
 449         @Override
 450         final void start(BodyParser parser) {
 451             cf = new MinimalFuture<>();
 452             this.parser = parser;
 453         }
 454 
 455         @Override
 456         CompletableFuture<State> completion() {
 457             return cf;
 458         }
 459 
 460         @Override
 461         public final boolean tryAsyncReceive(ByteBuffer b) {
 462             return accept(b, parser, cf);
 463         }
 464 
 465         @Override
 466         public final void onReadError(Throwable t) {
 467             Http1Response.this.onReadError(t);
 468         }
 469 
 470         @Override
 471         public AbstractSubscription subscription() {
 472             return subscription;
 473         }
 474 
 475         @Override
 476         public void onSubscribe(AbstractSubscription s) {
 477             this.subscription = s;
 478             parser.onSubscribe(s);
 479         }
 480 
 481         @Override
 482         final void handle(ByteBuffer b,
 483                           BodyParser parser,
 484                           CompletableFuture<State> cf) {
 485             assert cf != null : "parsing not started";
 486             assert parser != null : "no parser";
 487             try {
 488                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
 489                         + "/" + b.capacity() + " bytes to body parser");
 490                 parser.accept(b);
 491             } catch (Throwable t) {
 492                 debug.log(Level.DEBUG,
 493                         () -> "Body parser failed to handle buffer: " + t);
 494                 if (!cf.isDone()) {
 495                     cf.completeExceptionally(t);
 496                 }
 497             }
 498         }
 499 
 500         final void onComplete(Throwable closedExceptionally) {
 501             if (cf.isDone()) return;
 502             if (closedExceptionally != null) {
 503                 cf.completeExceptionally(closedExceptionally);
 504             } else {
 505                 onComplete.accept(State.READING_BODY);
 506                 cf.complete(State.READING_BODY);
 507             }
 508         }
 509 
 510         @Override
 511         public String toString() {
 512             return super.toString() + "/parser=" + String.valueOf(parser);
 513         }
 514 
 515     }
 516 }