< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
Print this page
@@ -24,25 +24,25 @@
*/
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-
+import java.util.concurrent.atomic.AtomicReference;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
/**
@@ -52,14 +52,10 @@
*
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
*
* sendRequest() -- sendHeadersOnly() + sendBody()
*
- * sendBody() -- in calling thread: obeys all flow control (so may block)
- * obtains data from request body processor and places on connection
- * outbound Q.
- *
* sendBodyAsync() -- calls sendBody() in an executor thread.
*
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
*
* sendRequestAsync() -- calls sendRequest() in an executor thread
@@ -75,14 +71,10 @@
* and returns it. Completion is achieved through the
* incoming() upcall from connection reader thread.
*
* getResponse() -- calls getResponseAsync() and waits for CF to complete
*
- * responseBody() -- in calling thread: blocks for incoming DATA frames on
- * stream inputQ. Obeys remote and local flow control so may block.
- * Calls user response body processor with data buffers.
- *
* responseBodyAsync() -- calls responseBody() in an executor thread.
*
* incoming() -- entry point called from connection reader thread. Frames are
* either handled immediately without blocking or for data frames
* placed on the stream's inputQ which is consumed by the stream's
@@ -96,44 +88,47 @@
* one response. The CF is created when the object created and when the response
* HEADERS frame is received the object is completed.
*/
class Stream<T> extends ExchangeImpl<T> {
- final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
+ final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+
+ final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
+ final SequentialScheduler sched =
+ SequentialScheduler.synchronizedScheduler(this::schedule);
+ final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
* the stream's first frame is sent.
*/
protected volatile int streamid;
- long responseContentLen = -1;
- long responseBytesProcessed = 0;
long requestContentLen;
final Http2Connection connection;
- HttpClientImpl client;
final HttpRequestImpl request;
final DecodingCallback rspHeadersConsumer;
HttpHeadersImpl responseHeaders;
- final HttpHeadersImpl requestHeaders;
final HttpHeadersImpl requestPseudoHeaders;
- HttpResponse.BodyProcessor<T> responseProcessor;
- final HttpRequest.BodyProcessor requestProcessor;
+ volatile HttpResponse.BodySubscriber<T> responseSubscriber;
+ final HttpRequest.BodyPublisher requestPublisher;
+ volatile RequestSubscriber requestSubscriber;
volatile int responseCode;
volatile Response response;
- volatile CompletableFuture<Response> responseCF;
- final AbstractPushPublisher<ByteBuffer> publisher;
+ volatile Throwable failed; // The exception with which this stream was canceled.
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
+ volatile CompletableFuture<T> responseBodyCF;
/** True if END_STREAM has been seen in a frame received on this stream. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
private volatile boolean endStreamSent;
// state flags
- boolean requestSent, responseReceived, responseHeadersReceived;
+ private boolean requestSent, responseReceived;
/**
* A reference to this Stream's connection Send Window controller. The
* stream MUST acquire the appropriate amount of Send Window before
* sending any data. Will be null for PushStreams, as they cannot send data.
@@ -144,159 +139,184 @@
@Override
HttpConnection connection() {
return connection.connection;
}
+ /**
+ * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
+ * of after user subscription window has re-opened, from SubscriptionBase.request()
+ */
+ private void schedule() {
+ if (responseSubscriber == null)
+ // can't process anything yet
+ return;
+
+ while (!inputQ.isEmpty()) {
+ Http2Frame frame = inputQ.peek();
+ if (frame instanceof ResetFrame) {
+ inputQ.remove();
+ handleReset((ResetFrame)frame);
+ return;
+ }
+ DataFrame df = (DataFrame)frame;
+ boolean finished = df.getFlag(DataFrame.END_STREAM);
+
+ List<ByteBuffer> buffers = df.getData();
+ List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
+ int size = Utils.remaining(dsts, Integer.MAX_VALUE);
+ if (size == 0 && finished) {
+ inputQ.remove();
+ Log.logTrace("responseSubscriber.onComplete");
+ debug.log(Level.DEBUG, "incoming: onComplete");
+ sched.stop();
+ responseSubscriber.onComplete();
+ setEndStreamReceived();
+ return;
+ } else if (userSubscription.tryDecrement()) {
+ inputQ.remove();
+ Log.logTrace("responseSubscriber.onNext {0}", size);
+ debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
+ responseSubscriber.onNext(dsts);
+ if (consumed(df)) {
+ Log.logTrace("responseSubscriber.onComplete");
+ debug.log(Level.DEBUG, "incoming: onComplete");
+ sched.stop();
+ responseSubscriber.onComplete();
+ setEndStreamReceived();
+ return;
+ }
+ } else {
+ return;
+ }
+ }
+ Throwable t = failed;
+ if (t != null) {
+ sched.stop();
+ responseSubscriber.onError(t);
+ close();
+ }
+ }
+
+ // Callback invoked after the Response BodySubscriber has consumed the
+ // buffers contained in a DataFrame.
+ // Returns true if END_STREAM is reached, false otherwise.
+ private boolean consumed(DataFrame df) {
+ // RFC 7540 6.1:
+ // The entire DATA frame payload is included in flow control,
+ // including the Pad Length and Padding fields if present
+ int len = df.payloadLength();
+ connection.windowUpdater.update(len);
+
+ if (!df.getFlag(DataFrame.END_STREAM)) {
+ // Don't send window update on a stream which is
+ // closed or half closed.
+ windowUpdater.update(len);
+ return false; // more data coming
+ }
+ return true; // end of stream
+ }
+
@Override
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
Log.logTrace("Reading body on stream {0}", streamid);
- responseProcessor = handler.apply(responseCode, responseHeaders);
- publisher.subscribe(responseProcessor);
- CompletableFuture<T> cf = receiveData(executor);
+ BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+ CompletableFuture<T> cf = receiveData(bodySubscriber);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
cf = cf.whenComplete((t,e) -> pg.pushError(e));
}
return cf;
}
@Override
- T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
- throws IOException
- {
- CompletableFuture<T> cf = readBodyAsync(handler,
- returnConnectionToPool,
- null);
- try {
- return cf.join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
- }
- }
-
- @Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("streamid: ")
.append(streamid);
return sb.toString();
}
- private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
- if (frame instanceof ResetFrame) {
- handleReset((ResetFrame) frame);
- return true;
- } else if (!(frame instanceof DataFrame)) {
- assert false;
- return true;
- }
- DataFrame df = (DataFrame) frame;
- // RFC 7540 6.1:
- // The entire DATA frame payload is included in flow control,
- // including the Pad Length and Padding fields if present
- int len = df.payloadLength();
- ByteBufferReference[] buffers = df.getData();
- for (ByteBufferReference b : buffers) {
- ByteBuffer buf = b.get();
- if (buf.hasRemaining()) {
- publisher.acceptData(Optional.of(buf));
- }
+ private void receiveDataFrame(DataFrame df) {
+ inputQ.add(df);
+ sched.runOrSchedule();
}
- connection.windowUpdater.update(len);
- if (df.getFlag(DataFrame.END_STREAM)) {
- setEndStreamReceived();
- publisher.acceptData(Optional.empty());
- return false;
- }
- // Don't send window update on a stream which is
- // closed or half closed.
- windowUpdater.update(len);
- return true;
+
+ /** Handles a RESET frame. RESET is always handled inline in the queue. */
+ private void receiveResetFrame(ResetFrame frame) {
+ inputQ.add(frame);
+ sched.runOrSchedule();
}
- // pushes entire response body into response processor
+ // pushes entire response body into response subscriber
// blocking when required by local or remote flow control
- CompletableFuture<T> receiveData(Executor executor) {
- CompletableFuture<T> cf = responseProcessor
- .getBody()
- .toCompletableFuture();
- Consumer<Throwable> onError = e -> {
- Log.logTrace("receiveData: {0}", e.toString());
- e.printStackTrace();
- cf.completeExceptionally(e);
- publisher.acceptError(e);
- };
- if (executor == null) {
- inputQ.blockingReceive(this::receiveDataFrame, onError);
+ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
+ responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
+
+ if (isCanceled()) {
+ Throwable t = getCancelCause();
+ responseBodyCF.completeExceptionally(t);
} else {
- inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+ bodySubscriber.onSubscribe(userSubscription);
}
- return cf;
+ // Set the responseSubscriber field now that onSubscribe has been called.
+ // This effectively allows the scheduler to start invoking the callbacks.
+ responseSubscriber = bodySubscriber;
+ sched.runOrSchedule(); // in case data waiting already to be processed
+ return responseBodyCF;
}
@Override
- void sendBody() throws IOException {
- try {
- sendBodyImpl().join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
- }
- }
-
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return sendBodyImpl().thenApply( v -> this);
}
@SuppressWarnings("unchecked")
- Stream(HttpClientImpl client,
- Http2Connection connection,
+ Stream(Http2Connection connection,
Exchange<T> e,
WindowController windowController)
{
super(e);
- this.client = client;
this.connection = connection;
this.windowController = windowController;
this.request = e.request();
- this.requestProcessor = request.requestProcessor;
+ this.requestPublisher = request.requestPublisher; // may be null
responseHeaders = new HttpHeadersImpl();
- requestHeaders = new HttpHeadersImpl();
rspHeadersConsumer = (name, value) -> {
responseHeaders.addHeader(name.toString(), value.toString());
if (Log.headers() && Log.trace()) {
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
streamid, name, value);
}
};
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
- this.publisher = new BlockingPushPublisher<>();
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
/**
* Entry point from Http2Connection reader thread.
*
* Data frames will be removed by response body thread.
*/
void incoming(Http2Frame frame) throws IOException {
+ debug.log(Level.DEBUG, "incoming: %s", frame);
if ((frame instanceof HeaderFrame)) {
HeaderFrame hframe = (HeaderFrame)frame;
if (hframe.endHeaders()) {
Log.logTrace("handling response (streamid={0})", streamid);
handleResponse();
if (hframe.getFlag(HeaderFrame.END_STREAM)) {
- inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
+ receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
}
}
} else if (frame instanceof DataFrame) {
- inputQ.put(frame);
+ receiveDataFrame((DataFrame)frame);
} else {
otherFrame(frame);
}
}
@@ -322,79 +342,59 @@
DecodingCallback rspHeadersConsumer() {
return rspHeadersConsumer;
}
protected void handleResponse() throws IOException {
- synchronized(this) {
- responseHeadersReceived = true;
- }
- HttpConnection c = connection.connection; // TODO: improve
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElseThrow(() -> new IOException("no statuscode in response"));
response = new Response(
request, exchange, responseHeaders,
responseCode, HttpClient.Version.HTTP_2);
- this.responseContentLen = responseHeaders
- .firstValueAsLong("content-length")
- .orElse(-1L);
+ /* TODO: review if needs to be removed
+ the value is not used, but in case `content-length` doesn't parse as
+ long, there will be NumberFormatException. If left as is, make sure
+ code up the stack handles NFE correctly. */
+ responseHeaders.firstValueAsLong("content-length");
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
Log.dumpHeaders(sb, " ", responseHeaders);
Log.logHeaders(sb.toString());
}
completeResponse(response);
}
- void incoming_reset(ResetFrame frame) throws IOException {
+ void incoming_reset(ResetFrame frame) {
Log.logTrace("Received RST_STREAM on stream {0}", streamid);
if (endStreamReceived()) {
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
} else if (closed) {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
} else {
- boolean pushedToQueue = false;
- synchronized(this) {
- // if the response headers are not yet
- // received, or the inputQueue is closed, handle reset directly.
- // Otherwise, put it in the input queue in order to read all
+ // put it in the input queue in order to read all
// pending data frames first. Indeed, a server may send
// RST_STREAM after sending END_STREAM, in which case we should
// ignore it. However, we won't know if we have received END_STREAM
// or not until all pending data frames are read.
- // Because the inputQ will not be read until the response
- // headers are received, and because response headers won't be
- // sent if the server sent RST_STREAM, then we must handle
- // reset here directly unless responseHeadersReceived is true.
- pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
- }
- if (!pushedToQueue) {
- // RST_STREAM was not pushed to the queue: handle it.
- try {
- handleReset(frame);
- } catch (IOException io) {
- completeResponseExceptionally(io);
- }
- } else {
+ receiveResetFrame(frame);
// RST_STREAM was pushed to the queue. It will be handled by
// asyncReceive after all pending data frames have been
// processed.
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
}
}
- }
- void handleReset(ResetFrame frame) throws IOException {
+ void handleReset(ResetFrame frame) {
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
if (!closed) {
close();
int error = frame.getErrorCode();
- throw new IOException(ErrorFrame.stringForCode(error));
+ completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
} else {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
}
}
@@ -429,24 +429,25 @@
}
PushGroup<?,T> pushGroup = exchange.getPushGroup();
if (pushGroup == null || pushGroup.noMorePushes()) {
cancelImpl(new IllegalStateException("unexpected push promise"
+ " on stream " + streamid));
+ return;
}
- HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
+ HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
- Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
- pushReq);
+ Optional<HttpResponse.BodyHandler<T>> bpOpt =
+ pushGroup.handlerForPushRequest(pushReq);
if (!bpOpt.isPresent()) {
IOException ex = new IOException("Stream "
+ streamid + " cancelled by user");
if (Log.trace()) {
- Log.logTrace("No body processor for {0}: {1}", pushReq,
+ Log.logTrace("No body subscriber for {0}: {1}", pushReq,
ex.getMessage());
}
pushStream.cancelImpl(ex);
cf.completeExceptionally(ex);
return;
@@ -456,10 +457,11 @@
pushStream.requestSent();
pushStream.setPushHandler(bpOpt.get());
// setup housekeeping for when the push is received
// TODO: deal with ignoring of CF anti-pattern
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
+ t = Utils.getCompletionCause(t);
if (Log.trace()) {
Log.logTrace("Push completed on stream {0} for {1}{2}",
pushStream.streamid, resp,
((t==null) ? "": " with exception " + t));
}
@@ -514,38 +516,10 @@
HttpHeadersImpl getRequestPseudoHeaders() {
return requestPseudoHeaders;
}
- @Override
- Response getResponse() throws IOException {
- try {
- if (request.duration() != null) {
- Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
- streamid,
- request.duration().toMillis());
- return getResponseAsync(null).get(
- request.duration().toMillis(), TimeUnit.MILLISECONDS);
- } else {
- Log.logTrace("Waiting for response (streamid={0})", streamid);
- return getResponseAsync(null).join();
- }
- } catch (TimeoutException e) {
- Log.logTrace("Response timeout (streamid={0})", streamid);
- throw new HttpTimeoutException("Response timed out");
- } catch (InterruptedException | ExecutionException | CompletionException e) {
- Throwable t = e.getCause();
- Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
- if (t instanceof IOException) {
- throw (IOException)t;
- }
- throw new IOException(e);
- } finally {
- Log.logTrace("Got response or failed (streamid={0})", streamid);
- }
- }
-
/** Sets endStreamReceived. Should be called only once. */
void setEndStreamReceived() {
assert remotelyClosed == false: "Unexpected endStream already set";
remotelyClosed = true;
responseReceived();
@@ -556,104 +530,251 @@
private boolean endStreamReceived() {
return remotelyClosed;
}
@Override
- void sendHeadersOnly() throws IOException, InterruptedException {
+ CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+ debug.log(Level.DEBUG, "sendHeadersOnly()");
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
- requestContentLen = requestProcessor.contentLength();
+ if (requestPublisher != null) {
+ requestContentLen = requestPublisher.contentLength();
+ } else {
+ requestContentLen = 0;
+ }
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
connection.sendFrame(f);
+ CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
+ cf.complete(this); // #### good enough for now
+ return cf;
+ }
+
+ @Override
+ void released() {
+ if (streamid > 0) {
+ debug.log(Level.DEBUG, "Released stream %d", streamid);
+ // remove this stream from the Http2Connection map.
+ connection.closeStream(streamid);
+ } else {
+ debug.log(Level.DEBUG, "Can't release stream %d", streamid);
+ }
+ }
+
+ @Override
+ void completed() {
+ // There should be nothing to do here: the stream should have
+ // been already closed (or will be closed shortly after).
}
void registerStream(int id) {
this.streamid = id;
connection.putStream(this, streamid);
+ debug.log(Level.DEBUG, "Registered stream %d", id);
+ }
+
+ void signalWindowUpdate() {
+ RequestSubscriber subscriber = requestSubscriber;
+ assert subscriber != null;
+ debug.log(Level.DEBUG, "Signalling window update");
+ subscriber.sendScheduler.runOrSchedule();
}
+ static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
// can be < 0 if the actual length is not known.
+ private final long contentLength;
private volatile long remainingContentLength;
private volatile Subscription subscription;
+ // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
+ // 1) The data that was published by the request body Publisher, and
+ // 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
+ final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
+
+ private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ // A scheduler used to honor window updates. Writing must be paused
+ // when the window is exhausted, and resumed when the window acquires
+ // some space. The sendScheduler makes it possible to implement this
+ // behaviour in an asynchronous non-blocking way.
+ // See RequestSubscriber::trySend below.
+ final SequentialScheduler sendScheduler;
+
RequestSubscriber(long contentLen) {
+ this.contentLength = contentLen;
this.remainingContentLength = contentLen;
+ this.sendScheduler =
+ SequentialScheduler.synchronizedScheduler(this::trySend);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
- throw new IllegalStateException();
+ throw new IllegalStateException("already subscribed");
}
this.subscription = subscription;
+ debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
subscription.request(1);
}
@Override
public void onNext(ByteBuffer item) {
+ debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
+ int size = outgoing.size();
+ assert size == 0 : "non-zero size: " + size;
+ onNextImpl(item);
+ }
+
+ private void onNextImpl(ByteBuffer item) {
+ // Got some more request body bytes to send.
if (requestBodyCF.isDone()) {
- throw new IllegalStateException();
+ // stream already cancelled, probably in timeout
+ sendScheduler.stop();
+ subscription.cancel();
+ return;
+ }
+ outgoing.add(item);
+ sendScheduler.runOrSchedule();
}
+ @Override
+ public void onError(Throwable throwable) {
+ debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
+ // ensure that errors are handled within the flow.
+ if (errorRef.compareAndSet(null, throwable)) {
+ sendScheduler.runOrSchedule();
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
+ int size = outgoing.size();
+ assert size == 0 || size == 1 : "non-zero or one size: " + size;
+ // last byte of request body has been obtained.
+ // ensure that everything is completed within the flow.
+ onNextImpl(COMPLETED);
+ }
+
+ // Attempts to send the data, if any.
+ // Handles errors and completion state.
+ // Pause writing if the send window is exhausted, resume it if the
+ // send window has some bytes that can be acquired.
+ void trySend() {
try {
+ // handle errors raised by onError;
+ Throwable t = errorRef.get();
+ if (t != null) {
+ sendScheduler.stop();
+ if (requestBodyCF.isDone()) return;
+ subscription.cancel();
+ requestBodyCF.completeExceptionally(t);
+ return;
+ }
+
+ do {
+ // handle COMPLETED;
+ ByteBuffer item = outgoing.peekFirst();
+ if (item == null) return;
+ else if (item == COMPLETED) {
+ sendScheduler.stop();
+ complete();
+ return;
+ }
+
+ // handle bytes to send downstream
while (item.hasRemaining()) {
+ debug.log(Level.DEBUG, "trySend: %d", item.remaining());
assert !endStreamSent : "internal error, send data after END_STREAM flag";
DataFrame df = getDataFrame(item);
- if (remainingContentLength > 0) {
+ if (df == null) {
+ debug.log(Level.DEBUG, "trySend: can't send yet: %d",
+ item.remaining());
+ return; // the send window is exhausted: come back later
+ }
+
+ if (contentLength > 0) {
remainingContentLength -= df.getDataLength();
- assert remainingContentLength >= 0;
- if (remainingContentLength == 0) {
+ if (remainingContentLength < 0) {
+ String msg = connection().getConnectionFlow()
+ + " stream=" + streamid + " "
+ + "[" + Thread.currentThread().getName() + "] "
+ + "Too many bytes in request body. Expected: "
+ + contentLength + ", got: "
+ + (contentLength - remainingContentLength);
+ connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ throw new IOException(msg);
+ } else if (remainingContentLength == 0) {
df.setFlag(DataFrame.END_STREAM);
endStreamSent = true;
}
}
+ debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
connection.sendDataFrame(df);
}
+ assert !item.hasRemaining();
+ ByteBuffer b = outgoing.removeFirst();
+ assert b == item;
+ } while (outgoing.peekFirst() != null);
+
+ debug.log(Level.DEBUG, "trySend: request 1");
subscription.request(1);
- } catch (InterruptedException ex) {
+ } catch (Throwable ex) {
+ debug.log(Level.DEBUG, "trySend: ", ex);
+ sendScheduler.stop();
subscription.cancel();
requestBodyCF.completeExceptionally(ex);
}
}
- @Override
- public void onError(Throwable throwable) {
- if (requestBodyCF.isDone()) {
- return;
+ private void complete() throws IOException {
+ long remaining = remainingContentLength;
+ long written = contentLength - remaining;
+ if (remaining > 0) {
+ connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ // let trySend() handle the exception
+ throw new IOException(connection().getConnectionFlow()
+ + " stream=" + streamid + " "
+ + "[" + Thread.currentThread().getName() +"] "
+ + "Too few bytes returned by the publisher ("
+ + written + "/"
+ + contentLength + ")");
}
- subscription.cancel();
- requestBodyCF.completeExceptionally(throwable);
- }
-
- @Override
- public void onComplete() {
- assert endStreamSent || remainingContentLength < 0;
- try {
if (!endStreamSent) {
endStreamSent = true;
connection.sendDataFrame(getEmptyEndStreamDataFrame());
}
requestBodyCF.complete(null);
- } catch (InterruptedException ex) {
- requestBodyCF.completeExceptionally(ex);
}
}
+
+ /**
+ * Send a RESET frame to tell server to stop sending data on this stream
+ */
+ @Override
+ public CompletableFuture<Void> ignoreBody() {
+ try {
+ connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
+ return MinimalFuture.completedFuture(null);
+ } catch (Throwable e) {
+ Log.logTrace("Error resetting stream {0}", e.toString());
+ return MinimalFuture.failedFuture(e);
+ }
}
- DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
+ DataFrame getDataFrame(ByteBuffer buffer) {
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
// blocks waiting for stream send window, if exhausted
- int actualAmount = windowController.tryAcquire(requestAmount, streamid);
+ int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
+ if (actualAmount <= 0) return null;
ByteBuffer outBuf = Utils.slice(buffer, actualAmount);
- DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
+ DataFrame df = new DataFrame(streamid, 0 , outBuf);
return df;
}
- private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
- return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
+ private DataFrame getEmptyEndStreamDataFrame() {
+ return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
}
/**
* A List of responses relating to this stream. Normally there is only
* one response, but intermediate responses like 100 are allowed
@@ -664,11 +785,11 @@
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
- CompletableFuture<Response> cf = null;
+ CompletableFuture<Response> cf;
// The code below deals with race condition that can be caused when
// completeResponse() is being called before getResponseAsync()
synchronized (response_cfs) {
if (!response_cfs.isEmpty()) {
// This CompletableFuture was created by completeResponse().
@@ -691,11 +812,11 @@
}
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
// if an error occurs make sure it is recorded in the PushGroup
- cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
}
return cf;
}
/**
@@ -730,14 +851,10 @@
if (responseReceived) {
close();
}
}
- final synchronized boolean isResponseReceived() {
- return responseReceived;
- }
-
synchronized void responseReceived() {
responseReceived = true;
if (requestSent) {
close();
}
@@ -761,13 +878,19 @@
response_cfs.add(MinimalFuture.failedFuture(t));
}
}
CompletableFuture<Void> sendBodyImpl() {
- RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
- requestProcessor.subscribe(subscriber);
- requestBodyCF.whenComplete((v,t) -> requestSent());
+ requestBodyCF.whenComplete((v, t) -> requestSent());
+ if (requestPublisher != null) {
+ final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
+ requestPublisher.subscribe(requestSubscriber = subscriber);
+ } else {
+ // there is no request body, therefore the request is complete,
+ // END_STREAM has already sent with outgoing headers
+ requestBodyCF.complete(null);
+ }
return requestBodyCF;
}
@Override
void cancel() {
@@ -779,25 +902,34 @@
cancelImpl(cause);
}
// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
+ debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
if (Log.trace()) {
Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
}
boolean closing;
if (closing = !closed) { // assigning closing to !closed
synchronized (this) {
+ failed = e;
if (closing = !closed) { // assigning closing to !closed
closed=true;
}
}
}
if (closing) { // true if the stream has not been closed yet
- inputQ.close();
+ if (responseSubscriber != null)
+ sched.runOrSchedule();
}
completeResponseExceptionally(e);
+ if (!requestBodyCF.isDone()) {
+ requestBodyCF.completeExceptionally(e); // we may be sending the body..
+ }
+ if (responseBodyCF != null) {
+ responseBodyCF.completeExceptionally(e);
+ }
try {
// will send a RST_STREAM frame
if (streamid != 0) {
connection.resetStream(streamid, ResetFrame.CANCEL);
}
@@ -812,35 +944,32 @@
synchronized(this) {
if (closed) return;
closed = true;
}
Log.logTrace("Closing stream {0}", streamid);
- inputQ.close();
connection.closeStream(streamid);
Log.logTrace("Stream {0} closed", streamid);
}
static class PushedStream<U,T> extends Stream<T> {
final PushGroup<U,T> pushGroup;
- private final Stream<T> parent; // used by server push streams
// push streams need the response CF allocated up front as it is
// given directly to user via the multi handler callback function.
final CompletableFuture<Response> pushCF;
final CompletableFuture<HttpResponse<T>> responseCF;
final HttpRequestImpl pushReq;
HttpResponse.BodyHandler<T> pushHandler;
- PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
- Http2Connection connection, Stream<T> parent,
+ PushedStream(PushGroup<U,T> pushGroup,
+ Http2Connection connection,
Exchange<T> pushReq) {
// ## no request body possible, null window controller
- super(client, connection, pushReq, null);
+ super(connection, pushReq, null);
this.pushGroup = pushGroup;
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
- this.parent = parent;
}
CompletableFuture<HttpResponse<T>> responseCF() {
return responseCF;
}
@@ -858,22 +987,25 @@
// error record it in the PushGroup. The error method is called
// with a null value when no error occurred (is a no-op)
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
return super.sendBodyAsync()
- .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
+ .whenComplete((ExchangeImpl<T> v, Throwable t)
+ -> pushGroup.pushError(Utils.getCompletionCause(t)));
}
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
return super.sendHeadersAsync()
- .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
+ .whenComplete((ExchangeImpl<T> ex, Throwable t)
+ -> pushGroup.pushError(Utils.getCompletionCause(t)));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
- CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ CompletableFuture<Response> cf = pushCF.whenComplete(
+ (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
if(executor!=null && !cf.isDone()) {
cf = cf.thenApplyAsync( r -> r, executor);
}
return cf;
}
@@ -888,40 +1020,40 @@
.whenComplete((v, t) -> pushGroup.pushError(t));
}
@Override
void completeResponse(Response r) {
- HttpResponseImpl.logResponse(r);
+ Log.logResponse(r::toString);
pushCF.complete(r); // not strictly required for push API
- // start reading the body using the obtained BodyProcessor
+ // start reading the body using the obtained BodySubscriber
CompletableFuture<Void> start = new MinimalFuture<>();
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
} else {
- HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
- responseCF.complete(response);
+ HttpResponseImpl<T> resp =
+ new HttpResponseImpl<>(r.request, r, null, body, getExchange());
+ responseCF.complete(resp);
}
});
start.completeAsync(() -> null, getExchange().executor());
}
@Override
void completeResponseExceptionally(Throwable t) {
pushCF.completeExceptionally(t);
}
- @Override
- synchronized void responseReceived() {
- super.responseReceived();
- }
+// @Override
+// synchronized void responseReceived() {
+// super.responseReceived();
+// }
// create and return the PushResponseImpl
@Override
protected void handleResponse() {
- HttpConnection c = connection.connection; // TODO: improve
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElse(-1);
if (responseCode == -1) {
@@ -930,13 +1062,15 @@
this.response = new Response(
pushReq, exchange, responseHeaders,
responseCode, HttpClient.Version.HTTP_2);
- this.responseContentLen = responseHeaders
- .firstValueAsLong("content-length")
- .orElse(-1L);
+ /* TODO: review if needs to be removed
+ the value is not used, but in case `content-length` doesn't parse
+ as long, there will be NumberFormatException. If left as is, make
+ sure code up the stack handles NFE correctly. */
+ responseHeaders.firstValueAsLong("content-length");
if (Log.headers()) {
StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
sb.append(" (streamid=").append(streamid).append("): ");
Log.dumpHeaders(sb, " ", responseHeaders);
@@ -958,6 +1092,25 @@
int getStreamId() {
return streamid;
}
}
+ /**
+ * Returns true if this exchange was canceled.
+ * @return true if this exchange was canceled.
+ */
+ synchronized boolean isCanceled() {
+ return failed != null;
+ }
+
+ /**
+ * Returns the cause for which this exchange was canceled, if available.
+ * @return the cause for which this exchange was canceled, if available.
+ */
+ synchronized Throwable getCancelCause() {
+ return failed;
+ }
+
+ final String dbgString() {
+ return connection.dbgString() + "/Stream("+streamid+")";
+ }
}
< prev index next >