< prev index next >
src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java
Print this page
rev 15335 : Async Queues
@@ -48,23 +48,30 @@
class PlainHttpConnection extends HttpConnection implements AsyncConnection {
protected SocketChannel chan;
private volatile boolean connected;
private boolean closed;
- Consumer<ByteBuffer> asyncReceiver;
- Consumer<Throwable> errorReceiver;
- Queue<ByteBuffer> asyncOutputQ;
+
+ // Must be volatile so that reads and writes of this field are synchronization actions.
+ volatile Consumer<ByteBuffer> asyncReceiver;
+
+ // Must be volatile so that reads and writes of this field are synchronization actions.
+ volatile Consumer<Throwable> errorReceiver;
+
+ // asyncOutputQ may be non-volatile if and only if both of the following conditions hold:
+ // 1. The field is written BEFORE the field HttpConnection.mode (which is volatile) is set to ASYNC.
+ // That provides the proper "release" action.
+ // 2. The field is read AFTER checking that HttpConnection.mode is set to ASYNC.
+ // That provides the proper "acquire" action.
+ AsyncWriteQueue<ByteBuffer[]> asyncOutputQ;
+
final Object reading = new Object();
- final Object writing = new Object();
+
@Override
public void startReading() {
- try {
client.registerEvent(new ReadEvent());
- } catch (IOException e) {
- shutdown();
- }
}
class ConnectEvent extends AsyncEvent {
CompletableFuture<Void> cf;
@@ -139,64 +146,75 @@
@Override
long write(ByteBuffer[] buffers, int start, int number) throws IOException {
if (mode != Mode.ASYNC)
return chan.write(buffers, start, number);
// async
- synchronized(writing) {
- int qlen = asyncOutputQ.size();
- ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
- long n = Utils.remaining(bufs);
- asyncOutputQ.putAll(bufs);
- if (qlen == 0)
- asyncOutput();
+ buffers = Utils.reduce(buffers, start, number);
+ long n = Utils.remaining(buffers);
+ asyncOutputQ.put(buffers);
+ flushAsync();
return n;
}
+
+ @Override
+ long write(ByteBuffer buffer) throws IOException {
+ if (mode != Mode.ASYNC)
+ return chan.write(buffer);
+ // async
+ long n = buffer.remaining();
+ asyncOutputQ.put(new ByteBuffer[]{buffer});
+ flushAsync();
+ return n;
}
- ByteBuffer asyncBuffer = null;
+ // handle registered WriteEvent; invoked from SelectorManager thread
+ void flushRegistered() {
+ if (mode == Mode.ASYNC) {
+ asyncOutputQ.flushDelayed();
+ }
+ }
- void asyncOutput() {
- synchronized (writing) {
- try {
- while (true) {
- if (asyncBuffer == null) {
- asyncBuffer = asyncOutputQ.poll();
- if (asyncBuffer == null) {
- return;
+ @Override
+ public void writeAsync(ByteBuffer[] buffers) throws IOException {
+ if (mode != Mode.ASYNC) {
+ write(buffers, 0, buffers.length);
+ } else {
+ asyncOutputQ.put(buffers);
}
}
- if (!asyncBuffer.hasRemaining()) {
- asyncBuffer = null;
- continue;
+
+ @Override
+ public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException {
+ if (mode != Mode.ASYNC) {
+ write(buffers, 0, buffers.length);
+ } else {
+ // Unordered frames are sent before existing frames.
+ asyncOutputQ.putFirst(buffers);
+ }
+ }
+
+ @Override
+ public void flushAsync() {
+ if (mode == Mode.ASYNC) {
+ asyncOutputQ.flush();
}
- int n = chan.write(asyncBuffer);
- //System.err.printf("Written %d bytes to chan\n", n);
+ }
+
+ void asyncOutput(ByteBuffer[] bufs, Consumer<ByteBuffer[]> setDelayCallback) {
+ try {
+ while (Utils.remaining(bufs) > 0) {
+ long n = chan.write(bufs);
if (n == 0) {
+ setDelayCallback.accept(bufs);
client.registerEvent(new WriteEvent());
return;
}
}
} catch (IOException e) {
shutdown();
}
}
- }
-
- @Override
- long write(ByteBuffer buffer) throws IOException {
- if (mode != Mode.ASYNC)
- return chan.write(buffer);
- // async
- synchronized(writing) {
- int qlen = asyncOutputQ.size();
- long n = buffer.remaining();
- asyncOutputQ.put(buffer);
- if (qlen == 0)
- asyncOutput();
- return n;
- }
- }
@Override
public String toString() {
return "PlainHttpConnection: " + super.toString();
}
@@ -303,11 +321,11 @@
return SelectionKey.OP_WRITE;
}
@Override
public void handle() {
- asyncOutput();
+ flushRegistered();
}
@Override
public void abort() {
shutdown();
@@ -384,20 +402,15 @@
@Override
public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
Consumer<Throwable> errorReceiver) {
this.asyncReceiver = asyncReceiver;
this.errorReceiver = errorReceiver;
- asyncOutputQ = new Queue<>();
- asyncOutputQ.registerPutCallback(this::asyncOutput);
+ this.asyncOutputQ = new AsyncWriteQueue<>(this::asyncOutput);
}
@Override
CompletableFuture<Void> whenReceivingResponse() {
CompletableFuture<Void> cf = new CompletableFuture<>();
- try {
client.registerEvent(new ReceiveResponseEvent(cf));
- } catch (IOException e) {
- cf.completeExceptionally(e);
- }
return cf;
}
}
< prev index next >