< prev index next >
src/java.httpclient/share/classes/java/net/http/Http2Connection.java
Print this page
rev 15333 : JDK-8162497 fix obtainSendWindow deadlock
rev 15334 : JDK-8161004 bulk sendWindowUpdate
rev 15335 : Async Queues
@@ -29,11 +29,10 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -76,39 +75,39 @@
* and incoming stream creation (Server push). Incoming frames destined for a
* stream are provided by calling Stream.incoming().
*/
class Http2Connection implements BufferHandler {
- final Queue<Http2Frame> outputQ;
volatile boolean closed;
//-------------------------------------
final HttpConnection connection;
+ final AsyncConnection connectionAsync;
HttpClientImpl client;
final Http2ClientImpl client2;
Map<Integer,Stream> streams;
int nextstreamid = 3; // stream 1 is registered separately
int nextPushStream = 2;
Encoder hpackOut;
Decoder hpackIn;
SettingsFrame clientSettings, serverSettings;
- ByteBufferConsumer bbc;
final LinkedList<ByteBuffer> freeList;
final String key; // for HttpClientImpl.connections map
FrameReader reader;
// Connection level flow control windows
- int sendWindow = INITIAL_WINDOW_SIZE;
+ final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE);
final static int DEFAULT_FRAME_SIZE = 16 * 1024;
private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
final ExecutorWrapper executor;
+ WindowUpdateSender windowUpdater;
/**
* This is established by the protocol spec and the peer will update it with
- * WINDOW_UPDATEs, which affects the sendWindow.
+ * WINDOW_UPDATEs, which affects the connectionSendWindow.
*/
final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
// TODO: need list of control frames from other threads
// that need to be sent
@@ -119,17 +118,17 @@
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
Http2Connection(HttpConnection connection, Http2ClientImpl client2,
Exchange exchange) throws IOException, InterruptedException {
- this.outputQ = new Queue<>();
- String msg = "Connection send window size " + Integer.toString(sendWindow);
+ String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
Log.logTrace(msg);
//this.initialExchange = exchange;
assert !(connection instanceof SSLConnection);
this.connection = connection;
+ this.connectionAsync = (AsyncConnection)connection;
this.client = client2.client();
this.client2 = client2;
this.executor = client.executorWrapper();
this.freeList = new LinkedList<>();
this.key = keyFor(connection);
@@ -138,17 +137,16 @@
//sendConnectionPreface();
Stream initialStream = createStream(exchange);
initialStream.registerStream(1);
initialStream.requestSent();
sendConnectionPreface();
- connection.configureMode(Mode.ASYNC);
// start reading and writing
// start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
asyncReceive(connection.getRemaining());
- asyncConn.startReading();
+ connectionAsync.startReading();
}
// async style but completes immediately
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2, Exchange exchange) {
@@ -169,52 +167,29 @@
*/
Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
InetSocketAddress proxy = request.proxy();
URI uri = request.uri();
InetSocketAddress addr = Utils.getAddress(request);
- String msg = "Connection send window size " + Integer.toString(sendWindow);
+ String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
Log.logTrace(msg);
this.key = keyFor(uri, proxy);
this.connection = HttpConnection.getConnection(addr, request, this);
+ this.connectionAsync = (AsyncConnection)connection;
streams = Collections.synchronizedMap(new HashMap<>());
this.client = request.client();
this.client2 = client.client2();
this.executor = client.executorWrapper();
this.freeList = new LinkedList<>();
- this.outputQ = new Queue<>();
nextstreamid = 1;
initCommon();
connection.connect();
- connection.configureMode(Mode.ASYNC);
- // start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
sendConnectionPreface();
- asyncConn.startReading();
- }
-
- // NEW
- synchronized void obtainSendWindow(int amount) throws InterruptedException {
- while (amount > 0) {
- int n = Math.min(amount, sendWindow);
- sendWindow -= n;
- amount -= n;
- if (amount > 0)
- wait();
- }
- }
-
- synchronized void updateSendWindow(int amount) {
- if (sendWindow == 0) {
- sendWindow += amount;
- notifyAll();
- } else
- sendWindow += amount;
- }
+ // start reading
+ connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
- synchronized int sendWindow() {
- return sendWindow;
+ connectionAsync.startReading();
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
@@ -464,11 +439,11 @@
streams.remove(streamid);
}
private void handleWindowUpdate(WindowUpdateFrame f)
throws IOException, InterruptedException {
- updateSendWindow(f.getUpdate());
+ connectionSendWindow.update(f.getUpdate());
}
private void protocolError(int errorCode)
throws IOException, InterruptedException {
GoAwayFrame frame = new GoAwayFrame();
@@ -490,11 +465,11 @@
}
private void handlePing(PingFrame frame)
throws IOException, InterruptedException {
frame.setFlag(PingFrame.ACK);
- sendFrame(frame);
+ sendUnorderedFrame(frame);
}
private void handleGoAway(GoAwayFrame frame)
throws IOException, InterruptedException {
//System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
@@ -506,10 +481,17 @@
// serverSettings will be updated by server
serverSettings = SettingsFrame.getDefaultSettings();
hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
+
+ windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) {
+ @Override
+ int getStreamId() {
+ return 0;
+ }
+ };
}
/**
* Max frame size we are allowed to send
*/
@@ -543,27 +525,18 @@
* values
*/
private void sendConnectionPreface() throws IOException {
ByteBufferGenerator bg = new ByteBufferGenerator(this);
bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
- ByteBuffer[] ba = bg.getBufferArray();
- connection.write(ba, 0, ba.length);
-
- bg = new ByteBufferGenerator(this);
SettingsFrame sf = client2.getClientSettings();
Log.logFrames(sf, "OUT");
sf.writeOutgoing(bg);
- WindowUpdateFrame wup = new WindowUpdateFrame();
- wup.streamid(0);
+ ByteBuffer[] ba = bg.getBufferArray();
+ connection.write(ba, 0, ba.length); // write is performed before switch to async mode
// send a Window update for the receive buffer we are using
// minus the initial 64 K specified in protocol
- wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
- wup.computeLength();
- wup.writeOutgoing(bg);
- Log.logFrames(wup, "OUT");
- ba = bg.getBufferArray();
- connection.write(ba, 0, ba.length);
+ windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
}
/**
* Returns an existing Stream with given id, or null if doesn't exist
*/
@@ -690,16 +663,10 @@
} while (!encoded);
}
}
}
- public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
- for (Http2Frame frame : frames) {
- sendFrame(frame);
- }
- }
-
static Throwable getExceptionFrom(CompletableFuture<?> cf) {
try {
cf.get();
return null;
} catch (Throwable e) {
@@ -721,51 +688,85 @@
*
*/
void sendFrame(Http2Frame frame) {
synchronized (sendlock) {
try {
+ ByteBuffer[] bufs;
if (frame instanceof OutgoingHeaders) {
OutgoingHeaders oh = (OutgoingHeaders) frame;
Stream stream = oh.getStream();
stream.registerStream(nextstreamid);
oh.streamid(nextstreamid);
nextstreamid += 2;
// set outgoing window here. This allows thread sending
// body to proceed.
stream.updateOutgoingWindow(getInitialSendWindowSize());
- LinkedList<Http2Frame> frames = encodeHeaders(oh);
- for (Http2Frame f : frames) {
- sendOneFrame(f);
- }
+ bufs = encodeFrames(encodeHeaders(oh));
} else {
- sendOneFrame(frame);
+ bufs = encodeFrame(frame);
}
+ connectionAsync.writeAsync(bufs);
} catch (IOException e) {
if (!closed) {
Log.logError(e);
shutdown(e);
}
+ return;
}
}
+ connectionAsync.flushAsync();
}
- /**
- * Send a frame.
- *
- * @param frame
- * @throws IOException
- */
- private void sendOneFrame(Http2Frame frame) throws IOException {
+ private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException {
ByteBufferGenerator bbg = new ByteBufferGenerator(this);
frame.computeLength();
Log.logFrames(frame, "OUT");
frame.writeOutgoing(bbg);
- ByteBuffer[] currentBufs = bbg.getBufferArray();
- connection.write(currentBufs, 0, currentBufs.length);
+ return bbg.getBufferList().toArray(new ByteBuffer[0]);
}
+ private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException {
+ List<ByteBuffer> bufs = new ArrayList<>();
+ for(Http2Frame frame : frames) {
+ ByteBufferGenerator bbg = new ByteBufferGenerator(this);
+ frame.computeLength();
+ Log.logFrames(frame, "OUT");
+ frame.writeOutgoing(bbg);
+ bufs.addAll(bbg.getBufferList());
+ }
+ return bufs.toArray(new ByteBuffer[0]);
+ }
+
+ void sendDataFrame(DataFrame frame) {
+ try {
+ connectionAsync.writeAsync(encodeFrame(frame));
+ connectionAsync.flushAsync();
+ } catch (IOException e) {
+ if (!closed) {
+ Log.logError(e);
+ shutdown(e);
+ }
+ }
+ }
+
+ /*
+ * Calling this method directly bypasses synchronization on "sendlock". It is
+ * allowed only for control frames (WindowUpdateFrame, PingFrame, etc.) and is
+ * prohibited for frames such as DataFrame, HeadersFrame, ContinuationFrame.
+ */
+ void sendUnorderedFrame(Http2Frame frame){
+ try {
+ connectionAsync.writeAsyncUnordered(encodeFrame(frame));
+ connectionAsync.flushAsync();
+ } catch (IOException e) {
+ if (!closed) {
+ Log.logError(e);
+ shutdown(e);
+ }
+ }
+ }
private SettingsFrame getAckFrame(int streamid) {
SettingsFrame frame = new SettingsFrame();
frame.setFlag(SettingsFrame.ACK);
frame.streamid(streamid);
< prev index next >