< prev index next >

src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java

Print this page
rev 15335 : Async Queues

*** 48,70 **** class PlainHttpConnection extends HttpConnection implements AsyncConnection { protected SocketChannel chan; private volatile boolean connected; private boolean closed; ! Consumer<ByteBuffer> asyncReceiver; ! Consumer<Throwable> errorReceiver; ! Queue<ByteBuffer> asyncOutputQ; final Object reading = new Object(); ! final Object writing = new Object(); @Override public void startReading() { - try { client.registerEvent(new ReadEvent()); - } catch (IOException e) { - shutdown(); - } } class ConnectEvent extends AsyncEvent { CompletableFuture<Void> cf; --- 48,77 ---- class PlainHttpConnection extends HttpConnection implements AsyncConnection { protected SocketChannel chan; private volatile boolean connected; private boolean closed; ! ! // should be volatile to provide proper synchronization action ! volatile Consumer<ByteBuffer> asyncReceiver; ! ! // should be volatile to provide proper synchronization action ! volatile Consumer<Throwable> errorReceiver; ! ! // asyncOutputQ may be non volatile if and only if the following conditions are satisfied: ! // 1. Writing to the field is performed BEFORE setting field HttpConnection.mode (which is volatile) to ASYNC. ! // That provides proper "release" operation. ! // 2. Reading from the field is performed AFTER checking that HttpConnection.mode set to ASYNC. ! // That provides proper "acquire" action. ! AsyncWriteQueue<ByteBuffer[]> asyncOutputQ; ! final Object reading = new Object(); ! @Override public void startReading() { client.registerEvent(new ReadEvent()); } class ConnectEvent extends AsyncEvent { CompletableFuture<Void> cf;
*** 139,202 **** @Override long write(ByteBuffer[] buffers, int start, int number) throws IOException { if (mode != Mode.ASYNC) return chan.write(buffers, start, number); // async ! synchronized(writing) { ! int qlen = asyncOutputQ.size(); ! ByteBuffer[] bufs = Utils.reduce(buffers, start, number); ! long n = Utils.remaining(bufs); ! asyncOutputQ.putAll(bufs); ! if (qlen == 0) ! asyncOutput(); return n; } } ! ByteBuffer asyncBuffer = null; ! void asyncOutput() { ! synchronized (writing) { ! try { ! while (true) { ! if (asyncBuffer == null) { ! asyncBuffer = asyncOutputQ.poll(); ! if (asyncBuffer == null) { ! return; } } ! if (!asyncBuffer.hasRemaining()) { ! asyncBuffer = null; ! continue; } ! int n = chan.write(asyncBuffer); ! //System.err.printf("Written %d bytes to chan\n", n); if (n == 0) { client.registerEvent(new WriteEvent()); return; } } } catch (IOException e) { shutdown(); } } - } - - @Override - long write(ByteBuffer buffer) throws IOException { - if (mode != Mode.ASYNC) - return chan.write(buffer); - // async - synchronized(writing) { - int qlen = asyncOutputQ.size(); - long n = buffer.remaining(); - asyncOutputQ.put(buffer); - if (qlen == 0) - asyncOutput(); - return n; - } - } @Override public String toString() { return "PlainHttpConnection: " + super.toString(); } --- 146,220 ---- @Override long write(ByteBuffer[] buffers, int start, int number) throws IOException { if (mode != Mode.ASYNC) return chan.write(buffers, start, number); // async ! buffers = Utils.reduce(buffers, start, number); ! long n = Utils.remaining(buffers); ! asyncOutputQ.put(buffers); ! flushAsync(); return n; } + + @Override + long write(ByteBuffer buffer) throws IOException { + if (mode != Mode.ASYNC) + return chan.write(buffer); + // async + long n = buffer.remaining(); + asyncOutputQ.put(new ByteBuffer[]{buffer}); + flushAsync(); + return n; } ! // handle registered WriteEvent; invoked from SelectorManager thread ! void flushRegistered() { ! if (mode == Mode.ASYNC) { ! asyncOutputQ.flushDelayed(); ! } ! } ! @Override ! public void writeAsync(ByteBuffer[] buffers) throws IOException { ! if (mode != Mode.ASYNC) { ! write(buffers, 0, buffers.length); ! } else { ! asyncOutputQ.put(buffers); } } ! ! @Override ! public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException { ! if (mode != Mode.ASYNC) { ! write(buffers, 0, buffers.length); ! } else { ! // Unordered frames are sent before existing frames. ! asyncOutputQ.putFirst(buffers); ! } ! } ! ! @Override ! public void flushAsync() { ! if (mode == Mode.ASYNC) { ! asyncOutputQ.flush(); } ! } ! ! void asyncOutput(ByteBuffer[] bufs, Consumer<ByteBuffer[]> setDelayCallback) { ! try { ! while (Utils.remaining(bufs) > 0) { ! long n = chan.write(bufs); if (n == 0) { + setDelayCallback.accept(bufs); client.registerEvent(new WriteEvent()); return; } } } catch (IOException e) { shutdown(); } } @Override public String toString() { return "PlainHttpConnection: " + super.toString(); }
*** 303,313 **** return SelectionKey.OP_WRITE; } @Override public void handle() { ! asyncOutput(); } @Override public void abort() { shutdown(); --- 321,331 ---- return SelectionKey.OP_WRITE; } @Override public void handle() { ! flushRegistered(); } @Override public void abort() { shutdown();
*** 384,403 **** @Override public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) { this.asyncReceiver = asyncReceiver; this.errorReceiver = errorReceiver; ! asyncOutputQ = new Queue<>(); ! asyncOutputQ.registerPutCallback(this::asyncOutput); } @Override CompletableFuture<Void> whenReceivingResponse() { CompletableFuture<Void> cf = new CompletableFuture<>(); - try { client.registerEvent(new ReceiveResponseEvent(cf)); - } catch (IOException e) { - cf.completeExceptionally(e); - } return cf; } } --- 402,416 ---- @Override public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) { this.asyncReceiver = asyncReceiver; this.errorReceiver = errorReceiver; ! this.asyncOutputQ = new AsyncWriteQueue<>(this::asyncOutput); } @Override CompletableFuture<Void> whenReceivingResponse() { CompletableFuture<Void> cf = new CompletableFuture<>(); client.registerEvent(new ReceiveResponseEvent(cf)); return cf; } }
< prev index next >