diff -r add6b5c0ab15 src/java.httpclient/share/classes/java/net/http/Http2Connection.java --- a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Wed Aug 31 14:34:04 2016 -0700 +++ b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Thu Sep 01 13:22:37 2016 -0700 @@ -46,7 +46,7 @@ import sun.net.httpclient.hpack.Encoder; import sun.net.httpclient.hpack.Decoder; import sun.net.httpclient.hpack.DecodingCallback; -import static sun.net.httpclient.frame.HeadersFrame.END_STREAM; + import static sun.net.httpclient.frame.SettingsFrame.*; import static sun.net.httpclient.common.Utils.BUFSIZE; @@ -93,14 +93,15 @@ int nextPushStream = 2; Encoder hpackOut; Decoder hpackIn; - SettingsFrame clientSettings, serverSettings; + SettingsFrame clientSettings; + volatile SettingsFrame serverSettings; ByteBufferConsumer bbc; final LinkedList freeList; final String key; // for HttpClientImpl.connections map FrameReader reader; // Connection level flow control windows - int sendWindow = INITIAL_WINDOW_SIZE; + final SoftSemaphore connectionSendWindow = new SoftSemaphore(INITIAL_WINDOW_SIZE); WindowUpdateSender windowUpdater; @@ -129,7 +130,7 @@ throws IOException, InterruptedException { this.outputQ = new Queue<>(); - String msg = "Connection send window size " + sendWindow; + String msg = "Connection send window size " + connectionSendWindow.available(); Log.logTrace(msg); //this.initialExchange = exchange; @@ -182,7 +183,7 @@ InetSocketAddress proxy = request.proxy(client); URI uri = request.uri(); InetSocketAddress addr = request.getAddress(client); - String msg = "Connection send window size " + Integer.toString(sendWindow); + String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available()); Log.logTrace(msg); this.key = keyFor(uri, proxy); this.connection = HttpConnection.getConnection(addr, client, request, this); @@ -201,29 +202,6 @@ sendConnectionPreface(); } - synchronized void obtainSendWindow(int amount) throws InterruptedException { - while (amount > 0) { - int n = Math.min(amount, sendWindow); - sendWindow -= n; - amount -= n; - if (amount > 0) { - wait(); - } - } - } - - synchronized void updateSendWindow(int amount) { - if (sendWindow == 0) { - sendWindow += amount; - notifyAll(); - } else { - sendWindow += amount; - } - } - - synchronized int sendWindow() { - return sendWindow; - } static String keyFor(HttpConnection connection) { boolean isProxy = connection.isProxied(); @@ -462,9 +440,7 @@ } } - void resetStream(int streamid, int code) - throws IOException, InterruptedException - { + void resetStream(int streamid, int code) { Log.logError( "Resetting stream {0,number,integer} with error code {1,number,integer}", streamid, code); @@ -475,15 +451,20 @@ streams.remove(streamid); } - private void handleWindowUpdate(WindowUpdateFrame f) - throws IOException, InterruptedException - { - updateSendWindow(f.getUpdate()); + private void handleWindowUpdate(WindowUpdateFrame f) { + int amount = f.getUpdate(); + if (amount <= 0) { + protocolError(ErrorFrame.PROTOCOL_ERROR); + } else { + try { + connectionSendWindow.update(amount); + } catch (SoftSemaphore.SoftSemaphoreOverflow softSemaphoreOverflow) { + protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow + } + } } - private void protocolError(int errorCode) - throws IOException, InterruptedException - { + private void protocolError(int errorCode) { GoAwayFrame frame = new GoAwayFrame(); frame.setErrorCode(errorCode); sendFrame(frame); @@ -494,13 +475,42 @@ private void handleSettings(SettingsFrame frame) throws IOException, InterruptedException { + assert frame.streamid() == 0; if (frame.getFlag(SettingsFrame.ACK)) { - // ignore ack frames for now. - return; + if (frame.length() != 0) + protocolError(GoAwayFrame.FRAME_SIZE_ERROR); + // TODO: any specific action needed for ACKs ?? + } else { + int oldWindowSize = serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE); + int newWindowSize = frame.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE); + int diff = newWindowSize - oldWindowSize; + if (diff != 0) { + synchronized (streams) { + // prevent new streams creation until adjstment is finished + // TODO - TOTHINK about more effective synchronisation scheme + // and proper way to switching from synchronizedMap to ConcurrentMap + ArrayList> toreset = new ArrayList<>(); + for(Stream stream :streams.values()) { + if(stream.streamid != 0 && (stream.streamid % 2) != 0) { + try { + stream.streamSendWindow.update(diff); + } catch (SoftSemaphore.SoftSemaphoreOverflow softSemaphoreOverflow) { + toreset.add(stream); + } + } + } + for(Stream stream : toreset) { + resetStream(stream.streamid, ResetFrame.FLOW_CONTROL_ERROR); + } + serverSettings = frame; // avoid race between exit of synchronization block and + // a moment when new created stream capture new serverSettings + } + } else { + serverSettings = frame; + } + SettingsFrame ack = getAckFrame(frame.streamid()); + sendFrame(ack); } - serverSettings = frame; - SettingsFrame ack = getAckFrame(frame.streamid()); - sendFrame(ack); } private void handlePing(PingFrame frame) @@ -733,7 +743,7 @@ nextstreamid += 2; // set outgoing window here. This allows thread sending // body to proceed. - stream.updateOutgoingWindow(getInitialSendWindowSize()); + stream.setInitialOutgoingWindow(getInitialSendWindowSize()); List frames = encodeHeaders(oh); // provide protection from inserting unordered frames between Headers and Continuation // that is a temporal solution until asycn connection queue will have diff -r add6b5c0ab15 src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java Wed Aug 31 14:34:04 2016 -0700 +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java Thu Sep 01 13:22:37 2016 -0700 @@ -127,10 +127,7 @@ // state flags boolean requestSent, responseReceived; - final FlowController requestFlowController = - new FlowController(); - final FlowController responseFlowController = - new FlowController(); + final SoftSemaphore streamSendWindow = new SoftSemaphore(); final WindowUpdateSender windowUpdater; @@ -326,8 +323,18 @@ void incoming_windowUpdate(WindowUpdateFrame frame) { int amount = frame.getUpdate(); - if (amount > 0) { - requestFlowController.accept(amount); + + if (amount <= 0) { + Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", + streamid, streamid, amount); + connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); + } else { + assert streamid != 0; + try { + streamSendWindow.update(amount); + } catch (SoftSemaphore.SoftSemaphoreOverflow softSemaphoreOverflow) { + connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); + } } } @@ -441,45 +448,6 @@ } } - /** - * A simple general purpose blocking flow controller - */ - class FlowController { - int permits; - - FlowController() { - this.permits = 0; - } - - public synchronized void accept(long n) { - if (n < 1) { - throw new InternalError("FlowController.accept called with " + n); - } - if (permits == 0) { - permits += n; - notifyAll(); - } else { - permits += n; - } - } - - public synchronized void take() throws InterruptedException { - take(1); - } - - public synchronized void take(int amount) throws InterruptedException { - assert permits >= 0; - while (amount > 0) { - int n = Math.min(amount, permits); - permits -= n; - amount -= n; - if (amount > 0) { - wait(); - } - } - } - } - /** Sets endStreamReceived. Should be called only once. */ void setEndStreamReceived() { assert endStreamReceived == false: "Unexpected endStream already set"; @@ -535,11 +503,11 @@ try { while (item.hasRemaining()) { - DataFrame df = getDataFrame(item, false); + DataFrame df = getDataFrame(item); connection.sendFrame(df); } subscription.request(1); - } catch (InterruptedException ex) { + } catch (InterruptedException | IOException ex) { subscription.cancel(); requestBodyCF.completeExceptionally(ex); } @@ -567,7 +535,7 @@ // to send one more empty data frame to // send END_STREAM to the server - indicating // that we're finished with this stream. - DataFrame df = getDataFrame(null, true); + DataFrame df = getEmptyEndStreamDataFrame(); connection.sendFrame(df); } requestBodyCF.complete(null); @@ -581,9 +549,9 @@ * Copys buf into a ByteBuffer[]. Only copy up to maxpayloadlen. * This will be the contents of one Data frame. */ - private ByteBuffer[] copyBuffer(ByteBuffer buf) { + private ByteBuffer[] copyBuffer(ByteBuffer buf, int amount) { LinkedList l = new LinkedList<>(); - int n = Math.min(maxpayloadLen, buf.remaining()); + int n = amount;; while (n > 0) { ByteBuffer b = Utils.getBuffer(); @@ -598,33 +566,38 @@ return l.toArray(Utils.EMPTY_BB_ARRAY); } - DataFrame getDataFrame(ByteBuffer buffer, boolean complete) - throws InterruptedException - { - ByteBuffer[] buffers; - if (buffer == null) { - buffers = Utils.EMPTY_BB_ARRAY; - } else { - buffers = copyBuffer(buffer); - if (!buffer.hasRemaining()) - requestProcessor.returnBuffer(buffer); + DataFrame getDataFrame(ByteBuffer buffer) + throws InterruptedException, IOException { + int requestAmount = Math.min(maxpayloadLen, buffer.remaining()); // TODO maxpayloadlen may be changed + int streamAmount = streamSendWindow.acquire(requestAmount); + int actualAmount = connection.connectionSendWindow.acquire(streamAmount); + try { + streamSendWindow.update(streamAmount - actualAmount); + } catch (SoftSemaphore.SoftSemaphoreOverflow softSemaphoreOverflow) { + // overflow here means that we got stream WindowUpdate frame which causes overflow + // between streamSendWindow.acquire and streamSendWindow.update invocations. + connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); + // TODO what to do next? return? } - int amount = Utils.remaining(buffers); - // wait for flow control if necessary. Following method will block - // until after headers frame is sent, so correct streamid is set. - requestFlowController.take(amount); - connection.obtainSendWindow(amount); - + ByteBuffer[] buffers = copyBuffer(buffer, actualAmount); + if (!buffer.hasRemaining()) + requestProcessor.returnBuffer(buffer); DataFrame df = new DataFrame(); df.streamid(streamid); - if (complete) { - df.setFlag(DataFrame.END_STREAM); - } df.setData(buffers); df.computeLength(); return df; } + private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { + DataFrame df = new DataFrame(); + df.streamid(streamid); + df.setFlag(DataFrame.END_STREAM); + df.setData(Utils.EMPTY_BB_ARRAY); + df.computeLength(); + return df; + } + /** * A List of responses relating to this stream. Normally there is only * one response, but intermediate responses like 100 are allowed @@ -740,17 +713,19 @@ Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); } inputQ.close(); + streamSendWindow.close(); completeResponseExceptionally(e); - try { - connection.resetStream(streamid, ResetFrame.CANCEL); - } catch (IOException | InterruptedException ex) { - Log.logError(ex); - } + connection.resetStream(streamid, ResetFrame.CANCEL); } // called from Http2Connection reader thread - synchronized void updateOutgoingWindow(int update) { - requestFlowController.accept(update); + void setInitialOutgoingWindow(int update) { + try { + streamSendWindow.update(update); + } catch (SoftSemaphore.SoftSemaphoreOverflow softSemaphoreOverflow) { + // shouldn't happen + throw new InternalError("Illegal SoftSemaphore overflow"); + } } void close(String msg) { diff -r add6b5c0ab15 src/java.httpclient/share/classes/sun/net/httpclient/common/SoftSemaphore.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.httpclient/share/classes/sun/net/httpclient/common/SoftSemaphore.java Thu Sep 01 13:22:37 2016 -0700 @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + */ +package sun.net.httpclient.common; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Connection or stream blocking flow control window. + */ +public class SoftSemaphore { + + public class SoftSemaphoreOverflow extends Exception { + private static final long serialVersionUID = 1L; + + /* TODO + RFC-7540 says: + A sender MUST NOT allow a flow-control window to exceed 2^31-1 + octets. If a sender receives a WINDOW_UPDATE that causes a flow- + control window to exceed this maximum, it MUST terminate either the + stream or the connection, as appropriate. For streams, the sender + sends a RST_STREAM with an error code of FLOW_CONTROL_ERROR; for the + connection, a GOAWAY frame with an error code of FLOW_CONTROL_ERROR + is sent. + */ + + public SoftSemaphoreOverflow() { + super(); + } + + public SoftSemaphoreOverflow(String message) { + super(message); + } + + } + + private final AtomicInteger permits; + private boolean closed = false; + + public SoftSemaphore() { + this(0); + } + + public SoftSemaphore(int initialPermits) { + this.permits = new AtomicInteger(initialPermits); + } + + public int available() { + return permits.get(); + } + + /** + * increase or decrease amount of available pernmits + * @param releases + * @throws SoftSemaphoreOverflow + */ + public void update(int releases) throws SoftSemaphoreOverflow { + if (releases > 0) { + if (!tryRelease(releases)) { + synchronized (this) { + if (permits.addAndGet(releases) > 0) { + notifyAll(); + } + } + } + } else if (releases < 0) { + permits.getAndAdd(releases); + } + } + + /** + * Acquire permits. + * if requested amoint is greater than available permits - acquire available permits. + * The method block iff amount of available permits less than or equals to zero. + * @param acquires + * @return amount of obtained permits + * @throws InterruptedException + */ + public int acquire(int acquires) throws InterruptedException, IOException { + if (acquires > 0) { + for (;;) { + int acq; + if ((acq = tryAcquire(acquires)) > 0) { + return acq; + } + synchronized (this) { + if ((acq = tryAcquire(acquires)) > 0) { + return acq; + } + wait(); + if(closed) { + throw new IOException("stream closed"); + } + } + } + + } + return acquires; + } + + public void close() { + synchronized (this) { + closed = true; + notifyAll(); + } + } + + private boolean tryRelease(int releases) throws SoftSemaphoreOverflow { + for (;;) { + int current = permits.get(); + if(current <= 0) // should awake waiters + return false; + int next = current + releases; + if (next < current) // overflow + throw new SoftSemaphoreOverflow("Maximum permit count exceeded"); + if (permits.compareAndSet(current, next)) + return true; + } + } + + private int tryAcquire(int acquires) { + for (;;) { + int available = permits.get(); + if (available <= 0) { + return 0; + } + if(available <= acquires) { + if (permits.compareAndSet(available, 0)) + return available; + } else { + int remaining = available - acquires; + if (permits.compareAndSet(available, remaining)) + return acquires; + } + } + } + +}