1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.DataInputStream;
  28 import java.io.DataOutputStream;
  29 import java.io.IOException;
  30 import java.lang.ref.Reference;
  31 import java.lang.ref.SoftReference;
  32 import java.net.Socket;
  33 import java.rmi.ConnectIOException;
  34 import java.rmi.RemoteException;
  35 import java.security.AccessControlContext;
  36 import java.security.AccessController;
  37 import java.security.PrivilegedAction;
  38 import java.util.ArrayList;
  39 import java.util.List;
  40 import java.util.ListIterator;
  41 import java.util.WeakHashMap;
  42 import java.util.concurrent.Future;
  43 import java.util.concurrent.ScheduledExecutorService;
  44 import java.util.concurrent.TimeUnit;
  45 import sun.rmi.runtime.Log;
  46 import sun.rmi.runtime.NewThreadAction;
  47 import sun.rmi.runtime.RuntimeUtil;
  48 import sun.rmi.transport.Channel;
  49 import sun.rmi.transport.Connection;
  50 import sun.rmi.transport.Endpoint;
  51 import sun.rmi.transport.TransportConstants;
  52 
  53 /**
  54  * TCPChannel is the socket-based implementation of the RMI Channel
  55  * abstraction.
  56  *
  57  * @author Ann Wollrath
  58  */
  59 public class TCPChannel implements Channel {
  60     /** endpoint for this channel */
  61     private final TCPEndpoint ep;
  62     /** transport for this channel */
  63     private final TCPTransport tr;
  64     /** list of cached connections */
  65     private final List<TCPConnection> freeList =
  66         new ArrayList<>();
  67     /** frees cached connections that have expired (guarded by freeList) */
  68     private Future<?> reaper = null;
  69 
  70     /** using multiplexer (for bi-directional applet communication */
  71     private boolean usingMultiplexer = false;
  72     /** connection multiplexer, if used */
  73     private ConnectionMultiplexer multiplexer = null;
  74     /** connection acceptor (should be in TCPTransport) */
  75     private ConnectionAcceptor acceptor;
  76 
  77     /** most recently authorized AccessControlContext */
  78     private AccessControlContext okContext;
  79 
  80     /** cache of authorized AccessControlContexts */
  81     private WeakHashMap<AccessControlContext,
  82                         Reference<AccessControlContext>> authcache;
  83 
  84     /** the SecurityManager which authorized okContext and authcache */
  85     private SecurityManager cacheSecurityManager = null;
  86 
  87     /** client-side connection idle usage timeout */
  88     private static final long idleTimeout =             // default 15 seconds
  89         AccessController.doPrivileged((PrivilegedAction<Long>) () ->
  90             Long.getLong("sun.rmi.transport.connectionTimeout", 15000));
  91 
  92     /** client-side connection handshake read timeout */
  93     private static final int handshakeTimeout =         // default 1 minute
  94         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
  95             Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000));
  96 
  97     /** client-side connection response read timeout (after handshake) */
  98     private static final int responseTimeout =          // default infinity
  99         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
 100             Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0));
 101 
 102     /** thread pool for scheduling delayed tasks */
 103     private static final ScheduledExecutorService scheduler =
 104         AccessController.doPrivileged(
 105             new RuntimeUtil.GetInstanceAction()).getScheduler();
 106 
 107     /**
 108      * Create channel for endpoint.
 109      */
 110     TCPChannel(TCPTransport tr, TCPEndpoint ep) {
 111         this.tr = tr;
 112         this.ep = ep;
 113     }
 114 
 115     /**
 116      * Return the endpoint for this channel.
 117      */
 118     public Endpoint getEndpoint() {
 119         return ep;
 120     }
 121 
 122     /**
 123      * Checks if the current caller has sufficient privilege to make
 124      * a connection to the remote endpoint.
 125      * @exception SecurityException if caller is not allowed to use this
 126      * Channel.
 127      */
 128     private void checkConnectPermission() throws SecurityException {
 129         SecurityManager security = System.getSecurityManager();
 130         if (security == null)
 131             return;
 132 
 133         if (security != cacheSecurityManager) {
 134             // The security manager changed: flush the cache
 135             okContext = null;
 136             authcache = new WeakHashMap<AccessControlContext,
 137                                         Reference<AccessControlContext>>();
 138             cacheSecurityManager = security;
 139         }
 140 
 141         AccessControlContext ctx = AccessController.getContext();
 142 
 143         // If ctx is the same context as last time, or if it
 144         // appears in the cache, bypass the checkConnect.
 145         if (okContext == null ||
 146             !(okContext.equals(ctx) || authcache.containsKey(ctx)))
 147         {
 148             security.checkConnect(ep.getHost(), ep.getPort());
 149             authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
 150             // A WeakHashMap is transformed into a SoftHashSet by making
 151             // each value softly refer to its own key (Peter's idea).
 152         }
 153         okContext = ctx;
 154     }
 155 
 156     /**
 157      * Supplies a connection to the endpoint of the address space
 158      * for which this is a channel.  The returned connection may
 159      * be one retrieved from a cache of idle connections.
 160      */
 161     public Connection newConnection() throws RemoteException {
 162         TCPConnection conn;
 163 
 164         // loop until we find a free live connection (in which case
 165         // we return) or until we run out of freelist (in which case
 166         // the loop exits)
 167         do {
 168             conn = null;
 169             // try to get a free connection
 170             synchronized (freeList) {
 171                 int elementPos = freeList.size()-1;
 172 
 173                 if (elementPos >= 0) {
 174                     // If there is a security manager, make sure
 175                     // the caller is allowed to connect to the
 176                     // requested endpoint.
 177                     checkConnectPermission();
 178                     conn = freeList.get(elementPos);
 179                     freeList.remove(elementPos);
 180                 }
 181             }
 182 
 183             // at this point, conn is null iff the freelist is empty,
 184             // and nonnull if a free connection of uncertain vitality
 185             // has been found.
 186 
 187             if (conn != null) {
 188                 // check to see if the connection has closed since last use
 189                 if (!conn.isDead()) {
 190                     TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 191                     return conn;
 192                 }
 193 
 194                 // conn is dead, and cannot be reused (reuse => false)
 195                 this.free(conn, false);
 196             }
 197         } while (conn != null);
 198 
 199         // none free, so create a new connection
 200         return (createConnection());
 201     }
 202 
 203     /**
 204      * Create a new connection to the remote endpoint of this channel.
 205      * The returned connection is new.  The caller must already have
 206      * passed a security checkConnect or equivalent.
 207      */
 208     private Connection createConnection() throws RemoteException {
 209         Connection conn;
 210 
 211         TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
 212 
 213         if (!usingMultiplexer) {
 214             Socket sock = ep.newSocket();
 215             conn = new TCPConnection(this, sock);
 216 
 217             try {
 218                 DataOutputStream out =
 219                     new DataOutputStream(conn.getOutputStream());
 220                 writeTransportHeader(out);
 221 
 222                 // choose protocol (single op if not reusable socket)
 223                 if (!conn.isReusable()) {
 224                     out.writeByte(TransportConstants.SingleOpProtocol);
 225                 } else {
 226                     out.writeByte(TransportConstants.StreamProtocol);
 227                     out.flush();
 228 
 229                     /*
 230                      * Set socket read timeout to configured value for JRMP
 231                      * connection handshake; this also serves to guard against
 232                      * non-JRMP servers that do not respond (see 4322806).
 233                      */
 234                     int originalSoTimeout = 0;
 235                     try {
 236                         originalSoTimeout = sock.getSoTimeout();
 237                         sock.setSoTimeout(handshakeTimeout);
 238                     } catch (Exception e) {
 239                         // if we fail to set this, ignore and proceed anyway
 240                     }
 241 
 242                     DataInputStream in =
 243                         new DataInputStream(conn.getInputStream());
 244                     byte ack = in.readByte();
 245                     if (ack != TransportConstants.ProtocolAck) {
 246                         throw new ConnectIOException(
 247                             ack == TransportConstants.ProtocolNack ?
 248                             "JRMP StreamProtocol not supported by server" :
 249                             "non-JRMP server at remote endpoint");
 250                     }
 251 
 252                     String suggestedHost = in.readUTF();
 253                     int    suggestedPort = in.readInt();
 254                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 255                         TCPTransport.tcpLog.log(Log.VERBOSE,
 256                             "server suggested " + suggestedHost + ":" +
 257                             suggestedPort);
 258                     }
 259 
 260                     // set local host name, if unknown
 261                     TCPEndpoint.setLocalHost(suggestedHost);
 262                     // do NOT set the default port, because we don't
 263                     // know if we can't listen YET...
 264 
 265                     // write out default endpoint to match protocol
 266                     // (but it serves no purpose)
 267                     TCPEndpoint localEp =
 268                         TCPEndpoint.getLocalEndpoint(0, null, null);
 269                     out.writeUTF(localEp.getHost());
 270                     out.writeInt(localEp.getPort());
 271                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 272                         TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
 273                             localEp.getHost() + ":" + localEp.getPort());
 274                     }
 275 
 276                     /*
 277                      * After JRMP handshake, set socket read timeout to value
 278                      * configured for the rest of the lifetime of the
 279                      * connection.  NOTE: this timeout, if configured to a
 280                      * finite duration, places an upper bound on the time
 281                      * that a remote method call is permitted to execute.
 282                      */
 283                     try {
 284                         /*
 285                          * If socket factory had set a non-zero timeout on its
 286                          * own, then restore it instead of using the property-
 287                          * configured value.
 288                          */
 289                         sock.setSoTimeout((originalSoTimeout != 0 ?
 290                                            originalSoTimeout :
 291                                            responseTimeout));
 292                     } catch (Exception e) {
 293                         // if we fail to set this, ignore and proceed anyway
 294                     }
 295 
 296                     out.flush();
 297                 }
 298             } catch (IOException e) {
 299                 if (e instanceof RemoteException)
 300                     throw (RemoteException) e;
 301                 else
 302                     throw new ConnectIOException(
 303                         "error during JRMP connection establishment", e);
 304             }
 305         } else {
 306             try {
 307                 conn = multiplexer.openConnection();
 308             } catch (IOException e) {
 309                 synchronized (this) {
 310                     usingMultiplexer = false;
 311                     multiplexer = null;
 312                 }
 313                 throw new ConnectIOException(
 314                     "error opening virtual connection " +
 315                     "over multiplexed connection", e);
 316             }
 317         }
 318         return conn;
 319     }
 320 
 321     /**
 322      * Free the connection generated by this channel.
 323      * @param conn The connection
 324      * @param reuse If true, the connection is in a state in which it
 325      *        can be reused for another method call.
 326      */
 327     public void free(Connection conn, boolean reuse) {
 328         if (conn == null) return;
 329 
 330         if (reuse && conn.isReusable()) {
 331             long lastuse = System.currentTimeMillis();
 332             TCPConnection tcpConnection = (TCPConnection) conn;
 333 
 334             TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 335 
 336             /*
 337              * Cache connection; if reaper task for expired
 338              * connections isn't scheduled, then schedule it.
 339              */
 340             synchronized (freeList) {
 341                 freeList.add(tcpConnection);
 342                 if (reaper == null) {
 343                     TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
 344 
 345                     reaper = scheduler.scheduleWithFixedDelay(
 346                         new Runnable() {
 347                             public void run() {
 348                                 TCPTransport.tcpLog.log(Log.VERBOSE,
 349                                                         "wake up");
 350                                 freeCachedConnections();
 351                             }
 352                         }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
 353                 }
 354             }
 355 
 356             tcpConnection.setLastUseTime(lastuse);
 357             tcpConnection.setExpiration(lastuse + idleTimeout);
 358         } else {
 359             TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
 360 
 361             try {
 362                 conn.close();
 363             } catch (IOException ignored) {
 364             }
 365         }
 366     }
 367 
 368     /**
 369      * Send transport header over stream.
 370      */
 371     private void writeTransportHeader(DataOutputStream out)
 372         throws RemoteException
 373     {
 374         try {
 375             // write out transport header
 376             DataOutputStream dataOut =
 377                 new DataOutputStream(out);
 378             dataOut.writeInt(TransportConstants.Magic);
 379             dataOut.writeShort(TransportConstants.Version);
 380         } catch (IOException e) {
 381             throw new ConnectIOException(
 382                 "error writing JRMP transport header", e);
 383         }
 384     }
 385 
 386     /**
 387      * Use given connection multiplexer object to obtain new connections
 388      * through this channel.
 389      */
 390     synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
 391         // for now, always just use the last one given
 392         multiplexer = newMultiplexer;
 393 
 394         usingMultiplexer = true;
 395     }
 396 
 397     /**
 398      * Accept a connection provided over a multiplexed channel.
 399      */
 400     void acceptMultiplexConnection(Connection conn) {
 401         if (acceptor == null) {
 402             acceptor = new ConnectionAcceptor(tr);
 403             acceptor.startNewAcceptor();
 404         }
 405         acceptor.accept(conn);
 406     }
 407 
 408     /**
 409      * Closes all the connections in the cache, whether timed out or not.
 410      */
 411     public void shedCache() {
 412         // Build a list of connections, to avoid holding the freeList
 413         // lock during (potentially long-running) close() calls.
 414         Connection[] conn;
 415         synchronized (freeList) {
 416             conn = freeList.toArray(new Connection[freeList.size()]);
 417             freeList.clear();
 418         }
 419 
 420         // Close all the connections that were free
 421         for (int i = conn.length; --i >= 0; ) {
 422             Connection c = conn[i];
 423             conn[i] = null; // help gc
 424             try {
 425                 c.close();
 426             } catch (java.io.IOException e) {
 427                 // eat exception
 428             }
 429         }
 430     }
 431 
 432     private void freeCachedConnections() {
 433         /*
 434          * Remove each connection whose time out has expired.
 435          */
 436         synchronized (freeList) {
 437             int size = freeList.size();
 438 
 439             if (size > 0) {
 440                 long time = System.currentTimeMillis();
 441                 ListIterator<TCPConnection> iter = freeList.listIterator(size);
 442 
 443                 while (iter.hasPrevious()) {
 444                     TCPConnection conn = iter.previous();
 445                     if (conn.expired(time)) {
 446                         TCPTransport.tcpLog.log(Log.VERBOSE,
 447                             "connection timeout expired");
 448 
 449                         try {
 450                             conn.close();
 451                         } catch (java.io.IOException e) {
 452                             // eat exception
 453                         }
 454                         iter.remove();
 455                     }
 456                 }
 457             }
 458 
 459             if (freeList.isEmpty()) {
 460                 reaper.cancel(false);
 461                 reaper = null;
 462             }
 463         }
 464     }
 465 }
 466 
 467 /**
 468  * ConnectionAcceptor manages accepting new connections and giving them
 469  * to TCPTransport's message handler on new threads.
 470  *
 471  * Since this object only needs to know which transport to give new
 472  * connections to, it doesn't need to be per-channel as currently
 473  * implemented.
 474  */
 475 class ConnectionAcceptor implements Runnable {
 476 
 477     /** transport that will handle message on accepted connections */
 478     private TCPTransport transport;
 479 
 480     /** queue of connections to be accepted */
 481     private List<Connection> queue = new ArrayList<>();
 482 
 483     /** thread ID counter */
 484     private static int threadNum = 0;
 485 
 486     /**
 487      * Create a new ConnectionAcceptor that will give connections
 488      * to the specified transport on a new thread.
 489      */
 490     public ConnectionAcceptor(TCPTransport transport) {
 491         this.transport = transport;
 492     }
 493 
 494     /**
 495      * Start a new thread to accept connections.
 496      */
 497     public void startNewAcceptor() {
 498         Thread t = AccessController.doPrivileged(
 499             new NewThreadAction(ConnectionAcceptor.this,
 500                                 "Multiplex Accept-" + ++ threadNum,
 501                                 true));
 502         t.start();
 503     }
 504 
 505     /**
 506      * Add connection to queue of connections to be accepted.
 507      */
 508     public void accept(Connection conn) {
 509         synchronized (queue) {
 510             queue.add(conn);
 511             queue.notify();
 512         }
 513     }
 514 
 515     /**
 516      * Give transport next accepted connection, when available.
 517      */
 518     public void run() {
 519         Connection conn;
 520 
 521         synchronized (queue) {
 522             while (queue.size() == 0) {
 523                 try {
 524                     queue.wait();
 525                 } catch (InterruptedException e) {
 526                 }
 527             }
 528             startNewAcceptor();
 529             conn = queue.remove(0);
 530         }
 531 
 532         transport.handleMessages(conn, true);
 533     }
 534 }