1 /*
   2  * Copyright (c) 1996, 2008, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.*;
  28 import java.util.*;
  29 import java.rmi.server.LogStream;
  30 
  31 import sun.rmi.runtime.Log;
  32 
  33 /**
  34  * ConnectionMultiplexer manages the transparent multiplexing of
  35  * multiple virtual connections from one endpoint to another through
  36  * one given real connection to that endpoint.  The input and output
  37  * streams for the the underlying real connection must be supplied.
  38  * A callback object is also supplied to be informed of new virtual
  39  * connections opened by the remote endpoint.  After creation, the
  40  * run() method must be called in a thread created for demultiplexing
  41  * the connections.  The openConnection() method is called to
  42  * initiate a virtual connection from this endpoint.
  43  *
  44  * @author Peter Jones
  45  */
  46 final class ConnectionMultiplexer {
  47 
  48     /** "multiplex" log level */
  49     static int logLevel = LogStream.parseLevel(getLogLevel());
  50 
  51     private static String getLogLevel() {
  52         return java.security.AccessController.doPrivileged(
  53             new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
  54     }
  55 
  56     /* multiplex system log */
  57     static final Log multiplexLog =
  58         Log.getLog("sun.rmi.transport.tcp.multiplex",
  59                    "multiplex", ConnectionMultiplexer.logLevel);
  60 
  61     /** multiplexing protocol operation codes */
  62     private final static int OPEN     = 0xE1;
  63     private final static int CLOSE    = 0xE2;
  64     private final static int CLOSEACK = 0xE3;
  65     private final static int REQUEST  = 0xE4;
  66     private final static int TRANSMIT = 0xE5;
  67 
  68     /** object to notify for new connections from remote endpoint */
  69     private TCPChannel channel;
  70 
  71     /** input stream for underlying single connection */
  72     private InputStream in;
  73 
  74     /** output stream for underlying single connection */
  75     private OutputStream out;
  76 
  77     /** true if underlying connection originated from this endpoint
  78         (used for generating unique connection IDs) */
  79     private boolean orig;
  80 
  81     /** layered stream for reading formatted data from underlying connection */
  82     private DataInputStream dataIn;
  83 
  84     /** layered stream for writing formatted data to underlying connection */
  85     private DataOutputStream dataOut;
  86 
  87     /** table holding currently open connection IDs and related info */
  88     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
  89 
  90     /** number of currently open connections */
  91     private int numConnections = 0;
  92 
  93     /** maximum allowed open connections */
  94     private final static int maxConnections = 256;
  95 
  96     /** ID of last connection opened */
  97     private int lastID = 0x1001;
  98 
  99     /** true if this mechanism is still alive */
 100     private boolean alive = true;
 101 
 102     /**
 103      * Create a new ConnectionMultiplexer using the given underlying
 104      * input/output stream pair.  The run method must be called
 105      * (possibly on a new thread) to handle the demultiplexing.
 106      * @param channel object to notify when new connection is received
 107      * @param in input stream of underlying connection
 108      * @param out output stream of underlying connection
 109      * @param orig true if this endpoint intiated the underlying
 110      *        connection (needs to be set differently at both ends)
 111      */
 112     public ConnectionMultiplexer(
 113         TCPChannel    channel,
 114         InputStream   in,
 115         OutputStream  out,
 116         boolean       orig)
 117     {
 118         this.channel = channel;
 119         this.in      = in;
 120         this.out     = out;
 121         this.orig    = orig;
 122 
 123         dataIn = new DataInputStream(in);
 124         dataOut = new DataOutputStream(out);
 125     }
 126 
 127     /**
 128      * Process multiplexing protocol received from underlying connection.
 129      */
 130     public void run() throws IOException
 131     {
 132         try {
 133             int op, id, length;
 134             MultiplexConnectionInfo info;
 135 
 136             while (true) {
 137 
 138                 // read next op code from remote endpoint
 139                 op = dataIn.readUnsignedByte();
 140                 switch (op) {
 141 
 142                 // remote endpoint initiating new connection
 143                 case OPEN:
 144                     id = dataIn.readUnsignedShort();
 145 
 146                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 147                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 148                     }
 149 
 150                     info = connectionTable.get(id);
 151                     if (info != null)
 152                         throw new IOException(
 153                             "OPEN: Connection ID already exists");
 154                     info = new MultiplexConnectionInfo(id);
 155                     info.in = new MultiplexInputStream(this, info, 2048);
 156                     info.out = new MultiplexOutputStream(this, info, 2048);
 157                     synchronized (connectionTable) {
 158                         connectionTable.put(id, info);
 159                         ++ numConnections;
 160                     }
 161                     sun.rmi.transport.Connection conn;
 162                     conn = new TCPConnection(channel, info.in, info.out);
 163                     channel.acceptMultiplexConnection(conn);
 164                     break;
 165 
 166                 // remote endpoint closing connection
 167                 case CLOSE:
 168                     id = dataIn.readUnsignedShort();
 169 
 170                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 171                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 172                     }
 173 
 174                     info = connectionTable.get(id);
 175                     if (info == null)
 176                         throw new IOException(
 177                             "CLOSE: Invalid connection ID");
 178                     info.in.disconnect();
 179                     info.out.disconnect();
 180                     if (!info.closed)
 181                         sendCloseAck(info);
 182                     synchronized (connectionTable) {
 183                         connectionTable.remove(id);
 184                         -- numConnections;
 185                     }
 186                     break;
 187 
 188                 // remote endpoint acknowledging close of connection
 189                 case CLOSEACK:
 190                     id = dataIn.readUnsignedShort();
 191 
 192                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 193                         multiplexLog.log(Log.VERBOSE,
 194                             "operation  CLOSEACK " + id);
 195                     }
 196 
 197                     info = connectionTable.get(id);
 198                     if (info == null)
 199                         throw new IOException(
 200                             "CLOSEACK: Invalid connection ID");
 201                     if (!info.closed)
 202                         throw new IOException(
 203                             "CLOSEACK: Connection not closed");
 204                     info.in.disconnect();
 205                     info.out.disconnect();
 206                     synchronized (connectionTable) {
 207                         connectionTable.remove(id);
 208                         -- numConnections;
 209                     }
 210                     break;
 211 
 212                 // remote endpoint declaring additional bytes receivable
 213                 case REQUEST:
 214                     id = dataIn.readUnsignedShort();
 215                     info = connectionTable.get(id);
 216                     if (info == null)
 217                         throw new IOException(
 218                             "REQUEST: Invalid connection ID");
 219                     length = dataIn.readInt();
 220 
 221                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 222                         multiplexLog.log(Log.VERBOSE,
 223                             "operation  REQUEST " + id + ": " + length);
 224                     }
 225 
 226                     info.out.request(length);
 227                     break;
 228 
 229                 // remote endpoint transmitting data packet
 230                 case TRANSMIT:
 231                     id = dataIn.readUnsignedShort();
 232                     info = connectionTable.get(id);
 233                     if (info == null)
 234                         throw new IOException("SEND: Invalid connection ID");
 235                     length = dataIn.readInt();
 236 
 237                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 238                         multiplexLog.log(Log.VERBOSE,
 239                             "operation  TRANSMIT " + id + ": " + length);
 240                     }
 241 
 242                     info.in.receive(length, dataIn);
 243                     break;
 244 
 245                 default:
 246                     throw new IOException("Invalid operation: " +
 247                                           Integer.toHexString(op));
 248                 }
 249             }
 250         } finally {
 251             shutDown();
 252         }
 253     }
 254 
 255     /**
 256      * Initiate a new multiplexed connection through the underlying
 257      * connection.
 258      */
 259     public synchronized TCPConnection openConnection() throws IOException
 260     {
 261         // generate ID that should not be already used
 262         // If all possible 32768 IDs are used,
 263         // this method will block searching for a new ID forever.
 264         int id;
 265         do {
 266             lastID = (++ lastID) & 0x7FFF;
 267             id = lastID;
 268 
 269             // The orig flag (copied to the high bit of the ID) is used
 270             // to have two distinct ranges to choose IDs from for the
 271             // two endpoints.
 272             if (orig)
 273                 id |= 0x8000;
 274         } while (connectionTable.get(id) != null);
 275 
 276         // create multiplexing streams and bookkeeping information
 277         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 278         info.in = new MultiplexInputStream(this, info, 2048);
 279         info.out = new MultiplexOutputStream(this, info, 2048);
 280 
 281         // add to connection table if multiplexer has not died
 282         synchronized (connectionTable) {
 283             if (!alive)
 284                 throw new IOException("Multiplexer connection dead");
 285             if (numConnections >= maxConnections)
 286                 throw new IOException("Cannot exceed " + maxConnections +
 287                     " simultaneous multiplexed connections");
 288             connectionTable.put(id, info);
 289             ++ numConnections;
 290         }
 291 
 292         // inform remote endpoint of new connection
 293         synchronized (dataOut) {
 294             try {
 295                 dataOut.writeByte(OPEN);
 296                 dataOut.writeShort(id);
 297                 dataOut.flush();
 298             } catch (IOException e) {
 299                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 300 
 301                 shutDown();
 302                 throw e;
 303             }
 304         }
 305 
 306         return new TCPConnection(channel, info.in, info.out);
 307     }
 308 
 309     /**
 310      * Shut down all connections and clean up.
 311      */
 312     public void shutDown()
 313     {
 314         // inform all associated streams
 315         synchronized (connectionTable) {
 316             // return if multiplexer already officially dead
 317             if (!alive)
 318                 return;
 319             alive = false;
 320 
 321             Enumeration<MultiplexConnectionInfo> enum_ = 
 322                     connectionTable.elements();
 323             while (enum_.hasMoreElements()) {
 324                 MultiplexConnectionInfo info = enum_.nextElement();
 325                 info.in.disconnect();
 326                 info.out.disconnect();
 327             }
 328             connectionTable.clear();
 329             numConnections = 0;
 330         }
 331 
 332         // close underlying connection, if possible (and not already done)
 333         try {
 334             in.close();
 335         } catch (IOException e) {
 336         }
 337         try {
 338             out.close();
 339         } catch (IOException e) {
 340         }
 341     }
 342 
 343     /**
 344      * Send request for more data on connection to remote endpoint.
 345      * @param info connection information structure
 346      * @param len number of more bytes that can be received
 347      */
 348     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
 349     {
 350         synchronized (dataOut) {
 351             if (alive && !info.closed)
 352                 try {
 353                     dataOut.writeByte(REQUEST);
 354                     dataOut.writeShort(info.id);
 355                     dataOut.writeInt(len);
 356                     dataOut.flush();
 357                 } catch (IOException e) {
 358                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 359 
 360                     shutDown();
 361                     throw e;
 362                 }
 363         }
 364     }
 365 
 366     /**
 367      * Send packet of requested data on connection to remote endpoint.
 368      * @param info connection information structure
 369      * @param buf array containg bytes to send
 370      * @param off offset of first array index of packet
 371      * @param len number of bytes in packet to send
 372      */
 373     void sendTransmit(MultiplexConnectionInfo info,
 374                       byte buf[], int off, int len) throws IOException
 375     {
 376         synchronized (dataOut) {
 377             if (alive && !info.closed)
 378                 try {
 379                     dataOut.writeByte(TRANSMIT);
 380                     dataOut.writeShort(info.id);
 381                     dataOut.writeInt(len);
 382                     dataOut.write(buf, off, len);
 383                     dataOut.flush();
 384                 } catch (IOException e) {
 385                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 386 
 387                     shutDown();
 388                     throw e;
 389                 }
 390         }
 391     }
 392 
 393     /**
 394      * Inform remote endpoint that connection has been closed.
 395      * @param info connection information structure
 396      */
 397     void sendClose(MultiplexConnectionInfo info) throws IOException
 398     {
 399         info.out.disconnect();
 400         synchronized (dataOut) {
 401             if (alive && !info.closed)
 402                 try {
 403                     dataOut.writeByte(CLOSE);
 404                     dataOut.writeShort(info.id);
 405                     dataOut.flush();
 406                     info.closed = true;
 407                 } catch (IOException e) {
 408                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 409 
 410                     shutDown();
 411                     throw e;
 412                 }
 413         }
 414     }
 415 
 416     /**
 417      * Acknowledge remote endpoint's closing of connection.
 418      * @param info connection information structure
 419      */
 420     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
 421     {
 422         synchronized (dataOut) {
 423             if (alive && !info.closed)
 424                 try {
 425                     dataOut.writeByte(CLOSEACK);
 426                     dataOut.writeShort(info.id);
 427                     dataOut.flush();
 428                     info.closed = true;
 429                 } catch (IOException e) {
 430                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 431 
 432                     shutDown();
 433                     throw e;
 434                 }
 435         }
 436     }
 437 
 438     /**
 439      * Shut down connection upon finalization.
 440      */
 441     protected void finalize() throws Throwable
 442     {
 443         super.finalize();
 444         shutDown();
 445     }
 446 }