1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.util.Map; 29 import java.util.HashMap; 30 import java.util.concurrent.locks.Condition; 31 import java.util.concurrent.locks.ReentrantLock; 32 33 /** 34 * A Simple blocking Send Window Flow-Controller that is used to control 35 * outgoing Connection and Stream flows, per HTTP/2 connection. 36 * 37 * A Http2Connection has its own unique single instance of a WindowController 38 * that it shares with its Streams. Each stream must acquire the appropriate 39 * amount of Send Window from the controller before sending data. 40 * 41 * WINDOW_UPDATE frames, both connection and stream specific, must notify the 42 * controller of their increments. 
/**
 * A simple blocking Send Window flow-controller that is used to control
 * outgoing connection and stream flows, per HTTP/2 connection.
 *
 * <p>An Http2Connection has its own single instance of a WindowController
 * that it shares with its streams. Each stream must acquire the appropriate
 * amount of Send Window from the controller before sending data.
 *
 * <p>WINDOW_UPDATE frames, both connection and stream specific, must notify
 * the controller of their increments. A SETTINGS frame's INITIAL_WINDOW_SIZE
 * must notify the controller so that it can adjust the active streams'
 * window sizes.
 *
 * <p>Every method takes {@code controllerLock} before reading or writing the
 * window sizes.
 */
final class WindowController {

    /**
     * Default initial connection Flow-Control Send Window size, as per HTTP/2.
     */
    private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1;

    /** The connection Send Window size. */
    private int connectionWindowSize;

    /**
     * A Map of the active streams, where the key is the stream id, and the
     * value is the stream's Send Window size, which may be negative.
     */
    private final Map<Integer,Integer> streams = new HashMap<>();

    /** Guards {@code connectionWindowSize} and {@code streams}. */
    private final ReentrantLock controllerLock = new ReentrantLock();

    /** Signalled whenever some Send Window may have become available. */
    private final Condition notExhausted = controllerLock.newCondition();

    /** A Controller with the default initial window size. */
    WindowController() {
        connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
    }

    /** A Controller with the given initial window size. */
    WindowController(int initialConnectionWindowSize) {
        connectionWindowSize = initialConnectionWindowSize;
    }

    /**
     * Registers the given stream with this controller.
     *
     * @throws InternalError if the stream id was already registered (the new
     *         window size has replaced the old entry by then)
     */
    void registerStream(int streamid, int initialStreamWindowSize) {
        controllerLock.lock();
        try {
            Integer old = streams.put(streamid, initialStreamWindowSize);
            if (old != null) {
                throw new InternalError("Unexpected entry [" + old + "] for streamid: " + streamid);
            }
        } finally {
            controllerLock.unlock();
        }
    }

    /**
     * Removes/De-registers the given stream with this controller.
     *
     * @throws InternalError if an odd (client) stream id was not registered,
     *         or an even (server / push) stream id was registered
     */
    void removeStream(int streamid) {
        controllerLock.lock();
        try {
            Integer old = streams.remove(streamid);
            // Odd stream numbers (client streams) should have been registered.
            // Even stream numbers (server streams - aka Push Streams) should
            // not be registered
            final boolean isClientStream = (streamid % 2) == 1;
            if (old == null && isClientStream) {
                throw new InternalError("Expected entry for streamid: " + streamid);
            } else if (old != null && !isClientStream) {
                throw new InternalError("Unexpected entry for streamid: " + streamid);
            }
        } finally {
            controllerLock.unlock();
        }
    }

    /**
     * Attempts to acquire the requested amount of Send Window for the given
     * stream.
     *
     * The actual amount of Send Window available may differ from the requested
     * amount. The actual amount, returned by this method, is the minimum of,
     * 1) the requested amount, 2) the stream's Send Window, and 3) the
     * connection's Send Window.
     *
     * This method ( currently ) blocks until some positive amount of Send
     * Window is available. Note that a non-positive {@code requestAmount} can
     * never be satisfied, so such a call waits until it is interrupted.
     *
     * @return the positive amount of Send Window granted
     * @throws InterruptedException if interrupted while waiting for window
     * @throws InternalError if the stream is not registered
     */
    int tryAcquire(int requestAmount, int streamid) throws InterruptedException {
        controllerLock.lock();
        try {
            while (true) {
                // Re-read on every pass: both windows may change while waiting.
                Integer streamSize = streams.get(streamid);
                if (streamSize == null) {
                    throw new InternalError("Expected entry for streamid: " + streamid);
                }
                int granted = Math.min(requestAmount,
                                       Math.min(streamSize, connectionWindowSize));
                if (granted > 0) {
                    streams.put(streamid, streamSize - granted);
                    connectionWindowSize -= granted;
                    return granted;
                }
                // stream window size may be negative
                notExhausted.await();
            }
        } finally {
            controllerLock.unlock();
        }
    }

    /**
     * Increases the Send Window size for the connection and wakes any
     * threads waiting in {@code tryAcquire}.
     *
     * @return false if, and only if, the addition of the given amount would
     *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
     */
    boolean increaseConnectionWindow(int amount) {
        controllerLock.lock();
        try {
            int size = connectionWindowSize;
            size += amount;
            // tryAcquire never drives the connection window below zero, so a
            // negative sum here means the int addition wrapped around.
            if (size < 0) {
                return false;
            }
            connectionWindowSize = size;
            notExhausted.signalAll();
        } finally {
            controllerLock.unlock();
        }
        return true;
    }

    /**
     * Increases the Send Window size for the given stream and wakes any
     * threads waiting in {@code tryAcquire}.
     *
     * A stream window may be negative (see {@code adjustActiveStreams}), so
     * an increment that leaves it negative is valid and is applied; only a
     * result outside the {@code int} range is reported as a failure.
     *
     * @return false if, and only if, the addition of the given amount would
     *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
     * @throws InternalError if the stream is not registered
     */
    boolean increaseStreamWindow(int amount, int streamid) {
        controllerLock.lock();
        try {
            Integer size = streams.get(streamid);
            if (size == null) {
                throw new InternalError("Expected entry for streamid: " + streamid);
            }
            // Add in long so real overflow is told apart from a window that
            // is merely still negative after the increment.
            long updated = (long) size + amount;
            if (updated != (int) updated) {
                return false;
            }
            streams.put(streamid, (int) updated);
            notExhausted.signalAll();
        } finally {
            controllerLock.unlock();
        }
        return true;
    }

    /**
     * Adjusts, either increases or decreases, the active streams registered
     * with this controller. May result in a stream's Send Window size becoming
     * negative. After an increase, threads waiting in {@code tryAcquire} are
     * woken so they can re-check their stream's window.
     */
    void adjustActiveStreams(int adjustAmount) {
        assert adjustAmount != 0;

        controllerLock.lock();
        try {
            for (Map.Entry<Integer,Integer> entry : streams.entrySet()) {
                int streamid = entry.getKey();
                // the API only supports sending on Streams initiated by
                // the client, i.e. odd stream numbers
                if (streamid != 0 && (streamid % 2) != 0) {
                    entry.setValue(entry.getValue() + adjustAmount);
                }
            }
            if (adjustAmount > 0) {
                notExhausted.signalAll();
            }
        } finally {
            controllerLock.unlock();
        }
    }

    /** Returns the Send Window size for the connection. */
    int connectionWindowSize() {
        controllerLock.lock();
        try {
            return connectionWindowSize;
        } finally {
            controllerLock.unlock();
        }
    }

    /**
     * Returns the Send Window size for the given stream.
     *
     * @throws InternalError if the stream is not registered
     */
    int streamWindowSize(int streamid) {
        controllerLock.lock();
        try {
            Integer size = streams.get(streamid);
            if (size == null) {
                throw new InternalError("Expected entry for streamid: " + streamid);
            }
            return size;
        } finally {
            controllerLock.unlock();
        }
    }
}
24 */ 25 26 package jdk.incubator.http; 27 28 import java.lang.System.Logger.Level; 29 import java.util.ArrayList; 30 import java.util.Map; 31 import java.util.HashMap; 32 import java.util.Iterator; 33 import java.util.LinkedHashMap; 34 import java.util.List; 35 import java.util.concurrent.locks.ReentrantLock; 36 import jdk.incubator.http.internal.common.Utils; 37 38 /** 39 * A Send Window Flow-Controller that is used to control outgoing Connection 40 * and Stream flows, per HTTP/2 connection. 41 * 42 * A Http2Connection has its own unique single instance of a WindowController 43 * that it shares with its Streams. Each stream must acquire the appropriate 44 * amount of Send Window from the controller before sending data. 45 * 46 * WINDOW_UPDATE frames, both connection and stream specific, must notify the 47 * controller of their increments. SETTINGS frame's INITIAL_WINDOW_SIZE must 48 * notify the controller so that it can adjust the active stream's window size. 49 */ 50 final class WindowController { 51 52 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag 53 static final System.Logger DEBUG_LOGGER = 54 Utils.getDebugLogger("WindowController"::toString, DEBUG); 55 56 /** 57 * Default initial connection Flow-Control Send Window size, as per HTTP/2. 58 */ 59 private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1; 60 61 /** The connection Send Window size. */ 62 private int connectionWindowSize; 63 /** A Map of the active streams, where the key is the stream id, and the 64 * value is the stream's Send Window size, which may be negative. */ 65 private final Map<Integer,Integer> streams = new HashMap<>(); 66 /** A Map of streams awaiting Send Window. The key is the stream id. The 67 * value is a pair of the Stream ( representing the key's stream id ) and 68 * the requested amount of send Window. 
*/ 69 private final Map<Integer, Map.Entry<Stream<?>, Integer>> pending 70 = new LinkedHashMap<>(); 71 72 private final ReentrantLock controllerLock = new ReentrantLock(); 73 74 /** A Controller with the default initial window size. */ 75 WindowController() { 76 connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE; 77 } 78 79 // /** A Controller with the given initial window size. */ 80 // WindowController(int initialConnectionWindowSize) { 81 // connectionWindowSize = initialConnectionWindowSize; 82 // } 83 84 /** Registers the given stream with this controller. */ 85 void registerStream(int streamid, int initialStreamWindowSize) { 86 controllerLock.lock(); 87 try { 88 Integer old = streams.put(streamid, initialStreamWindowSize); 89 if (old != null) 90 throw new InternalError("Unexpected entry [" 91 + old + "] for streamid: " + streamid); 92 } finally { 93 controllerLock.unlock(); 94 } 95 } 96 97 /** Removes/De-registers the given stream with this controller. */ 98 void removeStream(int streamid) { 99 controllerLock.lock(); 100 try { 101 Integer old = streams.remove(streamid); 102 // Odd stream numbers (client streams) should have been registered. 103 // Even stream numbers (server streams - aka Push Streams) should 104 // not be registered 105 final boolean isClientStream = (streamid % 2) == 1; 106 if (old == null && isClientStream) { 107 throw new InternalError("Expected entry for streamid: " + streamid); 108 } else if (old != null && !isClientStream) { 109 throw new InternalError("Unexpected entry for streamid: " + streamid); 110 } 111 } finally { 112 controllerLock.unlock(); 113 } 114 } 115 116 /** 117 * Attempts to acquire the requested amount of Send Window for the given 118 * stream. 119 * 120 * The actual amount of Send Window available may differ from the requested 121 * amount. The actual amount, returned by this method, is the minimum of, 122 * 1) the requested amount, 2) the stream's Send Window, and 3) the 123 * connection's Send Window. 
124 * 125 * A negative or zero value is returned if there's no window available. 126 * When the result is negative or zero, this method arranges for the 127 * given stream's {@link Stream#signalWindowUpdate()} method to be invoke at 128 * a later time when the connection and/or stream window's have been 129 * increased. The {@code tryAcquire} method should then be invoked again to 130 * attempt to acquire the available window. 131 */ 132 int tryAcquire(int requestAmount, int streamid, Stream<?> stream) { 133 controllerLock.lock(); 134 try { 135 Integer streamSize = streams.get(streamid); 136 if (streamSize == null) 137 throw new InternalError("Expected entry for streamid: " 138 + streamid); 139 int x = Math.min(requestAmount, 140 Math.min(streamSize, connectionWindowSize)); 141 142 if (x <= 0) { // stream window size may be negative 143 DEBUG_LOGGER.log(Level.DEBUG, 144 "Stream %d requesting %d but only %d available (stream: %d, connection: %d)", 145 streamid, requestAmount, Math.min(streamSize, connectionWindowSize), 146 streamSize, connectionWindowSize); 147 // If there's not enough window size available, put the 148 // caller in a pending list. 149 pending.put(streamid, Map.entry(stream, requestAmount)); 150 return x; 151 } 152 153 // Remove the caller from the pending list ( if was waiting ). 154 pending.remove(streamid); 155 156 // Update window sizes and return the allocated amount to the caller. 157 streamSize -= x; 158 streams.put(streamid, streamSize); 159 connectionWindowSize -= x; 160 DEBUG_LOGGER.log(Level.DEBUG, 161 "Stream %d amount allocated %d, now %d available (stream: %d, connection: %d)", 162 streamid, x, Math.min(streamSize, connectionWindowSize), 163 streamSize, connectionWindowSize); 164 return x; 165 } finally { 166 controllerLock.unlock(); 167 } 168 } 169 170 /** 171 * Increases the Send Window size for the connection. 
172 * 173 * A number of awaiting requesters, from unfulfilled tryAcquire requests, 174 * may have their stream's {@link Stream#signalWindowUpdate()} method 175 * scheduled to run ( i.e. awake awaiters ). 176 * 177 * @return false if, and only if, the addition of the given amount would 178 * cause the Send Window to exceed 2^31-1 (overflow), otherwise true 179 */ 180 boolean increaseConnectionWindow(int amount) { 181 List<Stream<?>> candidates = null; 182 controllerLock.lock(); 183 try { 184 int size = connectionWindowSize; 185 size += amount; 186 if (size < 0) 187 return false; 188 connectionWindowSize = size; 189 DEBUG_LOGGER.log(Level.DEBUG, "Connection window size is now %d", size); 190 191 // Notify waiting streams, until the new increased window size is 192 // effectively exhausted. 193 Iterator<Map.Entry<Integer,Map.Entry<Stream<?>,Integer>>> iter = 194 pending.entrySet().iterator(); 195 196 while (iter.hasNext() && size > 0) { 197 Map.Entry<Integer,Map.Entry<Stream<?>,Integer>> item = iter.next(); 198 Integer streamSize = streams.get(item.getKey()); 199 if (streamSize == null) { 200 iter.remove(); 201 } else { 202 Map.Entry<Stream<?>,Integer> e = item.getValue(); 203 int requestedAmount = e.getValue(); 204 // only wakes up the pending streams for which there is 205 // at least 1 byte of space in both windows 206 int minAmount = 1; 207 if (size >= minAmount && streamSize >= minAmount) { 208 size -= Math.min(streamSize, requestedAmount); 209 iter.remove(); 210 if (candidates == null) 211 candidates = new ArrayList<>(); 212 candidates.add(e.getKey()); 213 } 214 } 215 } 216 } finally { 217 controllerLock.unlock(); 218 } 219 if (candidates != null) { 220 candidates.forEach(Stream::signalWindowUpdate); 221 } 222 return true; 223 } 224 225 /** 226 * Increases the Send Window size for the given stream. 
227 * 228 * If the given stream is awaiting window size, from an unfulfilled 229 * tryAcquire request, it will have its stream's {@link 230 * Stream#signalWindowUpdate()} method scheduled to run ( i.e. awoken ). 231 * 232 * @return false if, and only if, the addition of the given amount would 233 * cause the Send Window to exceed 2^31-1 (overflow), otherwise true 234 */ 235 boolean increaseStreamWindow(int amount, int streamid) { 236 Stream<?> s = null; 237 controllerLock.lock(); 238 try { 239 Integer size = streams.get(streamid); 240 if (size == null) 241 throw new InternalError("Expected entry for streamid: " + streamid); 242 size += amount; 243 if (size < 0) 244 return false; 245 streams.put(streamid, size); 246 DEBUG_LOGGER.log(Level.DEBUG, 247 "Stream %s window size is now %s", streamid, size); 248 249 Map.Entry<Stream<?>,Integer> p = pending.get(streamid); 250 if (p != null) { 251 int minAmount = 1; 252 // only wakes up the pending stream if there is at least 253 // 1 byte of space in both windows 254 if (size >= minAmount 255 && connectionWindowSize >= minAmount) { 256 pending.remove(streamid); 257 s = p.getKey(); 258 } 259 } 260 } finally { 261 controllerLock.unlock(); 262 } 263 264 if (s != null) 265 s.signalWindowUpdate(); 266 267 return true; 268 } 269 270 /** 271 * Adjusts, either increases or decreases, the active streams registered 272 * with this controller. May result in a stream's Send Window size becoming 273 * negative. 274 */ 275 void adjustActiveStreams(int adjustAmount) { 276 assert adjustAmount != 0; 277 278 controllerLock.lock(); 279 try { 280 for (Map.Entry<Integer,Integer> entry : streams.entrySet()) { 281 int streamid = entry.getKey(); 282 // the API only supports sending on Streams initialed by 283 // the client, i.e. 
odd stream numbers 284 if (streamid != 0 && (streamid % 2) != 0) { 285 Integer size = entry.getValue(); 286 size += adjustAmount; 287 streams.put(streamid, size); 288 DEBUG_LOGGER.log(Level.DEBUG, 289 "Stream %s window size is now %s", streamid, size); 290 } 291 } 292 } finally { 293 controllerLock.unlock(); 294 } 295 } 296 297 /** Returns the Send Window size for the connection. */ 298 int connectionWindowSize() { 299 controllerLock.lock(); 300 try { 301 return connectionWindowSize; 302 } finally { 303 controllerLock.unlock(); 304 } 305 } 306 307 // /** Returns the Send Window size for the given stream. */ 308 // int streamWindowSize(int streamid) { 309 // controllerLock.lock(); 310 // try { 311 // Integer size = streams.get(streamid); 312 // if (size == null) 313 // throw new InternalError("Expected entry for streamid: " + streamid); 314 // return size; 315 // } finally { 316 // controllerLock.unlock(); 317 // } 318 // } 319 320 } |