1 /* 2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.io.DataInputStream; 28 import java.io.DataOutputStream; 29 import java.io.IOException; 30 import java.lang.ref.Reference; 31 import java.lang.ref.SoftReference; 32 import java.net.Socket; 33 import java.rmi.ConnectIOException; 34 import java.rmi.RemoteException; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedAction; 38 import java.util.ArrayList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.WeakHashMap; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import sun.rmi.runtime.Log; 46 import sun.rmi.runtime.NewThreadAction; 47 import sun.rmi.runtime.RuntimeUtil; 48 import sun.rmi.transport.Channel; 49 import sun.rmi.transport.Connection; 50 import sun.rmi.transport.Endpoint; 51 import sun.rmi.transport.TransportConstants; 52 53 /** 54 * TCPChannel is the socket-based implementation of the RMI Channel 55 * abstraction. 56 * 57 * @author Ann Wollrath 58 */ 59 public class TCPChannel implements Channel { 60 /** endpoint for this channel */ 61 private final TCPEndpoint ep; 62 /** transport for this channel */ 63 private final TCPTransport tr; 64 /** list of cached connections */ 65 private final List<TCPConnection> freeList = 66 new ArrayList<>(); 67 /** frees cached connections that have expired (guarded by freeList) */ 68 private Future<?> reaper = null; 69 70 /** using multiplexer (for bi-directional applet communication */ 71 private boolean usingMultiplexer = false; 72 /** connection multiplexer, if used */ 73 private ConnectionMultiplexer multiplexer = null; 74 /** connection acceptor (should be in TCPTransport) */ 75 private ConnectionAcceptor acceptor; 76 77 /** most recently authorized AccessControlContext */ 78 private AccessControlContext okContext; 79 80 /** cache of authorized AccessControlContexts */ 81 private WeakHashMap<AccessControlContext, 82 Reference<AccessControlContext>> authcache; 83 84 /** the SecurityManager which authorized okContext and authcache */ 85 private SecurityManager cacheSecurityManager = null; 86 87 /** client-side connection idle usage timeout */ 88 private static final long idleTimeout = // default 15 seconds 89 AccessController.doPrivileged((PrivilegedAction<Long>) () -> 90 Long.getLong("sun.rmi.transport.connectionTimeout", 15000)); 91 92 /** client-side connection handshake read timeout */ 93 private static final int handshakeTimeout = // default 1 minute 94 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 95 Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000)); 96 97 /** client-side connection response read timeout (after handshake) */ 98 private static final int responseTimeout = // default infinity 99 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 100 Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0)); 101 102 /** thread pool for scheduling delayed tasks */ 103 private static final ScheduledExecutorService scheduler = 104 AccessController.doPrivileged( 105 new RuntimeUtil.GetInstanceAction()).getScheduler(); 106 107 /** 108 * Create channel for endpoint. 109 */ 110 TCPChannel(TCPTransport tr, TCPEndpoint ep) { 111 this.tr = tr; 112 this.ep = ep; 113 } 114 115 /** 116 * Return the endpoint for this channel. 117 */ 118 public Endpoint getEndpoint() { 119 return ep; 120 } 121 122 /** 123 * Checks if the current caller has sufficient privilege to make 124 * a connection to the remote endpoint. 125 * @exception SecurityException if caller is not allowed to use this 126 * Channel. 127 */ 128 private void checkConnectPermission() throws SecurityException { 129 SecurityManager security = System.getSecurityManager(); 130 if (security == null) 131 return; 132 133 if (security != cacheSecurityManager) { 134 // The security manager changed: flush the cache 135 okContext = null; 136 authcache = new WeakHashMap<AccessControlContext, 137 Reference<AccessControlContext>>(); 138 cacheSecurityManager = security; 139 } 140 141 AccessControlContext ctx = AccessController.getContext(); 142 143 // If ctx is the same context as last time, or if it 144 // appears in the cache, bypass the checkConnect. 145 if (okContext == null || 146 !(okContext.equals(ctx) || authcache.containsKey(ctx))) 147 { 148 security.checkConnect(ep.getHost(), ep.getPort()); 149 authcache.put(ctx, new SoftReference<AccessControlContext>(ctx)); 150 // A WeakHashMap is transformed into a SoftHashSet by making 151 // each value softly refer to its own key (Peter's idea). 152 } 153 okContext = ctx; 154 } 155 156 /** 157 * Supplies a connection to the endpoint of the address space 158 * for which this is a channel. The returned connection may 159 * be one retrieved from a cache of idle connections. 160 */ 161 public Connection newConnection() throws RemoteException { 162 TCPConnection conn; 163 164 // loop until we find a free live connection (in which case 165 // we return) or until we run out of freelist (in which case 166 // the loop exits) 167 do { 168 conn = null; 169 // try to get a free connection 170 synchronized (freeList) { 171 int elementPos = freeList.size()-1; 172 173 if (elementPos >= 0) { 174 // If there is a security manager, make sure 175 // the caller is allowed to connect to the 176 // requested endpoint. 177 checkConnectPermission(); 178 conn = freeList.get(elementPos); 179 freeList.remove(elementPos); 180 } 181 } 182 183 // at this point, conn is null iff the freelist is empty, 184 // and nonnull if a free connection of uncertain vitality 185 // has been found. 186 187 if (conn != null) { 188 // check to see if the connection has closed since last use 189 if (!conn.isDead()) { 190 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 191 return conn; 192 } 193 194 // conn is dead, and cannot be reused (reuse => false) 195 this.free(conn, false); 196 } 197 } while (conn != null); 198 199 // none free, so create a new connection 200 return (createConnection()); 201 } 202 203 /** 204 * Create a new connection to the remote endpoint of this channel. 205 * The returned connection is new. The caller must already have 206 * passed a security checkConnect or equivalent. 207 */ 208 private Connection createConnection() throws RemoteException { 209 Connection conn; 210 211 TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); 212 213 if (!usingMultiplexer) { 214 Socket sock = ep.newSocket(); 215 conn = new TCPConnection(this, sock); 216 217 try { 218 DataOutputStream out = 219 new DataOutputStream(conn.getOutputStream()); 220 writeTransportHeader(out); 221 222 // choose protocol (single op if not reusable socket) 223 if (!conn.isReusable()) { 224 out.writeByte(TransportConstants.SingleOpProtocol); 225 } else { 226 out.writeByte(TransportConstants.StreamProtocol); 227 out.flush(); 228 229 /* 230 * Set socket read timeout to configured value for JRMP 231 * connection handshake; this also serves to guard against 232 * non-JRMP servers that do not respond (see 4322806). 233 */ 234 int originalSoTimeout = 0; 235 try { 236 originalSoTimeout = sock.getSoTimeout(); 237 sock.setSoTimeout(handshakeTimeout); 238 } catch (Exception e) { 239 // if we fail to set this, ignore and proceed anyway 240 } 241 242 DataInputStream in = 243 new DataInputStream(conn.getInputStream()); 244 byte ack = in.readByte(); 245 if (ack != TransportConstants.ProtocolAck) { 246 throw new ConnectIOException( 247 ack == TransportConstants.ProtocolNack ? 248 "JRMP StreamProtocol not supported by server" : 249 "non-JRMP server at remote endpoint"); 250 } 251 252 String suggestedHost = in.readUTF(); 253 int suggestedPort = in.readInt(); 254 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 255 TCPTransport.tcpLog.log(Log.VERBOSE, 256 "server suggested " + suggestedHost + ":" + 257 suggestedPort); 258 } 259 260 // set local host name, if unknown 261 TCPEndpoint.setLocalHost(suggestedHost); 262 // do NOT set the default port, because we don't 263 // know if we can't listen YET... 264 265 // write out default endpoint to match protocol 266 // (but it serves no purpose) 267 TCPEndpoint localEp = 268 TCPEndpoint.getLocalEndpoint(0, null, null); 269 out.writeUTF(localEp.getHost()); 270 out.writeInt(localEp.getPort()); 271 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 272 TCPTransport.tcpLog.log(Log.VERBOSE, "using " + 273 localEp.getHost() + ":" + localEp.getPort()); 274 } 275 276 /* 277 * After JRMP handshake, set socket read timeout to value 278 * configured for the rest of the lifetime of the 279 * connection. NOTE: this timeout, if configured to a 280 * finite duration, places an upper bound on the time 281 * that a remote method call is permitted to execute. 282 */ 283 try { 284 /* 285 * If socket factory had set a non-zero timeout on its 286 * own, then restore it instead of using the property- 287 * configured value. 288 */ 289 sock.setSoTimeout((originalSoTimeout != 0 ? 290 originalSoTimeout : 291 responseTimeout)); 292 } catch (Exception e) { 293 // if we fail to set this, ignore and proceed anyway 294 } 295 296 out.flush(); 297 } 298 } catch (IOException e) { 299 if (e instanceof RemoteException) 300 throw (RemoteException) e; 301 else 302 throw new ConnectIOException( 303 "error during JRMP connection establishment", e); 304 } 305 } else { 306 try { 307 conn = multiplexer.openConnection(); 308 } catch (IOException e) { 309 synchronized (this) { 310 usingMultiplexer = false; 311 multiplexer = null; 312 } 313 throw new ConnectIOException( 314 "error opening virtual connection " + 315 "over multiplexed connection", e); 316 } 317 } 318 return conn; 319 } 320 321 /** 322 * Free the connection generated by this channel. 323 * @param conn The connection 324 * @param reuse If true, the connection is in a state in which it 325 * can be reused for another method call. 326 */ 327 public void free(Connection conn, boolean reuse) { 328 if (conn == null) return; 329 330 if (reuse && conn.isReusable()) { 331 long lastuse = System.currentTimeMillis(); 332 TCPConnection tcpConnection = (TCPConnection) conn; 333 334 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 335 336 /* 337 * Cache connection; if reaper task for expired 338 * connections isn't scheduled, then schedule it. 339 */ 340 synchronized (freeList) { 341 freeList.add(tcpConnection); 342 if (reaper == null) { 343 TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); 344 345 reaper = scheduler.scheduleWithFixedDelay( 346 new Runnable() { 347 public void run() { 348 TCPTransport.tcpLog.log(Log.VERBOSE, 349 "wake up"); 350 freeCachedConnections(); 351 } 352 }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); 353 } 354 } 355 356 tcpConnection.setLastUseTime(lastuse); 357 tcpConnection.setExpiration(lastuse + idleTimeout); 358 } else { 359 TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); 360 361 try { 362 conn.close(); 363 } catch (IOException ignored) { 364 } 365 } 366 } 367 368 /** 369 * Send transport header over stream. 370 */ 371 private void writeTransportHeader(DataOutputStream out) 372 throws RemoteException 373 { 374 try { 375 // write out transport header 376 DataOutputStream dataOut = 377 new DataOutputStream(out); 378 dataOut.writeInt(TransportConstants.Magic); 379 dataOut.writeShort(TransportConstants.Version); 380 } catch (IOException e) { 381 throw new ConnectIOException( 382 "error writing JRMP transport header", e); 383 } 384 } 385 386 /** 387 * Use given connection multiplexer object to obtain new connections 388 * through this channel. 389 */ 390 synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { 391 // for now, always just use the last one given 392 multiplexer = newMultiplexer; 393 394 usingMultiplexer = true; 395 } 396 397 /** 398 * Accept a connection provided over a multiplexed channel. 399 */ 400 void acceptMultiplexConnection(Connection conn) { 401 if (acceptor == null) { 402 acceptor = new ConnectionAcceptor(tr); 403 acceptor.startNewAcceptor(); 404 } 405 acceptor.accept(conn); 406 } 407 408 /** 409 * Closes all the connections in the cache, whether timed out or not. 410 */ 411 public void shedCache() { 412 // Build a list of connections, to avoid holding the freeList 413 // lock during (potentially long-running) close() calls. 414 Connection[] conn; 415 synchronized (freeList) { 416 conn = freeList.toArray(new Connection[freeList.size()]); 417 freeList.clear(); 418 } 419 420 // Close all the connections that were free 421 for (int i = conn.length; --i >= 0; ) { 422 Connection c = conn[i]; 423 conn[i] = null; // help gc 424 try { 425 c.close(); 426 } catch (java.io.IOException e) { 427 // eat exception 428 } 429 } 430 } 431 432 private void freeCachedConnections() { 433 /* 434 * Remove each connection whose time out has expired. 435 */ 436 synchronized (freeList) { 437 int size = freeList.size(); 438 439 if (size > 0) { 440 long time = System.currentTimeMillis(); 441 ListIterator<TCPConnection> iter = freeList.listIterator(size); 442 443 while (iter.hasPrevious()) { 444 TCPConnection conn = iter.previous(); 445 if (conn.expired(time)) { 446 TCPTransport.tcpLog.log(Log.VERBOSE, 447 "connection timeout expired"); 448 449 try { 450 conn.close(); 451 } catch (java.io.IOException e) { 452 // eat exception 453 } 454 iter.remove(); 455 } 456 } 457 } 458 459 if (freeList.isEmpty()) { 460 reaper.cancel(false); 461 reaper = null; 462 } 463 } 464 } 465 } 466 467 /** 468 * ConnectionAcceptor manages accepting new connections and giving them 469 * to TCPTransport's message handler on new threads. 470 * 471 * Since this object only needs to know which transport to give new 472 * connections to, it doesn't need to be per-channel as currently 473 * implemented. 474 */ 475 class ConnectionAcceptor implements Runnable { 476 477 /** transport that will handle message on accepted connections */ 478 private TCPTransport transport; 479 480 /** queue of connections to be accepted */ 481 private List<Connection> queue = new ArrayList<>(); 482 483 /** thread ID counter */ 484 private static int threadNum = 0; 485 486 /** 487 * Create a new ConnectionAcceptor that will give connections 488 * to the specified transport on a new thread. 489 */ 490 public ConnectionAcceptor(TCPTransport transport) { 491 this.transport = transport; 492 } 493 494 /** 495 * Start a new thread to accept connections. 496 */ 497 public void startNewAcceptor() { 498 Thread t = AccessController.doPrivileged( 499 new NewThreadAction(ConnectionAcceptor.this, 500 "Multiplex Accept-" + ++ threadNum, 501 true)); 502 t.start(); 503 } 504 505 /** 506 * Add connection to queue of connections to be accepted. 507 */ 508 public void accept(Connection conn) { 509 synchronized (queue) { 510 queue.add(conn); 511 queue.notify(); 512 } 513 } 514 515 /** 516 * Give transport next accepted connection, when available. 517 */ 518 public void run() { 519 Connection conn; 520 521 synchronized (queue) { 522 while (queue.size() == 0) { 523 try { 524 queue.wait(); 525 } catch (InterruptedException e) { 526 } 527 } 528 startNewAcceptor(); 529 conn = queue.remove(0); 530 } 531 532 transport.handleMessages(conn, true); 533 } 534 }