< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java

Print this page

        

*** 23,91 **** * questions. */ package jdk.incubator.http; ! import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import jdk.incubator.http.internal.common.Log; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; /** ! * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and ! * readBody(). There can be more than one of these per Http exchange. */ class Http1Response<T> { private volatile ResponseContent content; private final HttpRequestImpl request; private Response response; private final HttpConnection connection; ! private ResponseHeaders headers; private int responseCode; - private ByteBuffer buffer; private final Http1Exchange<T> exchange; - private final boolean redirecting; // redirecting private boolean return2Cache; // return connection to cache when finished ! ! Http1Response(HttpConnection conn, Http1Exchange<T> exchange) { this.request = exchange.request(); this.exchange = exchange; this.connection = conn; ! this.redirecting = false; ! buffer = exchange.getBuffer(); ! } ! ! @SuppressWarnings("unchecked") ! public void readHeaders() throws IOException { ! String statusline = readStatusLine(); ! if (statusline == null) { ! if (Log.errors()) { ! Log.logError("Connection closed. Retry"); ! } ! connection.close(); ! // connection was closed ! throw new IOException("Connection closed"); ! } ! if (!statusline.startsWith("HTTP/1.")) { ! throw new IOException("Invalid status line: " + statusline); ! } ! if (Log.trace()) { ! Log.logTrace("Statusline: {0}", statusline); ! } ! char c = statusline.charAt(7); ! responseCode = Integer.parseInt(statusline.substring(9, 12)); ! headers = new ResponseHeaders(connection, buffer); ! if (Log.headers()) { ! logHeaders(headers); } - response = new Response( - request, exchange.getExchange(), - headers, responseCode, HTTP_1_1); } private boolean finished; synchronized void completed() { --- 23,122 ---- * questions. */ package jdk.incubator.http; ! import java.io.EOFException; ! import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; + import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; + import java.util.function.BiConsumer; + import java.util.function.Consumer; + import java.util.function.Function; + import jdk.incubator.http.ResponseContent.BodyParser; import jdk.incubator.http.internal.common.Log; import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; + import jdk.incubator.http.internal.common.MinimalFuture; + import jdk.incubator.http.internal.common.Utils; /** ! * Handles a HTTP/1.1 response (headers + body). ! * There can be more than one of these per Http exchange. */ class Http1Response<T> { private volatile ResponseContent content; private final HttpRequestImpl request; private Response response; private final HttpConnection connection; ! private HttpHeaders headers; private int responseCode; private final Http1Exchange<T> exchange; private boolean return2Cache; // return connection to cache when finished ! private final HeadersReader headersReader; // used to read the headers ! private final BodyReader bodyReader; // used to read the body ! private final Http1AsyncReceiver asyncReceiver; ! private volatile EOFException eof; ! // max number of bytes of (fixed length) body to ignore on redirect ! private final static int MAX_IGNORE = 1024; ! ! // Revisit: can we get rid of this? ! static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} ! private volatile State readProgress = State.INITIAL; ! static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. ! final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); ! ! ! Http1Response(HttpConnection conn, ! Http1Exchange<T> exchange, ! Http1AsyncReceiver asyncReceiver) { ! this.readProgress = State.INITIAL; this.request = exchange.request(); this.exchange = exchange; this.connection = conn; ! this.asyncReceiver = asyncReceiver; ! headersReader = new HeadersReader(this::advance); ! bodyReader = new BodyReader(this::advance); ! } ! ! public CompletableFuture<Response> readHeadersAsync(Executor executor) { ! debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " ! + asyncReceiver.remaining() +") " + readProgress); ! // with expect continue we will resume reading headers + body. ! asyncReceiver.unsubscribe(bodyReader); ! bodyReader.reset(); ! Http1HeaderParser hd = new Http1HeaderParser(); ! readProgress = State.READING_HEADERS; ! headersReader.start(hd); ! asyncReceiver.subscribe(headersReader); ! CompletableFuture<State> cf = headersReader.completion(); ! assert cf != null : "parsing not started"; ! ! Function<State, Response> lambda = (State completed) -> { ! assert completed == State.READING_HEADERS; ! debug.log(Level.DEBUG, () -> ! "Reading Headers: creating Response object;" ! + " state is now " + readProgress); ! asyncReceiver.unsubscribe(headersReader); ! responseCode = hd.responseCode(); ! headers = hd.headers(); ! ! response = new Response(request, ! exchange.getExchange(), ! headers, ! responseCode, ! HTTP_1_1); ! return response; ! }; ! if (executor != null) { ! return cf.thenApplyAsync(lambda, executor); ! } else { ! return cf.thenApply(lambda); } } private boolean finished; synchronized void completed() {
*** 94,107 **** synchronized boolean finished() { return finished; } - ByteBuffer getBuffer() { - return buffer; - } - int fixupContentLen(int clen) { if (request.method().equalsIgnoreCase("HEAD")) { return 0; } if (clen == -1) { --- 125,134 ----
*** 112,244 **** return 0; } return clen; } ! public CompletableFuture<T> readBody( ! HttpResponse.BodyProcessor<T> p, ! boolean return2Cache, ! Executor executor) { ! final BlockingPushPublisher<ByteBuffer> publisher = new BlockingPushPublisher<>(); ! return readBody(p, return2Cache, publisher, executor); } ! private CompletableFuture<T> readBody( ! HttpResponse.BodyProcessor<T> p, boolean return2Cache, - AbstractPushPublisher<ByteBuffer> publisher, Executor executor) { this.return2Cache = return2Cache; ! final jdk.incubator.http.HttpResponse.BodyProcessor<T> pusher = p; ! final CompletableFuture<T> cf = p.getBody().toCompletableFuture(); - int clen0; - try { - clen0 = headers.getContentLength(); - } catch (IOException ex) { - cf.completeExceptionally(ex); - return cf; - } final int clen = fixupContentLen(clen0); executor.execute(() -> { try { content = new ResponseContent( connection, clen, headers, pusher, ! publisher.asDataConsumer(), ! (t -> { ! publisher.acceptError(t); ! connection.close(); ! cf.completeExceptionally(t); ! }), ! () -> onFinished() ); - publisher.subscribe(p); if (cf.isCompletedExceptionally()) { // if an error occurs during subscription connection.close(); return; } ! content.pushBody(buffer); } catch (Throwable t) { cf.completeExceptionally(t); } }); return cf; } private void onFinished() { if (return2Cache) { ! Log.logTrace("Returning connection to the pool: {0}", connection); ! connection.returnToCache(headers); } } ! private void logHeaders(ResponseHeaders headers) { ! StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); ! Log.dumpHeaders(sb, " ", headers); ! Log.logHeaders(sb.toString()); } ! Response response() { ! return response; } ! boolean redirecting() { ! return redirecting; } ! HttpHeaders responseHeaders() { ! return headers; } ! int responseCode() { ! return responseCode; } ! static final char CR = '\r'; ! static final char LF = '\n'; ! private int obtainBuffer() throws IOException { ! int n = buffer.remaining(); ! if (n == 0) { ! buffer = connection.read(); ! if (buffer == null) { ! return -1; } ! n = buffer.remaining(); } ! return n; } ! String readStatusLine() throws IOException { ! boolean cr = false; ! StringBuilder statusLine = new StringBuilder(128); ! while ((obtainBuffer()) != -1) { ! byte[] buf = buffer.array(); ! int offset = buffer.position(); ! int len = buffer.limit() - offset; ! ! for (int i = 0; i < len; i++) { ! char c = (char) buf[i+offset]; ! ! if (cr) { ! if (c == LF) { ! buffer.position(i + 1 + offset); ! return statusLine.toString(); ! } else { ! throw new IOException("invalid status line"); } } ! if (c == CR) { ! cr = true; } else { ! statusLine.append(c); } } ! // unlikely, but possible, that multiple reads required ! buffer.position(buffer.limit()); } ! return null; } } --- 139,516 ---- return 0; } return clen; } ! /** ! * Read up to MAX_IGNORE bytes discarding ! */ ! public CompletableFuture<Void> ignoreBody(Executor executor) { ! int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); ! if (clen == -1 || clen > MAX_IGNORE) { ! connection.close(); ! return MinimalFuture.completedFuture(null); // not treating as error ! } else { ! return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor); ! } } ! public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, boolean return2Cache, Executor executor) { this.return2Cache = return2Cache; ! final HttpResponse.BodySubscriber<U> pusher = p; ! final CompletionStage<U> bodyCF = p.getBody(); ! final CompletableFuture<U> cf = MinimalFuture.of(bodyCF); ! ! int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); final int clen = fixupContentLen(clen0); + // expect-continue reads headers and body twice. + // if we reach here, we must reset the headersReader state. + asyncReceiver.unsubscribe(headersReader); + headersReader.reset(); + executor.execute(() -> { try { + HttpClientImpl client = connection.client(); content = new ResponseContent( connection, clen, headers, pusher, ! this::onFinished ); if (cf.isCompletedExceptionally()) { // if an error occurs during subscription connection.close(); return; } ! // increment the reference count on the HttpClientImpl ! // to prevent the SelectorManager thread from exiting until ! // the body is fully read. ! client.reference(); ! bodyReader.start(content.getBodyParser( ! (t) -> { ! try { ! if (t != null) { ! pusher.onError(t); ! connection.close(); ! if (!cf.isDone()) ! cf.completeExceptionally(t); ! } ! } finally { ! // decrement the reference count on the HttpClientImpl ! // to allow the SelectorManager thread to exit if no ! // other operation is pending and the facade is no ! // longer referenced. ! client.unreference(); ! bodyReader.onComplete(t); ! } ! })); ! CompletableFuture<State> bodyReaderCF = bodyReader.completion(); ! asyncReceiver.subscribe(bodyReader); ! assert bodyReaderCF != null : "parsing not started"; ! // Make sure to keep a reference to asyncReceiver from ! // within this ! CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { ! t = Utils.getCompletionCause(t); ! try { ! if (t != null) { ! debug.log(Level.DEBUG, () -> ! "Finished reading body: " + s); ! assert s == State.READING_BODY; ! } ! if (t != null && !cf.isDone()) { ! pusher.onError(t); ! cf.completeExceptionally(t); ! } ! } catch (Throwable x) { ! // not supposed to happen ! asyncReceiver.onReadError(x); ! } ! }); ! connection.addTrailingOperation(trailingOp); } catch (Throwable t) { + debug.log(Level.DEBUG, () -> "Failed reading body: " + t); + try { + if (!cf.isDone()) { + pusher.onError(t); cf.completeExceptionally(t); } + } finally { + asyncReceiver.onReadError(t); + } + } }); return cf; } + private void onFinished() { + asyncReceiver.clear(); if (return2Cache) { ! Log.logTrace("Attempting to return connection to the pool: {0}", connection); ! // TODO: need to do something here? ! // connection.setAsyncCallbacks(null, null, null); ! ! // don't return the connection to the cache if EOF happened. ! debug.log(Level.DEBUG, () -> connection.getConnectionFlow() ! + ": return to HTTP/1.1 pool"); ! connection.closeOrReturnToCache(eof == null ? headers : null); } } ! HttpHeaders responseHeaders() { ! return headers; } ! int responseCode() { ! return responseCode; } ! // ================ Support for plugging into Http1Receiver ================= ! // ============================================================================ ! ! // Callback: Error receiver: Consumer of Throwable. ! void onReadError(Throwable t) { ! Log.logError(t); ! Receiver<?> receiver = receiver(readProgress); ! if (t instanceof EOFException) { ! debug.log(Level.DEBUG, "onReadError: received EOF"); ! eof = (EOFException) t; ! } ! CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); ! debug.log(Level.DEBUG, () -> "onReadError: cf is " ! + (cf == null ? "null" ! : (cf.isDone() ? "already completed" ! : "not yet completed"))); ! if (cf != null && !cf.isDone()) cf.completeExceptionally(t); ! else { debug.log(Level.DEBUG, "onReadError", t); } ! debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); ! connection.close(); } ! // ======================================================================== ! ! private State advance(State previous) { ! assert readProgress == previous; ! switch(previous) { ! case READING_HEADERS: ! asyncReceiver.unsubscribe(headersReader); ! return readProgress = State.READING_BODY; ! case READING_BODY: ! asyncReceiver.unsubscribe(bodyReader); ! return readProgress = State.DONE; ! default: ! throw new InternalError("can't advance from " + previous); ! } ! } ! ! Receiver<?> receiver(State state) { ! switch(state) { ! case READING_HEADERS: return headersReader; ! case READING_BODY: return bodyReader; ! default: return null; ! } ! ! } ! ! static abstract class Receiver<T> ! implements Http1AsyncReceiver.Http1AsyncDelegate { ! abstract void start(T parser); ! abstract CompletableFuture<State> completion(); ! // accepts a buffer from upstream. ! // this should be implemented as a simple call to ! // accept(ref, parser, cf) ! public abstract boolean tryAsyncReceive(ByteBuffer buffer); ! public abstract void onReadError(Throwable t); ! // handle a byte buffer received from upstream. ! // this method should set the value of Http1Response.buffer ! // to ref.get() before beginning parsing. ! abstract void handle(ByteBuffer buf, T parser, ! CompletableFuture<State> cf); ! // resets this objects state so that it can be reused later on ! // typically puts the reference to parser and completion to null ! abstract void reset(); ! ! // accepts a byte buffer received from upstream ! // returns true if the buffer is fully parsed and more data can ! // be accepted, false otherwise. ! final boolean accept(ByteBuffer buf, T parser, ! CompletableFuture<State> cf) { ! if (cf == null || parser == null || cf.isDone()) return false; ! handle(buf, parser, cf); ! return !cf.isDone(); ! } ! public abstract void onSubscribe(AbstractSubscription s); ! public abstract AbstractSubscription subscription(); ! ! } ! ! // Invoked with each new ByteBuffer when reading headers... ! final class HeadersReader extends Receiver<Http1HeaderParser> { ! final Consumer<State> onComplete; ! volatile Http1HeaderParser parser; ! volatile CompletableFuture<State> cf; ! volatile long count; // bytes parsed (for debug) ! volatile AbstractSubscription subscription; ! ! HeadersReader(Consumer<State> onComplete) { ! this.onComplete = onComplete; ! } ! ! @Override ! public AbstractSubscription subscription() { ! return subscription; ! } ! ! @Override ! public void onSubscribe(AbstractSubscription s) { ! this.subscription = s; ! s.request(1); ! } ! ! @Override ! void reset() { ! cf = null; ! parser = null; ! count = 0; ! subscription = null; ! } ! ! // Revisit: do we need to support restarting? ! @Override ! final void start(Http1HeaderParser hp) { ! count = 0; ! cf = new MinimalFuture<>(); ! parser = hp; } ! @Override ! CompletableFuture<State> completion() { ! return cf; ! } ! ! @Override ! public final boolean tryAsyncReceive(ByteBuffer ref) { ! boolean hasDemand = subscription.demand().tryDecrement(); ! assert hasDemand; ! boolean needsMore = accept(ref, parser, cf); ! if (needsMore) subscription.request(1); ! return needsMore; ! } ! ! @Override ! public final void onReadError(Throwable t) { ! Http1Response.this.onReadError(t); ! } ! ! @Override ! final void handle(ByteBuffer b, ! Http1HeaderParser parser, ! CompletableFuture<State> cf) { ! assert cf != null : "parsing not started"; ! assert parser != null : "no parser"; ! try { ! count += b.remaining(); ! debug.log(Level.DEBUG, () -> "Sending " + b.remaining() ! + "/" + b.capacity() + " bytes to header parser"); ! if (parser.parse(b)) { ! count -= b.remaining(); ! debug.log(Level.DEBUG, () -> ! "Parsing headers completed. bytes=" + count); ! onComplete.accept(State.READING_HEADERS); ! cf.complete(State.READING_HEADERS); ! } ! } catch (Throwable t) { ! debug.log(Level.DEBUG, ! () -> "Header parser failed to handle buffer: " + t); ! cf.completeExceptionally(t); ! } ! } } ! // Invoked with each new ByteBuffer when reading bodies... ! final class BodyReader extends Receiver<BodyParser> { ! final Consumer<State> onComplete; ! volatile BodyParser parser; ! volatile CompletableFuture<State> cf; ! volatile AbstractSubscription subscription; ! BodyReader(Consumer<State> onComplete) { ! this.onComplete = onComplete; ! } ! ! @Override ! void reset() { ! parser = null; ! cf = null; ! subscription = null; ! } ! ! // Revisit: do we need to support restarting? ! @Override ! final void start(BodyParser parser) { ! cf = new MinimalFuture<>(); ! this.parser = parser; ! } ! @Override ! CompletableFuture<State> completion() { ! return cf; ! } ! @Override ! public final boolean tryAsyncReceive(ByteBuffer b) { ! return accept(b, parser, cf); } ! ! @Override ! public final void onReadError(Throwable t) { ! Http1Response.this.onReadError(t); } ! ! @Override ! public AbstractSubscription subscription() { ! return subscription; } ! @Override ! public void onSubscribe(AbstractSubscription s) { ! this.subscription = s; ! parser.onSubscribe(s); ! } ! ! @Override ! final void handle(ByteBuffer b, ! BodyParser parser, ! CompletableFuture<State> cf) { ! assert cf != null : "parsing not started"; ! assert parser != null : "no parser"; ! try { ! debug.log(Level.DEBUG, () -> "Sending " + b.remaining() ! + "/" + b.capacity() + " bytes to body parser"); ! parser.accept(b); ! } catch (Throwable t) { ! debug.log(Level.DEBUG, ! () -> "Body parser failed to handle buffer: " + t); ! if (!cf.isDone()) { ! cf.completeExceptionally(t); } } ! } ! ! final void onComplete(Throwable closedExceptionally) { ! if (cf.isDone()) return; ! if (closedExceptionally != null) { ! cf.completeExceptionally(closedExceptionally); } else { ! onComplete.accept(State.READING_BODY); ! cf.complete(State.READING_BODY); } } ! ! @Override ! public String toString() { ! return super.toString() + "/parser=" + String.valueOf(parser); } ! } }
< prev index next >