1 /*
   2  * Copyright (c) 1996, 2008, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.*;
  28 import java.util.*;
  29 import java.rmi.server.LogStream;
  30 
  31 import sun.rmi.runtime.Log;
  32 
  33 /**
  34  * ConnectionMultiplexer manages the transparent multiplexing of
  35  * multiple virtual connections from one endpoint to another through
  36  * one given real connection to that endpoint.  The input and output
  37  * streams for the the underlying real connection must be supplied.
  38  * A callback object is also supplied to be informed of new virtual
  39  * connections opened by the remote endpoint.  After creation, the
  40  * run() method must be called in a thread created for demultiplexing
  41  * the connections.  The openConnection() method is called to
  42  * initiate a virtual connection from this endpoint.
  43  *
  44  * @author Peter Jones
  45  */
  46 final class ConnectionMultiplexer {
  47 
  48     /** "multiplex" log level */
  49     static int logLevel = LogStream.parseLevel(getLogLevel());
  50 
  51     private static String getLogLevel() {
  52         return java.security.AccessController.doPrivileged(
  53             new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
  54     }
  55 
  56     /* multiplex system log */
  57     static final Log multiplexLog =
  58         Log.getLog("sun.rmi.transport.tcp.multiplex",
  59                    "multiplex", ConnectionMultiplexer.logLevel);
  60 
  61     /** multiplexing protocol operation codes */
  62     private final static int OPEN     = 0xE1;
  63     private final static int CLOSE    = 0xE2;
  64     private final static int CLOSEACK = 0xE3;
  65     private final static int REQUEST  = 0xE4;
  66     private final static int TRANSMIT = 0xE5;
  67 
  68     /** object to notify for new connections from remote endpoint */
  69     private TCPChannel channel;
  70 
  71     /** input stream for underlying single connection */
  72     private InputStream in;
  73 
  74     /** output stream for underlying single connection */
  75     private OutputStream out;
  76 
  77     /** true if underlying connection originated from this endpoint
  78         (used for generating unique connection IDs) */
  79     private boolean orig;
  80 
  81     /** layered stream for reading formatted data from underlying connection */
  82     private DataInputStream dataIn;
  83 
  84     /** layered stream for writing formatted data to underlying connection */
  85     private DataOutputStream dataOut;
  86 
  87     /** table holding currently open connection IDs and related info */
  88     private Hashtable connectionTable = new Hashtable(7);
  89 
  90     /** number of currently open connections */
  91     private int numConnections = 0;
  92 
  93     /** maximum allowed open connections */
  94     private final static int maxConnections = 256;
  95 
  96     /** ID of last connection opened */
  97     private int lastID = 0x1001;
  98 
  99     /** true if this mechanism is still alive */
 100     private boolean alive = true;
 101 
 102     /**
 103      * Create a new ConnectionMultiplexer using the given underlying
 104      * input/output stream pair.  The run method must be called
 105      * (possibly on a new thread) to handle the demultiplexing.
 106      * @param channel object to notify when new connection is received
 107      * @param in input stream of underlying connection
 108      * @param out output stream of underlying connection
 109      * @param orig true if this endpoint intiated the underlying
 110      *        connection (needs to be set differently at both ends)
 111      */
 112     public ConnectionMultiplexer(
 113         TCPChannel    channel,
 114         InputStream   in,
 115         OutputStream  out,
 116         boolean       orig)
 117     {
 118         this.channel = channel;
 119         this.in      = in;
 120         this.out     = out;
 121         this.orig    = orig;
 122 
 123         dataIn = new DataInputStream(in);
 124         dataOut = new DataOutputStream(out);
 125     }
 126 
 127     /**
 128      * Process multiplexing protocol received from underlying connection.
 129      */
 130     public void run() throws IOException
 131     {
 132         try {
 133             int op, id, length;
 134             Integer idObj;
 135             MultiplexConnectionInfo info;
 136 
 137             while (true) {
 138 
 139                 // read next op code from remote endpoint
 140                 op = dataIn.readUnsignedByte();
 141                 switch (op) {
 142 
 143                 // remote endpoint initiating new connection
 144                 case OPEN:
 145                     id = dataIn.readUnsignedShort();
 146 
 147                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 148                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 149                     }
 150 
 151                     idObj = new Integer(id);
 152                     info =
 153                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 154                     if (info != null)
 155                         throw new IOException(
 156                             "OPEN: Connection ID already exists");
 157                     info = new MultiplexConnectionInfo(id);
 158                     info.in = new MultiplexInputStream(this, info, 2048);
 159                     info.out = new MultiplexOutputStream(this, info, 2048);
 160                     synchronized (connectionTable) {
 161                         connectionTable.put(idObj, info);
 162                         ++ numConnections;
 163                     }
 164                     sun.rmi.transport.Connection conn;
 165                     conn = new TCPConnection(channel, info.in, info.out);
 166                     channel.acceptMultiplexConnection(conn);
 167                     break;
 168 
 169                 // remote endpoint closing connection
 170                 case CLOSE:
 171                     id = dataIn.readUnsignedShort();
 172 
 173                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 174                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 175                     }
 176 
 177                     idObj = new Integer(id);
 178                     info =
 179                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 180                     if (info == null)
 181                         throw new IOException(
 182                             "CLOSE: Invalid connection ID");
 183                     info.in.disconnect();
 184                     info.out.disconnect();
 185                     if (!info.closed)
 186                         sendCloseAck(info);
 187                     synchronized (connectionTable) {
 188                         connectionTable.remove(idObj);
 189                         -- numConnections;
 190                     }
 191                     break;
 192 
 193                 // remote endpoint acknowledging close of connection
 194                 case CLOSEACK:
 195                     id = dataIn.readUnsignedShort();
 196 
 197                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 198                         multiplexLog.log(Log.VERBOSE,
 199                             "operation  CLOSEACK " + id);
 200                     }
 201 
 202                     idObj = new Integer(id);
 203                     info =
 204                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 205                     if (info == null)
 206                         throw new IOException(
 207                             "CLOSEACK: Invalid connection ID");
 208                     if (!info.closed)
 209                         throw new IOException(
 210                             "CLOSEACK: Connection not closed");
 211                     info.in.disconnect();
 212                     info.out.disconnect();
 213                     synchronized (connectionTable) {
 214                         connectionTable.remove(idObj);
 215                         -- numConnections;
 216                     }
 217                     break;
 218 
 219                 // remote endpoint declaring additional bytes receivable
 220                 case REQUEST:
 221                     id = dataIn.readUnsignedShort();
 222                     idObj = new Integer(id);
 223                     info =
 224                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 225                     if (info == null)
 226                         throw new IOException(
 227                             "REQUEST: Invalid connection ID");
 228                     length = dataIn.readInt();
 229 
 230                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 231                         multiplexLog.log(Log.VERBOSE,
 232                             "operation  REQUEST " + id + ": " + length);
 233                     }
 234 
 235                     info.out.request(length);
 236                     break;
 237 
 238                 // remote endpoint transmitting data packet
 239                 case TRANSMIT:
 240                     id = dataIn.readUnsignedShort();
 241                     idObj = new Integer(id);
 242                     info =
 243                         (MultiplexConnectionInfo) connectionTable.get(idObj);
 244                     if (info == null)
 245                         throw new IOException("SEND: Invalid connection ID");
 246                     length = dataIn.readInt();
 247 
 248                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 249                         multiplexLog.log(Log.VERBOSE,
 250                             "operation  TRANSMIT " + id + ": " + length);
 251                     }
 252 
 253                     info.in.receive(length, dataIn);
 254                     break;
 255 
 256                 default:
 257                     throw new IOException("Invalid operation: " +
 258                                           Integer.toHexString(op));
 259                 }
 260             }
 261         } finally {
 262             shutDown();
 263         }
 264     }
 265 
 266     /**
 267      * Initiate a new multiplexed connection through the underlying
 268      * connection.
 269      */
 270     public synchronized TCPConnection openConnection() throws IOException
 271     {
 272         // generate ID that should not be already used
 273         // If all possible 32768 IDs are used,
 274         // this method will block searching for a new ID forever.
 275         int id;
 276         Integer idObj;
 277         do {
 278             lastID = (++ lastID) & 0x7FFF;
 279             id = lastID;
 280 
 281             // The orig flag (copied to the high bit of the ID) is used
 282             // to have two distinct ranges to choose IDs from for the
 283             // two endpoints.
 284             if (orig)
 285                 id |= 0x8000;
 286             idObj = new Integer(id);
 287         } while (connectionTable.get(idObj) != null);
 288 
 289         // create multiplexing streams and bookkeeping information
 290         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 291         info.in = new MultiplexInputStream(this, info, 2048);
 292         info.out = new MultiplexOutputStream(this, info, 2048);
 293 
 294         // add to connection table if multiplexer has not died
 295         synchronized (connectionTable) {
 296             if (!alive)
 297                 throw new IOException("Multiplexer connection dead");
 298             if (numConnections >= maxConnections)
 299                 throw new IOException("Cannot exceed " + maxConnections +
 300                     " simultaneous multiplexed connections");
 301             connectionTable.put(idObj, info);
 302             ++ numConnections;
 303         }
 304 
 305         // inform remote endpoint of new connection
 306         synchronized (dataOut) {
 307             try {
 308                 dataOut.writeByte(OPEN);
 309                 dataOut.writeShort(id);
 310                 dataOut.flush();
 311             } catch (IOException e) {
 312                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 313 
 314                 shutDown();
 315                 throw e;
 316             }
 317         }
 318 
 319         return new TCPConnection(channel, info.in, info.out);
 320     }
 321 
 322     /**
 323      * Shut down all connections and clean up.
 324      */
 325     public void shutDown()
 326     {
 327         // inform all associated streams
 328         synchronized (connectionTable) {
 329             // return if multiplexer already officially dead
 330             if (!alive)
 331                 return;
 332             alive = false;
 333 
 334             Enumeration enum_ = connectionTable.elements();
 335             while (enum_.hasMoreElements()) {
 336                 MultiplexConnectionInfo info =
 337                     (MultiplexConnectionInfo) enum_.nextElement();
 338                 info.in.disconnect();
 339                 info.out.disconnect();
 340             }
 341             connectionTable.clear();
 342             numConnections = 0;
 343         }
 344 
 345         // close underlying connection, if possible (and not already done)
 346         try {
 347             in.close();
 348         } catch (IOException e) {
 349         }
 350         try {
 351             out.close();
 352         } catch (IOException e) {
 353         }
 354     }
 355 
 356     /**
 357      * Send request for more data on connection to remote endpoint.
 358      * @param info connection information structure
 359      * @param len number of more bytes that can be received
 360      */
 361     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
 362     {
 363         synchronized (dataOut) {
 364             if (alive && !info.closed)
 365                 try {
 366                     dataOut.writeByte(REQUEST);
 367                     dataOut.writeShort(info.id);
 368                     dataOut.writeInt(len);
 369                     dataOut.flush();
 370                 } catch (IOException e) {
 371                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 372 
 373                     shutDown();
 374                     throw e;
 375                 }
 376         }
 377     }
 378 
 379     /**
 380      * Send packet of requested data on connection to remote endpoint.
 381      * @param info connection information structure
 382      * @param buf array containg bytes to send
 383      * @param off offset of first array index of packet
 384      * @param len number of bytes in packet to send
 385      */
 386     void sendTransmit(MultiplexConnectionInfo info,
 387                       byte buf[], int off, int len) throws IOException
 388     {
 389         synchronized (dataOut) {
 390             if (alive && !info.closed)
 391                 try {
 392                     dataOut.writeByte(TRANSMIT);
 393                     dataOut.writeShort(info.id);
 394                     dataOut.writeInt(len);
 395                     dataOut.write(buf, off, len);
 396                     dataOut.flush();
 397                 } catch (IOException e) {
 398                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 399 
 400                     shutDown();
 401                     throw e;
 402                 }
 403         }
 404     }
 405 
 406     /**
 407      * Inform remote endpoint that connection has been closed.
 408      * @param info connection information structure
 409      */
 410     void sendClose(MultiplexConnectionInfo info) throws IOException
 411     {
 412         info.out.disconnect();
 413         synchronized (dataOut) {
 414             if (alive && !info.closed)
 415                 try {
 416                     dataOut.writeByte(CLOSE);
 417                     dataOut.writeShort(info.id);
 418                     dataOut.flush();
 419                     info.closed = true;
 420                 } catch (IOException e) {
 421                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 422 
 423                     shutDown();
 424                     throw e;
 425                 }
 426         }
 427     }
 428 
 429     /**
 430      * Acknowledge remote endpoint's closing of connection.
 431      * @param info connection information structure
 432      */
 433     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
 434     {
 435         synchronized (dataOut) {
 436             if (alive && !info.closed)
 437                 try {
 438                     dataOut.writeByte(CLOSEACK);
 439                     dataOut.writeShort(info.id);
 440                     dataOut.flush();
 441                     info.closed = true;
 442                 } catch (IOException e) {
 443                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 444 
 445                     shutDown();
 446                     throw e;
 447                 }
 448         }
 449     }
 450 
 451     /**
 452      * Shut down connection upon finalization.
 453      */
 454     protected void finalize() throws Throwable
 455     {
 456         super.finalize();
 457         shutDown();
 458     }
 459 }