--- old/src/java.httpclient/share/classes/java/net/http/Stream.java 2016-04-25 23:10:31.485374693 +0100 +++ new/src/java.httpclient/share/classes/java/net/http/Stream.java 2016-04-25 23:10:31.197374697 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,78 +24,818 @@ package java.net.http; +import sun.net.httpclient.hpack.DecodingCallback; + import java.io.IOException; -import java.io.UncheckedIOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; import java.util.function.LongConsumer; /** - * Http/2 Stream + * Http/2 Stream handling. + * + * REQUESTS + * + * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q + * + * sendRequest() -- sendHeadersOnly() + sendBody() + * + * sendBody() -- in calling thread: obeys all flow control (so may block) + * obtains data from request body processor and places on connection + * outbound Q. + * + * sendBodyAsync() -- calls sendBody() in an executor thread. + * + * sendHeadersAsync() -- calls sendHeadersOnly() which does not block + * + * sendRequestAsync() -- calls sendRequest() in an executor thread + * + * RESPONSES + * + * Multiple responses can be received per request. Responses are queued up on + * a LinkedList of CF and the the first one on the list is completed + * with the next response + * + * getResponseAsync() -- queries list of response CFs and returns first one + * if one exists. Otherwise, creates one and adds it to list + * and returns it. Completion is achieved through the + * incoming() upcall from connection reader thread. + * + * getResponse() -- calls getResponseAsync() and waits for CF to complete + * + * responseBody() -- in calling thread: blocks for incoming DATA frames on + * stream inputQ. Obeys remote and local flow control so may block. + * Calls user response body processor with data buffers. + * + * responseBodyAsync() -- calls responseBody() in an executor thread. + * + * incoming() -- entry point called from connection reader thread. Frames are + * either handled immediately without blocking or for data frames + * placed on the stream's inputQ which is consumed by the stream's + * reader thread. + * + * PushedStream sub class + * ====================== + * Sending side methods are not used because the request comes from a PUSH_PROMISE + * frame sent by the server. When a PUSH_PROMISE is received the PushedStream + * is created. PushedStream does not use responseCF list as there can be only + * one response. The CF is created when the object created and when the response + * HEADERS frame is received the object is completed. */ class Stream extends ExchangeImpl { - void debugPrint() { - } + final Queue inputQ; + + volatile int streamid; + + long responseContentLen = -1; + long responseBytesProcessed = 0; + long requestContentLen; + + Http2Connection connection; + HttpClientImpl client; + final HttpRequestImpl request; + final DecodingCallback rspHeadersConsumer; + HttpHeadersImpl responseHeaders; + final HttpHeadersImpl requestHeaders; + final HttpHeadersImpl requestPseudoHeaders; + HttpResponse.BodyProcessor responseProcessor; + final HttpRequest.BodyProcessor requestProcessor; + HttpResponse response; + + // state flags + boolean requestSent, responseReceived; + + final FlowController userRequestFlowController = + new FlowController(); + final FlowController remoteRequestFlowController = + new FlowController(); + final FlowController responseFlowController = + new FlowController(); + + final ExecutorWrapper executor; @Override @SuppressWarnings("unchecked") CompletableFuture responseBodyAsync(HttpResponse.BodyProcessor processor) { - return null; + this.responseProcessor = processor; + CompletableFuture cf; + try { + T body = processor.onResponseBodyStart( + responseContentLen, responseHeaders, + responseFlowController); // TODO: filter headers + if (body != null) + cf = CompletableFuture.completedFuture(body); + else + cf = receiveDataAsync(processor); + } catch (IOException e) { + cf = CompletableFuture.failedFuture(e); + } + PushGroup pg = request.pushGroup(); + if (pg != null) { + // if an error occurs make sure it is recorded in the PushGroup + cf = cf.whenComplete((t,e) -> pg.pushError(e)); + } + return cf; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("streamid: ") + .append(streamid); + return sb.toString(); + } + + // pushes entire response body into response processor + // blocking when required by local or remote flow control + void receiveData() throws IOException { + Http2Frame frame; + DataFrame df = null; + try { + do { + frame = inputQ.take(); + if (!(frame instanceof DataFrame)) { + assert false; + continue; + } + df = (DataFrame) frame; + int len = df.getDataLength(); + ByteBuffer[] buffers = df.getData(); + for (ByteBuffer b : buffers) { + responseFlowController.take(); + responseProcessor.onResponseBodyChunk(b); + } + sendWindowUpdate(len); + } while (!df.getFlag(DataFrame.END_STREAM)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private CompletableFuture receiveDataAsync(HttpResponse.BodyProcessor processor) { + CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + try { + receiveData(); + T body = processor.onResponseComplete(); + cf.complete(body); + responseReceived(); + } catch (Throwable t) { + cf.completeExceptionally(t); + } + }, null); + return cf; + } + + private void sendWindowUpdate(int increment) + throws IOException, InterruptedException { + if (increment == 0) + return; + LinkedList list = new LinkedList<>(); + WindowUpdateFrame frame = new WindowUpdateFrame(); + frame.streamid(streamid); + frame.setUpdate(increment); + list.add(frame); + frame = new WindowUpdateFrame(); + frame.streamid(0); + frame.setUpdate(increment); + list.add(frame); + connection.sendFrames(list); } + @Override + CompletableFuture sendBodyAsync() { + final CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + try { + sendBodyImpl(); + cf.complete(null); + } catch (IOException | InterruptedException e) { + cf.completeExceptionally(e); + } + }, null); + return cf; + } + + @SuppressWarnings("unchecked") Stream(HttpClientImpl client, Http2Connection connection, Exchange e) { super(e); + this.client = client; + this.connection = connection; + this.request = e.request(); + this.requestProcessor = request.requestProcessor(); + responseHeaders = new HttpHeadersImpl(); + requestHeaders = new HttpHeadersImpl(); + rspHeadersConsumer = (name, value) -> { + responseHeaders.addHeader(name.toString(), value.toString()); + }; + this.executor = client.executorWrapper(); + //this.response_cf = new CompletableFuture(); + this.requestPseudoHeaders = new HttpHeadersImpl(); + // NEW + this.inputQ = new Queue<>(); + } + + @SuppressWarnings("unchecked") + Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) { + super(null); + this.client = client; + this.connection = connection; + this.request = req; + this.requestProcessor = null; + responseHeaders = new HttpHeadersImpl(); + requestHeaders = new HttpHeadersImpl(); + rspHeadersConsumer = (name, value) -> { + responseHeaders.addHeader(name.toString(), value.toString()); + }; + this.executor = client.executorWrapper(); + //this.response_cf = new CompletableFuture(); + this.requestPseudoHeaders = new HttpHeadersImpl(); + // NEW + this.inputQ = new Queue<>(); + } + + /** + * Entry point from Http2Connection reader thread. + * + * Data frames will be removed by response body thread. + * + * @param frame + * @throws IOException + */ + void incoming(Http2Frame frame) throws IOException, InterruptedException { + if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) { + // Complete headers accumulated. handle response. + // It's okay if there are multiple HeaderFrames. + handleResponse(); + } else if (frame instanceof DataFrame) { + inputQ.put(frame); + } else { + otherFrame(frame); + } + } + + void otherFrame(Http2Frame frame) throws IOException { + switch (frame.type()) { + case WindowUpdateFrame.TYPE: + incoming_windowUpdate((WindowUpdateFrame) frame); + break; + case ResetFrame.TYPE: + incoming_reset((ResetFrame) frame); + break; + case PriorityFrame.TYPE: + incoming_priority((PriorityFrame) frame); + break; + default: + String msg = "Unexpected frame: " + frame.toString(); + throw new IOException(msg); + } + } + + // The Hpack decoder decodes into one of these consumers of name,value pairs + + DecodingCallback rspHeadersConsumer() { + return rspHeadersConsumer; + } + + // create and return the HttpResponseImpl + protected void handleResponse() throws IOException { + HttpConnection c = connection.connection; // TODO: improve + long statusCode = responseHeaders + .firstValueAsLong(":status") + .orElseThrow(() -> new IOException("no statuscode in response")); + + this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null, + c.sslParameters(), HttpClient.Version.HTTP_2, c); + this.responseContentLen = responseHeaders + .firstValueAsLong("content-length") + .orElse(-1L); + // different implementations for normal streams and pushed streams + completeResponse(response); + } + + void incoming_reset(ResetFrame frame) { + // TODO: implement reset + int error = frame.getErrorCode(); + IOException e = new IOException(ErrorFrame.stringForCode(error)); + completeResponseExceptionally(e); + throw new UnsupportedOperationException("Not implemented"); + } + + void incoming_priority(PriorityFrame frame) { + // TODO: implement priority + throw new UnsupportedOperationException("Not implemented"); + } + + void incoming_windowUpdate(WindowUpdateFrame frame) { + int amount = frame.getUpdate(); + if (amount > 0) + remoteRequestFlowController.accept(amount); + } + + void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { + if (Log.requests()) { + Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); + } + PushGroup pushGroup = request.pushGroup(); + if (pushGroup == null) { + cancelImpl(new IllegalStateException("unexpected push promise")); + } + // get the handler and call it. + BiFunction,Boolean> ph = + pushGroup.pushHandler(); + + CompletableFuture pushCF = pushStream + .getResponseAsync(null) + .thenApply(r -> (HttpResponse)r); + boolean accept = ph.apply(pushReq, pushCF); + if (!accept) { + IOException ex = new IOException("Stream cancelled by user"); + cancelImpl(ex); + pushCF.completeExceptionally(ex); + } else { + pushStream.requestSent(); + pushGroup.addPush(); + } + } + + private OutgoingHeaders headerFrame(long contentLength) { + HttpHeadersImpl h = request.getSystemHeaders(); + if (contentLength > 0) { + h.setHeader("content-length", Long.toString(contentLength)); + } + setPseudoHeaderFields(); + OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this); + if (contentLength == 0) { + f.setFlag(HeadersFrame.END_STREAM); + } + return f; + } + + private void setPseudoHeaderFields() { + HttpHeadersImpl hdrs = requestPseudoHeaders; + String method = request.method(); + hdrs.setHeader(":method", method); + URI uri = request.uri(); + hdrs.setHeader(":scheme", uri.getScheme()); + // TODO: userinfo deprecated. Needs to be removed + hdrs.setHeader(":authority", uri.getAuthority()); + // TODO: ensure header names beginning with : not in user headers + String query = uri.getQuery(); + String path = uri.getPath(); + if (path == null) { + if (method.equalsIgnoreCase("OPTIONS")) { + path = "*"; + } else { + path = "/"; + } + } + if (query != null) { + path += "?" + query; + } + hdrs.setHeader(":path", path); + } + + HttpHeadersImpl getRequestPseudoHeaders() { + return requestPseudoHeaders; } @Override HttpResponseImpl getResponse() throws IOException { - return null; + try { + return getResponseAsync(null).join(); + } catch (Throwable e) { + Throwable t = e.getCause(); + if (t instanceof IOException) { + throw (IOException)t; + } + throw e; + } } @Override void sendRequest() throws IOException, InterruptedException { + sendHeadersOnly(); + sendBody(); + } + + /** + * A simple general purpose blocking flow controller + */ + class FlowController implements LongConsumer { + int permits; + + FlowController() { + this.permits = 0; + } + + @Override + public synchronized void accept(long n) { + if (n < 1) { + throw new InternalError("FlowController.accept called with " + n); + } + if (permits == 0) { + permits += n; + notifyAll(); + } else { + permits += n; + } + } + + public synchronized void take() throws InterruptedException { + take(1); + } + + public synchronized void take(int amount) throws InterruptedException { + assert permits >= 0; + while (permits < amount) { + int n = Math.min(amount, permits); + permits -= n; + amount -= n; + if (amount > 0) + wait(); + } + } } @Override void sendHeadersOnly() throws IOException, InterruptedException { + if (Log.requests() && request != null) { + Log.logRequest(request.toString()); + } + requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController); + OutgoingHeaders f = headerFrame(requestContentLen); + connection.sendFrame(f); } @Override void sendBody() throws IOException, InterruptedException { + sendBodyImpl(); } - @Override - CompletableFuture sendHeadersAsync() { - return null; + void registerStream(int id) { + this.streamid = id; + connection.putStream(this, streamid); + } + + DataFrame getDataFrame() throws IOException, InterruptedException { + userRequestFlowController.take(); + int maxpayloadLen = connection.getMaxSendFrameSize() - 9; + ByteBuffer buffer = connection.getBuffer(); + buffer.limit(maxpayloadLen); + boolean complete = requestProcessor.onRequestBodyChunk(buffer); + buffer.flip(); + int amount = buffer.remaining(); + // wait for flow control if necessary. Following method will block + // until after headers frame is sent, so correct streamid is set. + remoteRequestFlowController.take(amount); + connection.obtainSendWindow(amount); + + DataFrame df = new DataFrame(); + df.streamid(streamid); + if (complete) { + df.setFlag(DataFrame.END_STREAM); + } + df.setData(buffer); + df.setLength(); + return df; } + @Override - CompletableFuture getResponseAsync(Void v) { - return null; - } + CompletableFuture sendHeadersAsync() { + try { + sendHeadersOnly(); + return CompletableFuture.completedFuture(null); + } catch (IOException | InterruptedException ex) { + return CompletableFuture.failedFuture(ex); + } + } + + /** + * A List of responses relating to this stream. Normally there is only + * one response, but intermediate responses like 100 are allowed + * and must be passed up to higher level before continuing. Deals with races + * such as if responses are returned before the CFs get created by + * getResponseAsync() + */ + + final List> response_cfs = new LinkedList<>(); @Override - CompletableFuture sendBodyAsync() { - return null; + CompletableFuture getResponseAsync(Void v) { + CompletableFuture cf; + synchronized (response_cfs) { + if (!response_cfs.isEmpty()) { + cf = response_cfs.remove(0); + } else { + cf = new CompletableFuture<>(); + response_cfs.add(cf); + } + } + PushGroup pg = request.pushGroup(); + if (pg != null) { + // if an error occurs make sure it is recorded in the PushGroup + cf = cf.whenComplete((t,e) -> pg.pushError(e)); + } + return cf; + } + + /** + * Completes the first uncompleted CF on list, and removes it. If there is no + * uncompleted CF then creates one (completes it) and adds to list + */ + void completeResponse(HttpResponse r) { + HttpResponseImpl resp = (HttpResponseImpl)r; + synchronized (response_cfs) { + for (CompletableFuture cf : response_cfs) { + if (!cf.isDone()) { + cf.complete(resp); + response_cfs.remove(cf); + responseHeaders = new HttpHeadersImpl(); // for any following header blocks + return; + } else + System.err.println("Stream: " + this + " ALREADY DONE"); + } + response_cfs.add(CompletableFuture.completedFuture(resp)); + responseHeaders = new HttpHeadersImpl(); // for any following header blocks + } + } + + // methods to update state and remove stream when finished + + synchronized void requestSent() { + requestSent = true; + if (responseReceived) + connection.deleteStream(this); + } + + synchronized void responseReceived() { + responseReceived = true; + if (requestSent) + connection.deleteStream(this); + PushGroup pg = request.pushGroup(); + if (pg != null) + pg.noMorePushes(); + } + + /** + * same as above but for errors + * + * @param t + */ + void completeResponseExceptionally(Throwable t) { + synchronized (response_cfs) { + for (CompletableFuture cf : response_cfs) { + if (!cf.isDone()) { + cf.completeExceptionally(t); + response_cfs.remove(cf); + return; + } + } + response_cfs.add(CompletableFuture.failedFuture(t)); + } + } + + void sendBodyImpl() throws IOException, InterruptedException { + if (requestContentLen == 0) { + // no body + return; + } + DataFrame df; + do { + df = getDataFrame(); + // TODO: check accumulated content length (if not checked below) + connection.sendFrame(df); + } while (!df.getFlag(DataFrame.END_STREAM)); + requestSent(); } @Override void cancel() { + cancelImpl(new Exception("Cancelled")); } + void cancelImpl(Throwable e) { + Log.logTrace("cancelling stream: {0}\n", e.toString()); + inputQ.close(); + try { + connection.resetStream(streamid, ResetFrame.CANCEL); + } catch (IOException | InterruptedException ex) { + Log.logError(ex); + } + } + @Override CompletableFuture sendRequestAsync() { - return null; + CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + try { + sendRequest(); + cf.complete(null); + } catch (IOException |InterruptedException e) { + cf.completeExceptionally(e); + } + }, null); + return cf; } @Override T responseBody(HttpResponse.BodyProcessor processor) throws IOException { - return null; + this.responseProcessor = processor; + T body = processor.onResponseBodyStart( + responseContentLen, responseHeaders, + responseFlowController); // TODO: filter headers + if (body == null) { + receiveData(); + return processor.onResponseComplete(); + } else + receiveDataAsync(processor); + responseReceived(); + return body; + } + + // called from Http2Connection reader thread + synchronized void updateOutgoingWindow(int update) { + remoteRequestFlowController.accept(update); + } + + void close(String msg) { + cancel(); + } + + static class PushedStream extends Stream { + final PushGroup pushGroup; + final private Stream parent; // used by server push streams + // push streams need the response CF allocated up front as it is + // given directly to user via the multi handler callback function. + final CompletableFuture pushCF; + final HttpRequestImpl pushReq; + + PushedStream(PushGroup pushGroup, HttpClientImpl client, + Http2Connection connection, Stream parent, + HttpRequestImpl pushReq) { + super(client, connection, pushReq); + this.pushGroup = pushGroup; + this.pushReq = pushReq; + this.pushCF = new CompletableFuture<>(); + this.parent = parent; + } + + // Following methods call the super class but in case of + // error record it in the PushGroup. The error method is called + // with a null value when no error occurred (is a no-op) + @Override + CompletableFuture sendBodyAsync() { + return super.sendBodyAsync() + .whenComplete((v, t) -> pushGroup.pushError(t)); + } + + @Override + CompletableFuture sendHeadersAsync() { + return super.sendHeadersAsync() + .whenComplete((v, t) -> pushGroup.pushError(t)); + } + + @Override + CompletableFuture sendRequestAsync() { + return super.sendRequestAsync() + .whenComplete((v, t) -> pushGroup.pushError(t)); + } + + @Override + CompletableFuture getResponseAsync(Void vo) { + return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); + } + + @Override + CompletableFuture responseBodyAsync(HttpResponse.BodyProcessor processor) { + return super.responseBodyAsync(processor) + .whenComplete((v, t) -> pushGroup.pushError(t)); + } + + @Override + void completeResponse(HttpResponse r) { + HttpResponseImpl resp = (HttpResponseImpl)r; + Utils.logResponse(resp); + pushCF.complete(resp); + } + + @Override + void completeResponseExceptionally(Throwable t) { + pushCF.completeExceptionally(t); + } + + @Override + synchronized void responseReceived() { + super.responseReceived(); + pushGroup.pushCompleted(); + } + + // create and return the PushResponseImpl + @Override + protected void handleResponse() { + HttpConnection c = connection.connection; // TODO: improve + long statusCode = responseHeaders + .firstValueAsLong(":status") + .orElse(-1L); + + if (statusCode == -1L) + completeResponseExceptionally(new IOException("No status code")); + ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS); + this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this, + c.sslParameters()); + this.responseContentLen = responseHeaders + .firstValueAsLong("content-length") + .orElse(-1L); + // different implementations for normal streams and pushed streams + completeResponse(response); + } + } + + /** + * One PushGroup object is associated with the parent Stream of + * the pushed Streams. This keeps track of all common state associated + * with the pushes. + */ + static class PushGroup { + // the overall completion object, completed when all pushes are done. + final CompletableFuture resultCF; + Throwable error; // any exception that occured during pushes + + // CF for main response + final CompletableFuture mainResponse; + + // user's processor object + final HttpResponse.MultiProcessor multiProcessor; + + // per push handler function provided by processor + final private BiFunction, + Boolean> pushHandler; + int numberOfPushes; + int remainingPushes; + boolean noMorePushes = false; + + PushGroup(HttpResponse.MultiProcessor multiProcessor, HttpRequestImpl req) { + this.resultCF = new CompletableFuture<>(); + this.mainResponse = new CompletableFuture<>(); + this.multiProcessor = multiProcessor; + this.pushHandler = multiProcessor.onStart(req, mainResponse); + } + + CompletableFuture groupResult() { + return resultCF; + } + + CompletableFuture mainResponse() { + return mainResponse; + } + + private BiFunction, Boolean> pushHandler() + { + return pushHandler; + } + + synchronized void addPush() { + numberOfPushes++; + remainingPushes++; + } + + synchronized int numberOfPushes() { + return numberOfPushes; + } + // This is called when the main body response completes because it means + // no more PUSH_PROMISEs are possible + synchronized void noMorePushes() { + noMorePushes = true; + checkIfCompleted(); + } + + synchronized void pushCompleted() { + remainingPushes--; + checkIfCompleted(); + } + + synchronized void checkIfCompleted() { + if (remainingPushes == 0 && error == null && noMorePushes) { + T overallResult = multiProcessor.onComplete(); + resultCF.complete(overallResult); + } + } + + synchronized void pushError(Throwable t) { + if (t == null) + return; + this.error = t; + resultCF.completeExceptionally(t); + } } }