< prev index next >
src/java.httpclient/share/classes/java/net/http/Http2Connection.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 22,65 ****
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import java.io.IOException;
! import java.net.Authenticator;
! import java.net.CookieManager;
! import java.net.ProxySelector;
import java.net.URI;
! import static java.net.http.Utils.BUFSIZE;
import java.nio.ByteBuffer;
! import java.nio.channels.SelectableChannel;
! import java.nio.channels.SelectionKey;
! import static java.nio.channels.SelectionKey.OP_CONNECT;
! import static java.nio.channels.SelectionKey.OP_READ;
! import static java.nio.channels.SelectionKey.OP_WRITE;
! import java.nio.channels.Selector;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
! import java.util.Set;
! import java.util.concurrent.*;
! import java.security.NoSuchAlgorithmException;
! import java.util.ListIterator;
! import java.util.Optional;
! import java.util.concurrent.Executors;
! import java.util.concurrent.ThreadFactory;
! import javax.net.ssl.SSLContext;
! import javax.net.ssl.SSLParameters;
!
! class Http2Connection {
! static CompletableFuture<Http2Connection> createAsync(
! HttpConnection connection, Http2ClientImpl client2, Exchange exchange) {
! return null;
! }
Http2Connection(HttpConnection connection, Http2ClientImpl client2,
Exchange exchange) throws IOException, InterruptedException {
}
! Stream getStream(int i) {return null;}
! Stream createStream(Exchange ex) {return null;}
! void putConnection() {}
}
--- 22,779 ----
* or visit www.oracle.com if you need additional information or have any
*/
package java.net.http;
import java.io.IOException;
! import java.net.InetSocketAddress;
import java.net.URI;
! import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
! import java.nio.charset.StandardCharsets;
! import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
! import java.util.Map;
! import java.util.concurrent.CompletableFuture;
! import sun.net.httpclient.hpack.Encoder;
! import sun.net.httpclient.hpack.Decoder;
! import static java.net.http.SettingsFrame.*;
! import static java.net.http.Utils.BUFSIZE;
! import java.util.ArrayList;
! import java.util.Collections;
! import java.util.Formatter;
! import java.util.stream.Collectors;
! import sun.net.httpclient.hpack.DecodingCallback;
!
! /**
! * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
! * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
! *
! * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
! * to a HttpClientImpl.
! *
! * Creation cases:
! * 1) upgraded HTTP/1.1 plain tcp connection
! * 2) prior knowledge directly created plain tcp connection
! * 3) directly created HTTP/2 SSL connection which uses ALPN.
! *
! * Sending is done by writing directly to underlying HttpConnection object which
! * is operating in async mode. No flow control applies on output at this level
! * and all writes are just executed as puts to an output Q belonging to HttpConnection
! * Flow control is implemented by HTTP/2 protocol itself.
! *
! * Hpack header compression
! * and outgoing stream creation is also done here, because these operations
! * must be synchronized at the socket level. Stream objects send frames simply
! * by placing them on the connection's output Queue. sendFrame() is called
! * from a higher level (Stream) thread.
! *
! * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
! * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
! * or handles them directly itself. This thread performs hpack decompression
! * and incoming stream creation (Server push). Incoming frames destined for a
! * stream are provided by calling Stream.incoming().
! */
! class Http2Connection implements BufferHandler {
!
! final Queue<Http2Frame> outputQ;
! volatile boolean closed;
!
! //-------------------------------------
! final HttpConnection connection;
! HttpClientImpl client;
! final Http2ClientImpl client2;
! Map<Integer,Stream> streams;
! int nextstreamid = 3; // stream 1 is registered separately
! int nextPushStream = 2;
! Encoder hpackOut;
! Decoder hpackIn;
! SettingsFrame clientSettings, serverSettings;
! ByteBufferConsumer bbc;
! final LinkedList<ByteBuffer> freeList;
! final String key; // for HttpClientImpl.connections map
! FrameReader reader;
+ // Connection level flow control windows
+ int sendWindow = INITIAL_WINDOW_SIZE;
+
+ final static int DEFAULT_FRAME_SIZE = 16 * 1024;
+ private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
+
+ final ExecutorWrapper executor;
+
+ /**
+ * This is established by the protocol spec and the peer will update it with
+ * WINDOW_UPDATEs, which affects the sendWindow.
+ */
+ final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
+
+ // TODO: need list of control frames from other threads
+ // that need to be sent
+
+ /**
+ * Case 1) Create from upgraded HTTP/1.1 connection.
+ * Is ready to use. Will not be SSL. exchange is the Exchange
+ * that initiated the connection, whose response will be delivered
+ * on a Stream.
+ */
Http2Connection(HttpConnection connection, Http2ClientImpl client2,
Exchange exchange) throws IOException, InterruptedException {
+ this.outputQ = new Queue<>();
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+
+ //this.initialExchange = exchange;
+ assert !(connection instanceof SSLConnection);
+ this.connection = connection;
+ this.client = client2.client;
+ this.client2 = client2;
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.key = keyFor(connection);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ initCommon();
+ //sendConnectionPreface();
+ Stream initialStream = createStream(exchange);
+ initialStream.registerStream(1);
+ initialStream.requestSent();
+ sendConnectionPreface();
+ connection.configureMode(Mode.ASYNC);
+ // start reading and writing
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ asyncReceive(connection.getRemaining());
+ }
+
+ // async style but completes immediately
+ static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
+ Http2ClientImpl client2, Exchange exchange) {
+ CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+ try {
+ Http2Connection c = new Http2Connection(connection, client2, exchange);
+ cf.complete(c);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ return cf;
+ }
+
+ /**
+ * Cases 2) 3)
+ *
+ * request is request to be sent.
+ */
+ Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
+ InetSocketAddress proxy = request.proxy();
+ URI uri = request.uri();
+ InetSocketAddress addr = Utils.getAddress(request);
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+ this.key = keyFor(uri, proxy);
+ this.connection = HttpConnection.getConnection(addr, request, this);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ this.client = request.client();
+ this.client2 = client.client2();
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.outputQ = new Queue<>();
+ nextstreamid = 1;
+ initCommon();
+ connection.connect();
+ connection.configureMode(Mode.ASYNC);
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ sendConnectionPreface();
+ }
+
+ // NEW
+ synchronized void obtainSendWindow(int amount) throws InterruptedException {
+ while (amount > 0) {
+ int n = Math.min(amount, sendWindow);
+ sendWindow -= n;
+ amount -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
+
+ synchronized void updateSendWindow(int amount) {
+ if (sendWindow == 0) {
+ sendWindow += amount;
+ notifyAll();
+ } else
+ sendWindow += amount;
+ }
+
+ synchronized int sendWindow() {
+ return sendWindow;
+ }
+
+ static String keyFor(HttpConnection connection) {
+ boolean isProxy = connection.isProxied();
+ boolean isSecure = connection.isSecure();
+ InetSocketAddress addr = connection.address();
+
+ return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
+ }
+
+ static String keyFor(URI uri, InetSocketAddress proxy) {
+ boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
+ boolean isProxy = proxy != null;
+
+ String host;
+ int port;
+
+ if (isProxy) {
+ host = proxy.getHostString();
+ port = proxy.getPort();
+ } else {
+ host = uri.getHost();
+ port = uri.getPort();
+ }
+ return keyString(isSecure, isProxy, host, port);
+ }
+
+ // {C,S}:{H:P}:host:port
+ // C indicates clear text connection "http"
+ // S indicates secure "https"
+ // H indicates host (direct) connection
+ // P indicates proxy
+ // Eg: "S:H:foo.com:80"
+ static String keyString(boolean secure, boolean proxy, String host, int port) {
+ char c1 = secure ? 'S' : 'C';
+ char c2 = proxy ? 'P' : 'H';
+
+ StringBuilder sb = new StringBuilder(128);
+ sb.append(c1).append(':').append(c2).append(':')
+ .append(host).append(':').append(port);
+ return sb.toString();
+ }
+
+ String key() {
+ return this.key;
+ }
+
+ void putConnection() {
+ client2.putConnection(this);
+ }
+
+ private static String toHexdump1(ByteBuffer bb) {
+ bb.mark();
+ StringBuilder sb = new StringBuilder(512);
+ Formatter f = new Formatter(sb);
+
+ while (bb.hasRemaining()) {
+ int i = Byte.toUnsignedInt(bb.get());
+ f.format("%02x:", i);
+ }
+ sb.deleteCharAt(sb.length()-1);
+ bb.reset();
+ return sb.toString();
+ }
+
+ private static String toHexdump(ByteBuffer bb) {
+ List<String> words = new ArrayList<>();
+ int i = 0;
+ bb.mark();
+ while (bb.hasRemaining()) {
+ if (i % 2 == 0) {
+ words.add("");
+ }
+ byte b = bb.get();
+ String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
+ words.set(i / 2, words.get(i / 2) + hex);
+ i++;
+ }
+ bb.reset();
+ return words.stream().collect(Collectors.joining(" "));
+ }
+
+ private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
+ boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
+
+ ByteBuffer[] buffers = frame.getHeaderBlock();
+ for (int i = 0; i < buffers.length; i++) {
+ hpackIn.decode(buffers[i], endOfHeaders, decoder);
+ }
+ }
+
+ int getInitialSendWindowSize() {
+ return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
+ }
+
+ void close() {
+ GoAwayFrame f = new GoAwayFrame();
+ f.setDebugData("Requested by user".getBytes());
+ // TODO: set last stream. For now zero ok.
+ sendFrame(f);
+ }
+
+ // BufferHandler methods
+
+ @Override
+ public ByteBuffer getBuffer(int n) {
+ return client.getBuffer(n);
+ }
+
+ @Override
+ public void returnBuffer(ByteBuffer buf) {
+ client.returnBuffer(buf);
+ }
+
+ @Override
+ public void setMinBufferSize(int n) {
+ client.setMinBufferSize(n);
+ }
+
+ private final Object readlock = new Object();
+
+ void asyncReceive(ByteBuffer buffer) {
+ synchronized (readlock) {
+ try {
+ if (reader == null) {
+ reader = new FrameReader(buffer);
+ } else {
+ reader.input(buffer);
+ }
+ while (true) {
+ if (reader.haveFrame()) {
+ List<ByteBuffer> buffers = reader.frame();
+
+ ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
+ processFrame(bbc);
+ if (bbc.consumed()) {
+ reader = new FrameReader();
+ return;
+ } else {
+ reader = new FrameReader(reader);
+ }
+ } else
+ return;
+ }
+ } catch (Throwable e) {
+ String msg = Utils.stackTrace(e);
+ Log.logTrace(msg);
+ shutdown(e);
+ }
+ }
+ }
+
+ void shutdown(Throwable t) {
+ System.err.println("Shutdown: " + t);
+ t.printStackTrace();
+ closed = true;
+ client2.deleteConnection(this);
+ Collection<Stream> c = streams.values();
+ for (Stream s : c) {
+ s.cancelImpl(t);
+ }
+ connection.close();
+ }
+
+ /**
+ * Handles stream 0 (common) frames that apply to whole connection and passes
+ * other stream specific frames to that Stream object.
+ *
+ * Invokes Stream.incoming() which is expected to process frame without
+ * blocking.
+ *
+ * @param bbc
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
+ Http2Frame frame = Http2Frame.readIncoming(bbc);
+ Log.logFrames(frame, "IN");
+ int streamid = frame.streamid();
+ if (streamid == 0) {
+ handleCommonFrame(frame);
+ } else {
+ Stream stream = getStream(streamid);
+ if (stream == null) {
+ // should never receive a frame with unknown stream id
+ resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ }
+ if (frame instanceof PushPromiseFrame) {
+ PushPromiseFrame pp = (PushPromiseFrame)frame;
+ handlePushPromise(stream, pp);
+ } else if (frame instanceof HeaderFrame) {
+ // decode headers (or continuation)
+ decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
+ stream.incoming(frame);
+ } else
+ stream.incoming(frame);
+ }
+ }
+
+ private void handlePushPromise(Stream parent, PushPromiseFrame pp) throws IOException, InterruptedException {
+ HttpRequestImpl parentReq = parent.request;
+ int promisedStreamid = pp.getPromisedStream();
+ if (promisedStreamid != nextPushStream) {
+ resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
+ return;
+ } else {
+ nextPushStream += 2;
+ }
+ HeaderDecoder decoder = new HeaderDecoder();
+ decodeHeaders(pp, decoder);
+ HttpHeadersImpl headers = decoder.headers();
+ HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
+
+ Stream.PushedStream pushStream = createPushStream(parent, pushReq);
+ pushStream.registerStream(promisedStreamid);
+ parent.incoming_pushPromise(pushReq, pushStream);
+ }
+
+ private void handleCommonFrame(Http2Frame frame) throws IOException, InterruptedException {
+ switch (frame.type()) {
+ case SettingsFrame.TYPE:
+ { SettingsFrame f = (SettingsFrame)frame;
+ handleSettings(f);}
+ break;
+ case PingFrame.TYPE:
+ { PingFrame f = (PingFrame)frame;
+ handlePing(f);}
+ break;
+ case GoAwayFrame.TYPE:
+ { GoAwayFrame f = (GoAwayFrame)frame;
+ handleGoAway(f);}
+ break;
+ case WindowUpdateFrame.TYPE:
+ { WindowUpdateFrame f = (WindowUpdateFrame)frame;
+ handleWindowUpdate(f);}
+ break;
+ default:
+ protocolError(ErrorFrame.PROTOCOL_ERROR);
+ }
+ }
+
+ void resetStream(int streamid, int code) throws IOException, InterruptedException {
+ Log.logError(
+ "Resetting stream {0,number,integer} with error code {1,number,integer}",
+ streamid, code);
+ ResetFrame frame = new ResetFrame();
+ frame.streamid(streamid);
+ frame.setErrorCode(code);
+ sendFrame(frame);
+ streams.remove(streamid);
}
! private void handleWindowUpdate(WindowUpdateFrame f) throws IOException, InterruptedException {
! updateSendWindow(f.getUpdate());
! }
!
! private void protocolError(int errorCode) throws IOException, InterruptedException {
! GoAwayFrame frame = new GoAwayFrame();
! frame.setErrorCode(errorCode);
! sendFrame(frame);
! String msg = "Error code: " + errorCode;
! shutdown(new IOException("protocol error"));
! }
!
! private void handleSettings(SettingsFrame frame) throws IOException, InterruptedException {
! if (frame.getFlag(SettingsFrame.ACK)) {
! // ignore ack frames for now.
! return;
! }
! serverSettings = frame;
! SettingsFrame ack = getAckFrame(frame.streamid());
! sendFrame(ack);
! }
!
! private void handlePing(PingFrame frame) throws IOException, InterruptedException {
! frame.setFlag(PingFrame.ACK);
! sendFrame(frame);
! }
!
! private void handleGoAway(GoAwayFrame frame) throws IOException, InterruptedException {
! //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
! shutdown(new IOException("GOAWAY received"));
! }
!
! private void initCommon() {
! clientSettings = client2.getClientSettings();
!
! // serverSettings will be updated by server
! serverSettings = SettingsFrame.getDefaultSettings();
! hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
! hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
! }
!
! /**
! * Max frame size we are allowed to send
! */
! public int getMaxSendFrameSize() {
! int param = serverSettings.getParameter(MAX_FRAME_SIZE);
! if (param == -1) {
! param = DEFAULT_FRAME_SIZE;
! }
! return param;
! }
!
! /**
! * Max frame size we will receive
! */
! public int getMaxReceiveFrameSize() {
! return clientSettings.getParameter(MAX_FRAME_SIZE);
! }
!
! // Not sure how useful this is.
! public int getMaxHeadersSize() {
! return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
! }
!
! private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
!
! private static final byte[] PREFACE_BYTES =
! CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
!
! /**
! * Sends Connection preface and Settings frame with current preferred
! * values
! */
! private void sendConnectionPreface() throws IOException {
! SettingsFrame sf = client2.getClientSettings();
! ByteBufferGenerator bg = new ByteBufferGenerator(this);
! bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
! sf.writeOutgoing(bg);
! Log.logFrames(sf, "OUT");
! WindowUpdateFrame wup = new WindowUpdateFrame();
! wup.streamid(0);
! // send a Window update for the receive buffer we are using
! // minus the initial 64 K specified in protocol
! wup.setUpdate(client2.client.getReceiveBufferSize() - (64 * 1024 - 1));
! wup.setLength();
! wup.writeOutgoing(bg);
! Log.logFrames(wup, "OUT");
! ByteBuffer[] ba = bg.getBufferArray();
! connection.write(ba, 0, ba.length);
! }
!
! /**
! * Returns an existing Stream with given id, or null if doesn't exist
! */
! Stream getStream(int streamid) {
! return streams.get(streamid);
! }
!
! /**
! * Creates Stream with given id.
! */
! Stream createStream(Exchange exchange) {
! Stream stream = new Stream(client, this, exchange);
! return stream;
! }
!
! Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
! Stream.PushGroup<?> pg = parent.request.pushGroup();
! return new Stream.PushedStream(pg, client, this, parent, pushReq);
! }
!
! void putStream(Stream stream, int streamid) {
! streams.put(streamid, stream);
! }
!
! void deleteStream(Stream stream) {
! streams.remove(stream.streamid);
! }
!
! static final int MAX_STREAM = Integer.MAX_VALUE - 2;
!
! // Number of header bytes in a Headers Frame
! final static int HEADERS_HEADER_SIZE = 15;
!
! // Number of header bytes in a Continuation frame
! final static int CONTIN_HEADER_SIZE = 9;
!
! /**
! * Encode the headers into a List<ByteBuffer> and then create HEADERS
! * and CONTINUATION frames from the list and return the List<Http2Frame>.
! *
! * @param frame
! * @return
! */
! private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
! LinkedList<ByteBuffer> buffers = new LinkedList<>();
! ByteBuffer buf = getBuffer();
! buffers.add(buf);
! encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
! encodeHeadersImpl(frame.getUserHeaders(), buffers);
! encodeHeadersImpl(frame.getSystemHeaders(), buffers);
!
! for (ByteBuffer b : buffers) {
! b.flip();
! }
!
! LinkedList<Http2Frame> frames = new LinkedList<>();
! int maxframesize = getMaxSendFrameSize();
!
! HeadersFrame oframe = new HeadersFrame();
! oframe.setFlags(frame.getFlags());
! oframe.streamid(frame.streamid());
!
! oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
! frames.add(oframe);
! // Any buffers left?
! boolean done = buffers.isEmpty();
! if (done) {
! oframe.setFlag(HeaderFrame.END_HEADERS);
! } else {
! ContinuationFrame cf = null;
! while (!done) {
! cf = new ContinuationFrame();
! cf.streamid(frame.streamid());
! cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
! frames.add(cf);
! done = buffers.isEmpty();
! }
! cf.setFlag(HeaderFrame.END_HEADERS);
! }
! return frames;
! }
!
! // should always return at least one buffer
! private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
! assert maxsize >= BUFSIZE;
! LinkedList<ByteBuffer> newlist = new LinkedList<>();
! int size = list.size();
! int nbytes = 0;
! for (int i=0; i<size; i++) {
! ByteBuffer buf = list.getFirst();
! if (nbytes + buf.remaining() <= maxsize) {
! nbytes += buf.remaining();
! newlist.add(buf);
! list.remove();
! } else {
! break;
! }
! }
! return newlist.toArray(empty);
! }
!
! /**
! * Encode all the headers from the given HttpHeadersImpl into the given List.
! */
! private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
! ByteBuffer buffer;
! if (!(buffer = buffers.getLast()).hasRemaining()) {
! buffer = getBuffer();
! buffers.add(buffer);
! }
! for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
! String key = e.getKey();
! String lkey = key.toLowerCase();
! List<String> values = e.getValue();
! for (String value : values) {
! hpackOut.header(lkey, value);
! boolean encoded = false;
! do {
! encoded = hpackOut.encode(buffer);
! if (!encoded) {
! buffer = getBuffer();
! buffers.add(buffer);
! }
! } while (!encoded);
! }
! }
! }
!
! public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
! for (Http2Frame frame : frames) {
! sendFrame(frame);
! }
! }
!
! static Throwable getExceptionFrom(CompletableFuture<?> cf) {
! try {
! cf.get();
! return null;
! } catch (Throwable e) {
! if (e.getCause() != null)
! return e.getCause();
! else
! return e;
! }
! }
!
!
! void execute(Runnable r) {
! executor.execute(r, null);
! }
!
! private final Object sendlock = new Object();
!
! /**
! *
! */
! void sendFrame(Http2Frame frame) {
! synchronized (sendlock) {
! try {
! if (frame instanceof OutgoingHeaders) {
! OutgoingHeaders oh = (OutgoingHeaders) frame;
! Stream stream = oh.getStream();
! stream.registerStream(nextstreamid);
! oh.streamid(nextstreamid);
! nextstreamid += 2;
! // set outgoing window here. This allows thread sending
! // body to proceed.
! stream.updateOutgoingWindow(getInitialSendWindowSize());
! LinkedList<Http2Frame> frames = encodeHeaders(oh);
! for (Http2Frame f : frames) {
! sendOneFrame(f);
! }
! } else {
! sendOneFrame(frame);
! }
!
! } catch (IOException e) {
! if (!closed) {
! Log.logError(e);
! shutdown(e);
! }
! }
! }
! }
!
! /**
! * Send a frame.
! *
! * @param frame
! * @throws IOException
! */
! private void sendOneFrame(Http2Frame frame) throws IOException {
! ByteBufferGenerator bbg = new ByteBufferGenerator(this);
! frame.setLength();
! Log.logFrames(frame, "OUT");
! frame.writeOutgoing(bbg);
! ByteBuffer[] currentBufs = bbg.getBufferArray();
! connection.write(currentBufs, 0, currentBufs.length);
! }
!
!
! private SettingsFrame getAckFrame(int streamid) {
! SettingsFrame frame = new SettingsFrame();
! frame.setFlag(SettingsFrame.ACK);
! frame.streamid(streamid);
! return frame;
! }
!
! static class HeaderDecoder implements DecodingCallback {
! HttpHeadersImpl headers;
!
! HeaderDecoder() {
! this.headers = new HttpHeadersImpl();
! }
!
! @Override
! public void onDecoded(CharSequence name, CharSequence value) {
! headers.addHeader(name.toString(), value.toString());
! }
!
! HttpHeadersImpl headers() {
! return headers;
! }
! }
}
< prev index next >