--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java 2017-11-30 04:04:11.370135527 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java 2017-11-30 04:04:11.106112446 -0800 @@ -26,21 +26,21 @@ package jdk.incubator.http; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; - +import java.util.concurrent.atomic.AtomicReference; +import jdk.incubator.http.HttpResponse.BodySubscriber; import jdk.incubator.http.internal.common.*; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.DecodingCallback; @@ -54,10 +54,6 @@ * * sendRequest() -- sendHeadersOnly() + sendBody() * - * sendBody() -- in calling thread: obeys all flow control (so may block) - * obtains data from request body processor and places on connection - * outbound Q. - * * sendBodyAsync() -- calls sendBody() in an executor thread. * * sendHeadersAsync() -- calls sendHeadersOnly() which does not block @@ -77,10 +73,6 @@ * * getResponse() -- calls getResponseAsync() and waits for CF to complete * - * responseBody() -- in calling thread: blocks for incoming DATA frames on - * stream inputQ. Obeys remote and local flow control so may block. - * Calls user response body processor with data buffers. - * * responseBodyAsync() -- calls responseBody() in an executor thread. * * incoming() -- entry point called from connection reader thread. Frames are @@ -98,7 +90,13 @@ */ class Stream extends ExchangeImpl { - final AsyncDataReadQueue inputQ = new AsyncDataReadQueue(); + final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + + final ConcurrentLinkedQueue inputQ = new ConcurrentLinkedQueue<>(); + final SequentialScheduler sched = + SequentialScheduler.synchronizedScheduler(this::schedule); + final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel); /** * This stream's identifier. Assigned lazily by the HTTP2Connection before @@ -106,24 +104,21 @@ */ protected volatile int streamid; - long responseContentLen = -1; - long responseBytesProcessed = 0; long requestContentLen; final Http2Connection connection; - HttpClientImpl client; final HttpRequestImpl request; final DecodingCallback rspHeadersConsumer; HttpHeadersImpl responseHeaders; - final HttpHeadersImpl requestHeaders; final HttpHeadersImpl requestPseudoHeaders; - HttpResponse.BodyProcessor responseProcessor; - final HttpRequest.BodyProcessor requestProcessor; + volatile HttpResponse.BodySubscriber responseSubscriber; + final HttpRequest.BodyPublisher requestPublisher; + volatile RequestSubscriber requestSubscriber; volatile int responseCode; volatile Response response; - volatile CompletableFuture responseCF; - final AbstractPushPublisher publisher; + volatile Throwable failed; // The exception with which this stream was canceled. final CompletableFuture requestBodyCF = new MinimalFuture<>(); + volatile CompletableFuture responseBodyCF; /** True if END_STREAM has been seen in a frame received on this stream. */ private volatile boolean remotelyClosed; @@ -131,7 +126,7 @@ private volatile boolean endStreamSent; // state flags - boolean requestSent, responseReceived, responseHeadersReceived; + private boolean requestSent, responseReceived; /** * A reference to this Stream's connection Send Window controller. The @@ -146,15 +141,88 @@ return connection.connection; } + /** + * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } + * of after user subscription window has re-opened, from SubscriptionBase.request() + */ + private void schedule() { + if (responseSubscriber == null) + // can't process anything yet + return; + + while (!inputQ.isEmpty()) { + Http2Frame frame = inputQ.peek(); + if (frame instanceof ResetFrame) { + inputQ.remove(); + handleReset((ResetFrame)frame); + return; + } + DataFrame df = (DataFrame)frame; + boolean finished = df.getFlag(DataFrame.END_STREAM); + + List buffers = df.getData(); + List dsts = Collections.unmodifiableList(buffers); + int size = Utils.remaining(dsts, Integer.MAX_VALUE); + if (size == 0 && finished) { + inputQ.remove(); + Log.logTrace("responseSubscriber.onComplete"); + debug.log(Level.DEBUG, "incoming: onComplete"); + sched.stop(); + responseSubscriber.onComplete(); + setEndStreamReceived(); + return; + } else if (userSubscription.tryDecrement()) { + inputQ.remove(); + Log.logTrace("responseSubscriber.onNext {0}", size); + debug.log(Level.DEBUG, "incoming: onNext(%d)", size); + responseSubscriber.onNext(dsts); + if (consumed(df)) { + Log.logTrace("responseSubscriber.onComplete"); + debug.log(Level.DEBUG, "incoming: onComplete"); + sched.stop(); + responseSubscriber.onComplete(); + setEndStreamReceived(); + return; + } + } else { + return; + } + } + Throwable t = failed; + if (t != null) { + sched.stop(); + responseSubscriber.onError(t); + close(); + } + } + + // Callback invoked after the Response BodySubscriber has consumed the + // buffers contained in a DataFrame. + // Returns true if END_STREAM is reached, false otherwise. + private boolean consumed(DataFrame df) { + // RFC 7540 6.1: + // The entire DATA frame payload is included in flow control, + // including the Pad Length and Padding fields if present + int len = df.payloadLength(); + connection.windowUpdater.update(len); + + if (!df.getFlag(DataFrame.END_STREAM)) { + // Don't send window update on a stream which is + // closed or half closed. + windowUpdater.update(len); + return false; // more data coming + } + return true; // end of stream + } + @Override CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler, boolean returnConnectionToPool, Executor executor) { Log.logTrace("Reading body on stream {0}", streamid); - responseProcessor = handler.apply(responseCode, responseHeaders); - publisher.subscribe(responseProcessor); - CompletableFuture cf = receiveData(executor); + BodySubscriber bodySubscriber = handler.apply(responseCode, responseHeaders); + CompletableFuture cf = receiveData(bodySubscriber); PushGroup pg = exchange.getPushGroup(); if (pg != null) { @@ -165,20 +233,6 @@ } @Override - T readBody(HttpResponse.BodyHandler handler, boolean returnConnectionToPool) - throws IOException - { - CompletableFuture cf = readBodyAsync(handler, - returnConnectionToPool, - null); - try { - return cf.join(); - } catch (CompletionException e) { - throw Utils.getIOException(e); - } - } - - @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("streamid: ") @@ -186,85 +240,51 @@ return sb.toString(); } - private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { - if (frame instanceof ResetFrame) { - handleReset((ResetFrame) frame); - return true; - } else if (!(frame instanceof DataFrame)) { - assert false; - return true; - } - DataFrame df = (DataFrame) frame; - // RFC 7540 6.1: - // The entire DATA frame payload is included in flow control, - // including the Pad Length and Padding fields if present - int len = df.payloadLength(); - ByteBufferReference[] buffers = df.getData(); - for (ByteBufferReference b : buffers) { - ByteBuffer buf = b.get(); - if (buf.hasRemaining()) { - publisher.acceptData(Optional.of(buf)); - } - } - connection.windowUpdater.update(len); - if (df.getFlag(DataFrame.END_STREAM)) { - setEndStreamReceived(); - publisher.acceptData(Optional.empty()); - return false; - } - // Don't send window update on a stream which is - // closed or half closed. - windowUpdater.update(len); - return true; + private void receiveDataFrame(DataFrame df) { + inputQ.add(df); + sched.runOrSchedule(); + } + + /** Handles a RESET frame. RESET is always handled inline in the queue. */ + private void receiveResetFrame(ResetFrame frame) { + inputQ.add(frame); + sched.runOrSchedule(); } - // pushes entire response body into response processor + // pushes entire response body into response subscriber // blocking when required by local or remote flow control - CompletableFuture receiveData(Executor executor) { - CompletableFuture cf = responseProcessor - .getBody() - .toCompletableFuture(); - Consumer onError = e -> { - Log.logTrace("receiveData: {0}", e.toString()); - e.printStackTrace(); - cf.completeExceptionally(e); - publisher.acceptError(e); - }; - if (executor == null) { - inputQ.blockingReceive(this::receiveDataFrame, onError); + CompletableFuture receiveData(BodySubscriber bodySubscriber) { + responseBodyCF = MinimalFuture.of(bodySubscriber.getBody()); + + if (isCanceled()) { + Throwable t = getCancelCause(); + responseBodyCF.completeExceptionally(t); } else { - inputQ.asyncReceive(executor, this::receiveDataFrame, onError); + bodySubscriber.onSubscribe(userSubscription); } - return cf; + // Set the responseSubscriber field now that onSubscribe has been called. + // This effectively allows the scheduler to start invoking the callbacks. + responseSubscriber = bodySubscriber; + sched.runOrSchedule(); // in case data waiting already to be processed + return responseBodyCF; } @Override - void sendBody() throws IOException { - try { - sendBodyImpl().join(); - } catch (CompletionException e) { - throw Utils.getIOException(e); - } - } - CompletableFuture> sendBodyAsync() { return sendBodyImpl().thenApply( v -> this); } @SuppressWarnings("unchecked") - Stream(HttpClientImpl client, - Http2Connection connection, + Stream(Http2Connection connection, Exchange e, WindowController windowController) { super(e); - this.client = client; this.connection = connection; this.windowController = windowController; this.request = e.request(); - this.requestProcessor = request.requestProcessor; + this.requestPublisher = request.requestPublisher; // may be null responseHeaders = new HttpHeadersImpl(); - requestHeaders = new HttpHeadersImpl(); rspHeadersConsumer = (name, value) -> { responseHeaders.addHeader(name.toString(), value.toString()); if (Log.headers() && Log.trace()) { @@ -274,7 +294,6 @@ }; this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW - this.publisher = new BlockingPushPublisher<>(); this.windowUpdater = new StreamWindowUpdateSender(connection); } @@ -284,17 +303,18 @@ * Data frames will be removed by response body thread. */ void incoming(Http2Frame frame) throws IOException { + debug.log(Level.DEBUG, "incoming: %s", frame); if ((frame instanceof HeaderFrame)) { HeaderFrame hframe = (HeaderFrame)frame; if (hframe.endHeaders()) { Log.logTrace("handling response (streamid={0})", streamid); handleResponse(); if (hframe.getFlag(HeaderFrame.END_STREAM)) { - inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0])); + receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); } } } else if (frame instanceof DataFrame) { - inputQ.put(frame); + receiveDataFrame((DataFrame)frame); } else { otherFrame(frame); } @@ -324,10 +344,6 @@ } protected void handleResponse() throws IOException { - synchronized(this) { - responseHeadersReceived = true; - } - HttpConnection c = connection.connection; // TODO: improve responseCode = (int)responseHeaders .firstValueAsLong(":status") .orElseThrow(() -> new IOException("no statuscode in response")); @@ -336,9 +352,11 @@ request, exchange, responseHeaders, responseCode, HttpClient.Version.HTTP_2); - this.responseContentLen = responseHeaders - .firstValueAsLong("content-length") - .orElse(-1L); + /* TODO: review if needs to be removed + the value is not used, but in case `content-length` doesn't parse as + long, there will be NumberFormatException. If left as is, make sure + code up the stack handles NFE correctly. */ + responseHeaders.firstValueAsLong("content-length"); if (Log.headers()) { StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); @@ -349,50 +367,32 @@ completeResponse(response); } - void incoming_reset(ResetFrame frame) throws IOException { + void incoming_reset(ResetFrame frame) { Log.logTrace("Received RST_STREAM on stream {0}", streamid); if (endStreamReceived()) { Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); } else if (closed) { Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); } else { - boolean pushedToQueue = false; - synchronized(this) { - // if the response headers are not yet - // received, or the inputQueue is closed, handle reset directly. - // Otherwise, put it in the input queue in order to read all - // pending data frames first. Indeed, a server may send - // RST_STREAM after sending END_STREAM, in which case we should - // ignore it. However, we won't know if we have received END_STREAM - // or not until all pending data frames are read. - // Because the inputQ will not be read until the response - // headers are received, and because response headers won't be - // sent if the server sent RST_STREAM, then we must handle - // reset here directly unless responseHeadersReceived is true. - pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame); - } - if (!pushedToQueue) { - // RST_STREAM was not pushed to the queue: handle it. - try { - handleReset(frame); - } catch (IOException io) { - completeResponseExceptionally(io); - } - } else { - // RST_STREAM was pushed to the queue. It will be handled by - // asyncReceive after all pending data frames have been - // processed. - Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); - } + // put it in the input queue in order to read all + // pending data frames first. Indeed, a server may send + // RST_STREAM after sending END_STREAM, in which case we should + // ignore it. However, we won't know if we have received END_STREAM + // or not until all pending data frames are read. + receiveResetFrame(frame); + // RST_STREAM was pushed to the queue. It will be handled by + // asyncReceive after all pending data frames have been + // processed. + Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); } } - void handleReset(ResetFrame frame) throws IOException { + void handleReset(ResetFrame frame) { Log.logTrace("Handling RST_STREAM on stream {0}", streamid); if (!closed) { close(); int error = frame.getErrorCode(); - throw new IOException(ErrorFrame.stringForCode(error)); + completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); } else { Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); } @@ -431,20 +431,21 @@ if (pushGroup == null || pushGroup.noMorePushes()) { cancelImpl(new IllegalStateException("unexpected push promise" + " on stream " + streamid)); + return; } - HttpResponse.MultiProcessor proc = pushGroup.processor(); + HttpResponse.MultiSubscriber proc = pushGroup.subscriber(); CompletableFuture> cf = pushStream.responseCF(); - Optional> bpOpt = proc.onRequest( - pushReq); + Optional> bpOpt = + pushGroup.handlerForPushRequest(pushReq); if (!bpOpt.isPresent()) { IOException ex = new IOException("Stream " + streamid + " cancelled by user"); if (Log.trace()) { - Log.logTrace("No body processor for {0}: {1}", pushReq, + Log.logTrace("No body subscriber for {0}: {1}", pushReq, ex.getMessage()); } pushStream.cancelImpl(ex); @@ -458,6 +459,7 @@ // setup housekeeping for when the push is received // TODO: deal with ignoring of CF anti-pattern cf.whenComplete((HttpResponse resp, Throwable t) -> { + t = Utils.getCompletionCause(t); if (Log.trace()) { Log.logTrace("Push completed on stream {0} for {1}{2}", pushStream.streamid, resp, @@ -516,34 +518,6 @@ return requestPseudoHeaders; } - @Override - Response getResponse() throws IOException { - try { - if (request.duration() != null) { - Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)", - streamid, - request.duration().toMillis()); - return getResponseAsync(null).get( - request.duration().toMillis(), TimeUnit.MILLISECONDS); - } else { - Log.logTrace("Waiting for response (streamid={0})", streamid); - return getResponseAsync(null).join(); - } - } catch (TimeoutException e) { - Log.logTrace("Response timeout (streamid={0})", streamid); - throw new HttpTimeoutException("Response timed out"); - } catch (InterruptedException | ExecutionException | CompletionException e) { - Throwable t = e.getCause(); - Log.logTrace("Response failed (streamid={0}): {1}", streamid, t); - if (t instanceof IOException) { - throw (IOException)t; - } - throw new IOException(e); - } finally { - Log.logTrace("Got response or failed (streamid={0})", streamid); - } - } - /** Sets endStreamReceived. Should be called only once. */ void setEndStreamReceived() { assert remotelyClosed == false: "Unexpected endStream already set"; @@ -558,100 +532,247 @@ } @Override - void sendHeadersOnly() throws IOException, InterruptedException { + CompletableFuture> sendHeadersAsync() { + debug.log(Level.DEBUG, "sendHeadersOnly()"); if (Log.requests() && request != null) { Log.logRequest(request.toString()); } - requestContentLen = requestProcessor.contentLength(); + if (requestPublisher != null) { + requestContentLen = requestPublisher.contentLength(); + } else { + requestContentLen = 0; + } OutgoingHeaders> f = headerFrame(requestContentLen); connection.sendFrame(f); + CompletableFuture> cf = new MinimalFuture<>(); + cf.complete(this); // #### good enough for now + return cf; + } + + @Override + void released() { + if (streamid > 0) { + debug.log(Level.DEBUG, "Released stream %d", streamid); + // remove this stream from the Http2Connection map. + connection.closeStream(streamid); + } else { + debug.log(Level.DEBUG, "Can't release stream %d", streamid); + } + } + + @Override + void completed() { + // There should be nothing to do here: the stream should have + // been already closed (or will be closed shortly after). } void registerStream(int id) { this.streamid = id; connection.putStream(this, streamid); + debug.log(Level.DEBUG, "Registered stream %d", id); + } + + void signalWindowUpdate() { + RequestSubscriber subscriber = requestSubscriber; + assert subscriber != null; + debug.log(Level.DEBUG, "Signalling window update"); + subscriber.sendScheduler.runOrSchedule(); } + static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); class RequestSubscriber implements Flow.Subscriber { // can be < 0 if the actual length is not known. + private final long contentLength; private volatile long remainingContentLength; private volatile Subscription subscription; + // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. + // 1) The data that was published by the request body Publisher, and + // 2) the COMPLETED sentinel, since onComplete can be invoked without demand. + final ConcurrentLinkedDeque outgoing = new ConcurrentLinkedDeque<>(); + + private final AtomicReference errorRef = new AtomicReference<>(); + // A scheduler used to honor window updates. Writing must be paused + // when the window is exhausted, and resumed when the window acquires + // some space. The sendScheduler makes it possible to implement this + // behaviour in an asynchronous non-blocking way. + // See RequestSubscriber::trySend below. + final SequentialScheduler sendScheduler; + RequestSubscriber(long contentLen) { + this.contentLength = contentLen; this.remainingContentLength = contentLen; + this.sendScheduler = + SequentialScheduler.synchronizedScheduler(this::trySend); } @Override public void onSubscribe(Flow.Subscription subscription) { if (this.subscription != null) { - throw new IllegalStateException(); + throw new IllegalStateException("already subscribed"); } this.subscription = subscription; + debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1"); subscription.request(1); } @Override public void onNext(ByteBuffer item) { - if (requestBodyCF.isDone()) { - throw new IllegalStateException(); - } + debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining()); + int size = outgoing.size(); + assert size == 0 : "non-zero size: " + size; + onNextImpl(item); + } - try { - while (item.hasRemaining()) { - assert !endStreamSent : "internal error, send data after END_STREAM flag"; - DataFrame df = getDataFrame(item); - if (remainingContentLength > 0) { - remainingContentLength -= df.getDataLength(); - assert remainingContentLength >= 0; - if (remainingContentLength == 0) { - df.setFlag(DataFrame.END_STREAM); - endStreamSent = true; - } - } - connection.sendDataFrame(df); - } - subscription.request(1); - } catch (InterruptedException ex) { + private void onNextImpl(ByteBuffer item) { + // Got some more request body bytes to send. + if (requestBodyCF.isDone()) { + // stream already cancelled, probably in timeout + sendScheduler.stop(); subscription.cancel(); - requestBodyCF.completeExceptionally(ex); + return; } + outgoing.add(item); + sendScheduler.runOrSchedule(); } @Override public void onError(Throwable throwable) { - if (requestBodyCF.isDone()) { - return; + debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable); + // ensure that errors are handled within the flow. + if (errorRef.compareAndSet(null, throwable)) { + sendScheduler.runOrSchedule(); } - subscription.cancel(); - requestBodyCF.completeExceptionally(throwable); } @Override public void onComplete() { - assert endStreamSent || remainingContentLength < 0; + debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); + int size = outgoing.size(); + assert size == 0 || size == 1 : "non-zero or one size: " + size; + // last byte of request body has been obtained. + // ensure that everything is completed within the flow. + onNextImpl(COMPLETED); + } + + // Attempts to send the data, if any. + // Handles errors and completion state. + // Pause writing if the send window is exhausted, resume it if the + // send window has some bytes that can be acquired. + void trySend() { try { - if (!endStreamSent) { - endStreamSent = true; - connection.sendDataFrame(getEmptyEndStreamDataFrame()); + // handle errors raised by onError; + Throwable t = errorRef.get(); + if (t != null) { + sendScheduler.stop(); + if (requestBodyCF.isDone()) return; + subscription.cancel(); + requestBodyCF.completeExceptionally(t); + return; } - requestBodyCF.complete(null); - } catch (InterruptedException ex) { + + do { + // handle COMPLETED; + ByteBuffer item = outgoing.peekFirst(); + if (item == null) return; + else if (item == COMPLETED) { + sendScheduler.stop(); + complete(); + return; + } + + // handle bytes to send downstream + while (item.hasRemaining()) { + debug.log(Level.DEBUG, "trySend: %d", item.remaining()); + assert !endStreamSent : "internal error, send data after END_STREAM flag"; + DataFrame df = getDataFrame(item); + if (df == null) { + debug.log(Level.DEBUG, "trySend: can't send yet: %d", + item.remaining()); + return; // the send window is exhausted: come back later + } + + if (contentLength > 0) { + remainingContentLength -= df.getDataLength(); + if (remainingContentLength < 0) { + String msg = connection().getConnectionFlow() + + " stream=" + streamid + " " + + "[" + Thread.currentThread().getName() + "] " + + "Too many bytes in request body. Expected: " + + contentLength + ", got: " + + (contentLength - remainingContentLength); + connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); + throw new IOException(msg); + } else if (remainingContentLength == 0) { + df.setFlag(DataFrame.END_STREAM); + endStreamSent = true; + } + } + debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); + connection.sendDataFrame(df); + } + assert !item.hasRemaining(); + ByteBuffer b = outgoing.removeFirst(); + assert b == item; + } while (outgoing.peekFirst() != null); + + debug.log(Level.DEBUG, "trySend: request 1"); + subscription.request(1); + } catch (Throwable ex) { + debug.log(Level.DEBUG, "trySend: ", ex); + sendScheduler.stop(); + subscription.cancel(); requestBodyCF.completeExceptionally(ex); } } + + private void complete() throws IOException { + long remaining = remainingContentLength; + long written = contentLength - remaining; + if (remaining > 0) { + connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); + // let trySend() handle the exception + throw new IOException(connection().getConnectionFlow() + + " stream=" + streamid + " " + + "[" + Thread.currentThread().getName() +"] " + + "Too few bytes returned by the publisher (" + + written + "/" + + contentLength + ")"); + } + if (!endStreamSent) { + endStreamSent = true; + connection.sendDataFrame(getEmptyEndStreamDataFrame()); + } + requestBodyCF.complete(null); + } } - DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException { + /** + * Send a RESET frame to tell server to stop sending data on this stream + */ + @Override + public CompletableFuture ignoreBody() { + try { + connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); + return MinimalFuture.completedFuture(null); + } catch (Throwable e) { + Log.logTrace("Error resetting stream {0}", e.toString()); + return MinimalFuture.failedFuture(e); + } + } + + DataFrame getDataFrame(ByteBuffer buffer) { int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); // blocks waiting for stream send window, if exhausted - int actualAmount = windowController.tryAcquire(requestAmount, streamid); + int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); + if (actualAmount <= 0) return null; ByteBuffer outBuf = Utils.slice(buffer, actualAmount); - DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); + DataFrame df = new DataFrame(streamid, 0 , outBuf); return df; } - private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { - return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); + private DataFrame getEmptyEndStreamDataFrame() { + return new DataFrame(streamid, DataFrame.END_STREAM, List.of()); } /** @@ -666,7 +787,7 @@ @Override CompletableFuture getResponseAsync(Executor executor) { - CompletableFuture cf = null; + CompletableFuture cf; // The code below deals with race condition that can be caused when // completeResponse() is being called before getResponseAsync() synchronized (response_cfs) { @@ -693,7 +814,7 @@ PushGroup pg = exchange.getPushGroup(); if (pg != null) { // if an error occurs make sure it is recorded in the PushGroup - cf = cf.whenComplete((t,e) -> pg.pushError(e)); + cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e))); } return cf; } @@ -732,10 +853,6 @@ } } - final synchronized boolean isResponseReceived() { - return responseReceived; - } - synchronized void responseReceived() { responseReceived = true; if (requestSent) { @@ -763,9 +880,15 @@ } CompletableFuture sendBodyImpl() { - RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); - requestProcessor.subscribe(subscriber); - requestBodyCF.whenComplete((v,t) -> requestSent()); + requestBodyCF.whenComplete((v, t) -> requestSent()); + if (requestPublisher != null) { + final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); + requestPublisher.subscribe(requestSubscriber = subscriber); + } else { + // there is no request body, therefore the request is complete, + // END_STREAM has already sent with outgoing headers + requestBodyCF.complete(null); + } return requestBodyCF; } @@ -781,21 +904,30 @@ // This method sends a RST_STREAM frame void cancelImpl(Throwable e) { + debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e); if (Log.trace()) { Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); } boolean closing; if (closing = !closed) { // assigning closing to !closed synchronized (this) { + failed = e; if (closing = !closed) { // assigning closing to !closed closed=true; } } } if (closing) { // true if the stream has not been closed yet - inputQ.close(); + if (responseSubscriber != null) + sched.runOrSchedule(); } completeResponseExceptionally(e); + if (!requestBodyCF.isDone()) { + requestBodyCF.completeExceptionally(e); // we may be sending the body.. + } + if (responseBodyCF != null) { + responseBodyCF.completeExceptionally(e); + } try { // will send a RST_STREAM frame if (streamid != 0) { @@ -814,14 +946,12 @@ closed = true; } Log.logTrace("Closing stream {0}", streamid); - inputQ.close(); connection.closeStream(streamid); Log.logTrace("Stream {0} closed", streamid); } static class PushedStream extends Stream { final PushGroup pushGroup; - private final Stream parent; // used by server push streams // push streams need the response CF allocated up front as it is // given directly to user via the multi handler callback function. final CompletableFuture pushCF; @@ -829,16 +959,15 @@ final HttpRequestImpl pushReq; HttpResponse.BodyHandler pushHandler; - PushedStream(PushGroup pushGroup, HttpClientImpl client, - Http2Connection connection, Stream parent, - Exchange pushReq) { + PushedStream(PushGroup pushGroup, + Http2Connection connection, + Exchange pushReq) { // ## no request body possible, null window controller - super(client, connection, pushReq, null); + super(connection, pushReq, null); this.pushGroup = pushGroup; this.pushReq = pushReq.request(); this.pushCF = new MinimalFuture<>(); this.responseCF = new MinimalFuture<>(); - this.parent = parent; } CompletableFuture> responseCF() { @@ -860,18 +989,21 @@ @Override CompletableFuture> sendBodyAsync() { return super.sendBodyAsync() - .whenComplete((ExchangeImpl v, Throwable t) -> pushGroup.pushError(t)); + .whenComplete((ExchangeImpl v, Throwable t) + -> pushGroup.pushError(Utils.getCompletionCause(t))); } @Override CompletableFuture> sendHeadersAsync() { return super.sendHeadersAsync() - .whenComplete((ExchangeImpl ex, Throwable t) -> pushGroup.pushError(t)); + .whenComplete((ExchangeImpl ex, Throwable t) + -> pushGroup.pushError(Utils.getCompletionCause(t))); } @Override CompletableFuture getResponseAsync(Executor executor) { - CompletableFuture cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); + CompletableFuture cf = pushCF.whenComplete( + (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t))); if(executor!=null && !cf.isDone()) { cf = cf.thenApplyAsync( r -> r, executor); } @@ -890,17 +1022,18 @@ @Override void completeResponse(Response r) { - HttpResponseImpl.logResponse(r); + Log.logResponse(r::toString); pushCF.complete(r); // not strictly required for push API - // start reading the body using the obtained BodyProcessor + // start reading the body using the obtained BodySubscriber CompletableFuture start = new MinimalFuture<>(); start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) .whenComplete((T body, Throwable t) -> { if (t != null) { responseCF.completeExceptionally(t); } else { - HttpResponseImpl response = new HttpResponseImpl<>(r.request, r, body, getExchange()); - responseCF.complete(response); + HttpResponseImpl resp = + new HttpResponseImpl<>(r.request, r, null, body, getExchange()); + responseCF.complete(resp); } }); start.completeAsync(() -> null, getExchange().executor()); @@ -911,15 +1044,14 @@ pushCF.completeExceptionally(t); } - @Override - synchronized void responseReceived() { - super.responseReceived(); - } +// @Override +// synchronized void responseReceived() { +// super.responseReceived(); +// } // create and return the PushResponseImpl @Override protected void handleResponse() { - HttpConnection c = connection.connection; // TODO: improve responseCode = (int)responseHeaders .firstValueAsLong(":status") .orElse(-1); @@ -932,9 +1064,11 @@ pushReq, exchange, responseHeaders, responseCode, HttpClient.Version.HTTP_2); - this.responseContentLen = responseHeaders - .firstValueAsLong("content-length") - .orElse(-1L); + /* TODO: review if needs to be removed + the value is not used, but in case `content-length` doesn't parse + as long, there will be NumberFormatException. If left as is, make + sure code up the stack handles NFE correctly. */ + responseHeaders.firstValueAsLong("content-length"); if (Log.headers()) { StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); @@ -960,4 +1094,23 @@ } } + /** + * Returns true if this exchange was canceled. + * @return true if this exchange was canceled. + */ + synchronized boolean isCanceled() { + return failed != null; + } + + /** + * Returns the cause for which this exchange was canceled, if available. + * @return the cause for which this exchange was canceled, if available. + */ + synchronized Throwable getCancelCause() { + return failed; + } + + final String dbgString() { + return connection.dbgString() + "/Stream("+streamid+")"; + } }