< prev index next >
src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java
Print this page
rev 15335 : Async Queues
*** 48,70 ****
class PlainHttpConnection extends HttpConnection implements AsyncConnection {
protected SocketChannel chan;
private volatile boolean connected;
private boolean closed;
! Consumer<ByteBuffer> asyncReceiver;
! Consumer<Throwable> errorReceiver;
! Queue<ByteBuffer> asyncOutputQ;
final Object reading = new Object();
! final Object writing = new Object();
@Override
public void startReading() {
- try {
client.registerEvent(new ReadEvent());
- } catch (IOException e) {
- shutdown();
- }
}
class ConnectEvent extends AsyncEvent {
CompletableFuture<Void> cf;
--- 48,77 ----
class PlainHttpConnection extends HttpConnection implements AsyncConnection {
protected SocketChannel chan;
private volatile boolean connected;
private boolean closed;
!
! // should be volatile to provide proper synchronization action
! volatile Consumer<ByteBuffer> asyncReceiver;
!
! // should be volatile to provide proper synchronization action
! volatile Consumer<Throwable> errorReceiver;
!
! // asyncOutputQ may be non volatile if and only if the following conditions are satisfied:
! // 1. Writing to the field is performed BEFORE setting field HttpConnection.mode (which is volatile) to ASYNC.
! // That provides proper "release" operation.
! // 2. Reading from the field is performed AFTER checking that HttpConnection.mode set to ASYNC.
! // That provides proper "acquire" action.
! AsyncWriteQueue<ByteBuffer[]> asyncOutputQ;
!
final Object reading = new Object();
!
@Override
public void startReading() {
client.registerEvent(new ReadEvent());
}
class ConnectEvent extends AsyncEvent {
CompletableFuture<Void> cf;
*** 139,202 ****
@Override
long write(ByteBuffer[] buffers, int start, int number) throws IOException {
if (mode != Mode.ASYNC)
return chan.write(buffers, start, number);
// async
! synchronized(writing) {
! int qlen = asyncOutputQ.size();
! ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
! long n = Utils.remaining(bufs);
! asyncOutputQ.putAll(bufs);
! if (qlen == 0)
! asyncOutput();
return n;
}
}
! ByteBuffer asyncBuffer = null;
! void asyncOutput() {
! synchronized (writing) {
! try {
! while (true) {
! if (asyncBuffer == null) {
! asyncBuffer = asyncOutputQ.poll();
! if (asyncBuffer == null) {
! return;
}
}
! if (!asyncBuffer.hasRemaining()) {
! asyncBuffer = null;
! continue;
}
! int n = chan.write(asyncBuffer);
! //System.err.printf("Written %d bytes to chan\n", n);
if (n == 0) {
client.registerEvent(new WriteEvent());
return;
}
}
} catch (IOException e) {
shutdown();
}
}
- }
-
- @Override
- long write(ByteBuffer buffer) throws IOException {
- if (mode != Mode.ASYNC)
- return chan.write(buffer);
- // async
- synchronized(writing) {
- int qlen = asyncOutputQ.size();
- long n = buffer.remaining();
- asyncOutputQ.put(buffer);
- if (qlen == 0)
- asyncOutput();
- return n;
- }
- }
@Override
public String toString() {
return "PlainHttpConnection: " + super.toString();
}
--- 146,220 ----
@Override
long write(ByteBuffer[] buffers, int start, int number) throws IOException {
if (mode != Mode.ASYNC)
return chan.write(buffers, start, number);
// async
! buffers = Utils.reduce(buffers, start, number);
! long n = Utils.remaining(buffers);
! asyncOutputQ.put(buffers);
! flushAsync();
return n;
}
+
+ @Override
+ long write(ByteBuffer buffer) throws IOException {
+ if (mode != Mode.ASYNC)
+ return chan.write(buffer);
+ // async
+ long n = buffer.remaining();
+ asyncOutputQ.put(new ByteBuffer[]{buffer});
+ flushAsync();
+ return n;
}
! // handle registered WriteEvent; invoked from SelectorManager thread
! void flushRegistered() {
! if (mode == Mode.ASYNC) {
! asyncOutputQ.flushDelayed();
! }
! }
! @Override
! public void writeAsync(ByteBuffer[] buffers) throws IOException {
! if (mode != Mode.ASYNC) {
! write(buffers, 0, buffers.length);
! } else {
! asyncOutputQ.put(buffers);
}
}
!
! @Override
! public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException {
! if (mode != Mode.ASYNC) {
! write(buffers, 0, buffers.length);
! } else {
! // Unordered frames are sent before existing frames.
! asyncOutputQ.putFirst(buffers);
! }
! }
!
! @Override
! public void flushAsync() {
! if (mode == Mode.ASYNC) {
! asyncOutputQ.flush();
}
! }
!
! void asyncOutput(ByteBuffer[] bufs, Consumer<ByteBuffer[]> setDelayCallback) {
! try {
! while (Utils.remaining(bufs) > 0) {
! long n = chan.write(bufs);
if (n == 0) {
+ setDelayCallback.accept(bufs);
client.registerEvent(new WriteEvent());
return;
}
}
} catch (IOException e) {
shutdown();
}
}
@Override
public String toString() {
return "PlainHttpConnection: " + super.toString();
}
*** 303,313 ****
return SelectionKey.OP_WRITE;
}
@Override
public void handle() {
! asyncOutput();
}
@Override
public void abort() {
shutdown();
--- 321,331 ----
return SelectionKey.OP_WRITE;
}
@Override
public void handle() {
! flushRegistered();
}
@Override
public void abort() {
shutdown();
*** 384,403 ****
@Override
public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
Consumer<Throwable> errorReceiver) {
this.asyncReceiver = asyncReceiver;
this.errorReceiver = errorReceiver;
! asyncOutputQ = new Queue<>();
! asyncOutputQ.registerPutCallback(this::asyncOutput);
}
@Override
CompletableFuture<Void> whenReceivingResponse() {
CompletableFuture<Void> cf = new CompletableFuture<>();
- try {
client.registerEvent(new ReceiveResponseEvent(cf));
- } catch (IOException e) {
- cf.completeExceptionally(e);
- }
return cf;
}
}
--- 402,416 ----
@Override
public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
Consumer<Throwable> errorReceiver) {
this.asyncReceiver = asyncReceiver;
this.errorReceiver = errorReceiver;
! this.asyncOutputQ = new AsyncWriteQueue<>(this::asyncOutput);
}
@Override
CompletableFuture<Void> whenReceivingResponse() {
CompletableFuture<Void> cf = new CompletableFuture<>();
client.registerEvent(new ReceiveResponseEvent(cf));
return cf;
}
}
< prev index next >