1 /* 2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.io.DataInputStream; 28 import java.io.DataOutputStream; 29 import java.io.IOException; 30 import java.lang.ref.Reference; 31 import java.lang.ref.SoftReference; 32 import java.net.Socket; 33 import java.rmi.ConnectIOException; 34 import java.rmi.RemoteException; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedAction; 38 import java.util.ArrayList; 39 import java.util.List; 40 import java.util.ListIterator; 41 import java.util.WeakHashMap; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.TimeUnit; 45 import sun.rmi.runtime.Log; 46 import sun.rmi.runtime.NewThreadAction; 47 import sun.rmi.runtime.RuntimeUtil; 48 import sun.rmi.transport.Channel; 49 import sun.rmi.transport.Connection; 50 import sun.rmi.transport.Endpoint; 51 import sun.rmi.transport.TransportConstants; 52 53 /** 54 * TCPChannel is the socket-based implementation of the RMI Channel 55 * abstraction. 56 * 57 * @author Ann Wollrath 58 */ 59 public class TCPChannel implements Channel { 60 /** endpoint for this channel */ 61 private final TCPEndpoint ep; 62 /** transport for this channel */ 63 private final TCPTransport tr; 64 /** list of cached connections */ 65 private final List<TCPConnection> freeList = 66 new ArrayList<>(); 67 /** frees cached connections that have expired (guarded by freeList) */ 68 private Future<?> reaper = null; 69 70 /** using multiplexer (for bi-directional applet communication */ 71 private boolean usingMultiplexer = false; 72 /** connection multiplexer, if used */ 73 private ConnectionMultiplexer multiplexer = null; 74 /** connection acceptor (should be in TCPTransport) */ 75 private ConnectionAcceptor acceptor; 76 77 /** most recently authorized AccessControlContext */ 78 private AccessControlContext okContext; 79 80 /** cache of authorized AccessControlContexts */ 81 private WeakHashMap<AccessControlContext, 82 Reference<AccessControlContext>> authcache; 83 84 /** the SecurityManager which authorized okContext and authcache */ 85 private SecurityManager cacheSecurityManager = null; 86 87 /** client-side connection idle usage timeout */ 88 private static final long idleTimeout = // default 15 seconds 89 AccessController.doPrivileged((PrivilegedAction<Long>) () -> 90 Long.getLong("sun.rmi.transport.connectionTimeout", 15000)); 91 92 /** client-side connection handshake read timeout */ 93 private static final int handshakeTimeout = // default 1 minute 94 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 95 Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000)); 96 97 /** client-side connection response read timeout (after handshake) */ 98 private static final int responseTimeout = // default infinity 99 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 100 Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0)); 101 102 /** thread pool for scheduling delayed tasks */ 103 private static final ScheduledExecutorService scheduler = 104 AccessController.doPrivileged( 105 new RuntimeUtil.GetInstanceAction()).getScheduler(); 106 107 /** 108 * Create channel for endpoint. 109 */ 110 TCPChannel(TCPTransport tr, TCPEndpoint ep) { 111 this.tr = tr; 112 this.ep = ep; 113 } 114 115 /** 116 * Return the endpoint for this channel. 117 */ 118 public Endpoint getEndpoint() { 119 return ep; 120 } 121 122 /** 123 * Checks if the current caller has sufficient privilege to make 124 * a connection to the remote endpoint. 125 * @exception SecurityException if caller is not allowed to use this 126 * Channel. 127 */ 128 private void checkConnectPermission() throws SecurityException { 129 SecurityManager security = System.getSecurityManager(); 130 if (security == null) 131 return; 132 133 if (security != cacheSecurityManager) { 134 // The security manager changed: flush the cache 135 okContext = null; 136 authcache = new WeakHashMap<AccessControlContext, 137 Reference<AccessControlContext>>(); 138 cacheSecurityManager = security; 139 } 140 141 AccessControlContext ctx = AccessController.getContext(); 142 143 // If ctx is the same context as last time, or if it 144 // appears in the cache, bypass the checkConnect. 145 if (okContext == null || 146 !(okContext.equals(ctx) || authcache.containsKey(ctx))) 147 { 148 security.checkConnect(ep.getHost(), ep.getPort()); 149 authcache.put(ctx, new SoftReference<AccessControlContext>(ctx)); 150 // A WeakHashMap is transformed into a SoftHashSet by making 151 // each value softly refer to its own key (Peter's idea). 152 } 153 okContext = ctx; 154 } 155 156 /** 157 * Supplies a connection to the endpoint of the address space 158 * for which this is a channel. The returned connection may 159 * be one retrieved from a cache of idle connections. 160 */ 161 public Connection newConnection() throws RemoteException { 162 TCPConnection conn; 163 164 // loop until we find a free live connection (in which case 165 // we return) or until we run out of freelist (in which case 166 // the loop exits) 167 do { 168 conn = null; 169 // try to get a free connection 170 synchronized (freeList) { 171 int elementPos = freeList.size()-1; 172 173 if (elementPos >= 0) { 174 // If there is a security manager, make sure 175 // the caller is allowed to connect to the 176 // requested endpoint. 177 checkConnectPermission(); 178 conn = freeList.get(elementPos); 179 freeList.remove(elementPos); 180 } 181 } 182 183 // at this point, conn is null iff the freelist is empty, 184 // and nonnull if a free connection of uncertain vitality 185 // has been found. 186 187 if (conn != null) { 188 // check to see if the connection has closed since last use 189 if (!conn.isDead()) { 190 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 191 return conn; 192 } 193 194 // conn is dead, and cannot be reused (reuse => false) 195 this.free(conn, false); 196 } 197 } while (conn != null); 198 199 // none free, so create a new connection 200 return (createConnection()); 201 } 202 203 /** 204 * Create a new connection to the remote endpoint of this channel. 205 * The returned connection is new. The caller must already have 206 * passed a security checkConnect or equivalent. 207 */ 208 private Connection createConnection() throws RemoteException { 209 Connection conn; 210 211 TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); 212 213 if (!usingMultiplexer) { 214 Socket sock = ep.newSocket(); 215 conn = new TCPConnection(this, sock); 216 217 try { 218 DataOutputStream out = 219 new DataOutputStream(conn.getOutputStream()); 220 writeTransportHeader(out); 221 222 // choose protocol (single op if not reusable socket) 223 if (!conn.isReusable()) { 224 out.writeByte(TransportConstants.SingleOpProtocol); 225 } else { 226 out.writeByte(TransportConstants.StreamProtocol); 227 out.flush(); 228 229 /* 230 * Set socket read timeout to configured value for JRMP 231 * connection handshake; this also serves to guard against 232 * non-JRMP servers that do not respond (see 4322806). 233 */ 234 int originalSoTimeout = 0; 235 try { 236 originalSoTimeout = sock.getSoTimeout(); 237 sock.setSoTimeout(handshakeTimeout); 238 } catch (Exception e) { 239 // if we fail to set this, ignore and proceed anyway 240 } 241 242 DataInputStream in = 243 new DataInputStream(conn.getInputStream()); 244 byte ack = in.readByte(); 245 if (ack != TransportConstants.ProtocolAck) { 246 throw new ConnectIOException( 247 ack == TransportConstants.ProtocolNack ? 248 "JRMP StreamProtocol not supported by server" : 249 "non-JRMP server at remote endpoint"); 250 } 251 252 String suggestedHost = in.readUTF(); 253 int suggestedPort = in.readInt(); 254 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 255 TCPTransport.tcpLog.log(Log.VERBOSE, 256 "server suggested " + suggestedHost + ":" + 257 suggestedPort); 258 } 259 260 // set local host name, if unknown 261 TCPEndpoint.setLocalHost(suggestedHost); 262 // do NOT set the default port, because we don't 263 // know if we can't listen YET... 264 265 // write out default endpoint to match protocol 266 // (but it serves no purpose) 267 TCPEndpoint localEp = 268 TCPEndpoint.getLocalEndpoint(0, null, null); 269 out.writeUTF(localEp.getHost()); 270 out.writeInt(localEp.getPort()); 271 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 272 TCPTransport.tcpLog.log(Log.VERBOSE, "using " + 273 localEp.getHost() + ":" + localEp.getPort()); 274 } 275 276 /* 277 * After JRMP handshake, set socket read timeout to value 278 * configured for the rest of the lifetime of the 279 * connection. NOTE: this timeout, if configured to a 280 * finite duration, places an upper bound on the time 281 * that a remote method call is permitted to execute. 282 */ 283 try { 284 /* 285 * If socket factory had set a non-zero timeout on its 286 * own, then restore it instead of using the property- 287 * configured value. 288 */ 289 sock.setSoTimeout((originalSoTimeout != 0 ? 290 originalSoTimeout : 291 responseTimeout)); 292 } catch (Exception e) { 293 // if we fail to set this, ignore and proceed anyway 294 } 295 296 out.flush(); 297 } 298 } catch (IOException e) { 299 if (e instanceof RemoteException) { 300 throw (RemoteException) e; 301 } else { 302 if (conn != null 303 && e instanceof java.net.SocketTimeoutException) 304 { 305 try { 306 conn.close(); 307 } catch (Exception ex) {} 308 } 309 throw new ConnectIOException( 310 "error during JRMP connection establishment", e); 311 } 312 } 313 } else { 314 try { 315 conn = multiplexer.openConnection(); 316 } catch (IOException e) { 317 synchronized (this) { 318 usingMultiplexer = false; 319 multiplexer = null; 320 } 321 throw new ConnectIOException( 322 "error opening virtual connection " + 323 "over multiplexed connection", e); 324 } 325 } 326 return conn; 327 } 328 329 /** 330 * Free the connection generated by this channel. 331 * @param conn The connection 332 * @param reuse If true, the connection is in a state in which it 333 * can be reused for another method call. 334 */ 335 public void free(Connection conn, boolean reuse) { 336 if (conn == null) return; 337 338 if (reuse && conn.isReusable()) { 339 long lastuse = System.currentTimeMillis(); 340 TCPConnection tcpConnection = (TCPConnection) conn; 341 342 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 343 344 /* 345 * Cache connection; if reaper task for expired 346 * connections isn't scheduled, then schedule it. 347 */ 348 synchronized (freeList) { 349 freeList.add(tcpConnection); 350 if (reaper == null) { 351 TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); 352 353 reaper = scheduler.scheduleWithFixedDelay( 354 new Runnable() { 355 public void run() { 356 TCPTransport.tcpLog.log(Log.VERBOSE, 357 "wake up"); 358 freeCachedConnections(); 359 } 360 }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); 361 } 362 } 363 364 tcpConnection.setLastUseTime(lastuse); 365 tcpConnection.setExpiration(lastuse + idleTimeout); 366 } else { 367 TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); 368 369 try { 370 conn.close(); 371 } catch (IOException ignored) { 372 } 373 } 374 } 375 376 /** 377 * Send transport header over stream. 378 */ 379 private void writeTransportHeader(DataOutputStream out) 380 throws RemoteException 381 { 382 try { 383 // write out transport header 384 DataOutputStream dataOut = 385 new DataOutputStream(out); 386 dataOut.writeInt(TransportConstants.Magic); 387 dataOut.writeShort(TransportConstants.Version); 388 } catch (IOException e) { 389 throw new ConnectIOException( 390 "error writing JRMP transport header", e); 391 } 392 } 393 394 /** 395 * Use given connection multiplexer object to obtain new connections 396 * through this channel. 397 */ 398 synchronized void useMultiplexer(ConnectionMultiplexer newMultiplexer) { 399 // for now, always just use the last one given 400 multiplexer = newMultiplexer; 401 402 usingMultiplexer = true; 403 } 404 405 /** 406 * Accept a connection provided over a multiplexed channel. 407 */ 408 void acceptMultiplexConnection(Connection conn) { 409 if (acceptor == null) { 410 acceptor = new ConnectionAcceptor(tr); 411 acceptor.startNewAcceptor(); 412 } 413 acceptor.accept(conn); 414 } 415 416 /** 417 * Closes all the connections in the cache, whether timed out or not. 418 */ 419 public void shedCache() { 420 // Build a list of connections, to avoid holding the freeList 421 // lock during (potentially long-running) close() calls. 422 Connection[] conn; 423 synchronized (freeList) { 424 conn = freeList.toArray(new Connection[freeList.size()]); 425 freeList.clear(); 426 } 427 428 // Close all the connections that were free 429 for (int i = conn.length; --i >= 0; ) { 430 Connection c = conn[i]; 431 conn[i] = null; // help gc 432 try { 433 c.close(); 434 } catch (java.io.IOException e) { 435 // eat exception 436 } 437 } 438 } 439 440 private void freeCachedConnections() { 441 /* 442 * Remove each connection whose time out has expired. 443 */ 444 synchronized (freeList) { 445 int size = freeList.size(); 446 447 if (size > 0) { 448 long time = System.currentTimeMillis(); 449 ListIterator<TCPConnection> iter = freeList.listIterator(size); 450 451 while (iter.hasPrevious()) { 452 TCPConnection conn = iter.previous(); 453 if (conn.expired(time)) { 454 TCPTransport.tcpLog.log(Log.VERBOSE, 455 "connection timeout expired"); 456 457 try { 458 conn.close(); 459 } catch (java.io.IOException e) { 460 // eat exception 461 } 462 iter.remove(); 463 } 464 } 465 } 466 467 if (freeList.isEmpty()) { 468 reaper.cancel(false); 469 reaper = null; 470 } 471 } 472 } 473 } 474 475 /** 476 * ConnectionAcceptor manages accepting new connections and giving them 477 * to TCPTransport's message handler on new threads. 478 * 479 * Since this object only needs to know which transport to give new 480 * connections to, it doesn't need to be per-channel as currently 481 * implemented. 482 */ 483 class ConnectionAcceptor implements Runnable { 484 485 /** transport that will handle message on accepted connections */ 486 private TCPTransport transport; 487 488 /** queue of connections to be accepted */ 489 private List<Connection> queue = new ArrayList<>(); 490 491 /** thread ID counter */ 492 private static int threadNum = 0; 493 494 /** 495 * Create a new ConnectionAcceptor that will give connections 496 * to the specified transport on a new thread. 497 */ 498 public ConnectionAcceptor(TCPTransport transport) { 499 this.transport = transport; 500 } 501 502 /** 503 * Start a new thread to accept connections. 504 */ 505 public void startNewAcceptor() { 506 Thread t = AccessController.doPrivileged( 507 new NewThreadAction(ConnectionAcceptor.this, 508 "Multiplex Accept-" + ++ threadNum, 509 true)); 510 t.start(); 511 } 512 513 /** 514 * Add connection to queue of connections to be accepted. 515 */ 516 public void accept(Connection conn) { 517 synchronized (queue) { 518 queue.add(conn); 519 queue.notify(); 520 } 521 } 522 523 /** 524 * Give transport next accepted connection, when available. 525 */ 526 public void run() { 527 Connection conn; 528 529 synchronized (queue) { 530 while (queue.size() == 0) { 531 try { 532 queue.wait(); 533 } catch (InterruptedException e) { 534 } 535 } 536 startNewAcceptor(); 537 conn = queue.remove(0); 538 } 539 540 transport.handleMessages(conn, true); 541 } 542 }