/* * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package sun.rmi.transport.tcp; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.lang.ref.Reference; import java.lang.ref.SoftReference; import java.net.Socket; import java.rmi.ConnectIOException; import java.rmi.RemoteException; import java.security.AccessControlContext; import java.security.AccessController; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; import java.util.WeakHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import sun.rmi.runtime.Log; import sun.rmi.runtime.NewThreadAction; import sun.rmi.runtime.RuntimeUtil; import sun.rmi.transport.Channel; import sun.rmi.transport.Connection; import sun.rmi.transport.Endpoint; import sun.rmi.transport.TransportConstants; import sun.security.action.GetIntegerAction; import sun.security.action.GetLongAction; /** * TCPChannel is the socket-based implementation of the RMI Channel * abstraction. * * @author Ann Wollrath */ public class TCPChannel implements Channel { /** endpoint for this channel */ private final TCPEndpoint ep; /** transport for this channel */ private final TCPTransport tr; /** list of cached connections */ private final List freeList = new ArrayList<>(); /** frees cached connections that have expired (guarded by freeList) */ private Future reaper = null; /** using multiplexer (for bi-directional applet communication */ private boolean usingMultiplexer = false; /** connection multiplexer, if used */ private ConnectionMultiplexer multiplexer = null; /** connection acceptor (should be in TCPTransport) */ private ConnectionAcceptor acceptor; /** most recently authorized AccessControlContext */ private AccessControlContext okContext; /** cache of authorized AccessControlContexts */ private WeakHashMap> authcache; /** the SecurityManager which authorized okContext and authcache */ private SecurityManager cacheSecurityManager = null; /** client-side connection idle usage timeout */ private static final long idleTimeout = // default 15 seconds AccessController.doPrivileged( new GetLongAction("sun.rmi.transport.connectionTimeout", 15000)); /** client-side connection handshake read timeout */ private static final int handshakeTimeout = // default 1 minute AccessController.doPrivileged( new GetIntegerAction("sun.rmi.transport.tcp.handshakeTimeout", 60000)); /** client-side connection response read timeout (after handshake) */ private static final int responseTimeout = // default infinity AccessController.doPrivileged( new GetIntegerAction("sun.rmi.transport.tcp.responseTimeout", 0)); /** thread pool for scheduling delayed tasks */ private static final ScheduledExecutorService scheduler = AccessController.doPrivileged( new RuntimeUtil.GetInstanceAction()).getScheduler(); /** * Create channel for endpoint. */ TCPChannel(TCPTransport tr, TCPEndpoint ep) { this.tr = tr; this.ep = ep; } /** * Return the endpoint for this channel. */ public Endpoint getEndpoint() { return ep; } /** * Checks if the current caller has sufficient privilege to make * a connection to the remote endpoint. * @exception SecurityException if caller is not allowed to use this * Channel. */ private void checkConnectPermission() throws SecurityException { SecurityManager security = System.getSecurityManager(); if (security == null) return; if (security != cacheSecurityManager) { // The security manager changed: flush the cache okContext = null; authcache = new WeakHashMap>(); cacheSecurityManager = security; } AccessControlContext ctx = AccessController.getContext(); // If ctx is the same context as last time, or if it // appears in the cache, bypass the checkConnect. if (okContext == null || !(okContext.equals(ctx) || authcache.containsKey(ctx))) { security.checkConnect(ep.getHost(), ep.getPort()); authcache.put(ctx, new SoftReference(ctx)); // A WeakHashMap is transformed into a SoftHashSet by making // each value softly refer to its own key (Peter's idea). } okContext = ctx; } /** * Supplies a connection to the endpoint of the address space * for which this is a channel. The returned connection may * be one retrieved from a cache of idle connections. */ public Connection newConnection() throws RemoteException { TCPConnection conn; // loop until we find a free live connection (in which case // we return) or until we run out of freelist (in which case // the loop exits) do { conn = null; // try to get a free connection synchronized (freeList) { int elementPos = freeList.size()-1; if (elementPos >= 0) { // If there is a security manager, make sure // the caller is allowed to connect to the // requested endpoint. checkConnectPermission(); conn = freeList.get(elementPos); freeList.remove(elementPos); } } // at this point, conn is null iff the freelist is empty, // and nonnull if a free connection of uncertain vitality // has been found. if (conn != null) { // check to see if the connection has closed since last use if (!conn.isDead()) { TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); return conn; } // conn is dead, and cannot be reused (reuse => false) this.free(conn, false); } } while (conn != null); // none free, so create a new connection return (createConnection()); } /** * Create a new connection to the remote endpoint of this channel. * The returned connection is new. The caller must already have * passed a security checkConnect or equivalent. */ private Connection createConnection() throws RemoteException { Connection conn; TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); if (!usingMultiplexer) { Socket sock = ep.newSocket(); conn = new TCPConnection(this, sock); try { DataOutputStream out = new DataOutputStream(conn.getOutputStream()); writeTransportHeader(out); // choose protocol (single op if not reusable socket) if (!conn.isReusable()) { out.writeByte(TransportConstants.SingleOpProtocol); } else { out.writeByte(TransportConstants.StreamProtocol); out.flush(); /* * Set socket read timeout to configured value for JRMP * connection handshake; this also serves to guard against * non-JRMP servers that do not respond (see 4322806). */ int originalSoTimeout = 0; try { originalSoTimeout = sock.getSoTimeout(); sock.setSoTimeout(handshakeTimeout); } catch (Exception e) { // if we fail to set this, ignore and proceed anyway } DataInputStream in = new DataInputStream(conn.getInputStream()); byte ack = in.readByte(); if (ack != TransportConstants.ProtocolAck) { throw new ConnectIOException( ack == TransportConstants.ProtocolNack ? "JRMP StreamProtocol not supported by server" : "non-JRMP server at remote endpoint"); } String suggestedHost = in.readUTF(); int suggestedPort = in.readInt(); if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { TCPTransport.tcpLog.log(Log.VERBOSE, "server suggested " + suggestedHost + ":" + suggestedPort); } // set local host name, if unknown TCPEndpoint.setLocalHost(suggestedHost); // do NOT set the default port, because we don't // know if we can't listen YET... // write out default endpoint to match protocol // (but it serves no purpose) TCPEndpoint localEp = TCPEndpoint.getLocalEndpoint(0, null, null); out.writeUTF(localEp.getHost()); out.writeInt(localEp.getPort()); if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { TCPTransport.tcpLog.log(Log.VERBOSE, "using " + localEp.getHost() + ":" + localEp.getPort()); } /* * After JRMP handshake, set socket read timeout to value * configured for the rest of the lifetime of the * connection. NOTE: this timeout, if configured to a * finite duration, places an upper bound on the time * that a remote method call is permitted to execute. */ try { /* * If socket factory had set a non-zero timeout on its * own, then restore it instead of using the property- * configured value. */ sock.setSoTimeout((originalSoTimeout != 0 ? originalSoTimeout : responseTimeout)); } catch (Exception e) { // if we fail to set this, ignore and proceed anyway } out.flush(); } } catch (IOException e) { if (e instanceof RemoteException) throw (RemoteException) e; else throw new ConnectIOException( "error during JRMP connection establishment", e); } } else { try { conn = multiplexer.openConnection(); } catch (IOException e) { synchronized (this) { usingMultiplexer = false; multiplexer = null; } throw new ConnectIOException( "error opening virtual connection " + "over multiplexed connection", e); } } return conn; } /** * Free the connection generated by this channel. * @param conn The connection * @param reuse If true, the connection is in a state in which it * can be reused for another method call. */ public void free(Connection conn, boolean reuse) { if (conn == null) return; if (reuse && conn.isReusable()) { long lastuse = System.currentTimeMillis(); TCPConnection tcpConnection = (TCPConnection) conn; TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); /* * Cache connection; if reaper task for expired * connections isn't scheduled, then schedule it. */ synchronized (freeList) { freeList.add(tcpConnection); if (reaper == null) { TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); reaper = scheduler.scheduleWithFixedDelay( new Runnable() { public void run() { TCPTransport.tcpLog.log(Log.VERBOSE, "wake up"); freeCachedConnections(); } }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); } } tcpConnection.setLastUseTime(lastuse); tcpConnection.setExpiration(lastuse + idleTimeout); } else { TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); try { conn.close(); } catch (IOException ignored) { } } } /** * Send transport header over stream. */ private void writeTransportHeader(DataOutputStream out) throws RemoteException { try { // write out transport header DataOutputStream dataOut = new DataOutputStream(out); dataOut.writeInt(TransportConstants.Magic); dataOut.writeShort(TransportConstants.Version); } catch (IOException e) { throw new ConnectIOException( "error writing JRMP transport header", e); } } /** * Use given connection multiplexer object to obtain new connections * through this channel. */ synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { // for now, always just use the last one given multiplexer = newMultiplexer; usingMultiplexer = true; } /** * Accept a connection provided over a multiplexed channel. */ void acceptMultiplexConnection(Connection conn) { if (acceptor == null) { acceptor = new ConnectionAcceptor(tr); acceptor.startNewAcceptor(); } acceptor.accept(conn); } /** * Closes all the connections in the cache, whether timed out or not. */ public void shedCache() { // Build a list of connections, to avoid holding the freeList // lock during (potentially long-running) close() calls. Connection[] conn; synchronized (freeList) { conn = freeList.toArray(new Connection[freeList.size()]); freeList.clear(); } // Close all the connections that were free for (int i = conn.length; --i >= 0; ) { Connection c = conn[i]; conn[i] = null; // help gc try { c.close(); } catch (java.io.IOException e) { // eat exception } } } private void freeCachedConnections() { /* * Remove each connection whose time out has expired. */ synchronized (freeList) { int size = freeList.size(); if (size > 0) { long time = System.currentTimeMillis(); ListIterator iter = freeList.listIterator(size); while (iter.hasPrevious()) { TCPConnection conn = iter.previous(); if (conn.expired(time)) { TCPTransport.tcpLog.log(Log.VERBOSE, "connection timeout expired"); try { conn.close(); } catch (java.io.IOException e) { // eat exception } iter.remove(); } } } if (freeList.isEmpty()) { reaper.cancel(false); reaper = null; } } } } /** * ConnectionAcceptor manages accepting new connections and giving them * to TCPTransport's message handler on new threads. * * Since this object only needs to know which transport to give new * connections to, it doesn't need to be per-channel as currently * implemented. */ class ConnectionAcceptor implements Runnable { /** transport that will handle message on accepted connections */ private TCPTransport transport; /** queue of connections to be accepted */ private List queue = new ArrayList<>(); /** thread ID counter */ private static int threadNum = 0; /** * Create a new ConnectionAcceptor that will give connections * to the specified transport on a new thread. */ public ConnectionAcceptor(TCPTransport transport) { this.transport = transport; } /** * Start a new thread to accept connections. */ public void startNewAcceptor() { Thread t = AccessController.doPrivileged( new NewThreadAction(ConnectionAcceptor.this, "Multiplex Accept-" + ++ threadNum, true)); t.start(); } /** * Add connection to queue of connections to be accepted. */ public void accept(Connection conn) { synchronized (queue) { queue.add(conn); queue.notify(); } } /** * Give transport next accepted conection, when available. */ public void run() { Connection conn; synchronized (queue) { while (queue.size() == 0) { try { queue.wait(); } catch (InterruptedException e) { } } startNewAcceptor(); conn = queue.remove(0); } transport.handleMessages(conn, true); } }