1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.DataInputStream;
  28 import java.io.DataOutputStream;
  29 import java.io.IOException;
  30 import java.lang.ref.Reference;
  31 import java.lang.ref.SoftReference;
  32 import java.net.Socket;
  33 import java.rmi.ConnectIOException;
  34 import java.rmi.RemoteException;
  35 import java.security.AccessControlContext;
  36 import java.security.AccessController;
  37 import java.security.PrivilegedAction;
  38 import java.util.ArrayList;
  39 import java.util.List;
  40 import java.util.ListIterator;
  41 import java.util.WeakHashMap;
  42 import java.util.concurrent.Future;
  43 import java.util.concurrent.ScheduledExecutorService;
  44 import java.util.concurrent.TimeUnit;
  45 import sun.rmi.runtime.Log;
  46 import sun.rmi.runtime.NewThreadAction;
  47 import sun.rmi.runtime.RuntimeUtil;
  48 import sun.rmi.transport.Channel;
  49 import sun.rmi.transport.Connection;
  50 import sun.rmi.transport.Endpoint;
  51 import sun.rmi.transport.TransportConstants;
  52 
  53 /**
  54  * TCPChannel is the socket-based implementation of the RMI Channel
  55  * abstraction.
  56  *
  57  * @author Ann Wollrath
  58  */
  59 public class TCPChannel implements Channel {
  60     /** endpoint for this channel */
  61     private final TCPEndpoint ep;
  62     /** transport for this channel */
  63     private final TCPTransport tr;
  64     /** list of cached connections */
  65     private final List<TCPConnection> freeList =
  66         new ArrayList<>();
  67     /** frees cached connections that have expired (guarded by freeList) */
  68     private Future<?> reaper = null;
  69 
  70     /** using multiplexer (for bi-directional applet communication */
  71     private boolean usingMultiplexer = false;
  72     /** connection multiplexer, if used */
  73     private ConnectionMultiplexer multiplexer = null;
  74     /** connection acceptor (should be in TCPTransport) */
  75     private ConnectionAcceptor acceptor;
  76 
  77     /** most recently authorized AccessControlContext */
  78     private AccessControlContext okContext;
  79 
  80     /** cache of authorized AccessControlContexts */
  81     private WeakHashMap<AccessControlContext,
  82                         Reference<AccessControlContext>> authcache;
  83 
  84     /** the SecurityManager which authorized okContext and authcache */
  85     private SecurityManager cacheSecurityManager = null;
  86 
  87     /** client-side connection idle usage timeout */
  88     private static final long idleTimeout =             // default 15 seconds
  89         AccessController.doPrivileged((PrivilegedAction<Long>) () ->
  90             Long.getLong("sun.rmi.transport.connectionTimeout", 15000));
  91 
  92     /** client-side connection handshake read timeout */
  93     private static final int handshakeTimeout =         // default 1 minute
  94         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
  95             Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000));
  96 
  97     /** client-side connection response read timeout (after handshake) */
  98     private static final int responseTimeout =          // default infinity
  99         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
 100             Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0));
 101 
 102     /** thread pool for scheduling delayed tasks */
 103     private static final ScheduledExecutorService scheduler =
 104         AccessController.doPrivileged(
 105             new RuntimeUtil.GetInstanceAction()).getScheduler();
 106 
 107     /**
 108      * Create channel for endpoint.
 109      */
 110     TCPChannel(TCPTransport tr, TCPEndpoint ep) {
 111         this.tr = tr;
 112         this.ep = ep;
 113     }
 114 
 115     /**
 116      * Return the endpoint for this channel.
 117      */
 118     public Endpoint getEndpoint() {
 119         return ep;
 120     }
 121 
 122     /**
 123      * Checks if the current caller has sufficient privilege to make
 124      * a connection to the remote endpoint.
 125      * @exception SecurityException if caller is not allowed to use this
 126      * Channel.
 127      */
 128     private void checkConnectPermission() throws SecurityException {
 129         SecurityManager security = System.getSecurityManager();
 130         if (security == null)
 131             return;
 132 
 133         if (security != cacheSecurityManager) {
 134             // The security manager changed: flush the cache
 135             okContext = null;
 136             authcache = new WeakHashMap<AccessControlContext,
 137                                         Reference<AccessControlContext>>();
 138             cacheSecurityManager = security;
 139         }
 140 
 141         AccessControlContext ctx = AccessController.getContext();
 142 
 143         // If ctx is the same context as last time, or if it
 144         // appears in the cache, bypass the checkConnect.
 145         if (okContext == null ||
 146             !(okContext.equals(ctx) || authcache.containsKey(ctx)))
 147         {
 148             security.checkConnect(ep.getHost(), ep.getPort());
 149             authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
 150             // A WeakHashMap is transformed into a SoftHashSet by making
 151             // each value softly refer to its own key (Peter's idea).
 152         }
 153         okContext = ctx;
 154     }
 155 
 156     /**
 157      * Supplies a connection to the endpoint of the address space
 158      * for which this is a channel.  The returned connection may
 159      * be one retrieved from a cache of idle connections.
 160      */
 161     public Connection newConnection() throws RemoteException {
 162         TCPConnection conn;
 163 
 164         // loop until we find a free live connection (in which case
 165         // we return) or until we run out of freelist (in which case
 166         // the loop exits)
 167         do {
 168             conn = null;
 169             // try to get a free connection
 170             synchronized (freeList) {
 171                 int elementPos = freeList.size()-1;
 172 
 173                 if (elementPos >= 0) {
 174                     // If there is a security manager, make sure
 175                     // the caller is allowed to connect to the
 176                     // requested endpoint.
 177                     checkConnectPermission();
 178                     conn = freeList.get(elementPos);
 179                     freeList.remove(elementPos);
 180                 }
 181             }
 182 
 183             // at this point, conn is null iff the freelist is empty,
 184             // and nonnull if a free connection of uncertain vitality
 185             // has been found.
 186 
 187             if (conn != null) {
 188                 // check to see if the connection has closed since last use
 189                 if (!conn.isDead()) {
 190                     TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 191                     return conn;
 192                 }
 193 
 194                 // conn is dead, and cannot be reused (reuse => false)
 195                 this.free(conn, false);
 196             }
 197         } while (conn != null);
 198 
 199         // none free, so create a new connection
 200         return (createConnection());
 201     }
 202 
 203     /**
 204      * Create a new connection to the remote endpoint of this channel.
 205      * The returned connection is new.  The caller must already have
 206      * passed a security checkConnect or equivalent.
 207      */
 208     private Connection createConnection() throws RemoteException {
 209         Connection conn;
 210 
 211         TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
 212 
 213         if (!usingMultiplexer) {
 214             Socket sock = ep.newSocket();
 215             conn = new TCPConnection(this, sock);
 216 
 217             try {
 218                 DataOutputStream out =
 219                     new DataOutputStream(conn.getOutputStream());
 220                 writeTransportHeader(out);
 221 
 222                 // choose protocol (single op if not reusable socket)
 223                 if (!conn.isReusable()) {
 224                     out.writeByte(TransportConstants.SingleOpProtocol);
 225                 } else {
 226                     out.writeByte(TransportConstants.StreamProtocol);
 227                     out.flush();
 228 
 229                     /*
 230                      * Set socket read timeout to configured value for JRMP
 231                      * connection handshake; this also serves to guard against
 232                      * non-JRMP servers that do not respond (see 4322806).
 233                      */
 234                     int originalSoTimeout = 0;
 235                     try {
 236                         originalSoTimeout = sock.getSoTimeout();
 237                         sock.setSoTimeout(handshakeTimeout);
 238                     } catch (Exception e) {
 239                         // if we fail to set this, ignore and proceed anyway
 240                     }
 241 
 242                     DataInputStream in =
 243                         new DataInputStream(conn.getInputStream());
 244                     byte ack = in.readByte();
 245                     if (ack != TransportConstants.ProtocolAck) {
 246                         throw new ConnectIOException(
 247                             ack == TransportConstants.ProtocolNack ?
 248                             "JRMP StreamProtocol not supported by server" :
 249                             "non-JRMP server at remote endpoint");
 250                     }
 251 
 252                     String suggestedHost = in.readUTF();
 253                     int    suggestedPort = in.readInt();
 254                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 255                         TCPTransport.tcpLog.log(Log.VERBOSE,
 256                             "server suggested " + suggestedHost + ":" +
 257                             suggestedPort);
 258                     }
 259 
 260                     // set local host name, if unknown
 261                     TCPEndpoint.setLocalHost(suggestedHost);
 262                     // do NOT set the default port, because we don't
 263                     // know if we can't listen YET...
 264 
 265                     // write out default endpoint to match protocol
 266                     // (but it serves no purpose)
 267                     TCPEndpoint localEp =
 268                         TCPEndpoint.getLocalEndpoint(0, null, null);
 269                     out.writeUTF(localEp.getHost());
 270                     out.writeInt(localEp.getPort());
 271                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 272                         TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
 273                             localEp.getHost() + ":" + localEp.getPort());
 274                     }
 275 
 276                     /*
 277                      * After JRMP handshake, set socket read timeout to value
 278                      * configured for the rest of the lifetime of the
 279                      * connection.  NOTE: this timeout, if configured to a
 280                      * finite duration, places an upper bound on the time
 281                      * that a remote method call is permitted to execute.
 282                      */
 283                     try {
 284                         /*
 285                          * If socket factory had set a non-zero timeout on its
 286                          * own, then restore it instead of using the property-
 287                          * configured value.
 288                          */
 289                         sock.setSoTimeout((originalSoTimeout != 0 ?
 290                                            originalSoTimeout :
 291                                            responseTimeout));
 292                     } catch (Exception e) {
 293                         // if we fail to set this, ignore and proceed anyway
 294                     }
 295 
 296                     out.flush();
 297                 }
 298             } catch (IOException e) {
 299                 try {
 300                     conn.close();
 301                 } catch (Exception ex) {}
 302                 if (e instanceof RemoteException) {
 303                     throw (RemoteException) e;
 304                 } else {
 305                     throw new ConnectIOException(
 306                         "error during JRMP connection establishment", e);
 307                 }
 308             }
 309         } else {
 310             try {
 311                 conn = multiplexer.openConnection();
 312             } catch (IOException e) {
 313                 synchronized (this) {
 314                     usingMultiplexer = false;
 315                     multiplexer = null;
 316                 }
 317                 throw new ConnectIOException(
 318                     "error opening virtual connection " +
 319                     "over multiplexed connection", e);
 320             }
 321         }
 322         return conn;
 323     }
 324 
 325     /**
 326      * Free the connection generated by this channel.
 327      * @param conn The connection
 328      * @param reuse If true, the connection is in a state in which it
 329      *        can be reused for another method call.
 330      */
 331     public void free(Connection conn, boolean reuse) {
 332         if (conn == null) return;
 333 
 334         if (reuse && conn.isReusable()) {
 335             long lastuse = System.currentTimeMillis();
 336             TCPConnection tcpConnection = (TCPConnection) conn;
 337 
 338             TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 339 
 340             /*
 341              * Cache connection; if reaper task for expired
 342              * connections isn't scheduled, then schedule it.
 343              */
 344             synchronized (freeList) {
 345                 freeList.add(tcpConnection);
 346                 if (reaper == null) {
 347                     TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
 348 
 349                     reaper = scheduler.scheduleWithFixedDelay(
 350                         new Runnable() {
 351                             public void run() {
 352                                 TCPTransport.tcpLog.log(Log.VERBOSE,
 353                                                         "wake up");
 354                                 freeCachedConnections();
 355                             }
 356                         }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
 357                 }
 358             }
 359 
 360             tcpConnection.setLastUseTime(lastuse);
 361             tcpConnection.setExpiration(lastuse + idleTimeout);
 362         } else {
 363             TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
 364 
 365             try {
 366                 conn.close();
 367             } catch (IOException ignored) {
 368             }
 369         }
 370     }
 371 
 372     /**
 373      * Send transport header over stream.
 374      */
 375     private void writeTransportHeader(DataOutputStream out)
 376         throws RemoteException
 377     {
 378         try {
 379             // write out transport header
 380             DataOutputStream dataOut =
 381                 new DataOutputStream(out);
 382             dataOut.writeInt(TransportConstants.Magic);
 383             dataOut.writeShort(TransportConstants.Version);
 384         } catch (IOException e) {
 385             throw new ConnectIOException(
 386                 "error writing JRMP transport header", e);
 387         }
 388     }
 389 
 390     /**
 391      * Use given connection multiplexer object to obtain new connections
 392      * through this channel.
 393      */
 394     synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
 395         // for now, always just use the last one given
 396         multiplexer = newMultiplexer;
 397 
 398         usingMultiplexer = true;
 399     }
 400 
 401     /**
 402      * Accept a connection provided over a multiplexed channel.
 403      */
 404     void acceptMultiplexConnection(Connection conn) {
 405         if (acceptor == null) {
 406             acceptor = new ConnectionAcceptor(tr);
 407             acceptor.startNewAcceptor();
 408         }
 409         acceptor.accept(conn);
 410     }
 411 
 412     /**
 413      * Closes all the connections in the cache, whether timed out or not.
 414      */
 415     public void shedCache() {
 416         // Build a list of connections, to avoid holding the freeList
 417         // lock during (potentially long-running) close() calls.
 418         Connection[] conn;
 419         synchronized (freeList) {
 420             conn = freeList.toArray(new Connection[freeList.size()]);
 421             freeList.clear();
 422         }
 423 
 424         // Close all the connections that were free
 425         for (int i = conn.length; --i >= 0; ) {
 426             Connection c = conn[i];
 427             conn[i] = null; // help gc
 428             try {
 429                 c.close();
 430             } catch (java.io.IOException e) {
 431                 // eat exception
 432             }
 433         }
 434     }
 435 
 436     private void freeCachedConnections() {
 437         /*
 438          * Remove each connection whose time out has expired.
 439          */
 440         synchronized (freeList) {
 441             int size = freeList.size();
 442 
 443             if (size > 0) {
 444                 long time = System.currentTimeMillis();
 445                 ListIterator<TCPConnection> iter = freeList.listIterator(size);
 446 
 447                 while (iter.hasPrevious()) {
 448                     TCPConnection conn = iter.previous();
 449                     if (conn.expired(time)) {
 450                         TCPTransport.tcpLog.log(Log.VERBOSE,
 451                             "connection timeout expired");
 452 
 453                         try {
 454                             conn.close();
 455                         } catch (java.io.IOException e) {
 456                             // eat exception
 457                         }
 458                         iter.remove();
 459                     }
 460                 }
 461             }
 462 
 463             if (freeList.isEmpty()) {
 464                 reaper.cancel(false);
 465                 reaper = null;
 466             }
 467         }
 468     }
 469 }
 470 
 471 /**
 472  * ConnectionAcceptor manages accepting new connections and giving them
 473  * to TCPTransport's message handler on new threads.
 474  *
 475  * Since this object only needs to know which transport to give new
 476  * connections to, it doesn't need to be per-channel as currently
 477  * implemented.
 478  */
 479 class ConnectionAcceptor implements Runnable {
 480 
 481     /** transport that will handle message on accepted connections */
 482     private TCPTransport transport;
 483 
 484     /** queue of connections to be accepted */
 485     private List<Connection> queue = new ArrayList<>();
 486 
 487     /** thread ID counter */
 488     private static int threadNum = 0;
 489 
 490     /**
 491      * Create a new ConnectionAcceptor that will give connections
 492      * to the specified transport on a new thread.
 493      */
 494     public ConnectionAcceptor(TCPTransport transport) {
 495         this.transport = transport;
 496     }
 497 
 498     /**
 499      * Start a new thread to accept connections.
 500      */
 501     public void startNewAcceptor() {
 502         Thread t = AccessController.doPrivileged(
 503             new NewThreadAction(ConnectionAcceptor.this,
 504                                 "Multiplex Accept-" + ++ threadNum,
 505                                 true));
 506         t.start();
 507     }
 508 
 509     /**
 510      * Add connection to queue of connections to be accepted.
 511      */
 512     public void accept(Connection conn) {
 513         synchronized (queue) {
 514             queue.add(conn);
 515             queue.notify();
 516         }
 517     }
 518 
 519     /**
 520      * Give transport next accepted connection, when available.
 521      */
 522     public void run() {
 523         Connection conn;
 524 
 525         synchronized (queue) {
 526             while (queue.size() == 0) {
 527                 try {
 528                     queue.wait();
 529                 } catch (InterruptedException e) {
 530                 }
 531             }
 532             startNewAcceptor();
 533             conn = queue.remove(0);
 534         }
 535 
 536         transport.handleMessages(conn, true);
 537     }
 538 }