1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.*;
  28 import java.util.*;
  29 import java.rmi.server.LogStream;
  30 import java.security.PrivilegedAction;
  31 
  32 import sun.rmi.runtime.Log;
  33 
  34 /**
  35  * ConnectionMultiplexer manages the transparent multiplexing of
  36  * multiple virtual connections from one endpoint to another through
  37  * one given real connection to that endpoint.  The input and output
  38  * streams for the the underlying real connection must be supplied.
  39  * A callback object is also supplied to be informed of new virtual
  40  * connections opened by the remote endpoint.  After creation, the
  41  * run() method must be called in a thread created for demultiplexing
  42  * the connections.  The openConnection() method is called to
  43  * initiate a virtual connection from this endpoint.
  44  *
  45  * @author Peter Jones
  46  */
  47 @SuppressWarnings("deprecation")
  48 final class ConnectionMultiplexer {
  49 
  50     /** "multiplex" log level */
  51     static int logLevel = LogStream.parseLevel(getLogLevel());
  52 
  53     private static String getLogLevel() {
  54         return java.security.AccessController.doPrivileged(
  55            (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel"));
  56     }
  57 
  58     /* multiplex system log */
  59     static final Log multiplexLog =
  60         Log.getLog("sun.rmi.transport.tcp.multiplex",
  61                    "multiplex", ConnectionMultiplexer.logLevel);
  62 
  63     /** multiplexing protocol operation codes */
  64     private final static int OPEN     = 0xE1;
  65     private final static int CLOSE    = 0xE2;
  66     private final static int CLOSEACK = 0xE3;
  67     private final static int REQUEST  = 0xE4;
  68     private final static int TRANSMIT = 0xE5;
  69 
  70     /** object to notify for new connections from remote endpoint */
  71     private TCPChannel channel;
  72 
  73     /** input stream for underlying single connection */
  74     private InputStream in;
  75 
  76     /** output stream for underlying single connection */
  77     private OutputStream out;
  78 
  79     /** true if underlying connection originated from this endpoint
  80         (used for generating unique connection IDs) */
  81     private boolean orig;
  82 
  83     /** layered stream for reading formatted data from underlying connection */
  84     private DataInputStream dataIn;
  85 
  86     /** layered stream for writing formatted data to underlying connection */
  87     private DataOutputStream dataOut;
  88 
  89     /** table holding currently open connection IDs and related info */
  90     private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
  91 
  92     /** number of currently open connections */
  93     private int numConnections = 0;
  94 
  95     /** maximum allowed open connections */
  96     private final static int maxConnections = 256;
  97 
  98     /** ID of last connection opened */
  99     private int lastID = 0x1001;
 100 
 101     /** true if this mechanism is still alive */
 102     private boolean alive = true;
 103 
 104     /**
 105      * Create a new ConnectionMultiplexer using the given underlying
 106      * input/output stream pair.  The run method must be called
 107      * (possibly on a new thread) to handle the demultiplexing.
 108      * @param channel object to notify when new connection is received
 109      * @param in input stream of underlying connection
 110      * @param out output stream of underlying connection
 111      * @param orig true if this endpoint intiated the underlying
 112      *        connection (needs to be set differently at both ends)
 113      */
 114     public ConnectionMultiplexer(
 115         TCPChannel    channel,
 116         InputStream   in,
 117         OutputStream  out,
 118         boolean       orig)
 119     {
 120         this.channel = channel;
 121         this.in      = in;
 122         this.out     = out;
 123         this.orig    = orig;
 124 
 125         dataIn = new DataInputStream(in);
 126         dataOut = new DataOutputStream(out);
 127     }
 128 
 129     /**
 130      * Process multiplexing protocol received from underlying connection.
 131      */
 132     public void run() throws IOException
 133     {
 134         try {
 135             int op, id, length;
 136             MultiplexConnectionInfo info;
 137 
 138             while (true) {
 139 
 140                 // read next op code from remote endpoint
 141                 op = dataIn.readUnsignedByte();
 142                 switch (op) {
 143 
 144                 // remote endpoint initiating new connection
 145                 case OPEN:
 146                     id = dataIn.readUnsignedShort();
 147 
 148                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 149                         multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
 150                     }
 151 
 152                     info = connectionTable.get(id);
 153                     if (info != null)
 154                         throw new IOException(
 155                             "OPEN: Connection ID already exists");
 156                     info = new MultiplexConnectionInfo(id);
 157                     info.in = new MultiplexInputStream(this, info, 2048);
 158                     info.out = new MultiplexOutputStream(this, info, 2048);
 159                     synchronized (connectionTable) {
 160                         connectionTable.put(id, info);
 161                         ++ numConnections;
 162                     }
 163                     sun.rmi.transport.Connection conn;
 164                     conn = new TCPConnection(channel, info.in, info.out);
 165                     channel.acceptMultiplexConnection(conn);
 166                     break;
 167 
 168                 // remote endpoint closing connection
 169                 case CLOSE:
 170                     id = dataIn.readUnsignedShort();
 171 
 172                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 173                         multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
 174                     }
 175 
 176                     info = connectionTable.get(id);
 177                     if (info == null)
 178                         throw new IOException(
 179                             "CLOSE: Invalid connection ID");
 180                     info.in.disconnect();
 181                     info.out.disconnect();
 182                     if (!info.closed)
 183                         sendCloseAck(info);
 184                     synchronized (connectionTable) {
 185                         connectionTable.remove(id);
 186                         -- numConnections;
 187                     }
 188                     break;
 189 
 190                 // remote endpoint acknowledging close of connection
 191                 case CLOSEACK:
 192                     id = dataIn.readUnsignedShort();
 193 
 194                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 195                         multiplexLog.log(Log.VERBOSE,
 196                             "operation  CLOSEACK " + id);
 197                     }
 198 
 199                     info = connectionTable.get(id);
 200                     if (info == null)
 201                         throw new IOException(
 202                             "CLOSEACK: Invalid connection ID");
 203                     if (!info.closed)
 204                         throw new IOException(
 205                             "CLOSEACK: Connection not closed");
 206                     info.in.disconnect();
 207                     info.out.disconnect();
 208                     synchronized (connectionTable) {
 209                         connectionTable.remove(id);
 210                         -- numConnections;
 211                     }
 212                     break;
 213 
 214                 // remote endpoint declaring additional bytes receivable
 215                 case REQUEST:
 216                     id = dataIn.readUnsignedShort();
 217                     info = connectionTable.get(id);
 218                     if (info == null)
 219                         throw new IOException(
 220                             "REQUEST: Invalid connection ID");
 221                     length = dataIn.readInt();
 222 
 223                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 224                         multiplexLog.log(Log.VERBOSE,
 225                             "operation  REQUEST " + id + ": " + length);
 226                     }
 227 
 228                     info.out.request(length);
 229                     break;
 230 
 231                 // remote endpoint transmitting data packet
 232                 case TRANSMIT:
 233                     id = dataIn.readUnsignedShort();
 234                     info = connectionTable.get(id);
 235                     if (info == null)
 236                         throw new IOException("SEND: Invalid connection ID");
 237                     length = dataIn.readInt();
 238 
 239                     if (multiplexLog.isLoggable(Log.VERBOSE)) {
 240                         multiplexLog.log(Log.VERBOSE,
 241                             "operation  TRANSMIT " + id + ": " + length);
 242                     }
 243 
 244                     info.in.receive(length, dataIn);
 245                     break;
 246 
 247                 default:
 248                     throw new IOException("Invalid operation: " +
 249                                           Integer.toHexString(op));
 250                 }
 251             }
 252         } finally {
 253             shutDown();
 254         }
 255     }
 256 
 257     /**
 258      * Initiate a new multiplexed connection through the underlying
 259      * connection.
 260      */
 261     public synchronized TCPConnection openConnection() throws IOException
 262     {
 263         // generate ID that should not be already used
 264         // If all possible 32768 IDs are used,
 265         // this method will block searching for a new ID forever.
 266         int id;
 267         do {
 268             lastID = (++ lastID) & 0x7FFF;
 269             id = lastID;
 270 
 271             // The orig flag (copied to the high bit of the ID) is used
 272             // to have two distinct ranges to choose IDs from for the
 273             // two endpoints.
 274             if (orig)
 275                 id |= 0x8000;
 276         } while (connectionTable.get(id) != null);
 277 
 278         // create multiplexing streams and bookkeeping information
 279         MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
 280         info.in = new MultiplexInputStream(this, info, 2048);
 281         info.out = new MultiplexOutputStream(this, info, 2048);
 282 
 283         // add to connection table if multiplexer has not died
 284         synchronized (connectionTable) {
 285             if (!alive)
 286                 throw new IOException("Multiplexer connection dead");
 287             if (numConnections >= maxConnections)
 288                 throw new IOException("Cannot exceed " + maxConnections +
 289                     " simultaneous multiplexed connections");
 290             connectionTable.put(id, info);
 291             ++ numConnections;
 292         }
 293 
 294         // inform remote endpoint of new connection
 295         synchronized (dataOut) {
 296             try {
 297                 dataOut.writeByte(OPEN);
 298                 dataOut.writeShort(id);
 299                 dataOut.flush();
 300             } catch (IOException e) {
 301                 multiplexLog.log(Log.BRIEF, "exception: ", e);
 302 
 303                 shutDown();
 304                 throw e;
 305             }
 306         }
 307 
 308         return new TCPConnection(channel, info.in, info.out);
 309     }
 310 
 311     /**
 312      * Shut down all connections and clean up.
 313      */
 314     public void shutDown()
 315     {
 316         // inform all associated streams
 317         synchronized (connectionTable) {
 318             // return if multiplexer already officially dead
 319             if (!alive)
 320                 return;
 321             alive = false;
 322 
 323             Enumeration<MultiplexConnectionInfo> enum_ =
 324                     connectionTable.elements();
 325             while (enum_.hasMoreElements()) {
 326                 MultiplexConnectionInfo info = enum_.nextElement();
 327                 info.in.disconnect();
 328                 info.out.disconnect();
 329             }
 330             connectionTable.clear();
 331             numConnections = 0;
 332         }
 333 
 334         // close underlying connection, if possible (and not already done)
 335         try {
 336             in.close();
 337         } catch (IOException e) {
 338         }
 339         try {
 340             out.close();
 341         } catch (IOException e) {
 342         }
 343     }
 344 
 345     /**
 346      * Send request for more data on connection to remote endpoint.
 347      * @param info connection information structure
 348      * @param len number of more bytes that can be received
 349      */
 350     void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
 351     {
 352         synchronized (dataOut) {
 353             if (alive && !info.closed)
 354                 try {
 355                     dataOut.writeByte(REQUEST);
 356                     dataOut.writeShort(info.id);
 357                     dataOut.writeInt(len);
 358                     dataOut.flush();
 359                 } catch (IOException e) {
 360                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 361 
 362                     shutDown();
 363                     throw e;
 364                 }
 365         }
 366     }
 367 
 368     /**
 369      * Send packet of requested data on connection to remote endpoint.
 370      * @param info connection information structure
 371      * @param buf array containing bytes to send
 372      * @param off offset of first array index of packet
 373      * @param len number of bytes in packet to send
 374      */
 375     void sendTransmit(MultiplexConnectionInfo info,
 376                       byte buf[], int off, int len) throws IOException
 377     {
 378         synchronized (dataOut) {
 379             if (alive && !info.closed)
 380                 try {
 381                     dataOut.writeByte(TRANSMIT);
 382                     dataOut.writeShort(info.id);
 383                     dataOut.writeInt(len);
 384                     dataOut.write(buf, off, len);
 385                     dataOut.flush();
 386                 } catch (IOException e) {
 387                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 388 
 389                     shutDown();
 390                     throw e;
 391                 }
 392         }
 393     }
 394 
 395     /**
 396      * Inform remote endpoint that connection has been closed.
 397      * @param info connection information structure
 398      */
 399     void sendClose(MultiplexConnectionInfo info) throws IOException
 400     {
 401         info.out.disconnect();
 402         synchronized (dataOut) {
 403             if (alive && !info.closed)
 404                 try {
 405                     dataOut.writeByte(CLOSE);
 406                     dataOut.writeShort(info.id);
 407                     dataOut.flush();
 408                     info.closed = true;
 409                 } catch (IOException e) {
 410                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 411 
 412                     shutDown();
 413                     throw e;
 414                 }
 415         }
 416     }
 417 
 418     /**
 419      * Acknowledge remote endpoint's closing of connection.
 420      * @param info connection information structure
 421      */
 422     void sendCloseAck(MultiplexConnectionInfo info) throws IOException
 423     {
 424         synchronized (dataOut) {
 425             if (alive && !info.closed)
 426                 try {
 427                     dataOut.writeByte(CLOSEACK);
 428                     dataOut.writeShort(info.id);
 429                     dataOut.flush();
 430                     info.closed = true;
 431                 } catch (IOException e) {
 432                     multiplexLog.log(Log.BRIEF, "exception: ", e);
 433 
 434                     shutDown();
 435                     throw e;
 436                 }
 437         }
 438     }
 439 
 440     /**
 441      * Shut down connection upon finalization.
 442      */
 443     protected void finalize() throws Throwable
 444     {
 445         super.finalize();
 446         shutDown();
 447     }
 448 }