< prev index next >

src/java.httpclient/share/classes/java/net/http/Stream.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this --- 1,7 ---- /* ! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this
*** 22,101 **** * or visit www.oracle.com if you need additional information or have any */ package java.net.http; import java.io.IOException; - import java.io.UncheckedIOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; ! import java.util.concurrent.CompletionException; import java.util.function.LongConsumer; /** ! * Http/2 Stream */ class Stream extends ExchangeImpl { ! void debugPrint() { ! } @Override @SuppressWarnings("unchecked") <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { ! return null; } Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { super(e); } @Override HttpResponseImpl getResponse() throws IOException { ! return null; } @Override void sendRequest() throws IOException, InterruptedException { } @Override void sendHeadersOnly() throws IOException, InterruptedException { } @Override void sendBody() throws IOException, InterruptedException { } @Override CompletableFuture<Void> sendHeadersAsync() { ! return null; } @Override CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { ! return null; } ! @Override ! CompletableFuture<Void> sendBodyAsync() { ! return null; } @Override void cancel() { } @Override CompletableFuture<Void> sendRequestAsync() { ! return null; } @Override <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { ! return null; } } --- 22,842 ---- * or visit www.oracle.com if you need additional information or have any */ package java.net.http; + import sun.net.httpclient.hpack.DecodingCallback; + import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; ! import java.util.function.BiFunction; import java.util.function.LongConsumer; /** ! * Http/2 Stream handling. ! * ! * REQUESTS ! * ! 
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q ! * ! * sendRequest() -- sendHeadersOnly() + sendBody() ! * ! * sendBody() -- in calling thread: obeys all flow control (so may block) ! * obtains data from request body processor and places on connection ! * outbound Q. ! * ! * sendBodyAsync() -- calls sendBody() in an executor thread. ! * ! * sendHeadersAsync() -- calls sendHeadersOnly() which does not block ! * ! * sendRequestAsync() -- calls sendRequest() in an executor thread ! * ! * RESPONSES ! * ! * Multiple responses can be received per request. Responses are queued up on ! * a LinkedList of CF<HttpResponse> and the first one on the list is completed ! * with the next response ! * ! * getResponseAsync() -- queries list of response CFs and returns first one ! * if one exists. Otherwise, creates one and adds it to list ! * and returns it. Completion is achieved through the ! * incoming() upcall from connection reader thread. ! * ! * getResponse() -- calls getResponseAsync() and waits for CF to complete ! * ! * responseBody() -- in calling thread: blocks for incoming DATA frames on ! * stream inputQ. Obeys remote and local flow control so may block. ! * Calls user response body processor with data buffers. ! * ! * responseBodyAsync() -- calls responseBody() in an executor thread. ! * ! * incoming() -- entry point called from connection reader thread. Frames are ! * either handled immediately without blocking or for data frames ! * placed on the stream's inputQ which is consumed by the stream's ! * reader thread. ! * ! * PushedStream sub class ! * ====================== ! * Sending side methods are not used because the request comes from a PUSH_PROMISE ! * frame sent by the server. When a PUSH_PROMISE is received the PushedStream ! * is created. PushedStream does not use responseCF list as there can be only ! * one response. The CF is created when the object is created and when the response ! 
* HEADERS frame is received the object is completed. */ class Stream extends ExchangeImpl { ! final Queue<Http2Frame> inputQ; ! ! volatile int streamid; ! ! long responseContentLen = -1; ! long responseBytesProcessed = 0; ! long requestContentLen; ! ! Http2Connection connection; ! HttpClientImpl client; ! final HttpRequestImpl request; ! final DecodingCallback rspHeadersConsumer; ! HttpHeadersImpl responseHeaders; ! final HttpHeadersImpl requestHeaders; ! final HttpHeadersImpl requestPseudoHeaders; ! HttpResponse.BodyProcessor<?> responseProcessor; ! final HttpRequest.BodyProcessor requestProcessor; ! HttpResponse response; ! ! // state flags ! boolean requestSent, responseReceived; ! ! final FlowController userRequestFlowController = ! new FlowController(); ! final FlowController remoteRequestFlowController = ! new FlowController(); ! final FlowController responseFlowController = ! new FlowController(); ! ! final ExecutorWrapper executor; @Override @SuppressWarnings("unchecked") <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { ! this.responseProcessor = processor; ! CompletableFuture<T> cf; ! try { ! T body = processor.onResponseBodyStart( ! responseContentLen, responseHeaders, ! responseFlowController); // TODO: filter headers ! if (body != null) { ! cf = CompletableFuture.completedFuture(body); ! receiveDataAsync(processor); ! } else ! cf = receiveDataAsync(processor); ! } catch (IOException e) { ! cf = CompletableFuture.failedFuture(e); ! } ! PushGroup<?> pg = request.pushGroup(); ! if (pg != null) { ! // if an error occurs make sure it is recorded in the PushGroup ! cf = cf.whenComplete((t,e) -> pg.pushError(e)); ! } ! return cf; ! } ! ! @Override ! public String toString() { ! StringBuilder sb = new StringBuilder(); ! sb.append("streamid: ") ! .append(streamid); ! return sb.toString(); ! } ! ! // pushes entire response body into response processor ! // blocking when required by local or remote flow control ! 
void receiveData() throws IOException { ! Http2Frame frame; ! DataFrame df = null; ! try { ! do { ! frame = inputQ.take(); ! if (!(frame instanceof DataFrame)) { ! assert false; ! continue; ! } ! df = (DataFrame) frame; ! int len = df.getDataLength(); ! ByteBuffer[] buffers = df.getData(); ! for (ByteBuffer b : buffers) { ! responseFlowController.take(); ! responseProcessor.onResponseBodyChunk(b); ! } ! sendWindowUpdate(len); ! } while (!df.getFlag(DataFrame.END_STREAM)); ! } catch (InterruptedException e) { ! throw new IOException(e); ! } ! } ! ! private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) { ! CompletableFuture<T> cf = new CompletableFuture<>(); ! executor.execute(() -> { ! try { ! receiveData(); ! T body = processor.onResponseComplete(); ! cf.complete(body); ! responseReceived(); ! } catch (Throwable t) { ! cf.completeExceptionally(t); ! } ! }, null); ! return cf; ! } ! ! private void sendWindowUpdate(int increment) ! throws IOException, InterruptedException { ! if (increment == 0) ! return; ! LinkedList<Http2Frame> list = new LinkedList<>(); ! WindowUpdateFrame frame = new WindowUpdateFrame(); ! frame.streamid(streamid); ! frame.setUpdate(increment); ! list.add(frame); ! frame = new WindowUpdateFrame(); ! frame.streamid(0); ! frame.setUpdate(increment); ! list.add(frame); ! 
connection.sendFrames(list); } + @Override + CompletableFuture<Void> sendBodyAsync() { + final CompletableFuture<Void> cf = new CompletableFuture<>(); + executor.execute(() -> { + try { + sendBodyImpl(); + cf.complete(null); + } catch (IOException | InterruptedException e) { + cf.completeExceptionally(e); + } + }, null); + return cf; + } + + @SuppressWarnings("unchecked") Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { super(e); + this.client = client; + this.connection = connection; + this.request = e.request(); + this.requestProcessor = request.requestProcessor(); + responseHeaders = new HttpHeadersImpl(); + requestHeaders = new HttpHeadersImpl(); + rspHeadersConsumer = (name, value) -> { + responseHeaders.addHeader(name.toString(), value.toString()); + }; + this.executor = client.executorWrapper(); + //this.response_cf = new CompletableFuture<HttpResponseImpl>(); + this.requestPseudoHeaders = new HttpHeadersImpl(); + // NEW + this.inputQ = new Queue<>(); + } + + @SuppressWarnings("unchecked") + Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { + super(null); + this.client = client; + this.connection = connection; + this.request = req; + this.requestProcessor = null; + responseHeaders = new HttpHeadersImpl(); + requestHeaders = new HttpHeadersImpl(); + rspHeadersConsumer = (name, value) -> { + responseHeaders.addHeader(name.toString(), value.toString()); + }; + this.executor = client.executorWrapper(); + //this.response_cf = new CompletableFuture<HttpResponseImpl>(); + this.requestPseudoHeaders = new HttpHeadersImpl(); + // NEW + this.inputQ = new Queue<>(); + } + + /** + * Entry point from Http2Connection reader thread. + * + * Data frames will be removed by response body thread. 
+ * + * @param frame + * @throws IOException + */ + void incoming(Http2Frame frame) throws IOException, InterruptedException { + if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { + // Complete headers accumulated. handle response. + // It's okay if there are multiple HeaderFrames. + handleResponse(); + } else if (frame instanceof DataFrame) { + inputQ.put(frame); + } else { + otherFrame(frame); + } + } + + void otherFrame(Http2Frame frame) throws IOException { + switch (frame.type()) { + case WindowUpdateFrame.TYPE: + incoming_windowUpdate((WindowUpdateFrame) frame); + break; + case ResetFrame.TYPE: + incoming_reset((ResetFrame) frame); + break; + case PriorityFrame.TYPE: + incoming_priority((PriorityFrame) frame); + break; + default: + String msg = "Unexpected frame: " + frame.toString(); + throw new IOException(msg); + } + } + + // The Hpack decoder decodes into one of these consumers of name,value pairs + + DecodingCallback rspHeadersConsumer() { + return rspHeadersConsumer; + } + + // create and return the HttpResponseImpl + protected void handleResponse() throws IOException { + HttpConnection c = connection.connection; // TODO: improve + long statusCode = responseHeaders + .firstValueAsLong(":status") + .orElseThrow(() -> new IOException("no statuscode in response")); + + this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, + c.sslParameters(), HttpClient.Version.HTTP_2, c); + this.responseContentLen = responseHeaders + .firstValueAsLong("content-length") + .orElse(-1L); + // different implementations for normal streams and pushed streams + completeResponse(response); + } + + void incoming_reset(ResetFrame frame) { + // TODO: implement reset + int error = frame.getErrorCode(); + IOException e = new IOException(ErrorFrame.stringForCode(error)); + completeResponseExceptionally(e); + throw new UnsupportedOperationException("Not implemented"); + } + + void incoming_priority(PriorityFrame frame) { + // 
TODO: implement priority + throw new UnsupportedOperationException("Not implemented"); + } + + void incoming_windowUpdate(WindowUpdateFrame frame) { + int amount = frame.getUpdate(); + if (amount > 0) + remoteRequestFlowController.accept(amount); + } + + void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { + if (Log.requests()) { + Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); + } + PushGroup<?> pushGroup = request.pushGroup(); + if (pushGroup == null) { + cancelImpl(new IllegalStateException("unexpected push promise")); + } + // get the handler and call it. + BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph = + pushGroup.pushHandler(); + + CompletableFuture<HttpResponse> pushCF = pushStream + .getResponseAsync(null) + .thenApply(r -> (HttpResponse)r); + boolean accept = ph.apply(pushReq, pushCF); + if (!accept) { + IOException ex = new IOException("Stream cancelled by user"); + cancelImpl(ex); + pushCF.completeExceptionally(ex); + } else { + pushStream.requestSent(); + pushGroup.addPush(); + } + } + + private OutgoingHeaders headerFrame(long contentLength) { + HttpHeadersImpl h = request.getSystemHeaders(); + if (contentLength > 0) { + h.setHeader("content-length", Long.toString(contentLength)); + } + setPseudoHeaderFields(); + OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); + if (contentLength == 0) { + f.setFlag(HeadersFrame.END_STREAM); + } + return f; + } + + private void setPseudoHeaderFields() { + HttpHeadersImpl hdrs = requestPseudoHeaders; + String method = request.method(); + hdrs.setHeader(":method", method); + URI uri = request.uri(); + hdrs.setHeader(":scheme", uri.getScheme()); + // TODO: userinfo deprecated. 
Needs to be removed + hdrs.setHeader(":authority", uri.getAuthority()); + // TODO: ensure header names beginning with : not in user headers + String query = uri.getQuery(); + String path = uri.getPath(); + if (path == null) { + if (method.equalsIgnoreCase("OPTIONS")) { + path = "*"; + } else { + path = "/"; + } + } + if (query != null) { + path += "?" + query; + } + hdrs.setHeader(":path", path); + } + + HttpHeadersImpl getRequestPseudoHeaders() { + return requestPseudoHeaders; } @Override HttpResponseImpl getResponse() throws IOException { ! try { ! return getResponseAsync(null).join(); ! } catch (Throwable e) { ! Throwable t = e.getCause(); ! if (t instanceof IOException) { ! throw (IOException)t; ! } ! throw e; ! } } @Override void sendRequest() throws IOException, InterruptedException { + sendHeadersOnly(); + sendBody(); + } + + /** + * A simple general purpose blocking flow controller + */ + class FlowController implements LongConsumer { + int permits; + + FlowController() { + this.permits = 0; + } + + @Override + public synchronized void accept(long n) { + if (n < 1) { + throw new InternalError("FlowController.accept called with " + n); + } + if (permits == 0) { + permits += n; + notifyAll(); + } else { + permits += n; + } + } + + public synchronized void take() throws InterruptedException { + take(1); + } + + public synchronized void take(int amount) throws InterruptedException { + assert permits >= 0; + while (permits < amount) { + int n = Math.min(amount, permits); + permits -= n; + amount -= n; + if (amount > 0) + wait(); + } + } } @Override void sendHeadersOnly() throws IOException, InterruptedException { + if (Log.requests() && request != null) { + Log.logRequest(request.toString()); + } + requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); + OutgoingHeaders f = headerFrame(requestContentLen); + connection.sendFrame(f); } @Override void sendBody() throws IOException, InterruptedException { + sendBodyImpl(); + } + + 
void registerStream(int id) { + this.streamid = id; + connection.putStream(this, streamid); + } + + DataFrame getDataFrame() throws IOException, InterruptedException { + userRequestFlowController.take(); + int maxpayloadLen = connection.getMaxSendFrameSize() - 9; + ByteBuffer buffer = connection.getBuffer(); + buffer.limit(maxpayloadLen); + boolean complete = requestProcessor.onRequestBodyChunk(buffer); + buffer.flip(); + int amount = buffer.remaining(); + // wait for flow control if necessary. Following method will block + // until after headers frame is sent, so correct streamid is set. + remoteRequestFlowController.take(amount); + connection.obtainSendWindow(amount); + + DataFrame df = new DataFrame(); + df.streamid(streamid); + if (complete) { + df.setFlag(DataFrame.END_STREAM); + } + df.setData(buffer); + df.computeLength(); + return df; } + @Override CompletableFuture<Void> sendHeadersAsync() { ! try { ! sendHeadersOnly(); ! return CompletableFuture.completedFuture(null); ! } catch (IOException | InterruptedException ex) { ! return CompletableFuture.failedFuture(ex); } + } + + /** + * A List of responses relating to this stream. Normally there is only + * one response, but intermediate responses like 100 are allowed + * and must be passed up to higher level before continuing. Deals with races + * such as if responses are returned before the CFs get created by + * getResponseAsync() + */ + + final List<CompletableFuture<HttpResponseImpl>> response_cfs = new LinkedList<>(); @Override CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) { ! CompletableFuture<HttpResponseImpl> cf; ! synchronized (response_cfs) { ! if (!response_cfs.isEmpty()) { ! cf = response_cfs.remove(0); ! } else { ! cf = new CompletableFuture<>(); ! response_cfs.add(cf); ! } ! } ! PushGroup<?> pg = request.pushGroup(); ! if (pg != null) { ! // if an error occurs make sure it is recorded in the PushGroup ! cf = cf.whenComplete((t,e) -> pg.pushError(e)); ! } ! return cf; ! } ! ! /** ! 
* Completes the first uncompleted CF on list, and removes it. If there is no ! * uncompleted CF then creates one (completes it) and adds to list ! */ ! void completeResponse(HttpResponse r) { ! HttpResponseImpl resp = (HttpResponseImpl)r; ! synchronized (response_cfs) { ! for (CompletableFuture<HttpResponseImpl> cf : response_cfs) { ! if (!cf.isDone()) { ! cf.complete(resp); ! response_cfs.remove(cf); ! //responseHeaders = new HttpHeadersImpl(); // for any following header blocks ! return; ! } else ! System.err.println("Stream: " + this + " ALREADY DONE"); ! } ! response_cfs.add(CompletableFuture.completedFuture(resp)); ! //responseHeaders = new HttpHeadersImpl(); // for any following header blocks ! } } ! // methods to update state and remove stream when finished ! ! synchronized void requestSent() { ! requestSent = true; ! if (responseReceived) ! connection.deleteStream(this); ! } ! ! synchronized void responseReceived() { ! responseReceived = true; ! if (requestSent) ! connection.deleteStream(this); ! PushGroup<?> pg = request.pushGroup(); ! if (pg != null) ! pg.noMorePushes(); ! } ! ! /** ! * same as above but for errors ! * ! * @param t ! */ ! void completeResponseExceptionally(Throwable t) { ! synchronized (response_cfs) { ! for (CompletableFuture<HttpResponseImpl> cf : response_cfs) { ! if (!cf.isDone()) { ! cf.completeExceptionally(t); ! response_cfs.remove(cf); ! return; ! } ! } ! response_cfs.add(CompletableFuture.failedFuture(t)); ! } ! } ! ! void sendBodyImpl() throws IOException, InterruptedException { ! if (requestContentLen == 0) { ! // no body ! return; ! } ! DataFrame df; ! do { ! df = getDataFrame(); ! // TODO: check accumulated content length (if not checked below) ! connection.sendFrame(df); ! } while (!df.getFlag(DataFrame.END_STREAM)); ! 
requestSent(); } @Override void cancel() { + cancelImpl(new Exception("Cancelled")); } + void cancelImpl(Throwable e) { + Log.logTrace("cancelling stream: {0}\n", e.toString()); + inputQ.close(); + try { + connection.resetStream(streamid, ResetFrame.CANCEL); + } catch (IOException | InterruptedException ex) { + Log.logError(ex); + } + } + @Override CompletableFuture<Void> sendRequestAsync() { ! CompletableFuture<Void> cf = new CompletableFuture<>(); ! executor.execute(() -> { ! try { ! sendRequest(); ! cf.complete(null); ! } catch (IOException |InterruptedException e) { ! cf.completeExceptionally(e); ! } ! }, null); ! return cf; } @Override <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException { ! this.responseProcessor = processor; ! T body = processor.onResponseBodyStart( ! responseContentLen, responseHeaders, ! responseFlowController); // TODO: filter headers ! if (body == null) { ! receiveData(); ! return processor.onResponseComplete(); ! } else ! receiveDataAsync(processor); ! responseReceived(); ! return body; ! } ! ! // called from Http2Connection reader thread ! synchronized void updateOutgoingWindow(int update) { ! remoteRequestFlowController.accept(update); ! } ! ! void close(String msg) { ! cancel(); ! } ! ! static class PushedStream extends Stream { ! final PushGroup<?> pushGroup; ! final private Stream parent; // used by server push streams ! // push streams need the response CF allocated up front as it is ! // given directly to user via the multi handler callback function. ! final CompletableFuture<HttpResponseImpl> pushCF; ! final HttpRequestImpl pushReq; ! ! PushedStream(PushGroup<?> pushGroup, HttpClientImpl client, ! Http2Connection connection, Stream parent, ! HttpRequestImpl pushReq) { ! super(client, connection, pushReq); ! this.pushGroup = pushGroup; ! this.pushReq = pushReq; ! this.pushCF = new CompletableFuture<>(); ! this.parent = parent; ! } ! ! // Following methods call the super class but in case of ! 
// error record it in the PushGroup. The error method is called ! // with a null value when no error occurred (is a no-op) ! @Override ! CompletableFuture<Void> sendBodyAsync() { ! return super.sendBodyAsync() ! .whenComplete((v, t) -> pushGroup.pushError(t)); ! } ! ! @Override ! CompletableFuture<Void> sendHeadersAsync() { ! return super.sendHeadersAsync() ! .whenComplete((v, t) -> pushGroup.pushError(t)); ! } ! ! @Override ! CompletableFuture<Void> sendRequestAsync() { ! return super.sendRequestAsync() ! .whenComplete((v, t) -> pushGroup.pushError(t)); ! } ! ! @Override ! CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) { ! return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); ! } ! ! @Override ! <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) { ! return super.responseBodyAsync(processor) ! .whenComplete((v, t) -> pushGroup.pushError(t)); ! } ! ! @Override ! void completeResponse(HttpResponse r) { ! HttpResponseImpl resp = (HttpResponseImpl)r; ! Utils.logResponse(resp); ! pushCF.complete(resp); ! } ! ! @Override ! void completeResponseExceptionally(Throwable t) { ! pushCF.completeExceptionally(t); ! } ! ! @Override ! synchronized void responseReceived() { ! super.responseReceived(); ! pushGroup.pushCompleted(); ! } ! ! // create and return the PushResponseImpl ! @Override ! protected void handleResponse() { ! HttpConnection c = connection.connection; // TODO: improve ! long statusCode = responseHeaders ! .firstValueAsLong(":status") ! .orElse(-1L); ! ! if (statusCode == -1L) ! completeResponseExceptionally(new IOException("No status code")); ! ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); ! this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, ! c.sslParameters()); ! this.responseContentLen = responseHeaders ! .firstValueAsLong("content-length") ! .orElse(-1L); ! // different implementations for normal streams and pushed streams ! 
completeResponse(response); ! } ! } ! ! /** ! * One PushGroup object is associated with the parent Stream of ! * the pushed Streams. This keeps track of all common state associated ! * with the pushes. ! */ ! static class PushGroup<T> { ! // the overall completion object, completed when all pushes are done. ! final CompletableFuture<T> resultCF; ! Throwable error; // any exception that occured during pushes ! ! // CF for main response ! final CompletableFuture<HttpResponse> mainResponse; ! ! // user's processor object ! final HttpResponse.MultiProcessor<T> multiProcessor; ! ! // per push handler function provided by processor ! final private BiFunction<HttpRequest, ! CompletableFuture<HttpResponse>, ! Boolean> pushHandler; ! int numberOfPushes; ! int remainingPushes; ! boolean noMorePushes = false; ! ! PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) { ! this.resultCF = new CompletableFuture<>(); ! this.mainResponse = new CompletableFuture<>(); ! this.multiProcessor = multiProcessor; ! this.pushHandler = multiProcessor.onStart(req, mainResponse); ! } ! ! CompletableFuture<T> groupResult() { ! return resultCF; ! } ! ! CompletableFuture<HttpResponse> mainResponse() { ! return mainResponse; ! } ! ! private BiFunction<HttpRequest, ! CompletableFuture<HttpResponse>, Boolean> pushHandler() ! { ! return pushHandler; ! } ! ! synchronized void addPush() { ! numberOfPushes++; ! remainingPushes++; ! } ! ! synchronized int numberOfPushes() { ! return numberOfPushes; ! } ! // This is called when the main body response completes because it means ! // no more PUSH_PROMISEs are possible ! synchronized void noMorePushes() { ! noMorePushes = true; ! checkIfCompleted(); ! } ! ! synchronized void pushCompleted() { ! remainingPushes--; ! checkIfCompleted(); ! } ! ! synchronized void checkIfCompleted() { ! if (remainingPushes == 0 && error == null && noMorePushes) { ! T overallResult = multiProcessor.onComplete(); ! resultCF.complete(overallResult); ! } ! 
} ! ! synchronized void pushError(Throwable t) { ! if (t == null) ! return; ! this.error = t; ! resultCF.completeExceptionally(t); ! } } }
< prev index next >