< prev index next >
src/java.httpclient/share/classes/java/net/http/Stream.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 22,101 ****
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import java.io.IOException;
- import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
! import java.util.concurrent.CompletionException;
import java.util.function.LongConsumer;
/**
! * Http/2 Stream
*/
class Stream extends ExchangeImpl {
! void debugPrint() {
! }
@Override
@SuppressWarnings("unchecked")
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
! return null;
}
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
super(e);
}
@Override
HttpResponseImpl getResponse() throws IOException {
! return null;
}
@Override
void sendRequest() throws IOException, InterruptedException {
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
}
@Override
void sendBody() throws IOException, InterruptedException {
}
@Override
CompletableFuture<Void> sendHeadersAsync() {
! return null;
}
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
! return null;
}
! @Override
! CompletableFuture<Void> sendBodyAsync() {
! return null;
}
@Override
void cancel() {
}
@Override
CompletableFuture<Void> sendRequestAsync() {
! return null;
}
@Override
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
! return null;
}
}
--- 22,842 ----
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
+ import sun.net.httpclient.hpack.DecodingCallback;
+
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
! import java.util.function.BiFunction;
import java.util.function.LongConsumer;
/**
! * Http/2 Stream handling.
! *
! * REQUESTS
! *
! * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
! *
! * sendRequest() -- sendHeadersOnly() + sendBody()
! *
! * sendBody() -- in calling thread: obeys all flow control (so may block)
! * obtains data from request body processor and places on connection
! * outbound Q.
! *
! * sendBodyAsync() -- calls sendBody() in an executor thread.
! *
! * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
! *
! * sendRequestAsync() -- calls sendRequest() in an executor thread
! *
! * RESPONSES
! *
! * Multiple responses can be received per request. Responses are queued up on
! * a LinkedList of CF<HttpResponse> and the first one on the list is completed
! * with the next response
! *
! * getResponseAsync() -- queries list of response CFs and returns first one
! * if one exists. Otherwise, creates one and adds it to list
! * and returns it. Completion is achieved through the
! * incoming() upcall from connection reader thread.
! *
! * getResponse() -- calls getResponseAsync() and waits for CF to complete
! *
! * responseBody() -- in calling thread: blocks for incoming DATA frames on
! * stream inputQ. Obeys remote and local flow control so may block.
! * Calls user response body processor with data buffers.
! *
! * responseBodyAsync() -- calls responseBody() in an executor thread.
! *
! * incoming() -- entry point called from connection reader thread. Frames are
! * either handled immediately without blocking or for data frames
! * placed on the stream's inputQ which is consumed by the stream's
! * reader thread.
! *
! * PushedStream sub class
! * ======================
! * Sending side methods are not used because the request comes from a PUSH_PROMISE
! * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
! * is created. PushedStream does not use responseCF list as there can be only
! * one response. The CF is created when the object is created, and it is
! * completed when the response HEADERS frame is received.
*/
class Stream extends ExchangeImpl {
! // DATA frames queued by incoming() and drained by receiveData()
! final Queue<Http2Frame> inputQ;
!
! // assigned by registerStream()
! volatile int streamid;
!
! // value of the response content-length header, -1 when absent
! long responseContentLen = -1;
! long responseBytesProcessed = 0;
! // value returned by requestProcessor.onRequestStart(); 0 means no body
! long requestContentLen;
!
! Http2Connection connection;
! HttpClientImpl client;
! final HttpRequestImpl request;
! // callback that stores each decoded (name, value) pair in responseHeaders
! final DecodingCallback rspHeadersConsumer;
! HttpHeadersImpl responseHeaders;
! final HttpHeadersImpl requestHeaders;
! // :method, :scheme, :authority and :path, filled by setPseudoHeaderFields()
! final HttpHeadersImpl requestPseudoHeaders;
! HttpResponse.BodyProcessor<?> responseProcessor;
! // null for streams built from an HttpRequestImpl (see second constructor)
! final HttpRequest.BodyProcessor requestProcessor;
! HttpResponse response;
!
! // state flags
! // the stream is deleted from the connection once both are true
! boolean requestSent, responseReceived;
!
! // permits handed to the request body processor in onRequestStart();
! // one is taken before each request body chunk is fetched
! final FlowController userRequestFlowController =
! new FlowController();
! // permits added by incoming WINDOW_UPDATE frames; taken before DATA is sent
! final FlowController remoteRequestFlowController =
! new FlowController();
! // permits handed to the response body processor in onResponseBodyStart();
! // one is taken before each response buffer is delivered
! final FlowController responseFlowController =
! new FlowController();
!
! final ExecutorWrapper executor;
@Override
@SuppressWarnings("unchecked")
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
! // Hands the response body to the processor without blocking the caller.
! // If onResponseBodyStart() already returns the body object, the returned
! // CF is complete immediately and the data is still pumped in the
! // background; otherwise the returned CF is the one from receiveDataAsync().
! this.responseProcessor = processor;
! CompletableFuture<T> cf;
! try {
! T body = processor.onResponseBodyStart(
! responseContentLen, responseHeaders,
! responseFlowController); // TODO: filter headers
! if (body != null) {
! cf = CompletableFuture.completedFuture(body);
! // NOTE(review): the CF returned here is dropped, so a failure while
! // receiving data is not reported through the CF given to the caller
! receiveDataAsync(processor);
! } else
! cf = receiveDataAsync(processor);
! } catch (IOException e) {
! cf = CompletableFuture.failedFuture(e);
! }
! PushGroup<?> pg = request.pushGroup();
! if (pg != null) {
! // if an error occurs make sure it is recorded in the PushGroup
! // (pushError ignores a null throwable)
! cf = cf.whenComplete((t,e) -> pg.pushError(e));
! }
! return cf;
! }
!
! // Short description used in diagnostics: just the stream id.
! @Override
! public String toString() {
!     return "streamid: " + streamid;
! }
!
! /**
!  * Pushes the entire response body into the response processor, blocking
!  * when required by local or remote flow control.
!  *
!  * Frames are taken from inputQ until a DataFrame carrying END_STREAM has
!  * been delivered. One permit is taken from responseFlowController per
!  * buffer, and a window update is sent for each frame's data length.
!  *
!  * @throws IOException if delivery fails or the thread is interrupted while
!  *         waiting (the interrupt status is restored before throwing)
!  */
! void receiveData() throws IOException {
!     try {
!         while (true) {
!             Http2Frame frame = inputQ.take();
!             if (!(frame instanceof DataFrame)) {
!                 // Only DATA frames are ever queued by incoming(). Skip
!                 // anything else instead of falling through to a loop
!                 // condition that would dereference a null DataFrame.
!                 assert false : "unexpected frame on inputQ: " + frame;
!                 continue;
!             }
!             DataFrame df = (DataFrame) frame;
!             int len = df.getDataLength();
!             for (ByteBuffer b : df.getData()) {
!                 responseFlowController.take();
!                 responseProcessor.onResponseBodyChunk(b);
!             }
!             sendWindowUpdate(len);
!             if (df.getFlag(DataFrame.END_STREAM)) {
!                 return;
!             }
!         }
!     } catch (InterruptedException e) {
!         // keep the interrupt visible to the code that owns this thread
!         Thread.currentThread().interrupt();
!         throw new IOException(e);
!     }
! }
!
! // Runs receiveData() on the executor. The returned CF completes with
! // processor.onResponseComplete() once the whole body has been delivered,
! // or exceptionally with whatever was thrown. responseReceived() is only
! // called on the success path, after the CF has been completed.
! private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
! CompletableFuture<T> cf = new CompletableFuture<>();
! executor.execute(() -> {
! try {
! receiveData();
! T body = processor.onResponseComplete();
! cf.complete(body);
! responseReceived();
! } catch (Throwable t) {
! cf.completeExceptionally(t);
! }
! }, null);
! return cf;
! }
!
! // Sends two WINDOW_UPDATE frames carrying the same increment in a single
! // batch: one for this stream's id and one for stream id 0. A zero
! // increment sends nothing.
! private void sendWindowUpdate(int increment)
!         throws IOException, InterruptedException {
!     if (increment != 0) {
!         WindowUpdateFrame streamUpdate = new WindowUpdateFrame();
!         streamUpdate.streamid(streamid);
!         streamUpdate.setUpdate(increment);
!
!         WindowUpdateFrame connectionUpdate = new WindowUpdateFrame();
!         connectionUpdate.streamid(0);
!         connectionUpdate.setUpdate(increment);
!
!         LinkedList<Http2Frame> frames = new LinkedList<>();
!         frames.add(streamUpdate);
!         frames.add(connectionUpdate);
!         connection.sendFrames(frames);
!     }
}
+ // Runs sendBodyImpl() on the executor. The returned CF completes with null
+ // on success, or exceptionally with the IOException/InterruptedException.
+ @Override
+ CompletableFuture<Void> sendBodyAsync() {
+ final CompletableFuture<Void> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ sendBodyImpl();
+ cf.complete(null);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ }, null);
+ return cf;
+ }
+
+ // Creates a stream for a request carried by an Exchange. The request and
+ // its body processor are taken from the exchange.
+ @SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
super(e);
+ this.client = client;
+ this.connection = connection;
+ this.request = e.request();
+ this.requestProcessor = request.requestProcessor();
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ // every decoded header pair is stored in responseHeaders
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
+ }
+
+ // Creates a stream directly from a request, with no Exchange (super(null))
+ // and no request body processor. PushedStream uses this constructor.
+ @SuppressWarnings("unchecked")
+ Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
+ super(null);
+ this.client = client;
+ this.connection = connection;
+ this.request = req;
+ this.requestProcessor = null;
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ // every decoded header pair is stored in responseHeaders
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
+ }
+
+ /**
+ * Entry point from Http2Connection reader thread.
+ *
+ * A HeaderFrame with endHeaders() set triggers handleResponse(), a DataFrame
+ * is put on inputQ, and every other frame is passed to otherFrame().
+ *
+ * Data frames will be removed by response body thread.
+ *
+ * @param frame the frame received for this stream
+ * @throws IOException if the frame cannot be handled, e.g. an unexpected
+ *         frame type or a response without a status code
+ * @throws InterruptedException declared for inputQ.put(); presumably thrown
+ *         when interrupted while queueing -- TODO confirm
+ */
+ void incoming(Http2Frame frame) throws IOException, InterruptedException {
+ if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
+ // Complete headers accumulated. handle response.
+ // It's okay if there are multiple HeaderFrames.
+ handleResponse();
+ } else if (frame instanceof DataFrame) {
+ inputQ.put(frame);
+ } else {
+ // NOTE(review): a HeaderFrame without endHeaders() also lands here
+ otherFrame(frame);
+ }
+ }
+
+ // Dispatches WINDOW_UPDATE, RST_STREAM and PRIORITY frames to their
+ // handlers by frame type; any other type is rejected with an IOException.
+ void otherFrame(Http2Frame frame) throws IOException {
+ switch (frame.type()) {
+ case WindowUpdateFrame.TYPE:
+ incoming_windowUpdate((WindowUpdateFrame) frame);
+ break;
+ case ResetFrame.TYPE:
+ incoming_reset((ResetFrame) frame);
+ break;
+ case PriorityFrame.TYPE:
+ incoming_priority((PriorityFrame) frame);
+ break;
+ default:
+ String msg = "Unexpected frame: " + frame.toString();
+ throw new IOException(msg);
+ }
+ }
+
+ // The Hpack decoder decodes into one of these consumers of name,value pairs.
+ // This one adds each pair to responseHeaders (see the constructors).
+
+ DecodingCallback rspHeadersConsumer() {
+ return rspHeadersConsumer;
+ }
+
+ // Creates the HttpResponseImpl from the accumulated response headers,
+ // records the content-length (-1 if absent) and hands the response to
+ // completeResponse(). Throws IOException when there is no :status header.
+ protected void handleResponse() throws IOException {
+ HttpConnection c = connection.connection; // TODO: improve
+ long statusCode = responseHeaders
+ .firstValueAsLong(":status")
+ .orElseThrow(() -> new IOException("no statuscode in response"));
+
+ this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
+ c.sslParameters(), HttpClient.Version.HTTP_2, c);
+ this.responseContentLen = responseHeaders
+ .firstValueAsLong("content-length")
+ .orElse(-1L);
+ // different implementations for normal streams and pushed streams
+ completeResponse(response);
+ }
+
+ // Handles a reset frame: fails the pending response with an IOException
+ // built from the frame's error code. Reset handling is otherwise
+ // unfinished, so the method then always throws
+ // UnsupportedOperationException to its caller.
+ void incoming_reset(ResetFrame frame) {
+ // TODO: implement reset
+ int error = frame.getErrorCode();
+ IOException e = new IOException(ErrorFrame.stringForCode(error));
+ completeResponseExceptionally(e);
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ // Priority frames are not supported yet; always throws
+ // UnsupportedOperationException.
+ void incoming_priority(PriorityFrame frame) {
+ // TODO: implement priority
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ // Adds the frame's increment to the permits available for sending request
+ // data. Non-positive increments are ignored (FlowController.accept()
+ // throws InternalError for values below 1).
+ void incoming_windowUpdate(WindowUpdateFrame frame) {
+ int amount = frame.getUpdate();
+ if (amount > 0)
+ remoteRequestFlowController.accept(amount);
+ }
+
+ /**
+  * Handles a PUSH_PROMISE received on this stream.
+  *
+  * If the request has no PushGroup the promise is unexpected: this stream
+  * is cancelled and nothing else is done. Otherwise the user's push handler
+  * is called with the promised request and a CF for its response. If the
+  * handler rejects the push, the stream is cancelled and that CF is failed;
+  * if it accepts, the pushed stream is marked as having its request sent
+  * and is counted in the PushGroup.
+  *
+  * @param pushReq    the request synthesised from the PUSH_PROMISE
+  * @param pushStream the stream that will carry the pushed response
+  * @throws IOException declared for callers; not thrown directly here
+  */
+ void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
+     if (Log.requests()) {
+         Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+     }
+     PushGroup<?> pushGroup = request.pushGroup();
+     if (pushGroup == null) {
+         cancelImpl(new IllegalStateException("unexpected push promise"));
+         // without a push group there is no handler to consult; carrying
+         // on would dereference null on the next statement
+         return;
+     }
+     // get the handler and call it.
+     BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
+         pushGroup.pushHandler();
+
+     CompletableFuture<HttpResponse> pushCF = pushStream
+         .getResponseAsync(null)
+         .thenApply(r -> (HttpResponse)r);
+     boolean accept = ph.apply(pushReq, pushCF);
+     if (!accept) {
+         IOException ex = new IOException("Stream cancelled by user");
+         // NOTE(review): this cancels the parent stream, not pushStream --
+         // confirm that is the intended target
+         cancelImpl(ex);
+         pushCF.completeExceptionally(ex);
+     } else {
+         pushStream.requestSent();
+         pushGroup.addPush();
+     }
+ }
+
+ // Builds the outgoing headers frame for the request. A positive
+ // contentLength is recorded as the content-length system header; a
+ // contentLength of exactly 0 marks the frame END_STREAM because no body
+ // follows. Also (re)populates the request pseudo headers.
+ private OutgoingHeaders headerFrame(long contentLength) {
+ HttpHeadersImpl h = request.getSystemHeaders();
+ if (contentLength > 0) {
+ h.setHeader("content-length", Long.toString(contentLength));
+ }
+ setPseudoHeaderFields();
+ OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
+ if (contentLength == 0) {
+ f.setFlag(HeadersFrame.END_STREAM);
+ }
+ return f;
+ }
+
+ /**
+  * Fills requestPseudoHeaders with :method, :scheme, :authority and :path
+  * derived from the request.
+  *
+  * The :path value is the raw (still percent-encoded) path, followed by
+  * "?" and the raw query when one is present. A missing or empty path is
+  * replaced by "*" for OPTIONS requests and by "/" otherwise, so :path is
+  * never sent empty.
+  */
+ private void setPseudoHeaderFields() {
+     HttpHeadersImpl hdrs = requestPseudoHeaders;
+     String method = request.method();
+     hdrs.setHeader(":method", method);
+     URI uri = request.uri();
+     hdrs.setHeader(":scheme", uri.getScheme());
+     // TODO: userinfo deprecated. Needs to be removed
+     hdrs.setHeader(":authority", uri.getAuthority());
+     // TODO: ensure header names beginning with : not in user headers
+     // Use the raw forms: the decoded ones may contain characters (spaces,
+     // '?', '#') that are not valid on the wire.
+     String query = uri.getRawQuery();
+     String path = uri.getRawPath();
+     // URI returns "" rather than null for a URI such as http://host
+     if (path == null || path.isEmpty()) {
+         if (method.equalsIgnoreCase("OPTIONS")) {
+             path = "*";
+         } else {
+             path = "/";
+         }
+     }
+     if (query != null) {
+         path += "?" + query;
+     }
+     hdrs.setHeader(":path", path);
+ }
+
+ // Returns the pseudo headers; they are populated by setPseudoHeaderFields()
+ HttpHeadersImpl getRequestPseudoHeaders() {
+ return requestPseudoHeaders;
}
@Override
HttpResponseImpl getResponse() throws IOException {
!     // Blocking variant of getResponseAsync(): waits for the response and
!     // unwraps an IOException cause; any other failure is rethrown as is.
!     try {
!         CompletableFuture<HttpResponseImpl> pending = getResponseAsync(null);
!         return pending.join();
!     } catch (Throwable failure) {
!         final Throwable cause = failure.getCause();
!         if (!(cause instanceof IOException)) {
!             throw failure;
!         }
!         throw (IOException) cause;
!     }
}
@Override
void sendRequest() throws IOException, InterruptedException {
+ // headers first, then the body, both in the calling thread
+ sendHeadersOnly();
+ sendBody();
+ }
+
+ /**
+  * A simple general purpose blocking flow controller.
+  *
+  * Holds a count of permits. accept() adds permits and take() removes
+  * them, blocking the caller until enough permits have been added.
+  */
+ class FlowController implements LongConsumer {
+     int permits;
+
+     FlowController() {
+         this.permits = 0;
+     }
+
+     /**
+      * Adds n permits and wakes threads blocked in take().
+      *
+      * @param n number of permits to add; must be at least 1
+      * @throws InternalError if n is less than 1
+      */
+     @Override
+     public synchronized void accept(long n) {
+         if (n < 1) {
+             throw new InternalError("FlowController.accept called with " + n);
+         }
+         // a taker only waits after draining permits to zero, so a wake-up
+         // is needed only when permits go from zero to positive
+         boolean wasEmpty = (permits == 0);
+         permits += n;
+         if (wasEmpty) {
+             notifyAll();
+         }
+     }
+
+     /**
+      * Takes one permit, blocking until it is available.
+      *
+      * @throws InterruptedException if interrupted while waiting
+      */
+     public synchronized void take() throws InterruptedException {
+         take(1);
+     }
+
+     /**
+      * Takes amount permits, blocking until all of them have been obtained.
+      * Permits are consumed as they become available. An amount of zero
+      * or less returns immediately.
+      *
+      * @param amount number of permits to consume
+      * @throws InterruptedException if interrupted while waiting
+      */
+     public synchronized void take(int amount) throws InterruptedException {
+         assert permits >= 0;
+         // Loop on what is still owed, not on "permits < amount": with the
+         // latter, a call that finds enough permits skips the loop and
+         // consumes nothing, so the window is never reduced.
+         while (amount > 0) {
+             int n = Math.min(amount, permits);
+             permits -= n;
+             amount -= n;
+             if (amount > 0)
+                 wait();
+         }
+     }
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
+ // Logs the request if enabled, asks the request body processor for the
+ // content length, then builds and sends the headers frame.
+ if (Log.requests() && request != null) {
+ Log.logRequest(request.toString());
+ }
+ // NOTE(review): requestProcessor is null for streams created from an
+ // HttpRequestImpl; this method assumes it is not called on those
+ requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
+ OutgoingHeaders f = headerFrame(requestContentLen);
+ connection.sendFrame(f);
}
@Override
void sendBody() throws IOException, InterruptedException {
+ // blocking variant: runs sendBodyImpl() in the calling thread
+ sendBodyImpl();
+ }
+
+ // Records the stream id and registers this stream with the connection
+ // under that id.
+ void registerStream(int id) {
+ this.streamid = id;
+ connection.putStream(this, streamid);
+ }
+
+ // Produces the next DATA frame of the request body. Takes one permit from
+ // the user flow controller, lets the request processor fill a buffer, then
+ // waits until both the stream and the connection allow that many bytes to
+ // be sent. END_STREAM is set when the processor reports completion.
+ DataFrame getDataFrame() throws IOException, InterruptedException {
+ userRequestFlowController.take();
+ // presumably 9 is the HTTP/2 frame header length -- TODO confirm
+ int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
+ ByteBuffer buffer = connection.getBuffer();
+ buffer.limit(maxpayloadLen);
+ boolean complete = requestProcessor.onRequestBodyChunk(buffer);
+ buffer.flip();
+ int amount = buffer.remaining();
+ // wait for flow control if necessary. Following method will block
+ // until after headers frame is sent, so correct streamid is set.
+ remoteRequestFlowController.take(amount);
+ connection.obtainSendWindow(amount);
+
+ DataFrame df = new DataFrame();
+ df.streamid(streamid);
+ if (complete) {
+ df.setFlag(DataFrame.END_STREAM);
+ }
+ df.setData(buffer);
+ df.computeLength();
+ return df;
}
+
@Override
CompletableFuture<Void> sendHeadersAsync() {
! // Sends the headers in the calling thread (no executor hop) and wraps
! // the outcome in an already-completed CF.
! try {
! sendHeadersOnly();
! return CompletableFuture.completedFuture(null);
! } catch (IOException | InterruptedException ex) {
! return CompletableFuture.failedFuture(ex);
}
+ }
+
+ /**
+ * A List of responses relating to this stream. Normally there is only
+ * one response, but intermediate responses like 100 are allowed
+ * and must be passed up to higher level before continuing. Deals with races
+ * such as if responses are returned before the CFs get created by
+ * getResponseAsync()
+ */
+
+ final List<CompletableFuture<HttpResponseImpl>> response_cfs = new LinkedList<>();
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
! // Returns the CF for the next response. If completeResponse() has already
! // queued one it is removed from the list and returned; otherwise a new CF
! // is queued for completeResponse() to complete later.
! CompletableFuture<HttpResponseImpl> cf;
! synchronized (response_cfs) {
! if (!response_cfs.isEmpty()) {
! cf = response_cfs.remove(0);
! } else {
! cf = new CompletableFuture<>();
! response_cfs.add(cf);
! }
! }
! PushGroup<?> pg = request.pushGroup();
! if (pg != null) {
! // if an error occurs make sure it is recorded in the PushGroup
! // (pushError ignores a null throwable)
! cf = cf.whenComplete((t,e) -> pg.pushError(e));
! }
! return cf;
! }
!
! /**
! * Completes the first uncompleted CF on list, and removes it. If there is no
! * uncompleted CF then creates one (completes it) and adds to list
! *
! * @param r the response to deliver; cast to HttpResponseImpl
! */
! void completeResponse(HttpResponse r) {
! HttpResponseImpl resp = (HttpResponseImpl)r;
! synchronized (response_cfs) {
! for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
! if (!cf.isDone()) {
! cf.complete(resp);
! // removing during iteration is safe only because we return at once
! response_cfs.remove(cf);
! //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
! return;
! } else
! // diagnostic: an already-completed CF is still waiting to be picked up
! System.err.println("Stream: " + this + " ALREADY DONE");
! }
! response_cfs.add(CompletableFuture.completedFuture(resp));
! //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
! }
}
! // methods to update state and remove stream when finished
!
! // Marks the request as fully sent; deletes the stream from the connection
! // if the response has already been received.
! synchronized void requestSent() {
! requestSent = true;
! if (responseReceived)
! connection.deleteStream(this);
! }
!
! // Marks the response as fully received; deletes the stream from the
! // connection if the request has already been sent, and tells the
! // PushGroup (if any) that no more pushes can arrive.
! synchronized void responseReceived() {
! responseReceived = true;
! if (requestSent)
! connection.deleteStream(this);
! PushGroup<?> pg = request.pushGroup();
! if (pg != null)
! pg.noMorePushes();
! }
!
! /**
! * same as above but for errors: fails the first uncompleted CF on the list
! * and removes it, or queues an already-failed CF if there is none.
! *
! * @param t the failure to report to whoever asks for the response
! */
! void completeResponseExceptionally(Throwable t) {
! synchronized (response_cfs) {
! for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
! if (!cf.isDone()) {
! cf.completeExceptionally(t);
! // removing during iteration is safe only because we return at once
! response_cfs.remove(cf);
! return;
! }
! }
! response_cfs.add(CompletableFuture.failedFuture(t));
! }
! }
!
! /**
!  * Sends the request body as a sequence of DATA frames, ending with the
!  * frame that carries END_STREAM, and then marks the request as sent.
!  * Blocks as required by flow control (see getDataFrame()).
!  *
!  * When requestContentLen is 0 there is no body to send (the headers frame
!  * already carried END_STREAM), but the request is still marked as sent so
!  * that the stream can be deleted once the response has been received.
!  *
!  * @throws IOException if a frame cannot be produced or sent
!  * @throws InterruptedException if interrupted while blocked
!  */
! void sendBodyImpl() throws IOException, InterruptedException {
!     if (requestContentLen == 0) {
!         // no body
!         requestSent();
!         return;
!     }
!     DataFrame df;
!     do {
!         df = getDataFrame();
!         // TODO: check accumulated content length (if not checked below)
!         connection.sendFrame(df);
!     } while (!df.getFlag(DataFrame.END_STREAM));
!     requestSent();
}
@Override
void cancel() {
+ // the exception is only used for the trace message in cancelImpl()
+ cancelImpl(new Exception("Cancelled"));
}
+ // Cancels the stream: logs the reason, closes inputQ and asks the
+ // connection to reset the stream with the CANCEL code. A failure to send
+ // the reset is logged, not propagated.
+ // NOTE(review): e is only logged; pending response CFs are not completed here
+ void cancelImpl(Throwable e) {
+ Log.logTrace("cancelling stream: {0}\n", e.toString());
+ inputQ.close();
+ try {
+ connection.resetStream(streamid, ResetFrame.CANCEL);
+ } catch (IOException | InterruptedException ex) {
+ Log.logError(ex);
+ }
+ }
+
@Override
CompletableFuture<Void> sendRequestAsync() {
! // Runs sendRequest() (headers + body) on the executor. The returned CF
! // completes with null on success, or exceptionally with the
! // IOException/InterruptedException.
! CompletableFuture<Void> cf = new CompletableFuture<>();
! executor.execute(() -> {
! try {
! sendRequest();
! cf.complete(null);
! } catch (IOException |InterruptedException e) {
! cf.completeExceptionally(e);
! }
! }, null);
! return cf;
}
@Override
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
!     // Blocking delivery of the response body to the processor.
!     //
!     // If onResponseBodyStart() already returns the body object, it is
!     // returned at once and the data is pumped in the background by
!     // receiveDataAsync(), which calls responseReceived() itself when the
!     // body is complete. Otherwise the data is received in the calling
!     // thread, and responseReceived() is called here once it is complete,
!     // so the stream is never marked finished before END_STREAM was seen
!     // and never marked twice.
!     this.responseProcessor = processor;
!     T body = processor.onResponseBodyStart(
!             responseContentLen, responseHeaders,
!             responseFlowController); // TODO: filter headers
!     if (body != null) {
!         receiveDataAsync(processor);
!         return body;
!     }
!     receiveData();
!     body = processor.onResponseComplete();
!     responseReceived();
!     return body;
! }
!
! // called from Http2Connection reader thread
! // adds 'update' permits for sending request data; accept() throws
! // InternalError if update is less than 1
! synchronized void updateOutgoingWindow(int update) {
! remoteRequestFlowController.accept(update);
! }
!
! // Closes the stream by cancelling it; msg is currently unused.
! void close(String msg) {
! cancel();
! }
!
! /**
!  * A stream created for a request that the server promised in a
!  * PUSH_PROMISE frame. It has a single response CF (pushCF), allocated up
!  * front, instead of the response_cfs list used by its super class, and it
!  * reports errors and completion to its PushGroup.
!  */
! static class PushedStream extends Stream {
!     final PushGroup<?> pushGroup;
!     final private Stream parent; // used by server push streams
!     // push streams need the response CF allocated up front as it is
!     // given directly to user via the multi handler callback function.
!     final CompletableFuture<HttpResponseImpl> pushCF;
!     final HttpRequestImpl pushReq;
!
!     PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
!                  Http2Connection connection, Stream parent,
!                  HttpRequestImpl pushReq) {
!         super(client, connection, pushReq);
!         this.pushGroup = pushGroup;
!         this.pushReq = pushReq;
!         this.pushCF = new CompletableFuture<>();
!         this.parent = parent;
!     }
!
!     // Following methods call the super class but in case of
!     // error record it in the PushGroup. The error method is called
!     // with a null value when no error occurred (is a no-op)
!     @Override
!     CompletableFuture<Void> sendBodyAsync() {
!         return super.sendBodyAsync()
!                 .whenComplete((v, t) -> pushGroup.pushError(t));
!     }
!
!     @Override
!     CompletableFuture<Void> sendHeadersAsync() {
!         return super.sendHeadersAsync()
!                 .whenComplete((v, t) -> pushGroup.pushError(t));
!     }
!
!     @Override
!     CompletableFuture<Void> sendRequestAsync() {
!         return super.sendRequestAsync()
!                 .whenComplete((v, t) -> pushGroup.pushError(t));
!     }
!
!     // Returns a CF derived from pushCF; a new dependent is created on
!     // every call.
!     @Override
!     CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
!         return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
!     }
!
!     @Override
!     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
!         return super.responseBodyAsync(processor)
!                 .whenComplete((v, t) -> pushGroup.pushError(t));
!     }
!
!     // Logs the response and completes pushCF with it.
!     @Override
!     void completeResponse(HttpResponse r) {
!         HttpResponseImpl resp = (HttpResponseImpl)r;
!         Utils.logResponse(resp);
!         pushCF.complete(resp);
!     }
!
!     @Override
!     void completeResponseExceptionally(Throwable t) {
!         pushCF.completeExceptionally(t);
!     }
!
!     // In addition to the super class behaviour, tells the PushGroup that
!     // this push has completed.
!     @Override
!     synchronized void responseReceived() {
!         super.responseReceived();
!         pushGroup.pushCompleted();
!     }
!
!     // Creates the response object for the pushed request and completes
!     // pushCF with it. If the headers carry no :status, pushCF is failed
!     // and no response object is created.
!     @Override
!     protected void handleResponse() {
!         HttpConnection c = connection.connection; // TODO: improve
!         long statusCode = responseHeaders
!                 .firstValueAsLong(":status")
!                 .orElse(-1L);
!
!         if (statusCode == -1L) {
!             completeResponseExceptionally(new IOException("No status code"));
!             // stop here: otherwise a response with status -1 would be
!             // built, stored and logged after the failure was reported
!             return;
!         }
!         ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
!         this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
!                 c.sslParameters());
!         this.responseContentLen = responseHeaders
!                 .firstValueAsLong("content-length")
!                 .orElse(-1L);
!         // different implementations for normal streams and pushed streams
!         completeResponse(response);
!     }
! }
!
! /**
! * One PushGroup object is associated with the parent Stream of
! * the pushed Streams. This keeps track of all common state associated
! * with the pushes.
! *
! * resultCF completes with multiProcessor.onComplete() once noMorePushes()
! * has been called, no pushes remain outstanding and no error was recorded;
! * it completes exceptionally as soon as pushError() records an error.
! */
! static class PushGroup<T> {
! // the overall completion object, completed when all pushes are done.
! final CompletableFuture<T> resultCF;
! Throwable error; // any exception that occurred during pushes
!
! // CF for main response
! final CompletableFuture<HttpResponse> mainResponse;
!
! // user's processor object
! final HttpResponse.MultiProcessor<T> multiProcessor;
!
! // per push handler function provided by processor
! final private BiFunction<HttpRequest,
! CompletableFuture<HttpResponse>,
! Boolean> pushHandler;
! // total pushes accepted so far
! int numberOfPushes;
! // accepted pushes that have not completed yet
! int remainingPushes;
! boolean noMorePushes = false;
!
! // Obtains the per-push handler from the processor's onStart() callback.
! PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
! this.resultCF = new CompletableFuture<>();
! this.mainResponse = new CompletableFuture<>();
! this.multiProcessor = multiProcessor;
! this.pushHandler = multiProcessor.onStart(req, mainResponse);
! }
!
! CompletableFuture<T> groupResult() {
! return resultCF;
! }
!
! CompletableFuture<HttpResponse> mainResponse() {
! return mainResponse;
! }
!
! private BiFunction<HttpRequest,
! CompletableFuture<HttpResponse>, Boolean> pushHandler()
! {
! return pushHandler;
! }
!
! // Called when a push has been accepted by the push handler.
! synchronized void addPush() {
! numberOfPushes++;
! remainingPushes++;
! }
!
! synchronized int numberOfPushes() {
! return numberOfPushes;
! }
! // This is called when the main body response completes because it means
! // no more PUSH_PROMISEs are possible
! synchronized void noMorePushes() {
! noMorePushes = true;
! checkIfCompleted();
! }
!
! // Called when the response of one pushed stream has been fully received.
! synchronized void pushCompleted() {
! remainingPushes--;
! checkIfCompleted();
! }
!
! // Completes resultCF once everything is done and no error was recorded.
! synchronized void checkIfCompleted() {
! if (remainingPushes == 0 && error == null && noMorePushes) {
! T overallResult = multiProcessor.onComplete();
! resultCF.complete(overallResult);
! }
! }
!
! // Records an error and fails resultCF with it; a null argument means
! // "no error" and is ignored.
! synchronized void pushError(Throwable t) {
! if (t == null)
! return;
! this.error = t;
! resultCF.completeExceptionally(t);
! }
}
}
< prev index next >