/* * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.net.http; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import sun.net.httpclient.hpack.Encoder; import sun.net.httpclient.hpack.Decoder; import static java.net.http.SettingsFrame.*; import static java.net.http.Utils.BUFSIZE; import java.util.ArrayList; import java.util.Collections; import java.util.Formatter; import java.util.stream.Collectors; import sun.net.httpclient.hpack.DecodingCallback; /** * An Http2Connection. 
 * Encapsulates the socket(channel) and any SSLEngine used
 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
 *
 * Http2Connections belong to an Http2ClientImpl, (one of) which belongs
 * to an HttpClientImpl.
 *
 * Creation cases:
 * 1) upgraded HTTP/1.1 plain tcp connection
 * 2) prior knowledge directly created plain tcp connection
 * 3) directly created HTTP/2 SSL connection which uses ALPN.
 *
 * Sending is done by writing directly to the underlying HttpConnection object,
 * which is operating in async mode. No flow control applies on output at this
 * level, and all writes are just executed as puts to an output queue belonging
 * to the HttpConnection. Flow control is implemented by the HTTP/2 protocol
 * itself.
 *
 * Hpack header compression and outgoing stream creation are also done here,
 * because these operations must be synchronized at the socket level. Stream
 * objects send frames simply by placing them on the connection's output queue.
 * sendFrame() is called from a higher level (Stream) thread.
 *
 * asyncReceive(ByteBuffer) is always called from the selector thread. It
 * assembles incoming Http2Frames, and directs them to the appropriate
 * Stream.incoming() or handles them directly itself. This thread performs
 * hpack decompression and incoming stream creation (Server push). Incoming
 * frames destined for a stream are provided by calling Stream.incoming().
*/ class Http2Connection implements BufferHandler { final Queue outputQ; volatile boolean closed; //------------------------------------- final HttpConnection connection; HttpClientImpl client; final Http2ClientImpl client2; Map streams; int nextstreamid = 3; // stream 1 is registered separately int nextPushStream = 2; Encoder hpackOut; Decoder hpackIn; SettingsFrame clientSettings, serverSettings; ByteBufferConsumer bbc; final LinkedList freeList; final String key; // for HttpClientImpl.connections map FrameReader reader; // Connection level flow control windows final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE); final static int DEFAULT_FRAME_SIZE = 16 * 1024; private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY; final ExecutorWrapper executor; /** * This is established by the protocol spec and the peer will update it with * WINDOW_UPDATEs, which affects the connectionSendWindow. */ final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1; // TODO: need list of control frames from other threads // that need to be sent /** * Case 1) Create from upgraded HTTP/1.1 connection. * Is ready to use. Will not be SSL. exchange is the Exchange * that initiated the connection, whose response will be delivered * on a Stream. 
*/ Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange exchange) throws IOException, InterruptedException { this.outputQ = new Queue<>(); String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); //this.initialExchange = exchange; assert !(connection instanceof SSLConnection); this.connection = connection; this.client = client2.client(); this.client2 = client2; this.executor = client.executorWrapper(); this.freeList = new LinkedList<>(); this.key = keyFor(connection); streams = Collections.synchronizedMap(new HashMap<>()); initCommon(); //sendConnectionPreface(); Stream initialStream = createStream(exchange); initialStream.registerStream(1); initialStream.requestSent(); sendConnectionPreface(); connection.configureMode(Mode.ASYNC); // start reading and writing // start reading AsyncConnection asyncConn = (AsyncConnection)connection; asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); asyncReceive(connection.getRemaining()); asyncConn.startReading(); } // async style but completes immediately static CompletableFuture createAsync(HttpConnection connection, Http2ClientImpl client2, Exchange exchange) { CompletableFuture cf = new CompletableFuture<>(); try { Http2Connection c = new Http2Connection(connection, client2, exchange); cf.complete(c); } catch (IOException | InterruptedException e) { cf.completeExceptionally(e); } return cf; } /** * Cases 2) 3) * * request is request to be sent. 
*/ Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException { InetSocketAddress proxy = request.proxy(); URI uri = request.uri(); InetSocketAddress addr = Utils.getAddress(request); String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); this.key = keyFor(uri, proxy); this.connection = HttpConnection.getConnection(addr, request, this); streams = Collections.synchronizedMap(new HashMap<>()); this.client = request.client(); this.client2 = client.client2(); this.executor = client.executorWrapper(); this.freeList = new LinkedList<>(); this.outputQ = new Queue<>(); nextstreamid = 1; initCommon(); connection.connect(); connection.configureMode(Mode.ASYNC); // start reading AsyncConnection asyncConn = (AsyncConnection)connection; asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); sendConnectionPreface(); asyncConn.startReading(); } static String keyFor(HttpConnection connection) { boolean isProxy = connection.isProxied(); boolean isSecure = connection.isSecure(); InetSocketAddress addr = connection.address(); return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort()); } static String keyFor(URI uri, InetSocketAddress proxy) { boolean isSecure = uri.getScheme().equalsIgnoreCase("https"); boolean isProxy = proxy != null; String host; int port; if (isProxy) { host = proxy.getHostString(); port = proxy.getPort(); } else { host = uri.getHost(); port = uri.getPort(); } return keyString(isSecure, isProxy, host, port); } // {C,S}:{H:P}:host:port // C indicates clear text connection "http" // S indicates secure "https" // H indicates host (direct) connection // P indicates proxy // Eg: "S:H:foo.com:80" static String keyString(boolean secure, boolean proxy, String host, int port) { char c1 = secure ? 'S' : 'C'; char c2 = proxy ? 
'P' : 'H'; StringBuilder sb = new StringBuilder(128); sb.append(c1).append(':').append(c2).append(':') .append(host).append(':').append(port); return sb.toString(); } String key() { return this.key; } void putConnection() { client2.putConnection(this); } private static String toHexdump1(ByteBuffer bb) { bb.mark(); StringBuilder sb = new StringBuilder(512); Formatter f = new Formatter(sb); while (bb.hasRemaining()) { int i = Byte.toUnsignedInt(bb.get()); f.format("%02x:", i); } sb.deleteCharAt(sb.length()-1); bb.reset(); return sb.toString(); } private static String toHexdump(ByteBuffer bb) { List words = new ArrayList<>(); int i = 0; bb.mark(); while (bb.hasRemaining()) { if (i % 2 == 0) { words.add(""); } byte b = bb.get(); String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1); words.set(i / 2, words.get(i / 2) + hex); i++; } bb.reset(); return words.stream().collect(Collectors.joining(" ")); } private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) { boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS); ByteBuffer[] buffers = frame.getHeaderBlock(); for (int i = 0; i < buffers.length; i++) { hpackIn.decode(buffers[i], endOfHeaders && (i == buffers.length - 1), decoder); } } int getInitialSendWindowSize() { return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE); } void close() { GoAwayFrame f = new GoAwayFrame(); f.setDebugData("Requested by user".getBytes()); // TODO: set last stream. For now zero ok. 
sendFrame(f); } // BufferHandler methods @Override public ByteBuffer getBuffer(int n) { return client.getBuffer(n); } @Override public void returnBuffer(ByteBuffer buf) { client.returnBuffer(buf); } @Override public void setMinBufferSize(int n) { client.setMinBufferSize(n); } private final Object readlock = new Object(); void asyncReceive(ByteBuffer buffer) { synchronized (readlock) { try { if (reader == null) { reader = new FrameReader(buffer); } else { reader.input(buffer); } while (true) { if (reader.haveFrame()) { List buffers = reader.frame(); ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer); processFrame(bbc); if (bbc.consumed()) { reader = new FrameReader(); return; } else { reader = new FrameReader(reader); } } else return; } } catch (Throwable e) { String msg = Utils.stackTrace(e); Log.logTrace(msg); shutdown(e); } } } void shutdown(Throwable t) { Log.logError(t); closed = true; client2.deleteConnection(this); List c = new LinkedList<>(streams.values()); for (Stream s : c) { s.cancelImpl(t); } connection.close(); } /** * Handles stream 0 (common) frames that apply to whole connection and passes * other stream specific frames to that Stream object. * * Invokes Stream.incoming() which is expected to process frame without * blocking. 
*/ void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException { Http2Frame frame = Http2Frame.readIncoming(bbc); Log.logFrames(frame, "IN"); int streamid = frame.streamid(); if (streamid == 0) { handleCommonFrame(frame); } else { Stream stream = getStream(streamid); if (stream == null) { // should never receive a frame with unknown stream id resetStream(streamid, ResetFrame.PROTOCOL_ERROR); } if (frame instanceof PushPromiseFrame) { PushPromiseFrame pp = (PushPromiseFrame)frame; handlePushPromise(stream, pp); } else if (frame instanceof HeaderFrame) { // decode headers (or continuation) decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer()); stream.incoming(frame); } else stream.incoming(frame); } } private void handlePushPromise(Stream parent, PushPromiseFrame pp) throws IOException, InterruptedException { HttpRequestImpl parentReq = parent.request; int promisedStreamid = pp.getPromisedStream(); if (promisedStreamid != nextPushStream) { resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR); return; } else { nextPushStream += 2; } HeaderDecoder decoder = new HeaderDecoder(); decodeHeaders(pp, decoder); HttpHeadersImpl headers = decoder.headers(); HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers); Stream.PushedStream pushStream = createPushStream(parent, pushReq); pushStream.registerStream(promisedStreamid); parent.incoming_pushPromise(pushReq, pushStream); } private void handleCommonFrame(Http2Frame frame) throws IOException, InterruptedException { switch (frame.type()) { case SettingsFrame.TYPE: { SettingsFrame f = (SettingsFrame)frame; handleSettings(f);} break; case PingFrame.TYPE: { PingFrame f = (PingFrame)frame; handlePing(f);} break; case GoAwayFrame.TYPE: { GoAwayFrame f = (GoAwayFrame)frame; handleGoAway(f);} break; case WindowUpdateFrame.TYPE: { WindowUpdateFrame f = (WindowUpdateFrame)frame; handleWindowUpdate(f);} break; default: protocolError(ErrorFrame.PROTOCOL_ERROR); } } void 
resetStream(int streamid, int code) throws IOException, InterruptedException { Log.logError( "Resetting stream {0,number,integer} with error code {1,number,integer}", streamid, code); ResetFrame frame = new ResetFrame(); frame.streamid(streamid); frame.setErrorCode(code); sendFrame(frame); streams.remove(streamid); } private void handleWindowUpdate(WindowUpdateFrame f) throws IOException, InterruptedException { connectionSendWindow.update(f.getUpdate()); } private void protocolError(int errorCode) throws IOException, InterruptedException { GoAwayFrame frame = new GoAwayFrame(); frame.setErrorCode(errorCode); sendFrame(frame); String msg = "Error code: " + errorCode; shutdown(new IOException("protocol error")); } private void handleSettings(SettingsFrame frame) throws IOException, InterruptedException { if (frame.getFlag(SettingsFrame.ACK)) { // ignore ack frames for now. return; } serverSettings = frame; SettingsFrame ack = getAckFrame(frame.streamid()); sendFrame(ack); } private void handlePing(PingFrame frame) throws IOException, InterruptedException { frame.setFlag(PingFrame.ACK); sendFrame(frame); } private void handleGoAway(GoAwayFrame frame) throws IOException, InterruptedException { //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode())); shutdown(new IOException("GOAWAY received")); } private void initCommon() { clientSettings = client2.getClientSettings(); // serverSettings will be updated by server serverSettings = SettingsFrame.getDefaultSettings(); hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); } /** * Max frame size we are allowed to send */ public int getMaxSendFrameSize() { int param = serverSettings.getParameter(MAX_FRAME_SIZE); if (param == -1) { param = DEFAULT_FRAME_SIZE; } return param; } /** * Max frame size we will receive */ public int getMaxReceiveFrameSize() { return clientSettings.getParameter(MAX_FRAME_SIZE); } // 
Not sure how useful this is. public int getMaxHeadersSize() { return serverSettings.getParameter(MAX_HEADER_LIST_SIZE); } private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; private static final byte[] PREFACE_BYTES = CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1); /** * Sends Connection preface and Settings frame with current preferred * values */ private void sendConnectionPreface() throws IOException { ByteBufferGenerator bg = new ByteBufferGenerator(this); bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES); ByteBuffer[] ba = bg.getBufferArray(); connection.write(ba, 0, ba.length); bg = new ByteBufferGenerator(this); SettingsFrame sf = client2.getClientSettings(); Log.logFrames(sf, "OUT"); sf.writeOutgoing(bg); WindowUpdateFrame wup = new WindowUpdateFrame(); wup.streamid(0); // send a Window update for the receive buffer we are using // minus the initial 64 K specified in protocol wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); wup.computeLength(); wup.writeOutgoing(bg); Log.logFrames(wup, "OUT"); ba = bg.getBufferArray(); connection.write(ba, 0, ba.length); } /** * Returns an existing Stream with given id, or null if doesn't exist */ Stream getStream(int streamid) { return streams.get(streamid); } /** * Creates Stream with given id. 
*/ Stream createStream(Exchange exchange) { Stream stream = new Stream(client, this, exchange); return stream; } Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) { Stream.PushGroup pg = parent.request.pushGroup(); return new Stream.PushedStream(pg, client, this, parent, pushReq); } void putStream(Stream stream, int streamid) { streams.put(streamid, stream); } void deleteStream(Stream stream) { streams.remove(stream.streamid); } static final int MAX_STREAM = Integer.MAX_VALUE - 2; // Number of header bytes in a Headers Frame final static int HEADERS_HEADER_SIZE = 15; // Number of header bytes in a Continuation frame final static int CONTIN_HEADER_SIZE = 9; /** * Encode the headers into a List and then create HEADERS * and CONTINUATION frames from the list and return the List. * * @param frame * @return */ private LinkedList encodeHeaders(OutgoingHeaders frame) { LinkedList buffers = new LinkedList<>(); ByteBuffer buf = getBuffer(); buffers.add(buf); encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers); encodeHeadersImpl(frame.getUserHeaders(), buffers); encodeHeadersImpl(frame.getSystemHeaders(), buffers); for (ByteBuffer b : buffers) { b.flip(); } LinkedList frames = new LinkedList<>(); int maxframesize = getMaxSendFrameSize(); HeadersFrame oframe = new HeadersFrame(); oframe.setFlags(frame.getFlags()); oframe.streamid(frame.streamid()); oframe.setHeaderBlock(getBufferArray(buffers, maxframesize)); frames.add(oframe); // Any buffers left? 
boolean done = buffers.isEmpty(); if (done) { oframe.setFlag(HeaderFrame.END_HEADERS); } else { ContinuationFrame cf = null; while (!done) { cf = new ContinuationFrame(); cf.streamid(frame.streamid()); cf.setHeaderBlock(getBufferArray(buffers, maxframesize)); frames.add(cf); done = buffers.isEmpty(); } cf.setFlag(HeaderFrame.END_HEADERS); } return frames; } // should always return at least one buffer private static ByteBuffer[] getBufferArray(LinkedList list, int maxsize) { assert maxsize >= BUFSIZE; LinkedList newlist = new LinkedList<>(); int size = list.size(); int nbytes = 0; for (int i=0; i buffers) { ByteBuffer buffer; if (!(buffer = buffers.getLast()).hasRemaining()) { buffer = getBuffer(); buffers.add(buffer); } for (Map.Entry> e : hdrs.map().entrySet()) { String key = e.getKey(); String lkey = key.toLowerCase(); List values = e.getValue(); for (String value : values) { hpackOut.header(lkey, value); boolean encoded = false; do { encoded = hpackOut.encode(buffer); if (!encoded) { buffer = getBuffer(); buffers.add(buffer); } } while (!encoded); } } } public void sendFrames(List frames) throws IOException, InterruptedException { for (Http2Frame frame : frames) { sendFrame(frame); } } static Throwable getExceptionFrom(CompletableFuture cf) { try { cf.get(); return null; } catch (Throwable e) { if (e.getCause() != null) return e.getCause(); else return e; } } void execute(Runnable r) { executor.execute(r, null); } private final Object sendlock = new Object(); /** * */ void sendFrame(Http2Frame frame) { synchronized (sendlock) { try { if (frame instanceof OutgoingHeaders) { OutgoingHeaders oh = (OutgoingHeaders) frame; Stream stream = oh.getStream(); stream.registerStream(nextstreamid); oh.streamid(nextstreamid); nextstreamid += 2; // set outgoing window here. This allows thread sending // body to proceed. 
stream.updateOutgoingWindow(getInitialSendWindowSize()); LinkedList frames = encodeHeaders(oh); for (Http2Frame f : frames) { sendOneFrame(f); } } else { sendOneFrame(frame); } } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } } } } /** * Send a frame. * * @param frame * @throws IOException */ private void sendOneFrame(Http2Frame frame) throws IOException { ByteBufferGenerator bbg = new ByteBufferGenerator(this); frame.computeLength(); Log.logFrames(frame, "OUT"); frame.writeOutgoing(bbg); ByteBuffer[] currentBufs = bbg.getBufferArray(); connection.write(currentBufs, 0, currentBufs.length); } private SettingsFrame getAckFrame(int streamid) { SettingsFrame frame = new SettingsFrame(); frame.setFlag(SettingsFrame.ACK); frame.streamid(streamid); return frame; } static class HeaderDecoder implements DecodingCallback { HttpHeadersImpl headers; HeaderDecoder() { this.headers = new HttpHeadersImpl(); } @Override public void onDecoded(CharSequence name, CharSequence value) { headers.addHeader(name.toString(), value.toString()); } HttpHeadersImpl headers() { return headers; } } }