1 /* 2 * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.lang.ref.Reference; 28 import java.lang.ref.SoftReference; 29 import java.lang.ref.WeakReference; 30 import java.lang.reflect.InvocationTargetException; 31 import java.io.DataInputStream; 32 import java.io.DataOutputStream; 33 import java.io.IOException; 34 import java.io.InputStream; 35 import java.io.OutputStream; 36 import java.io.BufferedInputStream; 37 import java.io.BufferedOutputStream; 38 import java.net.InetAddress; 39 import java.net.ServerSocket; 40 import java.net.Socket; 41 import java.rmi.RemoteException; 42 import java.rmi.server.ExportException; 43 import java.rmi.server.LogStream; 44 import java.rmi.server.RMIFailureHandler; 45 import java.rmi.server.RMISocketFactory; 46 import java.rmi.server.RemoteCall; 47 import java.rmi.server.ServerNotActiveException; 48 import java.rmi.server.UID; 49 import java.security.AccessControlContext; 50 import java.security.AccessController; 51 import java.util.ArrayList; 52 import java.util.LinkedList; 53 import java.util.List; 54 import java.util.Map; 55 import java.util.WeakHashMap; 56 import java.util.logging.Level; 57 import java.util.concurrent.ExecutorService; 58 import java.util.concurrent.RejectedExecutionException; 59 import java.util.concurrent.SynchronousQueue; 60 import java.util.concurrent.ThreadFactory; 61 import java.util.concurrent.ThreadPoolExecutor; 62 import java.util.concurrent.TimeUnit; 63 import java.util.concurrent.atomic.AtomicInteger; 64 import sun.rmi.runtime.Log; 65 import sun.rmi.runtime.NewThreadAction; 66 import sun.rmi.transport.Channel; 67 import sun.rmi.transport.Connection; 68 import sun.rmi.transport.DGCAckHandler; 69 import sun.rmi.transport.Endpoint; 70 import sun.rmi.transport.StreamRemoteCall; 71 import sun.rmi.transport.Target; 72 import sun.rmi.transport.Transport; 73 import sun.rmi.transport.TransportConstants; 74 import sun.rmi.transport.proxy.HttpReceiveSocket; 75 import sun.security.action.GetIntegerAction; 76 import sun.security.action.GetLongAction; 77 import sun.security.action.GetPropertyAction; 78 79 /** 80 * TCPTransport is the socket-based implementation of the RMI Transport 81 * abstraction. 82 * 83 * @author Ann Wollrath 84 * @author Peter Jones 85 */ 86 public class TCPTransport extends Transport { 87 88 /* tcp package log */ 89 static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp", 90 LogStream.parseLevel(AccessController.doPrivileged( 91 new GetPropertyAction("sun.rmi.transport.tcp.logLevel")))); 92 93 /** maximum number of connection handler threads */ 94 private static final int maxConnectionThreads = // default no limit 95 AccessController.doPrivileged( 96 new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads", 97 Integer.MAX_VALUE)); 98 99 /** keep alive time for idle connection handler threads */ 100 private static final long threadKeepAliveTime = // default 1 minute 101 AccessController.doPrivileged( 102 new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime", 103 60000)); 104 105 /** thread pool for connection handlers */ 106 private static final ExecutorService connectionThreadPool = 107 new ThreadPoolExecutor(0, maxConnectionThreads, 108 threadKeepAliveTime, TimeUnit.MILLISECONDS, 109 new SynchronousQueue<Runnable>(), 110 new ThreadFactory() { 111 public Thread newThread(Runnable runnable) { 112 return AccessController.doPrivileged(new NewThreadAction( 113 runnable, "TCP Connection(idle)", true, true)); 114 } 115 }); 116 117 /** total connections handled */ 118 private static final AtomicInteger connectionCount = new AtomicInteger(0); 119 120 /** client host for the current thread's connection */ 121 private static final ThreadLocal<ConnectionHandler> 122 threadConnectionHandler = new ThreadLocal<>(); 123 124 /** endpoints for this transport */ 125 private final LinkedList<TCPEndpoint> epList; 126 /** number of objects exported on this transport */ 127 private int exportCount = 0; 128 /** server socket for this transport */ 129 private ServerSocket server = null; 130 /** table mapping endpoints to channels */ 131 private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable = 132 new WeakHashMap<>(); 133 134 static final RMISocketFactory defaultSocketFactory = 135 RMISocketFactory.getDefaultSocketFactory(); 136 137 /** number of milliseconds in accepted-connection timeout. 138 * Warning: this should be greater than 15 seconds (the client-side 139 * timeout), and defaults to 2 hours. 140 * The maximum representable value is slightly more than 24 days 141 * and 20 hours. 142 */ 143 private static final int connectionReadTimeout = // default 2 hours 144 AccessController.doPrivileged( 145 new GetIntegerAction("sun.rmi.transport.tcp.readTimeout", 146 2 * 3600 * 1000)); 147 148 /** 149 * Constructs a TCPTransport. 150 */ 151 TCPTransport(LinkedList<TCPEndpoint> epList) { 152 // assert ((epList.size() != null) && (epList.size() >= 1)) 153 this.epList = epList; 154 if (tcpLog.isLoggable(Log.BRIEF)) { 155 tcpLog.log(Log.BRIEF, "Version = " + 156 TransportConstants.Version + ", ep = " + getEndpoint()); 157 } 158 } 159 160 /** 161 * Closes all cached connections in every channel subordinated to this 162 * transport. Currently, this only closes outgoing connections. 163 */ 164 public void shedConnectionCaches() { 165 List<TCPChannel> channels; 166 synchronized (channelTable) { 167 channels = new ArrayList<TCPChannel>(channelTable.values().size()); 168 for (Reference<TCPChannel> ref : channelTable.values()) { 169 TCPChannel ch = ref.get(); 170 if (ch != null) { 171 channels.add(ch); 172 } 173 } 174 } 175 for (TCPChannel channel : channels) { 176 channel.shedCache(); 177 } 178 } 179 180 /** 181 * Returns a <I>Channel</I> that generates connections to the 182 * endpoint <I>ep</I>. A Channel is an object that creates and 183 * manages connections of a particular type to some particular 184 * address space. 185 * @param ep the endpoint to which connections will be generated. 186 * @return the channel or null if the transport cannot 187 * generate connections to this endpoint 188 */ 189 public TCPChannel getChannel(Endpoint ep) { 190 TCPChannel ch = null; 191 if (ep instanceof TCPEndpoint) { 192 synchronized (channelTable) { 193 Reference<TCPChannel> ref = channelTable.get(ep); 194 if (ref != null) { 195 ch = ref.get(); 196 } 197 if (ch == null) { 198 TCPEndpoint tcpEndpoint = (TCPEndpoint) ep; 199 ch = new TCPChannel(this, tcpEndpoint); 200 channelTable.put(tcpEndpoint, 201 new WeakReference<TCPChannel>(ch)); 202 } 203 } 204 } 205 return ch; 206 } 207 208 /** 209 * Removes the <I>Channel</I> that generates connections to the 210 * endpoint <I>ep</I>. 211 */ 212 public void free(Endpoint ep) { 213 if (ep instanceof TCPEndpoint) { 214 synchronized (channelTable) { 215 Reference<TCPChannel> ref = channelTable.remove(ep); 216 if (ref != null) { 217 TCPChannel channel = ref.get(); 218 if (channel != null) { 219 channel.shedCache(); 220 } 221 } 222 } 223 } 224 } 225 226 /** 227 * Export the object so that it can accept incoming calls. 228 */ 229 public void exportObject(Target target) throws RemoteException { 230 /* 231 * Ensure that a server socket is listening, and count this 232 * export while synchronized to prevent the server socket from 233 * being closed due to concurrent unexports. 234 */ 235 synchronized (this) { 236 listen(); 237 exportCount++; 238 } 239 240 /* 241 * Try to add the Target to the exported object table; keep 242 * counting this export (to keep server socket open) only if 243 * that succeeds. 244 */ 245 boolean ok = false; 246 try { 247 super.exportObject(target); 248 ok = true; 249 } finally { 250 if (!ok) { 251 synchronized (this) { 252 decrementExportCount(); 253 } 254 } 255 } 256 } 257 258 protected synchronized void targetUnexported() { 259 decrementExportCount(); 260 } 261 262 /** 263 * Decrements the count of exported objects, closing the current 264 * server socket if the count reaches zero. 265 **/ 266 private void decrementExportCount() { 267 assert Thread.holdsLock(this); 268 exportCount--; 269 if (exportCount == 0 && getEndpoint().getListenPort() != 0) { 270 ServerSocket ss = server; 271 server = null; 272 try { 273 ss.close(); 274 } catch (IOException e) { 275 } 276 } 277 } 278 279 /** 280 * Verify that the current access control context has permission to 281 * accept the connection being dispatched by the current thread. 282 */ 283 protected void checkAcceptPermission(AccessControlContext acc) { 284 SecurityManager sm = System.getSecurityManager(); 285 if (sm == null) { 286 return; 287 } 288 ConnectionHandler h = threadConnectionHandler.get(); 289 if (h == null) { 290 throw new Error( 291 "checkAcceptPermission not in ConnectionHandler thread"); 292 } 293 h.checkAcceptPermission(sm, acc); 294 } 295 296 private TCPEndpoint getEndpoint() { 297 synchronized (epList) { 298 return epList.getLast(); 299 } 300 } 301 302 /** 303 * Listen on transport's endpoint. 304 */ 305 private void listen() throws RemoteException { 306 assert Thread.holdsLock(this); 307 TCPEndpoint ep = getEndpoint(); 308 int port = ep.getPort(); 309 310 if (server == null) { 311 if (tcpLog.isLoggable(Log.BRIEF)) { 312 tcpLog.log(Log.BRIEF, 313 "(port " + port + ") create server socket"); 314 } 315 316 try { 317 server = ep.newServerSocket(); 318 /* 319 * Don't retry ServerSocket if creation fails since 320 * "port in use" will cause export to hang if an 321 * RMIFailureHandler is not installed. 322 */ 323 Thread t = AccessController.doPrivileged( 324 new NewThreadAction(new AcceptLoop(server), 325 "TCP Accept-" + port, true)); 326 t.start(); 327 } catch (java.net.BindException e) { 328 throw new ExportException("Port already in use: " + port, e); 329 } catch (IOException e) { 330 throw new ExportException("Listen failed on port: " + port, e); 331 } 332 333 } else { 334 // otherwise verify security access to existing server socket 335 SecurityManager sm = System.getSecurityManager(); 336 if (sm != null) { 337 sm.checkListen(port); 338 } 339 } 340 } 341 342 /** 343 * Worker for accepting connections from a server socket. 344 **/ 345 private class AcceptLoop implements Runnable { 346 347 private final ServerSocket serverSocket; 348 349 // state for throttling loop on exceptions (local to accept thread) 350 private long lastExceptionTime = 0L; 351 private int recentExceptionCount; 352 353 AcceptLoop(ServerSocket serverSocket) { 354 this.serverSocket = serverSocket; 355 } 356 357 public void run() { 358 try { 359 executeAcceptLoop(); 360 } finally { 361 try { 362 /* 363 * Only one accept loop is started per server 364 * socket, so after no more connections will be 365 * accepted, ensure that the server socket is no 366 * longer listening. 367 */ 368 serverSocket.close(); 369 } catch (IOException e) { 370 } 371 } 372 } 373 374 /** 375 * Accepts connections from the server socket and executes 376 * handlers for them in the thread pool. 377 **/ 378 private void executeAcceptLoop() { 379 if (tcpLog.isLoggable(Log.BRIEF)) { 380 tcpLog.log(Log.BRIEF, "listening on port " + 381 getEndpoint().getPort()); 382 } 383 384 while (true) { 385 Socket socket = null; 386 try { 387 socket = serverSocket.accept(); 388 389 /* 390 * Find client host name (or "0.0.0.0" if unknown) 391 */ 392 InetAddress clientAddr = socket.getInetAddress(); 393 String clientHost = (clientAddr != null 394 ? clientAddr.getHostAddress() 395 : "0.0.0.0"); 396 397 /* 398 * Execute connection handler in the thread pool, 399 * which uses non-system threads. 400 */ 401 try { 402 connectionThreadPool.execute( 403 new ConnectionHandler(socket, clientHost)); 404 } catch (RejectedExecutionException e) { 405 closeSocket(socket); 406 tcpLog.log(Log.BRIEF, 407 "rejected connection from " + clientHost); 408 } 409 410 } catch (Throwable t) { 411 try { 412 /* 413 * If the server socket has been closed, such 414 * as because there are no more exported 415 * objects, then we expect accept to throw an 416 * exception, so just terminate normally. 417 */ 418 if (serverSocket.isClosed()) { 419 break; 420 } 421 422 try { 423 if (tcpLog.isLoggable(Level.WARNING)) { 424 tcpLog.log(Level.WARNING, 425 "accept loop for " + serverSocket + 426 " throws", t); 427 } 428 } catch (Throwable tt) { 429 } 430 } finally { 431 /* 432 * Always close the accepted socket (if any) 433 * if an exception occurs, but only after 434 * logging an unexpected exception. 435 */ 436 if (socket != null) { 437 closeSocket(socket); 438 } 439 } 440 441 /* 442 * In case we're running out of file descriptors, 443 * release resources held in caches. 444 */ 445 if (!(t instanceof SecurityException)) { 446 try { 447 TCPEndpoint.shedConnectionCaches(); 448 } catch (Throwable tt) { 449 } 450 } 451 452 /* 453 * A NoClassDefFoundError can occur if no file 454 * descriptors are available, in which case this 455 * loop should not terminate. 456 */ 457 if (t instanceof Exception || 458 t instanceof OutOfMemoryError || 459 t instanceof NoClassDefFoundError) 460 { 461 if (!continueAfterAcceptFailure(t)) { 462 return; 463 } 464 // continue loop 465 } else { 466 throw (Error) t; 467 } 468 } 469 } 470 } 471 472 /** 473 * Returns true if the accept loop should continue after the 474 * specified exception has been caught, or false if the accept 475 * loop should terminate (closing the server socket). If 476 * there is an RMIFailureHandler, this method returns the 477 * result of passing the specified exception to it; otherwise, 478 * this method always returns true, after sleeping to throttle 479 * the accept loop if necessary. 480 **/ 481 private boolean continueAfterAcceptFailure(Throwable t) { 482 RMIFailureHandler fh = RMISocketFactory.getFailureHandler(); 483 if (fh != null) { 484 return fh.failure(t instanceof Exception ? (Exception) t : 485 new InvocationTargetException(t)); 486 } else { 487 throttleLoopOnException(); 488 return true; 489 } 490 } 491 492 /** 493 * Throttles the accept loop after an exception has been 494 * caught: if a burst of 10 exceptions in 5 seconds occurs, 495 * then wait for 10 seconds to curb busy CPU usage. 496 **/ 497 private void throttleLoopOnException() { 498 long now = System.currentTimeMillis(); 499 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) { 500 // last exception was long ago (or this is the first) 501 lastExceptionTime = now; 502 recentExceptionCount = 0; 503 } else { 504 // exception burst window was started recently 505 if (++recentExceptionCount >= 10) { 506 try { 507 Thread.sleep(10000); 508 } catch (InterruptedException ignore) { 509 } 510 } 511 } 512 } 513 } 514 515 /** close socket and eat exception */ 516 private static void closeSocket(Socket sock) { 517 try { 518 sock.close(); 519 } catch (IOException ex) { 520 // eat exception 521 } 522 } 523 524 /** 525 * handleMessages decodes transport operations and handles messages 526 * appropriately. If an exception occurs during message handling, 527 * the socket is closed. 528 */ 529 void handleMessages(Connection conn, boolean persistent) { 530 int port = getEndpoint().getPort(); 531 532 try { 533 DataInputStream in = new DataInputStream(conn.getInputStream()); 534 do { 535 int op = in.read(); // transport op 536 if (op == -1) { 537 if (tcpLog.isLoggable(Log.BRIEF)) { 538 tcpLog.log(Log.BRIEF, "(port " + 539 port + ") connection closed"); 540 } 541 break; 542 } 543 544 if (tcpLog.isLoggable(Log.BRIEF)) { 545 tcpLog.log(Log.BRIEF, "(port " + port + 546 ") op = " + op); 547 } 548 549 switch (op) { 550 case TransportConstants.Call: 551 // service incoming RMI call 552 RemoteCall call = new StreamRemoteCall(conn); 553 if (serviceCall(call) == false) 554 return; 555 break; 556 557 case TransportConstants.Ping: 558 // send ack for ping 559 DataOutputStream out = 560 new DataOutputStream(conn.getOutputStream()); 561 out.writeByte(TransportConstants.PingAck); 562 conn.releaseOutputStream(); 563 break; 564 565 case TransportConstants.DGCAck: 566 DGCAckHandler.received(UID.read(in)); 567 break; 568 569 default: 570 throw new IOException("unknown transport op " + op); 571 } 572 } while (persistent); 573 574 } catch (IOException e) { 575 // exception during processing causes connection to close (below) 576 if (tcpLog.isLoggable(Log.BRIEF)) { 577 tcpLog.log(Log.BRIEF, "(port " + port + 578 ") exception: ", e); 579 } 580 } finally { 581 try { 582 conn.close(); 583 } catch (IOException ex) { 584 // eat exception 585 } 586 } 587 } 588 589 /** 590 * Returns the client host for the current thread's connection. Throws 591 * ServerNotActiveException if no connection is active for this thread. 592 */ 593 public static String getClientHost() throws ServerNotActiveException { 594 ConnectionHandler h = threadConnectionHandler.get(); 595 if (h != null) { 596 return h.getClientHost(); 597 } else { 598 throw new ServerNotActiveException("not in a remote call"); 599 } 600 } 601 602 /** 603 * Services messages on accepted connection 604 */ 605 private class ConnectionHandler implements Runnable { 606 607 /** int value of "POST" in ASCII (Java's specified data formats 608 * make this once-reviled tactic again socially acceptable) */ 609 private static final int POST = 0x504f5354; 610 611 /** most recently accept-authorized AccessControlContext */ 612 private AccessControlContext okContext; 613 /** cache of accept-authorized AccessControlContexts */ 614 private Map<AccessControlContext, 615 Reference<AccessControlContext>> authCache; 616 /** security manager which authorized contexts in authCache */ 617 private SecurityManager cacheSecurityManager = null; 618 619 private Socket socket; 620 private String remoteHost; 621 622 ConnectionHandler(Socket socket, String remoteHost) { 623 this.socket = socket; 624 this.remoteHost = remoteHost; 625 } 626 627 String getClientHost() { 628 return remoteHost; 629 } 630 631 /** 632 * Verify that the given AccessControlContext has permission to 633 * accept this connection. 634 */ 635 void checkAcceptPermission(SecurityManager sm, 636 AccessControlContext acc) 637 { 638 /* 639 * Note: no need to synchronize on cache-related fields, since this 640 * method only gets called from the ConnectionHandler's thread. 641 */ 642 if (sm != cacheSecurityManager) { 643 okContext = null; 644 authCache = new WeakHashMap<AccessControlContext, 645 Reference<AccessControlContext>>(); 646 cacheSecurityManager = sm; 647 } 648 if (acc.equals(okContext) || authCache.containsKey(acc)) { 649 return; 650 } 651 InetAddress addr = socket.getInetAddress(); 652 String host = (addr != null) ? addr.getHostAddress() : "*"; 653 654 sm.checkAccept(host, socket.getPort()); 655 656 authCache.put(acc, new SoftReference<AccessControlContext>(acc)); 657 okContext = acc; 658 } 659 660 public void run() { 661 Thread t = Thread.currentThread(); 662 String name = t.getName(); 663 try { 664 t.setName("RMI TCP Connection(" + 665 connectionCount.incrementAndGet() + 666 ")-" + remoteHost); 667 run0(); 668 } finally { 669 t.setName(name); 670 } 671 } 672 673 private void run0() { 674 TCPEndpoint endpoint = getEndpoint(); 675 int port = endpoint.getPort(); 676 677 threadConnectionHandler.set(this); 678 679 // set socket to disable Nagle's algorithm (always send 680 // immediately) 681 // TBD: should this be left up to socket factory instead? 682 try { 683 socket.setTcpNoDelay(true); 684 } catch (Exception e) { 685 // if we fail to set this, ignore and proceed anyway 686 } 687 // set socket to timeout after excessive idle time 688 try { 689 if (connectionReadTimeout > 0) 690 socket.setSoTimeout(connectionReadTimeout); 691 } catch (Exception e) { 692 // too bad, continue anyway 693 } 694 695 try { 696 InputStream sockIn = socket.getInputStream(); 697 InputStream bufIn = sockIn.markSupported() 698 ? sockIn 699 : new BufferedInputStream(sockIn); 700 701 // Read magic (or HTTP wrapper) 702 bufIn.mark(4); 703 DataInputStream in = new DataInputStream(bufIn); 704 int magic = in.readInt(); 705 706 if (magic == POST) { 707 tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call"); 708 709 // It's really a HTTP-wrapped request. Repackage 710 // the socket in a HttpReceiveSocket, reinitialize 711 // sockIn and in, and reread magic. 712 bufIn.reset(); // unread "POST" 713 714 try { 715 socket = new HttpReceiveSocket(socket, bufIn, null); 716 remoteHost = "0.0.0.0"; 717 sockIn = socket.getInputStream(); 718 bufIn = new BufferedInputStream(sockIn); 719 in = new DataInputStream(bufIn); 720 magic = in.readInt(); 721 722 } catch (IOException e) { 723 throw new RemoteException("Error HTTP-unwrapping call", 724 e); 725 } 726 } 727 // bufIn's mark will invalidate itself when it overflows 728 // so it doesn't have to be turned off 729 730 // read and verify transport header 731 short version = in.readShort(); 732 if (magic != TransportConstants.Magic || 733 version != TransportConstants.Version) { 734 // protocol mismatch detected... 735 // just close socket: this would recurse if we marshal an 736 // exception to the client and the protocol at other end 737 // doesn't match. 738 closeSocket(socket); 739 return; 740 } 741 742 OutputStream sockOut = socket.getOutputStream(); 743 BufferedOutputStream bufOut = 744 new BufferedOutputStream(sockOut); 745 DataOutputStream out = new DataOutputStream(bufOut); 746 747 int remotePort = socket.getPort(); 748 749 if (tcpLog.isLoggable(Log.BRIEF)) { 750 tcpLog.log(Log.BRIEF, "accepted socket from [" + 751 remoteHost + ":" + remotePort + "]"); 752 } 753 754 TCPEndpoint ep; 755 TCPChannel ch; 756 TCPConnection conn; 757 758 // send ack (or nack) for protocol 759 byte protocol = in.readByte(); 760 switch (protocol) { 761 case TransportConstants.SingleOpProtocol: 762 // no ack for protocol 763 764 // create dummy channel for receiving messages 765 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 766 endpoint.getClientSocketFactory(), 767 endpoint.getServerSocketFactory()); 768 ch = new TCPChannel(TCPTransport.this, ep); 769 conn = new TCPConnection(ch, socket, bufIn, bufOut); 770 771 // read input messages 772 handleMessages(conn, false); 773 break; 774 775 case TransportConstants.StreamProtocol: 776 // send ack 777 out.writeByte(TransportConstants.ProtocolAck); 778 779 // suggest endpoint (in case client doesn't know host name) 780 if (tcpLog.isLoggable(Log.VERBOSE)) { 781 tcpLog.log(Log.VERBOSE, "(port " + port + 782 ") " + "suggesting " + remoteHost + ":" + 783 remotePort); 784 } 785 786 out.writeUTF(remoteHost); 787 out.writeInt(remotePort); 788 out.flush(); 789 790 // read and discard (possibly bogus) endpoint 791 // REMIND: would be faster to read 2 bytes then skip N+4 792 String clientHost = in.readUTF(); 793 int clientPort = in.readInt(); 794 if (tcpLog.isLoggable(Log.VERBOSE)) { 795 tcpLog.log(Log.VERBOSE, "(port " + port + 796 ") client using " + clientHost + ":" + clientPort); 797 } 798 799 // create dummy channel for receiving messages 800 // (why not use clientHost and clientPort?) 801 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 802 endpoint.getClientSocketFactory(), 803 endpoint.getServerSocketFactory()); 804 ch = new TCPChannel(TCPTransport.this, ep); 805 conn = new TCPConnection(ch, socket, bufIn, bufOut); 806 807 // read input messages 808 handleMessages(conn, true); 809 break; 810 811 case TransportConstants.MultiplexProtocol: 812 if (tcpLog.isLoggable(Log.VERBOSE)) { 813 tcpLog.log(Log.VERBOSE, "(port " + port + 814 ") accepting multiplex protocol"); 815 } 816 817 // send ack 818 out.writeByte(TransportConstants.ProtocolAck); 819 820 // suggest endpoint (in case client doesn't already have one) 821 if (tcpLog.isLoggable(Log.VERBOSE)) { 822 tcpLog.log(Log.VERBOSE, "(port " + port + 823 ") suggesting " + remoteHost + ":" + remotePort); 824 } 825 826 out.writeUTF(remoteHost); 827 out.writeInt(remotePort); 828 out.flush(); 829 830 // read endpoint client has decided to use 831 ep = new TCPEndpoint(in.readUTF(), in.readInt(), 832 endpoint.getClientSocketFactory(), 833 endpoint.getServerSocketFactory()); 834 if (tcpLog.isLoggable(Log.VERBOSE)) { 835 tcpLog.log(Log.VERBOSE, "(port " + 836 port + ") client using " + 837 ep.getHost() + ":" + ep.getPort()); 838 } 839 840 ConnectionMultiplexer multiplexer; 841 synchronized (channelTable) { 842 // create or find channel for this endpoint 843 ch = getChannel(ep); 844 multiplexer = 845 new ConnectionMultiplexer(ch, bufIn, sockOut, 846 false); 847 ch.useMultiplexer(multiplexer); 848 } 849 multiplexer.run(); 850 break; 851 852 default: 853 // protocol not understood, send nack and close socket 854 out.writeByte(TransportConstants.ProtocolNack); 855 out.flush(); 856 break; 857 } 858 859 } catch (IOException e) { 860 // socket in unknown state: destroy socket 861 tcpLog.log(Log.BRIEF, "terminated with exception:", e); 862 } finally { 863 closeSocket(socket); 864 } 865 } 866 } 867 }