1 /* 2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.io.DataInputStream; 28 import java.io.DataOutputStream; 29 import java.io.IOException; 30 import java.lang.ref.Reference; 31 import java.lang.ref.SoftReference; 32 import java.net.Socket; 33 import java.rmi.ConnectIOException; 34 import java.rmi.RemoteException; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedAction; 38 import java.util.ArrayList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.WeakHashMap; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import sun.rmi.runtime.Log; 46 import sun.rmi.runtime.NewThreadAction; 47 import sun.rmi.runtime.RuntimeUtil; 48 import sun.rmi.transport.Channel; 49 import sun.rmi.transport.Connection; 50 import sun.rmi.transport.Endpoint; 51 import sun.rmi.transport.TransportConstants; 52 53 /** 54 * TCPChannel is the socket-based implementation of the RMI Channel 55 * abstraction. 56 * 57 * @author Ann Wollrath 58 */ 59 public class TCPChannel implements Channel { 60 /** endpoint for this channel */ 61 private final TCPEndpoint ep; 62 /** transport for this channel */ 63 private final TCPTransport tr; 64 /** list of cached connections */ 65 private final List<TCPConnection> freeList = 66 new ArrayList<>(); 67 /** frees cached connections that have expired (guarded by freeList) */ 68 private Future<?> reaper = null; 69 70 /** using multiplexer (for bi-directional applet communication */ 71 private boolean usingMultiplexer = false; 72 /** connection multiplexer, if used */ 73 private ConnectionMultiplexer multiplexer = null; 74 /** connection acceptor (should be in TCPTransport) */ 75 private ConnectionAcceptor acceptor; 76 77 /** most recently authorized AccessControlContext */ 78 private AccessControlContext okContext; 79 80 /** cache of authorized AccessControlContexts */ 81 private WeakHashMap<AccessControlContext, 82 Reference<AccessControlContext>> authcache; 83 84 /** the SecurityManager which authorized okContext and authcache */ 85 private SecurityManager cacheSecurityManager = null; 86 87 /** client-side connection idle usage timeout */ 88 private static final long idleTimeout = // default 15 seconds 89 AccessController.doPrivileged((PrivilegedAction<Long>) () -> 90 Long.getLong("sun.rmi.transport.connectionTimeout", 15000)); 91 92 /** client-side connection handshake read timeout */ 93 private static final int handshakeTimeout = // default 1 minute 94 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 95 Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000)); 96 97 /** client-side connection response read timeout (after handshake) */ 98 private static final int responseTimeout = // default infinity 99 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 100 Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0)); 101 102 /** thread pool for scheduling delayed tasks */ 103 private static final ScheduledExecutorService scheduler = 104 AccessController.doPrivileged( 105 new RuntimeUtil.GetInstanceAction()).getScheduler(); 106 107 /** 108 * Create channel for endpoint. 109 */ 110 TCPChannel(TCPTransport tr, TCPEndpoint ep) { 111 this.tr = tr; 112 this.ep = ep; 113 } 114 115 /** 116 * Return the endpoint for this channel. 117 */ 118 public Endpoint getEndpoint() { 119 return ep; 120 } 121 122 /** 123 * Checks if the current caller has sufficient privilege to make 124 * a connection to the remote endpoint. 125 * @exception SecurityException if caller is not allowed to use this 126 * Channel. 127 */ 128 private void checkConnectPermission() throws SecurityException { 129 SecurityManager security = System.getSecurityManager(); 130 if (security == null) 131 return; 132 133 if (security != cacheSecurityManager) { 134 // The security manager changed: flush the cache 135 okContext = null; 136 authcache = new WeakHashMap<AccessControlContext, 137 Reference<AccessControlContext>>(); 138 cacheSecurityManager = security; 139 } 140 141 AccessControlContext ctx = AccessController.getContext(); 142 143 // If ctx is the same context as last time, or if it 144 // appears in the cache, bypass the checkConnect. 145 if (okContext == null || 146 !(okContext.equals(ctx) || authcache.containsKey(ctx))) 147 { 148 security.checkConnect(ep.getHost(), ep.getPort()); 149 authcache.put(ctx, new SoftReference<AccessControlContext>(ctx)); 150 // A WeakHashMap is transformed into a SoftHashSet by making 151 // each value softly refer to its own key (Peter's idea). 152 } 153 okContext = ctx; 154 } 155 156 /** 157 * Supplies a connection to the endpoint of the address space 158 * for which this is a channel. The returned connection may 159 * be one retrieved from a cache of idle connections. 160 */ 161 public Connection newConnection() throws RemoteException { 162 TCPConnection conn; 163 164 // loop until we find a free live connection (in which case 165 // we return) or until we run out of freelist (in which case 166 // the loop exits) 167 do { 168 conn = null; 169 // try to get a free connection 170 synchronized (freeList) { 171 int elementPos = freeList.size()-1; 172 173 if (elementPos >= 0) { 174 // If there is a security manager, make sure 175 // the caller is allowed to connect to the 176 // requested endpoint. 177 checkConnectPermission(); 178 conn = freeList.get(elementPos); 179 freeList.remove(elementPos); 180 } 181 } 182 183 // at this point, conn is null iff the freelist is empty, 184 // and nonnull if a free connection of uncertain vitality 185 // has been found. 186 187 if (conn != null) { 188 // check to see if the connection has closed since last use 189 if (!conn.isDead()) { 190 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 191 return conn; 192 } 193 194 // conn is dead, and cannot be reused (reuse => false) 195 this.free(conn, false); 196 } 197 } while (conn != null); 198 199 // none free, so create a new connection 200 return (createConnection()); 201 } 202 203 /** 204 * Create a new connection to the remote endpoint of this channel. 205 * The returned connection is new. The caller must already have 206 * passed a security checkConnect or equivalent. 207 */ 208 private Connection createConnection() throws RemoteException { 209 Connection conn; 210 211 TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); 212 213 if (!usingMultiplexer) { 214 Socket sock = ep.newSocket(); 215 conn = new TCPConnection(this, sock); 216 217 try { 218 DataOutputStream out = 219 new DataOutputStream(conn.getOutputStream()); 220 writeTransportHeader(out); 221 222 // choose protocol (single op if not reusable socket) 223 if (!conn.isReusable()) { 224 out.writeByte(TransportConstants.SingleOpProtocol); 225 } else { 226 out.writeByte(TransportConstants.StreamProtocol); 227 out.flush(); 228 229 /* 230 * Set socket read timeout to configured value for JRMP 231 * connection handshake; this also serves to guard against 232 * non-JRMP servers that do not respond (see 4322806). 233 */ 234 int originalSoTimeout = 0; 235 try { 236 originalSoTimeout = sock.getSoTimeout(); 237 sock.setSoTimeout(handshakeTimeout); 238 } catch (Exception e) { 239 // if we fail to set this, ignore and proceed anyway 240 } 241 242 DataInputStream in = 243 new DataInputStream(conn.getInputStream()); 244 byte ack = in.readByte(); 245 if (ack != TransportConstants.ProtocolAck) { 246 throw new ConnectIOException( 247 ack == TransportConstants.ProtocolNack ? 248 "JRMP StreamProtocol not supported by server" : 249 "non-JRMP server at remote endpoint"); 250 } 251 252 String suggestedHost = in.readUTF(); 253 int suggestedPort = in.readInt(); 254 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 255 TCPTransport.tcpLog.log(Log.VERBOSE, 256 "server suggested " + suggestedHost + ":" + 257 suggestedPort); 258 } 259 260 // set local host name, if unknown 261 TCPEndpoint.setLocalHost(suggestedHost); 262 // do NOT set the default port, because we don't 263 // know if we can't listen YET... 264 265 // write out default endpoint to match protocol 266 // (but it serves no purpose) 267 TCPEndpoint localEp = 268 TCPEndpoint.getLocalEndpoint(0, null, null); 269 out.writeUTF(localEp.getHost()); 270 out.writeInt(localEp.getPort()); 271 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 272 TCPTransport.tcpLog.log(Log.VERBOSE, "using " + 273 localEp.getHost() + ":" + localEp.getPort()); 274 } 275 276 /* 277 * After JRMP handshake, set socket read timeout to value 278 * configured for the rest of the lifetime of the 279 * connection. NOTE: this timeout, if configured to a 280 * finite duration, places an upper bound on the time 281 * that a remote method call is permitted to execute. 282 */ 283 try { 284 /* 285 * If socket factory had set a non-zero timeout on its 286 * own, then restore it instead of using the property- 287 * configured value. 288 */ 289 sock.setSoTimeout((originalSoTimeout != 0 ? 290 originalSoTimeout : 291 responseTimeout)); 292 } catch (Exception e) { 293 // if we fail to set this, ignore and proceed anyway 294 } 295 296 out.flush(); 297 } 298 } catch (IOException e) { 299 try { 300 conn.close(); 301 } catch (Exception ex) {} 302 if (e instanceof RemoteException) { 303 throw (RemoteException) e; 304 } else { 305 throw new ConnectIOException( 306 "error during JRMP connection establishment", e); 307 } 308 } 309 } else { 310 try { 311 conn = multiplexer.openConnection(); 312 } catch (IOException e) { 313 synchronized (this) { 314 usingMultiplexer = false; 315 multiplexer = null; 316 } 317 throw new ConnectIOException( 318 "error opening virtual connection " + 319 "over multiplexed connection", e); 320 } 321 } 322 return conn; 323 } 324 325 /** 326 * Free the connection generated by this channel. 327 * @param conn The connection 328 * @param reuse If true, the connection is in a state in which it 329 * can be reused for another method call. 330 */ 331 public void free(Connection conn, boolean reuse) { 332 if (conn == null) return; 333 334 if (reuse && conn.isReusable()) { 335 long lastuse = System.currentTimeMillis(); 336 TCPConnection tcpConnection = (TCPConnection) conn; 337 338 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 339 340 /* 341 * Cache connection; if reaper task for expired 342 * connections isn't scheduled, then schedule it. 343 */ 344 synchronized (freeList) { 345 freeList.add(tcpConnection); 346 if (reaper == null) { 347 TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); 348 349 reaper = scheduler.scheduleWithFixedDelay( 350 new Runnable() { 351 public void run() { 352 TCPTransport.tcpLog.log(Log.VERBOSE, 353 "wake up"); 354 freeCachedConnections(); 355 } 356 }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); 357 } 358 } 359 360 tcpConnection.setLastUseTime(lastuse); 361 tcpConnection.setExpiration(lastuse + idleTimeout); 362 } else { 363 TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); 364 365 try { 366 conn.close(); 367 } catch (IOException ignored) { 368 } 369 } 370 } 371 372 /** 373 * Send transport header over stream. 374 */ 375 private void writeTransportHeader(DataOutputStream out) 376 throws RemoteException 377 { 378 try { 379 // write out transport header 380 DataOutputStream dataOut = 381 new DataOutputStream(out); 382 dataOut.writeInt(TransportConstants.Magic); 383 dataOut.writeShort(TransportConstants.Version); 384 } catch (IOException e) { 385 throw new ConnectIOException( 386 "error writing JRMP transport header", e); 387 } 388 } 389 390 /** 391 * Use given connection multiplexer object to obtain new connections 392 * through this channel. 393 */ 394 synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { 395 // for now, always just use the last one given 396 multiplexer = newMultiplexer; 397 398 usingMultiplexer = true; 399 } 400 401 /** 402 * Accept a connection provided over a multiplexed channel. 403 */ 404 void acceptMultiplexConnection(Connection conn) { 405 if (acceptor == null) { 406 acceptor = new ConnectionAcceptor(tr); 407 acceptor.startNewAcceptor(); 408 } 409 acceptor.accept(conn); 410 } 411 412 /** 413 * Closes all the connections in the cache, whether timed out or not. 414 */ 415 public void shedCache() { 416 // Build a list of connections, to avoid holding the freeList 417 // lock during (potentially long-running) close() calls. 418 Connection[] conn; 419 synchronized (freeList) { 420 conn = freeList.toArray(new Connection[freeList.size()]); 421 freeList.clear(); 422 } 423 424 // Close all the connections that were free 425 for (int i = conn.length; --i >= 0; ) { 426 Connection c = conn[i]; 427 conn[i] = null; // help gc 428 try { 429 c.close(); 430 } catch (java.io.IOException e) { 431 // eat exception 432 } 433 } 434 } 435 436 private void freeCachedConnections() { 437 /* 438 * Remove each connection whose time out has expired. 439 */ 440 synchronized (freeList) { 441 int size = freeList.size(); 442 443 if (size > 0) { 444 long time = System.currentTimeMillis(); 445 ListIterator<TCPConnection> iter = freeList.listIterator(size); 446 447 while (iter.hasPrevious()) { 448 TCPConnection conn = iter.previous(); 449 if (conn.expired(time)) { 450 TCPTransport.tcpLog.log(Log.VERBOSE, 451 "connection timeout expired"); 452 453 try { 454 conn.close(); 455 } catch (java.io.IOException e) { 456 // eat exception 457 } 458 iter.remove(); 459 } 460 } 461 } 462 463 if (freeList.isEmpty()) { 464 reaper.cancel(false); 465 reaper = null; 466 } 467 } 468 } 469 } 470 471 /** 472 * ConnectionAcceptor manages accepting new connections and giving them 473 * to TCPTransport's message handler on new threads. 474 * 475 * Since this object only needs to know which transport to give new 476 * connections to, it doesn't need to be per-channel as currently 477 * implemented. 478 */ 479 class ConnectionAcceptor implements Runnable { 480 481 /** transport that will handle message on accepted connections */ 482 private TCPTransport transport; 483 484 /** queue of connections to be accepted */ 485 private List<Connection> queue = new ArrayList<>(); 486 487 /** thread ID counter */ 488 private static int threadNum = 0; 489 490 /** 491 * Create a new ConnectionAcceptor that will give connections 492 * to the specified transport on a new thread. 493 */ 494 public ConnectionAcceptor(TCPTransport transport) { 495 this.transport = transport; 496 } 497 498 /** 499 * Start a new thread to accept connections. 500 */ 501 public void startNewAcceptor() { 502 Thread t = AccessController.doPrivileged( 503 new NewThreadAction(ConnectionAcceptor.this, 504 "Multiplex Accept-" + ++ threadNum, 505 true)); 506 t.start(); 507 } 508 509 /** 510 * Add connection to queue of connections to be accepted. 511 */ 512 public void accept(Connection conn) { 513 synchronized (queue) { 514 queue.add(conn); 515 queue.notify(); 516 } 517 } 518 519 /** 520 * Give transport next accepted connection, when available. 521 */ 522 public void run() { 523 Connection conn; 524 525 synchronized (queue) { 526 while (queue.size() == 0) { 527 try { 528 queue.wait(); 529 } catch (InterruptedException e) { 530 } 531 } 532 startNewAcceptor(); 533 conn = queue.remove(0); 534 } 535 536 transport.handleMessages(conn, true); 537 } 538 }