src/share/classes/sun/rmi/transport/tcp/ConnectionMultiplexer.java

Print this page




  68     /** object to notify for new connections from remote endpoint */
  69     private TCPChannel channel;
  70 
  71     /** input stream for underlying single connection */
  72     private InputStream in;
  73 
  74     /** output stream for underlying single connection */
  75     private OutputStream out;
  76 
  77     /** true if underlying connection originated from this endpoint
  78         (used for generating unique connection IDs);
             when set, openConnection() ORs 0x8000 into each ID it generates */
  79     private boolean orig;
  80 
  81     /** layered stream for reading formatted data from underlying connection */
  82     private DataInputStream dataIn;
  83 
  84     /** layered stream for writing formatted data to underlying connection */
  85     private DataOutputStream dataOut;
  86 
  87     /** table holding currently open connection IDs and related info;
             keys are Integer connection IDs, values are
             MultiplexConnectionInfo objects.  The methods visible here also
             use this object as the monitor that guards numConnections and
             alive. */
  88     private Hashtable connectionTable = new Hashtable(7);
  89 
  90     /** number of currently open connections
             (updated while holding the connectionTable monitor) */
  91     private int numConnections = 0;
  92 
  93     /** maximum allowed open connections; checked in openConnection().
             NOTE(review): the OPEN case in run() does not check it. */
  94     private final static int maxConnections = 256;
  95 
  96     /** ID of last connection opened; openConnection() pre-increments it
             and masks it with 0x7FFF before each use */
  97     private int lastID = 0x1001;
  98 
  99     /** true if this mechanism is still alive; cleared by shutDown() */
 100     private boolean alive = true;
 101 
 102     /**
 103      * Create a new ConnectionMultiplexer using the given underlying
 104      * input/output stream pair.  The run method must be called
 105      * (possibly on a new thread) to handle the demultiplexing.
 106      * @param channel object to notify when new connection is received
 107      * @param in input stream of underlying connection
 108      * @param out output stream of underlying connection


 114         InputStream   in,
 115         OutputStream  out,
 116         boolean       orig)
 117     {
 118         this.channel = channel;
 119         this.in      = in;
 120         this.out     = out;
 121         this.orig    = orig;
 122 
 123         dataIn = new DataInputStream(in);
 124         dataOut = new DataOutputStream(out);
 125     }
 126 
 127     /**
 128      * Process multiplexing protocol received from underlying connection.
          *
          * Repeatedly reads a one-byte op code from dataIn and dispatches on
          * it.  Every recognized op is followed by a 16-bit connection ID;
          * REQUEST and TRANSMIT additionally carry a 32-bit length.  The loop
          * has no normal exit: this method only leaves by throwing, and
          * shutDown() is always called on the way out.
          *
          * @throws IOException if reading from the underlying connection
          *         fails, if the op code is unknown, or if an op refers to a
          *         connection ID that is invalid for that op
 129      */
 130     public void run() throws IOException
 131     {
 132         try {
 133             int op, id, length;
 134             Integer idObj;
 135             MultiplexConnectionInfo info;
 136 
 137             while (true) {
 138 
 139                 // read next op code from remote endpoint
 140                 op = dataIn.readUnsignedByte();
 141                 switch (op) {
 142 
 143                 // remote endpoint initiating new connection
 144                 case OPEN:
 145                     id = dataIn.readUnsignedShort();
 146 
 147                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 148                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 149                     }
 150 
 151                     idObj = new Integer(id);
 152                     info =
 153                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 154                     if (info != null)
 155                         throw new IOException(
 156                             "OPEN: Connection ID already exists");
                         // NOTE(review): no maxConnections check is made here,
                         // unlike in openConnection()
 157                     info = new MultiplexConnectionInfo(id);
 158                     info.in = new MultiplexInputStream(this, info, 2048);
 159                     info.out = new MultiplexOutputStream(this, info, 2048);
 160                     synchronized (connectionTable) {
 161                         connectionTable.put(idObj, info);
 162                         ++ numConnections;
 163                     }
                         // wrap the new streams and hand them to the channel
 164                     sun.rmi.transport.Connection conn;
 165                     conn = new TCPConnection(channel, info.in, info.out);
 166                     channel.acceptMultiplexConnection(conn);
 167                     break;
 168 
 169                 // remote endpoint closing connection
 170                 case CLOSE:
 171                     id = dataIn.readUnsignedShort();
 172 
 173                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 174                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 175                     }
 176 
 177                     idObj = new Integer(id);
 178                     info =
 179                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 180                     if (info == null)
 181                         throw new IOException(
 182                             "CLOSE: Invalid connection ID");
 183                     info.in.disconnect();
 184                     info.out.disconnect();
                         // acknowledge unless info.closed is already set
                         // (presumably meaning this side closed first --
                         // confirm in MultiplexConnectionInfo)
 185                     if (!info.closed)
 186                         sendCloseAck(info);
 187                     synchronized (connectionTable) {
 188                         connectionTable.remove(idObj);
 189                         -- numConnections;
 190                     }
 191                     break;
 192 
 193                 // remote endpoint acknowledging close of connection
 194                 case CLOSEACK:
 195                     id = dataIn.readUnsignedShort();
 196 
 197                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 198                         multiplexLog.log(Log.VERBOSE,
 199                             "operation  CLOSEACK " + id);
 200                     }
 201 
 202                     idObj = new Integer(id);
 203                     info =
 204                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 205                     if (info == null)
 206                         throw new IOException(
 207                             "CLOSEACK: Invalid connection ID");
                         // an ack is rejected unless the connection is
                         // already marked closed
 208                     if (!info.closed)
 209                         throw new IOException(
 210                             "CLOSEACK: Connection not closed");
 211                     info.in.disconnect();
 212                     info.out.disconnect();
 213                     synchronized (connectionTable) {
 214                         connectionTable.remove(idObj);
 215                         -- numConnections;
 216                     }
 217                     break;
 218 
 219                 // remote endpoint declaring additional bytes receivable
 220                 case REQUEST:
 221                     id = dataIn.readUnsignedShort();
 222                     idObj = new Integer(id);
 223                     info =
 224                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 225                     if (info == null)
 226                         throw new IOException(
 227                             "REQUEST: Invalid connection ID");
                         // the ID is validated before the 32-bit length is read
 228                     length = dataIn.readInt();
 229 
 230                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 231                         multiplexLog.log(Log.VERBOSE,
 232                             "operation  REQUEST " + id + ": " + length);
 233                     }
 234 
 235                     info.out.request(length);
 236                     break;
 237 
 238                 // remote endpoint transmitting data packet
 239                 case TRANSMIT:
 240                     id = dataIn.readUnsignedShort();
 241                     idObj = new Integer(id);
 242                     info =
 243                         (MultiplexConnectionInfo) connectionTable.get(idObj);
                         // NOTE(review): the message below says "SEND"
                         // although the op handled here is TRANSMIT
 244                     if (info == null)
 245                         throw new IOException("SEND: Invalid connection ID");
 246                     length = dataIn.readInt();
 247 
 248                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 249                         multiplexLog.log(Log.VERBOSE,
 250                             "operation  TRANSMIT " + id + ": " + length);
 251                     }
 252 
                         // the input stream reads the payload from dataIn itself
 253                     info.in.receive(length, dataIn);
 254                     break;
 255 
 256                 default:
 257                     throw new IOException("Invalid operation: " +
 258                                           Integer.toHexString(op));
 259                 }
 260             }
 261         } finally {
                 // reached only through an exception, because the loop above
                 // never exits normally
 262             shutDown();
 263         }
 264     }
 265 
 266     /**
 267      * Initiate a new multiplexed connection through the underlying
 268      * connection.
          *
          * Picks an ID that is not present in connectionTable, registers the
          * new connection's streams under that ID, and writes an OPEN op
          * with the ID to the remote endpoint.  If that write fails, the
          * whole multiplexer is shut down before the exception is rethrown.
          *
          * @return connection wrapping the newly created multiplexed streams
          * @throws IOException if the multiplexer is no longer alive, if
          *         maxConnections connections are already open, or if
          *         writing the OPEN op fails
 269      */
 270     public synchronized TCPConnection openConnection() throws IOException
 271     {
 272         // generate ID that should not be already used
 273         // If all possible 32768 IDs are used,
 274         // this method will block searching for a new ID forever.
             // NOTE(review): the search below has no iteration bound and runs
             // while this object's monitor is held.
 275         int id;
 276         Integer idObj;
 277         do {
                 // advance within the 15-bit range 0..0x7FFF, wrapping around
 278             lastID = (++ lastID) & 0x7FFF;
 279             id = lastID;
 280 
 281             // The orig flag (copied to the high bit of the ID) is used
 282             // to have two distinct ranges to choose IDs from for the
 283             // two endpoints.
 284             if (orig)
 285                 id |= 0x8000;
 286             idObj = new Integer(id);
 287         } while (connectionTable.get(idObj) != null);
 288 
 289         // create multiplexing streams and bookkeeping information
             // (these are allocated before alive and the connection limit
             // are checked below)
 290         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 291         info.in = new MultiplexInputStream(this, info, 2048);
 292         info.out = new MultiplexOutputStream(this, info, 2048);
 293 
 294         // add to connection table if multiplexer has not died
             // NOTE(review): the "ID unused" test above and this put are not
             // inside the same synchronized (connectionTable) block.
 295         synchronized (connectionTable) {
 296             if (!alive)
 297                 throw new IOException("Multiplexer connection dead");
 298             if (numConnections >= maxConnections)
 299                 throw new IOException("Cannot exceed " + maxConnections +
 300                     " simultaneous multiplexed connections");
 301             connectionTable.put(idObj, info);
 302             ++ numConnections;
 303         }
 304 
 305         // inform remote endpoint of new connection
             // (dataOut is locked for the op + ID write; other writers are not
             // visible here -- presumably they lock dataOut as well)
 306         synchronized (dataOut) {
 307             try {
 308                 dataOut.writeByte(OPEN);
 309                 dataOut.writeShort(id);
 310                 dataOut.flush();
 311             } catch (IOException e) {
 312                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 313 
                     // a failed write takes down the whole multiplexer, not
                     // just this connection
 314                 shutDown();
 315                 throw e;
 316             }
 317         }
 318 
 319         return new TCPConnection(channel, info.in, info.out);
 320     }
 321 
 322     /**
 323      * Shut down all connections and clean up.
          *
          * Only the first call does any work: later calls see that alive is
          * already false and return immediately.  The first call disconnects
          * the streams of every connection in connectionTable, empties the
          * table, and then closes the underlying input and output streams.
 324      */
 325     public void shutDown()
 326     {
 327         // inform all associated streams
 328         synchronized (connectionTable) {
 329             // return if multiplexer already officially dead
 330             if (!alive)
 331                 return;
 332             alive = false;
 333 
 334             Enumeration enum_ = connectionTable.elements();

 335             while (enum_.hasMoreElements()) {
 336                 MultiplexConnectionInfo info =
 337                     (MultiplexConnectionInfo) enum_.nextElement();
 338                 info.in.disconnect();
 339                 info.out.disconnect();
 340             }
 341             connectionTable.clear();
 342             numConnections = 0;
 343         }
 344 
 345         // close underlying connection, if possible (and not already done)
             // (done after the connectionTable monitor has been released)
 346         try {
 347             in.close();
 348         } catch (IOException e) {
                 // ignored: the close is best-effort
 349         }
 350         try {
 351             out.close();
 352         } catch (IOException e) {
                 // ignored: the close is best-effort
 353         }
 354     }
 355 
 356     /**
 357      * Send request for more data on connection to remote endpoint.




  68     /** object to notify for new connections from remote endpoint */
  69     private TCPChannel channel;
  70 
  71     /** input stream for underlying single connection */
  72     private InputStream in;
  73 
  74     /** output stream for underlying single connection */
  75     private OutputStream out;
  76 
  77     /** true if underlying connection originated from this endpoint
  78         (used for generating unique connection IDs);
             when set, openConnection() ORs 0x8000 into each ID it generates */
  79     private boolean orig;
  80 
  81     /** layered stream for reading formatted data from underlying connection */
  82     private DataInputStream dataIn;
  83 
  84     /** layered stream for writing formatted data to underlying connection */
  85     private DataOutputStream dataOut;
  86 
  87     /** table holding currently open connection IDs and related info;
             keyed by connection ID.  The methods visible here also use this
             object as the monitor that guards numConnections and alive. */
  88     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
  89 
  90     /** number of currently open connections
             (updated while holding the connectionTable monitor) */
  91     private int numConnections = 0;
  92 
  93     /** maximum allowed open connections; checked in openConnection().
             NOTE(review): the OPEN case in run() does not check it. */
  94     private final static int maxConnections = 256;
  95 
  96     /** ID of last connection opened; openConnection() pre-increments it
             and masks it with 0x7FFF before each use */
  97     private int lastID = 0x1001;
  98 
  99     /** true if this mechanism is still alive; cleared by shutDown() */
 100     private boolean alive = true;
 101 
 102     /**
 103      * Create a new ConnectionMultiplexer using the given underlying
 104      * input/output stream pair.  The run method must be called
 105      * (possibly on a new thread) to handle the demultiplexing.
 106      * @param channel object to notify when new connection is received
 107      * @param in input stream of underlying connection
 108      * @param out output stream of underlying connection


 114         InputStream   in,
 115         OutputStream  out,
 116         boolean       orig)
 117     {
 118         this.channel = channel;
 119         this.in      = in;
 120         this.out     = out;
 121         this.orig    = orig;
 122 
 123         dataIn = new DataInputStream(in);
 124         dataOut = new DataOutputStream(out);
 125     }
 126 
 127     /**
 128      * Process multiplexing protocol received from underlying connection.
          *
          * Repeatedly reads a one-byte op code from dataIn and dispatches on
          * it.  Every recognized op is followed by a 16-bit connection ID;
          * REQUEST and TRANSMIT additionally carry a 32-bit length.  The loop
          * has no normal exit: this method only leaves by throwing, and
          * shutDown() is always called on the way out.
          *
          * @throws IOException if reading from the underlying connection
          *         fails, if the op code is unknown, or if an op refers to a
          *         connection ID that is invalid for that op
 129      */
 130     public void run() throws IOException
 131     {
 132         try {
 133             int op, id, length;

 134             MultiplexConnectionInfo info;
 135 
 136             while (true) {
 137 
 138                 // read next op code from remote endpoint
 139                 op = dataIn.readUnsignedByte();
 140                 switch (op) {
 141 
 142                 // remote endpoint initiating new connection
 143                 case OPEN:
 144                     id = dataIn.readUnsignedShort();
 145 
 146                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 147                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 148                     }
 149 
 150                     info = connectionTable.get(id);


 151                     if (info != null)
 152                         throw new IOException(
 153                             "OPEN: Connection ID already exists");
                         // NOTE(review): no maxConnections check is made here,
                         // unlike in openConnection()
 154                     info = new MultiplexConnectionInfo(id);
 155                     info.in = new MultiplexInputStream(this, info, 2048);
 156                     info.out = new MultiplexOutputStream(this, info, 2048);
 157                     synchronized (connectionTable) {
 158                         connectionTable.put(id, info);
 159                         ++ numConnections;
 160                     }
                         // wrap the new streams and hand them to the channel
 161                     sun.rmi.transport.Connection conn;
 162                     conn = new TCPConnection(channel, info.in, info.out);
 163                     channel.acceptMultiplexConnection(conn);
 164                     break;
 165 
 166                 // remote endpoint closing connection
 167                 case CLOSE:
 168                     id = dataIn.readUnsignedShort();
 169 
 170                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 171                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 172                     }
 173 
 174                     info = connectionTable.get(id);


 175                     if (info == null)
 176                         throw new IOException(
 177                             "CLOSE: Invalid connection ID");
 178                     info.in.disconnect();
 179                     info.out.disconnect();
                         // acknowledge unless info.closed is already set
                         // (presumably meaning this side closed first --
                         // confirm in MultiplexConnectionInfo)
 180                     if (!info.closed)
 181                         sendCloseAck(info);
 182                     synchronized (connectionTable) {
 183                         connectionTable.remove(id);
 184                         -- numConnections;
 185                     }
 186                     break;
 187 
 188                 // remote endpoint acknowledging close of connection
 189                 case CLOSEACK:
 190                     id = dataIn.readUnsignedShort();
 191 
 192                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 193                         multiplexLog.log(Log.VERBOSE,
 194                             "operation  CLOSEACK " + id);
 195                     }
 196 
 197                     info = connectionTable.get(id);


 198                     if (info == null)
 199                         throw new IOException(
 200                             "CLOSEACK: Invalid connection ID");
                         // an ack is rejected unless the connection is
                         // already marked closed
 201                     if (!info.closed)
 202                         throw new IOException(
 203                             "CLOSEACK: Connection not closed");
 204                     info.in.disconnect();
 205                     info.out.disconnect();
 206                     synchronized (connectionTable) {
 207                         connectionTable.remove(id);
 208                         -- numConnections;
 209                     }
 210                     break;
 211 
 212                 // remote endpoint declaring additional bytes receivable
 213                 case REQUEST:
 214                     id = dataIn.readUnsignedShort();
 215                     info = connectionTable.get(id);


 216                     if (info == null)
 217                         throw new IOException(
 218                             "REQUEST: Invalid connection ID");
                         // the ID is validated before the 32-bit length is read
 219                     length = dataIn.readInt();
 220 
 221                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 222                         multiplexLog.log(Log.VERBOSE,
 223                             "operation  REQUEST " + id + ": " + length);
 224                     }
 225 
 226                     info.out.request(length);
 227                     break;
 228 
 229                 // remote endpoint transmitting data packet
 230                 case TRANSMIT:
 231                     id = dataIn.readUnsignedShort();
 232                     info = connectionTable.get(id);


                         // NOTE(review): the message below says "SEND"
                         // although the op handled here is TRANSMIT
 233                     if (info == null)
 234                         throw new IOException("SEND: Invalid connection ID");
 235                     length = dataIn.readInt();
 236 
 237                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 238                         multiplexLog.log(Log.VERBOSE,
 239                             "operation  TRANSMIT " + id + ": " + length);
 240                     }
 241 
                         // the input stream reads the payload from dataIn itself
 242                     info.in.receive(length, dataIn);
 243                     break;
 244 
 245                 default:
 246                     throw new IOException("Invalid operation: " +
 247                                           Integer.toHexString(op));
 248                 }
 249             }
 250         } finally {
                 // reached only through an exception, because the loop above
                 // never exits normally
 251             shutDown();
 252         }
 253     }
 254 
 255     /**
 256      * Initiate a new multiplexed connection through the underlying
 257      * connection.
          *
          * Picks an ID that is not present in connectionTable, registers the
          * new connection's streams under that ID, and writes an OPEN op
          * with the ID to the remote endpoint.  If that write fails, the
          * whole multiplexer is shut down before the exception is rethrown.
          *
          * @return connection wrapping the newly created multiplexed streams
          * @throws IOException if the multiplexer is no longer alive, if
          *         maxConnections connections are already open, or if
          *         writing the OPEN op fails
 258      */
 259     public synchronized TCPConnection openConnection() throws IOException
 260     {
 261         // generate ID that should not be already used
 262         // If all possible 32768 IDs are used,
 263         // this method will block searching for a new ID forever.
             // NOTE(review): the search below has no iteration bound and runs
             // while this object's monitor is held.
 264         int id;

 265         do {
                 // advance within the 15-bit range 0..0x7FFF, wrapping around
 266             lastID = (++ lastID) & 0x7FFF;
 267             id = lastID;
 268 
 269             // The orig flag (copied to the high bit of the ID) is used
 270             // to have two distinct ranges to choose IDs from for the
 271             // two endpoints.
 272             if (orig)
 273                 id |= 0x8000;
 274         } while (connectionTable.get(id) != null);

 275 
 276         // create multiplexing streams and bookkeeping information
             // (these are allocated before alive and the connection limit
             // are checked below)
 277         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 278         info.in = new MultiplexInputStream(this, info, 2048);
 279         info.out = new MultiplexOutputStream(this, info, 2048);
 280 
 281         // add to connection table if multiplexer has not died
             // NOTE(review): the "ID unused" test above and this put are not
             // inside the same synchronized (connectionTable) block.
 282         synchronized (connectionTable) {
 283             if (!alive)
 284                 throw new IOException("Multiplexer connection dead");
 285             if (numConnections >= maxConnections)
 286                 throw new IOException("Cannot exceed " + maxConnections +
 287                     " simultaneous multiplexed connections");
 288             connectionTable.put(id, info);
 289             ++ numConnections;
 290         }
 291 
 292         // inform remote endpoint of new connection
             // (dataOut is locked for the op + ID write; other writers are not
             // visible here -- presumably they lock dataOut as well)
 293         synchronized (dataOut) {
 294             try {
 295                 dataOut.writeByte(OPEN);
 296                 dataOut.writeShort(id);
 297                 dataOut.flush();
 298             } catch (IOException e) {
 299                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 300 
                     // a failed write takes down the whole multiplexer, not
                     // just this connection
 301                 shutDown();
 302                 throw e;
 303             }
 304         }
 305 
 306         return new TCPConnection(channel, info.in, info.out);
 307     }
 308 
 309     /**
 310      * Shut down all connections and clean up.
          *
          * Only the first call does any work: later calls see that alive is
          * already false and return immediately.  The first call disconnects
          * the streams of every connection in connectionTable, empties the
          * table, and then closes the underlying input and output streams.
 311      */
 312     public void shutDown()
 313     {
 314         // inform all associated streams
 315         synchronized (connectionTable) {
 316             // return if multiplexer already officially dead
 317             if (!alive)
 318                 return;
 319             alive = false;
 320 
 321             Enumeration<MultiplexConnectionInfo> enum_ = 
 322                     connectionTable.elements();
 323             while (enum_.hasMoreElements()) {
 324                 MultiplexConnectionInfo info = enum_.nextElement();

 325                 info.in.disconnect();
 326                 info.out.disconnect();
 327             }
 328             connectionTable.clear();
 329             numConnections = 0;
 330         }
 331 
 332         // close underlying connection, if possible (and not already done)
             // (done after the connectionTable monitor has been released)
 333         try {
 334             in.close();
 335         } catch (IOException e) {
                 // ignored: the close is best-effort
 336         }
 337         try {
 338             out.close();
 339         } catch (IOException e) {
                 // ignored: the close is best-effort
 340         }
 341     }
 342 
 343     /**
 344      * Send request for more data on connection to remote endpoint.