1 /*
   2  * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.lang.ref.Reference;
  28 import java.lang.ref.SoftReference;
  29 import java.lang.ref.WeakReference;
  30 import java.lang.reflect.InvocationTargetException;
  31 import java.io.DataInputStream;
  32 import java.io.DataOutputStream;
  33 import java.io.IOException;
  34 import java.io.InputStream;
  35 import java.io.OutputStream;
  36 import java.io.BufferedInputStream;
  37 import java.io.BufferedOutputStream;
  38 import java.net.InetAddress;
  39 import java.net.ServerSocket;
  40 import java.net.Socket;
  41 import java.rmi.RemoteException;
  42 import java.rmi.server.ExportException;
  43 import java.rmi.server.LogStream;
  44 import java.rmi.server.RMIFailureHandler;
  45 import java.rmi.server.RMISocketFactory;
  46 import java.rmi.server.RemoteCall;
  47 import java.rmi.server.ServerNotActiveException;
  48 import java.rmi.server.UID;
  49 import java.security.AccessControlContext;
  50 import java.security.AccessController;
  51 import java.util.ArrayList;
  52 import java.util.LinkedList;
  53 import java.util.List;
  54 import java.util.Map;
  55 import java.util.WeakHashMap;
  56 import java.util.logging.Level;
  57 import java.util.concurrent.ExecutorService;
  58 import java.util.concurrent.RejectedExecutionException;
  59 import java.util.concurrent.SynchronousQueue;
  60 import java.util.concurrent.ThreadFactory;
  61 import java.util.concurrent.ThreadPoolExecutor;
  62 import java.util.concurrent.TimeUnit;
  63 import java.util.concurrent.atomic.AtomicInteger;
  64 import sun.rmi.runtime.Log;
  65 import sun.rmi.runtime.NewThreadAction;
  66 import sun.rmi.transport.Channel;
  67 import sun.rmi.transport.Connection;
  68 import sun.rmi.transport.DGCAckHandler;
  69 import sun.rmi.transport.Endpoint;
  70 import sun.rmi.transport.StreamRemoteCall;
  71 import sun.rmi.transport.Target;
  72 import sun.rmi.transport.Transport;
  73 import sun.rmi.transport.TransportConstants;
  74 import sun.rmi.transport.proxy.HttpReceiveSocket;
  75 import sun.security.action.GetIntegerAction;
  76 import sun.security.action.GetLongAction;
  77 import sun.security.action.GetPropertyAction;
  78 
  79 /**
  80  * TCPTransport is the socket-based implementation of the RMI Transport
  81  * abstraction.
  82  *
  83  * @author Ann Wollrath
  84  * @author Peter Jones
  85  */
  86 public class TCPTransport extends Transport {
  87 
  88     /* tcp package log */
  89     static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
  90         LogStream.parseLevel(AccessController.doPrivileged(
  91             new GetPropertyAction("sun.rmi.transport.tcp.logLevel"))));
  92 
  93     /** maximum number of connection handler threads */
  94     private static final int maxConnectionThreads =     // default no limit
  95         AccessController.doPrivileged(
  96             new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads",
  97                                  Integer.MAX_VALUE));
  98 
  99     /** keep alive time for idle connection handler threads */
 100     private static final long threadKeepAliveTime =     // default 1 minute
 101         AccessController.doPrivileged(
 102             new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime",
 103                               60000));
 104 
 105     /** thread pool for connection handlers */
 106     private static final ExecutorService connectionThreadPool =
 107         new ThreadPoolExecutor(0, maxConnectionThreads,
 108             threadKeepAliveTime, TimeUnit.MILLISECONDS,
 109             new SynchronousQueue<Runnable>(),
 110             new ThreadFactory() {
 111                 public Thread newThread(Runnable runnable) {
 112                     return AccessController.doPrivileged(new NewThreadAction(
 113                         runnable, "TCP Connection(idle)", true, true));
 114                 }
 115             });
 116 
 117     /** total connections handled */
 118     private static final AtomicInteger connectionCount = new AtomicInteger(0);
 119 
 120     /** client host for the current thread's connection */
 121     private static final ThreadLocal<ConnectionHandler>
 122         threadConnectionHandler = new ThreadLocal<>();
 123 
 124     /** endpoints for this transport */
 125     private final LinkedList<TCPEndpoint> epList;
 126     /** number of objects exported on this transport */
 127     private int exportCount = 0;
 128     /** server socket for this transport */
 129     private ServerSocket server = null;
 130     /** table mapping endpoints to channels */
 131     private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
 132         new WeakHashMap<>();
 133 
 134     static final RMISocketFactory defaultSocketFactory =
 135         RMISocketFactory.getDefaultSocketFactory();
 136 
 137     /** number of milliseconds in accepted-connection timeout.
 138      * Warning: this should be greater than 15 seconds (the client-side
 139      * timeout), and defaults to 2 hours.
 140      * The maximum representable value is slightly more than 24 days
 141      * and 20 hours.
 142      */
 143     private static final int connectionReadTimeout =    // default 2 hours
 144         AccessController.doPrivileged(
 145             new GetIntegerAction("sun.rmi.transport.tcp.readTimeout",
 146                                  2 * 3600 * 1000));
 147 
 148     /**
 149      * Constructs a TCPTransport.
 150      */
 151     TCPTransport(LinkedList<TCPEndpoint> epList)  {
 152         // assert ((epList.size() != null) && (epList.size() >= 1))
 153         this.epList = epList;
 154         if (tcpLog.isLoggable(Log.BRIEF)) {
 155             tcpLog.log(Log.BRIEF, "Version = " +
 156                 TransportConstants.Version + ", ep = " + getEndpoint());
 157         }
 158     }
 159 
 160     /**
 161      * Closes all cached connections in every channel subordinated to this
 162      * transport.  Currently, this only closes outgoing connections.
 163      */
 164     public void shedConnectionCaches() {
 165         List<TCPChannel> channels;
 166         synchronized (channelTable) {
 167             channels = new ArrayList<TCPChannel>(channelTable.values().size());
 168             for (Reference<TCPChannel> ref : channelTable.values()) {
 169                 TCPChannel ch = ref.get();
 170                 if (ch != null) {
 171                     channels.add(ch);
 172                 }
 173             }
 174         }
 175         for (TCPChannel channel : channels) {
 176             channel.shedCache();
 177         }
 178     }
 179 
 180     /**
 181      * Returns a <I>Channel</I> that generates connections to the
 182      * endpoint <I>ep</I>. A Channel is an object that creates and
 183      * manages connections of a particular type to some particular
 184      * address space.
 185      * @param ep the endpoint to which connections will be generated.
 186      * @return the channel or null if the transport cannot
 187      * generate connections to this endpoint
 188      */
 189     public TCPChannel getChannel(Endpoint ep) {
 190         TCPChannel ch = null;
 191         if (ep instanceof TCPEndpoint) {
 192             synchronized (channelTable) {
 193                 Reference<TCPChannel> ref = channelTable.get(ep);
 194                 if (ref != null) {
 195                     ch = ref.get();
 196                 }
 197                 if (ch == null) {
 198                     TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
 199                     ch = new TCPChannel(this, tcpEndpoint);
 200                     channelTable.put(tcpEndpoint,
 201                                      new WeakReference<TCPChannel>(ch));
 202                 }
 203             }
 204         }
 205         return ch;
 206     }
 207 
 208     /**
 209      * Removes the <I>Channel</I> that generates connections to the
 210      * endpoint <I>ep</I>.
 211      */
 212     public void free(Endpoint ep) {
 213         if (ep instanceof TCPEndpoint) {
 214             synchronized (channelTable) {
 215                 Reference<TCPChannel> ref = channelTable.remove(ep);
 216                 if (ref != null) {
 217                     TCPChannel channel = ref.get();
 218                     if (channel != null) {
 219                         channel.shedCache();
 220                     }
 221                 }
 222             }
 223         }
 224     }
 225 
 226     /**
 227      * Export the object so that it can accept incoming calls.
 228      */
 229     public void exportObject(Target target) throws RemoteException {
 230         /*
 231          * Ensure that a server socket is listening, and count this
 232          * export while synchronized to prevent the server socket from
 233          * being closed due to concurrent unexports.
 234          */
 235         synchronized (this) {
 236             listen();
 237             exportCount++;
 238         }
 239 
 240         /*
 241          * Try to add the Target to the exported object table; keep
 242          * counting this export (to keep server socket open) only if
 243          * that succeeds.
 244          */
 245         boolean ok = false;
 246         try {
 247             super.exportObject(target);
 248             ok = true;
 249         } finally {
 250             if (!ok) {
 251                 synchronized (this) {
 252                     decrementExportCount();
 253                 }
 254             }
 255         }
 256     }
 257 
 258     protected synchronized void targetUnexported() {
 259         decrementExportCount();
 260     }
 261 
 262     /**
 263      * Decrements the count of exported objects, closing the current
 264      * server socket if the count reaches zero.
 265      **/
 266     private void decrementExportCount() {
 267         assert Thread.holdsLock(this);
 268         exportCount--;
 269         if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
 270             ServerSocket ss = server;
 271             server = null;
 272             try {
 273                 ss.close();
 274             } catch (IOException e) {
 275             }
 276         }
 277     }
 278 
 279     /**
 280      * Verify that the current access control context has permission to
 281      * accept the connection being dispatched by the current thread.
 282      */
 283     protected void checkAcceptPermission(AccessControlContext acc) {
 284         SecurityManager sm = System.getSecurityManager();
 285         if (sm == null) {
 286             return;
 287         }
 288         ConnectionHandler h = threadConnectionHandler.get();
 289         if (h == null) {
 290             throw new Error(
 291                 "checkAcceptPermission not in ConnectionHandler thread");
 292         }
 293         h.checkAcceptPermission(sm, acc);
 294     }
 295 
 296     private TCPEndpoint getEndpoint() {
 297         synchronized (epList) {
 298             return epList.getLast();
 299         }
 300     }
 301 
 302     /**
 303      * Listen on transport's endpoint.
 304      */
 305     private void listen() throws RemoteException {
 306         assert Thread.holdsLock(this);
 307         TCPEndpoint ep = getEndpoint();
 308         int port = ep.getPort();
 309 
 310         if (server == null) {
 311             if (tcpLog.isLoggable(Log.BRIEF)) {
 312                 tcpLog.log(Log.BRIEF,
 313                     "(port " + port + ") create server socket");
 314             }
 315 
 316             try {
 317                 server = ep.newServerSocket();
 318                 /*
 319                  * Don't retry ServerSocket if creation fails since
 320                  * "port in use" will cause export to hang if an
 321                  * RMIFailureHandler is not installed.
 322                  */
 323                 Thread t = AccessController.doPrivileged(
 324                     new NewThreadAction(new AcceptLoop(server),
 325                                         "TCP Accept-" + port, true));
 326                 t.start();
 327             } catch (java.net.BindException e) {
 328                 throw new ExportException("Port already in use: " + port, e);
 329             } catch (IOException e) {
 330                 throw new ExportException("Listen failed on port: " + port, e);
 331             }
 332 
 333         } else {
 334             // otherwise verify security access to existing server socket
 335             SecurityManager sm = System.getSecurityManager();
 336             if (sm != null) {
 337                 sm.checkListen(port);
 338             }
 339         }
 340     }
 341 
 342     /**
 343      * Worker for accepting connections from a server socket.
 344      **/
 345     private class AcceptLoop implements Runnable {
 346 
 347         private final ServerSocket serverSocket;
 348 
 349         // state for throttling loop on exceptions (local to accept thread)
 350         private long lastExceptionTime = 0L;
 351         private int recentExceptionCount;
 352 
 353         AcceptLoop(ServerSocket serverSocket) {
 354             this.serverSocket = serverSocket;
 355         }
 356 
 357         public void run() {
 358             try {
 359                 executeAcceptLoop();
 360             } finally {
 361                 try {
 362                     /*
 363                      * Only one accept loop is started per server
 364                      * socket, so after no more connections will be
 365                      * accepted, ensure that the server socket is no
 366                      * longer listening.
 367                      */
 368                     serverSocket.close();
 369                 } catch (IOException e) {
 370                 }
 371             }
 372         }
 373 
 374         /**
 375          * Accepts connections from the server socket and executes
 376          * handlers for them in the thread pool.
 377          **/
 378         private void executeAcceptLoop() {
 379             if (tcpLog.isLoggable(Log.BRIEF)) {
 380                 tcpLog.log(Log.BRIEF, "listening on port " +
 381                            getEndpoint().getPort());
 382             }
 383 
 384             while (true) {
 385                 Socket socket = null;
 386                 try {
 387                     socket = serverSocket.accept();
 388 
 389                     /*
 390                      * Find client host name (or "0.0.0.0" if unknown)
 391                      */
 392                     InetAddress clientAddr = socket.getInetAddress();
 393                     String clientHost = (clientAddr != null
 394                                          ? clientAddr.getHostAddress()
 395                                          : "0.0.0.0");
 396 
 397                     /*
 398                      * Execute connection handler in the thread pool,
 399                      * which uses non-system threads.
 400                      */
 401                     try {
 402                         connectionThreadPool.execute(
 403                             new ConnectionHandler(socket, clientHost));
 404                     } catch (RejectedExecutionException e) {
 405                         closeSocket(socket);
 406                         tcpLog.log(Log.BRIEF,
 407                                    "rejected connection from " + clientHost);
 408                     }
 409 
 410                 } catch (Throwable t) {
 411                     try {
 412                         /*
 413                          * If the server socket has been closed, such
 414                          * as because there are no more exported
 415                          * objects, then we expect accept to throw an
 416                          * exception, so just terminate normally.
 417                          */
 418                         if (serverSocket.isClosed()) {
 419                             break;
 420                         }
 421 
 422                         try {
 423                             if (tcpLog.isLoggable(Level.WARNING)) {
 424                                 tcpLog.log(Level.WARNING,
 425                                            "accept loop for " + serverSocket +
 426                                            " throws", t);
 427                             }
 428                         } catch (Throwable tt) {
 429                         }
 430                     } finally {
 431                         /*
 432                          * Always close the accepted socket (if any)
 433                          * if an exception occurs, but only after
 434                          * logging an unexpected exception.
 435                          */
 436                         if (socket != null) {
 437                             closeSocket(socket);
 438                         }
 439                     }
 440 
 441                     /*
 442                      * In case we're running out of file descriptors,
 443                      * release resources held in caches.
 444                      */
 445                     if (!(t instanceof SecurityException)) {
 446                         try {
 447                             TCPEndpoint.shedConnectionCaches();
 448                         } catch (Throwable tt) {
 449                         }
 450                     }
 451 
 452                     /*
 453                      * A NoClassDefFoundError can occur if no file
 454                      * descriptors are available, in which case this
 455                      * loop should not terminate.
 456                      */
 457                     if (t instanceof Exception ||
 458                         t instanceof OutOfMemoryError ||
 459                         t instanceof NoClassDefFoundError)
 460                     {
 461                         if (!continueAfterAcceptFailure(t)) {
 462                             return;
 463                         }
 464                         // continue loop
 465                     } else {
 466                         throw (Error) t;
 467                     }
 468                 }
 469             }
 470         }
 471 
 472         /**
 473          * Returns true if the accept loop should continue after the
 474          * specified exception has been caught, or false if the accept
 475          * loop should terminate (closing the server socket).  If
 476          * there is an RMIFailureHandler, this method returns the
 477          * result of passing the specified exception to it; otherwise,
 478          * this method always returns true, after sleeping to throttle
 479          * the accept loop if necessary.
 480          **/
 481         private boolean continueAfterAcceptFailure(Throwable t) {
 482             RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
 483             if (fh != null) {
 484                 return fh.failure(t instanceof Exception ? (Exception) t :
 485                                   new InvocationTargetException(t));
 486             } else {
 487                 throttleLoopOnException();
 488                 return true;
 489             }
 490         }
 491 
 492         /**
 493          * Throttles the accept loop after an exception has been
 494          * caught: if a burst of 10 exceptions in 5 seconds occurs,
 495          * then wait for 10 seconds to curb busy CPU usage.
 496          **/
 497         private void throttleLoopOnException() {
 498             long now = System.currentTimeMillis();
 499             if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
 500                 // last exception was long ago (or this is the first)
 501                 lastExceptionTime = now;
 502                 recentExceptionCount = 0;
 503             } else {
 504                 // exception burst window was started recently
 505                 if (++recentExceptionCount >= 10) {
 506                     try {
 507                         Thread.sleep(10000);
 508                     } catch (InterruptedException ignore) {
 509                     }
 510                 }
 511             }
 512         }
 513     }
 514 
 515     /** close socket and eat exception */
 516     private static void closeSocket(Socket sock) {
 517         try {
 518             sock.close();
 519         } catch (IOException ex) {
 520             // eat exception
 521         }
 522     }
 523 
 524     /**
 525      * handleMessages decodes transport operations and handles messages
 526      * appropriately.  If an exception occurs during message handling,
 527      * the socket is closed.
 528      */
 529     void handleMessages(Connection conn, boolean persistent) {
 530         int port = getEndpoint().getPort();
 531 
 532         try {
 533             DataInputStream in = new DataInputStream(conn.getInputStream());
 534             do {
 535                 int op = in.read();     // transport op
 536                 if (op == -1) {
 537                     if (tcpLog.isLoggable(Log.BRIEF)) {
 538                         tcpLog.log(Log.BRIEF, "(port " +
 539                             port + ") connection closed");
 540                     }
 541                     break;
 542                 }
 543 
 544                 if (tcpLog.isLoggable(Log.BRIEF)) {
 545                     tcpLog.log(Log.BRIEF, "(port " + port +
 546                         ") op = " + op);
 547                 }
 548 
 549                 switch (op) {
 550                 case TransportConstants.Call:
 551                     // service incoming RMI call
 552                     RemoteCall call = new StreamRemoteCall(conn);
 553                     if (serviceCall(call) == false)
 554                         return;
 555                     break;
 556 
 557                 case TransportConstants.Ping:
 558                     // send ack for ping
 559                     DataOutputStream out =
 560                         new DataOutputStream(conn.getOutputStream());
 561                     out.writeByte(TransportConstants.PingAck);
 562                     conn.releaseOutputStream();
 563                     break;
 564 
 565                 case TransportConstants.DGCAck:
 566                     DGCAckHandler.received(UID.read(in));
 567                     break;
 568 
 569                 default:
 570                     throw new IOException("unknown transport op " + op);
 571                 }
 572             } while (persistent);
 573 
 574         } catch (IOException e) {
 575             // exception during processing causes connection to close (below)
 576             if (tcpLog.isLoggable(Log.BRIEF)) {
 577                 tcpLog.log(Log.BRIEF, "(port " + port +
 578                     ") exception: ", e);
 579             }
 580         } finally {
 581             try {
 582                 conn.close();
 583             } catch (IOException ex) {
 584                 // eat exception
 585             }
 586         }
 587     }
 588 
 589     /**
 590      * Returns the client host for the current thread's connection.  Throws
 591      * ServerNotActiveException if no connection is active for this thread.
 592      */
 593     public static String getClientHost() throws ServerNotActiveException {
 594         ConnectionHandler h = threadConnectionHandler.get();
 595         if (h != null) {
 596             return h.getClientHost();
 597         } else {
 598             throw new ServerNotActiveException("not in a remote call");
 599         }
 600     }
 601 
 602     /**
 603      * Services messages on accepted connection
 604      */
 605     private class ConnectionHandler implements Runnable {
 606 
 607         /** int value of "POST" in ASCII (Java's specified data formats
 608          *  make this once-reviled tactic again socially acceptable) */
 609         private static final int POST = 0x504f5354;
 610 
 611         /** most recently accept-authorized AccessControlContext */
 612         private AccessControlContext okContext;
 613         /** cache of accept-authorized AccessControlContexts */
 614         private Map<AccessControlContext,
 615                     Reference<AccessControlContext>> authCache;
 616         /** security manager which authorized contexts in authCache */
 617         private SecurityManager cacheSecurityManager = null;
 618 
 619         private Socket socket;
 620         private String remoteHost;
 621 
 622         ConnectionHandler(Socket socket, String remoteHost) {
 623             this.socket = socket;
 624             this.remoteHost = remoteHost;
 625         }
 626 
 627         String getClientHost() {
 628             return remoteHost;
 629         }
 630 
 631         /**
 632          * Verify that the given AccessControlContext has permission to
 633          * accept this connection.
 634          */
 635         void checkAcceptPermission(SecurityManager sm,
 636                                    AccessControlContext acc)
 637         {
 638             /*
 639              * Note: no need to synchronize on cache-related fields, since this
 640              * method only gets called from the ConnectionHandler's thread.
 641              */
 642             if (sm != cacheSecurityManager) {
 643                 okContext = null;
 644                 authCache = new WeakHashMap<AccessControlContext,
 645                                             Reference<AccessControlContext>>();
 646                 cacheSecurityManager = sm;
 647             }
 648             if (acc.equals(okContext) || authCache.containsKey(acc)) {
 649                 return;
 650             }
 651             InetAddress addr = socket.getInetAddress();
 652             String host = (addr != null) ? addr.getHostAddress() : "*";
 653 
 654             sm.checkAccept(host, socket.getPort());
 655 
 656             authCache.put(acc, new SoftReference<AccessControlContext>(acc));
 657             okContext = acc;
 658         }
 659 
 660         public void run() {
 661             Thread t = Thread.currentThread();
 662             String name = t.getName();
 663             try {
 664                 t.setName("RMI TCP Connection(" +
 665                           connectionCount.incrementAndGet() +
 666                           ")-" + remoteHost);
 667                 run0();
 668             } finally {
 669                 t.setName(name);
 670             }
 671         }
 672 
 673         private void run0() {
 674             TCPEndpoint endpoint = getEndpoint();
 675             int port = endpoint.getPort();
 676 
 677             threadConnectionHandler.set(this);
 678 
 679             // set socket to disable Nagle's algorithm (always send
 680             // immediately)
 681             // TBD: should this be left up to socket factory instead?
 682             try {
 683                 socket.setTcpNoDelay(true);
 684             } catch (Exception e) {
 685                 // if we fail to set this, ignore and proceed anyway
 686             }
 687             // set socket to timeout after excessive idle time
 688             try {
 689                 if (connectionReadTimeout > 0)
 690                     socket.setSoTimeout(connectionReadTimeout);
 691             } catch (Exception e) {
 692                 // too bad, continue anyway
 693             }
 694 
 695             try {
 696                 InputStream sockIn = socket.getInputStream();
 697                 InputStream bufIn = sockIn.markSupported()
 698                         ? sockIn
 699                         : new BufferedInputStream(sockIn);
 700 
 701                 // Read magic (or HTTP wrapper)
 702                 bufIn.mark(4);
 703                 DataInputStream in = new DataInputStream(bufIn);
 704                 int magic = in.readInt();
 705 
 706                 if (magic == POST) {
 707                     tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
 708 
 709                     // It's really a HTTP-wrapped request.  Repackage
 710                     // the socket in a HttpReceiveSocket, reinitialize
 711                     // sockIn and in, and reread magic.
 712                     bufIn.reset();      // unread "POST"
 713 
 714                     try {
 715                         socket = new HttpReceiveSocket(socket, bufIn, null);
 716                         remoteHost = "0.0.0.0";
 717                         sockIn = socket.getInputStream();
 718                         bufIn = new BufferedInputStream(sockIn);
 719                         in = new DataInputStream(bufIn);
 720                         magic = in.readInt();
 721 
 722                     } catch (IOException e) {
 723                         throw new RemoteException("Error HTTP-unwrapping call",
 724                                                   e);
 725                     }
 726                 }
 727                 // bufIn's mark will invalidate itself when it overflows
 728                 // so it doesn't have to be turned off
 729 
 730                 // read and verify transport header
 731                 short version = in.readShort();
 732                 if (magic != TransportConstants.Magic ||
 733                     version != TransportConstants.Version) {
 734                     // protocol mismatch detected...
 735                     // just close socket: this would recurse if we marshal an
 736                     // exception to the client and the protocol at other end
 737                     // doesn't match.
 738                     closeSocket(socket);
 739                     return;
 740                 }
 741 
 742                 OutputStream sockOut = socket.getOutputStream();
 743                 BufferedOutputStream bufOut =
 744                     new BufferedOutputStream(sockOut);
 745                 DataOutputStream out = new DataOutputStream(bufOut);
 746 
 747                 int remotePort = socket.getPort();
 748 
 749                 if (tcpLog.isLoggable(Log.BRIEF)) {
 750                     tcpLog.log(Log.BRIEF, "accepted socket from [" +
 751                                      remoteHost + ":" + remotePort + "]");
 752                 }
 753 
 754                 TCPEndpoint ep;
 755                 TCPChannel ch;
 756                 TCPConnection conn;
 757 
 758                 // send ack (or nack) for protocol
 759                 byte protocol = in.readByte();
 760                 switch (protocol) {
 761                 case TransportConstants.SingleOpProtocol:
 762                     // no ack for protocol
 763 
 764                     // create dummy channel for receiving messages
 765                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 766                                          endpoint.getClientSocketFactory(),
 767                                          endpoint.getServerSocketFactory());
 768                     ch = new TCPChannel(TCPTransport.this, ep);
 769                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 770 
 771                     // read input messages
 772                     handleMessages(conn, false);
 773                     break;
 774 
 775                 case TransportConstants.StreamProtocol:
 776                     // send ack
 777                     out.writeByte(TransportConstants.ProtocolAck);
 778 
 779                     // suggest endpoint (in case client doesn't know host name)
 780                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 781                         tcpLog.log(Log.VERBOSE, "(port " + port +
 782                             ") " + "suggesting " + remoteHost + ":" +
 783                             remotePort);
 784                     }
 785 
 786                     out.writeUTF(remoteHost);
 787                     out.writeInt(remotePort);
 788                     out.flush();
 789 
 790                     // read and discard (possibly bogus) endpoint
 791                     // REMIND: would be faster to read 2 bytes then skip N+4
 792                     String clientHost = in.readUTF();
 793                     int    clientPort = in.readInt();
 794                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 795                         tcpLog.log(Log.VERBOSE, "(port " + port +
 796                             ") client using " + clientHost + ":" + clientPort);
 797                     }
 798 
 799                     // create dummy channel for receiving messages
 800                     // (why not use clientHost and clientPort?)
 801                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 802                                          endpoint.getClientSocketFactory(),
 803                                          endpoint.getServerSocketFactory());
 804                     ch = new TCPChannel(TCPTransport.this, ep);
 805                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 806 
 807                     // read input messages
 808                     handleMessages(conn, true);
 809                     break;
 810 
 811                 case TransportConstants.MultiplexProtocol:
 812                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 813                         tcpLog.log(Log.VERBOSE, "(port " + port +
 814                             ") accepting multiplex protocol");
 815                     }
 816 
 817                     // send ack
 818                     out.writeByte(TransportConstants.ProtocolAck);
 819 
 820                     // suggest endpoint (in case client doesn't already have one)
 821                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 822                         tcpLog.log(Log.VERBOSE, "(port " + port +
 823                             ") suggesting " + remoteHost + ":" + remotePort);
 824                     }
 825 
 826                     out.writeUTF(remoteHost);
 827                     out.writeInt(remotePort);
 828                     out.flush();
 829 
 830                     // read endpoint client has decided to use
 831                     ep = new TCPEndpoint(in.readUTF(), in.readInt(),
 832                                          endpoint.getClientSocketFactory(),
 833                                          endpoint.getServerSocketFactory());
 834                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 835                         tcpLog.log(Log.VERBOSE, "(port " +
 836                             port + ") client using " +
 837                             ep.getHost() + ":" + ep.getPort());
 838                     }
 839 
 840                     ConnectionMultiplexer multiplexer;
 841                     synchronized (channelTable) {
 842                         // create or find channel for this endpoint
 843                         ch = getChannel(ep);
 844                         multiplexer =
 845                             new ConnectionMultiplexer(ch, bufIn, sockOut,
 846                                                       false);
 847                         ch.useMultiplexer(multiplexer);
 848                     }
 849                     multiplexer.run();
 850                     break;
 851 
 852                 default:
 853                     // protocol not understood, send nack and close socket
 854                     out.writeByte(TransportConstants.ProtocolNack);
 855                     out.flush();
 856                     break;
 857                 }
 858 
 859             } catch (IOException e) {
 860                 // socket in unknown state: destroy socket
 861                 tcpLog.log(Log.BRIEF, "terminated with exception:", e);
 862             } finally {
 863                 closeSocket(socket);
 864             }
 865         }
 866     }
 867 }