< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/WindowController.java

Print this page


   1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 


  28 import java.util.Map;
  29 import java.util.HashMap;
  30 import java.util.concurrent.locks.Condition;


  31 import java.util.concurrent.locks.ReentrantLock;

  32 
  33 /**
  34  * A Simple blocking Send Window Flow-Controller that is used to control
  35  * outgoing Connection and Stream flows, per HTTP/2 connection.
  36  *
  37  * A Http2Connection has its own unique single instance of a WindowController
  38  * that it shares with its Streams. Each stream must acquire the appropriate
  39  * amount of Send Window from the controller before sending data.
  40  *
  41  * WINDOW_UPDATE frames, both connection and stream specific, must notify the
  42  * controller of their increments. SETTINGS frame's INITIAL_WINDOW_SIZE must
  43  * notify the controller so that it can adjust the active stream's window size.
  44  */
  45 final class WindowController {
  46 




  47     /**
  48      * Default initial connection Flow-Control Send Window size, as per HTTP/2.
  49      */
  50     private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
  51 
  52     /** The connection Send Window size. */
  53     private int connectionWindowSize;
  54     /** A Map of the active streams, where the key is the stream id, and the
  55      *  value is the stream's Send Window size, which may be negative. */
  56     private final Map<Integer,Integer> streams = new HashMap<>();





  57 
  58     private final ReentrantLock controllerLock = new ReentrantLock();
  59 
  60     private final Condition notExhausted = controllerLock.newCondition();
  61 
  62     /** A Controller with the default initial window size. */
  63     WindowController() {
  64         connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
  65     }
  66 
  67     /** A Controller with the given initial window size. */
  68     WindowController(int initialConnectionWindowSize) {
  69         connectionWindowSize = initialConnectionWindowSize;
  70     }
  71 
  72     /** Registers the given stream with this controller. */
  73     void registerStream(int streamid, int initialStreamWindowSize) {
  74         controllerLock.lock();
  75         try {
  76             Integer old = streams.put(streamid, initialStreamWindowSize);
  77             if (old != null)
  78                 throw new InternalError("Unexpected entry [" + old + "] for streamid: " + streamid);

  79         } finally {
  80             controllerLock.unlock();
  81         }
  82     }
  83 
  84     /** Removes/De-registers the given stream with this controller. */
  85     void removeStream(int streamid) {
  86         controllerLock.lock();
  87         try {
  88             Integer old = streams.remove(streamid);
  89             // Odd stream numbers (client streams) should have been registered.
  90             // Even stream numbers (server streams - aka Push Streams) should
  91             // not be registered
  92             final boolean isClientStream = (streamid % 2) == 1;
  93             if (old == null && isClientStream) {
  94                 throw new InternalError("Expected entry for streamid: " + streamid);
  95             } else if (old != null && !isClientStream) {
  96                 throw new InternalError("Unexpected entry for streamid: " + streamid);
  97             }
  98         } finally {
  99             controllerLock.unlock();
 100         }
 101     }
 102 
 103     /**
 104      * Attempts to acquire the requested amount of Send Window for the given
 105      * stream.
 106      *
 107      * The actual amount of Send Window available may differ from the requested
 108      * amount. The actual amount, returned by this method, is the minimum of,
 109      * 1) the requested amount, 2) the stream's Send Window, and 3) the
 110      * connection's Send Window.
 111      *
 112      * This method ( currently ) blocks until some positive amount of Send
 113      * Window is available.




 114      */
 115     int tryAcquire(int requestAmount, int streamid) throws InterruptedException {
 116         controllerLock.lock();
 117         try {
 118             int x = 0;
 119             Integer streamSize = 0;
 120             while (x <= 0) {
 121                 streamSize = streams.get(streamid);
 122                 if (streamSize == null)
 123                     throw new InternalError("Expected entry for streamid: " + streamid);
 124                 x = Math.min(requestAmount,

 125                              Math.min(streamSize, connectionWindowSize));
 126 
 127                 if (x <= 0)  // stream window size may be negative
 128                     notExhausted.await();







 129             }
 130 




 131             streamSize -= x;
 132             streams.put(streamid, streamSize);
 133             connectionWindowSize -= x;




 134             return x;
 135         } finally {
 136             controllerLock.unlock();
 137         }
 138     }
 139 
 140     /**
 141      * Increases the Send Window size for the connection.
 142      *




 143      * @return false if, and only if, the addition of the given amount would
 144      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 145      */
 146     boolean increaseConnectionWindow(int amount) {

 147         controllerLock.lock();
 148         try {
 149             int size = connectionWindowSize;
 150             size += amount;
 151             if (size < 0)
 152                 return false;
 153             connectionWindowSize = size;
 154             notExhausted.signalAll();


























 155         } finally {
 156             controllerLock.unlock();
 157         }



 158         return true;
 159     }
 160 
 161     /**
 162      * Increases the Send Window size for the given stream.
 163      *




 164      * @return false if, and only if, the addition of the given amount would
 165      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 166      */
 167     boolean increaseStreamWindow(int amount, int streamid) {

 168         controllerLock.lock();
 169         try {
 170             Integer size = streams.get(streamid);
 171             if (size == null)
 172                 throw new InternalError("Expected entry for streamid: " + streamid);
 173             size += amount;
 174             if (size < 0)
 175                 return false;
 176             streams.put(streamid, size);
 177             notExhausted.signalAll();













 178         } finally {
 179             controllerLock.unlock();
 180         }




 181         return true;
 182     }
 183 
 184     /**
 185      * Adjusts, either increases or decreases, the active streams registered
 186      * with this controller.  May result in a stream's Send Window size becoming
 187      * negative.
 188      */
 189     void adjustActiveStreams(int adjustAmount) {
 190         assert adjustAmount != 0;
 191 
 192         controllerLock.lock();
 193         try {
 194             for (Map.Entry<Integer,Integer> entry : streams.entrySet()) {
 195                 int streamid = entry.getKey();
 196                 // the API only supports sending on Streams initialed by
 197                 // the client, i.e. odd stream numbers
 198                 if (streamid != 0 && (streamid % 2) != 0) {
 199                     Integer size = entry.getValue();
 200                     size += adjustAmount;
 201                     streams.put(streamid, size);


 202                 }
 203             }
 204         } finally {
 205             controllerLock.unlock();
 206         }
 207     }
 208 
 209     /** Returns the Send Window size for the connection. */
 210     int connectionWindowSize() {
 211         controllerLock.lock();
 212         try {
 213             return connectionWindowSize;
 214         } finally {
 215             controllerLock.unlock();
 216         }
 217     }
 218 
 219     /** Returns the Send Window size for the given stream. */
 220     int streamWindowSize(int streamid) {
 221         controllerLock.lock();
 222         try {
 223             Integer size = streams.get(streamid);
 224             if (size == null)
 225                 throw new InternalError("Expected entry for streamid: " + streamid);
 226             return size;
 227         } finally {
 228             controllerLock.unlock();;
 229         }
 230     }

 231 }
   1 /*
   2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.lang.System.Logger.Level;
  29 import java.util.ArrayList;
  30 import java.util.Map;
  31 import java.util.HashMap;
  32 import java.util.Iterator;
  33 import java.util.LinkedHashMap;
  34 import java.util.List;
  35 import java.util.concurrent.locks.ReentrantLock;
  36 import jdk.incubator.http.internal.common.Utils;
  37 
  38 /**
  39  * A Send Window Flow-Controller that is used to control outgoing Connection
  40  * and Stream flows, per HTTP/2 connection.
  41  *
  42  * A Http2Connection has its own unique single instance of a WindowController
  43  * that it shares with its Streams. Each stream must acquire the appropriate
  44  * amount of Send Window from the controller before sending data.
  45  *
  46  * WINDOW_UPDATE frames, both connection and stream specific, must notify the
  47  * controller of their increments. SETTINGS frame's INITIAL_WINDOW_SIZE must
  48  * notify the controller so that it can adjust the active stream's window size.
  49  */
  50 final class WindowController {
  51 
  52     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
  53     static final System.Logger DEBUG_LOGGER =
  54             Utils.getDebugLogger("WindowController"::toString, DEBUG);
  55 
  56     /**
  57      * Default initial connection Flow-Control Send Window size, as per HTTP/2.
  58      */
  59     private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
  60 
  61     /** The connection Send Window size. */
  62     private int connectionWindowSize;
  63     /** A Map of the active streams, where the key is the stream id, and the
  64      *  value is the stream's Send Window size, which may be negative. */
  65     private final Map<Integer,Integer> streams = new HashMap<>();
  66     /** A Map of streams awaiting Send Window. The key is the stream id. The
  67      * value is a pair of the Stream ( representing the key's stream id ) and
  68      * the requested amount of send Window. */
  69     private final Map<Integer, Map.Entry<Stream<?>, Integer>> pending
  70             = new LinkedHashMap<>();
  71 
  72     private final ReentrantLock controllerLock = new ReentrantLock();
  73 


  74     /** A Controller with the default initial window size. */
  75     WindowController() {
  76         connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
  77     }
  78 
  79 //    /** A Controller with the given initial window size. */
  80 //    WindowController(int initialConnectionWindowSize) {
  81 //        connectionWindowSize = initialConnectionWindowSize;
  82 //    }
  83 
  84     /** Registers the given stream with this controller. */
  85     void registerStream(int streamid, int initialStreamWindowSize) {
  86         controllerLock.lock();
  87         try {
  88             Integer old = streams.put(streamid, initialStreamWindowSize);
  89             if (old != null)
  90                 throw new InternalError("Unexpected entry ["
  91                         + old + "] for streamid: " + streamid);
  92         } finally {
  93             controllerLock.unlock();
  94         }
  95     }
  96 
  97     /** Removes/De-registers the given stream with this controller. */
  98     void removeStream(int streamid) {
  99         controllerLock.lock();
 100         try {
 101             Integer old = streams.remove(streamid);
 102             // Odd stream numbers (client streams) should have been registered.
 103             // Even stream numbers (server streams - aka Push Streams) should
 104             // not be registered
 105             final boolean isClientStream = (streamid % 2) == 1;
 106             if (old == null && isClientStream) {
 107                 throw new InternalError("Expected entry for streamid: " + streamid);
 108             } else if (old != null && !isClientStream) {
 109                 throw new InternalError("Unexpected entry for streamid: " + streamid);
 110             }
 111         } finally {
 112             controllerLock.unlock();
 113         }
 114     }
 115 
 116     /**
 117      * Attempts to acquire the requested amount of Send Window for the given
 118      * stream.
 119      *
 120      * The actual amount of Send Window available may differ from the requested
 121      * amount. The actual amount, returned by this method, is the minimum of,
 122      * 1) the requested amount, 2) the stream's Send Window, and 3) the
 123      * connection's Send Window.
 124      *
 125      * A negative or zero value is returned if there's no window available.
 126      * When the result is negative or zero, this method arranges for the
 127      * given stream's {@link Stream#signalWindowUpdate()} method to be invoke at
 128      * a later time when the connection and/or stream window's have been
 129      * increased. The {@code tryAcquire} method should then be invoked again to
 130      * attempt to acquire the available window.
 131      */
 132     int tryAcquire(int requestAmount, int streamid, Stream<?> stream) {
 133         controllerLock.lock();
 134         try {
 135             Integer streamSize = streams.get(streamid);



 136             if (streamSize == null)
 137                 throw new InternalError("Expected entry for streamid: "
 138                                         + streamid);
 139             int x = Math.min(requestAmount,
 140                              Math.min(streamSize, connectionWindowSize));
 141 
 142             if (x <= 0)  { // stream window size may be negative
 143                 DEBUG_LOGGER.log(Level.DEBUG,
 144                         "Stream %d requesting %d but only %d available (stream: %d, connection: %d)",
 145                       streamid, requestAmount, Math.min(streamSize, connectionWindowSize),
 146                       streamSize, connectionWindowSize);
 147                 // If there's not enough window size available, put the
 148                 // caller in a pending list.
 149                 pending.put(streamid, Map.entry(stream, requestAmount));
 150                 return x;
 151             }
 152 
 153             // Remove the caller from the pending list ( if was waiting ).
 154             pending.remove(streamid);
 155 
 156             // Update window sizes and return the allocated amount to the caller.
 157             streamSize -= x;
 158             streams.put(streamid, streamSize);
 159             connectionWindowSize -= x;
 160             DEBUG_LOGGER.log(Level.DEBUG,
 161                   "Stream %d amount allocated %d, now %d available (stream: %d, connection: %d)",
 162                   streamid, x, Math.min(streamSize, connectionWindowSize),
 163                   streamSize, connectionWindowSize);
 164             return x;
 165         } finally {
 166             controllerLock.unlock();
 167         }
 168     }
 169 
 170     /**
 171      * Increases the Send Window size for the connection.
 172      *
 173      * A number of awaiting requesters, from unfulfilled tryAcquire requests,
 174      * may have their stream's {@link Stream#signalWindowUpdate()} method
 175      * scheduled to run ( i.e. awake awaiters ).
 176      *
 177      * @return false if, and only if, the addition of the given amount would
 178      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 179      */
 180     boolean increaseConnectionWindow(int amount) {
 181         List<Stream<?>> candidates = null;
 182         controllerLock.lock();
 183         try {
 184             int size = connectionWindowSize;
 185             size += amount;
 186             if (size < 0)
 187                 return false;
 188             connectionWindowSize = size;
 189             DEBUG_LOGGER.log(Level.DEBUG, "Connection window size is now %d", size);
 190 
 191             // Notify waiting streams, until the new increased window size is
 192             // effectively exhausted.
 193             Iterator<Map.Entry<Integer,Map.Entry<Stream<?>,Integer>>> iter =
 194                     pending.entrySet().iterator();
 195 
 196             while (iter.hasNext() && size > 0) {
 197                 Map.Entry<Integer,Map.Entry<Stream<?>,Integer>> item = iter.next();
 198                 Integer streamSize = streams.get(item.getKey());
 199                 if (streamSize == null) {
 200                     iter.remove();
 201                 } else {
 202                     Map.Entry<Stream<?>,Integer> e = item.getValue();
 203                     int requestedAmount = e.getValue();
 204                     // only wakes up the pending streams for which there is
 205                     // at least 1 byte of space in both windows
 206                     int minAmount = 1;
 207                     if (size >= minAmount && streamSize >= minAmount) {
 208                         size -= Math.min(streamSize, requestedAmount);
 209                         iter.remove();
 210                         if (candidates == null)
 211                             candidates = new ArrayList<>();
 212                         candidates.add(e.getKey());
 213                     }
 214                 }
 215             }
 216         } finally {
 217             controllerLock.unlock();
 218         }
 219         if (candidates != null) {
 220             candidates.forEach(Stream::signalWindowUpdate);
 221         }
 222         return true;
 223     }
 224 
 225     /**
 226      * Increases the Send Window size for the given stream.
 227      *
 228      * If the given stream is awaiting window size, from an unfulfilled
 229      * tryAcquire request, it will have its stream's {@link
 230      * Stream#signalWindowUpdate()} method scheduled to run ( i.e. awoken ).
 231      *
 232      * @return false if, and only if, the addition of the given amount would
 233      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 234      */
 235     boolean increaseStreamWindow(int amount, int streamid) {
 236         Stream<?> s = null;
 237         controllerLock.lock();
 238         try {
 239             Integer size = streams.get(streamid);
 240             if (size == null)
 241                 throw new InternalError("Expected entry for streamid: " + streamid);
 242             size += amount;
 243             if (size < 0)
 244                 return false;
 245             streams.put(streamid, size);
 246             DEBUG_LOGGER.log(Level.DEBUG,
 247                              "Stream %s window size is now %s", streamid, size);
 248 
 249             Map.Entry<Stream<?>,Integer> p = pending.get(streamid);
 250             if (p != null) {
 251                 int minAmount = 1;
 252                 // only wakes up the pending stream if there is at least
 253                 // 1 byte of space in both windows
 254                 if (size >= minAmount
 255                         && connectionWindowSize >= minAmount) {
 256                      pending.remove(streamid);
 257                      s = p.getKey();
 258                 }
 259             }
 260         } finally {
 261             controllerLock.unlock();
 262         }
 263 
 264         if (s != null)
 265             s.signalWindowUpdate();
 266 
 267         return true;
 268     }
 269 
 270     /**
 271      * Adjusts, either increases or decreases, the active streams registered
 272      * with this controller.  May result in a stream's Send Window size becoming
 273      * negative.
 274      */
 275     void adjustActiveStreams(int adjustAmount) {
 276         assert adjustAmount != 0;
 277 
 278         controllerLock.lock();
 279         try {
 280             for (Map.Entry<Integer,Integer> entry : streams.entrySet()) {
 281                 int streamid = entry.getKey();
 282                 // the API only supports sending on Streams initialed by
 283                 // the client, i.e. odd stream numbers
 284                 if (streamid != 0 && (streamid % 2) != 0) {
 285                     Integer size = entry.getValue();
 286                     size += adjustAmount;
 287                     streams.put(streamid, size);
 288                     DEBUG_LOGGER.log(Level.DEBUG,
 289                         "Stream %s window size is now %s", streamid, size);
 290                 }
 291             }
 292         } finally {
 293             controllerLock.unlock();
 294         }
 295     }
 296 
 297     /** Returns the Send Window size for the connection. */
 298     int connectionWindowSize() {
 299         controllerLock.lock();
 300         try {
 301             return connectionWindowSize;
 302         } finally {
 303             controllerLock.unlock();
 304         }
 305     }
 306 
 307 //    /** Returns the Send Window size for the given stream. */
 308 //    int streamWindowSize(int streamid) {
 309 //        controllerLock.lock();
 310 //        try {
 311 //            Integer size = streams.get(streamid);
 312 //            if (size == null)
 313 //                throw new InternalError("Expected entry for streamid: " + streamid);
 314 //            return size;
 315 //        } finally {
 316 //            controllerLock.unlock();
 317 //        }
 318 //    }
 319 
 320 }
< prev index next >