1 /* 2 * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.rmi.transport.tcp; 26 27 import java.lang.ref.Reference; 28 import java.lang.ref.SoftReference; 29 import java.lang.ref.WeakReference; 30 import java.lang.reflect.InvocationTargetException; 31 import java.io.DataInputStream; 32 import java.io.DataOutputStream; 33 import java.io.IOException; 34 import java.io.InputStream; 35 import java.io.OutputStream; 36 import java.io.BufferedInputStream; 37 import java.io.BufferedOutputStream; 38 import java.net.InetAddress; 39 import java.net.ServerSocket; 40 import java.net.Socket; 41 import java.rmi.RemoteException; 42 import java.rmi.server.ExportException; 43 import java.rmi.server.LogStream; 44 import java.rmi.server.RMIFailureHandler; 45 import java.rmi.server.RMISocketFactory; 46 import java.rmi.server.RemoteCall; 47 import java.rmi.server.ServerNotActiveException; 48 import java.rmi.server.UID; 49 import java.security.AccessControlContext; 50 import java.security.AccessController; 51 import java.util.ArrayList; 52 import java.util.LinkedList; 53 import java.util.List; 54 import java.util.Map; 55 import java.util.WeakHashMap; 56 import java.util.logging.Level; 57 import java.util.concurrent.ExecutorService; 58 import java.util.concurrent.RejectedExecutionException; 59 import java.util.concurrent.SynchronousQueue; 60 import java.util.concurrent.ThreadFactory; 61 import java.util.concurrent.ThreadPoolExecutor; 62 import java.util.concurrent.TimeUnit; 63 import java.util.concurrent.atomic.AtomicInteger; 64 import sun.rmi.runtime.Log; 65 import sun.rmi.runtime.NewThreadAction; 66 import sun.rmi.transport.Channel; 67 import sun.rmi.transport.Connection; 68 import sun.rmi.transport.DGCAckHandler; 69 import sun.rmi.transport.Endpoint; 70 import sun.rmi.transport.StreamRemoteCall; 71 import sun.rmi.transport.Target; 72 import sun.rmi.transport.Transport; 73 import sun.rmi.transport.TransportConstants; 74 import sun.rmi.transport.proxy.HttpReceiveSocket; 75 import sun.security.action.GetIntegerAction; 76 import sun.security.action.GetLongAction; 77 import sun.security.action.GetPropertyAction; 78 79 /** 80 * TCPTransport is the socket-based implementation of the RMI Transport 81 * abstraction. 82 * 83 * @author Ann Wollrath 84 * @author Peter Jones 85 */ 86 public class TCPTransport extends Transport { 87 88 /* tcp package log */ 89 static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp", 90 LogStream.parseLevel(AccessController.doPrivileged( 91 new GetPropertyAction("sun.rmi.transport.tcp.logLevel")))); 92 93 /** maximum number of connection handler threads */ 94 private static final int maxConnectionThreads = // default no limit 95 AccessController.doPrivileged( 96 new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads", 97 Integer.MAX_VALUE)); 98 99 /** keep alive time for idle connection handler threads */ 100 private static final long threadKeepAliveTime = // default 1 minute 101 AccessController.doPrivileged( 102 new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime", 103 60000)); 104 105 /** thread pool for connection handlers */ 106 private static final ExecutorService connectionThreadPool = 107 new ThreadPoolExecutor(0, maxConnectionThreads, 108 threadKeepAliveTime, TimeUnit.MILLISECONDS, 109 new SynchronousQueue<Runnable>(), 110 new ThreadFactory() { 111 public Thread newThread(Runnable runnable) { 112 return AccessController.doPrivileged(new NewThreadAction( 113 runnable, "TCP Connection(idle)", true, true)); 114 } 115 }); 116 117 /** total connections handled */ 118 private static final AtomicInteger connectionCount = new AtomicInteger(0); 119 120 /** client host for the current thread's connection */ 121 private static final ThreadLocal<ConnectionHandler> 122 threadConnectionHandler = new ThreadLocal<>(); 123 124 /** endpoints for this transport */ 125 private final LinkedList<TCPEndpoint> epList; 126 /** number of objects exported on this transport */ 127 private int exportCount = 0; 128 /** server socket for this transport */ 129 private ServerSocket server = null; 130 /** table mapping endpoints to channels */ 131 private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable = 132 new WeakHashMap<>(); 133 134 static final RMISocketFactory defaultSocketFactory = 135 RMISocketFactory.getDefaultSocketFactory(); 136 137 /** number of milliseconds in accepted-connection timeout. 138 * Warning: this should be greater than 15 seconds (the client-side 139 * timeout), and defaults to 2 hours. 140 * The maximum representable value is slightly more than 24 days 141 * and 20 hours. 142 */ 143 private static final int connectionReadTimeout = // default 2 hours 144 AccessController.doPrivileged( 145 new GetIntegerAction("sun.rmi.transport.tcp.readTimeout", 146 2 * 3600 * 1000)); 147 148 /** 149 * Constructs a TCPTransport. 150 */ 151 TCPTransport(LinkedList<TCPEndpoint> epList) { 152 // assert ((epList.size() != null) && (epList.size() >= 1)) 153 this.epList = epList; 154 if (tcpLog.isLoggable(Log.BRIEF)) { 155 tcpLog.log(Log.BRIEF, "Version = " + 156 TransportConstants.Version + ", ep = " + getEndpoint()); 157 } 158 } 159 160 /** 161 * Closes all cached connections in every channel subordinated to this 162 * transport. Currently, this only closes outgoing connections. 163 */ 164 public void shedConnectionCaches() { 165 List<TCPChannel> channels; 166 synchronized (channelTable) { 167 channels = new ArrayList<TCPChannel>(channelTable.values().size()); 168 for (Reference<TCPChannel> ref : channelTable.values()) { 169 TCPChannel ch = ref.get(); 170 if (ch != null) { 171 channels.add(ch); 172 } 173 } 174 } 175 for (TCPChannel channel : channels) { 176 channel.shedCache(); 177 } 178 } 179 180 /** 181 * Returns a <I>Channel</I> that generates connections to the 182 * endpoint <I>ep</I>. A Channel is an object that creates and 183 * manages connections of a particular type to some particular 184 * address space. 185 * @param ep the endpoint to which connections will be generated. 186 * @return the channel or null if the transport cannot 187 * generate connections to this endpoint 188 */ 189 public TCPChannel getChannel(Endpoint ep) { 190 TCPChannel ch = null; 191 if (ep instanceof TCPEndpoint) { 192 synchronized (channelTable) { 193 Reference<TCPChannel> ref = channelTable.get(ep); 194 if (ref != null) { 195 ch = ref.get(); 196 } 197 if (ch == null) { 198 TCPEndpoint tcpEndpoint = (TCPEndpoint) ep; 199 ch = new TCPChannel(this, tcpEndpoint); 200 channelTable.put(tcpEndpoint, 201 new WeakReference<TCPChannel>(ch)); 202 } 203 } 204 } 205 return ch; 206 } 207 208 /** 209 * Removes the <I>Channel</I> that generates connections to the 210 * endpoint <I>ep</I>. 211 */ 212 public void free(Endpoint ep) { 213 if (ep instanceof TCPEndpoint) { 214 synchronized (channelTable) { 215 Reference<TCPChannel> ref = channelTable.remove(ep); 216 if (ref != null) { 217 TCPChannel channel = ref.get(); 218 if (channel != null) { 219 channel.shedCache(); 220 } 221 } 222 } 223 } 224 } 225 226 /** 227 * Export the object so that it can accept incoming calls. 228 */ 229 public void exportObject(Target target) throws RemoteException { 230 /* 231 * Ensure that a server socket is listening, and count this 232 * export while synchronized to prevent the server socket from 233 * being closed due to concurrent unexports. 234 */ 235 synchronized (this) { 236 listen(); 237 exportCount++; 238 } 239 240 /* 241 * Try to add the Target to the exported object table; keep 242 * counting this export (to keep server socket open) only if 243 * that succeeds. 244 */ 245 boolean ok = false; 246 try { 247 super.exportObject(target); 248 ok = true; 249 } finally { 250 if (!ok) { 251 synchronized (this) { 252 decrementExportCount(); 253 } 254 } 255 } 256 } 257 258 protected synchronized void targetUnexported() { 259 decrementExportCount(); 260 } 261 262 /** 263 * Decrements the count of exported objects, closing the current 264 * server socket if the count reaches zero. 265 **/ 266 private void decrementExportCount() { 267 assert Thread.holdsLock(this); 268 exportCount--; 269 if (exportCount == 0 && getEndpoint().getListenPort() != 0) { 270 ServerSocket ss = server; 271 server = null; 272 try { 273 ss.close(); 274 } catch (IOException e) { 275 } 276 } 277 } 278 279 /** 280 * Verify that the current access control context has permission to 281 * accept the connection being dispatched by the current thread. 282 */ 283 protected void checkAcceptPermission(AccessControlContext acc) { 284 SecurityManager sm = System.getSecurityManager(); 285 if (sm == null) { 286 return; 287 } 288 ConnectionHandler h = threadConnectionHandler.get(); 289 if (h == null) { 290 throw new Error( 291 "checkAcceptPermission not in ConnectionHandler thread"); 292 } 293 h.checkAcceptPermission(sm, acc); 294 } 295 296 private TCPEndpoint getEndpoint() { 297 synchronized (epList) { 298 return epList.getLast(); 299 } 300 } 301 302 /** 303 * Listen on transport's endpoint. 304 */ 305 private void listen() throws RemoteException { 306 assert Thread.holdsLock(this); 307 TCPEndpoint ep = getEndpoint(); 308 int port = ep.getPort(); 309 310 if (server == null) { 311 if (tcpLog.isLoggable(Log.BRIEF)) { 312 tcpLog.log(Log.BRIEF, 313 "(port " + port + ") create server socket"); 314 } 315 316 try { 317 server = ep.newServerSocket(); 318 /* 319 * Don't retry ServerSocket if creation fails since 320 * "port in use" will cause export to hang if an 321 * RMIFailureHandler is not installed. 322 */ 323 Thread t = AccessController.doPrivileged( 324 new NewThreadAction(new AcceptLoop(server), 325 "TCP Accept-" + port, true)); 326 t.start(); 327 } catch (java.net.BindException e) { 328 throw new ExportException("Port already in use: " + port, e); 329 } catch (IOException e) { 330 throw new ExportException("Listen failed on port: " + port, e); 331 } 332 333 } else { 334 // otherwise verify security access to existing server socket 335 SecurityManager sm = System.getSecurityManager(); 336 if (sm != null) { 337 sm.checkListen(port); 338 } 339 } 340 } 341 342 /** 343 * Worker for accepting connections from a server socket. 344 **/ 345 private class AcceptLoop implements Runnable { 346 347 private final ServerSocket serverSocket; 348 349 // state for throttling loop on exceptions (local to accept thread) 350 private long lastExceptionTime = 0L; 351 private int recentExceptionCount; 352 353 AcceptLoop(ServerSocket serverSocket) { 354 this.serverSocket = serverSocket; 355 } 356 357 public void run() { 358 try { 359 executeAcceptLoop(); 360 } finally { 361 try { 362 /* 363 * Only one accept loop is started per server 364 * socket, so after no more connections will be 365 * accepted, ensure that the server socket is no 366 * longer listening. 367 */ 368 serverSocket.close(); 369 } catch (IOException e) { 370 } 371 } 372 } 373 374 /** 375 * Accepts connections from the server socket and executes 376 * handlers for them in the thread pool. 377 **/ 378 private void executeAcceptLoop() { 379 if (tcpLog.isLoggable(Log.BRIEF)) { 380 tcpLog.log(Log.BRIEF, "listening on port " + 381 getEndpoint().getPort()); 382 } 383 384 while (true) { 385 Socket socket = null; 386 try { 387 socket = serverSocket.accept(); 388 389 /* 390 * Find client host name (or "0.0.0.0" if unknown) 391 */ 392 InetAddress clientAddr = socket.getInetAddress(); 393 String clientHost = (clientAddr != null 394 ? clientAddr.getHostAddress() 395 : "0.0.0.0"); 396 397 /* 398 * Execute connection handler in the thread pool, 399 * which uses non-system threads. 400 */ 401 try { 402 connectionThreadPool.execute( 403 new ConnectionHandler(socket, clientHost)); 404 } catch (RejectedExecutionException e) { 405 closeSocket(socket); 406 tcpLog.log(Log.BRIEF, 407 "rejected connection from " + clientHost); 408 } 409 410 } catch (Throwable t) { 411 try { 412 /* 413 * If the server socket has been closed, such 414 * as because there are no more exported 415 * objects, then we expect accept to throw an 416 * exception, so just terminate normally. 417 */ 418 if (serverSocket.isClosed()) { 419 break; 420 } 421 422 try { 423 if (tcpLog.isLoggable(Level.WARNING)) { 424 tcpLog.log(Level.WARNING, 425 "accept loop for " + serverSocket + 426 " throws", t); 427 } 428 } catch (Throwable tt) { 429 } 430 } finally { 431 /* 432 * Always close the accepted socket (if any) 433 * if an exception occurs, but only after 434 * logging an unexpected exception. 435 */ 436 if (socket != null) { 437 closeSocket(socket); 438 } 439 } 440 441 /* 442 * In case we're running out of file descriptors, 443 * release resources held in caches. 444 */ 445 if (!(t instanceof SecurityException)) { 446 try { 447 TCPEndpoint.shedConnectionCaches(); 448 } catch (Throwable tt) { 449 } 450 } 451 452 /* 453 * A NoClassDefFoundError can occur if no file 454 * descriptors are available, in which case this 455 * loop should not terminate. 456 */ 457 if (t instanceof Exception || 458 t instanceof OutOfMemoryError || 459 t instanceof NoClassDefFoundError) 460 { 461 if (!continueAfterAcceptFailure(t)) { 462 return; 463 } 464 // continue loop 465 } else if (t instanceof Error) { 466 throw (Error) t; 467 } else { 468 throw new Error(t.getMessage(), t.getCause()); 469 } 470 } 471 } 472 } 473 474 /** 475 * Returns true if the accept loop should continue after the 476 * specified exception has been caught, or false if the accept 477 * loop should terminate (closing the server socket). If 478 * there is an RMIFailureHandler, this method returns the 479 * result of passing the specified exception to it; otherwise, 480 * this method always returns true, after sleeping to throttle 481 * the accept loop if necessary. 482 **/ 483 private boolean continueAfterAcceptFailure(Throwable t) { 484 RMIFailureHandler fh = RMISocketFactory.getFailureHandler(); 485 if (fh != null) { 486 return fh.failure(t instanceof Exception ? (Exception) t : 487 new InvocationTargetException(t)); 488 } else { 489 throttleLoopOnException(); 490 return true; 491 } 492 } 493 494 /** 495 * Throttles the accept loop after an exception has been 496 * caught: if a burst of 10 exceptions in 5 seconds occurs, 497 * then wait for 10 seconds to curb busy CPU usage. 498 **/ 499 private void throttleLoopOnException() { 500 long now = System.currentTimeMillis(); 501 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) { 502 // last exception was long ago (or this is the first) 503 lastExceptionTime = now; 504 recentExceptionCount = 0; 505 } else { 506 // exception burst window was started recently 507 if (++recentExceptionCount >= 10) { 508 try { 509 Thread.sleep(10000); 510 } catch (InterruptedException ignore) { 511 } 512 } 513 } 514 } 515 } 516 517 /** close socket and eat exception */ 518 private static void closeSocket(Socket sock) { 519 try { 520 sock.close(); 521 } catch (IOException ex) { 522 // eat exception 523 } 524 } 525 526 /** 527 * handleMessages decodes transport operations and handles messages 528 * appropriately. If an exception occurs during message handling, 529 * the socket is closed. 530 */ 531 void handleMessages(Connection conn, boolean persistent) { 532 int port = getEndpoint().getPort(); 533 534 try { 535 DataInputStream in = new DataInputStream(conn.getInputStream()); 536 do { 537 int op = in.read(); // transport op 538 if (op == -1) { 539 if (tcpLog.isLoggable(Log.BRIEF)) { 540 tcpLog.log(Log.BRIEF, "(port " + 541 port + ") connection closed"); 542 } 543 break; 544 } 545 546 if (tcpLog.isLoggable(Log.BRIEF)) { 547 tcpLog.log(Log.BRIEF, "(port " + port + 548 ") op = " + op); 549 } 550 551 switch (op) { 552 case TransportConstants.Call: 553 // service incoming RMI call 554 RemoteCall call = new StreamRemoteCall(conn); 555 if (serviceCall(call) == false) 556 return; 557 break; 558 559 case TransportConstants.Ping: 560 // send ack for ping 561 DataOutputStream out = 562 new DataOutputStream(conn.getOutputStream()); 563 out.writeByte(TransportConstants.PingAck); 564 conn.releaseOutputStream(); 565 break; 566 567 case TransportConstants.DGCAck: 568 DGCAckHandler.received(UID.read(in)); 569 break; 570 571 default: 572 throw new IOException("unknown transport op " + op); 573 } 574 } while (persistent); 575 576 } catch (IOException e) { 577 // exception during processing causes connection to close (below) 578 if (tcpLog.isLoggable(Log.BRIEF)) { 579 tcpLog.log(Log.BRIEF, "(port " + port + 580 ") exception: ", e); 581 } 582 } finally { 583 try { 584 conn.close(); 585 } catch (IOException ex) { 586 // eat exception 587 } 588 } 589 } 590 591 /** 592 * Returns the client host for the current thread's connection. Throws 593 * ServerNotActiveException if no connection is active for this thread. 594 */ 595 public static String getClientHost() throws ServerNotActiveException { 596 ConnectionHandler h = threadConnectionHandler.get(); 597 if (h != null) { 598 return h.getClientHost(); 599 } else { 600 throw new ServerNotActiveException("not in a remote call"); 601 } 602 } 603 604 /** 605 * Services messages on accepted connection 606 */ 607 private class ConnectionHandler implements Runnable { 608 609 /** int value of "POST" in ASCII (Java's specified data formats 610 * make this once-reviled tactic again socially acceptable) */ 611 private static final int POST = 0x504f5354; 612 613 /** most recently accept-authorized AccessControlContext */ 614 private AccessControlContext okContext; 615 /** cache of accept-authorized AccessControlContexts */ 616 private Map<AccessControlContext, 617 Reference<AccessControlContext>> authCache; 618 /** security manager which authorized contexts in authCache */ 619 private SecurityManager cacheSecurityManager = null; 620 621 private Socket socket; 622 private String remoteHost; 623 624 ConnectionHandler(Socket socket, String remoteHost) { 625 this.socket = socket; 626 this.remoteHost = remoteHost; 627 } 628 629 String getClientHost() { 630 return remoteHost; 631 } 632 633 /** 634 * Verify that the given AccessControlContext has permission to 635 * accept this connection. 636 */ 637 void checkAcceptPermission(SecurityManager sm, 638 AccessControlContext acc) 639 { 640 /* 641 * Note: no need to synchronize on cache-related fields, since this 642 * method only gets called from the ConnectionHandler's thread. 643 */ 644 if (sm != cacheSecurityManager) { 645 okContext = null; 646 authCache = new WeakHashMap<AccessControlContext, 647 Reference<AccessControlContext>>(); 648 cacheSecurityManager = sm; 649 } 650 if (acc.equals(okContext) || authCache.containsKey(acc)) { 651 return; 652 } 653 InetAddress addr = socket.getInetAddress(); 654 String host = (addr != null) ? addr.getHostAddress() : "*"; 655 656 sm.checkAccept(host, socket.getPort()); 657 658 authCache.put(acc, new SoftReference<AccessControlContext>(acc)); 659 okContext = acc; 660 } 661 662 public void run() { 663 Thread t = Thread.currentThread(); 664 String name = t.getName(); 665 try { 666 t.setName("RMI TCP Connection(" + 667 connectionCount.incrementAndGet() + 668 ")-" + remoteHost); 669 run0(); 670 } finally { 671 t.setName(name); 672 } 673 } 674 675 private void run0() { 676 TCPEndpoint endpoint = getEndpoint(); 677 int port = endpoint.getPort(); 678 679 threadConnectionHandler.set(this); 680 681 // set socket to disable Nagle's algorithm (always send 682 // immediately) 683 // TBD: should this be left up to socket factory instead? 684 try { 685 socket.setTcpNoDelay(true); 686 } catch (Exception e) { 687 // if we fail to set this, ignore and proceed anyway 688 } 689 // set socket to timeout after excessive idle time 690 try { 691 if (connectionReadTimeout > 0) 692 socket.setSoTimeout(connectionReadTimeout); 693 } catch (Exception e) { 694 // too bad, continue anyway 695 } 696 697 try { 698 InputStream sockIn = socket.getInputStream(); 699 InputStream bufIn = sockIn.markSupported() 700 ? sockIn 701 : new BufferedInputStream(sockIn); 702 703 // Read magic (or HTTP wrapper) 704 bufIn.mark(4); 705 DataInputStream in = new DataInputStream(bufIn); 706 int magic = in.readInt(); 707 708 if (magic == POST) { 709 tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call"); 710 711 // It's really a HTTP-wrapped request. Repackage 712 // the socket in a HttpReceiveSocket, reinitialize 713 // sockIn and in, and reread magic. 714 bufIn.reset(); // unread "POST" 715 716 try { 717 socket = new HttpReceiveSocket(socket, bufIn, null); 718 remoteHost = "0.0.0.0"; 719 sockIn = socket.getInputStream(); 720 bufIn = new BufferedInputStream(sockIn); 721 in = new DataInputStream(bufIn); 722 magic = in.readInt(); 723 724 } catch (IOException e) { 725 throw new RemoteException("Error HTTP-unwrapping call", 726 e); 727 } 728 } 729 // bufIn's mark will invalidate itself when it overflows 730 // so it doesn't have to be turned off 731 732 // read and verify transport header 733 short version = in.readShort(); 734 if (magic != TransportConstants.Magic || 735 version != TransportConstants.Version) { 736 // protocol mismatch detected... 737 // just close socket: this would recurse if we marshal an 738 // exception to the client and the protocol at other end 739 // doesn't match. 740 closeSocket(socket); 741 return; 742 } 743 744 OutputStream sockOut = socket.getOutputStream(); 745 BufferedOutputStream bufOut = 746 new BufferedOutputStream(sockOut); 747 DataOutputStream out = new DataOutputStream(bufOut); 748 749 int remotePort = socket.getPort(); 750 751 if (tcpLog.isLoggable(Log.BRIEF)) { 752 tcpLog.log(Log.BRIEF, "accepted socket from [" + 753 remoteHost + ":" + remotePort + "]"); 754 } 755 756 TCPEndpoint ep; 757 TCPChannel ch; 758 TCPConnection conn; 759 760 // send ack (or nack) for protocol 761 byte protocol = in.readByte(); 762 switch (protocol) { 763 case TransportConstants.SingleOpProtocol: 764 // no ack for protocol 765 766 // create dummy channel for receiving messages 767 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 768 endpoint.getClientSocketFactory(), 769 endpoint.getServerSocketFactory()); 770 ch = new TCPChannel(TCPTransport.this, ep); 771 conn = new TCPConnection(ch, socket, bufIn, bufOut); 772 773 // read input messages 774 handleMessages(conn, false); 775 break; 776 777 case TransportConstants.StreamProtocol: 778 // send ack 779 out.writeByte(TransportConstants.ProtocolAck); 780 781 // suggest endpoint (in case client doesn't know host name) 782 if (tcpLog.isLoggable(Log.VERBOSE)) { 783 tcpLog.log(Log.VERBOSE, "(port " + port + 784 ") " + "suggesting " + remoteHost + ":" + 785 remotePort); 786 } 787 788 out.writeUTF(remoteHost); 789 out.writeInt(remotePort); 790 out.flush(); 791 792 // read and discard (possibly bogus) endpoint 793 // REMIND: would be faster to read 2 bytes then skip N+4 794 String clientHost = in.readUTF(); 795 int clientPort = in.readInt(); 796 if (tcpLog.isLoggable(Log.VERBOSE)) { 797 tcpLog.log(Log.VERBOSE, "(port " + port + 798 ") client using " + clientHost + ":" + clientPort); 799 } 800 801 // create dummy channel for receiving messages 802 // (why not use clientHost and clientPort?) 803 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 804 endpoint.getClientSocketFactory(), 805 endpoint.getServerSocketFactory()); 806 ch = new TCPChannel(TCPTransport.this, ep); 807 conn = new TCPConnection(ch, socket, bufIn, bufOut); 808 809 // read input messages 810 handleMessages(conn, true); 811 break; 812 813 case TransportConstants.MultiplexProtocol: 814 if (tcpLog.isLoggable(Log.VERBOSE)) { 815 tcpLog.log(Log.VERBOSE, "(port " + port + 816 ") accepting multiplex protocol"); 817 } 818 819 // send ack 820 out.writeByte(TransportConstants.ProtocolAck); 821 822 // suggest endpoint (in case client doesn't already have one) 823 if (tcpLog.isLoggable(Log.VERBOSE)) { 824 tcpLog.log(Log.VERBOSE, "(port " + port + 825 ") suggesting " + remoteHost + ":" + remotePort); 826 } 827 828 out.writeUTF(remoteHost); 829 out.writeInt(remotePort); 830 out.flush(); 831 832 // read endpoint client has decided to use 833 ep = new TCPEndpoint(in.readUTF(), in.readInt(), 834 endpoint.getClientSocketFactory(), 835 endpoint.getServerSocketFactory()); 836 if (tcpLog.isLoggable(Log.VERBOSE)) { 837 tcpLog.log(Log.VERBOSE, "(port " + 838 port + ") client using " + 839 ep.getHost() + ":" + ep.getPort()); 840 } 841 842 ConnectionMultiplexer multiplexer; 843 synchronized (channelTable) { 844 // create or find channel for this endpoint 845 ch = getChannel(ep); 846 multiplexer = 847 new ConnectionMultiplexer(ch, bufIn, sockOut, 848 false); 849 ch.useMultiplexer(multiplexer); 850 } 851 multiplexer.run(); 852 break; 853 854 default: 855 // protocol not understood, send nack and close socket 856 out.writeByte(TransportConstants.ProtocolNack); 857 out.flush(); 858 break; 859 } 860 861 } catch (IOException e) { 862 // socket in unknown state: destroy socket 863 tcpLog.log(Log.BRIEF, "terminated with exception:", e); 864 } finally { 865 closeSocket(socket); 866 } 867 } 868 } 869 }