1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.*;
  28 import java.util.*;
  29 import java.rmi.server.LogStream;
  30 
  31 import sun.rmi.runtime.Log;
  32 
  33 /**
  34  * ConnectionMultiplexer manages the transparent multiplexing of
  35  * multiple virtual connections from one endpoint to another through
  36  * one given real connection to that endpoint.  The input and output
  37  * streams for the the underlying real connection must be supplied.
  38  * A callback object is also supplied to be informed of new virtual
  39  * connections opened by the remote endpoint.  After creation, the
  40  * run() method must be called in a thread created for demultiplexing
  41  * the connections.  The openConnection() method is called to
  42  * initiate a virtual connection from this endpoint.
  43  *
  44  * @author Peter Jones
  45  */
  46 @SuppressWarnings("deprecation")
  47 final class ConnectionMultiplexer {
  48 
  49     /** "multiplex" log level */
  50     static int logLevel = LogStream.parseLevel(getLogLevel());
  51 
  52     private static String getLogLevel() {
  53         return java.security.AccessController.doPrivileged(
  54             new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel"));
  55     }
  56 
  57     /* multiplex system log */
  58     static final Log multiplexLog =
  59         Log.getLog("sun.rmi.transport.tcp.multiplex",
  60                    "multiplex", ConnectionMultiplexer.logLevel);
  61 
  62     /** multiplexing protocol operation codes */
  63     private final static int OPEN     = 0xE1;
  64     private final static int CLOSE    = 0xE2;
  65     private final static int CLOSEACK = 0xE3;
  66     private final static int REQUEST  = 0xE4;
  67     private final static int TRANSMIT = 0xE5;
  68 
  69     /** object to notify for new connections from remote endpoint */
  70     private TCPChannel channel;
  71 
  72     /** input stream for underlying single connection */
  73     private InputStream in;
  74 
  75     /** output stream for underlying single connection */
  76     private OutputStream out;
  77 
  78     /** true if underlying connection originated from this endpoint
  79         (used for generating unique connection IDs) */
  80     private boolean orig;
  81 
  82     /** layered stream for reading formatted data from underlying connection */
  83     private DataInputStream dataIn;
  84 
  85     /** layered stream for writing formatted data to underlying connection */
  86     private DataOutputStream dataOut;
  87 
  88     /** table holding currently open connection IDs and related info */
  89     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
  90 
  91     /** number of currently open connections */
  92     private int numConnections = 0;
  93 
  94     /** maximum allowed open connections */
  95     private final static int maxConnections = 256;
  96 
  97     /** ID of last connection opened */
  98     private int lastID = 0x1001;
  99 
 100     /** true if this mechanism is still alive */
 101     private boolean alive = true;
 102 
 103     /**
 104      * Create a new ConnectionMultiplexer using the given underlying
 105      * input/output stream pair.  The run method must be called
 106      * (possibly on a new thread) to handle the demultiplexing.
 107      * @param channel object to notify when new connection is received
 108      * @param in input stream of underlying connection
 109      * @param out output stream of underlying connection
 110      * @param orig true if this endpoint intiated the underlying
 111      *        connection (needs to be set differently at both ends)
 112      */
 113     public ConnectionMultiplexer(
 114         TCPChannel    channel,
 115         InputStream   in,
 116         OutputStream  out,
 117         boolean       orig)
 118     {
 119         this.channel = channel;
 120         this.in      = in;
 121         this.out     = out;
 122         this.orig    = orig;
 123 
 124         dataIn = new DataInputStream(in);
 125         dataOut = new DataOutputStream(out);
 126     }
 127 
 128     /**
 129      * Process multiplexing protocol received from underlying connection.
 130      */
 131     public void run() throws IOException
 132     {
 133         try {
 134             int op, id, length;
 135             MultiplexConnectionInfo info;
 136 
 137             while (true) {
 138 
 139                 // read next op code from remote endpoint
 140                 op = dataIn.readUnsignedByte();
 141                 switch (op) {
 142 
 143                 // remote endpoint initiating new connection
 144                 case OPEN:
 145                     id = dataIn.readUnsignedShort();
 146 
 147                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 148                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 149                     }
 150 
 151                     info = connectionTable.get(id);
 152                     if (info != null)
 153                         throw new IOException(
 154                             "OPEN: Connection ID already exists");
 155                     info = new MultiplexConnectionInfo(id);
 156                     info.in = new MultiplexInputStream(this, info, 2048);
 157                     info.out = new MultiplexOutputStream(this, info, 2048);
 158                     synchronized (connectionTable) {
 159                         connectionTable.put(id, info);
 160                         ++ numConnections;
 161                     }
 162                     sun.rmi.transport.Connection conn;
 163                     conn = new TCPConnection(channel, info.in, info.out);
 164                     channel.acceptMultiplexConnection(conn);
 165                     break;
 166 
 167                 // remote endpoint closing connection
 168                 case CLOSE:
 169                     id = dataIn.readUnsignedShort();
 170 
 171                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 172                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 173                     }
 174 
 175                     info = connectionTable.get(id);
 176                     if (info == null)
 177                         throw new IOException(
 178                             "CLOSE: Invalid connection ID");
 179                     info.in.disconnect();
 180                     info.out.disconnect();
 181                     if (!info.closed)
 182                         sendCloseAck(info);
 183                     synchronized (connectionTable) {
 184                         connectionTable.remove(id);
 185                         -- numConnections;
 186                     }
 187                     break;
 188 
 189                 // remote endpoint acknowledging close of connection
 190                 case CLOSEACK:
 191                     id = dataIn.readUnsignedShort();
 192 
 193                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 194                         multiplexLog.log(Log.VERBOSE,
 195                             "operation  CLOSEACK " + id);
 196                     }
 197 
 198                     info = connectionTable.get(id);
 199                     if (info == null)
 200                         throw new IOException(
 201                             "CLOSEACK: Invalid connection ID");
 202                     if (!info.closed)
 203                         throw new IOException(
 204                             "CLOSEACK: Connection not closed");
 205                     info.in.disconnect();
 206                     info.out.disconnect();
 207                     synchronized (connectionTable) {
 208                         connectionTable.remove(id);
 209                         -- numConnections;
 210                     }
 211                     break;
 212 
 213                 // remote endpoint declaring additional bytes receivable
 214                 case REQUEST:
 215                     id = dataIn.readUnsignedShort();
 216                     info = connectionTable.get(id);
 217                     if (info == null)
 218                         throw new IOException(
 219                             "REQUEST: Invalid connection ID");
 220                     length = dataIn.readInt();
 221 
 222                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 223                         multiplexLog.log(Log.VERBOSE,
 224                             "operation  REQUEST " + id + ": " + length);
 225                     }
 226 
 227                     info.out.request(length);
 228                     break;
 229 
 230                 // remote endpoint transmitting data packet
 231                 case TRANSMIT:
 232                     id = dataIn.readUnsignedShort();
 233                     info = connectionTable.get(id);
 234                     if (info == null)
 235                         throw new IOException("SEND: Invalid connection ID");
 236                     length = dataIn.readInt();
 237 
 238                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 239                         multiplexLog.log(Log.VERBOSE,
 240                             "operation  TRANSMIT " + id + ": " + length);
 241                     }
 242 
 243                     info.in.receive(length, dataIn);
 244                     break;
 245 
 246                 default:
 247                     throw new IOException("Invalid operation: " +
 248                                           Integer.toHexString(op));
 249                 }
 250             }
 251         } finally {
 252             shutDown();
 253         }
 254     }
 255 
 256     /**
 257      * Initiate a new multiplexed connection through the underlying
 258      * connection.
 259      */
 260     public synchronized TCPConnection openConnection() throws IOException
 261     {
 262         // generate ID that should not be already used
 263         // If all possible 32768 IDs are used,
 264         // this method will block searching for a new ID forever.
 265         int id;
 266         do {
 267             lastID = (++ lastID) & 0x7FFF;
 268             id = lastID;
 269 
 270             // The orig flag (copied to the high bit of the ID) is used
 271             // to have two distinct ranges to choose IDs from for the
 272             // two endpoints.
 273             if (orig)
 274                 id |= 0x8000;
 275         } while (connectionTable.get(id) != null);
 276 
 277         // create multiplexing streams and bookkeeping information
 278         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 279         info.in = new MultiplexInputStream(this, info, 2048);
 280         info.out = new MultiplexOutputStream(this, info, 2048);
 281 
 282         // add to connection table if multiplexer has not died
 283         synchronized (connectionTable) {
 284             if (!alive)
 285                 throw new IOException("Multiplexer connection dead");
 286             if (numConnections >= maxConnections)
 287                 throw new IOException("Cannot exceed " + maxConnections +
 288                     " simultaneous multiplexed connections");
 289             connectionTable.put(id, info);
 290             ++ numConnections;
 291         }
 292 
 293         // inform remote endpoint of new connection
 294         synchronized (dataOut) {
 295             try {
 296                 dataOut.writeByte(OPEN);
 297                 dataOut.writeShort(id);
 298                 dataOut.flush();
 299             } catch (IOException e) {
 300                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 301 
 302                 shutDown();
 303                 throw e;
 304             }
 305         }
 306 
 307         return new TCPConnection(channel, info.in, info.out);
 308     }
 309 
 310     /**
 311      * Shut down all connections and clean up.
 312      */
 313     public void shutDown()
 314     {
 315         // inform all associated streams
 316         synchronized (connectionTable) {
 317             // return if multiplexer already officially dead
 318             if (!alive)
 319                 return;
 320             alive = false;
 321 
 322             Enumeration<MultiplexConnectionInfo> enum_ =
 323                     connectionTable.elements();
 324             while (enum_.hasMoreElements()) {
 325                 MultiplexConnectionInfo info = enum_.nextElement();
 326                 info.in.disconnect();
 327                 info.out.disconnect();
 328             }
 329             connectionTable.clear();
 330             numConnections = 0;
 331         }
 332 
 333         // close underlying connection, if possible (and not already done)
 334         try {
 335             in.close();
 336         } catch (IOException e) {
 337         }
 338         try {
 339             out.close();
 340         } catch (IOException e) {
 341         }
 342     }
 343 
 344     /**
 345      * Send request for more data on connection to remote endpoint.
 346      * @param info connection information structure
 347      * @param len number of more bytes that can be received
 348      */
 349     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
 350     {
 351         synchronized (dataOut) {
 352             if (alive && !info.closed)
 353                 try {
 354                     dataOut.writeByte(REQUEST);
 355                     dataOut.writeShort(info.id);
 356                     dataOut.writeInt(len);
 357                     dataOut.flush();
 358                 } catch (IOException e) {
 359                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 360 
 361                     shutDown();
 362                     throw e;
 363                 }
 364         }
 365     }
 366 
 367     /**
 368      * Send packet of requested data on connection to remote endpoint.
 369      * @param info connection information structure
 370      * @param buf array containing bytes to send
 371      * @param off offset of first array index of packet
 372      * @param len number of bytes in packet to send
 373      */
 374     void sendTransmit(MultiplexConnectionInfo info,
 375                       byte buf[], int off, int len) throws IOException
 376     {
 377         synchronized (dataOut) {
 378             if (alive && !info.closed)
 379                 try {
 380                     dataOut.writeByte(TRANSMIT);
 381                     dataOut.writeShort(info.id);
 382                     dataOut.writeInt(len);
 383                     dataOut.write(buf, off, len);
 384                     dataOut.flush();
 385                 } catch (IOException e) {
 386                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 387 
 388                     shutDown();
 389                     throw e;
 390                 }
 391         }
 392     }
 393 
 394     /**
 395      * Inform remote endpoint that connection has been closed.
 396      * @param info connection information structure
 397      */
 398     void sendClose(MultiplexConnectionInfo info) throws IOException
 399     {
 400         info.out.disconnect();
 401         synchronized (dataOut) {
 402             if (alive && !info.closed)
 403                 try {
 404                     dataOut.writeByte(CLOSE);
 405                     dataOut.writeShort(info.id);
 406                     dataOut.flush();
 407                     info.closed = true;
 408                 } catch (IOException e) {
 409                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 410 
 411                     shutDown();
 412                     throw e;
 413                 }
 414         }
 415     }
 416 
 417     /**
 418      * Acknowledge remote endpoint's closing of connection.
 419      * @param info connection information structure
 420      */
 421     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
 422     {
 423         synchronized (dataOut) {
 424             if (alive && !info.closed)
 425                 try {
 426                     dataOut.writeByte(CLOSEACK);
 427                     dataOut.writeShort(info.id);
 428                     dataOut.flush();
 429                     info.closed = true;
 430                 } catch (IOException e) {
 431                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 432 
 433                     shutDown();
 434                     throw e;
 435                 }
 436         }
 437     }
 438 
 439     /**
 440      * Shut down connection upon finalization.
 441      */
 442     protected void finalize() throws Throwable
 443     {
 444         super.finalize();
 445         shutDown();
 446     }
 447 }