1 /*
   2  * Copyright (c) 1996, 2005, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.lang.ref.Reference;
  28 import java.lang.ref.SoftReference;
  29 import java.lang.ref.WeakReference;
  30 import java.lang.reflect.InvocationTargetException;
  31 import java.io.DataInputStream;
  32 import java.io.DataOutputStream;
  33 import java.io.IOException;
  34 import java.io.InputStream;
  35 import java.io.OutputStream;
  36 import java.io.BufferedInputStream;
  37 import java.io.BufferedOutputStream;
  38 import java.net.InetAddress;
  39 import java.net.ServerSocket;
  40 import java.net.Socket;
  41 import java.rmi.RemoteException;
  42 import java.rmi.server.ExportException;
  43 import java.rmi.server.LogStream;
  44 import java.rmi.server.RMIFailureHandler;
  45 import java.rmi.server.RMISocketFactory;
  46 import java.rmi.server.RemoteCall;
  47 import java.rmi.server.ServerNotActiveException;
  48 import java.rmi.server.UID;
  49 import java.security.AccessControlContext;
  50 import java.security.AccessController;
  51 import java.util.ArrayList;
  52 import java.util.LinkedList;
  53 import java.util.List;
  54 import java.util.Map;
  55 import java.util.WeakHashMap;
  56 import java.util.logging.Level;
  57 import java.util.concurrent.ExecutorService;
  58 import java.util.concurrent.RejectedExecutionException;
  59 import java.util.concurrent.SynchronousQueue;
  60 import java.util.concurrent.ThreadFactory;
  61 import java.util.concurrent.ThreadPoolExecutor;
  62 import java.util.concurrent.TimeUnit;
  63 import java.util.concurrent.atomic.AtomicInteger;
  64 import sun.rmi.runtime.Log;
  65 import sun.rmi.runtime.NewThreadAction;
  66 import sun.rmi.transport.Channel;
  67 import sun.rmi.transport.Connection;
  68 import sun.rmi.transport.DGCAckHandler;
  69 import sun.rmi.transport.Endpoint;
  70 import sun.rmi.transport.StreamRemoteCall;
  71 import sun.rmi.transport.Target;
  72 import sun.rmi.transport.Transport;
  73 import sun.rmi.transport.TransportConstants;
  74 import sun.rmi.transport.proxy.HttpReceiveSocket;
  75 import sun.security.action.GetIntegerAction;
  76 import sun.security.action.GetLongAction;
  77 import sun.security.action.GetPropertyAction;
  78 
  79 /**
  80  * TCPTransport is the socket-based implementation of the RMI Transport
  81  * abstraction.
  82  *
  83  * @author Ann Wollrath
  84  * @author Peter Jones
  85  */
  86 public class TCPTransport extends Transport {
  87 
  88     /* tcp package log */
  89     static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
  90         LogStream.parseLevel(AccessController.doPrivileged(
  91             new GetPropertyAction("sun.rmi.transport.tcp.logLevel"))));
  92 
  93     /** maximum number of connection handler threads */
  94     private static final int maxConnectionThreads =     // default no limit
  95         AccessController.doPrivileged(
  96             new GetIntegerAction("sun.rmi.transport.tcp.maxConnectionThreads",
  97                                  Integer.MAX_VALUE));
  98 
  99     /** keep alive time for idle connection handler threads */
 100     private static final long threadKeepAliveTime =     // default 1 minute
 101         AccessController.doPrivileged(
 102             new GetLongAction("sun.rmi.transport.tcp.threadKeepAliveTime",
 103                               60000));
 104 
 105     /** thread pool for connection handlers */
 106     private static final ExecutorService connectionThreadPool =
 107         new ThreadPoolExecutor(0, maxConnectionThreads,
 108             threadKeepAliveTime, TimeUnit.MILLISECONDS,
 109             new SynchronousQueue<Runnable>(),
 110             new ThreadFactory() {
 111                 public Thread newThread(Runnable runnable) {
 112                     return AccessController.doPrivileged(new NewThreadAction(
 113                         runnable, "TCP Connection(idle)", true, true));
 114                 }
 115             });
 116 
 117     /** total connections handled */
 118     private static final AtomicInteger connectionCount = new AtomicInteger(0);
 119 
 120     /** client host for the current thread's connection */
 121     private static final ThreadLocal<ConnectionHandler>
 122         threadConnectionHandler = new ThreadLocal<>();
 123 
 124     /** endpoints for this transport */
 125     private final LinkedList<TCPEndpoint> epList;
 126     /** number of objects exported on this transport */
 127     private int exportCount = 0;
 128     /** server socket for this transport */
 129     private ServerSocket server = null;
 130     /** table mapping endpoints to channels */
 131     private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
 132         new WeakHashMap<>();
 133 
 134     static final RMISocketFactory defaultSocketFactory =
 135         RMISocketFactory.getDefaultSocketFactory();
 136 
 137     /** number of milliseconds in accepted-connection timeout.
 138      * Warning: this should be greater than 15 seconds (the client-side
 139      * timeout), and defaults to 2 hours.
 140      * The maximum representable value is slightly more than 24 days
 141      * and 20 hours.
 142      */
 143     private static final int connectionReadTimeout =    // default 2 hours
 144         AccessController.doPrivileged(
 145             new GetIntegerAction("sun.rmi.transport.tcp.readTimeout",
 146                                  2 * 3600 * 1000));
 147 
 148     /**
 149      * Constructs a TCPTransport.
 150      */
 151     TCPTransport(LinkedList<TCPEndpoint> epList)  {
 152         // assert ((epList.size() != null) && (epList.size() >= 1))
 153         this.epList = epList;
 154         if (tcpLog.isLoggable(Log.BRIEF)) {
 155             tcpLog.log(Log.BRIEF, "Version = " +
 156                 TransportConstants.Version + ", ep = " + getEndpoint());
 157         }
 158     }
 159 
 160     /**
 161      * Closes all cached connections in every channel subordinated to this
 162      * transport.  Currently, this only closes outgoing connections.
 163      */
 164     public void shedConnectionCaches() {
 165         List<TCPChannel> channels;
 166         synchronized (channelTable) {
 167             channels = new ArrayList<TCPChannel>(channelTable.values().size());
 168             for (Reference<TCPChannel> ref : channelTable.values()) {
 169                 TCPChannel ch = ref.get();
 170                 if (ch != null) {
 171                     channels.add(ch);
 172                 }
 173             }
 174         }
 175         for (TCPChannel channel : channels) {
 176             channel.shedCache();
 177         }
 178     }
 179 
 180     /**
 181      * Returns a <I>Channel</I> that generates connections to the
 182      * endpoint <I>ep</I>. A Channel is an object that creates and
 183      * manages connections of a particular type to some particular
 184      * address space.
 185      * @param ep the endpoint to which connections will be generated.
 186      * @return the channel or null if the transport cannot
 187      * generate connections to this endpoint
 188      */
 189     public TCPChannel getChannel(Endpoint ep) {
 190         TCPChannel ch = null;
 191         if (ep instanceof TCPEndpoint) {
 192             synchronized (channelTable) {
 193                 Reference<TCPChannel> ref = channelTable.get(ep);
 194                 if (ref != null) {
 195                     ch = ref.get();
 196                 }
 197                 if (ch == null) {
 198                     TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
 199                     ch = new TCPChannel(this, tcpEndpoint);
 200                     channelTable.put(tcpEndpoint,
 201                                      new WeakReference<TCPChannel>(ch));
 202                 }
 203             }
 204         }
 205         return ch;
 206     }
 207 
 208     /**
 209      * Removes the <I>Channel</I> that generates connections to the
 210      * endpoint <I>ep</I>.
 211      */
 212     public void free(Endpoint ep) {
 213         if (ep instanceof TCPEndpoint) {
 214             synchronized (channelTable) {
 215                 Reference<TCPChannel> ref = channelTable.remove(ep);
 216                 if (ref != null) {
 217                     TCPChannel channel = ref.get();
 218                     if (channel != null) {
 219                         channel.shedCache();
 220                     }
 221                 }
 222             }
 223         }
 224     }
 225 
 226     /**
 227      * Export the object so that it can accept incoming calls.
 228      */
 229     public void exportObject(Target target) throws RemoteException {
 230         /*
 231          * Ensure that a server socket is listening, and count this
 232          * export while synchronized to prevent the server socket from
 233          * being closed due to concurrent unexports.
 234          */
 235         synchronized (this) {
 236             listen();
 237             exportCount++;
 238         }
 239 
 240         /*
 241          * Try to add the Target to the exported object table; keep
 242          * counting this export (to keep server socket open) only if
 243          * that succeeds.
 244          */
 245         boolean ok = false;
 246         try {
 247             super.exportObject(target);
 248             ok = true;
 249         } finally {
 250             if (!ok) {
 251                 synchronized (this) {
 252                     decrementExportCount();
 253                 }
 254             }
 255         }
 256     }
 257 
 258     protected synchronized void targetUnexported() {
 259         decrementExportCount();
 260     }
 261 
 262     /**
 263      * Decrements the count of exported objects, closing the current
 264      * server socket if the count reaches zero.
 265      **/
 266     private void decrementExportCount() {
 267         assert Thread.holdsLock(this);
 268         exportCount--;
 269         if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
 270             ServerSocket ss = server;
 271             server = null;
 272             try {
 273                 ss.close();
 274             } catch (IOException e) {
 275             }
 276         }
 277     }
 278 
 279     /**
 280      * Verify that the current access control context has permission to
 281      * accept the connection being dispatched by the current thread.
 282      */
 283     protected void checkAcceptPermission(AccessControlContext acc) {
 284         SecurityManager sm = System.getSecurityManager();
 285         if (sm == null) {
 286             return;
 287         }
 288         ConnectionHandler h = threadConnectionHandler.get();
 289         if (h == null) {
 290             throw new Error(
 291                 "checkAcceptPermission not in ConnectionHandler thread");
 292         }
 293         h.checkAcceptPermission(sm, acc);
 294     }
 295 
 296     private TCPEndpoint getEndpoint() {
 297         synchronized (epList) {
 298             return epList.getLast();
 299         }
 300     }
 301 
 302     /**
 303      * Listen on transport's endpoint.
 304      */
 305     private void listen() throws RemoteException {
 306         assert Thread.holdsLock(this);
 307         TCPEndpoint ep = getEndpoint();
 308         int port = ep.getPort();
 309 
 310         if (server == null) {
 311             if (tcpLog.isLoggable(Log.BRIEF)) {
 312                 tcpLog.log(Log.BRIEF,
 313                     "(port " + port + ") create server socket");
 314             }
 315 
 316             try {
 317                 server = ep.newServerSocket();
 318                 /*
 319                  * Don't retry ServerSocket if creation fails since
 320                  * "port in use" will cause export to hang if an
 321                  * RMIFailureHandler is not installed.
 322                  */
 323                 Thread t = AccessController.doPrivileged(
 324                     new NewThreadAction(new AcceptLoop(server),
 325                                         "TCP Accept-" + port, true));
 326                 t.start();
 327             } catch (java.net.BindException e) {
 328                 throw new ExportException("Port already in use: " + port, e);
 329             } catch (IOException e) {
 330                 throw new ExportException("Listen failed on port: " + port, e);
 331             }
 332 
 333         } else {
 334             // otherwise verify security access to existing server socket
 335             SecurityManager sm = System.getSecurityManager();
 336             if (sm != null) {
 337                 sm.checkListen(port);
 338             }
 339         }
 340     }
 341 
 342     /**
 343      * Worker for accepting connections from a server socket.
 344      **/
 345     private class AcceptLoop implements Runnable {
 346 
 347         private final ServerSocket serverSocket;
 348 
 349         // state for throttling loop on exceptions (local to accept thread)
 350         private long lastExceptionTime = 0L;
 351         private int recentExceptionCount;
 352 
 353         AcceptLoop(ServerSocket serverSocket) {
 354             this.serverSocket = serverSocket;
 355         }
 356 
 357         public void run() {
 358             try {
 359                 executeAcceptLoop();
 360             } finally {
 361                 try {
 362                     /*
 363                      * Only one accept loop is started per server
 364                      * socket, so after no more connections will be
 365                      * accepted, ensure that the server socket is no
 366                      * longer listening.
 367                      */
 368                     serverSocket.close();
 369                 } catch (IOException e) {
 370                 }
 371             }
 372         }
 373 
 374         /**
 375          * Accepts connections from the server socket and executes
 376          * handlers for them in the thread pool.
 377          **/
 378         private void executeAcceptLoop() {
 379             if (tcpLog.isLoggable(Log.BRIEF)) {
 380                 tcpLog.log(Log.BRIEF, "listening on port " +
 381                            getEndpoint().getPort());
 382             }
 383 
 384             while (true) {
 385                 Socket socket = null;
 386                 try {
 387                     socket = serverSocket.accept();
 388 
 389                     /*
 390                      * Find client host name (or "0.0.0.0" if unknown)
 391                      */
 392                     InetAddress clientAddr = socket.getInetAddress();
 393                     String clientHost = (clientAddr != null
 394                                          ? clientAddr.getHostAddress()
 395                                          : "0.0.0.0");
 396 
 397                     /*
 398                      * Execute connection handler in the thread pool,
 399                      * which uses non-system threads.
 400                      */
 401                     try {
 402                         connectionThreadPool.execute(
 403                             new ConnectionHandler(socket, clientHost));
 404                     } catch (RejectedExecutionException e) {
 405                         closeSocket(socket);
 406                         tcpLog.log(Log.BRIEF,
 407                                    "rejected connection from " + clientHost);
 408                     }
 409 
 410                 } catch (Throwable t) {
 411                     try {
 412                         /*
 413                          * If the server socket has been closed, such
 414                          * as because there are no more exported
 415                          * objects, then we expect accept to throw an
 416                          * exception, so just terminate normally.
 417                          */
 418                         if (serverSocket.isClosed()) {
 419                             break;
 420                         }
 421 
 422                         try {
 423                             if (tcpLog.isLoggable(Level.WARNING)) {
 424                                 tcpLog.log(Level.WARNING,
 425                                            "accept loop for " + serverSocket +
 426                                            " throws", t);
 427                             }
 428                         } catch (Throwable tt) {
 429                         }
 430                     } finally {
 431                         /*
 432                          * Always close the accepted socket (if any)
 433                          * if an exception occurs, but only after
 434                          * logging an unexpected exception.
 435                          */
 436                         if (socket != null) {
 437                             closeSocket(socket);
 438                         }
 439                     }
 440 
 441                     /*
 442                      * In case we're running out of file descriptors,
 443                      * release resources held in caches.
 444                      */
 445                     if (!(t instanceof SecurityException)) {
 446                         try {
 447                             TCPEndpoint.shedConnectionCaches();
 448                         } catch (Throwable tt) {
 449                         }
 450                     }
 451 
 452                     /*
 453                      * A NoClassDefFoundError can occur if no file
 454                      * descriptors are available, in which case this
 455                      * loop should not terminate.
 456                      */
 457                     if (t instanceof Exception ||
 458                         t instanceof OutOfMemoryError ||
 459                         t instanceof NoClassDefFoundError)
 460                     {
 461                         if (!continueAfterAcceptFailure(t)) {
 462                             return;
 463                         }
 464                         // continue loop
 465                     } else if (t instanceof Error) {
 466                         throw (Error) t;
 467                     } else {
 468                         throw new Error(t.getMessage(), t.getCause()); 
 469                     }
 470                 }
 471             }
 472         }
 473 
 474         /**
 475          * Returns true if the accept loop should continue after the
 476          * specified exception has been caught, or false if the accept
 477          * loop should terminate (closing the server socket).  If
 478          * there is an RMIFailureHandler, this method returns the
 479          * result of passing the specified exception to it; otherwise,
 480          * this method always returns true, after sleeping to throttle
 481          * the accept loop if necessary.
 482          **/
 483         private boolean continueAfterAcceptFailure(Throwable t) {
 484             RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
 485             if (fh != null) {
 486                 return fh.failure(t instanceof Exception ? (Exception) t :
 487                                   new InvocationTargetException(t));
 488             } else {
 489                 throttleLoopOnException();
 490                 return true;
 491             }
 492         }
 493 
 494         /**
 495          * Throttles the accept loop after an exception has been
 496          * caught: if a burst of 10 exceptions in 5 seconds occurs,
 497          * then wait for 10 seconds to curb busy CPU usage.
 498          **/
 499         private void throttleLoopOnException() {
 500             long now = System.currentTimeMillis();
 501             if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
 502                 // last exception was long ago (or this is the first)
 503                 lastExceptionTime = now;
 504                 recentExceptionCount = 0;
 505             } else {
 506                 // exception burst window was started recently
 507                 if (++recentExceptionCount >= 10) {
 508                     try {
 509                         Thread.sleep(10000);
 510                     } catch (InterruptedException ignore) {
 511                     }
 512                 }
 513             }
 514         }
 515     }
 516 
 517     /** close socket and eat exception */
 518     private static void closeSocket(Socket sock) {
 519         try {
 520             sock.close();
 521         } catch (IOException ex) {
 522             // eat exception
 523         }
 524     }
 525 
 526     /**
 527      * handleMessages decodes transport operations and handles messages
 528      * appropriately.  If an exception occurs during message handling,
 529      * the socket is closed.
 530      */
 531     void handleMessages(Connection conn, boolean persistent) {
 532         int port = getEndpoint().getPort();
 533 
 534         try {
 535             DataInputStream in = new DataInputStream(conn.getInputStream());
 536             do {
 537                 int op = in.read();     // transport op
 538                 if (op == -1) {
 539                     if (tcpLog.isLoggable(Log.BRIEF)) {
 540                         tcpLog.log(Log.BRIEF, "(port " +
 541                             port + ") connection closed");
 542                     }
 543                     break;
 544                 }
 545 
 546                 if (tcpLog.isLoggable(Log.BRIEF)) {
 547                     tcpLog.log(Log.BRIEF, "(port " + port +
 548                         ") op = " + op);
 549                 }
 550 
 551                 switch (op) {
 552                 case TransportConstants.Call:
 553                     // service incoming RMI call
 554                     RemoteCall call = new StreamRemoteCall(conn);
 555                     if (serviceCall(call) == false)
 556                         return;
 557                     break;
 558 
 559                 case TransportConstants.Ping:
 560                     // send ack for ping
 561                     DataOutputStream out =
 562                         new DataOutputStream(conn.getOutputStream());
 563                     out.writeByte(TransportConstants.PingAck);
 564                     conn.releaseOutputStream();
 565                     break;
 566 
 567                 case TransportConstants.DGCAck:
 568                     DGCAckHandler.received(UID.read(in));
 569                     break;
 570 
 571                 default:
 572                     throw new IOException("unknown transport op " + op);
 573                 }
 574             } while (persistent);
 575 
 576         } catch (IOException e) {
 577             // exception during processing causes connection to close (below)
 578             if (tcpLog.isLoggable(Log.BRIEF)) {
 579                 tcpLog.log(Log.BRIEF, "(port " + port +
 580                     ") exception: ", e);
 581             }
 582         } finally {
 583             try {
 584                 conn.close();
 585             } catch (IOException ex) {
 586                 // eat exception
 587             }
 588         }
 589     }
 590 
 591     /**
 592      * Returns the client host for the current thread's connection.  Throws
 593      * ServerNotActiveException if no connection is active for this thread.
 594      */
 595     public static String getClientHost() throws ServerNotActiveException {
 596         ConnectionHandler h = threadConnectionHandler.get();
 597         if (h != null) {
 598             return h.getClientHost();
 599         } else {
 600             throw new ServerNotActiveException("not in a remote call");
 601         }
 602     }
 603 
 604     /**
 605      * Services messages on accepted connection
 606      */
 607     private class ConnectionHandler implements Runnable {
 608 
 609         /** int value of "POST" in ASCII (Java's specified data formats
 610          *  make this once-reviled tactic again socially acceptable) */
 611         private static final int POST = 0x504f5354;
 612 
 613         /** most recently accept-authorized AccessControlContext */
 614         private AccessControlContext okContext;
 615         /** cache of accept-authorized AccessControlContexts */
 616         private Map<AccessControlContext,
 617                     Reference<AccessControlContext>> authCache;
 618         /** security manager which authorized contexts in authCache */
 619         private SecurityManager cacheSecurityManager = null;
 620 
 621         private Socket socket;
 622         private String remoteHost;
 623 
 624         ConnectionHandler(Socket socket, String remoteHost) {
 625             this.socket = socket;
 626             this.remoteHost = remoteHost;
 627         }
 628 
 629         String getClientHost() {
 630             return remoteHost;
 631         }
 632 
 633         /**
 634          * Verify that the given AccessControlContext has permission to
 635          * accept this connection.
 636          */
 637         void checkAcceptPermission(SecurityManager sm,
 638                                    AccessControlContext acc)
 639         {
 640             /*
 641              * Note: no need to synchronize on cache-related fields, since this
 642              * method only gets called from the ConnectionHandler's thread.
 643              */
 644             if (sm != cacheSecurityManager) {
 645                 okContext = null;
 646                 authCache = new WeakHashMap<AccessControlContext,
 647                                             Reference<AccessControlContext>>();
 648                 cacheSecurityManager = sm;
 649             }
 650             if (acc.equals(okContext) || authCache.containsKey(acc)) {
 651                 return;
 652             }
 653             InetAddress addr = socket.getInetAddress();
 654             String host = (addr != null) ? addr.getHostAddress() : "*";
 655 
 656             sm.checkAccept(host, socket.getPort());
 657 
 658             authCache.put(acc, new SoftReference<AccessControlContext>(acc));
 659             okContext = acc;
 660         }
 661 
 662         public void run() {
 663             Thread t = Thread.currentThread();
 664             String name = t.getName();
 665             try {
 666                 t.setName("RMI TCP Connection(" +
 667                           connectionCount.incrementAndGet() +
 668                           ")-" + remoteHost);
 669                 run0();
 670             } finally {
 671                 t.setName(name);
 672             }
 673         }
 674 
 675         private void run0() {
 676             TCPEndpoint endpoint = getEndpoint();
 677             int port = endpoint.getPort();
 678 
 679             threadConnectionHandler.set(this);
 680 
 681             // set socket to disable Nagle's algorithm (always send
 682             // immediately)
 683             // TBD: should this be left up to socket factory instead?
 684             try {
 685                 socket.setTcpNoDelay(true);
 686             } catch (Exception e) {
 687                 // if we fail to set this, ignore and proceed anyway
 688             }
 689             // set socket to timeout after excessive idle time
 690             try {
 691                 if (connectionReadTimeout > 0)
 692                     socket.setSoTimeout(connectionReadTimeout);
 693             } catch (Exception e) {
 694                 // too bad, continue anyway
 695             }
 696 
 697             try {
 698                 InputStream sockIn = socket.getInputStream();
 699                 InputStream bufIn = sockIn.markSupported()
 700                         ? sockIn
 701                         : new BufferedInputStream(sockIn);
 702 
 703                 // Read magic (or HTTP wrapper)
 704                 bufIn.mark(4);
 705                 DataInputStream in = new DataInputStream(bufIn);
 706                 int magic = in.readInt();
 707 
 708                 if (magic == POST) {
 709                     tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
 710 
 711                     // It's really a HTTP-wrapped request.  Repackage
 712                     // the socket in a HttpReceiveSocket, reinitialize
 713                     // sockIn and in, and reread magic.
 714                     bufIn.reset();      // unread "POST"
 715 
 716                     try {
 717                         socket = new HttpReceiveSocket(socket, bufIn, null);
 718                         remoteHost = "0.0.0.0";
 719                         sockIn = socket.getInputStream();
 720                         bufIn = new BufferedInputStream(sockIn);
 721                         in = new DataInputStream(bufIn);
 722                         magic = in.readInt();
 723 
 724                     } catch (IOException e) {
 725                         throw new RemoteException("Error HTTP-unwrapping call",
 726                                                   e);
 727                     }
 728                 }
 729                 // bufIn's mark will invalidate itself when it overflows
 730                 // so it doesn't have to be turned off
 731 
 732                 // read and verify transport header
 733                 short version = in.readShort();
 734                 if (magic != TransportConstants.Magic ||
 735                     version != TransportConstants.Version) {
 736                     // protocol mismatch detected...
 737                     // just close socket: this would recurse if we marshal an
 738                     // exception to the client and the protocol at other end
 739                     // doesn't match.
 740                     closeSocket(socket);
 741                     return;
 742                 }
 743 
 744                 OutputStream sockOut = socket.getOutputStream();
 745                 BufferedOutputStream bufOut =
 746                     new BufferedOutputStream(sockOut);
 747                 DataOutputStream out = new DataOutputStream(bufOut);
 748 
 749                 int remotePort = socket.getPort();
 750 
 751                 if (tcpLog.isLoggable(Log.BRIEF)) {
 752                     tcpLog.log(Log.BRIEF, "accepted socket from [" +
 753                                      remoteHost + ":" + remotePort + "]");
 754                 }
 755 
 756                 TCPEndpoint ep;
 757                 TCPChannel ch;
 758                 TCPConnection conn;
 759 
 760                 // send ack (or nack) for protocol
 761                 byte protocol = in.readByte();
 762                 switch (protocol) {
 763                 case TransportConstants.SingleOpProtocol:
 764                     // no ack for protocol
 765 
 766                     // create dummy channel for receiving messages
 767                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 768                                          endpoint.getClientSocketFactory(),
 769                                          endpoint.getServerSocketFactory());
 770                     ch = new TCPChannel(TCPTransport.this, ep);
 771                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 772 
 773                     // read input messages
 774                     handleMessages(conn, false);
 775                     break;
 776 
 777                 case TransportConstants.StreamProtocol:
 778                     // send ack
 779                     out.writeByte(TransportConstants.ProtocolAck);
 780 
 781                     // suggest endpoint (in case client doesn't know host name)
 782                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 783                         tcpLog.log(Log.VERBOSE, "(port " + port +
 784                             ") " + "suggesting " + remoteHost + ":" +
 785                             remotePort);
 786                     }
 787 
 788                     out.writeUTF(remoteHost);
 789                     out.writeInt(remotePort);
 790                     out.flush();
 791 
 792                     // read and discard (possibly bogus) endpoint
 793                     // REMIND: would be faster to read 2 bytes then skip N+4
 794                     String clientHost = in.readUTF();
 795                     int    clientPort = in.readInt();
 796                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 797                         tcpLog.log(Log.VERBOSE, "(port " + port +
 798                             ") client using " + clientHost + ":" + clientPort);
 799                     }
 800 
 801                     // create dummy channel for receiving messages
 802                     // (why not use clientHost and clientPort?)
 803                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 804                                          endpoint.getClientSocketFactory(),
 805                                          endpoint.getServerSocketFactory());
 806                     ch = new TCPChannel(TCPTransport.this, ep);
 807                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 808 
 809                     // read input messages
 810                     handleMessages(conn, true);
 811                     break;
 812 
 813                 case TransportConstants.MultiplexProtocol:
 814                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 815                         tcpLog.log(Log.VERBOSE, "(port " + port +
 816                             ") accepting multiplex protocol");
 817                     }
 818 
 819                     // send ack
 820                     out.writeByte(TransportConstants.ProtocolAck);
 821 
 822                     // suggest endpoint (in case client doesn't already have one)
 823                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 824                         tcpLog.log(Log.VERBOSE, "(port " + port +
 825                             ") suggesting " + remoteHost + ":" + remotePort);
 826                     }
 827 
 828                     out.writeUTF(remoteHost);
 829                     out.writeInt(remotePort);
 830                     out.flush();
 831 
 832                     // read endpoint client has decided to use
 833                     ep = new TCPEndpoint(in.readUTF(), in.readInt(),
 834                                          endpoint.getClientSocketFactory(),
 835                                          endpoint.getServerSocketFactory());
 836                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 837                         tcpLog.log(Log.VERBOSE, "(port " +
 838                             port + ") client using " +
 839                             ep.getHost() + ":" + ep.getPort());
 840                     }
 841 
 842                     ConnectionMultiplexer multiplexer;
 843                     synchronized (channelTable) {
 844                         // create or find channel for this endpoint
 845                         ch = getChannel(ep);
 846                         multiplexer =
 847                             new ConnectionMultiplexer(ch, bufIn, sockOut,
 848                                                       false);
 849                         ch.useMultiplexer(multiplexer);
 850                     }
 851                     multiplexer.run();
 852                     break;
 853 
 854                 default:
 855                     // protocol not understood, send nack and close socket
 856                     out.writeByte(TransportConstants.ProtocolNack);
 857                     out.flush();
 858                     break;
 859                 }
 860 
 861             } catch (IOException e) {
 862                 // socket in unknown state: destroy socket
 863                 tcpLog.log(Log.BRIEF, "terminated with exception:", e);
 864             } finally {
 865                 closeSocket(socket);
 866             }
 867         }
 868     }
 869 }