--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java 2017-11-30 04:03:55.480746479 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java 2017-11-30 04:03:55.279728906 -0800 @@ -25,16 +25,24 @@ package jdk.incubator.http; -import java.io.IOException; +import java.io.EOFException; +import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import jdk.incubator.http.ResponseContent.BodyParser; import jdk.incubator.http.internal.common.Log; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; +import jdk.incubator.http.internal.common.MinimalFuture; +import jdk.incubator.http.internal.common.Utils; /** - * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and - * readBody(). There can be more than one of these per Http exchange. + * Handles a HTTP/1.1 response (headers + body). + * There can be more than one of these per Http exchange. */ class Http1Response { @@ -42,48 +50,71 @@ private final HttpRequestImpl request; private Response response; private final HttpConnection connection; - private ResponseHeaders headers; + private HttpHeaders headers; private int responseCode; - private ByteBuffer buffer; private final Http1Exchange exchange; - private final boolean redirecting; // redirecting private boolean return2Cache; // return connection to cache when finished - - Http1Response(HttpConnection conn, Http1Exchange exchange) { + private final HeadersReader headersReader; // used to read the headers + private final BodyReader bodyReader; // used to read the body + private final Http1AsyncReceiver asyncReceiver; + private volatile EOFException eof; + // max number of bytes of (fixed length) body to ignore on redirect + private final static int MAX_IGNORE = 1024; + + // Revisit: can we get rid of this? + static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} + private volatile State readProgress = State.INITIAL; + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); + + + Http1Response(HttpConnection conn, + Http1Exchange exchange, + Http1AsyncReceiver asyncReceiver) { + this.readProgress = State.INITIAL; this.request = exchange.request(); this.exchange = exchange; this.connection = conn; - this.redirecting = false; - buffer = exchange.getBuffer(); - } - - @SuppressWarnings("unchecked") - public void readHeaders() throws IOException { - String statusline = readStatusLine(); - if (statusline == null) { - if (Log.errors()) { - Log.logError("Connection closed. Retry"); - } - connection.close(); - // connection was closed - throw new IOException("Connection closed"); - } - if (!statusline.startsWith("HTTP/1.")) { - throw new IOException("Invalid status line: " + statusline); + this.asyncReceiver = asyncReceiver; + headersReader = new HeadersReader(this::advance); + bodyReader = new BodyReader(this::advance); + } + + public CompletableFuture readHeadersAsync(Executor executor) { + debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " + + asyncReceiver.remaining() +") " + readProgress); + // with expect continue we will resume reading headers + body. + asyncReceiver.unsubscribe(bodyReader); + bodyReader.reset(); + Http1HeaderParser hd = new Http1HeaderParser(); + readProgress = State.READING_HEADERS; + headersReader.start(hd); + asyncReceiver.subscribe(headersReader); + CompletableFuture cf = headersReader.completion(); + assert cf != null : "parsing not started"; + + Function lambda = (State completed) -> { + assert completed == State.READING_HEADERS; + debug.log(Level.DEBUG, () -> + "Reading Headers: creating Response object;" + + " state is now " + readProgress); + asyncReceiver.unsubscribe(headersReader); + responseCode = hd.responseCode(); + headers = hd.headers(); + + response = new Response(request, + exchange.getExchange(), + headers, + responseCode, + HTTP_1_1); + return response; + }; + + if (executor != null) { + return cf.thenApplyAsync(lambda, executor); + } else { + return cf.thenApply(lambda); } - if (Log.trace()) { - Log.logTrace("Statusline: {0}", statusline); - } - char c = statusline.charAt(7); - responseCode = Integer.parseInt(statusline.substring(9, 12)); - - headers = new ResponseHeaders(connection, buffer); - if (Log.headers()) { - logHeaders(headers); - } - response = new Response( - request, exchange.getExchange(), - headers, responseCode, HTTP_1_1); } private boolean finished; @@ -96,10 +127,6 @@ return finished; } - ByteBuffer getBuffer() { - return buffer; - } - int fixupContentLen(int clen) { if (request.method().equalsIgnoreCase("HEAD")) { return 0; @@ -114,131 +141,376 @@ return clen; } - public CompletableFuture readBody( - HttpResponse.BodyProcessor p, - boolean return2Cache, - Executor executor) { - final BlockingPushPublisher publisher = new BlockingPushPublisher<>(); - return readBody(p, return2Cache, publisher, executor); + /** + * Read up to MAX_IGNORE bytes discarding + */ + public CompletableFuture ignoreBody(Executor executor) { + int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); + if (clen == -1 || clen > MAX_IGNORE) { + connection.close(); + return MinimalFuture.completedFuture(null); // not treating as error + } else { + return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor); + } } - private CompletableFuture readBody( - HttpResponse.BodyProcessor p, - boolean return2Cache, - AbstractPushPublisher publisher, - Executor executor) { + public CompletableFuture readBody(HttpResponse.BodySubscriber p, + boolean return2Cache, + Executor executor) { this.return2Cache = return2Cache; - final jdk.incubator.http.HttpResponse.BodyProcessor pusher = p; - final CompletableFuture cf = p.getBody().toCompletableFuture(); + final HttpResponse.BodySubscriber pusher = p; + final CompletionStage bodyCF = p.getBody(); + final CompletableFuture cf = MinimalFuture.of(bodyCF); + + int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); - int clen0; - try { - clen0 = headers.getContentLength(); - } catch (IOException ex) { - cf.completeExceptionally(ex); - return cf; - } final int clen = fixupContentLen(clen0); + // expect-continue reads headers and body twice. + // if we reach here, we must reset the headersReader state. + asyncReceiver.unsubscribe(headersReader); + headersReader.reset(); + executor.execute(() -> { try { + HttpClientImpl client = connection.client(); content = new ResponseContent( connection, clen, headers, pusher, - publisher.asDataConsumer(), - (t -> { - publisher.acceptError(t); - connection.close(); - cf.completeExceptionally(t); - }), - () -> onFinished() + this::onFinished ); - publisher.subscribe(p); if (cf.isCompletedExceptionally()) { // if an error occurs during subscription connection.close(); return; } - content.pushBody(buffer); + // increment the reference count on the HttpClientImpl + // to prevent the SelectorManager thread from exiting until + // the body is fully read. + client.reference(); + bodyReader.start(content.getBodyParser( + (t) -> { + try { + if (t != null) { + pusher.onError(t); + connection.close(); + if (!cf.isDone()) + cf.completeExceptionally(t); + } + } finally { + // decrement the reference count on the HttpClientImpl + // to allow the SelectorManager thread to exit if no + // other operation is pending and the facade is no + // longer referenced. + client.unreference(); + bodyReader.onComplete(t); + } + })); + CompletableFuture bodyReaderCF = bodyReader.completion(); + asyncReceiver.subscribe(bodyReader); + assert bodyReaderCF != null : "parsing not started"; + // Make sure to keep a reference to asyncReceiver from + // within this + CompletableFuture trailingOp = bodyReaderCF.whenComplete((s,t) -> { + t = Utils.getCompletionCause(t); + try { + if (t != null) { + debug.log(Level.DEBUG, () -> + "Finished reading body: " + s); + assert s == State.READING_BODY; + } + if (t != null && !cf.isDone()) { + pusher.onError(t); + cf.completeExceptionally(t); + } + } catch (Throwable x) { + // not supposed to happen + asyncReceiver.onReadError(x); + } + }); + connection.addTrailingOperation(trailingOp); } catch (Throwable t) { - cf.completeExceptionally(t); + debug.log(Level.DEBUG, () -> "Failed reading body: " + t); + try { + if (!cf.isDone()) { + pusher.onError(t); + cf.completeExceptionally(t); + } + } finally { + asyncReceiver.onReadError(t); + } } }); return cf; } + private void onFinished() { + asyncReceiver.clear(); if (return2Cache) { - Log.logTrace("Returning connection to the pool: {0}", connection); - connection.returnToCache(headers); + Log.logTrace("Attempting to return connection to the pool: {0}", connection); + // TODO: need to do something here? + // connection.setAsyncCallbacks(null, null, null); + + // don't return the connection to the cache if EOF happened. + debug.log(Level.DEBUG, () -> connection.getConnectionFlow() + + ": return to HTTP/1.1 pool"); + connection.closeOrReturnToCache(eof == null ? headers : null); } } - private void logHeaders(ResponseHeaders headers) { - StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); - Log.dumpHeaders(sb, " ", headers); - Log.logHeaders(sb.toString()); + HttpHeaders responseHeaders() { + return headers; } - Response response() { - return response; + int responseCode() { + return responseCode; } - boolean redirecting() { - return redirecting; +// ================ Support for plugging into Http1Receiver ================= +// ============================================================================ + + // Callback: Error receiver: Consumer of Throwable. + void onReadError(Throwable t) { + Log.logError(t); + Receiver receiver = receiver(readProgress); + if (t instanceof EOFException) { + debug.log(Level.DEBUG, "onReadError: received EOF"); + eof = (EOFException) t; + } + CompletableFuture cf = receiver == null ? null : receiver.completion(); + debug.log(Level.DEBUG, () -> "onReadError: cf is " + + (cf == null ? "null" + : (cf.isDone() ? "already completed" + : "not yet completed"))); + if (cf != null && !cf.isDone()) cf.completeExceptionally(t); + else { debug.log(Level.DEBUG, "onReadError", t); } + debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); + connection.close(); + } + + // ======================================================================== + + private State advance(State previous) { + assert readProgress == previous; + switch(previous) { + case READING_HEADERS: + asyncReceiver.unsubscribe(headersReader); + return readProgress = State.READING_BODY; + case READING_BODY: + asyncReceiver.unsubscribe(bodyReader); + return readProgress = State.DONE; + default: + throw new InternalError("can't advance from " + previous); + } } - HttpHeaders responseHeaders() { - return headers; + Receiver receiver(State state) { + switch(state) { + case READING_HEADERS: return headersReader; + case READING_BODY: return bodyReader; + default: return null; + } + } - int responseCode() { - return responseCode; + static abstract class Receiver + implements Http1AsyncReceiver.Http1AsyncDelegate { + abstract void start(T parser); + abstract CompletableFuture completion(); + // accepts a buffer from upstream. + // this should be implemented as a simple call to + // accept(ref, parser, cf) + public abstract boolean tryAsyncReceive(ByteBuffer buffer); + public abstract void onReadError(Throwable t); + // handle a byte buffer received from upstream. + // this method should set the value of Http1Response.buffer + // to ref.get() before beginning parsing. + abstract void handle(ByteBuffer buf, T parser, + CompletableFuture cf); + // resets this objects state so that it can be reused later on + // typically puts the reference to parser and completion to null + abstract void reset(); + + // accepts a byte buffer received from upstream + // returns true if the buffer is fully parsed and more data can + // be accepted, false otherwise. + final boolean accept(ByteBuffer buf, T parser, + CompletableFuture cf) { + if (cf == null || parser == null || cf.isDone()) return false; + handle(buf, parser, cf); + return !cf.isDone(); + } + public abstract void onSubscribe(AbstractSubscription s); + public abstract AbstractSubscription subscription(); + } - static final char CR = '\r'; - static final char LF = '\n'; + // Invoked with each new ByteBuffer when reading headers... + final class HeadersReader extends Receiver { + final Consumer onComplete; + volatile Http1HeaderParser parser; + volatile CompletableFuture cf; + volatile long count; // bytes parsed (for debug) + volatile AbstractSubscription subscription; - private int obtainBuffer() throws IOException { - int n = buffer.remaining(); + HeadersReader(Consumer onComplete) { + this.onComplete = onComplete; + } - if (n == 0) { - buffer = connection.read(); - if (buffer == null) { - return -1; + @Override + public AbstractSubscription subscription() { + return subscription; + } + + @Override + public void onSubscribe(AbstractSubscription s) { + this.subscription = s; + s.request(1); + } + + @Override + void reset() { + cf = null; + parser = null; + count = 0; + subscription = null; + } + + // Revisit: do we need to support restarting? + @Override + final void start(Http1HeaderParser hp) { + count = 0; + cf = new MinimalFuture<>(); + parser = hp; + } + + @Override + CompletableFuture completion() { + return cf; + } + + @Override + public final boolean tryAsyncReceive(ByteBuffer ref) { + boolean hasDemand = subscription.demand().tryDecrement(); + assert hasDemand; + boolean needsMore = accept(ref, parser, cf); + if (needsMore) subscription.request(1); + return needsMore; + } + + @Override + public final void onReadError(Throwable t) { + Http1Response.this.onReadError(t); + } + + @Override + final void handle(ByteBuffer b, + Http1HeaderParser parser, + CompletableFuture cf) { + assert cf != null : "parsing not started"; + assert parser != null : "no parser"; + try { + count += b.remaining(); + debug.log(Level.DEBUG, () -> "Sending " + b.remaining() + + "/" + b.capacity() + " bytes to header parser"); + if (parser.parse(b)) { + count -= b.remaining(); + debug.log(Level.DEBUG, () -> + "Parsing headers completed. bytes=" + count); + onComplete.accept(State.READING_HEADERS); + cf.complete(State.READING_HEADERS); + } + } catch (Throwable t) { + debug.log(Level.DEBUG, + () -> "Header parser failed to handle buffer: " + t); + cf.completeExceptionally(t); } - n = buffer.remaining(); } - return n; } - String readStatusLine() throws IOException { - boolean cr = false; - StringBuilder statusLine = new StringBuilder(128); - while ((obtainBuffer()) != -1) { - byte[] buf = buffer.array(); - int offset = buffer.position(); - int len = buffer.limit() - offset; - - for (int i = 0; i < len; i++) { - char c = (char) buf[i+offset]; - - if (cr) { - if (c == LF) { - buffer.position(i + 1 + offset); - return statusLine.toString(); - } else { - throw new IOException("invalid status line"); - } - } - if (c == CR) { - cr = true; - } else { - statusLine.append(c); + // Invoked with each new ByteBuffer when reading bodies... + final class BodyReader extends Receiver { + final Consumer onComplete; + volatile BodyParser parser; + volatile CompletableFuture cf; + volatile AbstractSubscription subscription; + BodyReader(Consumer onComplete) { + this.onComplete = onComplete; + } + + @Override + void reset() { + parser = null; + cf = null; + subscription = null; + } + + // Revisit: do we need to support restarting? + @Override + final void start(BodyParser parser) { + cf = new MinimalFuture<>(); + this.parser = parser; + } + + @Override + CompletableFuture completion() { + return cf; + } + + @Override + public final boolean tryAsyncReceive(ByteBuffer b) { + return accept(b, parser, cf); + } + + @Override + public final void onReadError(Throwable t) { + Http1Response.this.onReadError(t); + } + + @Override + public AbstractSubscription subscription() { + return subscription; + } + + @Override + public void onSubscribe(AbstractSubscription s) { + this.subscription = s; + parser.onSubscribe(s); + } + + @Override + final void handle(ByteBuffer b, + BodyParser parser, + CompletableFuture cf) { + assert cf != null : "parsing not started"; + assert parser != null : "no parser"; + try { + debug.log(Level.DEBUG, () -> "Sending " + b.remaining() + + "/" + b.capacity() + " bytes to body parser"); + parser.accept(b); + } catch (Throwable t) { + debug.log(Level.DEBUG, + () -> "Body parser failed to handle buffer: " + t); + if (!cf.isDone()) { + cf.completeExceptionally(t); } } - // unlikely, but possible, that multiple reads required - buffer.position(buffer.limit()); } - return null; + + final void onComplete(Throwable closedExceptionally) { + if (cf.isDone()) return; + if (closedExceptionally != null) { + cf.completeExceptionally(closedExceptionally); + } else { + onComplete.accept(State.READING_BODY); + cf.complete(State.READING_BODY); + } + } + + @Override + public String toString() { + return super.toString() + "/parser=" + String.valueOf(parser); + } + } }