1 /*
   2  * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.rmi.transport.tcp;
  26 
  27 import java.lang.ref.Reference;
  28 import java.lang.ref.SoftReference;
  29 import java.lang.ref.WeakReference;
  30 import java.lang.reflect.InvocationTargetException;
  31 import java.lang.reflect.UndeclaredThrowableException;
  32 import java.io.DataInputStream;
  33 import java.io.DataOutputStream;
  34 import java.io.IOException;
  35 import java.io.InputStream;
  36 import java.io.OutputStream;
  37 import java.io.BufferedInputStream;
  38 import java.io.BufferedOutputStream;
  39 import java.net.InetAddress;
  40 import java.net.ServerSocket;
  41 import java.net.Socket;
  42 import java.rmi.RemoteException;
  43 import java.rmi.server.ExportException;
  44 import java.rmi.server.LogStream;
  45 import java.rmi.server.RMIFailureHandler;
  46 import java.rmi.server.RMISocketFactory;
  47 import java.rmi.server.RemoteCall;
  48 import java.rmi.server.ServerNotActiveException;
  49 import java.rmi.server.UID;
  50 import java.security.AccessControlContext;
  51 import java.security.AccessController;
  52 import java.security.PrivilegedAction;
  53 import java.util.ArrayList;
  54 import java.util.LinkedList;
  55 import java.util.List;
  56 import java.util.Map;
  57 import java.util.WeakHashMap;
  58 import java.util.logging.Level;
  59 import java.util.concurrent.ExecutorService;
  60 import java.util.concurrent.RejectedExecutionException;
  61 import java.util.concurrent.SynchronousQueue;
  62 import java.util.concurrent.ThreadFactory;
  63 import java.util.concurrent.ThreadPoolExecutor;
  64 import java.util.concurrent.TimeUnit;
  65 import java.util.concurrent.atomic.AtomicInteger;
  66 import sun.rmi.runtime.Log;
  67 import sun.rmi.runtime.NewThreadAction;
  68 import sun.rmi.transport.Channel;
  69 import sun.rmi.transport.Connection;
  70 import sun.rmi.transport.DGCAckHandler;
  71 import sun.rmi.transport.Endpoint;
  72 import sun.rmi.transport.StreamRemoteCall;
  73 import sun.rmi.transport.Target;
  74 import sun.rmi.transport.Transport;
  75 import sun.rmi.transport.TransportConstants;
  76 import sun.rmi.transport.proxy.HttpReceiveSocket;
  77 
  78 /**
  79  * TCPTransport is the socket-based implementation of the RMI Transport
  80  * abstraction.
  81  *
  82  * @author Ann Wollrath
  83  * @author Peter Jones
  84  */
  85 @SuppressWarnings("deprecation")
  86 public class TCPTransport extends Transport {
  87 
  88     /* tcp package log */
  89     static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
  90         LogStream.parseLevel(AccessController.doPrivileged(
  91             (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.logLevel"))));
  92 
  93     /** maximum number of connection handler threads */
  94     private static final int maxConnectionThreads =     // default no limit
  95         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
  96             Integer.getInteger("sun.rmi.transport.tcp.maxConnectionThreads",
  97                                Integer.MAX_VALUE));
  98 
  99     /** keep alive time for idle connection handler threads */
 100     private static final long threadKeepAliveTime =     // default 1 minute
 101         AccessController.doPrivileged((PrivilegedAction<Long>) () ->
 102             Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
 103 
 104     /** thread pool for connection handlers */
 105     private static final ExecutorService connectionThreadPool =
 106         new ThreadPoolExecutor(0, maxConnectionThreads,
 107             threadKeepAliveTime, TimeUnit.MILLISECONDS,
 108             new SynchronousQueue<Runnable>(),
 109             new ThreadFactory() {
 110                 public Thread newThread(Runnable runnable) {
 111                     return AccessController.doPrivileged(new NewThreadAction(
 112                         runnable, "TCP Connection(idle)", true, true));
 113                 }
 114             });
 115 
 116     /** total connections handled */
 117     private static final AtomicInteger connectionCount = new AtomicInteger(0);
 118 
 119     /** client host for the current thread's connection */
 120     private static final ThreadLocal<ConnectionHandler>
 121         threadConnectionHandler = new ThreadLocal<>();
 122 
 123     /** endpoints for this transport */
 124     private final LinkedList<TCPEndpoint> epList;
 125     /** number of objects exported on this transport */
 126     private int exportCount = 0;
 127     /** server socket for this transport */
 128     private ServerSocket server = null;
 129     /** table mapping endpoints to channels */
 130     private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
 131         new WeakHashMap<>();
 132 
 133     static final RMISocketFactory defaultSocketFactory =
 134         RMISocketFactory.getDefaultSocketFactory();
 135 
 136     /** number of milliseconds in accepted-connection timeout.
 137      * Warning: this should be greater than 15 seconds (the client-side
 138      * timeout), and defaults to 2 hours.
 139      * The maximum representable value is slightly more than 24 days
 140      * and 20 hours.
 141      */
 142     private static final int connectionReadTimeout =    // default 2 hours
 143         AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
 144             Integer.getInteger("sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000));
 145 
 146     /**
 147      * Constructs a TCPTransport.
 148      */
 149     TCPTransport(LinkedList<TCPEndpoint> epList)  {
 150         // assert ((epList.size() != null) && (epList.size() >= 1))
 151         this.epList = epList;
 152         if (tcpLog.isLoggable(Log.BRIEF)) {
 153             tcpLog.log(Log.BRIEF, "Version = " +
 154                 TransportConstants.Version + ", ep = " + getEndpoint());
 155         }
 156     }
 157 
 158     /**
 159      * Closes all cached connections in every channel subordinated to this
 160      * transport.  Currently, this only closes outgoing connections.
 161      */
 162     public void shedConnectionCaches() {
 163         List<TCPChannel> channels;
 164         synchronized (channelTable) {
 165             channels = new ArrayList<TCPChannel>(channelTable.values().size());
 166             for (Reference<TCPChannel> ref : channelTable.values()) {
 167                 TCPChannel ch = ref.get();
 168                 if (ch != null) {
 169                     channels.add(ch);
 170                 }
 171             }
 172         }
 173         for (TCPChannel channel : channels) {
 174             channel.shedCache();
 175         }
 176     }
 177 
 178     /**
 179      * Returns a <I>Channel</I> that generates connections to the
 180      * endpoint <I>ep</I>. A Channel is an object that creates and
 181      * manages connections of a particular type to some particular
 182      * address space.
 183      * @param ep the endpoint to which connections will be generated.
 184      * @return the channel or null if the transport cannot
 185      * generate connections to this endpoint
 186      */
 187     public TCPChannel getChannel(Endpoint ep) {
 188         TCPChannel ch = null;
 189         if (ep instanceof TCPEndpoint) {
 190             synchronized (channelTable) {
 191                 Reference<TCPChannel> ref = channelTable.get(ep);
 192                 if (ref != null) {
 193                     ch = ref.get();
 194                 }
 195                 if (ch == null) {
 196                     TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
 197                     ch = new TCPChannel(this, tcpEndpoint);
 198                     channelTable.put(tcpEndpoint,
 199                                      new WeakReference<TCPChannel>(ch));
 200                 }
 201             }
 202         }
 203         return ch;
 204     }
 205 
 206     /**
 207      * Removes the <I>Channel</I> that generates connections to the
 208      * endpoint <I>ep</I>.
 209      */
 210     public void free(Endpoint ep) {
 211         if (ep instanceof TCPEndpoint) {
 212             synchronized (channelTable) {
 213                 Reference<TCPChannel> ref = channelTable.remove(ep);
 214                 if (ref != null) {
 215                     TCPChannel channel = ref.get();
 216                     if (channel != null) {
 217                         channel.shedCache();
 218                     }
 219                 }
 220             }
 221         }
 222     }
 223 
 224     /**
 225      * Export the object so that it can accept incoming calls.
 226      */
 227     public void exportObject(Target target) throws RemoteException {
 228         /*
 229          * Ensure that a server socket is listening, and count this
 230          * export while synchronized to prevent the server socket from
 231          * being closed due to concurrent unexports.
 232          */
 233         synchronized (this) {
 234             listen();
 235             exportCount++;
 236         }
 237 
 238         /*
 239          * Try to add the Target to the exported object table; keep
 240          * counting this export (to keep server socket open) only if
 241          * that succeeds.
 242          */
 243         boolean ok = false;
 244         try {
 245             super.exportObject(target);
 246             ok = true;
 247         } finally {
 248             if (!ok) {
 249                 synchronized (this) {
 250                     decrementExportCount();
 251                 }
 252             }
 253         }
 254     }
 255 
 256     protected synchronized void targetUnexported() {
 257         decrementExportCount();
 258     }
 259 
 260     /**
 261      * Decrements the count of exported objects, closing the current
 262      * server socket if the count reaches zero.
 263      **/
 264     private void decrementExportCount() {
 265         assert Thread.holdsLock(this);
 266         exportCount--;
 267         if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
 268             ServerSocket ss = server;
 269             server = null;
 270             try {
 271                 ss.close();
 272             } catch (IOException e) {
 273             }
 274         }
 275     }
 276 
 277     /**
 278      * Verify that the current access control context has permission to
 279      * accept the connection being dispatched by the current thread.
 280      */
 281     protected void checkAcceptPermission(AccessControlContext acc) {
 282         SecurityManager sm = System.getSecurityManager();
 283         if (sm == null) {
 284             return;
 285         }
 286         ConnectionHandler h = threadConnectionHandler.get();
 287         if (h == null) {
 288             throw new Error(
 289                 "checkAcceptPermission not in ConnectionHandler thread");
 290         }
 291         h.checkAcceptPermission(sm, acc);
 292     }
 293 
 294     private TCPEndpoint getEndpoint() {
 295         synchronized (epList) {
 296             return epList.getLast();
 297         }
 298     }
 299 
 300     /**
 301      * Listen on transport's endpoint.
 302      */
 303     private void listen() throws RemoteException {
 304         assert Thread.holdsLock(this);
 305         TCPEndpoint ep = getEndpoint();
 306         int port = ep.getPort();
 307 
 308         if (server == null) {
 309             if (tcpLog.isLoggable(Log.BRIEF)) {
 310                 tcpLog.log(Log.BRIEF,
 311                     "(port " + port + ") create server socket");
 312             }
 313 
 314             try {
 315                 server = ep.newServerSocket();
 316                 /*
 317                  * Don't retry ServerSocket if creation fails since
 318                  * "port in use" will cause export to hang if an
 319                  * RMIFailureHandler is not installed.
 320                  */
 321                 Thread t = AccessController.doPrivileged(
 322                     new NewThreadAction(new AcceptLoop(server),
 323                                         "TCP Accept-" + port, true));
 324                 t.start();
 325             } catch (java.net.BindException e) {
 326                 throw new ExportException("Port already in use: " + port, e);
 327             } catch (IOException e) {
 328                 throw new ExportException("Listen failed on port: " + port, e);
 329             }
 330 
 331         } else {
 332             // otherwise verify security access to existing server socket
 333             SecurityManager sm = System.getSecurityManager();
 334             if (sm != null) {
 335                 sm.checkListen(port);
 336             }
 337         }
 338     }
 339 
 340     /**
 341      * Worker for accepting connections from a server socket.
 342      **/
 343     private class AcceptLoop implements Runnable {
 344 
 345         private final ServerSocket serverSocket;
 346 
 347         // state for throttling loop on exceptions (local to accept thread)
 348         private long lastExceptionTime = 0L;
 349         private int recentExceptionCount;
 350 
 351         AcceptLoop(ServerSocket serverSocket) {
 352             this.serverSocket = serverSocket;
 353         }
 354 
 355         public void run() {
 356             try {
 357                 executeAcceptLoop();
 358             } finally {
 359                 try {
 360                     /*
 361                      * Only one accept loop is started per server
 362                      * socket, so after no more connections will be
 363                      * accepted, ensure that the server socket is no
 364                      * longer listening.
 365                      */
 366                     serverSocket.close();
 367                 } catch (IOException e) {
 368                 }
 369             }
 370         }
 371 
 372         /**
 373          * Accepts connections from the server socket and executes
 374          * handlers for them in the thread pool.
 375          **/
 376         private void executeAcceptLoop() {
 377             if (tcpLog.isLoggable(Log.BRIEF)) {
 378                 tcpLog.log(Log.BRIEF, "listening on port " +
 379                            getEndpoint().getPort());
 380             }
 381 
 382             while (true) {
 383                 Socket socket = null;
 384                 try {
 385                     socket = serverSocket.accept();
 386 
 387                     /*
 388                      * Find client host name (or "0.0.0.0" if unknown)
 389                      */
 390                     InetAddress clientAddr = socket.getInetAddress();
 391                     String clientHost = (clientAddr != null
 392                                          ? clientAddr.getHostAddress()
 393                                          : "0.0.0.0");
 394 
 395                     /*
 396                      * Execute connection handler in the thread pool,
 397                      * which uses non-system threads.
 398                      */
 399                     try {
 400                         connectionThreadPool.execute(
 401                             new ConnectionHandler(socket, clientHost));
 402                     } catch (RejectedExecutionException e) {
 403                         closeSocket(socket);
 404                         tcpLog.log(Log.BRIEF,
 405                                    "rejected connection from " + clientHost);
 406                     }
 407 
 408                 } catch (Throwable t) {
 409                     try {
 410                         /*
 411                          * If the server socket has been closed, such
 412                          * as because there are no more exported
 413                          * objects, then we expect accept to throw an
 414                          * exception, so just terminate normally.
 415                          */
 416                         if (serverSocket.isClosed()) {
 417                             break;
 418                         }
 419 
 420                         try {
 421                             if (tcpLog.isLoggable(Level.WARNING)) {
 422                                 tcpLog.log(Level.WARNING,
 423                                            "accept loop for " + serverSocket +
 424                                            " throws", t);
 425                             }
 426                         } catch (Throwable tt) {
 427                         }
 428                     } finally {
 429                         /*
 430                          * Always close the accepted socket (if any)
 431                          * if an exception occurs, but only after
 432                          * logging an unexpected exception.
 433                          */
 434                         if (socket != null) {
 435                             closeSocket(socket);
 436                         }
 437                     }
 438 
 439                     /*
 440                      * In case we're running out of file descriptors,
 441                      * release resources held in caches.
 442                      */
 443                     if (!(t instanceof SecurityException)) {
 444                         try {
 445                             TCPEndpoint.shedConnectionCaches();
 446                         } catch (Throwable tt) {
 447                         }
 448                     }
 449 
 450                     /*
 451                      * A NoClassDefFoundError can occur if no file
 452                      * descriptors are available, in which case this
 453                      * loop should not terminate.
 454                      */
 455                     if (t instanceof Exception ||
 456                         t instanceof OutOfMemoryError ||
 457                         t instanceof NoClassDefFoundError)
 458                     {
 459                         if (!continueAfterAcceptFailure(t)) {
 460                             return;
 461                         }
 462                         // continue loop
 463                     } else if (t instanceof Error) {
 464                         throw (Error) t;
 465                     } else {
 466                         throw new UndeclaredThrowableException(t);
 467                     }
 468                 }
 469             }
 470         }
 471 
 472         /**
 473          * Returns true if the accept loop should continue after the
 474          * specified exception has been caught, or false if the accept
 475          * loop should terminate (closing the server socket).  If
 476          * there is an RMIFailureHandler, this method returns the
 477          * result of passing the specified exception to it; otherwise,
 478          * this method always returns true, after sleeping to throttle
 479          * the accept loop if necessary.
 480          **/
 481         private boolean continueAfterAcceptFailure(Throwable t) {
 482             RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
 483             if (fh != null) {
 484                 return fh.failure(t instanceof Exception ? (Exception) t :
 485                                   new InvocationTargetException(t));
 486             } else {
 487                 throttleLoopOnException();
 488                 return true;
 489             }
 490         }
 491 
 492         /**
 493          * Throttles the accept loop after an exception has been
 494          * caught: if a burst of 10 exceptions in 5 seconds occurs,
 495          * then wait for 10 seconds to curb busy CPU usage.
 496          **/
 497         private void throttleLoopOnException() {
 498             long now = System.currentTimeMillis();
 499             if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
 500                 // last exception was long ago (or this is the first)
 501                 lastExceptionTime = now;
 502                 recentExceptionCount = 0;
 503             } else {
 504                 // exception burst window was started recently
 505                 if (++recentExceptionCount >= 10) {
 506                     try {
 507                         Thread.sleep(10000);
 508                     } catch (InterruptedException ignore) {
 509                     }
 510                 }
 511             }
 512         }
 513     }
 514 
 515     /** close socket and eat exception */
 516     private static void closeSocket(Socket sock) {
 517         try {
 518             sock.close();
 519         } catch (IOException ex) {
 520             // eat exception
 521         }
 522     }
 523 
 524     /**
 525      * handleMessages decodes transport operations and handles messages
 526      * appropriately.  If an exception occurs during message handling,
 527      * the socket is closed.
 528      */
 529     void handleMessages(Connection conn, boolean persistent) {
 530         int port = getEndpoint().getPort();
 531 
 532         try {
 533             DataInputStream in = new DataInputStream(conn.getInputStream());
 534             do {
 535                 int op = in.read();     // transport op
 536                 if (op == -1) {
 537                     if (tcpLog.isLoggable(Log.BRIEF)) {
 538                         tcpLog.log(Log.BRIEF, "(port " +
 539                             port + ") connection closed");
 540                     }
 541                     break;
 542                 }
 543 
 544                 if (tcpLog.isLoggable(Log.BRIEF)) {
 545                     tcpLog.log(Log.BRIEF, "(port " + port +
 546                         ") op = " + op);
 547                 }
 548 
 549                 switch (op) {
 550                 case TransportConstants.Call:
 551                     // service incoming RMI call
 552                     RemoteCall call = new StreamRemoteCall(conn);
 553                     if (serviceCall(call) == false)
 554                         return;
 555                     break;
 556 
 557                 case TransportConstants.Ping:
 558                     // send ack for ping
 559                     DataOutputStream out =
 560                         new DataOutputStream(conn.getOutputStream());
 561                     out.writeByte(TransportConstants.PingAck);
 562                     conn.releaseOutputStream();
 563                     break;
 564 
 565                 case TransportConstants.DGCAck:
 566                     DGCAckHandler.received(UID.read(in));
 567                     break;
 568 
 569                 default:
 570                     throw new IOException("unknown transport op " + op);
 571                 }
 572             } while (persistent);
 573 
 574         } catch (IOException e) {
 575             // exception during processing causes connection to close (below)
 576             if (tcpLog.isLoggable(Log.BRIEF)) {
 577                 tcpLog.log(Log.BRIEF, "(port " + port +
 578                     ") exception: ", e);
 579             }
 580         } finally {
 581             try {
 582                 conn.close();
 583             } catch (IOException ex) {
 584                 // eat exception
 585             }
 586         }
 587     }
 588 
 589     /**
 590      * Returns the client host for the current thread's connection.  Throws
 591      * ServerNotActiveException if no connection is active for this thread.
 592      */
 593     public static String getClientHost() throws ServerNotActiveException {
 594         ConnectionHandler h = threadConnectionHandler.get();
 595         if (h != null) {
 596             return h.getClientHost();
 597         } else {
 598             throw new ServerNotActiveException("not in a remote call");
 599         }
 600     }
 601 
 602     /**
 603      * Services messages on accepted connection
 604      */
 605     private class ConnectionHandler implements Runnable {
 606 
 607         /** int value of "POST" in ASCII (Java's specified data formats
 608          *  make this once-reviled tactic again socially acceptable) */
 609         private static final int POST = 0x504f5354;
 610 
 611         /** most recently accept-authorized AccessControlContext */
 612         private AccessControlContext okContext;
 613         /** cache of accept-authorized AccessControlContexts */
 614         private Map<AccessControlContext,
 615                     Reference<AccessControlContext>> authCache;
 616         /** security manager which authorized contexts in authCache */
 617         private SecurityManager cacheSecurityManager = null;
 618 
 619         private Socket socket;
 620         private String remoteHost;
 621 
 622         ConnectionHandler(Socket socket, String remoteHost) {
 623             this.socket = socket;
 624             this.remoteHost = remoteHost;
 625         }
 626 
 627         String getClientHost() {
 628             return remoteHost;
 629         }
 630 
 631         /**
 632          * Verify that the given AccessControlContext has permission to
 633          * accept this connection.
 634          */
 635         void checkAcceptPermission(SecurityManager sm,
 636                                    AccessControlContext acc)
 637         {
 638             /*
 639              * Note: no need to synchronize on cache-related fields, since this
 640              * method only gets called from the ConnectionHandler's thread.
 641              */
 642             if (sm != cacheSecurityManager) {
 643                 okContext = null;
 644                 authCache = new WeakHashMap<AccessControlContext,
 645                                             Reference<AccessControlContext>>();
 646                 cacheSecurityManager = sm;
 647             }
 648             if (acc.equals(okContext) || authCache.containsKey(acc)) {
 649                 return;
 650             }
 651             InetAddress addr = socket.getInetAddress();
 652             String host = (addr != null) ? addr.getHostAddress() : "*";
 653 
 654             sm.checkAccept(host, socket.getPort());
 655 
 656             authCache.put(acc, new SoftReference<AccessControlContext>(acc));
 657             okContext = acc;
 658         }
 659 
 660         public void run() {
 661             Thread t = Thread.currentThread();
 662             String name = t.getName();
 663             try {
 664                 t.setName("RMI TCP Connection(" +
 665                           connectionCount.incrementAndGet() +
 666                           ")-" + remoteHost);
 667                 run0();
 668             } finally {
 669                 t.setName(name);
 670             }
 671         }
 672 
 673         private void run0() {
 674             TCPEndpoint endpoint = getEndpoint();
 675             int port = endpoint.getPort();
 676 
 677             threadConnectionHandler.set(this);
 678 
 679             // set socket to disable Nagle's algorithm (always send
 680             // immediately)
 681             // TBD: should this be left up to socket factory instead?
 682             try {
 683                 socket.setTcpNoDelay(true);
 684             } catch (Exception e) {
 685                 // if we fail to set this, ignore and proceed anyway
 686             }
 687             // set socket to timeout after excessive idle time
 688             try {
 689                 if (connectionReadTimeout > 0)
 690                     socket.setSoTimeout(connectionReadTimeout);
 691             } catch (Exception e) {
 692                 // too bad, continue anyway
 693             }
 694 
 695             try {
 696                 InputStream sockIn = socket.getInputStream();
 697                 InputStream bufIn = sockIn.markSupported()
 698                         ? sockIn
 699                         : new BufferedInputStream(sockIn);
 700 
 701                 // Read magic (or HTTP wrapper)
 702                 bufIn.mark(4);
 703                 DataInputStream in = new DataInputStream(bufIn);
 704                 int magic = in.readInt();
 705 
 706                 if (magic == POST) {
 707                     tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
 708 
 709                     // It's really a HTTP-wrapped request.  Repackage
 710                     // the socket in a HttpReceiveSocket, reinitialize
 711                     // sockIn and in, and reread magic.
 712                     bufIn.reset();      // unread "POST"
 713 
 714                     try {
 715                         socket = new HttpReceiveSocket(socket, bufIn, null);
 716                         remoteHost = "0.0.0.0";
 717                         sockIn = socket.getInputStream();
 718                         bufIn = new BufferedInputStream(sockIn);
 719                         in = new DataInputStream(bufIn);
 720                         magic = in.readInt();
 721 
 722                     } catch (IOException e) {
 723                         throw new RemoteException("Error HTTP-unwrapping call",
 724                                                   e);
 725                     }
 726                 }
 727                 // bufIn's mark will invalidate itself when it overflows
 728                 // so it doesn't have to be turned off
 729 
 730                 // read and verify transport header
 731                 short version = in.readShort();
 732                 if (magic != TransportConstants.Magic ||
 733                     version != TransportConstants.Version) {
 734                     // protocol mismatch detected...
 735                     // just close socket: this would recurse if we marshal an
 736                     // exception to the client and the protocol at other end
 737                     // doesn't match.
 738                     closeSocket(socket);
 739                     return;
 740                 }
 741 
 742                 OutputStream sockOut = socket.getOutputStream();
 743                 BufferedOutputStream bufOut =
 744                     new BufferedOutputStream(sockOut);
 745                 DataOutputStream out = new DataOutputStream(bufOut);
 746 
 747                 int remotePort = socket.getPort();
 748 
 749                 if (tcpLog.isLoggable(Log.BRIEF)) {
 750                     tcpLog.log(Log.BRIEF, "accepted socket from [" +
 751                                      remoteHost + ":" + remotePort + "]");
 752                 }
 753 
 754                 TCPEndpoint ep;
 755                 TCPChannel ch;
 756                 TCPConnection conn;
 757 
 758                 // send ack (or nack) for protocol
 759                 byte protocol = in.readByte();
 760                 switch (protocol) {
 761                 case TransportConstants.SingleOpProtocol:
 762                     // no ack for protocol
 763 
 764                     // create dummy channel for receiving messages
 765                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 766                                          endpoint.getClientSocketFactory(),
 767                                          endpoint.getServerSocketFactory());
 768                     ch = new TCPChannel(TCPTransport.this, ep);
 769                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 770 
 771                     // read input messages
 772                     handleMessages(conn, false);
 773                     break;
 774 
 775                 case TransportConstants.StreamProtocol:
 776                     // send ack
 777                     out.writeByte(TransportConstants.ProtocolAck);
 778 
 779                     // suggest endpoint (in case client doesn't know host name)
 780                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 781                         tcpLog.log(Log.VERBOSE, "(port " + port +
 782                             ") " + "suggesting " + remoteHost + ":" +
 783                             remotePort);
 784                     }
 785 
 786                     out.writeUTF(remoteHost);
 787                     out.writeInt(remotePort);
 788                     out.flush();
 789 
 790                     // read and discard (possibly bogus) endpoint
 791                     // REMIND: would be faster to read 2 bytes then skip N+4
 792                     String clientHost = in.readUTF();
 793                     int    clientPort = in.readInt();
 794                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 795                         tcpLog.log(Log.VERBOSE, "(port " + port +
 796                             ") client using " + clientHost + ":" + clientPort);
 797                     }
 798 
 799                     // create dummy channel for receiving messages
 800                     // (why not use clientHost and clientPort?)
 801                     ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
 802                                          endpoint.getClientSocketFactory(),
 803                                          endpoint.getServerSocketFactory());
 804                     ch = new TCPChannel(TCPTransport.this, ep);
 805                     conn = new TCPConnection(ch, socket, bufIn, bufOut);
 806 
 807                     // read input messages
 808                     handleMessages(conn, true);
 809                     break;
 810 
 811                 case TransportConstants.MultiplexProtocol:
 812                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 813                         tcpLog.log(Log.VERBOSE, "(port " + port +
 814                             ") accepting multiplex protocol");
 815                     }
 816 
 817                     // send ack
 818                     out.writeByte(TransportConstants.ProtocolAck);
 819 
 820                     // suggest endpoint (in case client doesn't already have one)
 821                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 822                         tcpLog.log(Log.VERBOSE, "(port " + port +
 823                             ") suggesting " + remoteHost + ":" + remotePort);
 824                     }
 825 
 826                     out.writeUTF(remoteHost);
 827                     out.writeInt(remotePort);
 828                     out.flush();
 829 
 830                     // read endpoint client has decided to use
 831                     ep = new TCPEndpoint(in.readUTF(), in.readInt(),
 832                                          endpoint.getClientSocketFactory(),
 833                                          endpoint.getServerSocketFactory());
 834                     if (tcpLog.isLoggable(Log.VERBOSE)) {
 835                         tcpLog.log(Log.VERBOSE, "(port " +
 836                             port + ") client using " +
 837                             ep.getHost() + ":" + ep.getPort());
 838                     }
 839 
 840                     ConnectionMultiplexer multiplexer;
 841                     synchronized (channelTable) {
 842                         // create or find channel for this endpoint
 843                         ch = getChannel(ep);
 844                         multiplexer =
 845                             new ConnectionMultiplexer(ch, bufIn, sockOut,
 846                                                       false);
 847                         ch.useMultiplexer(multiplexer);
 848                     }
 849                     multiplexer.run();
 850                     break;
 851 
 852                 default:
 853                     // protocol not understood, send nack and close socket
 854                     out.writeByte(TransportConstants.ProtocolNack);
 855                     out.flush();
 856                     break;
 857                 }
 858 
 859             } catch (IOException e) {
 860                 // socket in unknown state: destroy socket
 861                 tcpLog.log(Log.BRIEF, "terminated with exception:", e);
 862             } finally {
 863                 closeSocket(socket);
 864             }
 865         }
 866     }
 867 }