< prev index next >
src/java.httpclient/share/classes/java/net/http/Stream.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -22,80 +22,821 @@
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
+import sun.net.httpclient.hpack.DecodingCallback;
+
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
import java.util.function.LongConsumer;
/**
- * Http/2 Stream
+ * Http/2 Stream handling.
+ *
+ * REQUESTS
+ *
+ * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
+ *
+ * sendRequest() -- sendHeadersOnly() + sendBody()
+ *
+ * sendBody() -- in calling thread: obeys all flow control (so may block)
+ * obtains data from request body processor and places on connection
+ * outbound Q.
+ *
+ * sendBodyAsync() -- calls sendBody() in an executor thread.
+ *
+ * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
+ *
+ * sendRequestAsync() -- calls sendRequest() in an executor thread
+ *
+ * RESPONSES
+ *
+ * Multiple responses can be received per request. Responses are queued up on
+ * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
+ * with the next response
+ *
+ * getResponseAsync() -- queries list of response CFs and returns first one
+ * if one exists. Otherwise, creates one and adds it to list
+ * and returns it. Completion is achieved through the
+ * incoming() upcall from connection reader thread.
+ *
+ * getResponse() -- calls getResponseAsync() and waits for CF to complete
+ *
+ * responseBody() -- in calling thread: blocks for incoming DATA frames on
+ * stream inputQ. Obeys remote and local flow control so may block.
+ * Calls user response body processor with data buffers.
+ *
+ * responseBodyAsync() -- calls responseBody() in an executor thread.
+ *
+ * incoming() -- entry point called from connection reader thread. Frames are
+ * either handled immediately without blocking or for data frames
+ * placed on the stream's inputQ which is consumed by the stream's
+ * reader thread.
+ *
+ * PushedStream sub class
+ * ======================
+ * Sending side methods are not used because the request comes from a PUSH_PROMISE
+ * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
+ * is created. PushedStream does not use responseCF list as there can be only
+ * one response. The CF is created when the object created and when the response
+ * HEADERS frame is received the object is completed.
*/
class Stream extends ExchangeImpl {
- void debugPrint() {
- }
+ final Queue<Http2Frame> inputQ;
+
+ volatile int streamid;
+
+ long responseContentLen = -1;
+ long responseBytesProcessed = 0;
+ long requestContentLen;
+
+ Http2Connection connection;
+ HttpClientImpl client;
+ final HttpRequestImpl request;
+ final DecodingCallback rspHeadersConsumer;
+ HttpHeadersImpl responseHeaders;
+ final HttpHeadersImpl requestHeaders;
+ final HttpHeadersImpl requestPseudoHeaders;
+ HttpResponse.BodyProcessor<?> responseProcessor;
+ final HttpRequest.BodyProcessor requestProcessor;
+ HttpResponse response;
+
+ // state flags
+ boolean requestSent, responseReceived;
+
+ final FlowController userRequestFlowController =
+ new FlowController();
+ final FlowController remoteRequestFlowController =
+ new FlowController();
+ final FlowController responseFlowController =
+ new FlowController();
+
+ final ExecutorWrapper executor;
@Override
@SuppressWarnings("unchecked")
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
- return null;
+ this.responseProcessor = processor;
+ CompletableFuture<T> cf;
+ try {
+ T body = processor.onResponseBodyStart(
+ responseContentLen, responseHeaders,
+ responseFlowController); // TODO: filter headers
+ if (body != null) {
+ cf = CompletableFuture.completedFuture(body);
+ receiveDataAsync(processor);
+ } else
+ cf = receiveDataAsync(processor);
+ } catch (IOException e) {
+ cf = CompletableFuture.failedFuture(e);
+ }
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ }
+ return cf;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("streamid: ")
+ .append(streamid);
+ return sb.toString();
+ }
+
+ // pushes entire response body into response processor
+ // blocking when required by local or remote flow control
+ void receiveData() throws IOException {
+ Http2Frame frame;
+ DataFrame df = null;
+ try {
+ do {
+ frame = inputQ.take();
+ if (!(frame instanceof DataFrame)) {
+ assert false;
+ continue;
+ }
+ df = (DataFrame) frame;
+ int len = df.getDataLength();
+ ByteBuffer[] buffers = df.getData();
+ for (ByteBuffer b : buffers) {
+ responseFlowController.take();
+ responseProcessor.onResponseBodyChunk(b);
+ }
+ sendWindowUpdate(len);
+ } while (!df.getFlag(DataFrame.END_STREAM));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
+ CompletableFuture<T> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ receiveData();
+ T body = processor.onResponseComplete();
+ cf.complete(body);
+ responseReceived();
+ } catch (Throwable t) {
+ cf.completeExceptionally(t);
+ }
+ }, null);
+ return cf;
+ }
+
+ private void sendWindowUpdate(int increment)
+ throws IOException, InterruptedException {
+ if (increment == 0)
+ return;
+ LinkedList<Http2Frame> list = new LinkedList<>();
+ WindowUpdateFrame frame = new WindowUpdateFrame();
+ frame.streamid(streamid);
+ frame.setUpdate(increment);
+ list.add(frame);
+ frame = new WindowUpdateFrame();
+ frame.streamid(0);
+ frame.setUpdate(increment);
+ list.add(frame);
+ connection.sendFrames(list);
}
+ @Override
+ CompletableFuture<Void> sendBodyAsync() {
+ final CompletableFuture<Void> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ sendBodyImpl();
+ cf.complete(null);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ }, null);
+ return cf;
+ }
+
+ @SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
super(e);
+ this.client = client;
+ this.connection = connection;
+ this.request = e.request();
+ this.requestProcessor = request.requestProcessor();
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
+ }
+
+ @SuppressWarnings("unchecked")
+ Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
+ super(null);
+ this.client = client;
+ this.connection = connection;
+ this.request = req;
+ this.requestProcessor = null;
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
+ }
+
+ /**
+ * Entry point from Http2Connection reader thread.
+ *
+ * Data frames will be removed by response body thread.
+ *
+ * @param frame
+ * @throws IOException
+ */
+ void incoming(Http2Frame frame) throws IOException, InterruptedException {
+ if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
+ // Complete headers accumulated. handle response.
+ // It's okay if there are multiple HeaderFrames.
+ handleResponse();
+ } else if (frame instanceof DataFrame) {
+ inputQ.put(frame);
+ } else {
+ otherFrame(frame);
+ }
+ }
+
+ void otherFrame(Http2Frame frame) throws IOException {
+ switch (frame.type()) {
+ case WindowUpdateFrame.TYPE:
+ incoming_windowUpdate((WindowUpdateFrame) frame);
+ break;
+ case ResetFrame.TYPE:
+ incoming_reset((ResetFrame) frame);
+ break;
+ case PriorityFrame.TYPE:
+ incoming_priority((PriorityFrame) frame);
+ break;
+ default:
+ String msg = "Unexpected frame: " + frame.toString();
+ throw new IOException(msg);
+ }
+ }
+
+ // The Hpack decoder decodes into one of these consumers of name,value pairs
+
+ DecodingCallback rspHeadersConsumer() {
+ return rspHeadersConsumer;
+ }
+
+ // create and return the HttpResponseImpl
+ protected void handleResponse() throws IOException {
+ HttpConnection c = connection.connection; // TODO: improve
+ long statusCode = responseHeaders
+ .firstValueAsLong(":status")
+ .orElseThrow(() -> new IOException("no statuscode in response"));
+
+ this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
+ c.sslParameters(), HttpClient.Version.HTTP_2, c);
+ this.responseContentLen = responseHeaders
+ .firstValueAsLong("content-length")
+ .orElse(-1L);
+ // different implementations for normal streams and pushed streams
+ completeResponse(response);
+ }
+
+ void incoming_reset(ResetFrame frame) {
+ // TODO: implement reset
+ int error = frame.getErrorCode();
+ IOException e = new IOException(ErrorFrame.stringForCode(error));
+ completeResponseExceptionally(e);
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ void incoming_priority(PriorityFrame frame) {
+ // TODO: implement priority
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ void incoming_windowUpdate(WindowUpdateFrame frame) {
+ int amount = frame.getUpdate();
+ if (amount > 0)
+ remoteRequestFlowController.accept(amount);
+ }
+
+ void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
+ if (Log.requests()) {
+ Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+ }
+ PushGroup<?> pushGroup = request.pushGroup();
+ if (pushGroup == null) {
+ cancelImpl(new IllegalStateException("unexpected push promise"));
+ }
+ // get the handler and call it.
+ BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
+ pushGroup.pushHandler();
+
+ CompletableFuture<HttpResponse> pushCF = pushStream
+ .getResponseAsync(null)
+ .thenApply(r -> (HttpResponse)r);
+ boolean accept = ph.apply(pushReq, pushCF);
+ if (!accept) {
+ IOException ex = new IOException("Stream cancelled by user");
+ cancelImpl(ex);
+ pushCF.completeExceptionally(ex);
+ } else {
+ pushStream.requestSent();
+ pushGroup.addPush();
+ }
+ }
+
+ private OutgoingHeaders headerFrame(long contentLength) {
+ HttpHeadersImpl h = request.getSystemHeaders();
+ if (contentLength > 0) {
+ h.setHeader("content-length", Long.toString(contentLength));
+ }
+ setPseudoHeaderFields();
+ OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
+ if (contentLength == 0) {
+ f.setFlag(HeadersFrame.END_STREAM);
+ }
+ return f;
+ }
+
+ private void setPseudoHeaderFields() {
+ HttpHeadersImpl hdrs = requestPseudoHeaders;
+ String method = request.method();
+ hdrs.setHeader(":method", method);
+ URI uri = request.uri();
+ hdrs.setHeader(":scheme", uri.getScheme());
+ // TODO: userinfo deprecated. Needs to be removed
+ hdrs.setHeader(":authority", uri.getAuthority());
+ // TODO: ensure header names beginning with : not in user headers
+ String query = uri.getQuery();
+ String path = uri.getPath();
+ if (path == null) {
+ if (method.equalsIgnoreCase("OPTIONS")) {
+ path = "*";
+ } else {
+ path = "/";
+ }
+ }
+ if (query != null) {
+ path += "?" + query;
+ }
+ hdrs.setHeader(":path", path);
+ }
+
+ HttpHeadersImpl getRequestPseudoHeaders() {
+ return requestPseudoHeaders;
}
@Override
HttpResponseImpl getResponse() throws IOException {
- return null;
+ try {
+ return getResponseAsync(null).join();
+ } catch (Throwable e) {
+ Throwable t = e.getCause();
+ if (t instanceof IOException) {
+ throw (IOException)t;
+ }
+ throw e;
+ }
}
@Override
void sendRequest() throws IOException, InterruptedException {
+ sendHeadersOnly();
+ sendBody();
+ }
+
+ /**
+ * A simple general purpose blocking flow controller
+ */
+ class FlowController implements LongConsumer {
+ int permits;
+
+ FlowController() {
+ this.permits = 0;
+ }
+
+ @Override
+ public synchronized void accept(long n) {
+ if (n < 1) {
+ throw new InternalError("FlowController.accept called with " + n);
+ }
+ if (permits == 0) {
+ permits += n;
+ notifyAll();
+ } else {
+ permits += n;
+ }
+ }
+
+ public synchronized void take() throws InterruptedException {
+ take(1);
+ }
+
+ public synchronized void take(int amount) throws InterruptedException {
+ assert permits >= 0;
+ while (permits < amount) {
+ int n = Math.min(amount, permits);
+ permits -= n;
+ amount -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
+ if (Log.requests() && request != null) {
+ Log.logRequest(request.toString());
+ }
+ requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
+ OutgoingHeaders f = headerFrame(requestContentLen);
+ connection.sendFrame(f);
}
@Override
void sendBody() throws IOException, InterruptedException {
+ sendBodyImpl();
+ }
+
+ void registerStream(int id) {
+ this.streamid = id;
+ connection.putStream(this, streamid);
+ }
+
+ DataFrame getDataFrame() throws IOException, InterruptedException {
+ userRequestFlowController.take();
+ int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
+ ByteBuffer buffer = connection.getBuffer();
+ buffer.limit(maxpayloadLen);
+ boolean complete = requestProcessor.onRequestBodyChunk(buffer);
+ buffer.flip();
+ int amount = buffer.remaining();
+ // wait for flow control if necessary. Following method will block
+ // until after headers frame is sent, so correct streamid is set.
+ remoteRequestFlowController.take(amount);
+ connection.obtainSendWindow(amount);
+
+ DataFrame df = new DataFrame();
+ df.streamid(streamid);
+ if (complete) {
+ df.setFlag(DataFrame.END_STREAM);
+ }
+ df.setData(buffer);
+ df.computeLength();
+ return df;
}
+
@Override
CompletableFuture<Void> sendHeadersAsync() {
- return null;
+ try {
+ sendHeadersOnly();
+ return CompletableFuture.completedFuture(null);
+ } catch (IOException | InterruptedException ex) {
+ return CompletableFuture.failedFuture(ex);
}
+ }
+
+ /**
+ * A List of responses relating to this stream. Normally there is only
+ * one response, but intermediate responses like 100 are allowed
+ * and must be passed up to higher level before continuing. Deals with races
+ * such as if responses are returned before the CFs get created by
+ * getResponseAsync()
+ */
+
+ final List<CompletableFuture<HttpResponseImpl>> response_cfs = new LinkedList<>();
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
- return null;
+ CompletableFuture<HttpResponseImpl> cf;
+ synchronized (response_cfs) {
+ if (!response_cfs.isEmpty()) {
+ cf = response_cfs.remove(0);
+ } else {
+ cf = new CompletableFuture<>();
+ response_cfs.add(cf);
+ }
+ }
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ }
+ return cf;
+ }
+
+ /**
+ * Completes the first uncompleted CF on list, and removes it. If there is no
+ * uncompleted CF then creates one (completes it) and adds to list
+ */
+ void completeResponse(HttpResponse r) {
+ HttpResponseImpl resp = (HttpResponseImpl)r;
+ synchronized (response_cfs) {
+ for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
+ if (!cf.isDone()) {
+ cf.complete(resp);
+ response_cfs.remove(cf);
+ //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
+ return;
+ } else
+ System.err.println("Stream: " + this + " ALREADY DONE");
+ }
+ response_cfs.add(CompletableFuture.completedFuture(resp));
+ //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
+ }
}
- @Override
- CompletableFuture<Void> sendBodyAsync() {
- return null;
+ // methods to update state and remove stream when finished
+
+ synchronized void requestSent() {
+ requestSent = true;
+ if (responseReceived)
+ connection.deleteStream(this);
+ }
+
+ synchronized void responseReceived() {
+ responseReceived = true;
+ if (requestSent)
+ connection.deleteStream(this);
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null)
+ pg.noMorePushes();
+ }
+
+ /**
+ * same as above but for errors
+ *
+ * @param t
+ */
+ void completeResponseExceptionally(Throwable t) {
+ synchronized (response_cfs) {
+ for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
+ if (!cf.isDone()) {
+ cf.completeExceptionally(t);
+ response_cfs.remove(cf);
+ return;
+ }
+ }
+ response_cfs.add(CompletableFuture.failedFuture(t));
+ }
+ }
+
+ void sendBodyImpl() throws IOException, InterruptedException {
+ if (requestContentLen == 0) {
+ // no body
+ return;
+ }
+ DataFrame df;
+ do {
+ df = getDataFrame();
+ // TODO: check accumulated content length (if not checked below)
+ connection.sendFrame(df);
+ } while (!df.getFlag(DataFrame.END_STREAM));
+ requestSent();
}
@Override
void cancel() {
+ cancelImpl(new Exception("Cancelled"));
}
+ void cancelImpl(Throwable e) {
+ Log.logTrace("cancelling stream: {0}\n", e.toString());
+ inputQ.close();
+ try {
+ connection.resetStream(streamid, ResetFrame.CANCEL);
+ } catch (IOException | InterruptedException ex) {
+ Log.logError(ex);
+ }
+ }
+
@Override
CompletableFuture<Void> sendRequestAsync() {
- return null;
+ CompletableFuture<Void> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ sendRequest();
+ cf.complete(null);
+ } catch (IOException |InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ }, null);
+ return cf;
}
@Override
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
- return null;
+ this.responseProcessor = processor;
+ T body = processor.onResponseBodyStart(
+ responseContentLen, responseHeaders,
+ responseFlowController); // TODO: filter headers
+ if (body == null) {
+ receiveData();
+ return processor.onResponseComplete();
+ } else
+ receiveDataAsync(processor);
+ responseReceived();
+ return body;
+ }
+
+ // called from Http2Connection reader thread
+ synchronized void updateOutgoingWindow(int update) {
+ remoteRequestFlowController.accept(update);
+ }
+
+ void close(String msg) {
+ cancel();
+ }
+
+ static class PushedStream extends Stream {
+ final PushGroup<?> pushGroup;
+ final private Stream parent; // used by server push streams
+ // push streams need the response CF allocated up front as it is
+ // given directly to user via the multi handler callback function.
+ final CompletableFuture<HttpResponseImpl> pushCF;
+ final HttpRequestImpl pushReq;
+
+ PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
+ Http2Connection connection, Stream parent,
+ HttpRequestImpl pushReq) {
+ super(client, connection, pushReq);
+ this.pushGroup = pushGroup;
+ this.pushReq = pushReq;
+ this.pushCF = new CompletableFuture<>();
+ this.parent = parent;
+ }
+
+ // Following methods call the super class but in case of
+ // error record it in the PushGroup. The error method is called
+ // with a null value when no error occurred (is a no-op)
+ @Override
+ CompletableFuture<Void> sendBodyAsync() {
+ return super.sendBodyAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<Void> sendHeadersAsync() {
+ return super.sendHeadersAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<Void> sendRequestAsync() {
+ return super.sendRequestAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
+ return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
+ return super.responseBodyAsync(processor)
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ void completeResponse(HttpResponse r) {
+ HttpResponseImpl resp = (HttpResponseImpl)r;
+ Utils.logResponse(resp);
+ pushCF.complete(resp);
+ }
+
+ @Override
+ void completeResponseExceptionally(Throwable t) {
+ pushCF.completeExceptionally(t);
+ }
+
+ @Override
+ synchronized void responseReceived() {
+ super.responseReceived();
+ pushGroup.pushCompleted();
+ }
+
+ // create and return the PushResponseImpl
+ @Override
+ protected void handleResponse() {
+ HttpConnection c = connection.connection; // TODO: improve
+ long statusCode = responseHeaders
+ .firstValueAsLong(":status")
+ .orElse(-1L);
+
+ if (statusCode == -1L)
+ completeResponseExceptionally(new IOException("No status code"));
+ ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
+ this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
+ c.sslParameters());
+ this.responseContentLen = responseHeaders
+ .firstValueAsLong("content-length")
+ .orElse(-1L);
+ // different implementations for normal streams and pushed streams
+ completeResponse(response);
+ }
+ }
+
+ /**
+ * One PushGroup object is associated with the parent Stream of
+ * the pushed Streams. This keeps track of all common state associated
+ * with the pushes.
+ */
+ static class PushGroup<T> {
+ // the overall completion object, completed when all pushes are done.
+ final CompletableFuture<T> resultCF;
+ Throwable error; // any exception that occured during pushes
+
+ // CF for main response
+ final CompletableFuture<HttpResponse> mainResponse;
+
+ // user's processor object
+ final HttpResponse.MultiProcessor<T> multiProcessor;
+
+ // per push handler function provided by processor
+ final private BiFunction<HttpRequest,
+ CompletableFuture<HttpResponse>,
+ Boolean> pushHandler;
+ int numberOfPushes;
+ int remainingPushes;
+ boolean noMorePushes = false;
+
+ PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
+ this.resultCF = new CompletableFuture<>();
+ this.mainResponse = new CompletableFuture<>();
+ this.multiProcessor = multiProcessor;
+ this.pushHandler = multiProcessor.onStart(req, mainResponse);
+ }
+
+ CompletableFuture<T> groupResult() {
+ return resultCF;
+ }
+
+ CompletableFuture<HttpResponse> mainResponse() {
+ return mainResponse;
+ }
+
+ private BiFunction<HttpRequest,
+ CompletableFuture<HttpResponse>, Boolean> pushHandler()
+ {
+ return pushHandler;
+ }
+
+ synchronized void addPush() {
+ numberOfPushes++;
+ remainingPushes++;
+ }
+
+ synchronized int numberOfPushes() {
+ return numberOfPushes;
+ }
+ // This is called when the main body response completes because it means
+ // no more PUSH_PROMISEs are possible
+ synchronized void noMorePushes() {
+ noMorePushes = true;
+ checkIfCompleted();
+ }
+
+ synchronized void pushCompleted() {
+ remainingPushes--;
+ checkIfCompleted();
+ }
+
+ synchronized void checkIfCompleted() {
+ if (remainingPushes == 0 && error == null && noMorePushes) {
+ T overallResult = multiProcessor.onComplete();
+ resultCF.complete(overallResult);
+ }
+ }
+
+ synchronized void pushError(Throwable t) {
+ if (t == null)
+ return;
+ this.error = t;
+ resultCF.completeExceptionally(t);
+ }
}
}
< prev index next >