/* * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.incubator.http; import java.io.EOFException; import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import jdk.incubator.http.ResponseContent.BodyParser; import jdk.incubator.http.internal.common.Log; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils; /** * Handles a HTTP/1.1 response (headers + body). * There can be more than one of these per Http exchange. */ class Http1Response { private volatile ResponseContent content; private final HttpRequestImpl request; private Response response; private final HttpConnection connection; private HttpHeaders headers; private int responseCode; private final Http1Exchange exchange; private boolean return2Cache; // return connection to cache when finished private final HeadersReader headersReader; // used to read the headers private final BodyReader bodyReader; // used to read the body private final Http1AsyncReceiver asyncReceiver; private volatile EOFException eof; // max number of bytes of (fixed length) body to ignore on redirect private final static int MAX_IGNORE = 1024; // Revisit: can we get rid of this? static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} private volatile State readProgress = State.INITIAL; static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); Http1Response(HttpConnection conn, Http1Exchange exchange, Http1AsyncReceiver asyncReceiver) { this.readProgress = State.INITIAL; this.request = exchange.request(); this.exchange = exchange; this.connection = conn; this.asyncReceiver = asyncReceiver; headersReader = new HeadersReader(this::advance); bodyReader = new BodyReader(this::advance); } public CompletableFuture readHeadersAsync(Executor executor) { debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " + asyncReceiver.remaining() +") " + readProgress); // with expect continue we will resume reading headers + body. asyncReceiver.unsubscribe(bodyReader); bodyReader.reset(); Http1HeaderParser hd = new Http1HeaderParser(); readProgress = State.READING_HEADERS; headersReader.start(hd); asyncReceiver.subscribe(headersReader); CompletableFuture cf = headersReader.completion(); assert cf != null : "parsing not started"; Function lambda = (State completed) -> { assert completed == State.READING_HEADERS; debug.log(Level.DEBUG, () -> "Reading Headers: creating Response object;" + " state is now " + readProgress); asyncReceiver.unsubscribe(headersReader); responseCode = hd.responseCode(); headers = hd.headers(); response = new Response(request, exchange.getExchange(), headers, responseCode, HTTP_1_1); return response; }; if (executor != null) { return cf.thenApplyAsync(lambda, executor); } else { return cf.thenApply(lambda); } } private boolean finished; synchronized void completed() { finished = true; } synchronized boolean finished() { return finished; } int fixupContentLen(int clen) { if (request.method().equalsIgnoreCase("HEAD")) { return 0; } if (clen == -1) { if (headers.firstValue("Transfer-encoding").orElse("") .equalsIgnoreCase("chunked")) { return -1; } return 0; } return clen; } /** * Read up to MAX_IGNORE bytes discarding */ public CompletableFuture ignoreBody(Executor executor) { int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); if (clen == -1 || clen > MAX_IGNORE) { connection.close(); return MinimalFuture.completedFuture(null); // not treating as error } else { return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor); } } public CompletableFuture readBody(HttpResponse.BodySubscriber p, boolean return2Cache, Executor executor) { this.return2Cache = return2Cache; final HttpResponse.BodySubscriber pusher = p; final CompletionStage bodyCF = p.getBody(); final CompletableFuture cf = MinimalFuture.of(bodyCF); int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); final int clen = fixupContentLen(clen0); // expect-continue reads headers and body twice. // if we reach here, we must reset the headersReader state. asyncReceiver.unsubscribe(headersReader); headersReader.reset(); executor.execute(() -> { try { HttpClientImpl client = connection.client(); content = new ResponseContent( connection, clen, headers, pusher, this::onFinished ); if (cf.isCompletedExceptionally()) { // if an error occurs during subscription connection.close(); return; } // increment the reference count on the HttpClientImpl // to prevent the SelectorManager thread from exiting until // the body is fully read. client.reference(); bodyReader.start(content.getBodyParser( (t) -> { try { if (t != null) { pusher.onError(t); connection.close(); if (!cf.isDone()) cf.completeExceptionally(t); } } finally { // decrement the reference count on the HttpClientImpl // to allow the SelectorManager thread to exit if no // other operation is pending and the facade is no // longer referenced. client.unreference(); bodyReader.onComplete(t); } })); CompletableFuture bodyReaderCF = bodyReader.completion(); asyncReceiver.subscribe(bodyReader); assert bodyReaderCF != null : "parsing not started"; // Make sure to keep a reference to asyncReceiver from // within this CompletableFuture trailingOp = bodyReaderCF.whenComplete((s,t) -> { t = Utils.getCompletionCause(t); try { if (t != null) { debug.log(Level.DEBUG, () -> "Finished reading body: " + s); assert s == State.READING_BODY; } if (t != null && !cf.isDone()) { pusher.onError(t); cf.completeExceptionally(t); } } catch (Throwable x) { // not supposed to happen asyncReceiver.onReadError(x); } }); connection.addTrailingOperation(trailingOp); } catch (Throwable t) { debug.log(Level.DEBUG, () -> "Failed reading body: " + t); try { if (!cf.isDone()) { pusher.onError(t); cf.completeExceptionally(t); } } finally { asyncReceiver.onReadError(t); } } }); return cf; } private void onFinished() { asyncReceiver.clear(); if (return2Cache) { Log.logTrace("Attempting to return connection to the pool: {0}", connection); // TODO: need to do something here? // connection.setAsyncCallbacks(null, null, null); // don't return the connection to the cache if EOF happened. debug.log(Level.DEBUG, () -> connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); connection.closeOrReturnToCache(eof == null ? headers : null); } } HttpHeaders responseHeaders() { return headers; } int responseCode() { return responseCode; } // ================ Support for plugging into Http1Receiver ================= // ============================================================================ // Callback: Error receiver: Consumer of Throwable. void onReadError(Throwable t) { Log.logError(t); Receiver receiver = receiver(readProgress); if (t instanceof EOFException) { debug.log(Level.DEBUG, "onReadError: received EOF"); eof = (EOFException) t; } CompletableFuture cf = receiver == null ? null : receiver.completion(); debug.log(Level.DEBUG, () -> "onReadError: cf is " + (cf == null ? "null" : (cf.isDone() ? "already completed" : "not yet completed"))); if (cf != null && !cf.isDone()) cf.completeExceptionally(t); else { debug.log(Level.DEBUG, "onReadError", t); } debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); connection.close(); } // ======================================================================== private State advance(State previous) { assert readProgress == previous; switch(previous) { case READING_HEADERS: asyncReceiver.unsubscribe(headersReader); return readProgress = State.READING_BODY; case READING_BODY: asyncReceiver.unsubscribe(bodyReader); return readProgress = State.DONE; default: throw new InternalError("can't advance from " + previous); } } Receiver receiver(State state) { switch(state) { case READING_HEADERS: return headersReader; case READING_BODY: return bodyReader; default: return null; } } static abstract class Receiver implements Http1AsyncReceiver.Http1AsyncDelegate { abstract void start(T parser); abstract CompletableFuture completion(); // accepts a buffer from upstream. // this should be implemented as a simple call to // accept(ref, parser, cf) public abstract boolean tryAsyncReceive(ByteBuffer buffer); public abstract void onReadError(Throwable t); // handle a byte buffer received from upstream. // this method should set the value of Http1Response.buffer // to ref.get() before beginning parsing. abstract void handle(ByteBuffer buf, T parser, CompletableFuture cf); // resets this objects state so that it can be reused later on // typically puts the reference to parser and completion to null abstract void reset(); // accepts a byte buffer received from upstream // returns true if the buffer is fully parsed and more data can // be accepted, false otherwise. final boolean accept(ByteBuffer buf, T parser, CompletableFuture cf) { if (cf == null || parser == null || cf.isDone()) return false; handle(buf, parser, cf); return !cf.isDone(); } public abstract void onSubscribe(AbstractSubscription s); public abstract AbstractSubscription subscription(); } // Invoked with each new ByteBuffer when reading headers... final class HeadersReader extends Receiver { final Consumer onComplete; volatile Http1HeaderParser parser; volatile CompletableFuture cf; volatile long count; // bytes parsed (for debug) volatile AbstractSubscription subscription; HeadersReader(Consumer onComplete) { this.onComplete = onComplete; } @Override public AbstractSubscription subscription() { return subscription; } @Override public void onSubscribe(AbstractSubscription s) { this.subscription = s; s.request(1); } @Override void reset() { cf = null; parser = null; count = 0; subscription = null; } // Revisit: do we need to support restarting? @Override final void start(Http1HeaderParser hp) { count = 0; cf = new MinimalFuture<>(); parser = hp; } @Override CompletableFuture completion() { return cf; } @Override public final boolean tryAsyncReceive(ByteBuffer ref) { boolean hasDemand = subscription.demand().tryDecrement(); assert hasDemand; boolean needsMore = accept(ref, parser, cf); if (needsMore) subscription.request(1); return needsMore; } @Override public final void onReadError(Throwable t) { Http1Response.this.onReadError(t); } @Override final void handle(ByteBuffer b, Http1HeaderParser parser, CompletableFuture cf) { assert cf != null : "parsing not started"; assert parser != null : "no parser"; try { count += b.remaining(); debug.log(Level.DEBUG, () -> "Sending " + b.remaining() + "/" + b.capacity() + " bytes to header parser"); if (parser.parse(b)) { count -= b.remaining(); debug.log(Level.DEBUG, () -> "Parsing headers completed. bytes=" + count); onComplete.accept(State.READING_HEADERS); cf.complete(State.READING_HEADERS); } } catch (Throwable t) { debug.log(Level.DEBUG, () -> "Header parser failed to handle buffer: " + t); cf.completeExceptionally(t); } } } // Invoked with each new ByteBuffer when reading bodies... final class BodyReader extends Receiver { final Consumer onComplete; volatile BodyParser parser; volatile CompletableFuture cf; volatile AbstractSubscription subscription; BodyReader(Consumer onComplete) { this.onComplete = onComplete; } @Override void reset() { parser = null; cf = null; subscription = null; } // Revisit: do we need to support restarting? @Override final void start(BodyParser parser) { cf = new MinimalFuture<>(); this.parser = parser; } @Override CompletableFuture completion() { return cf; } @Override public final boolean tryAsyncReceive(ByteBuffer b) { return accept(b, parser, cf); } @Override public final void onReadError(Throwable t) { Http1Response.this.onReadError(t); } @Override public AbstractSubscription subscription() { return subscription; } @Override public void onSubscribe(AbstractSubscription s) { this.subscription = s; parser.onSubscribe(s); } @Override final void handle(ByteBuffer b, BodyParser parser, CompletableFuture cf) { assert cf != null : "parsing not started"; assert parser != null : "no parser"; try { debug.log(Level.DEBUG, () -> "Sending " + b.remaining() + "/" + b.capacity() + " bytes to body parser"); parser.accept(b); } catch (Throwable t) { debug.log(Level.DEBUG, () -> "Body parser failed to handle buffer: " + t); if (!cf.isDone()) { cf.completeExceptionally(t); } } } final void onComplete(Throwable closedExceptionally) { if (cf.isDone()) return; if (closedExceptionally != null) { cf.completeExceptionally(closedExceptionally); } else { onComplete.accept(State.READING_BODY); cf.complete(State.READING_BODY); } } @Override public String toString() { return super.toString() + "/parser=" + String.valueOf(parser); } } }