1 /* 2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.lang.ref.Reference; 28 import java.lang.ref.SoftReference; 29 import java.lang.ref.WeakReference; 30 import java.lang.reflect.InvocationTargetException; 31 import java.lang.reflect.UndeclaredThrowableException; 32 import java.io.DataInputStream; 33 import java.io.DataOutputStream; 34 import java.io.IOException; 35 import java.io.InputStream; 36 import java.io.OutputStream; 37 import java.io.BufferedInputStream; 38 import java.io.BufferedOutputStream; 39 import java.net.InetAddress; 40 import java.net.ServerSocket; 41 import java.net.Socket; 42 import java.rmi.RemoteException; 43 import java.rmi.server.ExportException; 44 import java.rmi.server.LogStream; 45 import java.rmi.server.RMIFailureHandler; 46 import java.rmi.server.RMISocketFactory; 47 import java.rmi.server.RemoteCall; 48 import java.rmi.server.ServerNotActiveException; 49 import java.rmi.server.UID; 50 import java.security.AccessControlContext; 51 import java.security.AccessController; 52 import java.util.ArrayList; 53 import java.util.LinkedList; 54 import java.util.List; 55 import java.util.Map; 56 import java.util.WeakHashMap; 57 import java.util.logging.Level; 58 import java.util.concurrent.ExecutorService; 59 import java.util.concurrent.RejectedExecutionException; 60 import java.util.concurrent.SynchronousQueue; 61 import java.util.concurrent.ThreadFactory; 62 import java.util.concurrent.ThreadPoolExecutor; 63 import java.util.concurrent.TimeUnit; 64 import java.util.concurrent.atomic.AtomicInteger; 65 import sun.rmi.runtime.Log; 66 import sun.rmi.runtime.NewThreadAction; 67 import sun.rmi.transport.Channel; 68 import sun.rmi.transport.Connection; 69 import sun.rmi.transport.DGCAckHandler; 70 import sun.rmi.transport.Endpoint; 71 import sun.rmi.transport.StreamRemoteCall; 72 import sun.rmi.transport.Target; 73 import sun.rmi.transport.Transport; 74 import sun.rmi.transport.TransportConstants; 75 import sun.rmi.transport.proxy.HttpReceiveSocket; 76 import sun.security.action.GetIntegerAction; 77 import sun.security.action.GetLongAction; 78 import sun.security.action.GetPropertyAction; 79 80 /** 81 * TCPTransport is the socket-based implementation of the RMI Transport 82 * abstraction. 83 * 84 * @author Ann Wollrath 85 * @author Peter Jones 86 */ 87 @SuppressWarnings("deprecation") 88 public class TCPTransport extends Transport { 89 90 /* tcp package log */ 91 static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp", 92 LogStream.parseLevel(AccessController.doPrivileged( 93 new GetPropertyAction("sun.rmi.transport.tcp.logLevel")))); 94 95 /** maximum number of connection handler threads */ 96 private static final int maxConnectionThreads = // default no limit 97 AccessController.doPrivileged( 98 new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads", 99 Integer.MAX_VALUE)); 100 101 /** keep alive time for idle connection handler threads */ 102 private static final long threadKeepAliveTime = // default 1 minute 103 AccessController.doPrivileged( 104 new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime", 105 60000)); 106 107 /** thread pool for connection handlers */ 108 private static final ExecutorService connectionThreadPool = 109 new ThreadPoolExecutor(0, maxConnectionThreads, 110 threadKeepAliveTime, TimeUnit.MILLISECONDS, 111 new SynchronousQueue<Runnable>(), 112 new ThreadFactory() { 113 public Thread newThread(Runnable runnable) { 114 return AccessController.doPrivileged(new NewThreadAction( 115 runnable, "TCP Connection(idle)", true, true)); 116 } 117 }); 118 119 /** total connections handled */ 120 private static final AtomicInteger connectionCount = new AtomicInteger(0); 121 122 /** client host for the current thread's connection */ 123 private static final ThreadLocal<ConnectionHandler> 124 threadConnectionHandler = new ThreadLocal<>(); 125 126 /** endpoints for this transport */ 127 private final LinkedList<TCPEndpoint> epList; 128 /** number of objects exported on this transport */ 129 private int exportCount = 0; 130 /** server socket for this transport */ 131 private ServerSocket server = null; 132 /** table mapping endpoints to channels */ 133 private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable = 134 new WeakHashMap<>(); 135 136 static final RMISocketFactory defaultSocketFactory = 137 RMISocketFactory.getDefaultSocketFactory(); 138 139 /** number of milliseconds in accepted-connection timeout. 140 * Warning: this should be greater than 15 seconds (the client-side 141 * timeout), and defaults to 2 hours. 142 * The maximum representable value is slightly more than 24 days 143 * and 20 hours. 144 */ 145 private static final int connectionReadTimeout = // default 2 hours 146 AccessController.doPrivileged( 147 new GetIntegerAction("sun.rmi.transport.tcp.readTimeout", 148 2 * 3600 * 1000)); 149 150 /** 151 * Constructs a TCPTransport. 152 */ 153 TCPTransport(LinkedList<TCPEndpoint> epList) { 154 // assert ((epList.size() != null) && (epList.size() >= 1)) 155 this.epList = epList; 156 if (tcpLog.isLoggable(Log.BRIEF)) { 157 tcpLog.log(Log.BRIEF, "Version = " + 158 TransportConstants.Version + ", ep = " + getEndpoint()); 159 } 160 } 161 162 /** 163 * Closes all cached connections in every channel subordinated to this 164 * transport. Currently, this only closes outgoing connections. 165 */ 166 public void shedConnectionCaches() { 167 List<TCPChannel> channels; 168 synchronized (channelTable) { 169 channels = new ArrayList<TCPChannel>(channelTable.values().size()); 170 for (Reference<TCPChannel> ref : channelTable.values()) { 171 TCPChannel ch = ref.get(); 172 if (ch != null) { 173 channels.add(ch); 174 } 175 } 176 } 177 for (TCPChannel channel : channels) { 178 channel.shedCache(); 179 } 180 } 181 182 /** 183 * Returns a <I>Channel</I> that generates connections to the 184 * endpoint <I>ep</I>. A Channel is an object that creates and 185 * manages connections of a particular type to some particular 186 * address space. 187 * @param ep the endpoint to which connections will be generated. 188 * @return the channel or null if the transport cannot 189 * generate connections to this endpoint 190 */ 191 public TCPChannel getChannel(Endpoint ep) { 192 TCPChannel ch = null; 193 if (ep instanceof TCPEndpoint) { 194 synchronized (channelTable) { 195 Reference<TCPChannel> ref = channelTable.get(ep); 196 if (ref != null) { 197 ch = ref.get(); 198 } 199 if (ch == null) { 200 TCPEndpoint tcpEndpoint = (TCPEndpoint) ep; 201 ch = new TCPChannel(this, tcpEndpoint); 202 channelTable.put(tcpEndpoint, 203 new WeakReference<TCPChannel>(ch)); 204 } 205 } 206 } 207 return ch; 208 } 209 210 /** 211 * Removes the <I>Channel</I> that generates connections to the 212 * endpoint <I>ep</I>. 213 */ 214 public void free(Endpoint ep) { 215 if (ep instanceof TCPEndpoint) { 216 synchronized (channelTable) { 217 Reference<TCPChannel> ref = channelTable.remove(ep); 218 if (ref != null) { 219 TCPChannel channel = ref.get(); 220 if (channel != null) { 221 channel.shedCache(); 222 } 223 } 224 } 225 } 226 } 227 228 /** 229 * Export the object so that it can accept incoming calls. 230 */ 231 public void exportObject(Target target) throws RemoteException { 232 /* 233 * Ensure that a server socket is listening, and count this 234 * export while synchronized to prevent the server socket from 235 * being closed due to concurrent unexports. 236 */ 237 synchronized (this) { 238 listen(); 239 exportCount++; 240 } 241 242 /* 243 * Try to add the Target to the exported object table; keep 244 * counting this export (to keep server socket open) only if 245 * that succeeds. 246 */ 247 boolean ok = false; 248 try { 249 super.exportObject(target); 250 ok = true; 251 } finally { 252 if (!ok) { 253 synchronized (this) { 254 decrementExportCount(); 255 } 256 } 257 } 258 } 259 260 protected synchronized void targetUnexported() { 261 decrementExportCount(); 262 } 263 264 /** 265 * Decrements the count of exported objects, closing the current 266 * server socket if the count reaches zero. 267 **/ 268 private void decrementExportCount() { 269 assert Thread.holdsLock(this); 270 exportCount--; 271 if (exportCount == 0 && getEndpoint().getListenPort() != 0) { 272 ServerSocket ss = server; 273 server = null; 274 try { 275 ss.close(); 276 } catch (IOException e) { 277 } 278 } 279 } 280 281 /** 282 * Verify that the current access control context has permission to 283 * accept the connection being dispatched by the current thread. 284 */ 285 protected void checkAcceptPermission(AccessControlContext acc) { 286 SecurityManager sm = System.getSecurityManager(); 287 if (sm == null) { 288 return; 289 } 290 ConnectionHandler h = threadConnectionHandler.get(); 291 if (h == null) { 292 throw new Error( 293 "checkAcceptPermission not in ConnectionHandler thread"); 294 } 295 h.checkAcceptPermission(sm, acc); 296 } 297 298 private TCPEndpoint getEndpoint() { 299 synchronized (epList) { 300 return epList.getLast(); 301 } 302 } 303 304 /** 305 * Listen on transport's endpoint. 306 */ 307 private void listen() throws RemoteException { 308 assert Thread.holdsLock(this); 309 TCPEndpoint ep = getEndpoint(); 310 int port = ep.getPort(); 311 312 if (server == null) { 313 if (tcpLog.isLoggable(Log.BRIEF)) { 314 tcpLog.log(Log.BRIEF, 315 "(port " + port + ") create server socket"); 316 } 317 318 try { 319 server = ep.newServerSocket(); 320 /* 321 * Don't retry ServerSocket if creation fails since 322 * "port in use" will cause export to hang if an 323 * RMIFailureHandler is not installed. 324 */ 325 Thread t = AccessController.doPrivileged( 326 new NewThreadAction(new AcceptLoop(server), 327 "TCP Accept-" + port, true)); 328 t.start(); 329 } catch (java.net.BindException e) { 330 throw new ExportException("Port already in use: " + port, e); 331 } catch (IOException e) { 332 throw new ExportException("Listen failed on port: " + port, e); 333 } 334 335 } else { 336 // otherwise verify security access to existing server socket 337 SecurityManager sm = System.getSecurityManager(); 338 if (sm != null) { 339 sm.checkListen(port); 340 } 341 } 342 } 343 344 /** 345 * Worker for accepting connections from a server socket. 346 **/ 347 private class AcceptLoop implements Runnable { 348 349 private final ServerSocket serverSocket; 350 351 // state for throttling loop on exceptions (local to accept thread) 352 private long lastExceptionTime = 0L; 353 private int recentExceptionCount; 354 355 AcceptLoop(ServerSocket serverSocket) { 356 this.serverSocket = serverSocket; 357 } 358 359 public void run() { 360 try { 361 executeAcceptLoop(); 362 } finally { 363 try { 364 /* 365 * Only one accept loop is started per server 366 * socket, so after no more connections will be 367 * accepted, ensure that the server socket is no 368 * longer listening. 369 */ 370 serverSocket.close(); 371 } catch (IOException e) { 372 } 373 } 374 } 375 376 /** 377 * Accepts connections from the server socket and executes 378 * handlers for them in the thread pool. 379 **/ 380 private void executeAcceptLoop() { 381 if (tcpLog.isLoggable(Log.BRIEF)) { 382 tcpLog.log(Log.BRIEF, "listening on port " + 383 getEndpoint().getPort()); 384 } 385 386 while (true) { 387 Socket socket = null; 388 try { 389 socket = serverSocket.accept(); 390 391 /* 392 * Find client host name (or "0.0.0.0" if unknown) 393 */ 394 InetAddress clientAddr = socket.getInetAddress(); 395 String clientHost = (clientAddr != null 396 ? clientAddr.getHostAddress() 397 : "0.0.0.0"); 398 399 /* 400 * Execute connection handler in the thread pool, 401 * which uses non-system threads. 402 */ 403 try { 404 connectionThreadPool.execute( 405 new ConnectionHandler(socket, clientHost)); 406 } catch (RejectedExecutionException e) { 407 closeSocket(socket); 408 tcpLog.log(Log.BRIEF, 409 "rejected connection from " + clientHost); 410 } 411 412 } catch (Throwable t) { 413 try { 414 /* 415 * If the server socket has been closed, such 416 * as because there are no more exported 417 * objects, then we expect accept to throw an 418 * exception, so just terminate normally. 419 */ 420 if (serverSocket.isClosed()) { 421 break; 422 } 423 424 try { 425 if (tcpLog.isLoggable(Level.WARNING)) { 426 tcpLog.log(Level.WARNING, 427 "accept loop for " + serverSocket + 428 " throws", t); 429 } 430 } catch (Throwable tt) { 431 } 432 } finally { 433 /* 434 * Always close the accepted socket (if any) 435 * if an exception occurs, but only after 436 * logging an unexpected exception. 437 */ 438 if (socket != null) { 439 closeSocket(socket); 440 } 441 } 442 443 /* 444 * In case we're running out of file descriptors, 445 * release resources held in caches. 446 */ 447 if (!(t instanceof SecurityException)) { 448 try { 449 TCPEndpoint.shedConnectionCaches(); 450 } catch (Throwable tt) { 451 } 452 } 453 454 /* 455 * A NoClassDefFoundError can occur if no file 456 * descriptors are available, in which case this 457 * loop should not terminate. 458 */ 459 if (t instanceof Exception || 460 t instanceof OutOfMemoryError || 461 t instanceof NoClassDefFoundError) 462 { 463 if (!continueAfterAcceptFailure(t)) { 464 return; 465 } 466 // continue loop 467 } else if (t instanceof Error) { 468 throw (Error) t; 469 } else { 470 throw new UndeclaredThrowableException(t); 471 } 472 } 473 } 474 } 475 476 /** 477 * Returns true if the accept loop should continue after the 478 * specified exception has been caught, or false if the accept 479 * loop should terminate (closing the server socket). If 480 * there is an RMIFailureHandler, this method returns the 481 * result of passing the specified exception to it; otherwise, 482 * this method always returns true, after sleeping to throttle 483 * the accept loop if necessary. 484 **/ 485 private boolean continueAfterAcceptFailure(Throwable t) { 486 RMIFailureHandler fh = RMISocketFactory.getFailureHandler(); 487 if (fh != null) { 488 return fh.failure(t instanceof Exception ? (Exception) t : 489 new InvocationTargetException(t)); 490 } else { 491 throttleLoopOnException(); 492 return true; 493 } 494 } 495 496 /** 497 * Throttles the accept loop after an exception has been 498 * caught: if a burst of 10 exceptions in 5 seconds occurs, 499 * then wait for 10 seconds to curb busy CPU usage. 500 **/ 501 private void throttleLoopOnException() { 502 long now = System.currentTimeMillis(); 503 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) { 504 // last exception was long ago (or this is the first) 505 lastExceptionTime = now; 506 recentExceptionCount = 0; 507 } else { 508 // exception burst window was started recently 509 if (++recentExceptionCount >= 10) { 510 try { 511 Thread.sleep(10000); 512 } catch (InterruptedException ignore) { 513 } 514 } 515 } 516 } 517 } 518 519 /** close socket and eat exception */ 520 private static void closeSocket(Socket sock) { 521 try { 522 sock.close(); 523 } catch (IOException ex) { 524 // eat exception 525 } 526 } 527 528 /** 529 * handleMessages decodes transport operations and handles messages 530 * appropriately. If an exception occurs during message handling, 531 * the socket is closed. 532 */ 533 void handleMessages(Connection conn, boolean persistent) { 534 int port = getEndpoint().getPort(); 535 536 try { 537 DataInputStream in = new DataInputStream(conn.getInputStream()); 538 do { 539 int op = in.read(); // transport op 540 if (op == -1) { 541 if (tcpLog.isLoggable(Log.BRIEF)) { 542 tcpLog.log(Log.BRIEF, "(port " + 543 port + ") connection closed"); 544 } 545 break; 546 } 547 548 if (tcpLog.isLoggable(Log.BRIEF)) { 549 tcpLog.log(Log.BRIEF, "(port " + port + 550 ") op = " + op); 551 } 552 553 switch (op) { 554 case TransportConstants.Call: 555 // service incoming RMI call 556 RemoteCall call = new StreamRemoteCall(conn); 557 if (serviceCall(call) == false) 558 return; 559 break; 560 561 case TransportConstants.Ping: 562 // send ack for ping 563 DataOutputStream out = 564 new DataOutputStream(conn.getOutputStream()); 565 out.writeByte(TransportConstants.PingAck); 566 conn.releaseOutputStream(); 567 break; 568 569 case TransportConstants.DGCAck: 570 DGCAckHandler.received(UID.read(in)); 571 break; 572 573 default: 574 throw new IOException("unknown transport op " + op); 575 } 576 } while (persistent); 577 578 } catch (IOException e) { 579 // exception during processing causes connection to close (below) 580 if (tcpLog.isLoggable(Log.BRIEF)) { 581 tcpLog.log(Log.BRIEF, "(port " + port + 582 ") exception: ", e); 583 } 584 } finally { 585 try { 586 conn.close(); 587 } catch (IOException ex) { 588 // eat exception 589 } 590 } 591 } 592 593 /** 594 * Returns the client host for the current thread's connection. Throws 595 * ServerNotActiveException if no connection is active for this thread. 596 */ 597 public static String getClientHost() throws ServerNotActiveException { 598 ConnectionHandler h = threadConnectionHandler.get(); 599 if (h != null) { 600 return h.getClientHost(); 601 } else { 602 throw new ServerNotActiveException("not in a remote call"); 603 } 604 } 605 606 /** 607 * Services messages on accepted connection 608 */ 609 private class ConnectionHandler implements Runnable { 610 611 /** int value of "POST" in ASCII (Java's specified data formats 612 * make this once-reviled tactic again socially acceptable) */ 613 private static final int POST = 0x504f5354; 614 615 /** most recently accept-authorized AccessControlContext */ 616 private AccessControlContext okContext; 617 /** cache of accept-authorized AccessControlContexts */ 618 private Map<AccessControlContext, 619 Reference<AccessControlContext>> authCache; 620 /** security manager which authorized contexts in authCache */ 621 private SecurityManager cacheSecurityManager = null; 622 623 private Socket socket; 624 private String remoteHost; 625 626 ConnectionHandler(Socket socket, String remoteHost) { 627 this.socket = socket; 628 this.remoteHost = remoteHost; 629 } 630 631 String getClientHost() { 632 return remoteHost; 633 } 634 635 /** 636 * Verify that the given AccessControlContext has permission to 637 * accept this connection. 638 */ 639 void checkAcceptPermission(SecurityManager sm, 640 AccessControlContext acc) 641 { 642 /* 643 * Note: no need to synchronize on cache-related fields, since this 644 * method only gets called from the ConnectionHandler's thread. 645 */ 646 if (sm != cacheSecurityManager) { 647 okContext = null; 648 authCache = new WeakHashMap<AccessControlContext, 649 Reference<AccessControlContext>>(); 650 cacheSecurityManager = sm; 651 } 652 if (acc.equals(okContext) || authCache.containsKey(acc)) { 653 return; 654 } 655 InetAddress addr = socket.getInetAddress(); 656 String host = (addr != null) ? addr.getHostAddress() : "*"; 657 658 sm.checkAccept(host, socket.getPort()); 659 660 authCache.put(acc, new SoftReference<AccessControlContext>(acc)); 661 okContext = acc; 662 } 663 664 public void run() { 665 Thread t = Thread.currentThread(); 666 String name = t.getName(); 667 try { 668 t.setName("RMI TCP Connection(" + 669 connectionCount.incrementAndGet() + 670 ")-" + remoteHost); 671 run0(); 672 } finally { 673 t.setName(name); 674 } 675 } 676 677 private void run0() { 678 TCPEndpoint endpoint = getEndpoint(); 679 int port = endpoint.getPort(); 680 681 threadConnectionHandler.set(this); 682 683 // set socket to disable Nagle's algorithm (always send 684 // immediately) 685 // TBD: should this be left up to socket factory instead? 686 try { 687 socket.setTcpNoDelay(true); 688 } catch (Exception e) { 689 // if we fail to set this, ignore and proceed anyway 690 } 691 // set socket to timeout after excessive idle time 692 try { 693 if (connectionReadTimeout > 0) 694 socket.setSoTimeout(connectionReadTimeout); 695 } catch (Exception e) { 696 // too bad, continue anyway 697 } 698 699 try { 700 InputStream sockIn = socket.getInputStream(); 701 InputStream bufIn = sockIn.markSupported() 702 ? sockIn 703 : new BufferedInputStream(sockIn); 704 705 // Read magic (or HTTP wrapper) 706 bufIn.mark(4); 707 DataInputStream in = new DataInputStream(bufIn); 708 int magic = in.readInt(); 709 710 if (magic == POST) { 711 tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call"); 712 713 // It's really a HTTP-wrapped request. Repackage 714 // the socket in a HttpReceiveSocket, reinitialize 715 // sockIn and in, and reread magic. 716 bufIn.reset(); // unread "POST" 717 718 try { 719 socket = new HttpReceiveSocket(socket, bufIn, null); 720 remoteHost = "0.0.0.0"; 721 sockIn = socket.getInputStream(); 722 bufIn = new BufferedInputStream(sockIn); 723 in = new DataInputStream(bufIn); 724 magic = in.readInt(); 725 726 } catch (IOException e) { 727 throw new RemoteException("Error HTTP-unwrapping call", 728 e); 729 } 730 } 731 // bufIn's mark will invalidate itself when it overflows 732 // so it doesn't have to be turned off 733 734 // read and verify transport header 735 short version = in.readShort(); 736 if (magic != TransportConstants.Magic || 737 version != TransportConstants.Version) { 738 // protocol mismatch detected... 739 // just close socket: this would recurse if we marshal an 740 // exception to the client and the protocol at other end 741 // doesn't match. 742 closeSocket(socket); 743 return; 744 } 745 746 OutputStream sockOut = socket.getOutputStream(); 747 BufferedOutputStream bufOut = 748 new BufferedOutputStream(sockOut); 749 DataOutputStream out = new DataOutputStream(bufOut); 750 751 int remotePort = socket.getPort(); 752 753 if (tcpLog.isLoggable(Log.BRIEF)) { 754 tcpLog.log(Log.BRIEF, "accepted socket from [" + 755 remoteHost + ":" + remotePort + "]"); 756 } 757 758 TCPEndpoint ep; 759 TCPChannel ch; 760 TCPConnection conn; 761 762 // send ack (or nack) for protocol 763 byte protocol = in.readByte(); 764 switch (protocol) { 765 case TransportConstants.SingleOpProtocol: 766 // no ack for protocol 767 768 // create dummy channel for receiving messages 769 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 770 endpoint.getClientSocketFactory(), 771 endpoint.getServerSocketFactory()); 772 ch = new TCPChannel(TCPTransport.this, ep); 773 conn = new TCPConnection(ch, socket, bufIn, bufOut); 774 775 // read input messages 776 handleMessages(conn, false); 777 break; 778 779 case TransportConstants.StreamProtocol: 780 // send ack 781 out.writeByte(TransportConstants.ProtocolAck); 782 783 // suggest endpoint (in case client doesn't know host name) 784 if (tcpLog.isLoggable(Log.VERBOSE)) { 785 tcpLog.log(Log.VERBOSE, "(port " + port + 786 ") " + "suggesting " + remoteHost + ":" + 787 remotePort); 788 } 789 790 out.writeUTF(remoteHost); 791 out.writeInt(remotePort); 792 out.flush(); 793 794 // read and discard (possibly bogus) endpoint 795 // REMIND: would be faster to read 2 bytes then skip N+4 796 String clientHost = in.readUTF(); 797 int clientPort = in.readInt(); 798 if (tcpLog.isLoggable(Log.VERBOSE)) { 799 tcpLog.log(Log.VERBOSE, "(port " + port + 800 ") client using " + clientHost + ":" + clientPort); 801 } 802 803 // create dummy channel for receiving messages 804 // (why not use clientHost and clientPort?) 805 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 806 endpoint.getClientSocketFactory(), 807 endpoint.getServerSocketFactory()); 808 ch = new TCPChannel(TCPTransport.this, ep); 809 conn = new TCPConnection(ch, socket, bufIn, bufOut); 810 811 // read input messages 812 handleMessages(conn, true); 813 break; 814 815 case TransportConstants.MultiplexProtocol: 816 if (tcpLog.isLoggable(Log.VERBOSE)) { 817 tcpLog.log(Log.VERBOSE, "(port " + port + 818 ") accepting multiplex protocol"); 819 } 820 821 // send ack 822 out.writeByte(TransportConstants.ProtocolAck); 823 824 // suggest endpoint (in case client doesn't already have one) 825 if (tcpLog.isLoggable(Log.VERBOSE)) { 826 tcpLog.log(Log.VERBOSE, "(port " + port + 827 ") suggesting " + remoteHost + ":" + remotePort); 828 } 829 830 out.writeUTF(remoteHost); 831 out.writeInt(remotePort); 832 out.flush(); 833 834 // read endpoint client has decided to use 835 ep = new TCPEndpoint(in.readUTF(), in.readInt(), 836 endpoint.getClientSocketFactory(), 837 endpoint.getServerSocketFactory()); 838 if (tcpLog.isLoggable(Log.VERBOSE)) { 839 tcpLog.log(Log.VERBOSE, "(port " + 840 port + ") client using " + 841 ep.getHost() + ":" + ep.getPort()); 842 } 843 844 ConnectionMultiplexer multiplexer; 845 synchronized (channelTable) { 846 // create or find channel for this endpoint 847 ch = getChannel(ep); 848 multiplexer = 849 new ConnectionMultiplexer(ch, bufIn, sockOut, 850 false); 851 ch.useMultiplexer(multiplexer); 852 } 853 multiplexer.run(); 854 break; 855 856 default: 857 // protocol not understood, send nack and close socket 858 out.writeByte(TransportConstants.ProtocolNack); 859 out.flush(); 860 break; 861 } 862 863 } catch (IOException e) { 864 // socket in unknown state: destroy socket 865 tcpLog.log(Log.BRIEF, "terminated with exception:", e); 866 } finally { 867 closeSocket(socket); 868 } 869 } 870 } 871 }