1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.lang.ref.Reference;
  28 import java.lang.ref.SoftReference;
  29 import java.lang.ref.WeakReference;
  30 import java.lang.reflect.InvocationTargetException;
  31 import java.lang.reflect.UndeclaredThrowableException;
  32 import java.io.DataInputStream;
  33 import java.io.DataOutputStream;
  34 import java.io.IOException;
  35 import java.io.InputStream;
  36 import java.io.OutputStream;
  37 import java.io.BufferedInputStream;
  38 import java.io.BufferedOutputStream;
  39 import java.net.InetAddress;
  40 import java.net.ServerSocket;
  41 import java.net.Socket;
  42 import java.rmi.RemoteException;
  43 import java.rmi.server.ExportException;
  44 import java.rmi.server.LogStream;
  45 import java.rmi.server.RMIFailureHandler;
  46 import java.rmi.server.RMISocketFactory;
  47 import java.rmi.server.RemoteCall;
  48 import java.rmi.server.ServerNotActiveException;
  49 import java.rmi.server.UID;
  50 import java.security.AccessControlContext;
  51 import java.security.AccessController;
  52 import java.util.ArrayList;
  53 import java.util.LinkedList;
  54 import java.util.List;
  55 import java.util.Map;
  56 import java.util.WeakHashMap;
  57 import java.util.logging.Level;
  58 import java.util.concurrent.ExecutorService;
  59 import java.util.concurrent.RejectedExecutionException;
  60 import java.util.concurrent.SynchronousQueue;
  61 import java.util.concurrent.ThreadFactory;
  62 import java.util.concurrent.ThreadPoolExecutor;
  63 import java.util.concurrent.TimeUnit;
  64 import java.util.concurrent.atomic.AtomicInteger;
  65 import sun.rmi.runtime.Log;
  66 import sun.rmi.runtime.NewThreadAction;
  67 import sun.rmi.transport.Channel;
  68 import sun.rmi.transport.Connection;
  69 import sun.rmi.transport.DGCAckHandler;
  70 import sun.rmi.transport.Endpoint;
  71 import sun.rmi.transport.StreamRemoteCall;
  72 import sun.rmi.transport.Target;
  73 import sun.rmi.transport.Transport;
  74 import sun.rmi.transport.TransportConstants;
  75 import sun.rmi.transport.proxy.HttpReceiveSocket;
  76 import sun.security.action.GetIntegerAction;
  77 import sun.security.action.GetLongAction;
  78 import sun.security.action.GetPropertyAction;
  79 
  80 /**
  81  * TCPTransport is the socket-based implementation of the RMI Transport
  82  * abstraction.
  83  *
  84  * @author Ann Wollrath
  85  * @author Peter Jones
  86  */
  87 @SuppressWarnings("deprecation")
  88 public class TCPTransport extends Transport {
  89 
  90     /* tcp package log */
  91     static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
  92         LogStream.parseLevel(AccessController.doPrivileged(
  93             new GetPropertyAction("sun.rmi.transport.tcp.logLevel"))));
  94 
  95     /** maximum number of connection handler threads */
  96     private static final int maxConnectionThreads =     // default no limit
  97         AccessController.doPrivileged(
  98             new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads",
  99                                  Integer.MAX_VALUE));
 100 
 101     /** keep alive time for idle connection handler threads */
 102     private static final long threadKeepAliveTime =     // default 1 minute
 103         AccessController.doPrivileged(
 104             new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime",
 105                               60000));
 106 
 107     /** thread pool for connection handlers */
 108     private static final ExecutorService connectionThreadPool =
 109         new ThreadPoolExecutor(0, maxConnectionThreads,
 110             threadKeepAliveTime, TimeUnit.MILLISECONDS,
 111             new SynchronousQueue<Runnable>(),
 112             new ThreadFactory() {
 113                 public Thread newThread(Runnable runnable) {
 114                     return AccessController.doPrivileged(new NewThreadAction(
 115                         runnable, "TCP Connection(idle)", true, true));
 116                 }
 117             });
 118 
 119     /** total connections handled */
 120     private static final AtomicInteger connectionCount = new AtomicInteger(0);
 121 
 122     /** client host for the current thread's connection */
 123     private static final ThreadLocal<ConnectionHandler>
 124         threadConnectionHandler = new ThreadLocal<>();
 125 
 126     /** endpoints for this transport */
 127     private final LinkedList<TCPEndpoint> epList;
 128     /** number of objects exported on this transport */
 129     private int exportCount = 0;
 130     /** server socket for this transport */
 131     private ServerSocket server = null;
 132     /** table mapping endpoints to channels */
 133     private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
 134         new WeakHashMap<>();
 135 
 136     static final RMISocketFactory defaultSocketFactory =
 137         RMISocketFactory.getDefaultSocketFactory();
 138 
 139     /** number of milliseconds in accepted-connection timeout.
 140      * Warning: this should be greater than 15 seconds (the client-side
 141      * timeout), and defaults to 2 hours.
 142      * The maximum representable value is slightly more than 24 days
 143      * and 20 hours.
 144      */
 145     private static final int connectionReadTimeout =    // default 2 hours
 146         AccessController.doPrivileged(
 147             new GetIntegerAction("sun.rmi.transport.tcp.readTimeout",
 148                                  2 * 3600 * 1000));
 149 
 150     /**
 151      * Constructs a TCPTransport.
 152      */
 153     TCPTransport(LinkedList<TCPEndpoint> epList)  {
 154         // assert ((epList.size() != null) && (epList.size() >= 1))
 155         this.epList = epList;
 156         if (tcpLog.isLoggable(Log.BRIEF)) {
 157             tcpLog.log(Log.BRIEF, "Version = " +
 158                 TransportConstants.Version + ", ep = " + getEndpoint());
 159         }
 160     }
 161 
 162     /**
 163      * Closes all cached connections in every channel subordinated to this
 164      * transport.  Currently, this only closes outgoing connections.
 165      */
 166     public void shedConnectionCaches() {
 167         List<TCPChannel> channels;
 168         synchronized (channelTable) {
 169             channels = new ArrayList<TCPChannel>(channelTable.values().size());
 170             for (Reference<TCPChannel> ref : channelTable.values()) {
 171                 TCPChannel ch = ref.get();
 172                 if (ch != null) {
 173                     channels.add(ch);
 174                 }
 175             }
 176         }
 177         for (TCPChannel channel : channels) {
 178             channel.shedCache();
 179         }
 180     }
 181 
 182     /**
 183      * Returns a <I>Channel</I> that generates connections to the
 184      * endpoint <I>ep</I>. A Channel is an object that creates and
 185      * manages connections of a particular type to some particular
 186      * address space.
 187      * @param ep the endpoint to which connections will be generated.
 188      * @return the channel or null if the transport cannot
 189      * generate connections to this endpoint
 190      */
 191     public TCPChannel getChannel(Endpoint ep) {
 192         TCPChannel ch = null;
 193         if (ep instanceof TCPEndpoint) {
 194             synchronized (channelTable) {
 195                 Reference<TCPChannel> ref = channelTable.get(ep);
 196                 if (ref != null) {
 197                     ch = ref.get();
 198                 }
 199                 if (ch == null) {
 200                     TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
 201                     ch = new TCPChannel(this, tcpEndpoint);
 202                     channelTable.put(tcpEndpoint,
 203                                      new WeakReference<TCPChannel>(ch));
 204                 }
 205             }
 206         }
 207         return ch;
 208     }
 209 
 210     /**
 211      * Removes the <I>Channel</I> that generates connections to the
 212      * endpoint <I>ep</I>.
 213      */
 214     public void free(Endpoint ep) {
 215         if (ep instanceof TCPEndpoint) {
 216             synchronized (channelTable) {
 217                 Reference<TCPChannel> ref = channelTable.remove(ep);
 218                 if (ref != null) {
 219                     TCPChannel channel = ref.get();
 220                     if (channel != null) {
 221                         channel.shedCache();
 222                     }
 223                 }
 224             }
 225         }
 226     }
 227 
 228     /**
 229      * Export the object so that it can accept incoming calls.
 230      */
 231     public void exportObject(Target target) throws RemoteException {
 232         /*
 233          * Ensure that a server socket is listening, and count this
 234          * export while synchronized to prevent the server socket from
 235          * being closed due to concurrent unexports.
 236          */
 237         synchronized (this) {
 238             listen();
 239             exportCount++;
 240         }
 241 
 242         /*
 243          * Try to add the Target to the exported object table; keep
 244          * counting this export (to keep server socket open) only if
 245          * that succeeds.
 246          */
 247         boolean ok = false;
 248         try {
 249             super.exportObject(target);
 250             ok = true;
 251         } finally {
 252             if (!ok) {
 253                 synchronized (this) {
 254                     decrementExportCount();
 255                 }
 256             }
 257         }
 258     }
 259 
 260     protected synchronized void targetUnexported() {
 261         decrementExportCount();
 262     }
 263 
 264     /**
 265      * Decrements the count of exported objects, closing the current
 266      * server socket if the count reaches zero.
 267      **/
 268     private void decrementExportCount() {
 269         assert Thread.holdsLock(this);
 270         exportCount--;
 271         if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
 272             ServerSocket ss = server;
 273             server = null;
 274             try {
 275                 ss.close();
 276             } catch (IOException e) {
 277             }
 278         }
 279     }
 280 
 281     /**
 282      * Verify that the current access control context has permission to
 283      * accept the connection being dispatched by the current thread.
 284      */
 285     protected void checkAcceptPermission(AccessControlContext acc) {
 286         SecurityManager sm = System.getSecurityManager();
 287         if (sm == null) {
 288             return;
 289         }
 290         ConnectionHandler h = threadConnectionHandler.get();
 291         if (h == null) {
 292             throw new Error(
 293                 "checkAcceptPermission not in ConnectionHandler thread");
 294         }
 295         h.checkAcceptPermission(sm, acc);
 296     }
 297 
 298     private TCPEndpoint getEndpoint() {
 299         synchronized (epList) {
 300             return epList.getLast();
 301         }
 302     }
 303 
 304     /**
 305      * Listen on transport's endpoint.
 306      */
 307     private void listen() throws RemoteException {
 308         assert Thread.holdsLock(this);
 309         TCPEndpoint ep = getEndpoint();
 310         int port = ep.getPort();
 311 
 312         if (server == null) {
 313             if (tcpLog.isLoggable(Log.BRIEF)) {
 314                 tcpLog.log(Log.BRIEF,
 315                     "(port " + port + ") create server socket");
 316             }
 317 
 318             try {
 319                 server = ep.newServerSocket();
 320                 /*
 321                  * Don't retry ServerSocket if creation fails since
 322                  * "port in use" will cause export to hang if an
 323                  * RMIFailureHandler is not installed.
 324                  */
 325                 Thread t = AccessController.doPrivileged(
 326                     new NewThreadAction(new AcceptLoop(server),
 327                                         "TCP Accept-" + port, true));
 328                 t.start();
 329             } catch (java.net.BindException e) {
 330                 throw new ExportException("Port already in use: " + port, e);
 331             } catch (IOException e) {
 332                 throw new ExportException("Listen failed on port: " + port, e);
 333             }
 334 
 335         } else {
 336             // otherwise verify security access to existing server socket
 337             SecurityManager sm = System.getSecurityManager();
 338             if (sm != null) {
 339                 sm.checkListen(port);
 340             }
 341         }
 342     }
 343 
 344     /**
 345      * Worker for accepting connections from a server socket.
 346      **/
 347     private class AcceptLoop implements Runnable {
 348 
 349         private final ServerSocket serverSocket;
 350 
 351         // state for throttling loop on exceptions (local to accept thread)
 352         private long lastExceptionTime = 0L;
 353         private int recentExceptionCount;
 354 
 355         AcceptLoop(ServerSocket serverSocket) {
 356             this.serverSocket = serverSocket;
 357         }
 358 
 359         public void run() {
 360             try {
 361                 executeAcceptLoop();
 362             } finally {
 363                 try {
 364                     /*
 365                      * Only one accept loop is started per server
 366                      * socket, so after no more connections will be
 367                      * accepted, ensure that the server socket is no
 368                      * longer listening.
 369                      */
 370                     serverSocket.close();
 371                 } catch (IOException e) {
 372                 }
 373             }
 374         }
 375 
 376         /**
 377          * Accepts connections from the server socket and executes
 378          * handlers for them in the thread pool.
 379          **/
 380         private void executeAcceptLoop() {
 381             if (tcpLog.isLoggable(Log.BRIEF)) {
 382                 tcpLog.log(Log.BRIEF, "listening on port " +
 383                            getEndpoint().getPort());
 384             }
 385 
 386             while (true) {
 387                 Socket socket = null;
 388                 try {
 389                     socket = serverSocket.accept();
 390 
 391                     /*
 392                      * Find client host name (or "0.0.0.0" if unknown)
 393                      */
 394                     InetAddress clientAddr = socket.getInetAddress();
 395                     String clientHost = (clientAddr != null
 396                                          ? clientAddr.getHostAddress()
 397                                          : "0.0.0.0");
 398 
 399                     /*
 400                      * Execute connection handler in the thread pool,
 401                      * which uses non-system threads.
 402                      */
 403                     try {
 404                         connectionThreadPool.execute(
 405                             new ConnectionHandler(socket, clientHost));
 406                     } catch (RejectedExecutionException e) {
 407                         closeSocket(socket);
 408                         tcpLog.log(Log.BRIEF,
 409                                    "rejected connection from " + clientHost);
 410                     }
 411 
 412                 } catch (Throwable t) {
 413                     try {
 414                         /*
 415                          * If the server socket has been closed, such
 416                          * as because there are no more exported
 417                          * objects, then we expect accept to throw an
 418                          * exception, so just terminate normally.
 419                          */
 420                         if (serverSocket.isClosed()) {
 421                             break;
 422                         }
 423 
 424                         try {
 425                             if (tcpLog.isLoggable(Level.WARNING)) {
 426                                 tcpLog.log(Level.WARNING,
 427                                            "accept loop for " + serverSocket +
 428                                            " throws", t);
 429                             }
 430                         } catch (Throwable tt) {
 431                         }
 432                     } finally {
 433                         /*
 434                          * Always close the accepted socket (if any)
 435                          * if an exception occurs, but only after
 436                          * logging an unexpected exception.
 437                          */
 438                         if (socket != null) {
 439                             closeSocket(socket);
 440                         }
 441                     }
 442 
 443                     /*
 444                      * In case we're running out of file descriptors,
 445                      * release resources held in caches.
 446                      */
 447                     if (!(t instanceof SecurityException)) {
 448                         try {
 449                             TCPEndpoint.shedConnectionCaches();
 450                         } catch (Throwable tt) {
 451                         }
 452                     }
 453 
 454                     /*
 455                      * A NoClassDefFoundError can occur if no file
 456                      * descriptors are available, in which case this
 457                      * loop should not terminate.
 458                      */
 459                     if (t instanceof Exception ||
 460                         t instanceof OutOfMemoryError ||
 461                         t instanceof NoClassDefFoundError)
 462                     {
 463                         if (!continueAfterAcceptFailure(t)) {
 464                             return;
 465                         }
 466                         // continue loop
 467                     } else if (t instanceof Error) {
 468                         throw (Error) t;
 469                     } else {
 470                         throw new UndeclaredThrowableException(t);
 471                     }
 472                 }
 473             }
 474         }
 475 
 476         /**
 477          * Returns true if the accept loop should continue after the
 478          * specified exception has been caught, or false if the accept
 479          * loop should terminate (closing the server socket).  If
 480          * there is an RMIFailureHandler, this method returns the
 481          * result of passing the specified exception to it; otherwise,
 482          * this method always returns true, after sleeping to throttle
 483          * the accept loop if necessary.
 484          **/
 485         private boolean continueAfterAcceptFailure(Throwable t) {
 486             RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
 487             if (fh != null) {
 488                 return fh.failure(t instanceof Exception ? (Exception) t :
 489                                   new InvocationTargetException(t));
 490             } else {
 491                 throttleLoopOnException();
 492                 return true;
 493             }
 494         }
 495 
 496         /**
 497          * Throttles the accept loop after an exception has been
 498          * caught: if a burst of 10 exceptions in 5 seconds occurs,
 499          * then wait for 10 seconds to curb busy CPU usage.
 500          **/
 501         private void throttleLoopOnException() {
 502             long now = System.currentTimeMillis();
 503             if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
 504                 // last exception was long ago (or this is the first)
 505                 lastExceptionTime = now;
 506                 recentExceptionCount = 0;
 507             } else {
 508                 // exception burst window was started recently
 509                 if (++recentExceptionCount >= 10) {
 510                     try {
 511                         Thread.sleep(10000);
 512                     } catch (InterruptedException ignore) {
 513                     }
 514                 }
 515             }
 516         }
 517     }
 518 
 519     /** close socket and eat exception */
 520     private static void closeSocket(Socket sock) {
 521         try {
 522             sock.close();
 523         } catch (IOException ex) {
 524             // eat exception
 525         }
 526     }
 527 
 528     /**
 529      * handleMessages decodes transport operations and handles messages
 530      * appropriately.  If an exception occurs during message handling,
 531      * the socket is closed.
 532      */
 533     void handleMessages(Connection conn, boolean persistent) {
 534         int port = getEndpoint().getPort();
 535 
 536         try {
 537             DataInputStream in = new DataInputStream(conn.getInputStream());
 538             do {
 539                 int op = in.read();     // transport op
 540                 if (op == -1) {
 541                     if (tcpLog.isLoggable(Log.BRIEF)) {
 542                         tcpLog.log(Log.BRIEF, "(port " +
 543                             port + ") connection closed");
 544                     }
 545                     break;
 546                 }
 547 
 548                 if (tcpLog.isLoggable(Log.BRIEF)) {
 549                     tcpLog.log(Log.BRIEF, "(port " + port +
 550                         ") op = " + op);
 551                 }
 552 
 553                 switch (op) {
 554                 case TransportConstants.Call:
 555                     // service incoming RMI call
 556                     RemoteCall call = new StreamRemoteCall(conn);
 557                     if (serviceCall(call) == false)
 558                         return;
 559                     break;
 560 
 561                 case TransportConstants.Ping:
 562                     // send ack for ping
 563                     DataOutputStream out =
 564                         new DataOutputStream(conn.getOutputStream());
 565                     out.writeByte(TransportConstants.PingAck);
 566                     conn.releaseOutputStream();
 567                     break;
 568 
 569                 case TransportConstants.DGCAck:
 570                     DGCAckHandler.received(UID.read(in));
 571                     break;
 572 
 573                 default:
 574                     throw new IOException("unknown transport op " + op);
 575                 }
 576             } while (persistent);
 577 
 578         } catch (IOException e) {
 579             // exception during processing causes connection to close (below)
 580             if (tcpLog.isLoggable(Log.BRIEF)) {
 581                 tcpLog.log(Log.BRIEF, "(port " + port +
 582                     ") exception: ", e);
 583             }
 584         } finally {
 585             try {
 586                 conn.close();
 587             } catch (IOException ex) {
 588                 // eat exception
 589             }
 590         }
 591     }
 592 
 593     /**
 594      * Returns the client host for the current thread's connection.  Throws
 595      * ServerNotActiveException if no connection is active for this thread.
 596      */
 597     public static String getClientHost() throws ServerNotActiveException {
 598         ConnectionHandler h = threadConnectionHandler.get();
 599         if (h != null) {
 600             return h.getClientHost();
 601         } else {
 602             throw new ServerNotActiveException("not in a remote call");
 603         }
 604     }
 605 
 606     /**
 607      * Services messages on accepted connection
 608      */
 609     private class ConnectionHandler implements Runnable {
 610 
 611         /** int value of "POST" in ASCII (Java's specified data formats
 612          *  make this once-reviled tactic again socially acceptable) */
 613         private static final int POST = 0x504f5354;
 614 
 615         /** most recently accept-authorized AccessControlContext */
 616         private AccessControlContext okContext;
 617         /** cache of accept-authorized AccessControlContexts */
 618         private Map<AccessControlContext,
 619                     Reference<AccessControlContext>> authCache;
 620         /** security manager which authorized contexts in authCache */
 621         private SecurityManager cacheSecurityManager = null;
 622 
 623         private Socket socket;
 624         private String remoteHost;
 625 
 626         ConnectionHandler(Socket socket, String remoteHost) {
 627             this.socket = socket;
 628             this.remoteHost = remoteHost;
 629         }
 630 
 631         String getClientHost() {
 632             return remoteHost;
 633         }
 634 
 635         /**
 636          * Verify that the given AccessControlContext has permission to
 637          * accept this connection.
 638          */
 639         void checkAcceptPermission(SecurityManager sm,
 640                                    AccessControlContext acc)
 641         {
 642             /*
 643              * Note: no need to synchronize on cache-related fields, since this
 644              * method only gets called from the ConnectionHandler's thread.
 645              */
 646             if (sm != cacheSecurityManager) {
 647                 okContext = null;
 648                 authCache = new WeakHashMap<AccessControlContext,
 649                                             Reference<AccessControlContext>>();
 650                 cacheSecurityManager = sm;
 651             }
 652             if (acc.equals(okContext) || authCache.containsKey(acc)) {
 653                 return;
 654             }
 655             InetAddress addr = socket.getInetAddress();
 656             String host = (addr != null) ? addr.getHostAddress() : "*";
 657 
 658             sm.checkAccept(host, socket.getPort());
 659 
 660             authCache.put(acc, new SoftReference<AccessControlContext>(acc));
 661             okContext = acc;
 662         }
 663 
 664         public void run() {
 665             Thread t = Thread.currentThread();
 666             String name = t.getName();
 667             try {
 668                 t.setName("RMI TCP Connection(" +
 669                           connectionCount.incrementAndGet() +
 670                           ")-" + remoteHost);
 671                 run0();
 672             } finally {
 673                 t.setName(name);
 674             }
 675         }
 676 
 677         private void run0() {
 678             TCPEndpoint endpoint = getEndpoint();
 679             int port = endpoint.getPort();
 680 
 681             threadConnectionHandler.set(this);
 682 
 683             // set socket to disable Nagle's algorithm (always send
 684             // immediately)
 685             // TBD: should this be left up to socket factory instead?
 686             try {
 687                 socket.setTcpNoDelay(true);
 688             } catch (Exception e) {
 689                 // if we fail to set this, ignore and proceed anyway
 690             }
 691             // set socket to timeout after excessive idle time
 692             try {
 693                 if (connectionReadTimeout > 0)
 694                     socket.setSoTimeout(connectionReadTimeout);
 695             } catch (Exception e) {
 696                 // too bad, continue anyway
 697             }
 698 
 699             try {
 700                 InputStream sockIn = socket.getInputStream();
 701                 InputStream bufIn = sockIn.markSupported()
 702                         ? sockIn
 703                         : new BufferedInputStream(sockIn);
 704 
 705                 // Read magic (or HTTP wrapper)
 706                 bufIn.mark(4);
 707                 DataInputStream in = new DataInputStream(bufIn);
 708                 int magic = in.readInt();
 709 
 710                 if (magic == POST) {
 711                     tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
 712 
 713                     // It's really a HTTP-wrapped request.  Repackage
 714                     // the socket in a HttpReceiveSocket, reinitialize
 715                     // sockIn and in, and reread magic.
 716                     bufIn.reset();      // unread "POST"
 717 
 718                     try {
 719                         socket = new HttpReceiveSocket(socket, bufIn, null);
 720                         remoteHost = "0.0.0.0";
 721                         sockIn = socket.getInputStream();
 722                         bufIn = new BufferedInputStream(sockIn);
 723                         in = new DataInputStream(bufIn);
 724                         magic = in.readInt();
 725 
 726                     } catch (IOException e) {
 727                         throw new RemoteException("Error HTTP-unwrapping call",
 728                                                   e);
 729                     }
 730                 }
 731                 // bufIn's mark will invalidate itself when it overflows
 732                 // so it doesn't have to be turned off
 733 
 734                 // read and verify transport header
 735                 short version = in.readShort();
 736                 if (magic != TransportConstants.Magic ||
 737                     version != TransportConstants.Version) {
 738                     // protocol mismatch detected...
 739                     // just close socket: this would recurse if we marshal an
 740                     // exception to the client and the protocol at other end
 741                     // doesn't match.
 742                     closeSocket(socket);
 743                     return;
 744                 }
 745 
 746                 OutputStream sockOut = socket.getOutputStream();
 747                 BufferedOutputStream bufOut =
 748                     new BufferedOutputStream(sockOut);
 749                 DataOutputStream out = new DataOutputStream(bufOut);
 750 
 751                 int remotePort = socket.getPort();
 752 
 753                 if (tcpLog.isLoggable(Log.BRIEF)) {
 754                     tcpLog.log(Log.BRIEF, "accepted socket from [" +
 755                                      remoteHost + ":" + remotePort + "]");
 756                 }
 757 
 758                 TCPEndpoint ep;
 759                 TCPChannel ch;
 760                 TCPConnection conn;
 761 
 762                 // send ack (or nack) for protocol
 763                 byte protocol = in.readByte();
 764                 switch (protocol) {
 765                 case TransportConstants.SingleOpProtocol:
 766                     // no ack for protocol
 767 
 768                     // create dummy channel for receiving messages
 769                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 770                                          endpoint.getClientSocketFactory(),
 771                                          endpoint.getServerSocketFactory());
 772                     ch = new TCPChannel(TCPTransport.this, ep);
 773                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 774 
 775                     // read input messages
 776                     handleMessages(conn, false);
 777                     break;
 778 
 779                 case TransportConstants.StreamProtocol:
 780                     // send ack
 781                     out.writeByte(TransportConstants.ProtocolAck);
 782 
 783                     // suggest endpoint (in case client doesn't know host name)
 784                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 785                         tcpLog.log(Log.VERBOSE, "(port " + port +
 786                             ") " + "suggesting " + remoteHost + ":" +
 787                             remotePort);
 788                     }
 789 
 790                     out.writeUTF(remoteHost);
 791                     out.writeInt(remotePort);
 792                     out.flush();
 793 
 794                     // read and discard (possibly bogus) endpoint
 795                     // REMIND: would be faster to read 2 bytes then skip N+4
 796                     String clientHost = in.readUTF();
 797                     int    clientPort = in.readInt();
 798                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 799                         tcpLog.log(Log.VERBOSE, "(port " + port +
 800                             ") client using " + clientHost + ":" + clientPort);
 801                     }
 802 
 803                     // create dummy channel for receiving messages
 804                     // (why not use clientHost and clientPort?)
 805                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 806                                          endpoint.getClientSocketFactory(),
 807                                          endpoint.getServerSocketFactory());
 808                     ch = new TCPChannel(TCPTransport.this, ep);
 809                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 810 
 811                     // read input messages
 812                     handleMessages(conn, true);
 813                     break;
 814 
 815                 case TransportConstants.MultiplexProtocol:
 816                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 817                         tcpLog.log(Log.VERBOSE, "(port " + port +
 818                             ") accepting multiplex protocol");
 819                     }
 820 
 821                     // send ack
 822                     out.writeByte(TransportConstants.ProtocolAck);
 823 
 824                     // suggest endpoint (in case client doesn't already have one)
 825                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 826                         tcpLog.log(Log.VERBOSE, "(port " + port +
 827                             ") suggesting " + remoteHost + ":" + remotePort);
 828                     }
 829 
 830                     out.writeUTF(remoteHost);
 831                     out.writeInt(remotePort);
 832                     out.flush();
 833 
 834                     // read endpoint client has decided to use
 835                     ep = new TCPEndpoint(in.readUTF(), in.readInt(),
 836                                          endpoint.getClientSocketFactory(),
 837                                          endpoint.getServerSocketFactory());
 838                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 839                         tcpLog.log(Log.VERBOSE, "(port " +
 840                             port + ") client using " +
 841                             ep.getHost() + ":" + ep.getPort());
 842                     }
 843 
 844                     ConnectionMultiplexer multiplexer;
 845                     synchronized (channelTable) {
 846                         // create or find channel for this endpoint
 847                         ch = getChannel(ep);
 848                         multiplexer =
 849                             new ConnectionMultiplexer(ch, bufIn, sockOut,
 850                                                       false);
 851                         ch.useMultiplexer(multiplexer);
 852                     }
 853                     multiplexer.run();
 854                     break;
 855 
 856                 default:
 857                     // protocol not understood, send nack and close socket
 858                     out.writeByte(TransportConstants.ProtocolNack);
 859                     out.flush();
 860                     break;
 861                 }
 862 
 863             } catch (IOException e) {
 864                 // socket in unknown state: destroy socket
 865                 tcpLog.log(Log.BRIEF, "terminated with exception:", e);
 866             } finally {
 867                 closeSocket(socket);
 868             }
 869         }
 870     }
 871 }