1 /* 2 * Copyright (c) 1996, 2008, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.io.*; 28 import java.util.*; 29 import java.rmi.server.LogStream; 30 31 import sun.rmi.runtime.Log; 32 33 /** 34 * ConnectionMultiplexer manages the transparent multiplexing of 35 * multiple virtual connections from one endpoint to another through 36 * one given real connection to that endpoint. The input and output 37 * streams for the the underlying real connection must be supplied. 38 * A callback object is also supplied to be informed of new virtual 39 * connections opened by the remote endpoint. After creation, the 40 * run() method must be called in a thread created for demultiplexing 41 * the connections. The openConnection() method is called to 42 * initiate a virtual connection from this endpoint. 43 * 44 * @author Peter Jones 45 */ 46 final class ConnectionMultiplexer { 47 48 /** "multiplex" log level */ 49 static int logLevel = LogStream.parseLevel(getLogLevel()); 50 51 private static String getLogLevel() { 52 return java.security.AccessController.doPrivileged( 53 new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel")); 54 } 55 56 /* multiplex system log */ 57 static final Log multiplexLog = 58 Log.getLog("sun.rmi.transport.tcp.multiplex", 59 "multiplex", ConnectionMultiplexer.logLevel); 60 61 /** multiplexing protocol operation codes */ 62 private final static int OPEN = 0xE1; 63 private final static int CLOSE = 0xE2; 64 private final static int CLOSEACK = 0xE3; 65 private final static int REQUEST = 0xE4; 66 private final static int TRANSMIT = 0xE5; 67 68 /** object to notify for new connections from remote endpoint */ 69 private TCPChannel channel; 70 71 /** input stream for underlying single connection */ 72 private InputStream in; 73 74 /** output stream for underlying single connection */ 75 private OutputStream out; 76 77 /** true if underlying connection originated from this endpoint 78 (used for generating unique connection IDs) */ 79 private boolean orig; 80 81 /** layered stream for reading formatted data from underlying connection */ 82 private DataInputStream dataIn; 83 84 /** layered stream for writing formatted data to underlying connection */ 85 private DataOutputStream dataOut; 86 87 /** table holding currently open connection IDs and related info */ 88 private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7); 89 90 /** number of currently open connections */ 91 private int numConnections = 0; 92 93 /** maximum allowed open connections */ 94 private final static int maxConnections = 256; 95 96 /** ID of last connection opened */ 97 private int lastID = 0x1001; 98 99 /** true if this mechanism is still alive */ 100 private boolean alive = true; 101 102 /** 103 * Create a new ConnectionMultiplexer using the given underlying 104 * input/output stream pair. The run method must be called 105 * (possibly on a new thread) to handle the demultiplexing. 106 * @param channel object to notify when new connection is received 107 * @param in input stream of underlying connection 108 * @param out output stream of underlying connection 109 * @param orig true if this endpoint intiated the underlying 110 * connection (needs to be set differently at both ends) 111 */ 112 public ConnectionMultiplexer( 113 TCPChannel channel, 114 InputStream in, 115 OutputStream out, 116 boolean orig) 117 { 118 this.channel = channel; 119 this.in = in; 120 this.out = out; 121 this.orig = orig; 122 123 dataIn = new DataInputStream(in); 124 dataOut = new DataOutputStream(out); 125 } 126 127 /** 128 * Process multiplexing protocol received from underlying connection. 129 */ 130 public void run() throws IOException 131 { 132 try { 133 int op, id, length; 134 MultiplexConnectionInfo info; 135 136 while (true) { 137 138 // read next op code from remote endpoint 139 op = dataIn.readUnsignedByte(); 140 switch (op) { 141 142 // remote endpoint initiating new connection 143 case OPEN: 144 id = dataIn.readUnsignedShort(); 145 146 if (multiplexLog.isLoggable(Log.VERBOSE)) { 147 multiplexLog.log(Log.VERBOSE, "operation OPEN " + id); 148 } 149 150 info = connectionTable.get(id); 151 if (info != null) 152 throw new IOException( 153 "OPEN: Connection ID already exists"); 154 info = new MultiplexConnectionInfo(id); 155 info.in = new MultiplexInputStream(this, info, 2048); 156 info.out = new MultiplexOutputStream(this, info, 2048); 157 synchronized (connectionTable) { 158 connectionTable.put(id, info); 159 ++ numConnections; 160 } 161 sun.rmi.transport.Connection conn; 162 conn = new TCPConnection(channel, info.in, info.out); 163 channel.acceptMultiplexConnection(conn); 164 break; 165 166 // remote endpoint closing connection 167 case CLOSE: 168 id = dataIn.readUnsignedShort(); 169 170 if (multiplexLog.isLoggable(Log.VERBOSE)) { 171 multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id); 172 } 173 174 info = connectionTable.get(id); 175 if (info == null) 176 throw new IOException( 177 "CLOSE: Invalid connection ID"); 178 info.in.disconnect(); 179 info.out.disconnect(); 180 if (!info.closed) 181 sendCloseAck(info); 182 synchronized (connectionTable) { 183 connectionTable.remove(id); 184 -- numConnections; 185 } 186 break; 187 188 // remote endpoint acknowledging close of connection 189 case CLOSEACK: 190 id = dataIn.readUnsignedShort(); 191 192 if (multiplexLog.isLoggable(Log.VERBOSE)) { 193 multiplexLog.log(Log.VERBOSE, 194 "operation CLOSEACK " + id); 195 } 196 197 info = connectionTable.get(id); 198 if (info == null) 199 throw new IOException( 200 "CLOSEACK: Invalid connection ID"); 201 if (!info.closed) 202 throw new IOException( 203 "CLOSEACK: Connection not closed"); 204 info.in.disconnect(); 205 info.out.disconnect(); 206 synchronized (connectionTable) { 207 connectionTable.remove(id); 208 -- numConnections; 209 } 210 break; 211 212 // remote endpoint declaring additional bytes receivable 213 case REQUEST: 214 id = dataIn.readUnsignedShort(); 215 info = connectionTable.get(id); 216 if (info == null) 217 throw new IOException( 218 "REQUEST: Invalid connection ID"); 219 length = dataIn.readInt(); 220 221 if (multiplexLog.isLoggable(Log.VERBOSE)) { 222 multiplexLog.log(Log.VERBOSE, 223 "operation REQUEST " + id + ": " + length); 224 } 225 226 info.out.request(length); 227 break; 228 229 // remote endpoint transmitting data packet 230 case TRANSMIT: 231 id = dataIn.readUnsignedShort(); 232 info = connectionTable.get(id); 233 if (info == null) 234 throw new IOException("SEND: Invalid connection ID"); 235 length = dataIn.readInt(); 236 237 if (multiplexLog.isLoggable(Log.VERBOSE)) { 238 multiplexLog.log(Log.VERBOSE, 239 "operation TRANSMIT " + id + ": " + length); 240 } 241 242 info.in.receive(length, dataIn); 243 break; 244 245 default: 246 throw new IOException("Invalid operation: " + 247 Integer.toHexString(op)); 248 } 249 } 250 } finally { 251 shutDown(); 252 } 253 } 254 255 /** 256 * Initiate a new multiplexed connection through the underlying 257 * connection. 258 */ 259 public synchronized TCPConnection openConnection() throws IOException 260 { 261 // generate ID that should not be already used 262 // If all possible 32768 IDs are used, 263 // this method will block searching for a new ID forever. 264 int id; 265 do { 266 lastID = (++ lastID) & 0x7FFF; 267 id = lastID; 268 269 // The orig flag (copied to the high bit of the ID) is used 270 // to have two distinct ranges to choose IDs from for the 271 // two endpoints. 272 if (orig) 273 id |= 0x8000; 274 } while (connectionTable.get(id) != null); 275 276 // create multiplexing streams and bookkeeping information 277 MultiplexConnectionInfo info = new MultiplexConnectionInfo(id); 278 info.in = new MultiplexInputStream(this, info, 2048); 279 info.out = new MultiplexOutputStream(this, info, 2048); 280 281 // add to connection table if multiplexer has not died 282 synchronized (connectionTable) { 283 if (!alive) 284 throw new IOException("Multiplexer connection dead"); 285 if (numConnections >= maxConnections) 286 throw new IOException("Cannot exceed " + maxConnections + 287 " simultaneous multiplexed connections"); 288 connectionTable.put(id, info); 289 ++ numConnections; 290 } 291 292 // inform remote endpoint of new connection 293 synchronized (dataOut) { 294 try { 295 dataOut.writeByte(OPEN); 296 dataOut.writeShort(id); 297 dataOut.flush(); 298 } catch (IOException e) { 299 multiplexLog.log(Log.BRIEF, "exception: ", e); 300 301 shutDown(); 302 throw e; 303 } 304 } 305 306 return new TCPConnection(channel, info.in, info.out); 307 } 308 309 /** 310 * Shut down all connections and clean up. 311 */ 312 public void shutDown() 313 { 314 // inform all associated streams 315 synchronized (connectionTable) { 316 // return if multiplexer already officially dead 317 if (!alive) 318 return; 319 alive = false; 320 321 Enumeration<MultiplexConnectionInfo> enum_ = 322 connectionTable.elements(); 323 while (enum_.hasMoreElements()) { 324 MultiplexConnectionInfo info = enum_.nextElement(); 325 info.in.disconnect(); 326 info.out.disconnect(); 327 } 328 connectionTable.clear(); 329 numConnections = 0; 330 } 331 332 // close underlying connection, if possible (and not already done) 333 try { 334 in.close(); 335 } catch (IOException e) { 336 } 337 try { 338 out.close(); 339 } catch (IOException e) { 340 } 341 } 342 343 /** 344 * Send request for more data on connection to remote endpoint. 345 * @param info connection information structure 346 * @param len number of more bytes that can be received 347 */ 348 void sendRequest(MultiplexConnectionInfo info, int len) throws IOException 349 { 350 synchronized (dataOut) { 351 if (alive && !info.closed) 352 try { 353 dataOut.writeByte(REQUEST); 354 dataOut.writeShort(info.id); 355 dataOut.writeInt(len); 356 dataOut.flush(); 357 } catch (IOException e) { 358 multiplexLog.log(Log.BRIEF, "exception: ", e); 359 360 shutDown(); 361 throw e; 362 } 363 } 364 } 365 366 /** 367 * Send packet of requested data on connection to remote endpoint. 368 * @param info connection information structure 369 * @param buf array containg bytes to send 370 * @param off offset of first array index of packet 371 * @param len number of bytes in packet to send 372 */ 373 void sendTransmit(MultiplexConnectionInfo info, 374 byte buf[], int off, int len) throws IOException 375 { 376 synchronized (dataOut) { 377 if (alive && !info.closed) 378 try { 379 dataOut.writeByte(TRANSMIT); 380 dataOut.writeShort(info.id); 381 dataOut.writeInt(len); 382 dataOut.write(buf, off, len); 383 dataOut.flush(); 384 } catch (IOException e) { 385 multiplexLog.log(Log.BRIEF, "exception: ", e); 386 387 shutDown(); 388 throw e; 389 } 390 } 391 } 392 393 /** 394 * Inform remote endpoint that connection has been closed. 395 * @param info connection information structure 396 */ 397 void sendClose(MultiplexConnectionInfo info) throws IOException 398 { 399 info.out.disconnect(); 400 synchronized (dataOut) { 401 if (alive && !info.closed) 402 try { 403 dataOut.writeByte(CLOSE); 404 dataOut.writeShort(info.id); 405 dataOut.flush(); 406 info.closed = true; 407 } catch (IOException e) { 408 multiplexLog.log(Log.BRIEF, "exception: ", e); 409 410 shutDown(); 411 throw e; 412 } 413 } 414 } 415 416 /** 417 * Acknowledge remote endpoint's closing of connection. 418 * @param info connection information structure 419 */ 420 void sendCloseAck(MultiplexConnectionInfo info) throws IOException 421 { 422 synchronized (dataOut) { 423 if (alive && !info.closed) 424 try { 425 dataOut.writeByte(CLOSEACK); 426 dataOut.writeShort(info.id); 427 dataOut.flush(); 428 info.closed = true; 429 } catch (IOException e) { 430 multiplexLog.log(Log.BRIEF, "exception: ", e); 431 432 shutDown(); 433 throw e; 434 } 435 } 436 } 437 438 /** 439 * Shut down connection upon finalization. 440 */ 441 protected void finalize() throws Throwable 442 { 443 super.finalize(); 444 shutDown(); 445 } 446 }