< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;

  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;

  32 import java.util.List;
  33 import java.util.Optional;
  34 import java.util.concurrent.CompletableFuture;
  35 import java.util.concurrent.CompletionException;
  36 import java.util.concurrent.ExecutionException;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Flow;
  39 import java.util.concurrent.Flow.Subscription;
  40 import java.util.concurrent.TimeUnit;
  41 import java.util.concurrent.TimeoutException;
  42 import java.util.function.Consumer;
  43 
  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 
  48 /**
  49  * Http/2 Stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *
  57  * sendBody() -- in calling thread: obeys all flow control (so may block)
  58  *               obtains data from request body processor and places on connection
  59  *               outbound Q.
  60  *
  61  * sendBodyAsync() -- calls sendBody() in an executor thread.
  62  *
  63  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  64  *
  65  * sendRequestAsync() -- calls sendRequest() in an executor thread
  66  *
  67  * RESPONSES
  68  *
  69  * Multiple responses can be received per request. Responses are queued up on
  70  * a list of CF<Response> and the first one on the list is completed
  71  * with the next response
  72  *
  73  * getResponseAsync() -- queries list of response CFs and returns first one
  74  *               if one exists. Otherwise, creates one and adds it to list
  75  *               and returns it. Completion is achieved through the
  76  *               incoming() upcall from connection reader thread.
  77  *
  78  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  79  *
  80  * responseBody() -- in calling thread: blocks for incoming DATA frames on
  81  *               stream inputQ. Obeys remote and local flow control so may block.
  82  *               Calls user response body processor with data buffers.
  83  *
  84  * responseBodyAsync() -- calls responseBody() in an executor thread.
  85  *
  86  * incoming() -- entry point called from connection reader thread. Frames are
  87  *               either handled immediately without blocking or for data frames
  88  *               placed on the stream's inputQ which is consumed by the stream's
  89  *               reader thread.
  90  *
  91  * PushedStream sub class
  92  * ======================
  93  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  94  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  95  * is created. PushedStream does not use responseCF list as there can be only
  96  * one response. The CF is created when the object is created, and when the response
  97  * HEADERS frame is received the object is completed.
  98  */
  99 class Stream<T> extends ExchangeImpl<T> {
 100 
         // Queue of frames received for this stream. incoming() puts DATA frames on it,
         // incoming_reset() may put a RST_STREAM on it, and receiveData() drains it
         // through receiveDataFrame().
 101     final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();






 102 
 103     /**
 104      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 105      * the stream's first frame is sent.
 106      */
 107     protected volatile int streamid;
 108 
         // Value of the response "content-length" header, or -1 when the header is
         // absent; assigned in handleResponse().
 109     long responseContentLen = -1;
 110     long responseBytesProcessed = 0;
         // Assigned from requestProcessor.contentLength() in sendHeadersOnly().
 111     long requestContentLen;
 112 
 113     final Http2Connection connection;
 114     HttpClientImpl client;
 115     final HttpRequestImpl request;
         // Callback that adds each decoded (name, value) pair to responseHeaders.
 116     final DecodingCallback rspHeadersConsumer;
 117     HttpHeadersImpl responseHeaders;
 118     final HttpHeadersImpl requestHeaders;
 119     final HttpHeadersImpl requestPseudoHeaders;
         // Created in readBodyAsync() from the caller's BodyHandler.
 120     HttpResponse.BodyProcessor<T> responseProcessor;
 121     final HttpRequest.BodyProcessor requestProcessor;

         // Parsed from the ":status" response header in handleResponse().
 122     volatile int responseCode;
 123     volatile Response response;
 124     volatile CompletableFuture<Response> responseCF;
         // Receives response body buffers from receiveDataFrame(); the response body
         // processor is subscribed to it in readBodyAsync().
 125     final AbstractPushPublisher<ByteBuffer> publisher;
         // Completed by RequestSubscriber once the request body has been sent, or
         // completed exceptionally if sending fails.
 126     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();

 127 
 128     /** True if END_STREAM has been seen in a frame received on this stream. */
 129     private volatile boolean remotelyClosed;
         // Set to true (under synchronized(this)) by close() and cancelImpl().
 130     private volatile boolean closed;
         // Set by RequestSubscriber when a DATA frame carrying END_STREAM is sent.
 131     private volatile boolean endStreamSent;
 132 
 133     // state flags; in the code visible here they are read and written while
         // holding this stream's monitor (synchronized methods / synchronized(this)).
 134     boolean requestSent, responseReceived, responseHeadersReceived;
 135 
 136     /**
 137      * A reference to this Stream's connection Send Window controller. The
 138      * stream MUST acquire the appropriate amount of Send Window before
 139      * sending any data. Will be null for PushStreams, as they cannot send data.
 140      */
 141     private final WindowController windowController;
         // Created in the constructor; receiveDataFrame() calls update(len) on it for
         // every DATA frame that does not carry END_STREAM.
 142     private final WindowUpdateSender windowUpdater;
 143 
 144     @Override
 145     HttpConnection connection() {
 146         return connection.connection;
 147     }
 148 










































































 149     @Override
 150     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 151                                        boolean returnConnectionToPool,
 152                                        Executor executor)
 153     {
 154         Log.logTrace("Reading body on stream {0}", streamid);
 155         responseProcessor = handler.apply(responseCode, responseHeaders);
 156         publisher.subscribe(responseProcessor);
 157         CompletableFuture<T> cf = receiveData(executor);
 158 
 159         PushGroup<?,?> pg = exchange.getPushGroup();
 160         if (pg != null) {
 161             // if an error occurs make sure it is recorded in the PushGroup
 162             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 163         }
 164         return cf;
 165     }
 166 
 167     @Override
 168     T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
 169         throws IOException
 170     {
 171         CompletableFuture<T> cf = readBodyAsync(handler,
 172                                                 returnConnectionToPool,
 173                                                 null);
 174         try {
 175             return cf.join();
 176         } catch (CompletionException e) {
 177             throw Utils.getIOException(e);
 178         }
 179     }
 180 
 181     @Override
 182     public String toString() {
 183         StringBuilder sb = new StringBuilder();
 184         sb.append("streamid: ")
 185                 .append(streamid);
 186         return sb.toString();
 187     }
 188 
 189     private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
 190         if (frame instanceof ResetFrame) {
 191             handleReset((ResetFrame) frame);
 192             return true;
 193         } else if (!(frame instanceof DataFrame)) {
 194             assert false;
 195             return true;
 196         }
 197         DataFrame df = (DataFrame) frame;
 198         // RFC 7540 6.1:
 199         // The entire DATA frame payload is included in flow control,
 200         // including the Pad Length and Padding fields if present
 201         int len = df.payloadLength();
 202         ByteBufferReference[] buffers = df.getData();
 203         for (ByteBufferReference b : buffers) {
 204             ByteBuffer buf = b.get();
 205             if (buf.hasRemaining()) {
 206                 publisher.acceptData(Optional.of(buf));
 207             }
 208         }
 209         connection.windowUpdater.update(len);
 210         if (df.getFlag(DataFrame.END_STREAM)) {
 211             setEndStreamReceived();
 212             publisher.acceptData(Optional.empty());
 213             return false;
 214         }
 215         // Don't send window update on a stream which is
 216         // closed or half closed.
 217         windowUpdater.update(len);
 218         return true;
 219     }
 220 
 221     // pushes entire response body into response processor
 222     // blocking when required by local or remote flow control
 223     CompletableFuture<T> receiveData(Executor executor) {
 224         CompletableFuture<T> cf = responseProcessor
 225                 .getBody()
 226                 .toCompletableFuture();
 227         Consumer<Throwable> onError = e -> {
 228             Log.logTrace("receiveData: {0}", e.toString());
 229             e.printStackTrace();
 230             cf.completeExceptionally(e);
 231             publisher.acceptError(e);
 232         };
 233         if (executor == null) {
 234             inputQ.blockingReceive(this::receiveDataFrame, onError);
 235         } else {
 236             inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
 237         }
 238         return cf;




 239     }
 240 
 241     @Override
 242     void sendBody() throws IOException {
 243         try {
 244             sendBodyImpl().join();
 245         } catch (CompletionException e) {
 246             throw Utils.getIOException(e);
 247         }
 248     }
 249 
 250     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 251         return sendBodyImpl().thenApply( v -> this);
 252     }
 253 
 254     @SuppressWarnings("unchecked")
 255     Stream(HttpClientImpl client,
 256            Http2Connection connection,
 257            Exchange<T> e,
 258            WindowController windowController)
 259     {
 260         super(e);
 261         this.client = client;
 262         this.connection = connection;
 263         this.windowController = windowController;
 264         this.request = e.request();
 265         this.requestProcessor = request.requestProcessor;
 266         responseHeaders = new HttpHeadersImpl();
 267         requestHeaders = new HttpHeadersImpl();
 268         rspHeadersConsumer = (name, value) -> {
 269             responseHeaders.addHeader(name.toString(), value.toString());
 270             if (Log.headers() && Log.trace()) {
 271                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 272                              streamid, name, value);
 273             }
 274         };
 275         this.requestPseudoHeaders = new HttpHeadersImpl();
 276         // NEW
 277         this.publisher = new BlockingPushPublisher<>();
 278         this.windowUpdater = new StreamWindowUpdateSender(connection);
 279     }
 280 
 281     /**
 282      * Entry point from Http2Connection reader thread.
 283      *
 284      * Data frames will be removed by response body thread.
 285      */
 286     void incoming(Http2Frame frame) throws IOException {

 287         if ((frame instanceof HeaderFrame)) {
 288             HeaderFrame hframe = (HeaderFrame)frame;
 289             if (hframe.endHeaders()) {
 290                 Log.logTrace("handling response (streamid={0})", streamid);
 291                 handleResponse();
 292                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 293                     inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
 294                 }
 295             }
 296         } else if (frame instanceof DataFrame) {
 297             inputQ.put(frame);
 298         } else {
 299             otherFrame(frame);
 300         }
 301     }
 302 
 303     void otherFrame(Http2Frame frame) throws IOException {
 304         switch (frame.type()) {
 305             case WindowUpdateFrame.TYPE:
 306                 incoming_windowUpdate((WindowUpdateFrame) frame);
 307                 break;
 308             case ResetFrame.TYPE:
 309                 incoming_reset((ResetFrame) frame);
 310                 break;
 311             case PriorityFrame.TYPE:
 312                 incoming_priority((PriorityFrame) frame);
 313                 break;
 314             default:
 315                 String msg = "Unexpected frame: " + frame.toString();
 316                 throw new IOException(msg);
 317         }
 318     }
 319 
 320     // The Hpack decoder decodes into one of these consumers of name,value pairs
 321 
 322     DecodingCallback rspHeadersConsumer() {
 323         return rspHeadersConsumer;
 324     }
 325 
 326     protected void handleResponse() throws IOException {
 327         synchronized(this) {
 328             responseHeadersReceived = true;
 329         }
 330         HttpConnection c = connection.connection; // TODO: improve
 331         responseCode = (int)responseHeaders
 332                 .firstValueAsLong(":status")
 333                 .orElseThrow(() -> new IOException("no statuscode in response"));
 334 
 335         response = new Response(
 336                 request, exchange, responseHeaders,
 337                 responseCode, HttpClient.Version.HTTP_2);
 338 
 339         this.responseContentLen = responseHeaders
 340                 .firstValueAsLong("content-length")
 341                 .orElse(-1L);


 342 
 343         if (Log.headers()) {
 344             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 345             Log.dumpHeaders(sb, "    ", responseHeaders);
 346             Log.logHeaders(sb.toString());
 347         }
 348 
 349         completeResponse(response);
 350     }
 351 
 352     void incoming_reset(ResetFrame frame) throws IOException {
 353         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 354         if (endStreamReceived()) {
 355             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 356         } else if (closed) {
 357             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 358         } else {
 359             boolean pushedToQueue = false;
 360             synchronized(this) {
 361                 // if the response headers are not yet
 362                 // received, or the inputQueue is closed, handle reset directly.
 363                 // Otherwise, put it in the input queue in order to read all
 364                 // pending data frames first. Indeed, a server may send
 365                 // RST_STREAM after sending END_STREAM, in which case we should
 366                 // ignore it. However, we won't know if we have received END_STREAM
 367                 // or not until all pending data frames are read.
 368                 // Because the inputQ will not be read until the response
 369                 // headers are received, and because response headers won't be
 370                 // sent if the server sent RST_STREAM, then we must handle
 371                 // reset here directly unless responseHeadersReceived is true.
 372                 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
 373             }
 374             if (!pushedToQueue) {
 375                 // RST_STREAM was not pushed to the queue: handle it.
 376                 try {
 377                     handleReset(frame);
 378                 } catch (IOException io) {
 379                     completeResponseExceptionally(io);
 380                 }
 381             } else {
 382                 // RST_STREAM was pushed to the queue. It will be handled by
 383                 // asyncReceive after all pending data frames have been
 384                 // processed.
 385                 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 386             }
 387         }
 388     }
 389 
 390     void handleReset(ResetFrame frame) throws IOException {
 391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 392         if (!closed) {
 393             close();
 394             int error = frame.getErrorCode();
 395             throw new IOException(ErrorFrame.stringForCode(error));
 396         } else {
 397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 398         }
 399     }
 400 
 401     void incoming_priority(PriorityFrame frame) {
 402         // TODO: implement priority
 403         throw new UnsupportedOperationException("Not implemented");
 404     }
 405 
 406     private void incoming_windowUpdate(WindowUpdateFrame frame)
 407         throws IOException
 408     {
 409         int amount = frame.getUpdate();
 410         if (amount <= 0) {
 411             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 412                          streamid, streamid, amount);
 413             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 414         } else {
 415             assert streamid != 0;
 416             boolean success = windowController.increaseStreamWindow(amount, streamid);
 417             if (!success) {  // overflow
 418                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 419             }
 420         }
 421     }
 422 
 423     void incoming_pushPromise(HttpRequestImpl pushReq,
 424                               PushedStream<?,T> pushStream)
 425         throws IOException
 426     {
 427         if (Log.requests()) {
 428             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 429         }
 430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 431         if (pushGroup == null || pushGroup.noMorePushes()) {
 432             cancelImpl(new IllegalStateException("unexpected push promise"
 433                 + " on stream " + streamid));

 434         }
 435 
 436         HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
 437 
 438         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 439 
 440         Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
 441                 pushReq);
 442 
 443         if (!bpOpt.isPresent()) {
 444             IOException ex = new IOException("Stream "
 445                  + streamid + " cancelled by user");
 446             if (Log.trace()) {
 447                 Log.logTrace("No body processor for {0}: {1}", pushReq,
 448                             ex.getMessage());
 449             }
 450             pushStream.cancelImpl(ex);
 451             cf.completeExceptionally(ex);
 452             return;
 453         }
 454 
 455         pushGroup.addPush();
 456         pushStream.requestSent();
 457         pushStream.setPushHandler(bpOpt.get());
 458         // setup housekeeping for when the push is received
 459         // TODO: deal with ignoring of CF anti-pattern
 460         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {

 461             if (Log.trace()) {
 462                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 463                              pushStream.streamid, resp,
 464                              ((t==null) ? "": " with exception " + t));
 465             }
 466             if (t != null) {
 467                 pushGroup.pushError(t);
 468                 proc.onError(pushReq, t);
 469             } else {
 470                 proc.onResponse(resp);
 471             }
 472             pushGroup.pushCompleted();
 473         });
 474 
 475     }
 476 
 477     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 478         HttpHeadersImpl h = request.getSystemHeaders();
 479         if (contentLength > 0) {
 480             h.setHeader("content-length", Long.toString(contentLength));


 499         // TODO: ensure header names beginning with : not in user headers
 500         String query = uri.getQuery();
 501         String path = uri.getPath();
 502         if (path == null || path.isEmpty()) {
 503             if (method.equalsIgnoreCase("OPTIONS")) {
 504                 path = "*";
 505             } else {
 506                 path = "/";
 507             }
 508         }
 509         if (query != null) {
 510             path += "?" + query;
 511         }
 512         hdrs.setHeader(":path", path);
 513     }
 514 
 515     HttpHeadersImpl getRequestPseudoHeaders() {
 516         return requestPseudoHeaders;
 517     }
 518 
 519     @Override
 520     Response getResponse() throws IOException {
 521         try {
 522             if (request.duration() != null) {
 523                 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
 524                              streamid,
 525                              request.duration().toMillis());
 526                 return getResponseAsync(null).get(
 527                         request.duration().toMillis(), TimeUnit.MILLISECONDS);
 528             } else {
 529                 Log.logTrace("Waiting for response (streamid={0})", streamid);
 530                 return getResponseAsync(null).join();
 531             }
 532         } catch (TimeoutException e) {
 533             Log.logTrace("Response timeout (streamid={0})", streamid);
 534             throw new HttpTimeoutException("Response timed out");
 535         } catch (InterruptedException | ExecutionException | CompletionException e) {
 536             Throwable t = e.getCause();
 537             Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
 538             if (t instanceof IOException) {
 539                 throw (IOException)t;
 540             }
 541             throw new IOException(e);
 542         } finally {
 543             Log.logTrace("Got response or failed (streamid={0})", streamid);
 544         }
 545     }
 546 
 547     /** Sets endStreamReceived. Should be called only once. */
 548     void setEndStreamReceived() {
 549         assert remotelyClosed == false: "Unexpected endStream already set";
 550         remotelyClosed = true;
 551         responseReceived();
 552     }
 553 
 554     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 555      *  received on this stream. */
 556     private boolean endStreamReceived() {
 557         return remotelyClosed;
 558     }
 559 
 560     @Override
 561     void sendHeadersOnly() throws IOException, InterruptedException {

 562         if (Log.requests() && request != null) {
 563             Log.logRequest(request.toString());
 564         }
 565         requestContentLen = requestProcessor.contentLength();




 566         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 567         connection.sendFrame(f);




















 568     }
 569 
 570     void registerStream(int id) {
 571         this.streamid = id;
 572         connection.putStream(this, streamid);








 573     }
 574 

 575     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 576         // can be < 0 if the actual length is not known.

 577         private volatile long remainingContentLength;
 578         private volatile Subscription subscription;
 579 













 580         RequestSubscriber(long contentLen) {

 581             this.remainingContentLength = contentLen;


 582         }
 583 
 584         @Override
 585         public void onSubscribe(Flow.Subscription subscription) {
 586             if (this.subscription != null) {
 587                 throw new IllegalStateException();
 588             }
 589             this.subscription = subscription;

 590             subscription.request(1);
 591         }
 592 
 593         @Override
 594         public void onNext(ByteBuffer item) {








 595             if (requestBodyCF.isDone()) {
 596                 throw new IllegalStateException();






 597             }
 598 
























 599             try {





















 600                 while (item.hasRemaining()) {

 601                     assert !endStreamSent : "internal error, send data after END_STREAM flag";
 602                     DataFrame df = getDataFrame(item);
 603                     if (remainingContentLength > 0) {






 604                         remainingContentLength -= df.getDataLength();
 605                         assert remainingContentLength >= 0;
 606                         if (remainingContentLength == 0) {








 607                             df.setFlag(DataFrame.END_STREAM);
 608                             endStreamSent = true;
 609                         }
 610                     }

 611                     connection.sendDataFrame(df);
 612                 }






 613                 subscription.request(1);
 614             } catch (InterruptedException ex) {


 615                 subscription.cancel();
 616                 requestBodyCF.completeExceptionally(ex);
 617             }
 618         }
 619 
 620         @Override
 621         public void onError(Throwable throwable) {
 622             if (requestBodyCF.isDone()) {
 623                 return;








 624             }
 625             subscription.cancel();
 626             requestBodyCF.completeExceptionally(throwable);
 627         }
 628 
 629         @Override
 630         public void onComplete() {
 631             assert endStreamSent || remainingContentLength < 0;
 632             try {
 633                 if (!endStreamSent) {
 634                     endStreamSent = true;
 635                     connection.sendDataFrame(getEmptyEndStreamDataFrame());
 636                 }
 637                 requestBodyCF.complete(null);
 638             } catch (InterruptedException ex) {
 639                 requestBodyCF.completeExceptionally(ex);
 640             }
 641         }













 642     }
 643 
 644     DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
 645         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 646         // blocks waiting for stream send window, if exhausted
 647         int actualAmount = windowController.tryAcquire(requestAmount, streamid);

 648         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 649         DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
 650         return df;
 651     }
 652 
 653     private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
 654         return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
 655     }
 656 
 657     /**
 658      * A List of responses relating to this stream. Normally there is only
 659      * one response, but intermediate responses like 100 are allowed
 660      * and must be passed up to higher level before continuing. Deals with races
 661      * such as if responses are returned before the CFs get created by
 662      * getResponseAsync()
 663      */
 664 
         // The accesses visible in getResponseAsync(), completeResponse() and
         // completeResponseExceptionally() are all made while synchronized on the
         // list itself.
 665     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 666 
 667     @Override
 668     CompletableFuture<Response> getResponseAsync(Executor executor) {
 669         CompletableFuture<Response> cf = null;
 670         // The code below deals with race condition that can be caused when
 671         // completeResponse() is being called before getResponseAsync()
 672         synchronized (response_cfs) {
 673             if (!response_cfs.isEmpty()) {
 674                 // This CompletableFuture was created by completeResponse().
 675                 // it will be already completed.
 676                 cf = response_cfs.remove(0);
 677                 // if we find a cf here it should be already completed.
 678                 // finding a non completed cf should not happen. just assert it.
 679                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 680             } else {
 681                 // getResponseAsync() is called first. Create a CompletableFuture
 682                 // that will be completed by completeResponse() when
 683                 // completeResponse() is called.
 684                 cf = new MinimalFuture<>();
 685                 response_cfs.add(cf);
 686             }
 687         }
 688         if (executor != null && !cf.isDone()) {
 689             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 690             cf = cf.thenApplyAsync(r -> r, executor);
 691         }
 692         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 693         PushGroup<?,?> pg = exchange.getPushGroup();
 694         if (pg != null) {
 695             // if an error occurs make sure it is recorded in the PushGroup
 696             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 697         }
 698         return cf;
 699     }
 700 
 701     /**
 702      * Completes the first uncompleted CF on list, and removes it. If there is no
 703      * uncompleted CF then creates one (completes it) and adds to list
 704      */
 705     void completeResponse(Response resp) {
 706         synchronized (response_cfs) {
 707             CompletableFuture<Response> cf;
 708             int cfs_len = response_cfs.size();
 709             for (int i=0; i<cfs_len; i++) {
 710                 cf = response_cfs.get(i);
 711                 if (!cf.isDone()) {
 712                     Log.logTrace("Completing response (streamid={0}): {1}",
 713                                  streamid, cf);
 714                     cf.complete(resp);
 715                     response_cfs.remove(cf);
 716                     return;
 717                 } // else we found the previous response: just leave it alone.
 718             }
 719             cf = MinimalFuture.completedFuture(resp);
 720             Log.logTrace("Created completed future (streamid={0}): {1}",
 721                          streamid, cf);
 722             response_cfs.add(cf);
 723         }
 724     }
 725 
 726     // methods to update state and remove stream when finished
 727 
 728     synchronized void requestSent() {
 729         requestSent = true;
 730         if (responseReceived) {
 731             close();
 732         }
 733     }
 734 
 735     final synchronized boolean isResponseReceived() {
 736         return responseReceived;
 737     }
 738 
 739     synchronized void responseReceived() {
 740         responseReceived = true;
 741         if (requestSent) {
 742             close();
 743         }
 744     }
 745 
 746     /**
 747      * same as above but for errors
 748      */
 749     void completeResponseExceptionally(Throwable t) {
 750         synchronized (response_cfs) {
 751             // use index to avoid ConcurrentModificationException
 752             // caused by removing the CF from within the loop.
 753             for (int i = 0; i < response_cfs.size(); i++) {
 754                 CompletableFuture<Response> cf = response_cfs.get(i);
 755                 if (!cf.isDone()) {
 756                     cf.completeExceptionally(t);
 757                     response_cfs.remove(i);
 758                     return;
 759                 }
 760             }
 761             response_cfs.add(MinimalFuture.failedFuture(t));
 762         }
 763     }
 764 
 765     CompletableFuture<Void> sendBodyImpl() {
 766         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 767         requestProcessor.subscribe(subscriber);
 768         requestBodyCF.whenComplete((v,t) -> requestSent());






 769         return requestBodyCF;
 770     }
 771 
 772     @Override
 773     void cancel() {
 774         cancel(new IOException("Stream " + streamid + " cancelled"));
 775     }
 776 
         /**
          * Cancels this stream with the given cause by delegating to
          * {@code cancelImpl}.
          */
 777     @Override
 778     void cancel(IOException cause) {
 779         cancelImpl(cause);
 780     }
 781 
 782     // This method sends a RST_STREAM frame
         // (only when streamid is non-zero). It also closes the input queue if
         // this call is the one that marks the stream closed, and it fails the
         // pending response with the given cause.
 783     void cancelImpl(Throwable e) {

 784         if (Log.trace()) {
 785             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 786         }
 787         boolean closing;
             // 'closed' is tested once without the lock and then again while
             // holding this stream's monitor, so only one caller flips it.
 788         if (closing = !closed) { // assigning closing to !closed
 789             synchronized (this) {

 790                 if (closing = !closed) { // assigning closing to !closed
 791                     closed=true;
 792                 }
 793             }
 794         }
 795         if (closing) { // true if the stream has not been closed yet
 796             inputQ.close();

 797         }
             // Executed on every call, whether or not this call closed the stream.
 798         completeResponseExceptionally(e);






 799         try {
 800             // will send a RST_STREAM frame
 801             if (streamid != 0) {
 802                 connection.resetStream(streamid, ResetFrame.CANCEL);
 803             }
 804         } catch (IOException ex) {
                 // A failure to send the reset is logged, not propagated.
 805             Log.logError(ex);
 806         }
 807     }
 808 
 809     // This method doesn't send any frame
 810     void close() {
 811         if (closed) return;
 812         synchronized(this) {
 813             if (closed) return;
 814             closed = true;
 815         }
 816         Log.logTrace("Closing stream {0}", streamid);
 817         inputQ.close();
 818         connection.closeStream(streamid);
 819         Log.logTrace("Stream {0} closed", streamid);
 820     }
 821 
 822     static class PushedStream<U,T> extends Stream<T> {
 823         final PushGroup<U,T> pushGroup;
 824         private final Stream<T> parent;      // used by server push streams
 825         // push streams need the response CF allocated up front as it is
 826         // given directly to user via the multi handler callback function.
 827         final CompletableFuture<Response> pushCF;
 828         final CompletableFuture<HttpResponse<T>> responseCF;
 829         final HttpRequestImpl pushReq;
 830         HttpResponse.BodyHandler<T> pushHandler;
 831 
 832         PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
 833                 Http2Connection connection, Stream<T> parent,
 834                 Exchange<T> pushReq) {
 835             // ## no request body possible, null window controller
 836             super(client, connection, pushReq, null);
 837             this.pushGroup = pushGroup;
 838             this.pushReq = pushReq.request();
 839             this.pushCF = new MinimalFuture<>();
 840             this.responseCF = new MinimalFuture<>();
 841             this.parent = parent;
 842         }
 843 
 844         CompletableFuture<HttpResponse<T>> responseCF() {
 845             return responseCF;
 846         }
 847 
 848         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 849             this.pushHandler = pushHandler;
 850         }
 851 
 852         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 853             // ignored parameters to function can be used as BodyHandler
 854             return this.pushHandler;
 855         }
 856 
 857         // Following methods call the super class but in case of
 858         // error record it in the PushGroup. The error method is called
 859         // with a null value when no error occurred (is a no-op)
 860         @Override
 861         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 862             return super.sendBodyAsync()
 863                         .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));

 864         }
 865 
 866         @Override
 867         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 868             return super.sendHeadersAsync()
 869                         .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));

 870         }
 871 
 872         @Override
 873         CompletableFuture<Response> getResponseAsync(Executor executor) {
 874             CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));

 875             if(executor!=null && !cf.isDone()) {
 876                 cf  = cf.thenApplyAsync( r -> r, executor);
 877             }
 878             return cf;
 879         }
 880 
 881         @Override
 882         CompletableFuture<T> readBodyAsync(
 883                 HttpResponse.BodyHandler<T> handler,
 884                 boolean returnConnectionToPool,
 885                 Executor executor)
 886         {
 887             return super.readBodyAsync(handler, returnConnectionToPool, executor)
 888                         .whenComplete((v, t) -> pushGroup.pushError(t));
 889         }
 890 
 891         @Override
 892         void completeResponse(Response r) {
 893             HttpResponseImpl.logResponse(r);
 894             pushCF.complete(r); // not strictly required for push API
 895             // start reading the body using the obtained BodyProcessor
 896             CompletableFuture<Void> start = new MinimalFuture<>();
 897             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
 898                 .whenComplete((T body, Throwable t) -> {
 899                     if (t != null) {
 900                         responseCF.completeExceptionally(t);
 901                     } else {
 902                         HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
 903                         responseCF.complete(response);

 904                     }
 905                 });
 906             start.completeAsync(() -> null, getExchange().executor());
 907         }
 908 
 909         @Override
 910         void completeResponseExceptionally(Throwable t) {
 911             pushCF.completeExceptionally(t);
 912         }
 913 
 914         @Override
 915         synchronized void responseReceived() {
 916             super.responseReceived();
 917         }
 918 
 919         // create and return the PushResponseImpl
 920         @Override
 921         protected void handleResponse() {
 922             HttpConnection c = connection.connection; // TODO: improve
 923             responseCode = (int)responseHeaders
 924                 .firstValueAsLong(":status")
 925                 .orElse(-1);
 926 
 927             if (responseCode == -1) {
 928                 completeResponseExceptionally(new IOException("No status code"));
 929             }
 930 
 931             this.response = new Response(
 932                 pushReq, exchange, responseHeaders,
 933                 responseCode, HttpClient.Version.HTTP_2);
 934 
 935             this.responseContentLen = responseHeaders
 936                 .firstValueAsLong("content-length")
 937                 .orElse(-1L);


 938 
 939             if (Log.headers()) {
 940                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
 941                 sb.append(" (streamid=").append(streamid).append("): ");
 942                 Log.dumpHeaders(sb, "    ", responseHeaders);
 943                 Log.logHeaders(sb.toString());
 944             }
 945 
 946             // different implementations for normal streams and pushed streams
 947             completeResponse(response);
 948         }
 949     }
 950 
         /**
          * A {@code WindowUpdateSender} bound to this stream: it reports the
          * enclosing stream's {@code streamid} as its stream id.
          */
 951     final class StreamWindowUpdateSender extends WindowUpdateSender {
 952 
 953         StreamWindowUpdateSender(Http2Connection connection) {
 954             super(connection);
 955         }
 956 
             /** Returns the id of the enclosing stream. */
 957         @Override
 958         int getStreamId() {
 959             return streamid;
 960         }
 961     }
 962 



















 963 }


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.util.ArrayList;
  33 import java.util.Collections;
  34 import java.util.List;
  35 import java.util.Optional;
  36 import java.util.concurrent.CompletableFuture;
  37 import java.util.concurrent.ConcurrentLinkedDeque;
  38 import java.util.concurrent.ConcurrentLinkedQueue;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.Flow;
  41 import java.util.concurrent.Flow.Subscription;
  42 import java.util.concurrent.atomic.AtomicReference;
  43 import jdk.incubator.http.HttpResponse.BodySubscriber;


  44 import jdk.incubator.http.internal.common.*;
  45 import jdk.incubator.http.internal.frame.*;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 
  48 /**
  49  * Http/2 Stream handling.
  50  *
  51  * REQUESTS
  52  *
  53  * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
  54  *
  55  * sendRequest() -- sendHeadersOnly() + sendBody()
  56  *




  57  * sendBodyAsync() -- calls sendBody() in an executor thread.
  58  *
  59  * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
  60  *
  61  * sendRequestAsync() -- calls sendRequest() in an executor thread
  62  *
  63  * RESPONSES
  64  *
  65  * Multiple responses can be received per request. Responses are queued up on
  66  * a LinkedList of CF<HttpResponse> and the first one on the list is completed
  67  * with the next response.
  68  *
  69  * getResponseAsync() -- queries list of response CFs and returns first one
  70  *               if one exists. Otherwise, creates one and adds it to list
  71  *               and returns it. Completion is achieved through the
  72  *               incoming() upcall from connection reader thread.
  73  *
  74  * getResponse() -- calls getResponseAsync() and waits for CF to complete
  75  *




  76  * responseBodyAsync() -- calls responseBody() in an executor thread.
  77  *
  78  * incoming() -- entry point called from connection reader thread. Frames are
  79  *               either handled immediately without blocking or for data frames
  80  *               placed on the stream's inputQ which is consumed by the stream's
  81  *               reader thread.
  82  *
  83  * PushedStream sub class
  84  * ======================
  85  * Sending side methods are not used because the request comes from a PUSH_PROMISE
  86  * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
  87  * is created. PushedStream does not use responseCF list as there can be only
  88  * one response. The CF is created when the object is created, and it is
  89  * completed when the response HEADERS frame is received.
  90  */
  91 class Stream<T> extends ExchangeImpl<T> {
  92 
  93     final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
  94     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  95 
         // Incoming DATA and RST_STREAM frames, appended by receiveDataFrame()
         // and receiveResetFrame() and drained by schedule().
  96     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
         // Scheduler whose task is schedule(); triggered with runOrSchedule().
  97     final SequentialScheduler sched =
  98             SequentialScheduler.synchronizedScheduler(this::schedule);
         // Subscription passed to the response BodySubscriber in receiveData().
  99     final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
 100 
 101     /**
 102      * This stream's identifier. Assigned lazily by the HTTP2Connection before
 103      * the stream's first frame is sent.
 104      */
 105     protected volatile int streamid;
 106 


         // Set in sendHeadersAsync(): the request publisher's contentLength(),
         // or 0 when there is no publisher.
 107     long requestContentLen;
 108 
 109     final Http2Connection connection;

 110     final HttpRequestImpl request;
 111     final DecodingCallback rspHeadersConsumer;
 112     HttpHeadersImpl responseHeaders;

 113     final HttpHeadersImpl requestPseudoHeaders;
         // Installed by receiveData(); schedule() does nothing while it is null.
 114     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
         // Taken from the request in the constructor; may be null.
 115     final HttpRequest.BodyPublisher requestPublisher;
 116     volatile RequestSubscriber requestSubscriber;
 117     volatile int responseCode;
 118     volatile Response response;
 119     volatile Throwable failed; // The exception with which this stream was canceled.

 120     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
 121     volatile CompletableFuture<T> responseBodyCF;
 122 
 123     /** True if END_STREAM has been seen in a frame received on this stream. */
 124     private volatile boolean remotelyClosed;
 125     private volatile boolean closed;
 126     private volatile boolean endStreamSent;
 127 
 128     // state flags
 129     private boolean requestSent, responseReceived;
 130 
 131     /**
 132      * A reference to this Stream's connection Send Window controller. The
 133      * stream MUST acquire the appropriate amount of Send Window before
 134      * sending any data. Will be null for PushStreams, as they cannot send data.
 135      */
 136     private final WindowController windowController;
         // Created in the constructor as a StreamWindowUpdateSender; used by
         // consumed() to send per-stream window updates.
 137     private final WindowUpdateSender windowUpdater;
 138 
         /**
          * Returns the underlying {@code HttpConnection} held by this stream's
          * {@code Http2Connection}.
          */
 139     @Override
 140     HttpConnection connection() {
 141         return connection.connection;
 142     }
 143 
 144     /**
 145      * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 146      * or after user subscription window has re-opened, from SubscriptionBase.request()
          *
          * Drains {@code inputQ} into the response subscriber: data frames are
          * passed to {@code onNext} while {@code userSubscription.tryDecrement()}
          * succeeds, END_STREAM results in {@code onComplete}, and a queued
          * RST_STREAM is handed to {@code handleReset}. Once the queue is empty,
          * a recorded {@code failed} cause is reported with {@code onError}.
 147      */
 148     private void schedule() {
 149         if (responseSubscriber == null)
 150             // can't process anything yet
 151             return;
 152 
 153         while (!inputQ.isEmpty()) {
                 // Only peek: the frame stays queued unless one of the branches
                 // below decides to consume it.
 154             Http2Frame frame  = inputQ.peek();
 155             if (frame instanceof ResetFrame) {
 156                 inputQ.remove();
 157                 handleReset((ResetFrame)frame);
 158                 return;
 159             }
                 // Only DataFrame and ResetFrame are added to inputQ in this file
                 // (see receiveDataFrame / receiveResetFrame).
 160             DataFrame df = (DataFrame)frame;
 161             boolean finished = df.getFlag(DataFrame.END_STREAM);
 162 
 163             List<ByteBuffer> buffers = df.getData();
 164             List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
 165             int size = Utils.remaining(dsts, Integer.MAX_VALUE);
 166             if (size == 0 && finished) {
                     // Empty END_STREAM frame: complete without calling
                     // tryDecrement().
 167                 inputQ.remove();
 168                 Log.logTrace("responseSubscriber.onComplete");
 169                 debug.log(Level.DEBUG, "incoming: onComplete");
 170                 sched.stop();
 171                 responseSubscriber.onComplete();
 172                 setEndStreamReceived();
 173                 return;
 174             } else if (userSubscription.tryDecrement()) {
 175                 inputQ.remove();
 176                 Log.logTrace("responseSubscriber.onNext {0}", size);
 177                 debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
 178                 responseSubscriber.onNext(dsts);
                     // consumed() sends the window updates and returns true when
                     // this frame carried END_STREAM.
 179                 if (consumed(df)) {
 180                     Log.logTrace("responseSubscriber.onComplete");
 181                     debug.log(Level.DEBUG, "incoming: onComplete");
 182                     sched.stop();
 183                     responseSubscriber.onComplete();
 184                     setEndStreamReceived();
 185                     return;
 186                 }
 187             } else {
                     // tryDecrement() failed: leave the frame queued and wait to
                     // be scheduled again.
 188                 return;
 189             }
 190         }
             // Reached only when the queue is empty.
 191         Throwable t = failed;
 192         if (t != null) {
 193             sched.stop();
 194             responseSubscriber.onError(t);
 195             close();
 196         }
 197     }
 198 
 199     // Callback invoked after the Response BodySubscriber has consumed the
 200     // buffers contained in a DataFrame.
 201     // Returns true if END_STREAM is reached, false otherwise.
 202     private boolean consumed(DataFrame df) {
 203         // RFC 7540 6.1:
 204         // The entire DATA frame payload is included in flow control,
 205         // including the Pad Length and Padding fields if present
 206         int len = df.payloadLength();
 207         connection.windowUpdater.update(len);
 208 
 209         if (!df.getFlag(DataFrame.END_STREAM)) {
 210             // Don't send window update on a stream which is
 211             // closed or half closed.
 212             windowUpdater.update(len);
 213             return false; // more data coming
 214         }
 215         return true; // end of stream
 216     }
 217 
 218     @Override
 219     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
 220                                        boolean returnConnectionToPool,
 221                                        Executor executor)
 222     {
 223         Log.logTrace("Reading body on stream {0}", streamid);
 224         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
 225         CompletableFuture<T> cf = receiveData(bodySubscriber);

 226 
 227         PushGroup<?,?> pg = exchange.getPushGroup();
 228         if (pg != null) {
 229             // if an error occurs make sure it is recorded in the PushGroup
 230             cf = cf.whenComplete((t,e) -> pg.pushError(e));
 231         }
 232         return cf;
 233     }
 234 
 235     @Override














 236     public String toString() {
 237         StringBuilder sb = new StringBuilder();
 238         sb.append("streamid: ")
 239                 .append(streamid);
 240         return sb.toString();
 241     }
 242 
         /**
          * Queues an incoming DATA frame on {@code inputQ} and triggers the
          * scheduler so that {@code schedule()} can process it.
          */
 243     private void receiveDataFrame(DataFrame df) {
 244         inputQ.add(df);
 245         sched.runOrSchedule();
















 246     }
 247 
 248     /** Handles a RESET frame. RESET is always handled inline in the queue. */
         // The frame is appended to inputQ behind any pending data frames and
         // the scheduler is triggered; schedule() later passes it to handleReset().
 249     private void receiveResetFrame(ResetFrame frame) {
 250         inputQ.add(frame);
 251         sched.runOrSchedule();





 252     }
 253 
 254     // pushes entire response body into response subscriber
 255     // blocking when required by local or remote flow control
 256     CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
 257         responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
 258 
 259         if (isCanceled()) {
 260             Throwable t = getCancelCause();
 261             responseBodyCF.completeExceptionally(t);






 262         } else {
 263             bodySubscriber.onSubscribe(userSubscription);
 264         }
 265         // Set the responseSubscriber field now that onSubscribe has been called.
 266         // This effectively allows the scheduler to start invoking the callbacks.
 267         responseSubscriber = bodySubscriber;
 268         sched.runOrSchedule(); // in case data waiting already to be processed
 269         return responseBodyCF;
 270     }
 271 
         /**
          * Starts sending the request body through {@code sendBodyImpl()} and
          * returns a future that yields this stream once that completes.
          */
 272     @Override








 273     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 274         return sendBodyImpl().thenApply( v -> this);
 275     }
 276 
 277     @SuppressWarnings("unchecked")
 278     Stream(Http2Connection connection,

 279            Exchange<T> e,
 280            WindowController windowController)
 281     {
 282         super(e);

 283         this.connection = connection;
 284         this.windowController = windowController;
 285         this.request = e.request();
 286         this.requestPublisher = request.requestPublisher;  // may be null
 287         responseHeaders = new HttpHeadersImpl();

 288         rspHeadersConsumer = (name, value) -> {
 289             responseHeaders.addHeader(name.toString(), value.toString());
 290             if (Log.headers() && Log.trace()) {
 291                 Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
 292                              streamid, name, value);
 293             }
 294         };
 295         this.requestPseudoHeaders = new HttpHeadersImpl();
 296         // NEW

 297         this.windowUpdater = new StreamWindowUpdateSender(connection);
 298     }
 299 
 300     /**
 301      * Entry point from Http2Connection reader thread.
 302      *
 303      * Data frames will be removed by response body thread.
 304      */
 305     void incoming(Http2Frame frame) throws IOException {
 306         debug.log(Level.DEBUG, "incoming: %s", frame);
 307         if ((frame instanceof HeaderFrame)) {
 308             HeaderFrame hframe = (HeaderFrame)frame;
 309             if (hframe.endHeaders()) {
 310                 Log.logTrace("handling response (streamid={0})", streamid);
 311                 handleResponse();
 312                 if (hframe.getFlag(HeaderFrame.END_STREAM)) {
 313                     receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
 314                 }
 315             }
 316         } else if (frame instanceof DataFrame) {
 317             receiveDataFrame((DataFrame)frame);
 318         } else {
 319             otherFrame(frame);
 320         }
 321     }
 322 
 323     void otherFrame(Http2Frame frame) throws IOException {
 324         switch (frame.type()) {
 325             case WindowUpdateFrame.TYPE:
 326                 incoming_windowUpdate((WindowUpdateFrame) frame);
 327                 break;
 328             case ResetFrame.TYPE:
 329                 incoming_reset((ResetFrame) frame);
 330                 break;
 331             case PriorityFrame.TYPE:
 332                 incoming_priority((PriorityFrame) frame);
 333                 break;
 334             default:
 335                 String msg = "Unexpected frame: " + frame.toString();
 336                 throw new IOException(msg);
 337         }
 338     }
 339 
 340     // The Hpack decoder decodes into one of these consumers of name,value pairs
 341 
         /** Returns the callback that records decoded response headers. */
 342     DecodingCallback rspHeadersConsumer() {
 343         return rspHeadersConsumer;
 344     }
 345 
 346     protected void handleResponse() throws IOException {




 347         responseCode = (int)responseHeaders
 348                 .firstValueAsLong(":status")
 349                 .orElseThrow(() -> new IOException("no statuscode in response"));
 350 
 351         response = new Response(
 352                 request, exchange, responseHeaders,
 353                 responseCode, HttpClient.Version.HTTP_2);
 354 
 355         /* TODO: review if needs to be removed
 356            the value is not used, but in case `content-length` doesn't parse as
 357            long, there will be NumberFormatException. If left as is, make sure
 358            code up the stack handles NFE correctly. */
 359         responseHeaders.firstValueAsLong("content-length");
 360 
 361         if (Log.headers()) {
 362             StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 363             Log.dumpHeaders(sb, "    ", responseHeaders);
 364             Log.logHeaders(sb.toString());
 365         }
 366 
 367         completeResponse(response);
 368     }
 369 
 370     void incoming_reset(ResetFrame frame) {
 371         Log.logTrace("Received RST_STREAM on stream {0}", streamid);
 372         if (endStreamReceived()) {
 373             Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
 374         } else if (closed) {
 375             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 376         } else {
 377             // put it in the input queue in order to read all




 378             // pending data frames first. Indeed, a server may send
 379             // RST_STREAM after sending END_STREAM, in which case we should
 380             // ignore it. However, we won't know if we have received END_STREAM
 381             // or not until all pending data frames are read.
 382             receiveResetFrame(frame);













 383             // RST_STREAM was pushed to the queue. It will be handled by
 384             // asyncReceive after all pending data frames have been
 385             // processed.
 386             Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
 387         }
 388     }

 389 
 390     void handleReset(ResetFrame frame) {
 391         Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
 392         if (!closed) {
 393             close();
 394             int error = frame.getErrorCode();
 395             completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
 396         } else {
 397             Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
 398         }
 399     }
 400 
         /**
          * Handler for PRIORITY frames; not implemented, always throws
          * {@code UnsupportedOperationException}.
          *
          * NOTE(review): this is reachable from incoming() via otherFrame(),
          * so a PRIORITY frame from the peer surfaces as an unchecked
          * exception in the caller -- confirm that is acceptable.
          */
 401     void incoming_priority(PriorityFrame frame) {
 402         // TODO: implement priority
 403         throw new UnsupportedOperationException("Not implemented");
 404     }
 405 
 406     private void incoming_windowUpdate(WindowUpdateFrame frame)
 407         throws IOException
 408     {
 409         int amount = frame.getUpdate();
 410         if (amount <= 0) {
 411             Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
 412                          streamid, streamid, amount);
 413             connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 414         } else {
 415             assert streamid != 0;
 416             boolean success = windowController.increaseStreamWindow(amount, streamid);
 417             if (!success) {  // overflow
 418                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
 419             }
 420         }
 421     }
 422 
 423     void incoming_pushPromise(HttpRequestImpl pushReq,
 424                               PushedStream<?,T> pushStream)
 425         throws IOException
 426     {
 427         if (Log.requests()) {
 428             Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
 429         }
 430         PushGroup<?,T> pushGroup = exchange.getPushGroup();
 431         if (pushGroup == null || pushGroup.noMorePushes()) {
 432             cancelImpl(new IllegalStateException("unexpected push promise"
 433                 + " on stream " + streamid));
 434             return;
 435         }
 436 
 437         HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
 438 
 439         CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
 440 
 441         Optional<HttpResponse.BodyHandler<T>> bpOpt =
 442                 pushGroup.handlerForPushRequest(pushReq);
 443 
 444         if (!bpOpt.isPresent()) {
 445             IOException ex = new IOException("Stream "
 446                  + streamid + " cancelled by user");
 447             if (Log.trace()) {
 448                 Log.logTrace("No body subscriber for {0}: {1}", pushReq,
 449                             ex.getMessage());
 450             }
 451             pushStream.cancelImpl(ex);
 452             cf.completeExceptionally(ex);
 453             return;
 454         }
 455 
 456         pushGroup.addPush();
 457         pushStream.requestSent();
 458         pushStream.setPushHandler(bpOpt.get());
 459         // setup housekeeping for when the push is received
 460         // TODO: deal with ignoring of CF anti-pattern
 461         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
 462             t = Utils.getCompletionCause(t);
 463             if (Log.trace()) {
 464                 Log.logTrace("Push completed on stream {0} for {1}{2}",
 465                              pushStream.streamid, resp,
 466                              ((t==null) ? "": " with exception " + t));
 467             }
 468             if (t != null) {
 469                 pushGroup.pushError(t);
 470                 proc.onError(pushReq, t);
 471             } else {
 472                 proc.onResponse(resp);
 473             }
 474             pushGroup.pushCompleted();
 475         });
 476 
 477     }
 478 
 479     private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
 480         HttpHeadersImpl h = request.getSystemHeaders();
 481         if (contentLength > 0) {
 482             h.setHeader("content-length", Long.toString(contentLength));


 501         // TODO: ensure header names beginning with : not in user headers
 502         String query = uri.getQuery();
 503         String path = uri.getPath();
 504         if (path == null || path.isEmpty()) {
 505             if (method.equalsIgnoreCase("OPTIONS")) {
 506                 path = "*";
 507             } else {
 508                 path = "/";
 509             }
 510         }
 511         if (query != null) {
 512             path += "?" + query;
 513         }
 514         hdrs.setHeader(":path", path);
 515     }
 516 
         /** Returns the headers object holding this request's pseudo-headers. */
 517     HttpHeadersImpl getRequestPseudoHeaders() {
 518         return requestPseudoHeaders;
 519     }
 520 




























 521     /** Sets endStreamReceived. Should be called only once. */
 522     void setEndStreamReceived() {
 523         assert remotelyClosed == false: "Unexpected endStream already set";
 524         remotelyClosed = true;
 525         responseReceived();
 526     }
 527 
 528     /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
 529      *  received on this stream. */
         // Reads the remotelyClosed flag, which setEndStreamReceived() sets.
 530     private boolean endStreamReceived() {
 531         return remotelyClosed;
 532     }
 533 
 534     @Override
 535     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 536         debug.log(Level.DEBUG, "sendHeadersOnly()");
 537         if (Log.requests() && request != null) {
 538             Log.logRequest(request.toString());
 539         }
 540         if (requestPublisher != null) {
 541             requestContentLen = requestPublisher.contentLength();
 542         } else {
 543             requestContentLen = 0;
 544         }
 545         OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
 546         connection.sendFrame(f);
 547         CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
 548         cf.complete(this);  // #### good enough for now
 549         return cf;
 550     }
 551 
 552     @Override
 553     void released() {
 554         if (streamid > 0) {
 555             debug.log(Level.DEBUG, "Released stream %d", streamid);
 556             // remove this stream from the Http2Connection map.
 557             connection.closeStream(streamid);
 558         } else {
 559             debug.log(Level.DEBUG, "Can't release stream %d", streamid);
 560         }
 561     }
 562 
         /** Intentionally empty completion hook; see the comment in the body. */
 563     @Override
 564     void completed() {
 565         // There should be nothing to do here: the stream should have
 566         // been already closed (or will be closed shortly after).
 567     }
 568 
         /**
          * Assigns the given id to this stream and registers the stream with
          * the connection under that id.
          */
 569     void registerStream(int id) {
 570         this.streamid = id;
 571         connection.putStream(this, streamid);
 572         debug.log(Level.DEBUG, "Registered stream %d", id);
 573     }
 574 
         /**
          * Triggers the request subscriber's send scheduler so that sending can
          * be attempted again. Expects {@code requestSubscriber} to be set
          * (asserted).
          */
 575     void signalWindowUpdate() {
             // Read the volatile field once into a local.
 576         RequestSubscriber subscriber = requestSubscriber;
 577         assert subscriber != null;
 578         debug.log(Level.DEBUG, "Signalling window update");
 579         subscriber.sendScheduler.runOrSchedule();
 580     }
 581 
 582     static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
 583     class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
 584         // can be < 0 if the actual length is not known.
 585         private final long contentLength;
 586         private volatile long remainingContentLength;
 587         private volatile Subscription subscription;
 588 
 589         // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
 590         //  1) The data that was published by the request body Publisher, and
 591         //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
 592         final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
 593 
 594         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 595         // A scheduler used to honor window updates. Writing must be paused
 596         // when the window is exhausted, and resumed when the window acquires
 597         // some space. The sendScheduler makes it possible to implement this
 598         // behaviour in an asynchronous non-blocking way.
 599         // See RequestSubscriber::trySend below.
 600         final SequentialScheduler sendScheduler;
 601 
 602         RequestSubscriber(long contentLen) {
 603             this.contentLength = contentLen;
 604             this.remainingContentLength = contentLen;
 605             this.sendScheduler =
 606                     SequentialScheduler.synchronizedScheduler(this::trySend);
 607         }
 608 
 609         @Override
 610         public void onSubscribe(Flow.Subscription subscription) {
 611             if (this.subscription != null) {
 612                 throw new IllegalStateException("already subscribed");
 613             }
 614             this.subscription = subscription;
 615             debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
 616             subscription.request(1);
 617         }
 618 
 619         @Override
 620         public void onNext(ByteBuffer item) {
 621             debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
 622             int size = outgoing.size();
 623             assert size == 0 : "non-zero size: " + size;
 624             onNextImpl(item);
 625         }
 626 
 627         private void onNextImpl(ByteBuffer item) {
 628             // Got some more request body bytes to send.
 629             if (requestBodyCF.isDone()) {
 630                 // stream already cancelled, probably in timeout
 631                 sendScheduler.stop();
 632                 subscription.cancel();
 633                 return;
 634             }
 635             outgoing.add(item);
 636             sendScheduler.runOrSchedule();
 637         }
 638 
 639         @Override
 640         public void onError(Throwable throwable) {
 641             debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
 642             // ensure that errors are handled within the flow.
 643             if (errorRef.compareAndSet(null, throwable)) {
 644                 sendScheduler.runOrSchedule();
 645             }
 646         }
 647 
 648         @Override
 649         public void onComplete() {
 650             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
 651             int size = outgoing.size();
 652             assert size == 0 || size == 1 : "non-zero or one size: " + size;
 653             // last byte of request body has been obtained.
 654             // ensure that everything is completed within the flow.
 655             onNextImpl(COMPLETED);
 656         }
 657 
 658         // Attempts to send the data, if any.
 659         // Handles errors and completion state.
 660         // Pause writing if the send window is exhausted, resume it if the
 661         // send window has some bytes that can be acquired.
 662         void trySend() {
 663             try {
 664                 // handle errors raised by onError;
 665                 Throwable t = errorRef.get();
 666                 if (t != null) {
 667                     sendScheduler.stop();
 668                     if (requestBodyCF.isDone()) return;
 669                     subscription.cancel();
 670                     requestBodyCF.completeExceptionally(t);
 671                     return;
 672                 }
 673 
 674                 do {
 675                     // handle COMPLETED;
 676                     ByteBuffer item = outgoing.peekFirst();
 677                     if (item == null) return;
 678                     else if (item == COMPLETED) {
 679                         sendScheduler.stop();
 680                         complete();
 681                         return;
 682                     }
 683 
 684                     // handle bytes to send downstream
 685                     while (item.hasRemaining()) {
 686                         debug.log(Level.DEBUG, "trySend: %d", item.remaining());
 687                         assert !endStreamSent : "internal error, send data after END_STREAM flag";
 688                         DataFrame df = getDataFrame(item);
 689                         if (df == null) {
 690                             debug.log(Level.DEBUG, "trySend: can't send yet: %d",
 691                                     item.remaining());
 692                             return; // the send window is exhausted: come back later
 693                         }
 694 
 695                         if (contentLength > 0) {
 696                             remainingContentLength -= df.getDataLength();
 697                             if (remainingContentLength < 0) {
 698                                 String msg = connection().getConnectionFlow()
 699                                         + " stream=" + streamid + " "
 700                                         + "[" + Thread.currentThread().getName() + "] "
 701                                         + "Too many bytes in request body. Expected: "
 702                                         + contentLength + ", got: "
 703                                         + (contentLength - remainingContentLength);
 704                                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 705                                 throw new IOException(msg);
 706                             } else if (remainingContentLength == 0) {
 707                                 df.setFlag(DataFrame.END_STREAM);
 708                                 endStreamSent = true;
 709                             }
 710                         }
 711                         debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
 712                         connection.sendDataFrame(df);
 713                     }
 714                     assert !item.hasRemaining();
 715                     ByteBuffer b = outgoing.removeFirst();
 716                     assert b == item;
 717                 } while (outgoing.peekFirst() != null);
 718 
 719                 debug.log(Level.DEBUG, "trySend: request 1");
 720                 subscription.request(1);
 721             } catch (Throwable ex) {
 722                 debug.log(Level.DEBUG, "trySend: ", ex);
 723                 sendScheduler.stop();
 724                 subscription.cancel();
 725                 requestBodyCF.completeExceptionally(ex);
 726             }
 727         }
 728 
 729         private void complete() throws IOException {
 730             long remaining = remainingContentLength;
 731             long written = contentLength - remaining;
 732             if (remaining > 0) {
 733                 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
 734                 // let trySend() handle the exception
 735                 throw new IOException(connection().getConnectionFlow()
 736                                      + " stream=" + streamid + " "
 737                                      + "[" + Thread.currentThread().getName() +"] "
 738                                      + "Too few bytes returned by the publisher ("
 739                                               + written + "/"
 740                                               + contentLength + ")");
 741             }








 742             if (!endStreamSent) {
 743                 endStreamSent = true;
 744                 connection.sendDataFrame(getEmptyEndStreamDataFrame());
 745             }
 746             requestBodyCF.complete(null);


 747         }
 748     }
 749 
 750     /**
 751      * Send a RESET frame to tell server to stop sending data on this stream
 752      */
 753     @Override
 754     public CompletableFuture<Void> ignoreBody() {
 755         try {
 756             connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
 757             return MinimalFuture.completedFuture(null);
 758         } catch (Throwable e) {
 759             Log.logTrace("Error resetting stream {0}", e.toString());
 760             return MinimalFuture.failedFuture(e);
 761         }
 762     }
 763 
 764     DataFrame getDataFrame(ByteBuffer buffer) {
 765         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
 766         // blocks waiting for stream send window, if exhausted
 767         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
 768         if (actualAmount <= 0) return null;
 769         ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
 770         DataFrame df = new DataFrame(streamid, 0 , outBuf);
 771         return df;
 772     }
 773 
 774     private DataFrame getEmptyEndStreamDataFrame()  {
 775         return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
 776     }
 777 
 778     /**
 779      * A List of responses relating to this stream. Normally there is only
 780      * one response, but intermediate responses like 100 are allowed
 781      * and must be passed up to higher level before continuing. Deals with races
 782      * such as if responses are returned before the CFs get created by
 783      * getResponseAsync()
 784      */
 785 
 786     final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
 787 
 788     @Override
 789     CompletableFuture<Response> getResponseAsync(Executor executor) {
 790         CompletableFuture<Response> cf;
 791         // The code below deals with race condition that can be caused when
 792         // completeResponse() is being called before getResponseAsync()
 793         synchronized (response_cfs) {
 794             if (!response_cfs.isEmpty()) {
 795                 // This CompletableFuture was created by completeResponse().
 796                 // it will be already completed.
 797                 cf = response_cfs.remove(0);
 798                 // if we find a cf here it should be already completed.
 799                 // finding a non completed cf should not happen. just assert it.
 800                 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
 801             } else {
 802                 // getResponseAsync() is called first. Create a CompletableFuture
 803                 // that will be completed by completeResponse() when
 804                 // completeResponse() is called.
 805                 cf = new MinimalFuture<>();
 806                 response_cfs.add(cf);
 807             }
 808         }
 809         if (executor != null && !cf.isDone()) {
 810             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
 811             cf = cf.thenApplyAsync(r -> r, executor);
 812         }
 813         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
 814         PushGroup<?,?> pg = exchange.getPushGroup();
 815         if (pg != null) {
 816             // if an error occurs make sure it is recorded in the PushGroup
 817             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
 818         }
 819         return cf;
 820     }
 821 
 822     /**
 823      * Completes the first uncompleted CF on list, and removes it. If there is no
 824      * uncompleted CF then creates one (completes it) and adds to list
 825      */
 826     void completeResponse(Response resp) {
 827         synchronized (response_cfs) {
 828             CompletableFuture<Response> cf;
 829             int cfs_len = response_cfs.size();
 830             for (int i=0; i<cfs_len; i++) {
 831                 cf = response_cfs.get(i);
 832                 if (!cf.isDone()) {
 833                     Log.logTrace("Completing response (streamid={0}): {1}",
 834                                  streamid, cf);
 835                     cf.complete(resp);
 836                     response_cfs.remove(cf);
 837                     return;
 838                 } // else we found the previous response: just leave it alone.
 839             }
 840             cf = MinimalFuture.completedFuture(resp);
 841             Log.logTrace("Created completed future (streamid={0}): {1}",
 842                          streamid, cf);
 843             response_cfs.add(cf);
 844         }
 845     }
 846 
 847     // methods to update state and remove stream when finished
 848 
 849     synchronized void requestSent() {
 850         requestSent = true;
 851         if (responseReceived) {
 852             close();
 853         }
 854     }
 855 




 856     synchronized void responseReceived() {
 857         responseReceived = true;
 858         if (requestSent) {
 859             close();
 860         }
 861     }
 862 
 863     /**
 864      * same as above but for errors
 865      */
 866     void completeResponseExceptionally(Throwable t) {
 867         synchronized (response_cfs) {
 868             // use index to avoid ConcurrentModificationException
 869             // caused by removing the CF from within the loop.
 870             for (int i = 0; i < response_cfs.size(); i++) {
 871                 CompletableFuture<Response> cf = response_cfs.get(i);
 872                 if (!cf.isDone()) {
 873                     cf.completeExceptionally(t);
 874                     response_cfs.remove(i);
 875                     return;
 876                 }
 877             }
 878             response_cfs.add(MinimalFuture.failedFuture(t));
 879         }
 880     }
 881 
 882     CompletableFuture<Void> sendBodyImpl() {
 883         requestBodyCF.whenComplete((v, t) -> requestSent());
 884         if (requestPublisher != null) {
 885             final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
 886             requestPublisher.subscribe(requestSubscriber = subscriber);
 887         } else {
 888             // there is no request body, therefore the request is complete,
 889             // END_STREAM has already sent with outgoing headers
 890             requestBodyCF.complete(null);
 891         }
 892         return requestBodyCF;
 893     }
 894 
 895     @Override
 896     void cancel() {
 897         cancel(new IOException("Stream " + streamid + " cancelled"));
 898     }
 899 
 900     @Override
 901     void cancel(IOException cause) {
 902         cancelImpl(cause);
 903     }
 904 
 905     // This method sends a RST_STREAM frame
 906     void cancelImpl(Throwable e) {
 907         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
 908         if (Log.trace()) {
 909             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
 910         }
 911         boolean closing;
 912         if (closing = !closed) { // assigning closing to !closed
 913             synchronized (this) {
 914                 failed = e;
 915                 if (closing = !closed) { // assigning closing to !closed
 916                     closed=true;
 917                 }
 918             }
 919         }
 920         if (closing) { // true if the stream has not been closed yet
 921             if (responseSubscriber != null)
 922                 sched.runOrSchedule();
 923         }
 924         completeResponseExceptionally(e);
 925         if (!requestBodyCF.isDone()) {
 926             requestBodyCF.completeExceptionally(e); // we may be sending the body..
 927         }
 928         if (responseBodyCF != null) {
 929             responseBodyCF.completeExceptionally(e);
 930         }
 931         try {
 932             // will send a RST_STREAM frame
 933             if (streamid != 0) {
 934                 connection.resetStream(streamid, ResetFrame.CANCEL);
 935             }
 936         } catch (IOException ex) {
 937             Log.logError(ex);
 938         }
 939     }
 940 
 941     // This method doesn't send any frame
 942     void close() {
 943         if (closed) return;
 944         synchronized(this) {
 945             if (closed) return;
 946             closed = true;
 947         }
 948         Log.logTrace("Closing stream {0}", streamid);

 949         connection.closeStream(streamid);
 950         Log.logTrace("Stream {0} closed", streamid);
 951     }
 952 
 953     static class PushedStream<U,T> extends Stream<T> {
 954         final PushGroup<U,T> pushGroup;

 955         // push streams need the response CF allocated up front as it is
 956         // given directly to user via the multi handler callback function.
 957         final CompletableFuture<Response> pushCF;
 958         final CompletableFuture<HttpResponse<T>> responseCF;
 959         final HttpRequestImpl pushReq;
 960         HttpResponse.BodyHandler<T> pushHandler;
 961 
 962         PushedStream(PushGroup<U,T> pushGroup,
 963                      Http2Connection connection,
 964                      Exchange<T> pushReq) {
 965             // ## no request body possible, null window controller
 966             super(connection, pushReq, null);
 967             this.pushGroup = pushGroup;
 968             this.pushReq = pushReq.request();
 969             this.pushCF = new MinimalFuture<>();
 970             this.responseCF = new MinimalFuture<>();

 971         }
 972 
 973         CompletableFuture<HttpResponse<T>> responseCF() {
 974             return responseCF;
 975         }
 976 
 977         synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
 978             this.pushHandler = pushHandler;
 979         }
 980 
 981         synchronized HttpResponse.BodyHandler<T> getPushHandler() {
 982             // ignored parameters to function can be used as BodyHandler
 983             return this.pushHandler;
 984         }
 985 
 986         // Following methods call the super class but in case of
 987         // error record it in the PushGroup. The error method is called
 988         // with a null value when no error occurred (is a no-op)
 989         @Override
 990         CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
 991             return super.sendBodyAsync()
 992                         .whenComplete((ExchangeImpl<T> v, Throwable t)
 993                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
 994         }
 995 
 996         @Override
 997         CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
 998             return super.sendHeadersAsync()
 999                         .whenComplete((ExchangeImpl<T> ex, Throwable t)
1000                                 -> pushGroup.pushError(Utils.getCompletionCause(t)));
1001         }
1002 
1003         @Override
1004         CompletableFuture<Response> getResponseAsync(Executor executor) {
1005             CompletableFuture<Response> cf = pushCF.whenComplete(
1006                     (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1007             if(executor!=null && !cf.isDone()) {
1008                 cf  = cf.thenApplyAsync( r -> r, executor);
1009             }
1010             return cf;
1011         }
1012 
1013         @Override
1014         CompletableFuture<T> readBodyAsync(
1015                 HttpResponse.BodyHandler<T> handler,
1016                 boolean returnConnectionToPool,
1017                 Executor executor)
1018         {
1019             return super.readBodyAsync(handler, returnConnectionToPool, executor)
1020                         .whenComplete((v, t) -> pushGroup.pushError(t));
1021         }
1022 
1023         @Override
1024         void completeResponse(Response r) {
1025             Log.logResponse(r::toString);
1026             pushCF.complete(r); // not strictly required for push API
1027             // start reading the body using the obtained BodySubscriber
1028             CompletableFuture<Void> start = new MinimalFuture<>();
1029             start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1030                 .whenComplete((T body, Throwable t) -> {
1031                     if (t != null) {
1032                         responseCF.completeExceptionally(t);
1033                     } else {
1034                         HttpResponseImpl<T> resp =
1035                                 new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1036                         responseCF.complete(resp);
1037                     }
1038                 });
1039             start.completeAsync(() -> null, getExchange().executor());
1040         }
1041 
1042         @Override
1043         void completeResponseExceptionally(Throwable t) {
1044             pushCF.completeExceptionally(t);
1045         }
1046 
1047 //        @Override
1048 //        synchronized void responseReceived() {
1049 //            super.responseReceived();
1050 //        }
1051 
1052         // create and return the PushResponseImpl
1053         @Override
1054         protected void handleResponse() {

1055             responseCode = (int)responseHeaders
1056                 .firstValueAsLong(":status")
1057                 .orElse(-1);
1058 
1059             if (responseCode == -1) {
1060                 completeResponseExceptionally(new IOException("No status code"));
1061             }
1062 
1063             this.response = new Response(
1064                 pushReq, exchange, responseHeaders,
1065                 responseCode, HttpClient.Version.HTTP_2);
1066 
1067             /* TODO: review if needs to be removed
1068                the value is not used, but in case `content-length` doesn't parse
1069                as long, there will be NumberFormatException. If left as is, make
1070                sure code up the stack handles NFE correctly. */
1071             responseHeaders.firstValueAsLong("content-length");
1072 
1073             if (Log.headers()) {
1074                 StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1075                 sb.append(" (streamid=").append(streamid).append("): ");
1076                 Log.dumpHeaders(sb, "    ", responseHeaders);
1077                 Log.logHeaders(sb.toString());
1078             }
1079 
1080             // different implementations for normal streams and pushed streams
1081             completeResponse(response);
1082         }
1083     }
1084 
1085     final class StreamWindowUpdateSender extends WindowUpdateSender {
1086 
1087         StreamWindowUpdateSender(Http2Connection connection) {
1088             super(connection);
1089         }
1090 
1091         @Override
1092         int getStreamId() {
1093             return streamid;
1094         }
1095     }
1096 
1097     /**
1098      * Returns true if this exchange was canceled.
1099      * @return true if this exchange was canceled.
1100      */
1101     synchronized boolean isCanceled() {
1102         return failed != null;
1103     }
1104 
1105     /**
1106      * Returns the cause for which this exchange was canceled, if available.
1107      * @return the cause for which this exchange was canceled, if available.
1108      */
1109     synchronized Throwable getCancelCause() {
1110         return failed;
1111     }
1112 
1113     final String dbgString() {
1114         return connection.dbgString() + "/Stream("+streamid+")";
1115     }
1116 }
< prev index next >