< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java

Print this page

        

@@ -24,25 +24,25 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-
+import java.util.concurrent.atomic.AtomicReference;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
 
 /**

@@ -52,14 +52,10 @@
  *
  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  *
  * sendRequest() -- sendHeadersOnly() + sendBody()
  *
- * sendBody() -- in calling thread: obeys all flow control (so may block)
- *               obtains data from request body processor and places on connection
- *               outbound Q.
- *
  * sendBodyAsync() -- calls sendBody() in an executor thread.
  *
  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  *
  * sendRequestAsync() -- calls sendRequest() in an executor thread

@@ -75,14 +71,10 @@
  *               and returns it. Completion is achieved through the
  *               incoming() upcall from connection reader thread.
  *
  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  *
- * responseBody() -- in calling thread: blocks for incoming DATA frames on
- *               stream inputQ. Obeys remote and local flow control so may block.
- *               Calls user response body processor with data buffers.
- *
  * responseBodyAsync() -- calls responseBody() in an executor thread.
  *
  * incoming() -- entry point called from connection reader thread. Frames are
  *               either handled immediately without blocking or for data frames
  *               placed on the stream's inputQ which is consumed by the stream's

@@ -96,44 +88,47 @@
  * one response. The CF is created when the object is created and when the response
  * HEADERS frame is received the object is completed.
  */
 class Stream<T> extends ExchangeImpl<T> {
 
-    final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
+    final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
+    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+
+    final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
+    final SequentialScheduler sched =
+            SequentialScheduler.synchronizedScheduler(this::schedule);
+    final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
 
     /**
      * This stream's identifier. Assigned lazily by the HTTP2Connection before
      * the stream's first frame is sent.
      */
     protected volatile int streamid;
 
-    long responseContentLen = -1;
-    long responseBytesProcessed = 0;
     long requestContentLen;
 
     final Http2Connection connection;
-    HttpClientImpl client;
     final HttpRequestImpl request;
     final DecodingCallback rspHeadersConsumer;
     HttpHeadersImpl responseHeaders;
-    final HttpHeadersImpl requestHeaders;
     final HttpHeadersImpl requestPseudoHeaders;
-    HttpResponse.BodyProcessor<T> responseProcessor;
-    final HttpRequest.BodyProcessor requestProcessor;
+    volatile HttpResponse.BodySubscriber<T> responseSubscriber;
+    final HttpRequest.BodyPublisher requestPublisher;
+    volatile RequestSubscriber requestSubscriber;
     volatile int responseCode;
     volatile Response response;
-    volatile CompletableFuture<Response> responseCF;
-    final AbstractPushPublisher<ByteBuffer> publisher;
+    volatile Throwable failed; // The exception with which this stream was canceled.
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
+    volatile CompletableFuture<T> responseBodyCF;
 
     /** True if END_STREAM has been seen in a frame received on this stream. */
     private volatile boolean remotelyClosed;
     private volatile boolean closed;
     private volatile boolean endStreamSent;
 
     // state flags
-    boolean requestSent, responseReceived, responseHeadersReceived;
+    private boolean requestSent, responseReceived;
 
     /**
      * A reference to this Stream's connection Send Window controller. The
      * stream MUST acquire the appropriate amount of Send Window before
      * sending any data. Will be null for PushStreams, as they cannot send data.

@@ -144,159 +139,184 @@
     @Override
     HttpConnection connection() {
         return connection.connection;
     }
 
+    /**
+     * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
+     * or after user subscription window has re-opened, from SubscriptionBase.request()
+     */
+    private void schedule() {
+        if (responseSubscriber == null)
+            // can't process anything yet
+            return;
+
+        while (!inputQ.isEmpty()) {
+            Http2Frame frame  = inputQ.peek();
+            if (frame instanceof ResetFrame) {
+                inputQ.remove();
+                handleReset((ResetFrame)frame);
+                return;
+            }
+            DataFrame df = (DataFrame)frame;
+            boolean finished = df.getFlag(DataFrame.END_STREAM);
+
+            List<ByteBuffer> buffers = df.getData();
+            List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
+            int size = Utils.remaining(dsts, Integer.MAX_VALUE);
+            if (size == 0 && finished) {
+                inputQ.remove();
+                Log.logTrace("responseSubscriber.onComplete");
+                debug.log(Level.DEBUG, "incoming: onComplete");
+                sched.stop();
+                responseSubscriber.onComplete();
+                setEndStreamReceived();
+                return;
+            } else if (userSubscription.tryDecrement()) {
+                inputQ.remove();
+                Log.logTrace("responseSubscriber.onNext {0}", size);
+                debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
+                responseSubscriber.onNext(dsts);
+                if (consumed(df)) {
+                    Log.logTrace("responseSubscriber.onComplete");
+                    debug.log(Level.DEBUG, "incoming: onComplete");
+                    sched.stop();
+                    responseSubscriber.onComplete();
+                    setEndStreamReceived();
+                    return;
+                }
+            } else {
+                return;
+            }
+        }
+        Throwable t = failed;
+        if (t != null) {
+            sched.stop();
+            responseSubscriber.onError(t);
+            close();
+        }
+    }
+
+    // Callback invoked after the Response BodySubscriber has consumed the
+    // buffers contained in a DataFrame.
+    // Returns true if END_STREAM is reached, false otherwise.
+    private boolean consumed(DataFrame df) {
+        // RFC 7540 6.1:
+        // The entire DATA frame payload is included in flow control,
+        // including the Pad Length and Padding fields if present
+        int len = df.payloadLength();
+        connection.windowUpdater.update(len);
+
+        if (!df.getFlag(DataFrame.END_STREAM)) {
+            // Don't send window update on a stream which is
+            // closed or half closed.
+            windowUpdater.update(len);
+            return false; // more data coming
+        }
+        return true; // end of stream
+    }
+
     @Override
     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                        boolean returnConnectionToPool,
                                        Executor executor)
     {
         Log.logTrace("Reading body on stream {0}", streamid);
-        responseProcessor = handler.apply(responseCode, responseHeaders);
-        publisher.subscribe(responseProcessor);
-        CompletableFuture<T> cf = receiveData(executor);
+        BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+        CompletableFuture<T> cf = receiveData(bodySubscriber);
 
         PushGroup<?,?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(e));
         }
         return cf;
     }
 
     @Override
-    T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
-        throws IOException
-    {
-        CompletableFuture<T> cf = readBodyAsync(handler,
-                                                returnConnectionToPool,
-                                                null);
-        try {
-            return cf.join();
-        } catch (CompletionException e) {
-            throw Utils.getIOException(e);
-        }
-    }
-
-    @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("streamid: ")
                 .append(streamid);
         return sb.toString();
     }
 
-    private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
-        if (frame instanceof ResetFrame) {
-            handleReset((ResetFrame) frame);
-            return true;
-        } else if (!(frame instanceof DataFrame)) {
-            assert false;
-            return true;
-        }
-        DataFrame df = (DataFrame) frame;
-        // RFC 7540 6.1:
-        // The entire DATA frame payload is included in flow control,
-        // including the Pad Length and Padding fields if present
-        int len = df.payloadLength();
-        ByteBufferReference[] buffers = df.getData();
-        for (ByteBufferReference b : buffers) {
-            ByteBuffer buf = b.get();
-            if (buf.hasRemaining()) {
-                publisher.acceptData(Optional.of(buf));
-            }
+    private void receiveDataFrame(DataFrame df) {
+        inputQ.add(df);
+        sched.runOrSchedule();
         }
-        connection.windowUpdater.update(len);
-        if (df.getFlag(DataFrame.END_STREAM)) {
-            setEndStreamReceived();
-            publisher.acceptData(Optional.empty());
-            return false;
-        }
-        // Don't send window update on a stream which is
-        // closed or half closed.
-        windowUpdater.update(len);
-        return true;
+
+    /** Handles a RESET frame. RESET is always handled inline in the queue. */
+    private void receiveResetFrame(ResetFrame frame) {
+        inputQ.add(frame);
+        sched.runOrSchedule();
     }
 
-    // pushes entire response body into response processor
+    // pushes entire response body into response subscriber
     // blocking when required by local or remote flow control
-    CompletableFuture<T> receiveData(Executor executor) {
-        CompletableFuture<T> cf = responseProcessor
-                .getBody()
-                .toCompletableFuture();
-        Consumer<Throwable> onError = e -> {
-            Log.logTrace("receiveData: {0}", e.toString());
-            e.printStackTrace();
-            cf.completeExceptionally(e);
-            publisher.acceptError(e);
-        };
-        if (executor == null) {
-            inputQ.blockingReceive(this::receiveDataFrame, onError);
+    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
+        responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
+
+        if (isCanceled()) {
+            Throwable t = getCancelCause();
+            responseBodyCF.completeExceptionally(t);
         } else {
-            inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+            bodySubscriber.onSubscribe(userSubscription);
         }
-        return cf;
+        // Set the responseSubscriber field now that onSubscribe has been called.
+        // This effectively allows the scheduler to start invoking the callbacks.
+        responseSubscriber = bodySubscriber;
+        sched.runOrSchedule(); // in case data waiting already to be processed
+        return responseBodyCF;
     }
 
     @Override
-    void sendBody() throws IOException {
-        try {
-            sendBodyImpl().join();
-        } catch (CompletionException e) {
-            throw Utils.getIOException(e);
-        }
-    }
-
     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
         return sendBodyImpl().thenApply( v -> this);
     }
 
     @SuppressWarnings("unchecked")
-    Stream(HttpClientImpl client,
-           Http2Connection connection,
+    Stream(Http2Connection connection,
            Exchange<T> e,
            WindowController windowController)
     {
         super(e);
-        this.client = client;
         this.connection = connection;
         this.windowController = windowController;
         this.request = e.request();
-        this.requestProcessor = request.requestProcessor;
+        this.requestPublisher = request.requestPublisher;  // may be null
         responseHeaders = new HttpHeadersImpl();
-        requestHeaders = new HttpHeadersImpl();
         rspHeadersConsumer = (name, value) -> {
             responseHeaders.addHeader(name.toString(), value.toString());
             if (Log.headers() && Log.trace()) {
                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
                              streamid, name, value);
             }
         };
         this.requestPseudoHeaders = new HttpHeadersImpl();
         // NEW
-        this.publisher = new BlockingPushPublisher<>();
         this.windowUpdater = new StreamWindowUpdateSender(connection);
     }
 
     /**
      * Entry point from Http2Connection reader thread.
      *
      * Data frames will be removed by response body thread.
      */
     void incoming(Http2Frame frame) throws IOException {
+        debug.log(Level.DEBUG, "incoming: %s", frame);
         if ((frame instanceof HeaderFrame)) {
             HeaderFrame hframe = (HeaderFrame)frame;
             if (hframe.endHeaders()) {
                 Log.logTrace("handling response (streamid={0})", streamid);
                 handleResponse();
                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
-                    inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
+                    receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
                 }
             }
         } else if (frame instanceof DataFrame) {
-            inputQ.put(frame);
+            receiveDataFrame((DataFrame)frame);
         } else {
             otherFrame(frame);
         }
     }
 

@@ -322,79 +342,59 @@
     DecodingCallback rspHeadersConsumer() {
         return rspHeadersConsumer;
     }
 
     protected void handleResponse() throws IOException {
-        synchronized(this) {
-            responseHeadersReceived = true;
-        }
-        HttpConnection c = connection.connection; // TODO: improve
         responseCode = (int)responseHeaders
                 .firstValueAsLong(":status")
                 .orElseThrow(() -> new IOException("no statuscode in response"));
 
         response = new Response(
                 request, exchange, responseHeaders,
                 responseCode, HttpClient.Version.HTTP_2);
 
-        this.responseContentLen = responseHeaders
-                .firstValueAsLong("content-length")
-                .orElse(-1L);
+        /* TODO: review if needs to be removed
+           the value is not used, but in case `content-length` doesn't parse as
+           long, there will be NumberFormatException. If left as is, make sure
+           code up the stack handles NFE correctly. */
+        responseHeaders.firstValueAsLong("content-length");
 
         if (Log.headers()) {
             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
             Log.dumpHeaders(sb, "    ", responseHeaders);
             Log.logHeaders(sb.toString());
         }
 
         completeResponse(response);
     }
 
-    void incoming_reset(ResetFrame frame) throws IOException {
+    void incoming_reset(ResetFrame frame) {
         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
         if (endStreamReceived()) {
             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
         } else if (closed) {
             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
         } else {
-            boolean pushedToQueue = false;
-            synchronized(this) {
-                // if the response headers are not yet
-                // received, or the inputQueue is closed, handle reset directly.
-                // Otherwise, put it in the input queue in order to read all
+            // put it in the input queue in order to read all
                 // pending data frames first. Indeed, a server may send
                 // RST_STREAM after sending END_STREAM, in which case we should
                 // ignore it. However, we won't know if we have received END_STREAM
                 // or not until all pending data frames are read.
-                // Because the inputQ will not be read until the response
-                // headers are received, and because response headers won't be
-                // sent if the server sent RST_STREAM, then we must handle
-                // reset here directly unless responseHeadersReceived is true.
-                pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
-            }
-            if (!pushedToQueue) {
-                // RST_STREAM was not pushed to the queue: handle it.
-                try {
-                    handleReset(frame);
-                } catch (IOException io) {
-                    completeResponseExceptionally(io);
-                }
-            } else {
+            receiveResetFrame(frame);
                 // RST_STREAM was pushed to the queue. It will be handled by
                 // schedule() after all pending data frames have been
                 // processed.
                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
             }
         }
-    }
 
-    void handleReset(ResetFrame frame) throws IOException {
+    void handleReset(ResetFrame frame) {
         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
         if (!closed) {
             close();
             int error = frame.getErrorCode();
-            throw new IOException(ErrorFrame.stringForCode(error));
+            completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
         } else {
             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
         }
     }
 

@@ -429,24 +429,25 @@
         }
         PushGroup<?,T> pushGroup = exchange.getPushGroup();
         if (pushGroup == null || pushGroup.noMorePushes()) {
             cancelImpl(new IllegalStateException("unexpected push promise"
                 + " on stream " + streamid));
+            return;
         }
 
-        HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
+        HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
 
         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 
-        Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
-                pushReq);
+        Optional<HttpResponse.BodyHandler<T>> bpOpt =
+                pushGroup.handlerForPushRequest(pushReq);
 
         if (!bpOpt.isPresent()) {
             IOException ex = new IOException("Stream "
                  + streamid + " cancelled by user");
             if (Log.trace()) {
-                Log.logTrace("No body processor for {0}: {1}", pushReq,
+                Log.logTrace("No body subscriber for {0}: {1}", pushReq,
                             ex.getMessage());
             }
             pushStream.cancelImpl(ex);
             cf.completeExceptionally(ex);
             return;

@@ -456,10 +457,11 @@
         pushStream.requestSent();
         pushStream.setPushHandler(bpOpt.get());
         // setup housekeeping for when the push is received
         // TODO: deal with ignoring of CF anti-pattern
         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
+            t = Utils.getCompletionCause(t);
             if (Log.trace()) {
                 Log.logTrace("Push completed on stream {0} for {1}{2}",
                              pushStream.streamid, resp,
                              ((t==null) ? "": " with exception " + t));
             }

@@ -514,38 +516,10 @@
 
     HttpHeadersImpl getRequestPseudoHeaders() {
         return requestPseudoHeaders;
     }
 
-    @Override
-    Response getResponse() throws IOException {
-        try {
-            if (request.duration() != null) {
-                Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
-                             streamid,
-                             request.duration().toMillis());
-                return getResponseAsync(null).get(
-                        request.duration().toMillis(), TimeUnit.MILLISECONDS);
-            } else {
-                Log.logTrace("Waiting for response (streamid={0})", streamid);
-                return getResponseAsync(null).join();
-            }
-        } catch (TimeoutException e) {
-            Log.logTrace("Response timeout (streamid={0})", streamid);
-            throw new HttpTimeoutException("Response timed out");
-        } catch (InterruptedException | ExecutionException | CompletionException e) {
-            Throwable t = e.getCause();
-            Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
-            if (t instanceof IOException) {
-                throw (IOException)t;
-            }
-            throw new IOException(e);
-        } finally {
-            Log.logTrace("Got response or failed (streamid={0})", streamid);
-        }
-    }
-
     /** Sets endStreamReceived. Should be called only once. */
     void setEndStreamReceived() {
         assert remotelyClosed == false: "Unexpected endStream already set";
         remotelyClosed = true;
         responseReceived();

@@ -556,104 +530,251 @@
     private boolean endStreamReceived() {
         return remotelyClosed;
     }
 
     @Override
-    void sendHeadersOnly() throws IOException, InterruptedException {
+    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+        debug.log(Level.DEBUG, "sendHeadersOnly()");
         if (Log.requests() && request != null) {
             Log.logRequest(request.toString());
         }
-        requestContentLen = requestProcessor.contentLength();
+        if (requestPublisher != null) {
+            requestContentLen = requestPublisher.contentLength();
+        } else {
+            requestContentLen = 0;
+        }
         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
         connection.sendFrame(f);
+        CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
+        cf.complete(this);  // #### good enough for now
+        return cf;
+    }
+
+    @Override
+    void released() {
+        if (streamid > 0) {
+            debug.log(Level.DEBUG, "Released stream %d", streamid);
+            // remove this stream from the Http2Connection map.
+            connection.closeStream(streamid);
+        } else {
+            debug.log(Level.DEBUG, "Can't release stream %d", streamid);
+        }
+    }
+
+    @Override
+    void completed() {
+        // There should be nothing to do here: the stream should have
+        // been already closed (or will be closed shortly after).
     }
 
     void registerStream(int id) {
         this.streamid = id;
         connection.putStream(this, streamid);
+        debug.log(Level.DEBUG, "Registered stream %d", id);
+    }
+
+    void signalWindowUpdate() {
+        RequestSubscriber subscriber = requestSubscriber;
+        assert subscriber != null;
+        debug.log(Level.DEBUG, "Signalling window update");
+        subscriber.sendScheduler.runOrSchedule();
     }
 
+    static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
         // can be < 0 if the actual length is not known.
+        private final long contentLength;
         private volatile long remainingContentLength;
         private volatile Subscription subscription;
 
+        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
+        //  1) The data that was published by the request body Publisher, and
+        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
+        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
+
+        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        // A scheduler used to honor window updates. Writing must be paused
+        // when the window is exhausted, and resumed when the window acquires
+        // some space. The sendScheduler makes it possible to implement this
+        // behaviour in an asynchronous non-blocking way.
+        // See RequestSubscriber::trySend below.
+        final SequentialScheduler sendScheduler;
+
         RequestSubscriber(long contentLen) {
+            this.contentLength = contentLen;
             this.remainingContentLength = contentLen;
+            this.sendScheduler =
+                    SequentialScheduler.synchronizedScheduler(this::trySend);
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             if (this.subscription != null) {
-                throw new IllegalStateException();
+                throw new IllegalStateException("already subscribed");
             }
             this.subscription = subscription;
+            debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
             subscription.request(1);
         }
 
         @Override
         public void onNext(ByteBuffer item) {
+            debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
+            int size = outgoing.size();
+            assert size == 0 : "non-zero size: " + size;
+            onNextImpl(item);
+        }
+
+        private void onNextImpl(ByteBuffer item) {
+            // Got some more request body bytes to send.
             if (requestBodyCF.isDone()) {
-                throw new IllegalStateException();
+                // stream already cancelled, probably in timeout
+                sendScheduler.stop();
+                subscription.cancel();
+                return;
+            }
+            outgoing.add(item);
+            sendScheduler.runOrSchedule();
             }
 
+        @Override
+        public void onError(Throwable throwable) {
+            debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
+            // ensure that errors are handled within the flow.
+            if (errorRef.compareAndSet(null, throwable)) {
+                sendScheduler.runOrSchedule();
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
+            int size = outgoing.size();
+            assert size == 0 || size == 1 : "non-zero or one size: " + size;
+            // last byte of request body has been obtained.
+            // ensure that everything is completed within the flow.
+            onNextImpl(COMPLETED);
+        }
+
+        // Attempts to send the data, if any.
+        // Handles errors and completion state.
+        // Pause writing if the send window is exhausted, resume it if the
+        // send window has some bytes that can be acquired.
+        void trySend() {
             try {
+                // handle errors raised by onError;
+                Throwable t = errorRef.get();
+                if (t != null) {
+                    sendScheduler.stop();
+                    if (requestBodyCF.isDone()) return;
+                    subscription.cancel();
+                    requestBodyCF.completeExceptionally(t);
+                    return;
+                }
+
+                do {
+                    // handle COMPLETED;
+                    ByteBuffer item = outgoing.peekFirst();
+                    if (item == null) return;
+                    else if (item == COMPLETED) {
+                        sendScheduler.stop();
+                        complete();
+                        return;
+                    }
+
+                    // handle bytes to send downstream
                 while (item.hasRemaining()) {
+                        debug.log(Level.DEBUG, "trySend: %d", item.remaining());
                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
                     DataFrame df = getDataFrame(item);
-                    if (remainingContentLength > 0) {
+                        if (df == null) {
+                            debug.log(Level.DEBUG, "trySend: can't send yet: %d",
+                                    item.remaining());
+                            return; // the send window is exhausted: come back later
+                        }
+
+                        if (contentLength > 0) {
                         remainingContentLength -= df.getDataLength();
-                        assert remainingContentLength >= 0;
-                        if (remainingContentLength == 0) {
+                            if (remainingContentLength < 0) {
+                                String msg = connection().getConnectionFlow()
+                                        + " stream=" + streamid + " "
+                                        + "[" + Thread.currentThread().getName() + "] "
+                                        + "Too many bytes in request body. Expected: "
+                                        + contentLength + ", got: "
+                                        + (contentLength - remainingContentLength);
+                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+                                throw new IOException(msg);
+                            } else if (remainingContentLength == 0) {
                             df.setFlag(DataFrame.END_STREAM);
                             endStreamSent = true;
                         }
                     }
+                        debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
                     connection.sendDataFrame(df);
                 }
+                    assert !item.hasRemaining();
+                    ByteBuffer b = outgoing.removeFirst();
+                    assert b == item;
+                } while (outgoing.peekFirst() != null);
+
+                debug.log(Level.DEBUG, "trySend: request 1");
                 subscription.request(1);
-            } catch (InterruptedException ex) {
+            } catch (Throwable ex) {
+                debug.log(Level.DEBUG, "trySend: ", ex);
+                sendScheduler.stop();
                 subscription.cancel();
                 requestBodyCF.completeExceptionally(ex);
             }
         }
 
-        @Override
-        public void onError(Throwable throwable) {
-            if (requestBodyCF.isDone()) {
-                return;
+        private void complete() throws IOException {
+            long remaining = remainingContentLength;
+            long written = contentLength - remaining;
+            if (remaining > 0) {
+                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+                // let trySend() handle the exception
+                throw new IOException(connection().getConnectionFlow()
+                                     + " stream=" + streamid + " "
+                                     + "[" + Thread.currentThread().getName() +"] "
+                                     + "Too few bytes returned by the publisher ("
+                                              + written + "/"
+                                              + contentLength + ")");
             }
-            subscription.cancel();
-            requestBodyCF.completeExceptionally(throwable);
-        }
-
-        @Override
-        public void onComplete() {
-            assert endStreamSent || remainingContentLength < 0;
-            try {
                 if (!endStreamSent) {
                     endStreamSent = true;
                     connection.sendDataFrame(getEmptyEndStreamDataFrame());
                 }
                 requestBodyCF.complete(null);
-            } catch (InterruptedException ex) {
-                requestBodyCF.completeExceptionally(ex);
             }
         }
+
+    /**
+     * Send a RESET frame to tell server to stop sending data on this stream
+     */
+    @Override
+    public CompletableFuture<Void> ignoreBody() {
+        try {
+            connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
+            return MinimalFuture.completedFuture(null);
+        } catch (Throwable e) {
+            Log.logTrace("Error resetting stream {0}", e.toString());
+            return MinimalFuture.failedFuture(e);
+        }
     }
 
-    DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
+    DataFrame getDataFrame(ByteBuffer buffer) {
         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
         // tries to acquire stream send window; a non-positive amount means it is exhausted
-        int actualAmount = windowController.tryAcquire(requestAmount, streamid);
+        int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
+        if (actualAmount <= 0) return null;
         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
-        DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
+        DataFrame df = new DataFrame(streamid, 0 , outBuf);
         return df;
     }
 
-    private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
-        return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
+    private DataFrame getEmptyEndStreamDataFrame()  {
+        return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
     }
 
     /**
      * A List of responses relating to this stream. Normally there is only
      * one response, but intermediate responses like 100 are allowed

@@ -664,11 +785,11 @@
 
     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 
     @Override
     CompletableFuture<Response> getResponseAsync(Executor executor) {
-        CompletableFuture<Response> cf = null;
+        CompletableFuture<Response> cf;
         // The code below deals with race condition that can be caused when
         // completeResponse() is being called before getResponseAsync()
         synchronized (response_cfs) {
             if (!response_cfs.isEmpty()) {
                 // This CompletableFuture was created by completeResponse().

@@ -691,11 +812,11 @@
         }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
         PushGroup<?,?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
-            cf = cf.whenComplete((t,e) -> pg.pushError(e));
+            cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
         }
         return cf;
     }
 
     /**

@@ -730,14 +851,10 @@
         if (responseReceived) {
             close();
         }
     }
 
-    final synchronized boolean isResponseReceived() {
-        return responseReceived;
-    }
-
     synchronized void responseReceived() {
         responseReceived = true;
         if (requestSent) {
             close();
         }

@@ -761,13 +878,19 @@
             response_cfs.add(MinimalFuture.failedFuture(t));
         }
     }
 
     CompletableFuture<Void> sendBodyImpl() {
-        RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
-        requestProcessor.subscribe(subscriber);
-        requestBodyCF.whenComplete((v,t) -> requestSent());
+        requestBodyCF.whenComplete((v, t) -> requestSent());
+        if (requestPublisher != null) {
+            final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
+            requestPublisher.subscribe(requestSubscriber = subscriber);
+        } else {
+            // there is no request body, therefore the request is complete,
+            // END_STREAM has already been sent with the outgoing headers
+            requestBodyCF.complete(null);
+        }
         return requestBodyCF;
     }
 
     @Override
     void cancel() {

@@ -779,25 +902,34 @@
         cancelImpl(cause);
     }
 
     // This method sends a RST_STREAM frame
     void cancelImpl(Throwable e) {
+        debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
         if (Log.trace()) {
             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
         }
         boolean closing;
         if (closing = !closed) { // assigning closing to !closed
             synchronized (this) {
+                failed = e;
                 if (closing = !closed) { // assigning closing to !closed
                     closed=true;
                 }
             }
         }
         if (closing) { // true if the stream has not been closed yet
-            inputQ.close();
+            if (responseSubscriber != null)
+                sched.runOrSchedule();
         }
         completeResponseExceptionally(e);
+        if (!requestBodyCF.isDone()) {
+            requestBodyCF.completeExceptionally(e); // we may be sending the body..
+        }
+        if (responseBodyCF != null) {
+            responseBodyCF.completeExceptionally(e);
+        }
         try {
             // will send a RST_STREAM frame
             if (streamid != 0) {
                 connection.resetStream(streamid, ResetFrame.CANCEL);
             }

@@ -812,35 +944,32 @@
         synchronized(this) {
             if (closed) return;
             closed = true;
         }
         Log.logTrace("Closing stream {0}", streamid);
-        inputQ.close();
         connection.closeStream(streamid);
         Log.logTrace("Stream {0} closed", streamid);
     }
 
     static class PushedStream<U,T> extends Stream<T> {
         final PushGroup<U,T> pushGroup;
-        private final Stream<T> parent;      // used by server push streams
         // push streams need the response CF allocated up front as it is
         // given directly to user via the multi handler callback function.
         final CompletableFuture<Response> pushCF;
         final CompletableFuture<HttpResponse<T>> responseCF;
         final HttpRequestImpl pushReq;
         HttpResponse.BodyHandler<T> pushHandler;
 
-        PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
-                Http2Connection connection, Stream<T> parent,
+        PushedStream(PushGroup<U,T> pushGroup,
+                     Http2Connection connection,
                 Exchange<T> pushReq) {
             // ## no request body possible, null window controller
-            super(client, connection, pushReq, null);
+            super(connection, pushReq, null);
             this.pushGroup = pushGroup;
             this.pushReq = pushReq.request();
             this.pushCF = new MinimalFuture<>();
             this.responseCF = new MinimalFuture<>();
-            this.parent = parent;
         }
 
         CompletableFuture<HttpResponse<T>> responseCF() {
             return responseCF;
         }

@@ -858,22 +987,25 @@
         // error record it in the PushGroup. The error method is called
         // with a null value when no error occurred (is a no-op)
         @Override
         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
             return super.sendBodyAsync()
-                        .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
+                        .whenComplete((ExchangeImpl<T> v, Throwable t)
+                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
         }
 
         @Override
         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
             return super.sendHeadersAsync()
-                        .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
+                        .whenComplete((ExchangeImpl<T> ex, Throwable t)
+                                -> pushGroup.pushError(Utils.getCompletionCause(t)));
         }
 
         @Override
         CompletableFuture<Response> getResponseAsync(Executor executor) {
-            CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+            CompletableFuture<Response> cf = pushCF.whenComplete(
+                    (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
             if(executor!=null && !cf.isDone()) {
                 cf  = cf.thenApplyAsync( r -> r, executor);
             }
             return cf;
         }

@@ -888,40 +1020,40 @@
                         .whenComplete((v, t) -> pushGroup.pushError(t));
         }
 
         @Override
         void completeResponse(Response r) {
-            HttpResponseImpl.logResponse(r);
+            Log.logResponse(r::toString);
             pushCF.complete(r); // not strictly required for push API
-            // start reading the body using the obtained BodyProcessor
+            // start reading the body using the obtained BodySubscriber
             CompletableFuture<Void> start = new MinimalFuture<>();
             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                 .whenComplete((T body, Throwable t) -> {
                     if (t != null) {
                         responseCF.completeExceptionally(t);
                     } else {
-                        HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
-                        responseCF.complete(response);
+                        HttpResponseImpl<T> resp =
+                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
+                        responseCF.complete(resp);
                     }
                 });
             start.completeAsync(() -> null, getExchange().executor());
         }
 
         @Override
         void completeResponseExceptionally(Throwable t) {
             pushCF.completeExceptionally(t);
         }
 
-        @Override
-        synchronized void responseReceived() {
-            super.responseReceived();
-        }
+//        @Override
+//        synchronized void responseReceived() {
+//            super.responseReceived();
+//        }
 
         // create and return the PushResponseImpl
         @Override
         protected void handleResponse() {
-            HttpConnection c = connection.connection; // TODO: improve
             responseCode = (int)responseHeaders
                 .firstValueAsLong(":status")
                 .orElse(-1);
 
             if (responseCode == -1) {

@@ -930,13 +1062,15 @@
 
             this.response = new Response(
                 pushReq, exchange, responseHeaders,
                 responseCode, HttpClient.Version.HTTP_2);
 
-            this.responseContentLen = responseHeaders
-                .firstValueAsLong("content-length")
-                .orElse(-1L);
+            /* TODO: review whether this call should be removed.
+               The value is not used, but if `content-length` doesn't parse
+               as a long, a NumberFormatException will be thrown. If left as is,
+               make sure code up the stack handles NFE correctly. */
+            responseHeaders.firstValueAsLong("content-length");
 
             if (Log.headers()) {
                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
                 sb.append(" (streamid=").append(streamid).append("): ");
                 Log.dumpHeaders(sb, "    ", responseHeaders);

@@ -958,6 +1092,25 @@
         int getStreamId() {
             return streamid;
         }
     }
 
+    /**
+     * Returns true if this exchange was canceled.
+     * @return true if this exchange was canceled.
+     */
+    synchronized boolean isCanceled() {
+        return failed != null;
+    }
+
+    /**
+     * Returns the cause for which this exchange was canceled, if available.
+     * @return the cause for which this exchange was canceled, if available.
+     */
+    synchronized Throwable getCancelCause() {
+        return failed;
+    }
+
+    final String dbgString() {
+        return connection.dbgString() + "/Stream("+streamid+")";
+    }
 }
< prev index next >