1 /* 2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.io.*; 28 import java.util.*; 29 import java.rmi.server.LogStream; 30 import java.security.PrivilegedAction; 31 32 import sun.rmi.runtime.Log; 33 34 /** 35 * ConnectionMultiplexer manages the transparent multiplexing of 36 * multiple virtual connections from one endpoint to another through 37 * one given real connection to that endpoint. The input and output 38 * streams for the the underlying real connection must be supplied. 39 * A callback object is also supplied to be informed of new virtual 40 * connections opened by the remote endpoint. After creation, the 41 * run() method must be called in a thread created for demultiplexing 42 * the connections. The openConnection() method is called to 43 * initiate a virtual connection from this endpoint. 44 * 45 * @author Peter Jones 46 */ 47 @SuppressWarnings("deprecation") 48 final class ConnectionMultiplexer { 49 50 /** "multiplex" log level */ 51 static int logLevel = LogStream.parseLevel(getLogLevel()); 52 53 private static String getLogLevel() { 54 return java.security.AccessController.doPrivileged( 55 (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel")); 56 } 57 58 /* multiplex system log */ 59 static final Log multiplexLog = 60 Log.getLog("sun.rmi.transport.tcp.multiplex", 61 "multiplex", ConnectionMultiplexer.logLevel); 62 63 /** multiplexing protocol operation codes */ 64 private final static int OPEN = 0xE1; 65 private final static int CLOSE = 0xE2; 66 private final static int CLOSEACK = 0xE3; 67 private final static int REQUEST = 0xE4; 68 private final static int TRANSMIT = 0xE5; 69 70 /** object to notify for new connections from remote endpoint */ 71 private TCPChannel channel; 72 73 /** input stream for underlying single connection */ 74 private InputStream in; 75 76 /** output stream for underlying single connection */ 77 private OutputStream out; 78 79 /** true if underlying connection originated from this endpoint 80 (used for generating unique connection IDs) */ 81 private boolean orig; 82 83 /** layered stream for reading formatted data from underlying connection */ 84 private DataInputStream dataIn; 85 86 /** layered stream for writing formatted data to underlying connection */ 87 private DataOutputStream dataOut; 88 89 /** table holding currently open connection IDs and related info */ 90 private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7); 91 92 /** number of currently open connections */ 93 private int numConnections = 0; 94 95 /** maximum allowed open connections */ 96 private final static int maxConnections = 256; 97 98 /** ID of last connection opened */ 99 private int lastID = 0x1001; 100 101 /** true if this mechanism is still alive */ 102 private boolean alive = true; 103 104 /** 105 * Create a new ConnectionMultiplexer using the given underlying 106 * input/output stream pair. The run method must be called 107 * (possibly on a new thread) to handle the demultiplexing. 108 * @param channel object to notify when new connection is received 109 * @param in input stream of underlying connection 110 * @param out output stream of underlying connection 111 * @param orig true if this endpoint intiated the underlying 112 * connection (needs to be set differently at both ends) 113 */ 114 public ConnectionMultiplexer( 115 TCPChannel channel, 116 InputStream in, 117 OutputStream out, 118 boolean orig) 119 { 120 this.channel = channel; 121 this.in = in; 122 this.out = out; 123 this.orig = orig; 124 125 dataIn = new DataInputStream(in); 126 dataOut = new DataOutputStream(out); 127 } 128 129 /** 130 * Process multiplexing protocol received from underlying connection. 131 */ 132 public void run() throws IOException 133 { 134 try { 135 int op, id, length; 136 MultiplexConnectionInfo info; 137 138 while (true) { 139 140 // read next op code from remote endpoint 141 op = dataIn.readUnsignedByte(); 142 switch (op) { 143 144 // remote endpoint initiating new connection 145 case OPEN: 146 id = dataIn.readUnsignedShort(); 147 148 if (multiplexLog.isLoggable(Log.VERBOSE)) { 149 multiplexLog.log(Log.VERBOSE, "operation OPEN " + id); 150 } 151 152 info = connectionTable.get(id); 153 if (info != null) 154 throw new IOException( 155 "OPEN: Connection ID already exists"); 156 info = new MultiplexConnectionInfo(id); 157 info.in = new MultiplexInputStream(this, info, 2048); 158 info.out = new MultiplexOutputStream(this, info, 2048); 159 synchronized (connectionTable) { 160 connectionTable.put(id, info); 161 ++ numConnections; 162 } 163 sun.rmi.transport.Connection conn; 164 conn = new TCPConnection(channel, info.in, info.out); 165 channel.acceptMultiplexConnection(conn); 166 break; 167 168 // remote endpoint closing connection 169 case CLOSE: 170 id = dataIn.readUnsignedShort(); 171 172 if (multiplexLog.isLoggable(Log.VERBOSE)) { 173 multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id); 174 } 175 176 info = connectionTable.get(id); 177 if (info == null) 178 throw new IOException( 179 "CLOSE: Invalid connection ID"); 180 info.in.disconnect(); 181 info.out.disconnect(); 182 if (!info.closed) 183 sendCloseAck(info); 184 synchronized (connectionTable) { 185 connectionTable.remove(id); 186 -- numConnections; 187 } 188 break; 189 190 // remote endpoint acknowledging close of connection 191 case CLOSEACK: 192 id = dataIn.readUnsignedShort(); 193 194 if (multiplexLog.isLoggable(Log.VERBOSE)) { 195 multiplexLog.log(Log.VERBOSE, 196 "operation CLOSEACK " + id); 197 } 198 199 info = connectionTable.get(id); 200 if (info == null) 201 throw new IOException( 202 "CLOSEACK: Invalid connection ID"); 203 if (!info.closed) 204 throw new IOException( 205 "CLOSEACK: Connection not closed"); 206 info.in.disconnect(); 207 info.out.disconnect(); 208 synchronized (connectionTable) { 209 connectionTable.remove(id); 210 -- numConnections; 211 } 212 break; 213 214 // remote endpoint declaring additional bytes receivable 215 case REQUEST: 216 id = dataIn.readUnsignedShort(); 217 info = connectionTable.get(id); 218 if (info == null) 219 throw new IOException( 220 "REQUEST: Invalid connection ID"); 221 length = dataIn.readInt(); 222 223 if (multiplexLog.isLoggable(Log.VERBOSE)) { 224 multiplexLog.log(Log.VERBOSE, 225 "operation REQUEST " + id + ": " + length); 226 } 227 228 info.out.request(length); 229 break; 230 231 // remote endpoint transmitting data packet 232 case TRANSMIT: 233 id = dataIn.readUnsignedShort(); 234 info = connectionTable.get(id); 235 if (info == null) 236 throw new IOException("SEND: Invalid connection ID"); 237 length = dataIn.readInt(); 238 239 if (multiplexLog.isLoggable(Log.VERBOSE)) { 240 multiplexLog.log(Log.VERBOSE, 241 "operation TRANSMIT " + id + ": " + length); 242 } 243 244 info.in.receive(length, dataIn); 245 break; 246 247 default: 248 throw new IOException("Invalid operation: " + 249 Integer.toHexString(op)); 250 } 251 } 252 } finally { 253 shutDown(); 254 } 255 } 256 257 /** 258 * Initiate a new multiplexed connection through the underlying 259 * connection. 260 */ 261 public synchronized TCPConnection openConnection() throws IOException 262 { 263 // generate ID that should not be already used 264 // If all possible 32768 IDs are used, 265 // this method will block searching for a new ID forever. 266 int id; 267 do { 268 lastID = (++ lastID) & 0x7FFF; 269 id = lastID; 270 271 // The orig flag (copied to the high bit of the ID) is used 272 // to have two distinct ranges to choose IDs from for the 273 // two endpoints. 274 if (orig) 275 id |= 0x8000; 276 } while (connectionTable.get(id) != null); 277 278 // create multiplexing streams and bookkeeping information 279 MultiplexConnectionInfo info = new MultiplexConnectionInfo(id); 280 info.in = new MultiplexInputStream(this, info, 2048); 281 info.out = new MultiplexOutputStream(this, info, 2048); 282 283 // add to connection table if multiplexer has not died 284 synchronized (connectionTable) { 285 if (!alive) 286 throw new IOException("Multiplexer connection dead"); 287 if (numConnections >= maxConnections) 288 throw new IOException("Cannot exceed " + maxConnections + 289 " simultaneous multiplexed connections"); 290 connectionTable.put(id, info); 291 ++ numConnections; 292 } 293 294 // inform remote endpoint of new connection 295 synchronized (dataOut) { 296 try { 297 dataOut.writeByte(OPEN); 298 dataOut.writeShort(id); 299 dataOut.flush(); 300 } catch (IOException e) { 301 multiplexLog.log(Log.BRIEF, "exception: ", e); 302 303 shutDown(); 304 throw e; 305 } 306 } 307 308 return new TCPConnection(channel, info.in, info.out); 309 } 310 311 /** 312 * Shut down all connections and clean up. 313 */ 314 public void shutDown() 315 { 316 // inform all associated streams 317 synchronized (connectionTable) { 318 // return if multiplexer already officially dead 319 if (!alive) 320 return; 321 alive = false; 322 323 Enumeration<MultiplexConnectionInfo> enum_ = 324 connectionTable.elements(); 325 while (enum_.hasMoreElements()) { 326 MultiplexConnectionInfo info = enum_.nextElement(); 327 info.in.disconnect(); 328 info.out.disconnect(); 329 } 330 connectionTable.clear(); 331 numConnections = 0; 332 } 333 334 // close underlying connection, if possible (and not already done) 335 try { 336 in.close(); 337 } catch (IOException e) { 338 } 339 try { 340 out.close(); 341 } catch (IOException e) { 342 } 343 } 344 345 /** 346 * Send request for more data on connection to remote endpoint. 347 * @param info connection information structure 348 * @param len number of more bytes that can be received 349 */ 350 void sendRequest(MultiplexConnectionInfo info, int len) throws IOException 351 { 352 synchronized (dataOut) { 353 if (alive && !info.closed) 354 try { 355 dataOut.writeByte(REQUEST); 356 dataOut.writeShort(info.id); 357 dataOut.writeInt(len); 358 dataOut.flush(); 359 } catch (IOException e) { 360 multiplexLog.log(Log.BRIEF, "exception: ", e); 361 362 shutDown(); 363 throw e; 364 } 365 } 366 } 367 368 /** 369 * Send packet of requested data on connection to remote endpoint. 370 * @param info connection information structure 371 * @param buf array containing bytes to send 372 * @param off offset of first array index of packet 373 * @param len number of bytes in packet to send 374 */ 375 void sendTransmit(MultiplexConnectionInfo info, 376 byte buf[], int off, int len) throws IOException 377 { 378 synchronized (dataOut) { 379 if (alive && !info.closed) 380 try { 381 dataOut.writeByte(TRANSMIT); 382 dataOut.writeShort(info.id); 383 dataOut.writeInt(len); 384 dataOut.write(buf, off, len); 385 dataOut.flush(); 386 } catch (IOException e) { 387 multiplexLog.log(Log.BRIEF, "exception: ", e); 388 389 shutDown(); 390 throw e; 391 } 392 } 393 } 394 395 /** 396 * Inform remote endpoint that connection has been closed. 397 * @param info connection information structure 398 */ 399 void sendClose(MultiplexConnectionInfo info) throws IOException 400 { 401 info.out.disconnect(); 402 synchronized (dataOut) { 403 if (alive && !info.closed) 404 try { 405 dataOut.writeByte(CLOSE); 406 dataOut.writeShort(info.id); 407 dataOut.flush(); 408 info.closed = true; 409 } catch (IOException e) { 410 multiplexLog.log(Log.BRIEF, "exception: ", e); 411 412 shutDown(); 413 throw e; 414 } 415 } 416 } 417 418 /** 419 * Acknowledge remote endpoint's closing of connection. 420 * @param info connection information structure 421 */ 422 void sendCloseAck(MultiplexConnectionInfo info) throws IOException 423 { 424 synchronized (dataOut) { 425 if (alive && !info.closed) 426 try { 427 dataOut.writeByte(CLOSEACK); 428 dataOut.writeShort(info.id); 429 dataOut.flush(); 430 info.closed = true; 431 } catch (IOException e) { 432 multiplexLog.log(Log.BRIEF, "exception: ", e); 433 434 shutDown(); 435 throw e; 436 } 437 } 438 } 439 440 /** 441 * Shut down connection upon finalization. 442 */ 443 protected void finalize() throws Throwable 444 { 445 super.finalize(); 446 shutDown(); 447 } 448 }