1 /*
   2  * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.io.DataInputStream;
  28 import java.io.DataOutputStream;
  29 import java.io.IOException;
  30 import java.lang.ref.Reference;
  31 import java.lang.ref.SoftReference;
  32 import java.net.Socket;
  33 import java.rmi.ConnectIOException;
  34 import java.rmi.RemoteException;
  35 import java.security.AccessControlContext;
  36 import java.security.AccessController;
  37 import java.util.ArrayList;
  38 import java.util.List;
  39 import java.util.ListIterator;
  40 import java.util.WeakHashMap;
  41 import java.util.concurrent.Future;
  42 import java.util.concurrent.ScheduledExecutorService;
  43 import java.util.concurrent.TimeUnit;
  44 import sun.rmi.runtime.Log;
  45 import sun.rmi.runtime.NewThreadAction;
  46 import sun.rmi.runtime.RuntimeUtil;
  47 import sun.rmi.transport.Channel;
  48 import sun.rmi.transport.Connection;
  49 import sun.rmi.transport.Endpoint;
  50 import sun.rmi.transport.TransportConstants;
  51 import sun.security.action.GetIntegerAction;
  52 import sun.security.action.GetLongAction;
  53 
  54 /**
  55  * TCPChannel is the socket-based implementation of the RMI Channel
  56  * abstraction.
  57  *
  58  * @author Ann Wollrath
  59  */
  60 public class TCPChannel implements Channel {
  61     /** endpoint for this channel */
  62     private final TCPEndpoint ep;
  63     /** transport for this channel */
  64     private final TCPTransport tr;
  65     /** list of cached connections */
  66     private final List<TCPConnection> freeList =
  67         new ArrayList<>();
  68     /** frees cached connections that have expired (guarded by freeList) */
  69     private Future<?> reaper = null;
  70 
  71     /** using multiplexer (for bi-directional applet communication */
  72     private boolean usingMultiplexer = false;
  73     /** connection multiplexer, if used */
  74     private ConnectionMultiplexer multiplexer = null;
  75     /** connection acceptor (should be in TCPTransport) */
  76     private ConnectionAcceptor acceptor;
  77 
  78     /** most recently authorized AccessControlContext */
  79     private AccessControlContext okContext;
  80 
  81     /** cache of authorized AccessControlContexts */
  82     private WeakHashMap<AccessControlContext,
  83                         Reference<AccessControlContext>> authcache;
  84 
  85     /** the SecurityManager which authorized okContext and authcache */
  86     private SecurityManager cacheSecurityManager = null;
  87 
  88     /** client-side connection idle usage timeout */
  89     private static final long idleTimeout =             // default 15 seconds
  90         AccessController.doPrivileged(
  91             new GetLongAction("sun.rmi.transport.connectionTimeout", 15000));
  92 
  93     /** client-side connection handshake read timeout */
  94     private static final int handshakeTimeout =         // default 1 minute
  95         AccessController.doPrivileged(
  96             new GetIntegerAction("sun.rmi.transport.tcp.handshakeTimeout",
  97                                  60000));
  98 
  99     /** client-side connection response read timeout (after handshake) */
 100     private static final int responseTimeout =          // default infinity
 101         AccessController.doPrivileged(
 102             new GetIntegerAction("sun.rmi.transport.tcp.responseTimeout", 0));
 103 
 104     /** thread pool for scheduling delayed tasks */
 105     private static final ScheduledExecutorService scheduler =
 106         AccessController.doPrivileged(
 107             new RuntimeUtil.GetInstanceAction()).getScheduler();
 108 
 109     /**
 110      * Create channel for endpoint.
 111      */
 112     TCPChannel(TCPTransport tr, TCPEndpoint ep) {
 113         this.tr = tr;
 114         this.ep = ep;
 115     }
 116 
 117     /**
 118      * Return the endpoint for this channel.
 119      */
 120     public Endpoint getEndpoint() {
 121         return ep;
 122     }
 123 
 124     /**
 125      * Checks if the current caller has sufficient privilege to make
 126      * a connection to the remote endpoint.
 127      * @exception SecurityException if caller is not allowed to use this
 128      * Channel.
 129      */
 130     private void checkConnectPermission() throws SecurityException {
 131         SecurityManager security = System.getSecurityManager();
 132         if (security == null)
 133             return;
 134 
 135         if (security != cacheSecurityManager) {
 136             // The security manager changed: flush the cache
 137             okContext = null;
 138             authcache = new WeakHashMap<AccessControlContext,
 139                                         Reference<AccessControlContext>>();
 140             cacheSecurityManager = security;
 141         }
 142 
 143         AccessControlContext ctx = AccessController.getContext();
 144 
 145         // If ctx is the same context as last time, or if it
 146         // appears in the cache, bypass the checkConnect.
 147         if (okContext == null ||
 148             !(okContext.equals(ctx) || authcache.containsKey(ctx)))
 149         {
 150             security.checkConnect(ep.getHost(), ep.getPort());
 151             authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
 152             // A WeakHashMap is transformed into a SoftHashSet by making
 153             // each value softly refer to its own key (Peter's idea).
 154         }
 155         okContext = ctx;
 156     }
 157 
 158     /**
 159      * Supplies a connection to the endpoint of the address space
 160      * for which this is a channel.  The returned connection may
 161      * be one retrieved from a cache of idle connections.
 162      */
 163     public Connection newConnection() throws RemoteException {
 164         TCPConnection conn;
 165 
 166         // loop until we find a free live connection (in which case
 167         // we return) or until we run out of freelist (in which case
 168         // the loop exits)
 169         do {
 170             conn = null;
 171             // try to get a free connection
 172             synchronized (freeList) {
 173                 int elementPos = freeList.size()-1;
 174 
 175                 if (elementPos >= 0) {
 176                     // If there is a security manager, make sure
 177                     // the caller is allowed to connect to the
 178                     // requested endpoint.
 179                     checkConnectPermission();
 180                     conn = freeList.get(elementPos);
 181                     freeList.remove(elementPos);
 182                 }
 183             }
 184 
 185             // at this point, conn is null iff the freelist is empty,
 186             // and nonnull if a free connection of uncertain vitality
 187             // has been found.
 188 
 189             if (conn != null) {
 190                 // check to see if the connection has closed since last use
 191                 if (!conn.isDead()) {
 192                     TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 193                     return conn;
 194                 }
 195 
 196                 // conn is dead, and cannot be reused (reuse => false)
 197                 this.free(conn, false);
 198             }
 199         } while (conn != null);
 200 
 201         // none free, so create a new connection
 202         return (createConnection());
 203     }
 204 
 205     /**
 206      * Create a new connection to the remote endpoint of this channel.
 207      * The returned connection is new.  The caller must already have
 208      * passed a security checkConnect or equivalent.
 209      */
 210     private Connection createConnection() throws RemoteException {
 211         Connection conn;
 212 
 213         TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
 214 
 215         if (!usingMultiplexer) {
 216             Socket sock = ep.newSocket();
 217             conn = new TCPConnection(this, sock);
 218 
 219             try {
 220                 DataOutputStream out =
 221                     new DataOutputStream(conn.getOutputStream());
 222                 writeTransportHeader(out);
 223 
 224                 // choose protocol (single op if not reusable socket)
 225                 if (!conn.isReusable()) {
 226                     out.writeByte(TransportConstants.SingleOpProtocol);
 227                 } else {
 228                     out.writeByte(TransportConstants.StreamProtocol);
 229                     out.flush();
 230 
 231                     /*
 232                      * Set socket read timeout to configured value for JRMP
 233                      * connection handshake; this also serves to guard against
 234                      * non-JRMP servers that do not respond (see 4322806).
 235                      */
 236                     int originalSoTimeout = 0;
 237                     try {
 238                         originalSoTimeout = sock.getSoTimeout();
 239                         sock.setSoTimeout(handshakeTimeout);
 240                     } catch (Exception e) {
 241                         // if we fail to set this, ignore and proceed anyway
 242                     }
 243 
 244                     DataInputStream in =
 245                         new DataInputStream(conn.getInputStream());
 246                     byte ack = in.readByte();
 247                     if (ack != TransportConstants.ProtocolAck) {
 248                         throw new ConnectIOException(
 249                             ack == TransportConstants.ProtocolNack ?
 250                             "JRMP StreamProtocol not supported by server" :
 251                             "non-JRMP server at remote endpoint");
 252                     }
 253 
 254                     String suggestedHost = in.readUTF();
 255                     int    suggestedPort = in.readInt();
 256                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 257                         TCPTransport.tcpLog.log(Log.VERBOSE,
 258                             "server suggested " + suggestedHost + ":" +
 259                             suggestedPort);
 260                     }
 261 
 262                     // set local host name, if unknown
 263                     TCPEndpoint.setLocalHost(suggestedHost);
 264                     // do NOT set the default port, because we don't
 265                     // know if we can't listen YET...
 266 
 267                     // write out default endpoint to match protocol
 268                     // (but it serves no purpose)
 269                     TCPEndpoint localEp =
 270                         TCPEndpoint.getLocalEndpoint(0, null, null);
 271                     out.writeUTF(localEp.getHost());
 272                     out.writeInt(localEp.getPort());
 273                     if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
 274                         TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
 275                             localEp.getHost() + ":" + localEp.getPort());
 276                     }
 277 
 278                     /*
 279                      * After JRMP handshake, set socket read timeout to value
 280                      * configured for the rest of the lifetime of the
 281                      * connection.  NOTE: this timeout, if configured to a
 282                      * finite duration, places an upper bound on the time
 283                      * that a remote method call is permitted to execute.
 284                      */
 285                     try {
 286                         /*
 287                          * If socket factory had set a non-zero timeout on its
 288                          * own, then restore it instead of using the property-
 289                          * configured value.
 290                          */
 291                         sock.setSoTimeout((originalSoTimeout != 0 ?
 292                                            originalSoTimeout :
 293                                            responseTimeout));
 294                     } catch (Exception e) {
 295                         // if we fail to set this, ignore and proceed anyway
 296                     }
 297 
 298                     out.flush();
 299                 }
 300             } catch (IOException e) {
 301                 if (e instanceof RemoteException)
 302                     throw (RemoteException) e;
 303                 else
 304                     throw new ConnectIOException(
 305                         "error during JRMP connection establishment", e);
 306             }
 307         } else {
 308             try {
 309                 conn = multiplexer.openConnection();
 310             } catch (IOException e) {
 311                 synchronized (this) {
 312                     usingMultiplexer = false;
 313                     multiplexer = null;
 314                 }
 315                 throw new ConnectIOException(
 316                     "error opening virtual connection " +
 317                     "over multiplexed connection", e);
 318             }
 319         }
 320         return conn;
 321     }
 322 
 323     /**
 324      * Free the connection generated by this channel.
 325      * @param conn The connection
 326      * @param reuse If true, the connection is in a state in which it
 327      *        can be reused for another method call.
 328      */
 329     public void free(Connection conn, boolean reuse) {
 330         if (conn == null) return;
 331 
 332         if (reuse && conn.isReusable()) {
 333             long lastuse = System.currentTimeMillis();
 334             TCPConnection tcpConnection = (TCPConnection) conn;
 335 
 336             TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
 337 
 338             /*
 339              * Cache connection; if reaper task for expired
 340              * connections isn't scheduled, then schedule it.
 341              */
 342             synchronized (freeList) {
 343                 freeList.add(tcpConnection);
 344                 if (reaper == null) {
 345                     TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
 346 
 347                     reaper = scheduler.scheduleWithFixedDelay(
 348                         new Runnable() {
 349                             public void run() {
 350                                 TCPTransport.tcpLog.log(Log.VERBOSE,
 351                                                         "wake up");
 352                                 freeCachedConnections();
 353                             }
 354                         }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
 355                 }
 356             }
 357 
 358             tcpConnection.setLastUseTime(lastuse);
 359             tcpConnection.setExpiration(lastuse + idleTimeout);
 360         } else {
 361             TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
 362 
 363             try {
 364                 conn.close();
 365             } catch (IOException ignored) {
 366             }
 367         }
 368     }
 369 
 370     /**
 371      * Send transport header over stream.
 372      */
 373     private void writeTransportHeader(DataOutputStream out)
 374         throws RemoteException
 375     {
 376         try {
 377             // write out transport header
 378             DataOutputStream dataOut =
 379                 new DataOutputStream(out);
 380             dataOut.writeInt(TransportConstants.Magic);
 381             dataOut.writeShort(TransportConstants.Version);
 382         } catch (IOException e) {
 383             throw new ConnectIOException(
 384                 "error writing JRMP transport header", e);
 385         }
 386     }
 387 
 388     /**
 389      * Use given connection multiplexer object to obtain new connections
 390      * through this channel.
 391      */
 392     synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) {
 393         // for now, always just use the last one given
 394         multiplexer = newMultiplexer;
 395 
 396         usingMultiplexer = true;
 397     }
 398 
 399     /**
 400      * Accept a connection provided over a multiplexed channel.
 401      */
 402     void acceptMultiplexConnection(Connection conn) {
 403         if (acceptor == null) {
 404             acceptor = new ConnectionAcceptor(tr);
 405             acceptor.startNewAcceptor();
 406         }
 407         acceptor.accept(conn);
 408     }
 409 
 410     /**
 411      * Closes all the connections in the cache, whether timed out or not.
 412      */
 413     public void shedCache() {
 414         // Build a list of connections, to avoid holding the freeList
 415         // lock during (potentially long-running) close() calls.
 416         Connection[] conn;
 417         synchronized (freeList) {
 418             conn = freeList.toArray(new Connection[freeList.size()]);
 419             freeList.clear();
 420         }
 421 
 422         // Close all the connections that were free
 423         for (int i = conn.length; --i >= 0; ) {
 424             Connection c = conn[i];
 425             conn[i] = null; // help gc
 426             try {
 427                 c.close();
 428             } catch (java.io.IOException e) {
 429                 // eat exception
 430             }
 431         }
 432     }
 433 
 434     private void freeCachedConnections() {
 435         /*
 436          * Remove each connection whose time out has expired.
 437          */
 438         synchronized (freeList) {
 439             int size = freeList.size();
 440 
 441             if (size > 0) {
 442                 long time = System.currentTimeMillis();
 443                 ListIterator<TCPConnection> iter = freeList.listIterator(size);
 444 
 445                 while (iter.hasPrevious()) {
 446                     TCPConnection conn = iter.previous();
 447                     if (conn.expired(time)) {
 448                         TCPTransport.tcpLog.log(Log.VERBOSE,
 449                             "connection timeout expired");
 450 
 451                         try {
 452                             conn.close();
 453                         } catch (java.io.IOException e) {
 454                             // eat exception
 455                         }
 456                         iter.remove();
 457                     }
 458                 }
 459             }
 460 
 461             if (freeList.isEmpty()) {
 462                 reaper.cancel(false);
 463                 reaper = null;
 464             }
 465         }
 466     }
 467 }
 468 
 469 /**
 470  * ConnectionAcceptor manages accepting new connections and giving them
 471  * to TCPTransport's message handler on new threads.
 472  *
 473  * Since this object only needs to know which transport to give new
 474  * connections to, it doesn't need to be per-channel as currently
 475  * implemented.
 476  */
 477 class ConnectionAcceptor implements Runnable {
 478 
 479     /** transport that will handle message on accepted connections */
 480     private TCPTransport transport;
 481 
 482     /** queue of connections to be accepted */
 483     private List<Connection> queue = new ArrayList<>();
 484 
 485     /** thread ID counter */
 486     private static int threadNum = 0;
 487 
 488     /**
 489      * Create a new ConnectionAcceptor that will give connections
 490      * to the specified transport on a new thread.
 491      */
 492     public ConnectionAcceptor(TCPTransport transport) {
 493         this.transport = transport;
 494     }
 495 
 496     /**
 497      * Start a new thread to accept connections.
 498      */
 499     public void startNewAcceptor() {
 500         Thread t = AccessController.doPrivileged(
 501             new NewThreadAction(ConnectionAcceptor.this,
 502                                 "Multiplex Accept-" + ++ threadNum,
 503                                 true));
 504         t.start();
 505     }
 506 
 507     /**
 508      * Add connection to queue of connections to be accepted.
 509      */
 510     public void accept(Connection conn) {
 511         synchronized (queue) {
 512             queue.add(conn);
 513             queue.notify();
 514         }
 515     }
 516 
 517     /**
 518      * Give transport next accepted conection, when available.
 519      */
 520     public void run() {
 521         Connection conn;
 522 
 523         synchronized (queue) {
 524             while (queue.size() == 0) {
 525                 try {
 526                     queue.wait();
 527                 } catch (InterruptedException e) {
 528                 }
 529             }
 530             startNewAcceptor();
 531             conn = queue.remove(0);
 532         }
 533 
 534         transport.handleMessages(conn, true);
 535     }
 536 }