1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.util.Map;
  29 import java.util.HashMap;
  30 import java.util.concurrent.locks.Condition;
  31 import java.util.concurrent.locks.ReentrantLock;
  32 
  33 /**
  34  * A Simple blocking Send Window Flow-Controller that is used to control
  35  * outgoing Connection and Stream flows, per HTTP/2 connection.
  36  *
  37  * A Http2Connection has its own unique single instance of a WindowController
  38  * that it shares with its Streams. Each stream must acquire the appropriate
  39  * amount of Send Window from the controller before sending data.
  40  *
  41  * WINDOW_UPDATE frames, both connection and stream specific, must notify the
  42  * controller of their increments. SETTINGS frame's INITIAL_WINDOW_SIZE must
  43  * notify the controller so that it can adjust the active stream's window size.
  44  */
  45 final class WindowController {
  46 
  47     /**
  48      * Default initial connection Flow-Control Send Window size, as per HTTP/2.
  49      */
  50     private static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
  51 
  52     /** The connection Send Window size. */
  53     private int connectionWindowSize;
  54     /** A Map of the active streams, where the key is the stream id, and the
  55      *  value is the stream's Send Window size, which may be negative. */
  56     private final Map<Integer,Integer> streams = new HashMap<>();
  57 
  58     private final ReentrantLock controllerLock = new ReentrantLock();
  59 
  60     private final Condition notExhausted = controllerLock.newCondition();
  61 
  62     /** A Controller with the default initial window size. */
  63     WindowController() {
  64         connectionWindowSize = DEFAULT_INITIAL_WINDOW_SIZE;
  65     }
  66 
  67     /** A Controller with the given initial window size. */
  68     WindowController(int initialConnectionWindowSize) {
  69         connectionWindowSize = initialConnectionWindowSize;
  70     }
  71 
  72     /** Registers the given stream with this controller. */
  73     void registerStream(int streamid, int initialStreamWindowSize) {
  74         controllerLock.lock();
  75         try {
  76             Integer old = streams.put(streamid, initialStreamWindowSize);
  77             if (old != null)
  78                 throw new InternalError("Unexpected entry [" + old + "] for streamid: " + streamid);
  79         } finally {
  80             controllerLock.unlock();
  81         }
  82     }
  83 
  84     /** Removes/De-registers the given stream with this controller. */
  85     void removeStream(int streamid) {
  86         controllerLock.lock();
  87         try {
  88             Integer old = streams.remove(streamid);
  89             // Odd stream numbers (client streams) should have been registered.
  90             // Even stream numbers (server streams - aka Push Streams) should
  91             // not be registered
  92             final boolean isClientStream = (streamid % 2) == 1;
  93             if (old == null && isClientStream) {
  94                 throw new InternalError("Expected entry for streamid: " + streamid);
  95             } else if (old != null && !isClientStream) {
  96                 throw new InternalError("Unexpected entry for streamid: " + streamid);
  97             }
  98         } finally {
  99             controllerLock.unlock();
 100         }
 101     }
 102 
 103     /**
 104      * Attempts to acquire the requested amount of Send Window for the given
 105      * stream.
 106      *
 107      * The actual amount of Send Window available may differ from the requested
 108      * amount. The actual amount, returned by this method, is the minimum of,
 109      * 1) the requested amount, 2) the stream's Send Window, and 3) the
 110      * connection's Send Window.
 111      *
 112      * This method ( currently ) blocks until some positive amount of Send
 113      * Window is available.
 114      */
 115     int tryAcquire(int requestAmount, int streamid) throws InterruptedException {
 116         controllerLock.lock();
 117         try {
 118             int x = 0;
 119             Integer streamSize = 0;
 120             while (x <= 0) {
 121                 streamSize = streams.get(streamid);
 122                 if (streamSize == null)
 123                     throw new InternalError("Expected entry for streamid: " + streamid);
 124                 x = Math.min(requestAmount,
 125                              Math.min(streamSize, connectionWindowSize));
 126 
 127                 if (x <= 0)  // stream window size may be negative
 128                     notExhausted.await();
 129             }
 130 
 131             streamSize -= x;
 132             streams.put(streamid, streamSize);
 133             connectionWindowSize -= x;
 134             return x;
 135         } finally {
 136             controllerLock.unlock();
 137         }
 138     }
 139 
 140     /**
 141      * Increases the Send Window size for the connection.
 142      *
 143      * @return false if, and only if, the addition of the given amount would
 144      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 145      */
 146     boolean increaseConnectionWindow(int amount) {
 147         controllerLock.lock();
 148         try {
 149             int size = connectionWindowSize;
 150             size += amount;
 151             if (size < 0)
 152                 return false;
 153             connectionWindowSize = size;
 154             notExhausted.signalAll();
 155         } finally {
 156             controllerLock.unlock();
 157         }
 158         return true;
 159     }
 160 
 161     /**
 162      * Increases the Send Window size for the given stream.
 163      *
 164      * @return false if, and only if, the addition of the given amount would
 165      *         cause the Send Window to exceed 2^31-1 (overflow), otherwise true
 166      */
 167     boolean increaseStreamWindow(int amount, int streamid) {
 168         controllerLock.lock();
 169         try {
 170             Integer size = streams.get(streamid);
 171             if (size == null)
 172                 throw new InternalError("Expected entry for streamid: " + streamid);
 173             size += amount;
 174             if (size < 0)
 175                 return false;
 176             streams.put(streamid, size);
 177             notExhausted.signalAll();
 178         } finally {
 179             controllerLock.unlock();
 180         }
 181         return true;
 182     }
 183 
 184     /**
 185      * Adjusts, either increases or decreases, the active streams registered
 186      * with this controller.  May result in a stream's Send Window size becoming
 187      * negative.
 188      */
 189     void adjustActiveStreams(int adjustAmount) {
 190         assert adjustAmount != 0;
 191 
 192         controllerLock.lock();
 193         try {
 194             for (Map.Entry<Integer,Integer> entry : streams.entrySet()) {
 195                 int streamid = entry.getKey();
 196                 // the API only supports sending on Streams initialed by
 197                 // the client, i.e. odd stream numbers
 198                 if (streamid != 0 && (streamid % 2) != 0) {
 199                     Integer size = entry.getValue();
 200                     size += adjustAmount;
 201                     streams.put(streamid, size);
 202                 }
 203             }
 204         } finally {
 205             controllerLock.unlock();
 206         }
 207     }
 208 
 209     /** Returns the Send Window size for the connection. */
 210     int connectionWindowSize() {
 211         controllerLock.lock();
 212         try {
 213             return connectionWindowSize;
 214         } finally {
 215             controllerLock.unlock();
 216         }
 217     }
 218 
 219     /** Returns the Send Window size for the given stream. */
 220     int streamWindowSize(int streamid) {
 221         controllerLock.lock();
 222         try {
 223             Integer size = streams.get(streamid);
 224             if (size == null)
 225                 throw new InternalError("Expected entry for streamid: " + streamid);
 226             return size;
 227         } finally {
 228             controllerLock.unlock();;
 229         }
 230     }
 231 }