/* * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package sun.rmi.transport.tcp; import java.lang.ref.Reference; import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.lang.reflect.InvocationTargetException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.rmi.RemoteException; import java.rmi.server.ExportException; import java.rmi.server.LogStream; import java.rmi.server.RMIFailureHandler; import java.rmi.server.RMISocketFactory; import java.rmi.server.RemoteCall; import java.rmi.server.ServerNotActiveException; import java.rmi.server.UID; import java.security.AccessControlContext; import java.security.AccessController; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.WeakHashMap; import java.util.logging.Level; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import sun.rmi.runtime.Log; import sun.rmi.runtime.NewThreadAction; import sun.rmi.transport.Channel; import sun.rmi.transport.Connection; import sun.rmi.transport.DGCAckHandler; import sun.rmi.transport.Endpoint; import sun.rmi.transport.StreamRemoteCall; import sun.rmi.transport.Target; import sun.rmi.transport.Transport; import sun.rmi.transport.TransportConstants; import sun.rmi.transport.proxy.HttpReceiveSocket; import sun.security.action.GetIntegerAction; import sun.security.action.GetLongAction; import sun.security.action.GetPropertyAction; /** * TCPTransport is the socket-based implementation of the RMI Transport * abstraction. * * @author Ann Wollrath * @author Peter Jones */ public class TCPTransport extends Transport { /* tcp package log */ static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp", LogStream.parseLevel(AccessController.doPrivileged( new GetPropertyAction("sun.rmi.transport.tcp.logLevel")))); /** maximum number of connection handler threads */ private static final int maxConnectionThreads = // default no limit AccessController.doPrivileged( new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads", Integer.MAX_VALUE)); /** keep alive time for idle connection handler threads */ private static final long threadKeepAliveTime = // default 1 minute AccessController.doPrivileged( new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime", 60000)); /** thread pool for connection handlers */ private static final ExecutorService connectionThreadPool = new ThreadPoolExecutor(0, maxConnectionThreads, threadKeepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { return AccessController.doPrivileged(new NewThreadAction( runnable, "TCP Connection(idle)", true, true)); } }); /** total connections handled */ private static final AtomicInteger connectionCount = new AtomicInteger(0); /** client host for the current thread's connection */ private static final ThreadLocal threadConnectionHandler = new ThreadLocal<>(); /** endpoints for this transport */ private final LinkedList epList; /** number of objects exported on this transport */ private int exportCount = 0; /** server socket for this transport */ private ServerSocket server = null; /** table mapping endpoints to channels */ private final Map> channelTable = new WeakHashMap<>(); static final RMISocketFactory defaultSocketFactory = RMISocketFactory.getDefaultSocketFactory(); /** number of milliseconds in accepted-connection timeout. * Warning: this should be greater than 15 seconds (the client-side * timeout), and defaults to 2 hours. * The maximum representable value is slightly more than 24 days * and 20 hours. */ private static final int connectionReadTimeout = // default 2 hours AccessController.doPrivileged( new GetIntegerAction("sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000)); /** * Constructs a TCPTransport. */ TCPTransport(LinkedList epList) { // assert ((epList.size() != null) && (epList.size() >= 1)) this.epList = epList; if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "Version = " + TransportConstants.Version + ", ep = " + getEndpoint()); } } /** * Closes all cached connections in every channel subordinated to this * transport. Currently, this only closes outgoing connections. */ public void shedConnectionCaches() { List channels; synchronized (channelTable) { channels = new ArrayList(channelTable.values().size()); for (Reference ref : channelTable.values()) { TCPChannel ch = ref.get(); if (ch != null) { channels.add(ch); } } } for (TCPChannel channel : channels) { channel.shedCache(); } } /** * Returns a Channel that generates connections to the * endpoint ep. A Channel is an object that creates and * manages connections of a particular type to some particular * address space. * @param ep the endpoint to which connections will be generated. * @return the channel or null if the transport cannot * generate connections to this endpoint */ public TCPChannel getChannel(Endpoint ep) { TCPChannel ch = null; if (ep instanceof TCPEndpoint) { synchronized (channelTable) { Reference ref = channelTable.get(ep); if (ref != null) { ch = ref.get(); } if (ch == null) { TCPEndpoint tcpEndpoint = (TCPEndpoint) ep; ch = new TCPChannel(this, tcpEndpoint); channelTable.put(tcpEndpoint, new WeakReference(ch)); } } } return ch; } /** * Removes the Channel that generates connections to the * endpoint ep. */ public void free(Endpoint ep) { if (ep instanceof TCPEndpoint) { synchronized (channelTable) { Reference ref = channelTable.remove(ep); if (ref != null) { TCPChannel channel = ref.get(); if (channel != null) { channel.shedCache(); } } } } } /** * Export the object so that it can accept incoming calls. */ public void exportObject(Target target) throws RemoteException { /* * Ensure that a server socket is listening, and count this * export while synchronized to prevent the server socket from * being closed due to concurrent unexports. */ synchronized (this) { listen(); exportCount++; } /* * Try to add the Target to the exported object table; keep * counting this export (to keep server socket open) only if * that succeeds. */ boolean ok = false; try { super.exportObject(target); ok = true; } finally { if (!ok) { synchronized (this) { decrementExportCount(); } } } } protected synchronized void targetUnexported() { decrementExportCount(); } /** * Decrements the count of exported objects, closing the current * server socket if the count reaches zero. **/ private void decrementExportCount() { assert Thread.holdsLock(this); exportCount--; if (exportCount == 0 && getEndpoint().getListenPort() != 0) { ServerSocket ss = server; server = null; try { ss.close(); } catch (IOException e) { } } } /** * Verify that the current access control context has permission to * accept the connection being dispatched by the current thread. */ protected void checkAcceptPermission(AccessControlContext acc) { SecurityManager sm = System.getSecurityManager(); if (sm == null) { return; } ConnectionHandler h = threadConnectionHandler.get(); if (h == null) { throw new Error( "checkAcceptPermission not in ConnectionHandler thread"); } h.checkAcceptPermission(sm, acc); } private TCPEndpoint getEndpoint() { synchronized (epList) { return epList.getLast(); } } /** * Listen on transport's endpoint. */ private void listen() throws RemoteException { assert Thread.holdsLock(this); TCPEndpoint ep = getEndpoint(); int port = ep.getPort(); if (server == null) { if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + port + ") create server socket"); } try { server = ep.newServerSocket(); /* * Don't retry ServerSocket if creation fails since * "port in use" will cause export to hang if an * RMIFailureHandler is not installed. */ Thread t = AccessController.doPrivileged( new NewThreadAction(new AcceptLoop(server), "TCP Accept-" + port, true)); t.start(); } catch (java.net.BindException e) { throw new ExportException("Port already in use: " + port, e); } catch (IOException e) { throw new ExportException("Listen failed on port: " + port, e); } } else { // otherwise verify security access to existing server socket SecurityManager sm = System.getSecurityManager(); if (sm != null) { sm.checkListen(port); } } } /** * Worker for accepting connections from a server socket. **/ private class AcceptLoop implements Runnable { private final ServerSocket serverSocket; // state for throttling loop on exceptions (local to accept thread) private long lastExceptionTime = 0L; private int recentExceptionCount; AcceptLoop(ServerSocket serverSocket) { this.serverSocket = serverSocket; } public void run() { try { executeAcceptLoop(); } finally { try { /* * Only one accept loop is started per server * socket, so after no more connections will be * accepted, ensure that the server socket is no * longer listening. */ serverSocket.close(); } catch (IOException e) { } } } /** * Accepts connections from the server socket and executes * handlers for them in the thread pool. **/ private void executeAcceptLoop() { if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "listening on port " + getEndpoint().getPort()); } while (true) { Socket socket = null; try { socket = serverSocket.accept(); /* * Find client host name (or "0.0.0.0" if unknown) */ InetAddress clientAddr = socket.getInetAddress(); String clientHost = (clientAddr != null ? clientAddr.getHostAddress() : "0.0.0.0"); /* * Execute connection handler in the thread pool, * which uses non-system threads. */ try { connectionThreadPool.execute( new ConnectionHandler(socket, clientHost)); } catch (RejectedExecutionException e) { closeSocket(socket); tcpLog.log(Log.BRIEF, "rejected connection from " + clientHost); } } catch (Throwable t) { try { /* * If the server socket has been closed, such * as because there are no more exported * objects, then we expect accept to throw an * exception, so just terminate normally. */ if (serverSocket.isClosed()) { break; } try { if (tcpLog.isLoggable(Level.WARNING)) { tcpLog.log(Level.WARNING, "accept loop for " + serverSocket + " throws", t); } } catch (Throwable tt) { } } finally { /* * Always close the accepted socket (if any) * if an exception occurs, but only after * logging an unexpected exception. */ if (socket != null) { closeSocket(socket); } } /* * In case we're running out of file descriptors, * release resources held in caches. */ if (!(t instanceof SecurityException)) { try { TCPEndpoint.shedConnectionCaches(); } catch (Throwable tt) { } } /* * A NoClassDefFoundError can occur if no file * descriptors are available, in which case this * loop should not terminate. */ if (t instanceof Exception || t instanceof OutOfMemoryError || t instanceof NoClassDefFoundError) { if (!continueAfterAcceptFailure(t)) { return; } // continue loop } else { throw (Error) t; } } } } /** * Returns true if the accept loop should continue after the * specified exception has been caught, or false if the accept * loop should terminate (closing the server socket). If * there is an RMIFailureHandler, this method returns the * result of passing the specified exception to it; otherwise, * this method always returns true, after sleeping to throttle * the accept loop if necessary. **/ private boolean continueAfterAcceptFailure(Throwable t) { RMIFailureHandler fh = RMISocketFactory.getFailureHandler(); if (fh != null) { return fh.failure(t instanceof Exception ? (Exception) t : new InvocationTargetException(t)); } else { throttleLoopOnException(); return true; } } /** * Throttles the accept loop after an exception has been * caught: if a burst of 10 exceptions in 5 seconds occurs, * then wait for 10 seconds to curb busy CPU usage. **/ private void throttleLoopOnException() { long now = System.currentTimeMillis(); if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) { // last exception was long ago (or this is the first) lastExceptionTime = now; recentExceptionCount = 0; } else { // exception burst window was started recently if (++recentExceptionCount >= 10) { try { Thread.sleep(10000); } catch (InterruptedException ignore) { } } } } } /** close socket and eat exception */ private static void closeSocket(Socket sock) { try { sock.close(); } catch (IOException ex) { // eat exception } } /** * handleMessages decodes transport operations and handles messages * appropriately. If an exception occurs during message handling, * the socket is closed. */ void handleMessages(Connection conn, boolean persistent) { int port = getEndpoint().getPort(); try { DataInputStream in = new DataInputStream(conn.getInputStream()); do { int op = in.read(); // transport op if (op == -1) { if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + port + ") connection closed"); } break; } if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + port + ") op = " + op); } switch (op) { case TransportConstants.Call: // service incoming RMI call RemoteCall call = new StreamRemoteCall(conn); if (serviceCall(call) == false) return; break; case TransportConstants.Ping: // send ack for ping DataOutputStream out = new DataOutputStream(conn.getOutputStream()); out.writeByte(TransportConstants.PingAck); conn.releaseOutputStream(); break; case TransportConstants.DGCAck: DGCAckHandler.received(UID.read(in)); break; default: throw new IOException("unknown transport op " + op); } } while (persistent); } catch (IOException e) { // exception during processing causes connection to close (below) if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + port + ") exception: ", e); } } finally { try { conn.close(); } catch (IOException ex) { // eat exception } } } /** * Returns the client host for the current thread's connection. Throws * ServerNotActiveException if no connection is active for this thread. */ public static String getClientHost() throws ServerNotActiveException { ConnectionHandler h = threadConnectionHandler.get(); if (h != null) { return h.getClientHost(); } else { throw new ServerNotActiveException("not in a remote call"); } } /** * Services messages on accepted connection */ private class ConnectionHandler implements Runnable { /** int value of "POST" in ASCII (Java's specified data formats * make this once-reviled tactic again socially acceptable) */ private static final int POST = 0x504f5354; /** most recently accept-authorized AccessControlContext */ private AccessControlContext okContext; /** cache of accept-authorized AccessControlContexts */ private Map> authCache; /** security manager which authorized contexts in authCache */ private SecurityManager cacheSecurityManager = null; private Socket socket; private String remoteHost; ConnectionHandler(Socket socket, String remoteHost) { this.socket = socket; this.remoteHost = remoteHost; } String getClientHost() { return remoteHost; } /** * Verify that the given AccessControlContext has permission to * accept this connection. */ void checkAcceptPermission(SecurityManager sm, AccessControlContext acc) { /* * Note: no need to synchronize on cache-related fields, since this * method only gets called from the ConnectionHandler's thread. */ if (sm != cacheSecurityManager) { okContext = null; authCache = new WeakHashMap>(); cacheSecurityManager = sm; } if (acc.equals(okContext) || authCache.containsKey(acc)) { return; } InetAddress addr = socket.getInetAddress(); String host = (addr != null) ? addr.getHostAddress() : "*"; sm.checkAccept(host, socket.getPort()); authCache.put(acc, new SoftReference(acc)); okContext = acc; } public void run() { Thread t = Thread.currentThread(); String name = t.getName(); try { t.setName("RMI TCP Connection(" + connectionCount.incrementAndGet() + ")-" + remoteHost); run0(); } finally { t.setName(name); } } private void run0() { TCPEndpoint endpoint = getEndpoint(); int port = endpoint.getPort(); threadConnectionHandler.set(this); // set socket to disable Nagle's algorithm (always send // immediately) // TBD: should this be left up to socket factory instead? try { socket.setTcpNoDelay(true); } catch (Exception e) { // if we fail to set this, ignore and proceed anyway } // set socket to timeout after excessive idle time try { if (connectionReadTimeout > 0) socket.setSoTimeout(connectionReadTimeout); } catch (Exception e) { // too bad, continue anyway } try { InputStream sockIn = socket.getInputStream(); InputStream bufIn = sockIn.markSupported() ? sockIn : new BufferedInputStream(sockIn); // Read magic (or HTTP wrapper) bufIn.mark(4); DataInputStream in = new DataInputStream(bufIn); int magic = in.readInt(); if (magic == POST) { tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call"); // It's really a HTTP-wrapped request. Repackage // the socket in a HttpReceiveSocket, reinitialize // sockIn and in, and reread magic. bufIn.reset(); // unread "POST" try { socket = new HttpReceiveSocket(socket, bufIn, null); remoteHost = "0.0.0.0"; sockIn = socket.getInputStream(); bufIn = new BufferedInputStream(sockIn); in = new DataInputStream(bufIn); magic = in.readInt(); } catch (IOException e) { throw new RemoteException("Error HTTP-unwrapping call", e); } } // bufIn's mark will invalidate itself when it overflows // so it doesn't have to be turned off // read and verify transport header short version = in.readShort(); if (magic != TransportConstants.Magic || version != TransportConstants.Version) { // protocol mismatch detected... // just close socket: this would recurse if we marshal an // exception to the client and the protocol at other end // doesn't match. closeSocket(socket); return; } OutputStream sockOut = socket.getOutputStream(); BufferedOutputStream bufOut = new BufferedOutputStream(sockOut); DataOutputStream out = new DataOutputStream(bufOut); int remotePort = socket.getPort(); if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "accepted socket from [" + remoteHost + ":" + remotePort + "]"); } TCPEndpoint ep; TCPChannel ch; TCPConnection conn; // send ack (or nack) for protocol byte protocol = in.readByte(); switch (protocol) { case TransportConstants.SingleOpProtocol: // no ack for protocol // create dummy channel for receiving messages ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), endpoint.getClientSocketFactory(), endpoint.getServerSocketFactory()); ch = new TCPChannel(TCPTransport.this, ep); conn = new TCPConnection(ch, socket, bufIn, bufOut); // read input messages handleMessages(conn, false); break; case TransportConstants.StreamProtocol: // send ack out.writeByte(TransportConstants.ProtocolAck); // suggest endpoint (in case client doesn't know host name) if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + ") " + "suggesting " + remoteHost + ":" + remotePort); } out.writeUTF(remoteHost); out.writeInt(remotePort); out.flush(); // read and discard (possibly bogus) endpoint // REMIND: would be faster to read 2 bytes then skip N+4 String clientHost = in.readUTF(); int clientPort = in.readInt(); if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + ") client using " + clientHost + ":" + clientPort); } // create dummy channel for receiving messages // (why not use clientHost and clientPort?) ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), endpoint.getClientSocketFactory(), endpoint.getServerSocketFactory()); ch = new TCPChannel(TCPTransport.this, ep); conn = new TCPConnection(ch, socket, bufIn, bufOut); // read input messages handleMessages(conn, true); break; case TransportConstants.MultiplexProtocol: if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + ") accepting multiplex protocol"); } // send ack out.writeByte(TransportConstants.ProtocolAck); // suggest endpoint (in case client doesn't already have one) if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + ") suggesting " + remoteHost + ":" + remotePort); } out.writeUTF(remoteHost); out.writeInt(remotePort); out.flush(); // read endpoint client has decided to use ep = new TCPEndpoint(in.readUTF(), in.readInt(), endpoint.getClientSocketFactory(), endpoint.getServerSocketFactory()); if (tcpLog.isLoggable(Log.VERBOSE)) { tcpLog.log(Log.VERBOSE, "(port " + port + ") client using " + ep.getHost() + ":" + ep.getPort()); } ConnectionMultiplexer multiplexer; synchronized (channelTable) { // create or find channel for this endpoint ch = getChannel(ep); multiplexer = new ConnectionMultiplexer(ch, bufIn, sockOut, false); ch.useMultiplexer(multiplexer); } multiplexer.run(); break; default: // protocol not understood, send nack and close socket out.writeByte(TransportConstants.ProtocolNack); out.flush(); break; } } catch (IOException e) { // socket in unknown state: destroy socket tcpLog.log(Log.BRIEF, "terminated with exception:", e); } finally { closeSocket(socket); } } } }