/* * Copyright (c) 1996, 2008, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package sun.rmi.transport.tcp; import java.io.*; import java.util.*; import java.rmi.server.LogStream; import sun.rmi.runtime.Log; /** * ConnectionMultiplexer manages the transparent multiplexing of * multiple virtual connections from one endpoint to another through * one given real connection to that endpoint. The input and output * streams for the the underlying real connection must be supplied. * A callback object is also supplied to be informed of new virtual * connections opened by the remote endpoint. After creation, the * run() method must be called in a thread created for demultiplexing * the connections. The openConnection() method is called to * initiate a virtual connection from this endpoint. * * @author Peter Jones */ final class ConnectionMultiplexer { /** "multiplex" log level */ static int logLevel = LogStream.parseLevel(getLogLevel()); private static String getLogLevel() { return java.security.AccessController.doPrivileged( new sun.security.action.GetPropertyAction("sun.rmi.transport.tcp.multiplex.logLevel")); } /* multiplex system log */ static final Log multiplexLog = Log.getLog("sun.rmi.transport.tcp.multiplex", "multiplex", ConnectionMultiplexer.logLevel); /** multiplexing protocol operation codes */ private final static int OPEN = 0xE1; private final static int CLOSE = 0xE2; private final static int CLOSEACK = 0xE3; private final static int REQUEST = 0xE4; private final static int TRANSMIT = 0xE5; /** object to notify for new connections from remote endpoint */ private TCPChannel channel; /** input stream for underlying single connection */ private InputStream in; /** output stream for underlying single connection */ private OutputStream out; /** true if underlying connection originated from this endpoint (used for generating unique connection IDs) */ private boolean orig; /** layered stream for reading formatted data from underlying connection */ private DataInputStream dataIn; /** layered stream for writing formatted data to underlying connection */ private DataOutputStream dataOut; /** table holding currently open connection IDs and related info */ private Hashtable connectionTable = new Hashtable<>(7); /** number of currently open connections */ private int numConnections = 0; /** maximum allowed open connections */ private final static int maxConnections = 256; /** ID of last connection opened */ private int lastID = 0x1001; /** true if this mechanism is still alive */ private boolean alive = true; /** * Create a new ConnectionMultiplexer using the given underlying * input/output stream pair. The run method must be called * (possibly on a new thread) to handle the demultiplexing. * @param channel object to notify when new connection is received * @param in input stream of underlying connection * @param out output stream of underlying connection * @param orig true if this endpoint intiated the underlying * connection (needs to be set differently at both ends) */ public ConnectionMultiplexer( TCPChannel channel, InputStream in, OutputStream out, boolean orig) { this.channel = channel; this.in = in; this.out = out; this.orig = orig; dataIn = new DataInputStream(in); dataOut = new DataOutputStream(out); } /** * Process multiplexing protocol received from underlying connection. */ public void run() throws IOException { try { int op, id, length; MultiplexConnectionInfo info; while (true) { // read next op code from remote endpoint op = dataIn.readUnsignedByte(); switch (op) { // remote endpoint initiating new connection case OPEN: id = dataIn.readUnsignedShort(); if (multiplexLog.isLoggable(Log.VERBOSE)) { multiplexLog.log(Log.VERBOSE, "operation OPEN " + id); } info = connectionTable.get(id); if (info != null) throw new IOException( "OPEN: Connection ID already exists"); info = new MultiplexConnectionInfo(id); info.in = new MultiplexInputStream(this, info, 2048); info.out = new MultiplexOutputStream(this, info, 2048); synchronized (connectionTable) { connectionTable.put(id, info); ++ numConnections; } sun.rmi.transport.Connection conn; conn = new TCPConnection(channel, info.in, info.out); channel.acceptMultiplexConnection(conn); break; // remote endpoint closing connection case CLOSE: id = dataIn.readUnsignedShort(); if (multiplexLog.isLoggable(Log.VERBOSE)) { multiplexLog.log(Log.VERBOSE, "operation CLOSE " + id); } info = connectionTable.get(id); if (info == null) throw new IOException( "CLOSE: Invalid connection ID"); info.in.disconnect(); info.out.disconnect(); if (!info.closed) sendCloseAck(info); synchronized (connectionTable) { connectionTable.remove(id); -- numConnections; } break; // remote endpoint acknowledging close of connection case CLOSEACK: id = dataIn.readUnsignedShort(); if (multiplexLog.isLoggable(Log.VERBOSE)) { multiplexLog.log(Log.VERBOSE, "operation CLOSEACK " + id); } info = connectionTable.get(id); if (info == null) throw new IOException( "CLOSEACK: Invalid connection ID"); if (!info.closed) throw new IOException( "CLOSEACK: Connection not closed"); info.in.disconnect(); info.out.disconnect(); synchronized (connectionTable) { connectionTable.remove(id); -- numConnections; } break; // remote endpoint declaring additional bytes receivable case REQUEST: id = dataIn.readUnsignedShort(); info = connectionTable.get(id); if (info == null) throw new IOException( "REQUEST: Invalid connection ID"); length = dataIn.readInt(); if (multiplexLog.isLoggable(Log.VERBOSE)) { multiplexLog.log(Log.VERBOSE, "operation REQUEST " + id + ": " + length); } info.out.request(length); break; // remote endpoint transmitting data packet case TRANSMIT: id = dataIn.readUnsignedShort(); info = connectionTable.get(id); if (info == null) throw new IOException("SEND: Invalid connection ID"); length = dataIn.readInt(); if (multiplexLog.isLoggable(Log.VERBOSE)) { multiplexLog.log(Log.VERBOSE, "operation TRANSMIT " + id + ": " + length); } info.in.receive(length, dataIn); break; default: throw new IOException("Invalid operation: " + Integer.toHexString(op)); } } } finally { shutDown(); } } /** * Initiate a new multiplexed connection through the underlying * connection. */ public synchronized TCPConnection openConnection() throws IOException { // generate ID that should not be already used // If all possible 32768 IDs are used, // this method will block searching for a new ID forever. int id; do { lastID = (++ lastID) & 0x7FFF; id = lastID; // The orig flag (copied to the high bit of the ID) is used // to have two distinct ranges to choose IDs from for the // two endpoints. if (orig) id |= 0x8000; } while (connectionTable.get(id) != null); // create multiplexing streams and bookkeeping information MultiplexConnectionInfo info = new MultiplexConnectionInfo(id); info.in = new MultiplexInputStream(this, info, 2048); info.out = new MultiplexOutputStream(this, info, 2048); // add to connection table if multiplexer has not died synchronized (connectionTable) { if (!alive) throw new IOException("Multiplexer connection dead"); if (numConnections >= maxConnections) throw new IOException("Cannot exceed " + maxConnections + " simultaneous multiplexed connections"); connectionTable.put(id, info); ++ numConnections; } // inform remote endpoint of new connection synchronized (dataOut) { try { dataOut.writeByte(OPEN); dataOut.writeShort(id); dataOut.flush(); } catch (IOException e) { multiplexLog.log(Log.BRIEF, "exception: ", e); shutDown(); throw e; } } return new TCPConnection(channel, info.in, info.out); } /** * Shut down all connections and clean up. */ public void shutDown() { // inform all associated streams synchronized (connectionTable) { // return if multiplexer already officially dead if (!alive) return; alive = false; Enumeration enum_ = connectionTable.elements(); while (enum_.hasMoreElements()) { MultiplexConnectionInfo info = enum_.nextElement(); info.in.disconnect(); info.out.disconnect(); } connectionTable.clear(); numConnections = 0; } // close underlying connection, if possible (and not already done) try { in.close(); } catch (IOException e) { } try { out.close(); } catch (IOException e) { } } /** * Send request for more data on connection to remote endpoint. * @param info connection information structure * @param len number of more bytes that can be received */ void sendRequest(MultiplexConnectionInfo info, int len) throws IOException { synchronized (dataOut) { if (alive && !info.closed) try { dataOut.writeByte(REQUEST); dataOut.writeShort(info.id); dataOut.writeInt(len); dataOut.flush(); } catch (IOException e) { multiplexLog.log(Log.BRIEF, "exception: ", e); shutDown(); throw e; } } } /** * Send packet of requested data on connection to remote endpoint. * @param info connection information structure * @param buf array containg bytes to send * @param off offset of first array index of packet * @param len number of bytes in packet to send */ void sendTransmit(MultiplexConnectionInfo info, byte buf[], int off, int len) throws IOException { synchronized (dataOut) { if (alive && !info.closed) try { dataOut.writeByte(TRANSMIT); dataOut.writeShort(info.id); dataOut.writeInt(len); dataOut.write(buf, off, len); dataOut.flush(); } catch (IOException e) { multiplexLog.log(Log.BRIEF, "exception: ", e); shutDown(); throw e; } } } /** * Inform remote endpoint that connection has been closed. * @param info connection information structure */ void sendClose(MultiplexConnectionInfo info) throws IOException { info.out.disconnect(); synchronized (dataOut) { if (alive && !info.closed) try { dataOut.writeByte(CLOSE); dataOut.writeShort(info.id); dataOut.flush(); info.closed = true; } catch (IOException e) { multiplexLog.log(Log.BRIEF, "exception: ", e); shutDown(); throw e; } } } /** * Acknowledge remote endpoint's closing of connection. * @param info connection information structure */ void sendCloseAck(MultiplexConnectionInfo info) throws IOException { synchronized (dataOut) { if (alive && !info.closed) try { dataOut.writeByte(CLOSEACK); dataOut.writeShort(info.id); dataOut.flush(); info.closed = true; } catch (IOException e) { multiplexLog.log(Log.BRIEF, "exception: ", e); shutDown(); throw e; } } } /** * Shut down connection upon finalization. */ protected void finalize() throws Throwable { super.finalize(); shutDown(); } }