# HG changeset patch # User Sergey Kuksenko # Date 1470872619 25200 # Wed Aug 10 16:43:39 2016 -0700 # Node ID 51045db6ebd2078510568b93dfd5b0403a1699ca # Parent 28e938880be36d002eabaffffd14e7e0d34af4de JDK-8162497 fix obtainSendWindow deadlock diff --git a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java --- a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java @@ -97,7 +97,7 @@ FrameReader reader; // Connection level flow control windows - int sendWindow = INITIAL_WINDOW_SIZE; + final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE); final static int DEFAULT_FRAME_SIZE = 16 * 1024; private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY; @@ -106,7 +106,7 @@ /** * This is established by the protocol spec and the peer will update it with - * WINDOW_UPDATEs, which affects the sendWindow. + * WINDOW_UPDATEs, which affects the connectionSendWindow. */ final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1; @@ -122,7 +122,7 @@ Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange exchange) throws IOException, InterruptedException { this.outputQ = new Queue<>(); - String msg = "Connection send window size " + Integer.toString(sendWindow); + String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); //this.initialExchange = exchange; @@ -171,7 +171,7 @@ InetSocketAddress proxy = request.proxy(); URI uri = request.uri(); InetSocketAddress addr = Utils.getAddress(request); - String msg = "Connection send window size " + Integer.toString(sendWindow); + String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); this.key = keyFor(uri, proxy); this.connection = HttpConnection.getConnection(addr, request, this); @@ -192,29 +192,6 @@ asyncConn.startReading(); } - // NEW - synchronized void obtainSendWindow(int amount) throws InterruptedException { - while (amount > 0) { - int n = Math.min(amount, sendWindow); - sendWindow -= n; - amount -= n; - if (amount > 0) - wait(); - } - } - - synchronized void updateSendWindow(int amount) { - if (sendWindow == 0) { - sendWindow += amount; - notifyAll(); - } else - sendWindow += amount; - } - - synchronized int sendWindow() { - return sendWindow; - } - static String keyFor(HttpConnection connection) { boolean isProxy = connection.isProxied(); boolean isSecure = connection.isSecure(); @@ -466,7 +443,7 @@ private void handleWindowUpdate(WindowUpdateFrame f) throws IOException, InterruptedException { - updateSendWindow(f.getUpdate()); + connectionSendWindow.update(f.getUpdate()); } private void protocolError(int errorCode) diff --git a/src/java.httpclient/share/classes/java/net/http/Stream.java b/src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java @@ -118,10 +118,9 @@ final FlowController userRequestFlowController = new FlowController(); - final FlowController remoteRequestFlowController = - new FlowController(); final FlowController responseFlowController = new FlowController(); + final WindowControl outgoingWindow = new WindowControl(); final ExecutorWrapper executor; @@ -340,9 +339,7 @@ } void incoming_windowUpdate(WindowUpdateFrame frame) { - int amount = frame.getUpdate(); - if (amount > 0) - remoteRequestFlowController.accept(amount); + outgoingWindow.update(frame.getUpdate()); } void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException { @@ -507,9 +504,8 @@ int amount = buffer.remaining(); // wait for flow control if necessary. Following method will block // until after headers frame is sent, so correct streamid is set. - remoteRequestFlowController.take(amount); - connection.obtainSendWindow(amount); - + outgoingWindow.acquire(amount); + connection.connectionSendWindow.acquire(amount); DataFrame df = new DataFrame(); df.streamid(streamid); if (complete) { @@ -677,8 +673,8 @@ } // called from Http2Connection reader thread - synchronized void updateOutgoingWindow(int update) { - remoteRequestFlowController.accept(update); + void updateOutgoingWindow(int update) { + outgoingWindow.update(update); } void close(String msg) { diff --git a/src/java.httpclient/share/classes/java/net/http/WindowControl.java b/src/java.httpclient/share/classes/java/net/http/WindowControl.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/WindowControl.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + */ +package java.net.http; + +import java.util.concurrent.Semaphore; + +/** + * Connection or stream blocking flow control window. + */ +class WindowControl { + + final Semaphore window; + + WindowControl() { + this(0); + } + + WindowControl(int intialPermits) { + this.window = new Semaphore(intialPermits); + } + + void update(int permits) { + /* TODO + RFC-7540 says: + A sender MUST NOT allow a flow-control window to exceed 2^31-1 + octets. If a sender receives a WINDOW_UPDATE that causes a flow- + control window to exceed this maximum, it MUST terminate either the + stream or the connection, as appropriate. For streams, the sender + sends a RST_STREAM with an error code of FLOW_CONTROL_ERROR; for the + connection, a GOAWAY frame with an error code of FLOW_CONTROL_ERROR + is sent. + */ + if (permits > 0) { + window.release(permits); + } + } + + void acquire(int permits) throws InterruptedException { + if (permits > 0) { + window.acquire(permits); + } + } + + int available() { + return window.availablePermits(); + } +} # HG changeset patch # User Sergey Kuksenko # Date 1470872692 25200 # Wed Aug 10 16:44:52 2016 -0700 # Node ID 406ef588c272627212eab4a8d04ab212a3eb638f # Parent 51045db6ebd2078510568b93dfd5b0403a1699ca JDK-8161004 bulk sendWindowUpdate diff --git a/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java b/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java --- a/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java @@ -148,7 +148,7 @@ frame.setParameter(MAX_CONCURRENT_STREAMS, Utils.getIntegerNetProperty( "java.net.httpclient.maxstreams", 16)); frame.setParameter(INITIAL_WINDOW_SIZE, Utils.getIntegerNetProperty( - "java.net.httpclient.windowsize", 32 * K)); + "java.net.httpclient.windowsize", Http2Connection.INITIAL_WINDOW_SIZE)); frame.setParameter(MAX_FRAME_SIZE, Utils.getIntegerNetProperty( "java.net.httpclient.maxframesize", 16 * K)); frame.computeLength(); diff --git a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java --- a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java @@ -31,7 +31,7 @@ import java.net.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Collection; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -104,6 +104,7 @@ final ExecutorWrapper executor; + WindowUpdateSender windowUpdater; /** * This is established by the protocol spec and the peer will update it with * WINDOW_UPDATEs, which affects the connectionSendWindow. @@ -485,6 +486,13 @@ serverSettings = SettingsFrame.getDefaultSettings(); hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); + + windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) { + @Override + int getStreamId() { + return 0; + } + }; } /** @@ -529,16 +537,11 @@ SettingsFrame sf = client2.getClientSettings(); Log.logFrames(sf, "OUT"); sf.writeOutgoing(bg); - WindowUpdateFrame wup = new WindowUpdateFrame(); - wup.streamid(0); + ba = bg.getBufferArray(); + connection.write(ba, 0, ba.length); // send a Window update for the receive buffer we are using // minus the initial 64 K specified in protocol - wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); - wup.computeLength(); - wup.writeOutgoing(bg); - Log.logFrames(wup, "OUT"); - ba = bg.getBufferArray(); - connection.write(ba, 0, ba.length); + windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); } /** @@ -669,12 +672,6 @@ } } - public void sendFrames(List frames) throws IOException, InterruptedException { - for (Http2Frame frame : frames) { - sendFrame(frame); - } - } - static Throwable getExceptionFrom(CompletableFuture cf) { try { cf.get(); @@ -710,13 +707,22 @@ // body to proceed. stream.updateOutgoingWindow(getInitialSendWindowSize()); LinkedList frames = encodeHeaders(oh); - for (Http2Frame f : frames) { - sendOneFrame(f); + if (frames.size() == 1) { + sendOneFrame(frames.getFirst()); + } else { + // provide protection from inserting unordered frames between Headers and Continuation + // that is a temporal solution until asycn connection queue will have + // atomic implementation of addAll method. + List bufs = new ArrayList<>(); + for (Http2Frame f : frames) { + bufs.addAll(encodeFrame(f)); + } + ByteBuffer[] currentBufs = bufs.toArray(new ByteBuffer[0]); + connection.write(currentBufs, 0, currentBufs.length); } } else { sendOneFrame(frame); } - } catch (IOException e) { if (!closed) { Log.logError(e); @@ -728,7 +734,6 @@ /** * Send a frame. - * * @param frame * @throws IOException */ @@ -741,6 +746,34 @@ connection.write(currentBufs, 0, currentBufs.length); } + /* + * Direct call of the method bypasses synchronization on "sendlock" and + * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. + * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. + */ + void sendUnorderedFrame(Http2Frame frame){ + try { + ByteBufferGenerator bbg = new ByteBufferGenerator(this); + frame.computeLength(); + Log.logFrames(frame, "OUT"); + frame.writeOutgoing(bbg); + ByteBuffer[] currentBufs = bbg.getBufferArray(); + connection.write(currentBufs, 0, currentBufs.length); + } catch (IOException e) { + if (!closed) { + Log.logError(e); + shutdown(e); + } + } + } + + private List encodeFrame(Http2Frame frame) throws IOException { + ByteBufferGenerator bbg = new ByteBufferGenerator(this); + frame.computeLength(); + Log.logFrames(frame, "OUT"); + frame.writeOutgoing(bbg); + return bbg.getBufferList(); + } private SettingsFrame getAckFrame(int streamid) { SettingsFrame frame = new SettingsFrame(); diff --git a/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java b/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java --- a/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java +++ b/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java @@ -159,7 +159,7 @@ f.setParameter(ENABLE_PUSH, 1); f.setParameter(HEADER_TABLE_SIZE, 4 * K); f.setParameter(MAX_CONCURRENT_STREAMS, 35); - f.setParameter(INITIAL_WINDOW_SIZE, 16 * K); + f.setParameter(INITIAL_WINDOW_SIZE, Http2Connection.INITIAL_WINDOW_SIZE); f.setParameter(MAX_FRAME_SIZE, 16 * K); return f; } diff --git a/src/java.httpclient/share/classes/java/net/http/Stream.java b/src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java @@ -112,7 +112,7 @@ HttpResponse.BodyProcessor responseProcessor; final HttpRequest.BodyProcessor requestProcessor; HttpResponse response; - + final WindowUpdateSender windowUpdater; // state flags boolean requestSent, responseReceived; @@ -163,8 +163,10 @@ Http2Frame frame; DataFrame df = null; try { + boolean endOfStream; do { frame = inputQ.take(); + endOfStream = frame.getFlag(DataFrame.END_STREAM); if (!(frame instanceof DataFrame)) { assert false; continue; @@ -176,8 +178,12 @@ responseFlowController.take(); responseProcessor.onResponseBodyChunk(b); } - sendWindowUpdate(len); - } while (!df.getFlag(DataFrame.END_STREAM)); + connection.windowUpdater.update(len); + if(!endOfStream) { + // if we got the last frame in the stream we shouldn't send WindowUpdate + windowUpdater.update(len); + } + } while (!endOfStream); } catch (InterruptedException e) { throw new IOException(e); } @@ -198,22 +204,6 @@ return cf; } - private void sendWindowUpdate(int increment) - throws IOException, InterruptedException { - if (increment == 0) - return; - LinkedList list = new LinkedList<>(); - WindowUpdateFrame frame = new WindowUpdateFrame(); - frame.streamid(streamid); - frame.setUpdate(increment); - list.add(frame); - frame = new WindowUpdateFrame(); - frame.streamid(0); - frame.setUpdate(increment); - list.add(frame); - connection.sendFrames(list); - } - @Override CompletableFuture sendBodyAsync() { final CompletableFuture cf = new CompletableFuture<>(); @@ -245,6 +235,7 @@ this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW this.inputQ = new Queue<>(); + this.windowUpdater = new StreamWindowUpdateSender(connection); } @SuppressWarnings("unchecked") @@ -264,6 +255,7 @@ this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW this.inputQ = new Queue<>(); + this.windowUpdater = new StreamWindowUpdateSender(connection); } /** @@ -849,4 +841,16 @@ resultCF.completeExceptionally(t); } } + + class StreamWindowUpdateSender extends WindowUpdateSender { + + StreamWindowUpdateSender(Http2Connection connection) { + super(connection); + } + + @Override + int getStreamId() { + return streamid; + } + } } diff --git a/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java b/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + */ +package java.net.http; + +import java.util.concurrent.atomic.AtomicInteger; + +abstract class WindowUpdateSender { + + + final int limit; + final Http2Connection connection; + final AtomicInteger received = new AtomicInteger(0); + + WindowUpdateSender(Http2Connection connection) { + this(connection, connection.clientSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE)); + } + + WindowUpdateSender(Http2Connection connection, int initWindowSize) { + this(connection, connection.getMaxReceiveFrameSize(), initWindowSize); + } + + WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) { + this.connection = connection; + int v0 = Math.max(0, initWindowSize - maxFrameSize); + int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize; + v1 = v1 * maxFrameSize / 2; + // send WindowUpdate heuristic: + // - we got data near half of window size + // or + // - remaining window size reached max frame size. + limit = Math.min(v0, v1); + } + + abstract int getStreamId(); + + void update(int delta) { + if (received.addAndGet(delta) > limit) { + synchronized (this) { + int tosend = received.get(); + if( tosend > limit) { + received.getAndAdd(-tosend); + sendWindowUpdate(tosend); + } + } + } + } + + void sendWindowUpdate(int delta) { + WindowUpdateFrame frame = new WindowUpdateFrame(); + frame.streamid(getStreamId()); + frame.setUpdate(delta); + connection.sendUnorderedFrame(frame); + } + + +} # HG changeset patch # User Sergey Kuksenko # Date 1470872824 25200 # Wed Aug 10 16:47:04 2016 -0700 # Node ID 5e8923a8c1c08c803a19d3cd31c1959fa6d5e8d2 # Parent 406ef588c272627212eab4a8d04ab212a3eb638f Async Queues diff --git a/src/java.httpclient/share/classes/java/net/http/AsyncConnection.java b/src/java.httpclient/share/classes/java/net/http/AsyncConnection.java --- a/src/java.httpclient/share/classes/java/net/http/AsyncConnection.java +++ b/src/java.httpclient/share/classes/java/net/http/AsyncConnection.java @@ -25,6 +25,7 @@ package java.net.http; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.function.Consumer; @@ -67,4 +68,28 @@ * an event with the selector thread. */ void startReading(); + + /** + * in async mode put buffers into end of send queue. Should be followed by subsequent flushAsync invocation. + * That allows multiple threads to put buffers into queue while some thread is writing. + */ + void writeAsync(ByteBuffer[] buffers) throws IOException; + + /** + * in async mode may put buffers into beginning of send queue, that break packet sequence and write buffers before + * other buffers in queue. + * Should be followed by subsequent flushAsync invocation. + * That allows multiple threads to put buffers into queue while some thread is writing. + */ + void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException; + + + /** + * should be called after any writeAsync/writeAsyncUnordered invocation. + * If there is a race to flushAsync from several threads one thread (race winner) capture flush operation and write the whole + * queue content. Other threads (race losers) exits from the method (not blocking) and continue execution. + */ + void flushAsync(); + + } diff --git a/src/java.httpclient/share/classes/java/net/http/AsyncReadQueue.java b/src/java.httpclient/share/classes/java/net/http/AsyncReadQueue.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/AsyncReadQueue.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.net.http; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; + +class AsyncReadQueue implements Closeable { + + private static final Object POISON = new Object(); + + private volatile boolean isDelayed = false; + private volatile boolean closed; + + private final BiConsumer> consumeAction; + private final BinaryOperator combineFunction; + + private final BlockingDeque delayedQueue = new LinkedBlockingDeque<>(); + private final Object delayedLock = new Object(); + + private T pushbacked = null; + private boolean cancelTransferToDirect; + + public AsyncReadQueue(BiConsumer> consumeAction, BinaryOperator combineFunction) { + this.consumeAction = consumeAction; + this.combineFunction = combineFunction; + } + + public void put(T e) throws IOException { + ensureQueueOpen(); + if (!isDelayed) { + fastPut(e); + } else { + synchronized (delayedLock) { + if (!isDelayed) { + fastPut(e); + } else { + delayedQueue.offerLast(e); + } + } + } + } + + private void ensureQueueOpen() throws IOException { + if (closed) { + throw new IOException("Queue closed"); + } + } + + private void fastPut(T e) { + assert !isDelayed; + if (pushbacked != null) { + e = combineFunction.apply(pushbacked, e); + pushbacked = null; + } + consumeAction.accept(e, this); + } + + public void pushback(T e) throws IOException { + ensureQueueOpen(); + if (isDelayed) { + T next = checkPoison(delayedQueue.pollFirst()); + if (next != null) { + e = combineFunction.apply(e, next); + } + delayedQueue.offerFirst(e); + } else { + assert pushbacked == null; + pushbacked = e; + } + } + + public T take() throws IOException { + ensureQueueOpen(); + if (isDelayed) { + try { + return checkPoison(delayedQueue.takeFirst()); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } else { + throw new IOException("Illegal queue operation"); + } + } + + @SuppressWarnings("unchecked") + private T checkPoison(Object v) throws IOException { + if (v == POISON) { + delayedQueue.add(POISON); // wake up other blocked threads + throw new IOException("Queue closed"); + } else { + return (T) v; + } + } + + public void setDelayed(T delayedElement) { + if (isDelayed) { // setDelay was invoked at setDirect flush period + synchronized (delayedLock) { + if(delayedElement!=null) { + delayedQueue.offerFirst(delayedElement); + } + cancelTransferToDirect = true; + } + } else { + if(delayedElement!=null) { + delayedQueue.offerFirst(delayedElement); + } + isDelayed = true; + } + } + + public void setDirect() throws IOException { + if (isDelayed) { + synchronized (delayedLock) { + if (isDelayed) { + cancelTransferToDirect = false; + T e = checkPoison(delayedQueue.pollFirst()); + while (e != null) { + boolean hasNext = delayedQueue.isEmpty(); + consumeAction.accept(e, this); + if(cancelTransferToDirect) { + return; + } + e = checkPoison(delayedQueue.pollFirst()); + if (!hasNext && e != null) { + // last element was pushbacked and can't be processed without next element + pushbacked = e; + break; + } + } + isDelayed = false; + } + } + } + } + + @Override + public void close() { + closed = true; + delayedQueue.add(POISON); + } + +} diff --git a/src/java.httpclient/share/classes/java/net/http/AsyncSSLConnection.java b/src/java.httpclient/share/classes/java/net/http/AsyncSSLConnection.java --- a/src/java.httpclient/share/classes/java/net/http/AsyncSSLConnection.java +++ b/src/java.httpclient/share/classes/java/net/http/AsyncSSLConnection.java @@ -45,6 +45,11 @@ sslDelegate = new AsyncSSLDelegate(delegate, client, ap); } + synchronized void configureMode(Mode mode) throws IOException { + super.configureMode(mode); + delegate.configureMode(mode); + } + @Override public void connect() throws IOException, InterruptedException { delegate.connect(); @@ -81,21 +86,46 @@ } @Override - synchronized long write(ByteBuffer[] buffers, int start, int number) throws IOException { + long write(ByteBuffer[] buffers, int start, int number) throws IOException { ByteBuffer[] bufs = Utils.reduce(buffers, start, number); long n = Utils.remaining(bufs); - sslDelegate.write(bufs); + sslDelegate.writeAsync(bufs); + sslDelegate.flushAsync(); return n; } @Override long write(ByteBuffer buffer) throws IOException { long n = buffer.remaining(); - sslDelegate.write(buffer); + sslDelegate.writeAsync(new ByteBuffer[]{buffer}); + sslDelegate.flushAsync(); return n; } @Override + public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException { + if (mode != Mode.ASYNC) { + write(buffers, 0 , buffers.length); + } else { + sslDelegate.writeAsyncUnordered(buffers); + } + } + + @Override + public void writeAsync(ByteBuffer[] buffers) throws IOException { + if (mode != Mode.ASYNC) { + write(buffers, 0 , buffers.length); + } else { + sslDelegate.writeAsync(buffers); + } + } + + @Override + public void flushAsync() { + sslDelegate.flushAsync(); + } + + @Override public void close() { Utils.close(sslDelegate, delegate.channel()); } diff --git a/src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java b/src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java --- a/src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java +++ b/src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java @@ -87,14 +87,14 @@ // outgoing buffers put in this queue first and may remain here // while SSL handshaking happening. - final Queue appOutputQ; + final AsyncWriteQueue appOutputQ; // queue of wrapped ByteBuffers waiting to be sent on socket channel //final Queue channelOutputQ; // Bytes read into this queue before being unwrapped. Backup on this // Q should only happen when the engine is stalled due to delegated tasks - final Queue channelInputQ; + final AsyncReadQueue channelInputQ; // input occurs through the read() method which is expected to be called // when the selector signals some data is waiting to be read. All incoming @@ -105,20 +105,14 @@ final SSLEngine engine; final SSLParameters sslParameters; //final SocketChannel chan; - final HttpConnection lowerOutput; + final AsyncConnection lowerOutput; final HttpClientImpl client; final ExecutorService executor; final BufferHandler bufPool; Consumer receiver; Consumer errorHandler; - // Locks. - final Object reader = new Object(); - final Object writer = new Object(); // synchronizing handshake state final Object handshaker = new Object(); - // flag set when reader or writer is blocked waiting for handshake to finish - boolean writerBlocked; - boolean readerBlocked; // some thread is currently doing the handshake boolean handshaking; @@ -130,8 +124,7 @@ SSLContext context = client.sslContext(); executor = client.executorService(); bufPool = client; - appOutputQ = new Queue<>(); - appOutputQ.registerPutCallback(this::upperWrite); + appOutputQ = new AsyncWriteQueue<>(this::upperWrite); //channelOutputQ = new Queue<>(); //channelOutputQ.registerPutCallback(this::lowerWrite); engine = context.createSSLEngine(); @@ -148,10 +141,9 @@ } logParams(sslParameters); engine.setSSLParameters(sslParameters); - this.lowerOutput = lowerOutput; + this.lowerOutput = (AsyncConnection)lowerOutput; this.client = client; - this.channelInputQ = new Queue<>(); - this.channelInputQ.registerPutCallback(this::upperRead); + this.channelInputQ = new AsyncReadQueue<>(this::upperRead,this::combine); } /** @@ -159,62 +151,43 @@ * * @param src */ - public void write(ByteBuffer[] src) throws IOException { - appOutputQ.putAll(src); + @Override + public void writeAsync(ByteBuffer[] src) throws IOException { + appOutputQ.put(src); } - public void write(ByteBuffer buf) throws IOException { - ByteBuffer[] a = new ByteBuffer[1]; - a[0] = buf; - write(a); + @Override + public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException { + appOutputQ.putFirst(buffers); + } + + @Override + public void flushAsync() { + if(appOutputQ.flush()) { + lowerOutput.flushAsync(); + } } @Override public void close() { - Utils.close(appOutputQ, channelInputQ, lowerOutput); + Utils.close( channelInputQ, (HttpConnection)lowerOutput); } - /** - * Attempts to wrap buffers from appOutputQ and place them on the - * channelOutputQ for writing. If handshaking is happening, then the - * process stalls and last buffers taken off the appOutputQ are put back - * into it until handshaking completes. - * - * This same method is called to try and resume output after a blocking - * handshaking operation has completed. - */ - private void upperWrite() { + private void upperWrite(ByteBuffer[] buffers, Consumer setDelayCallback) { try { - EngineResult r = null; - ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY); int bytes = Utils.remaining(buffers); while (bytes > 0) { - synchronized (writer) { - r = wrapBuffers(buffers); - int bytesProduced = r.bytesProduced(); - int bytesConsumed = r.bytesConsumed(); - bytes -= bytesConsumed; - if (bytesProduced > 0) { - // pass destination buffer to channelOutputQ. - lowerOutput.write(r.destBuffer); - } - synchronized (handshaker) { - if (r.handshaking()) { - // handshaking is happening or is needed - // so we put the buffers back on Q to process again - // later. It's possible that some may have already - // been processed, which is ok. - appOutputQ.pushbackAll(buffers); - writerBlocked = true; - if (!handshaking()) { - // execute the handshake in another thread. - // This method will be called again to resume sending - // later - doHandshake(r); - } - return; - } - } + EngineResult r = wrapBuffers(buffers); + int bytesProduced = r.bytesProduced(); + int bytesConsumed = r.bytesConsumed(); + bytes -= bytesConsumed; + if (bytesProduced > 0) { + lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer}); + } + if (r.handshaking()) { + setDelayCallback.accept(buffers); + doHandshake(r); + return; } } returnBuffers(buffers); @@ -225,17 +198,26 @@ } private void doHandshake(EngineResult r) { - handshaking = true; - channelInputQ.registerPutCallback(null); - executor.execute(() -> { - try { - doHandshakeImpl(r); - channelInputQ.registerPutCallback(this::upperRead); - } catch (Throwable t) { - close(); - errorHandler.accept(t); + synchronized (handshaker) { + if (!handshaking()) { + // execute the handshake in another thread. + // This method will be called again to resume sending + // later + handshaking = true; + channelInputQ.setDelayed(null); + executor.execute(() -> { + try { + doHandshakeImpl(r); + appOutputQ.flushDelayed(); + lowerOutput.flushAsync(); + channelInputQ.setDirect(); + } catch (Throwable t) { + close(); + errorHandler.accept(t); + } + }); } - }); + } } private void returnBuffers(ByteBuffer[] bufs) { @@ -276,24 +258,10 @@ if (!r.handshaking()) break; } - boolean dowrite = false; - boolean doread = false; // Handshake is finished. Now resume reading and/or writing synchronized(handshaker) { handshaking = false; - if (writerBlocked) { - writerBlocked = false; - dowrite = true; - } - if (readerBlocked) { - readerBlocked = false; - doread = true; - } } - if (dowrite) - upperWrite(); - if (doread) - upperRead(); } // acknowledge a received CLOSE request from peer @@ -356,7 +324,8 @@ EngineResult handshakeWrapAndSend() throws IOException { EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER); if (r.bytesProduced() > 0) { - lowerOutput.write(r.destBuffer); + lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer}); + lowerOutput.flushAsync(); } return r; } @@ -457,65 +426,37 @@ } } - public void upperRead() { + public void upperRead(ByteBuffer srcbuf, AsyncReadQueue inputQ) { EngineResult r; - ByteBuffer srcbuf; - synchronized (reader) { + try { - srcbuf = channelInputQ.poll(); - if (srcbuf == null) { - return; - } - while (true) { + while (srcbuf.hasRemaining()) { r = unwrapBuffer(srcbuf); switch (r.result.getStatus()) { case BUFFER_UNDERFLOW: // Buffer too small. Need to combine with next buf - ByteBuffer nextBuf = channelInputQ.poll(); - if (nextBuf == null) { - // no data available. push buffer back until more data available - channelInputQ.pushback(srcbuf); - return; - } else { - srcbuf = combine(srcbuf, nextBuf); - } - break; + inputQ.pushback(srcbuf); + return; case OK: // check for any handshaking work - synchronized (handshaker) { - if (r.handshaking()) { + if (r.handshaking()) { // handshaking is happening or is needed // so we put the buffer back on Q to process again // later. - channelInputQ.pushback(srcbuf); - readerBlocked = true; - if (!handshaking()) { - // execute the handshake in another thread. - // This method will be called again to resume sending - // later - doHandshake(r); - } + inputQ.setDelayed(srcbuf); + doHandshake(r); return; - } } ByteBuffer dst = r.destBuffer; if (dst.hasRemaining()) { receiver.accept(dst); } } - if (srcbuf.hasRemaining()) { - continue; - } - srcbuf = channelInputQ.poll(); - if (srcbuf == null) { - return; - } } } catch (Throwable t) { close(); errorHandler.accept(t); } - } } /** diff --git a/src/java.httpclient/share/classes/java/net/http/AsyncWriteQueue.java b/src/java.httpclient/share/classes/java/net/http/AsyncWriteQueue.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/AsyncWriteQueue.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package java.net.http; + + +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +class AsyncWriteQueue { + + private static final int IDLE = 0; // nobody is flushing from the queue + private static final int FLUSHING = 1; // there is the only thread flushing from the queue + private static final int REFLUSHING = 2; // while one thread was flushing from the queue + // the other thread put data into the queue. + // flushing thread should recheck queue before switching to idle state. + private static final int DELAYED = 3; // flushing is delayed + // either by PlainHttpConnection.WriteEvent registration, or + // SSL handshaking + + private final AtomicInteger state = new AtomicInteger(IDLE); + private final Deque queue; + private final BiConsumer> consumeAction; + private final Consumer setDelayedCallback; + + private T delayedElement = null; + + AsyncWriteQueue(BiConsumer> consumeAction) { + this.consumeAction = consumeAction; + this.queue = new ConcurrentLinkedDeque<>(); + this.setDelayedCallback = this::setDelayed; + } + + void put(T e) { + queue.addLast(e); + } + + void putFirst(T e) { + queue.addFirst(e); + } + + boolean flush() { + boolean doFlush = false; + Loop: while(true) { + switch (state.get()) { + case IDLE: + if(state.compareAndSet(IDLE, FLUSHING)) { + doFlush = true; + break Loop; + } + break; + case FLUSHING: + if(state.compareAndSet(FLUSHING, REFLUSHING)) { + break Loop; + } + break; + case REFLUSHING: + case DELAYED: + break Loop; + } + } + if(doFlush) { + flushLoop(); + } + return doFlush; + } + + /* + * race invocations of flushDelayed are not allowed. + * flushDelayed should be invoked only from: + * - SelectorManager thread + * - Handshaking thread + */ + void flushDelayed() { + if(!state.compareAndSet(DELAYED, FLUSHING)) { + throw new RuntimeException("Shouldn't happen"); + } + flushLoop(); + } + + private void flushLoop() { + T element = delayedElement; + delayedElement = null; + while(true) { + if(element == null) { + element = queue.poll(); + } + while (element != null) { + consumeAction.accept(element, setDelayedCallback); + if (state.get() == DELAYED) { + return; + } + element = queue.poll(); + } + switch (state.get()) { + case IDLE: + case DELAYED: + throw new RuntimeException("Shouldn't happen"); + case FLUSHING: + if(state.compareAndSet(FLUSHING, IDLE)) { + return; + } + break; + case REFLUSHING: + state.compareAndSet(REFLUSHING, FLUSHING); + break; + } + } + } + + + public void setDelayed(T delayedElement) { + while(true) { + int state = this.state.get(); + switch (state) { + case IDLE: + case DELAYED: + throw new RuntimeException("Shouldn't happen"); + case FLUSHING: + case REFLUSHING: + if(this.state.compareAndSet(state, DELAYED)) { + this.delayedElement = delayedElement; + return; + } + break; + } + } + + } +} diff --git a/src/java.httpclient/share/classes/java/net/http/ClosableBlockingQueue.java b/src/java.httpclient/share/classes/java/net/http/ClosableBlockingQueue.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/ClosableBlockingQueue.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package java.net.http; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class ClosableBlockingQueue implements Closeable { + + private static final Object POISON = new Object(); + + private volatile boolean closed; + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + + public void put(T obj) throws IOException { + if (!closed) { + queue.add(obj); + } else { + throw new IOException("stream closed"); + } + } + + @Override + public void close() { + closed = true; + queue.add(POISON); + } + + @SuppressWarnings("unchecked") + public T take() throws IOException { + if (closed) { + throw new IOException("stream closed"); + } + try { + Object v = queue.take(); + if (v == POISON) { + queue.add(POISON); // wake up other blocked threads + throw new IOException("stream closed"); + } else { + return (T) v; + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + +} diff --git a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java --- a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java @@ -31,7 +31,6 @@ import java.net.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -78,11 +77,11 @@ */ class Http2Connection implements BufferHandler { - final Queue outputQ; volatile boolean closed; //------------------------------------- final HttpConnection connection; + final AsyncConnection connectionAsync; HttpClientImpl client; final Http2ClientImpl client2; Map streams; @@ -91,7 +90,6 @@ Encoder hpackOut; Decoder hpackIn; SettingsFrame clientSettings, serverSettings; - ByteBufferConsumer bbc; final LinkedList freeList; final String key; // for HttpClientImpl.connections map FrameReader reader; @@ -122,13 +120,13 @@ */ Http2Connection(HttpConnection connection, Http2ClientImpl client2, Exchange exchange) throws IOException, InterruptedException { - this.outputQ = new Queue<>(); String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); //this.initialExchange = exchange; assert !(connection instanceof SSLConnection); this.connection = connection; + this.connectionAsync = (AsyncConnection)connection; this.client = client2.client(); this.client2 = client2; this.executor = client.executorWrapper(); @@ -141,13 +139,12 @@ initialStream.registerStream(1); initialStream.requestSent(); sendConnectionPreface(); - connection.configureMode(Mode.ASYNC); // start reading and writing // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); + connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown); + connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. asyncReceive(connection.getRemaining()); - asyncConn.startReading(); + connectionAsync.startReading(); } // async style but completes immediately @@ -176,21 +173,21 @@ Log.logTrace(msg); this.key = keyFor(uri, proxy); this.connection = HttpConnection.getConnection(addr, request, this); + this.connectionAsync = (AsyncConnection)connection; streams = Collections.synchronizedMap(new HashMap<>()); this.client = request.client(); this.client2 = client.client2(); this.executor = client.executorWrapper(); this.freeList = new LinkedList<>(); - this.outputQ = new Queue<>(); nextstreamid = 1; initCommon(); connection.connect(); - connection.configureMode(Mode.ASYNC); + sendConnectionPreface(); // start reading - AsyncConnection asyncConn = (AsyncConnection)connection; - asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown); - sendConnectionPreface(); - asyncConn.startReading(); + connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown); + connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility. + + connectionAsync.startReading(); } static String keyFor(HttpConnection connection) { @@ -470,7 +467,7 @@ private void handlePing(PingFrame frame) throws IOException, InterruptedException { frame.setFlag(PingFrame.ACK); - sendFrame(frame); + sendUnorderedFrame(frame); } private void handleGoAway(GoAwayFrame frame) @@ -530,15 +527,11 @@ private void sendConnectionPreface() throws IOException { ByteBufferGenerator bg = new ByteBufferGenerator(this); bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES); - ByteBuffer[] ba = bg.getBufferArray(); - connection.write(ba, 0, ba.length); - - bg = new ByteBufferGenerator(this); SettingsFrame sf = client2.getClientSettings(); Log.logFrames(sf, "OUT"); sf.writeOutgoing(bg); - ba = bg.getBufferArray(); - connection.write(ba, 0, ba.length); + ByteBuffer[] ba = bg.getBufferArray(); + connection.write(ba, 0, ba.length); // write is performed before switch to async mode // send a Window update for the receive buffer we are using // minus the initial 64 K specified in protocol windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); @@ -697,6 +690,7 @@ void sendFrame(Http2Frame frame) { synchronized (sendlock) { try { + ByteBuffer[] bufs; if (frame instanceof OutgoingHeaders) { OutgoingHeaders oh = (OutgoingHeaders) frame; Stream stream = oh.getStream(); @@ -706,44 +700,53 @@ // set outgoing window here. This allows thread sending // body to proceed. stream.updateOutgoingWindow(getInitialSendWindowSize()); - LinkedList frames = encodeHeaders(oh); - if (frames.size() == 1) { - sendOneFrame(frames.getFirst()); - } else { - // provide protection from inserting unordered frames between Headers and Continuation - // that is a temporal solution until asycn connection queue will have - // atomic implementation of addAll method. - List bufs = new ArrayList<>(); - for (Http2Frame f : frames) { - bufs.addAll(encodeFrame(f)); - } - ByteBuffer[] currentBufs = bufs.toArray(new ByteBuffer[0]); - connection.write(currentBufs, 0, currentBufs.length); - } + bufs = encodeFrames(encodeHeaders(oh)); } else { - sendOneFrame(frame); + bufs = encodeFrame(frame); } + connectionAsync.writeAsync(bufs); + } catch (IOException e) { if (!closed) { Log.logError(e); shutdown(e); } + return; } } + connectionAsync.flushAsync(); } - /** - * Send a frame. - * @param frame - * @throws IOException - */ - private void sendOneFrame(Http2Frame frame) throws IOException { + private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException { ByteBufferGenerator bbg = new ByteBufferGenerator(this); frame.computeLength(); Log.logFrames(frame, "OUT"); frame.writeOutgoing(bbg); - ByteBuffer[] currentBufs = bbg.getBufferArray(); - connection.write(currentBufs, 0, currentBufs.length); + return bbg.getBufferList().toArray(new ByteBuffer[0]); + } + + private ByteBuffer[] encodeFrames(List frames) throws IOException { + List bufs = new ArrayList<>(); + for(Http2Frame frame : frames) { + ByteBufferGenerator bbg = new ByteBufferGenerator(this); + frame.computeLength(); + Log.logFrames(frame, "OUT"); + frame.writeOutgoing(bbg); + bufs.addAll(bbg.getBufferList()); + } + return bufs.toArray(new ByteBuffer[0]); + } + + void sendDataFrame(DataFrame frame) { + try { + connectionAsync.writeAsync(encodeFrame(frame)); + connectionAsync.flushAsync(); + } catch (IOException e) { + if (!closed) { + Log.logError(e); + shutdown(e); + } + } } /* @@ -753,12 +756,8 @@ */ void sendUnorderedFrame(Http2Frame frame){ try { - ByteBufferGenerator bbg = new ByteBufferGenerator(this); - frame.computeLength(); - Log.logFrames(frame, "OUT"); - frame.writeOutgoing(bbg); - ByteBuffer[] currentBufs = bbg.getBufferArray(); - connection.write(currentBufs, 0, currentBufs.length); + connectionAsync.writeAsyncUnordered(encodeFrame(frame)); + connectionAsync.flushAsync(); } catch (IOException e) { if (!closed) { Log.logError(e); @@ -767,14 +766,6 @@ } } - private List encodeFrame(Http2Frame frame) throws IOException { - ByteBufferGenerator bbg = new ByteBufferGenerator(this); - frame.computeLength(); - Log.logFrames(frame, "OUT"); - frame.writeOutgoing(bbg); - return bbg.getBufferList(); - } - private SettingsFrame getAckFrame(int streamid) { SettingsFrame frame = new SettingsFrame(); frame.setFlag(SettingsFrame.ACK); diff --git a/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java b/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java --- a/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java +++ b/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java @@ -154,7 +154,7 @@ * * If exchange needs to block again, then call registerEvent() again */ - void registerEvent(AsyncEvent exchange) throws IOException { + void registerEvent(AsyncEvent exchange) { selmgr.register(exchange); } @@ -227,7 +227,7 @@ // This returns immediately. So caller not allowed to send/receive // on connection. - synchronized void register(AsyncEvent e) throws IOException { + synchronized void register(AsyncEvent e) { registrations.add(e); selector.wakeup(); } diff --git a/src/java.httpclient/share/classes/java/net/http/HttpConnection.java b/src/java.httpclient/share/classes/java/net/http/HttpConnection.java --- a/src/java.httpclient/share/classes/java/net/http/HttpConnection.java +++ b/src/java.httpclient/share/classes/java/net/http/HttpConnection.java @@ -53,7 +53,9 @@ ASYNC } - protected Mode mode; + // mode should be volatile, because of reading of the field is not protected by any synchronization + protected volatile Mode mode; + // address we are connected to. Could be a server or a proxy final InetSocketAddress address; diff --git a/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java b/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java --- a/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java +++ b/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java @@ -50,19 +50,26 @@ protected SocketChannel chan; private volatile boolean connected; private boolean closed; - Consumer asyncReceiver; - Consumer errorReceiver; - Queue asyncOutputQ; + + // should be volatile to provide proper synchronization action + volatile Consumer asyncReceiver; + + // should be volatile to provide proper synchronization action + volatile Consumer errorReceiver; + + // asyncOutputQ may be non volatile if and only if the following conditions are satisfied: + // 1. Writing to the field is performed BEFORE setting field HttpConnection.mode (which is volatile) to ASYNC. + // That provides proper "release" operation. + // 2. Reading from the field is performed AFTER checking that HttpConnection.mode set to ASYNC. + // That provides proper "acquire" action. + AsyncWriteQueue asyncOutputQ; + final Object reading = new Object(); - final Object writing = new Object(); + @Override public void startReading() { - try { - client.registerEvent(new ReadEvent()); - } catch (IOException e) { - shutdown(); - } + client.registerEvent(new ReadEvent()); } class ConnectEvent extends AsyncEvent { @@ -141,44 +148,11 @@ if (mode != Mode.ASYNC) return chan.write(buffers, start, number); // async - synchronized(writing) { - int qlen = asyncOutputQ.size(); - ByteBuffer[] bufs = Utils.reduce(buffers, start, number); - long n = Utils.remaining(bufs); - asyncOutputQ.putAll(bufs); - if (qlen == 0) - asyncOutput(); - return n; - } - } - - ByteBuffer asyncBuffer = null; - - void asyncOutput() { - synchronized (writing) { - try { - while (true) { - if (asyncBuffer == null) { - asyncBuffer = asyncOutputQ.poll(); - if (asyncBuffer == null) { - return; - } - } - if (!asyncBuffer.hasRemaining()) { - asyncBuffer = null; - continue; - } - int n = chan.write(asyncBuffer); - //System.err.printf("Written %d bytes to chan\n", n); - if (n == 0) { - client.registerEvent(new WriteEvent()); - return; - } - } - } catch (IOException e) { - shutdown(); - } - } + buffers = Utils.reduce(buffers, start, number); + long n = Utils.remaining(buffers); + asyncOutputQ.put(buffers); + flushAsync(); + return n; } @Override @@ -186,13 +160,57 @@ if (mode != Mode.ASYNC) return chan.write(buffer); // async - synchronized(writing) { - int qlen = asyncOutputQ.size(); - long n = buffer.remaining(); - asyncOutputQ.put(buffer); - if (qlen == 0) - asyncOutput(); - return n; + long n = buffer.remaining(); + asyncOutputQ.put(new ByteBuffer[]{buffer}); + flushAsync(); + return n; + } + + // handle registered WriteEvent; invoked from SelectorManager thread + void flushRegistered() { + if (mode == Mode.ASYNC) { + asyncOutputQ.flushDelayed(); + } + } + + @Override + public void writeAsync(ByteBuffer[] buffers) throws IOException { + if (mode != Mode.ASYNC) { + write(buffers, 0, buffers.length); + } else { + asyncOutputQ.put(buffers); + } + } + + @Override + public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException { + if (mode != Mode.ASYNC) { + write(buffers, 0, buffers.length); + } else { + // Unordered frames are sent before existing frames. + asyncOutputQ.putFirst(buffers); + } + } + + @Override + public void flushAsync() { + if (mode == Mode.ASYNC) { + asyncOutputQ.flush(); + } + } + + void asyncOutput(ByteBuffer[] bufs, Consumer setDelayCallback) { + try { + while (Utils.remaining(bufs) > 0) { + long n = chan.write(bufs); + if (n == 0) { + setDelayCallback.accept(bufs); + client.registerEvent(new WriteEvent()); + return; + } + } + } catch (IOException e) { + shutdown(); } } @@ -305,7 +323,7 @@ @Override public void handle() { - asyncOutput(); + flushRegistered(); } @Override @@ -386,18 +404,13 @@ Consumer errorReceiver) { this.asyncReceiver = asyncReceiver; this.errorReceiver = errorReceiver; - asyncOutputQ = new Queue<>(); - asyncOutputQ.registerPutCallback(this::asyncOutput); + this.asyncOutputQ = new AsyncWriteQueue<>(this::asyncOutput); } @Override CompletableFuture whenReceivingResponse() { CompletableFuture cf = new CompletableFuture<>(); - try { - client.registerEvent(new ReceiveResponseEvent(cf)); - } catch (IOException e) { - cf.completeExceptionally(e); - } + client.registerEvent(new ReceiveResponseEvent(cf)); return cf; } } diff --git a/src/java.httpclient/share/classes/java/net/http/Stream.java b/src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java @@ -94,7 +94,7 @@ */ class Stream extends ExchangeImpl { - final Queue inputQ; + final ClosableBlockingQueue inputQ = new ClosableBlockingQueue<>(); volatile int streamid; @@ -160,18 +160,12 @@ // pushes entire response body into response processor // blocking when required by local or remote flow control void receiveData() throws IOException { - Http2Frame frame; DataFrame df = null; try { boolean endOfStream; do { - frame = inputQ.take(); - endOfStream = frame.getFlag(DataFrame.END_STREAM); - if (!(frame instanceof DataFrame)) { - assert false; - continue; - } - df = (DataFrame) frame; + df = inputQ.take(); + endOfStream = df.getFlag(DataFrame.END_STREAM); int len = df.getDataLength(); ByteBuffer[] buffers = df.getData(); for (ByteBuffer b : buffers) { @@ -234,7 +228,6 @@ //this.response_cf = new CompletableFuture(); this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW - this.inputQ = new Queue<>(); this.windowUpdater = new StreamWindowUpdateSender(connection); } @@ -254,7 +247,6 @@ //this.response_cf = new CompletableFuture(); this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW - this.inputQ = new Queue<>(); this.windowUpdater = new StreamWindowUpdateSender(connection); } @@ -272,7 +264,7 @@ // It's okay if there are multiple HeaderFrames. handleResponse(); } else if (frame instanceof DataFrame) { - inputQ.put(frame); + inputQ.put((DataFrame)frame); } else { otherFrame(frame); } @@ -613,7 +605,7 @@ do { df = getDataFrame(); // TODO: check accumulated content length (if not checked below) - connection.sendFrame(df); + connection.sendDataFrame(df); } while (!df.getFlag(DataFrame.END_STREAM)); requestSent(); }