1 /*
   2  * Copyright (c) 2001, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.Socket;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.security.AccessController;
  36 import java.security.PrivilegedAction;
  37 import java.util.Collections;
  38 import java.util.Hashtable;
  39 import java.util.HashMap;
  40 import java.util.Map;
  41 
  42 import org.omg.CORBA.COMM_FAILURE;
  43 import org.omg.CORBA.CompletionStatus;
  44 import org.omg.CORBA.DATA_CONVERSION;
  45 import org.omg.CORBA.INTERNAL;
  46 import org.omg.CORBA.MARSHAL;
  47 import org.omg.CORBA.OBJECT_NOT_EXIST;
  48 import org.omg.CORBA.SystemException;
  49 
  50 import com.sun.org.omg.SendingContext.CodeBase;
  51 
  52 import com.sun.corba.se.pept.broker.Broker;
  53 import com.sun.corba.se.pept.encoding.InputObject;
  54 import com.sun.corba.se.pept.encoding.OutputObject;
  55 import com.sun.corba.se.pept.protocol.MessageMediator;
  56 import com.sun.corba.se.pept.transport.Acceptor;
  57 import com.sun.corba.se.pept.transport.Connection;
  58 import com.sun.corba.se.pept.transport.ConnectionCache;
  59 import com.sun.corba.se.pept.transport.ContactInfo;
  60 import com.sun.corba.se.pept.transport.EventHandler;
  61 import com.sun.corba.se.pept.transport.InboundConnectionCache;
  62 import com.sun.corba.se.pept.transport.OutboundConnectionCache;
  63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
  64 import com.sun.corba.se.pept.transport.Selector;
  65 
  66 import com.sun.corba.se.spi.ior.IOR;
  67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
  68 import com.sun.corba.se.spi.logging.CORBALogDomains;
  69 import com.sun.corba.se.spi.orb.ORB ;
  70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  72 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  73 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  74 import com.sun.corba.se.spi.transport.CorbaContactInfo;
  75 import com.sun.corba.se.spi.transport.CorbaConnection;
  76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  77 import com.sun.corba.se.spi.transport.ReadTimeouts;
  78 
  79 import com.sun.corba.se.impl.encoding.CachedCodeBase;
  80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
  81 import com.sun.corba.se.impl.encoding.CDROutputObject;
  82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
  83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
  84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
  85 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  86 import com.sun.corba.se.impl.orbutil.ORBConstants;
  87 import com.sun.corba.se.impl.orbutil.ORBUtility;
  88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
  90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
  91 
  92 /**
  93  * @author Harold Carr
  94  */
  95 public class SocketOrChannelConnectionImpl
  96     extends
  97         EventHandlerBase
  98     implements
  99         CorbaConnection,
 100         Work
 101 {
 102     public static boolean dprintWriteLocks = false;
 103 
 104     //
 105     // New transport.
 106     //
 107 
 108     protected long enqueueTime;
 109 
 110     protected SocketChannel socketChannel;
 111     public SocketChannel getSocketChannel()
 112     {
 113         return socketChannel;
 114     }
 115 
 116     // REVISIT:
 117     // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
 118     protected CorbaContactInfo contactInfo;
 119     protected Acceptor acceptor;
 120     protected ConnectionCache connectionCache;
 121 
 122     //
 123     // From iiop.Connection.java
 124     //
 125 
 126     protected Socket socket;    // The socket used for this connection.
 127     protected long timeStamp = 0;
 128     protected boolean isServer = false;
 129 
 130     // Start at some value other than zero since this is a magic
 131     // value in some protocols.
 132     protected int requestId = 5;
 133     protected CorbaResponseWaitingRoom responseWaitingRoom;
 134     protected int state;
 135     protected java.lang.Object stateEvent = new java.lang.Object();
 136     protected java.lang.Object writeEvent = new java.lang.Object();
 137     protected boolean writeLocked;
 138     protected int serverRequestCount = 0;
 139 
 140     // Server request map: used on the server side of Connection
 141     // Maps request ID to IIOPInputStream.
 142     Map serverRequestMap = null;
 143 
 144     // This is a flag associated per connection telling us if the
 145     // initial set of sending contexts were sent to the receiver
 146     // already...
 147     protected boolean postInitialContexts = false;
 148 
 149     // Remote reference to CodeBase server (supplies
 150     // FullValueDescription, among other things)
 151     protected IOR codeBaseServerIOR;
 152 
 153     // CodeBase cache for this connection.  This will cache remote operations,
 154     // handle connecting, and ensure we don't do any remote operations until
 155     // necessary.
 156     protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
 157 
 158     protected ORBUtilSystemException wrapper ;
 159 
 160     // transport read timeout values
 161     protected ReadTimeouts readTimeouts;
 162 
 163     protected boolean shouldReadGiopHeaderOnly;
 164 
 165     // A message mediator used when shouldReadGiopHeaderOnly is
 166     // true to maintain request message state across execution in a
 167     // SelectorThread and WorkerThread.
 168     protected CorbaMessageMediator partialMessageMediator = null;
 169 
 170     // Used in genericRPCMSGFramework test.
 171     protected SocketOrChannelConnectionImpl(ORB orb)
 172     {
 173         this.orb = orb;
 174         wrapper = ORBUtilSystemException.get( orb,
 175             CORBALogDomains.RPC_TRANSPORT ) ;
 176 
 177         setWork(this);
 178         responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
 179         setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
 180     }
 181 
 182     // Both client and servers.
 183     protected SocketOrChannelConnectionImpl(ORB orb,
 184                                             boolean useSelectThreadToWait,
 185                                             boolean useWorkerThread)
 186     {
 187         this(orb) ;
 188         setUseSelectThreadToWait(useSelectThreadToWait);
 189         setUseWorkerThreadForEvent(useWorkerThread);
 190     }
 191 
 192     // Client constructor.
 193     public SocketOrChannelConnectionImpl(ORB orb,
 194                                          CorbaContactInfo contactInfo,
 195                                          boolean useSelectThreadToWait,
 196                                          boolean useWorkerThread,
 197                                          String socketType,
 198                                          String hostname,
 199                                          int port)
 200     {
 201         this(orb, useSelectThreadToWait, useWorkerThread);
 202 
 203         this.contactInfo = contactInfo;
 204 
 205         try {
 206             socket = orb.getORBData().getSocketFactory()
 207                 .createSocket(socketType,
 208                               new InetSocketAddress(hostname, port));
 209             socketChannel = socket.getChannel();
 210 
 211             if (socketChannel != null) {
 212                 boolean isBlocking = !useSelectThreadToWait;
 213                 socketChannel.configureBlocking(isBlocking);
 214             } else {
 215                 // IMPORTANT: non-channel-backed sockets must use
 216                 // dedicated reader threads.
 217                 setUseSelectThreadToWait(false);
 218             }
 219             if (orb.transportDebugFlag) {
 220                 dprint(".initialize: connection created: " + socket);
 221             }
 222         } catch (Throwable t) {
 223             throw wrapper.connectFailure(t, socketType, hostname,
 224                                          Integer.toString(port));
 225         }
 226         state = OPENING;
 227     }
 228 
 229     // Client-side convenience.
 230     public SocketOrChannelConnectionImpl(ORB orb,
 231                                          CorbaContactInfo contactInfo,
 232                                          String socketType,
 233                                          String hostname,
 234                                          int port)
 235     {
 236         this(orb, contactInfo,
 237              orb.getORBData().connectionSocketUseSelectThreadToWait(),
 238              orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
 239              socketType, hostname, port);
 240     }
 241 
 242     // Server-side constructor.
 243     public SocketOrChannelConnectionImpl(ORB orb,
 244                                          Acceptor acceptor,
 245                                          Socket socket,
 246                                          boolean useSelectThreadToWait,
 247                                          boolean useWorkerThread)
 248     {
 249         this(orb, useSelectThreadToWait, useWorkerThread);
 250 
 251         this.socket = socket;
 252         socketChannel = socket.getChannel();
 253         if (socketChannel != null) {
 254             // REVISIT
 255             try {
 256                 boolean isBlocking = !useSelectThreadToWait;
 257                 socketChannel.configureBlocking(isBlocking);
 258             } catch (IOException e) {
 259                 RuntimeException rte = new RuntimeException();
 260                 rte.initCause(e);
 261                 throw rte;
 262             }
 263         }
 264         this.acceptor = acceptor;
 265 
 266         serverRequestMap = Collections.synchronizedMap(new HashMap());
 267         isServer = true;
 268 
 269         state = ESTABLISHED;
 270     }
 271 
 272     // Server-side convenience
 273     public SocketOrChannelConnectionImpl(ORB orb,
 274                                          Acceptor acceptor,
 275                                          Socket socket)
 276     {
 277         this(orb, acceptor, socket,
 278              (socket.getChannel() == null
 279               ? false
 280               : orb.getORBData().connectionSocketUseSelectThreadToWait()),
 281              (socket.getChannel() == null
 282               ? false
 283               : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
 284     }
 285 
 286     ////////////////////////////////////////////////////
 287     //
 288     // framework.transport.Connection
 289     //
 290 
 291     public boolean shouldRegisterReadEvent()
 292     {
 293         return true;
 294     }
 295 
 296     public boolean shouldRegisterServerReadEvent()
 297     {
 298         return true;
 299     }
 300 
 301     public boolean read()
 302     {
 303         try {
 304             if (orb.transportDebugFlag) {
 305                 dprint(".read->: " + this);
 306             }
 307             CorbaMessageMediator messageMediator = readBits();
 308             if (messageMediator != null) {
 309                 // Null can happen when client closes stream
 310                 // causing purgecalls.
 311                 return dispatch(messageMediator);
 312             }
 313             return true;
 314         } finally {
 315             if (orb.transportDebugFlag) {
 316                 dprint(".read<-: " + this);
 317             }
 318         }
 319     }
 320 
 321     protected CorbaMessageMediator readBits()
 322     {
 323         try {
 324 
 325             if (orb.transportDebugFlag) {
 326                 dprint(".readBits->: " + this);
 327             }
 328 
 329             MessageMediator messageMediator;
 330             // REVISIT - use common factory base class.
 331             if (contactInfo != null) {
 332                 messageMediator =
 333                     contactInfo.createMessageMediator(orb, this);
 334             } else if (acceptor != null) {
 335                 messageMediator = acceptor.createMessageMediator(orb, this);
 336             } else {
 337                 throw
 338                     new RuntimeException("SocketOrChannelConnectionImpl.readBits");
 339             }
 340             return (CorbaMessageMediator) messageMediator;
 341 
 342         } catch (ThreadDeath td) {
 343             if (orb.transportDebugFlag) {
 344                 dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
 345             }
 346             try {
 347                 purgeCalls(wrapper.connectionAbort(td), false, false);
 348             } catch (Throwable t) {
 349                 if (orb.transportDebugFlag) {
 350                     dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
 351                 }
 352             }
 353             throw td;
 354         } catch (Throwable ex) {
 355             if (orb.transportDebugFlag) {
 356                 dprint(".readBits: " + this + ": Throwable: " + ex, ex);
 357             }
 358 
 359             try {
 360                 if (ex instanceof INTERNAL) {
 361                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 362                 }
 363             } catch (IOException e) {
 364                 if (orb.transportDebugFlag) {
 365                     dprint(".readBits: " + this +
 366                            ": sendMessageError: IOException: " + e, e);
 367                 }
 368             }
 369             // REVISIT - make sure reader thread is killed.
 370             orb.getTransportManager().getSelector(0).unregisterForEvent(this);
 371             // Notify anyone waiting.
 372             purgeCalls(wrapper.connectionAbort(ex), true, false);
 373             // REVISIT
 374             //keepRunning = false;
 375             // REVISIT - if this is called after purgeCalls then
 376             // the state of the socket is ABORT so the writeLock
 377             // in close throws an exception.  It is ignored but
 378             // causes IBM (screen scraping) tests to fail.
 379             //close();
 380         } finally {
 381             if (orb.transportDebugFlag) {
 382                 dprint(".readBits<-: " + this);
 383             }
 384         }
 385         return null;
 386     }
 387 
 388     protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
 389     {
 390         try {
 391 
 392             if (orb.transportDebugFlag) {
 393                 dprint(".finishReadingBits->: " + this);
 394             }
 395 
 396             // REVISIT - use common factory base class.
 397             if (contactInfo != null) {
 398                 messageMediator =
 399                     contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
 400             } else if (acceptor != null) {
 401                 messageMediator =
 402                     acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
 403             } else {
 404                 throw
 405                     new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
 406             }
 407             return (CorbaMessageMediator) messageMediator;
 408 
 409         } catch (ThreadDeath td) {
 410             if (orb.transportDebugFlag) {
 411                 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
 412             }
 413             try {
 414                 purgeCalls(wrapper.connectionAbort(td), false, false);
 415             } catch (Throwable t) {
 416                 if (orb.transportDebugFlag) {
 417                     dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
 418                 }
 419             }
 420             throw td;
 421         } catch (Throwable ex) {
 422             if (orb.transportDebugFlag) {
 423                 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
 424             }
 425 
 426             try {
 427                 if (ex instanceof INTERNAL) {
 428                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 429                 }
 430             } catch (IOException e) {
 431                 if (orb.transportDebugFlag) {
 432                     dprint(".finishReadingBits: " + this +
 433                            ": sendMessageError: IOException: " + e, e);
 434                 }
 435             }
 436             // REVISIT - make sure reader thread is killed.
 437             orb.getTransportManager().getSelector(0).unregisterForEvent(this);
 438             // Notify anyone waiting.
 439             purgeCalls(wrapper.connectionAbort(ex), true, false);
 440             // REVISIT
 441             //keepRunning = false;
 442             // REVISIT - if this is called after purgeCalls then
 443             // the state of the socket is ABORT so the writeLock
 444             // in close throws an exception.  It is ignored but
 445             // causes IBM (screen scraping) tests to fail.
 446             //close();
 447         } finally {
 448             if (orb.transportDebugFlag) {
 449                 dprint(".finishReadingBits<-: " + this);
 450             }
 451         }
 452         return null;
 453     }
 454 
 455     protected boolean dispatch(CorbaMessageMediator messageMediator)
 456     {
 457         try {
 458             if (orb.transportDebugFlag) {
 459                 dprint(".dispatch->: " + this);
 460             }
 461 
 462             //
 463             // NOTE:
 464             //
 465             // This call is the transition from the tranport block
 466             // to the protocol block.
 467             //
 468 
 469             boolean result =
 470                 messageMediator.getProtocolHandler()
 471                 .handleRequest(messageMediator);
 472 
 473             return result;
 474 
 475         } catch (ThreadDeath td) {
 476             if (orb.transportDebugFlag) {
 477                 dprint(".dispatch: ThreadDeath", td );
 478             }
 479             try {
 480                 purgeCalls(wrapper.connectionAbort(td), false, false);
 481             } catch (Throwable t) {
 482                 if (orb.transportDebugFlag) {
 483                     dprint(".dispatch: purgeCalls: Throwable", t);
 484                 }
 485             }
 486             throw td;
 487         } catch (Throwable ex) {
 488             if (orb.transportDebugFlag) {
 489                 dprint(".dispatch: Throwable", ex ) ;
 490             }
 491 
 492             try {
 493                 if (ex instanceof INTERNAL) {
 494                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 495                 }
 496             } catch (IOException e) {
 497                 if (orb.transportDebugFlag) {
 498                     dprint(".dispatch: sendMessageError: IOException", e);
 499                 }
 500             }
 501             purgeCalls(wrapper.connectionAbort(ex), false, false);
 502             // REVISIT
 503             //keepRunning = false;
 504         } finally {
 505             if (orb.transportDebugFlag) {
 506                 dprint(".dispatch<-: " + this);
 507             }
 508         }
 509 
 510         return true;
 511     }
 512 
 513     public boolean shouldUseDirectByteBuffers()
 514     {
 515         return getSocketChannel() != null;
 516     }
 517 
 518     public ByteBuffer read(int size, int offset, int length, long max_wait_time)
 519         throws IOException
 520     {
 521         if (shouldUseDirectByteBuffers()) {
 522 
 523             ByteBuffer byteBuffer =
 524                 orb.getByteBufferPool().getByteBuffer(size);
 525 
 526             if (orb.transportDebugFlag) {
 527                 // print address of ByteBuffer gotten from pool
 528                 int bbAddress = System.identityHashCode(byteBuffer);
 529                 StringBuffer sb = new StringBuffer(80);
 530                 sb.append(".read: got ByteBuffer id (");
 531                 sb.append(bbAddress).append(") from ByteBufferPool.");
 532                 String msgStr = sb.toString();
 533                 dprint(msgStr);
 534             }
 535 
 536             byteBuffer.position(offset);
 537             byteBuffer.limit(size);
 538 
 539             readFully(byteBuffer, length, max_wait_time);
 540 
 541             return byteBuffer;
 542         }
 543 
 544         byte[] buf = new byte[size];
 545         readFully(getSocket().getInputStream(), buf,
 546                   offset, length, max_wait_time);
 547         ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
 548         byteBuffer.limit(size);
 549         return byteBuffer;
 550     }
 551 
 552     public ByteBuffer read(ByteBuffer byteBuffer, int offset,
 553                            int length, long max_wait_time)
 554         throws IOException
 555     {
 556         int size = offset + length;
 557         if (shouldUseDirectByteBuffers()) {
 558 
 559             if (! byteBuffer.isDirect()) {
 560                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
 561             }
 562             if (size > byteBuffer.capacity()) {
 563                 if (orb.transportDebugFlag) {
 564                     // print address of ByteBuffer being released
 565                     int bbAddress = System.identityHashCode(byteBuffer);
 566                     StringBuffer bbsb = new StringBuffer(80);
 567                     bbsb.append(".read: releasing ByteBuffer id (")
 568                         .append(bbAddress).append(") to ByteBufferPool.");
 569                     String bbmsg = bbsb.toString();
 570                     dprint(bbmsg);
 571                 }
 572                 orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
 573                 byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
 574             }
 575             byteBuffer.position(offset);
 576             byteBuffer.limit(size);
 577             readFully(byteBuffer, length, max_wait_time);
 578             byteBuffer.position(0);
 579             byteBuffer.limit(size);
 580             return byteBuffer;
 581         }
 582         if (byteBuffer.isDirect()) {
 583             throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
 584         }
 585         byte[] buf = new byte[size];
 586         readFully(getSocket().getInputStream(), buf,
 587                   offset, length, max_wait_time);
 588         return ByteBuffer.wrap(buf);
 589     }
 590 
 591     public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
 592         throws IOException
 593     {
 594         int n = 0;
 595         int bytecount = 0;
 596         long time_to_wait = readTimeouts.get_initial_time_to_wait();
 597         long total_time_in_wait = 0;
 598 
 599         // The reading of data incorporates a strategy to detect a
 600         // rogue client. The strategy is implemented as follows. As
 601         // long as data is being read, at least 1 byte or more, we
 602         // assume we have a well behaved client. If no data is read,
 603         // then we sleep for a time to wait, re-calculate a new time to
 604         // wait which is lengthier than the previous time spent waiting.
 605         // Then, if the total time spent waiting does not exceed a
 606         // maximum time we are willing to wait, we attempt another
 607         // read. If the maximum amount of time we are willing to
 608         // spend waiting for more data is exceeded, we throw an
 609         // IOException.
 610 
 611         // NOTE: Reading of GIOP headers are treated with a smaller
 612         //       maximum time to wait threshold. Based on extensive
 613         //       performance testing, all GIOP headers are being
 614         //       read in 1 read access.
 615 
 616         do {
 617             bytecount = getSocketChannel().read(byteBuffer);
 618 
 619             if (bytecount < 0) {
 620                 throw new IOException("End-of-stream");
 621             }
 622             else if (bytecount == 0) {
 623                 try {
 624                     Thread.sleep(time_to_wait);
 625                     total_time_in_wait += time_to_wait;
 626                     time_to_wait =
 627                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
 628                 }
 629                 catch (InterruptedException ie) {
 630                     // ignore exception
 631                     if (orb.transportDebugFlag) {
 632                         dprint("readFully(): unexpected exception "
 633                                 + ie.toString());
 634                     }
 635                 }
 636             }
 637             else {
 638                 n += bytecount;
 639             }
 640         }
 641         while (n < size && total_time_in_wait < max_wait_time);
 642 
 643         if (n < size && total_time_in_wait >= max_wait_time)
 644         {
 645             // failed to read entire message
 646             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
 647                                       new Integer(n), new Long(max_wait_time),
 648                                       new Long(total_time_in_wait));
 649         }
 650 
 651         getConnectionCache().stampTime(this);
 652     }
 653 
 654     // To support non-channel connections.
 655     public void readFully(java.io.InputStream is, byte[] buf,
 656                           int offset, int size, long max_wait_time)
 657         throws IOException
 658     {
 659         int n = 0;
 660         int bytecount = 0;
 661         long time_to_wait = readTimeouts.get_initial_time_to_wait();
 662         long total_time_in_wait = 0;
 663 
 664         // The reading of data incorporates a strategy to detect a
 665         // rogue client. The strategy is implemented as follows. As
 666         // long as data is being read, at least 1 byte or more, we
 667         // assume we have a well behaved client. If no data is read,
 668         // then we sleep for a time to wait, re-calculate a new time to
 669         // wait which is lengthier than the previous time spent waiting.
 670         // Then, if the total time spent waiting does not exceed a
 671         // maximum time we are willing to wait, we attempt another
 672         // read. If the maximum amount of time we are willing to
 673         // spend waiting for more data is exceeded, we throw an
 674         // IOException.
 675 
 676         // NOTE: Reading of GIOP headers are treated with a smaller
 677         //       maximum time to wait threshold. Based on extensive
 678         //       performance testing, all GIOP headers are being
 679         //       read in 1 read access.
 680 
 681         do {
 682             bytecount = is.read(buf, offset + n, size - n);
 683             if (bytecount < 0) {
 684                 throw new IOException("End-of-stream");
 685             }
 686             else if (bytecount == 0) {
 687                 try {
 688                     Thread.sleep(time_to_wait);
 689                     total_time_in_wait += time_to_wait;
 690                     time_to_wait =
 691                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
 692                 }
 693                 catch (InterruptedException ie) {
 694                     // ignore exception
 695                     if (orb.transportDebugFlag) {
 696                         dprint("readFully(): unexpected exception "
 697                                 + ie.toString());
 698                     }
 699                 }
 700             }
 701             else {
 702                 n += bytecount;
 703             }
 704         }
 705         while (n < size && total_time_in_wait < max_wait_time);
 706 
 707         if (n < size && total_time_in_wait >= max_wait_time)
 708         {
 709             // failed to read entire message
 710             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
 711                                       new Integer(n), new Long(max_wait_time),
 712                                       new Long(total_time_in_wait));
 713         }
 714 
 715         getConnectionCache().stampTime(this);
 716     }
 717 
 718     public void write(ByteBuffer byteBuffer)
 719         throws IOException
 720     {
 721         if (shouldUseDirectByteBuffers()) {
 722             /* NOTE: cannot perform this test.  If one ask for a
 723                ByteBuffer from the pool which is bigger than the size
 724                of ByteBuffers managed by the pool, then the pool will
 725                return a HeapByteBuffer.
 726             if (byteBuffer.hasArray()) {
 727                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
 728             }
 729             */
 730             // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
 731             //            all bytes are written on first write attempt.
 732             do {
 733                 getSocketChannel().write(byteBuffer);
 734             }
 735             while (byteBuffer.hasRemaining());
 736 
 737         } else {
 738             if (! byteBuffer.hasArray()) {
 739                 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
 740             }
 741             byte[] tmpBuf = byteBuffer.array();
 742             getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
 743             getSocket().getOutputStream().flush();
 744         }
 745 
 746         // TimeStamp connection to indicate it has been used
 747         // Note granularity of connection usage is assumed for
 748         // now to be that of a IIOP packet.
 749         getConnectionCache().stampTime(this);
 750     }
 751 
    /**
     * Note: it is possible for this to be called more than once.
     */
 755     public synchronized void close()
 756     {
 757         try {
 758             if (orb.transportDebugFlag) {
 759                 dprint(".close->: " + this);
 760             }
 761             writeLock();
 762 
 763             // REVISIT It will be good to have a read lock on the reader thread
 764             // before we proceed further, to avoid the reader thread (server side)
 765             // from processing requests. This avoids the risk that a new request
 766             // will be accepted by ReaderThread while the ListenerThread is
 767             // attempting to close this connection.
 768 
 769             if (isBusy()) { // we are busy!
 770                 writeUnlock();
 771                 if (orb.transportDebugFlag) {
 772                     dprint(".close: isBusy so no close: " + this);
 773                 }
 774                 return;
 775             }
 776 
 777             try {
 778                 try {
 779                     sendCloseConnection(GIOPVersion.V1_0);
 780                 } catch (Throwable t) {
 781                     wrapper.exceptionWhenSendingCloseConnection(t);
 782                 }
 783 
 784                 synchronized ( stateEvent ){
 785                     state = CLOSE_SENT;
 786                     stateEvent.notifyAll();
 787                 }
 788 
 789                 // stop the reader without causing it to do purgeCalls
 790                 //Exception ex = new Exception();
 791                 //reader.stop(ex); // REVISIT
 792 
 793                 // NOTE: !!!!!!
 794                 // This does writeUnlock().
 795                 purgeCalls(wrapper.connectionRebind(), false, true);
 796 
 797             } catch (Exception ex) {
 798                 if (orb.transportDebugFlag) {
 799                     dprint(".close: exception: " + this, ex);
 800                 }
 801             }
 802             try {
 803                 Selector selector = orb.getTransportManager().getSelector(0);
 804                 selector.unregisterForEvent(this);
 805                 if (socketChannel != null) {
 806                     socketChannel.close();
 807                 }
 808                 socket.close();
 809             } catch (IOException e) {
 810                 if (orb.transportDebugFlag) {
 811                     dprint(".close: " + this, e);
 812                 }
 813             }
 814             closeConnectionResources();
 815         } finally {
 816             if (orb.transportDebugFlag) {
 817                 dprint(".close<-: " + this);
 818             }
 819         }
 820     }
 821 
 822     public void closeConnectionResources() {
 823            if (orb.transportDebugFlag) {
 824                dprint(".closeConnectionResources->: " + this);
 825            }
 826            Selector selector = orb.getTransportManager().getSelector(0);
 827            selector.unregisterForEvent(this);
 828            try {
 829              if (socketChannel != null)
 830               socketChannel.close() ;
 831                 if (socket != null && !socket.isClosed())
 832                 socket.close() ;
 833            } catch (IOException e) {
 834              if (orb.transportDebugFlag) {
 835                  dprint( ".closeConnectionResources: " + this, e ) ;
 836              }
 837            }
 838            if (orb.transportDebugFlag) {
 839                dprint(".closeConnectionResources<-: " + this);
 840            }
 841      }
 842 
 843 
 844     public Acceptor getAcceptor()
 845     {
 846         return acceptor;
 847     }
 848 
 849     public ContactInfo getContactInfo()
 850     {
 851         return contactInfo;
 852     }
 853 
 854     public EventHandler getEventHandler()
 855     {
 856         return this;
 857     }
 858 
 859     public OutputObject createOutputObject(MessageMediator messageMediator)
 860     {
 861         // REVISIT - remove this method from Connection and all it subclasses.
 862         throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
 863     }
 864 
 865     // This is used by the GIOPOutputObject in order to
 866     // throw the correct error when handling code sets.
 867     // Can we determine if we are on the server side by
 868     // other means?  XREVISIT
 869     public boolean isServer()
 870     {
 871         return isServer;
 872     }
 873 
 874     public boolean isBusy()
 875     {
 876         if (serverRequestCount > 0 ||
 877             getResponseWaitingRoom().numberRegistered() > 0)
 878         {
 879             return true;
 880         } else {
 881             return false;
 882         }
 883     }
 884 
 885     public long getTimeStamp()
 886     {
 887         return timeStamp;
 888     }
 889 
 890     public void setTimeStamp(long time)
 891     {
 892         timeStamp = time;
 893     }
 894 
 895     public void setState(String stateString)
 896     {
 897         synchronized (stateEvent) {
 898             if (stateString.equals("ESTABLISHED")) {
 899                 state =  ESTABLISHED;
 900                 stateEvent.notifyAll();
 901             } else {
 902                 // REVISIT: ASSERT
 903             }
 904         }
 905     }
 906 
 907     /**
 908      * Sets the writeLock for this connection.
 909      * If the writeLock is already set by someone else, block till the
 910      * writeLock is released and can set by us.
 911      * IMPORTANT: this connection's lock must be acquired before
 912      * setting the writeLock and must be unlocked after setting the writeLock.
 913      */
 914     public void writeLock()
 915     {
 916       try {
 917         if (dprintWriteLocks && orb.transportDebugFlag) {
 918             dprint(".writeLock->: " + this);
 919         }
 920         // Keep looping till we can set the writeLock.
 921         while ( true ) {
 922             int localState = state;
 923             switch ( localState ) {
 924 
 925             case OPENING:
 926                 synchronized (stateEvent) {
 927                     if (state != OPENING) {
 928                         // somebody has changed 'state' so be careful
 929                         break;
 930                     }
 931                     try {
 932                         stateEvent.wait();
 933                     } catch (InterruptedException ie) {
 934                         if (orb.transportDebugFlag) {
 935                             dprint(".writeLock: OPENING InterruptedException: " + this);
 936                         }
 937                     }
 938                 }
 939                 // Loop back
 940                 break;
 941 
 942             case ESTABLISHED:
 943                 synchronized (writeEvent) {
 944                     if (!writeLocked) {
 945                         writeLocked = true;
 946                         return;
 947                     }
 948 
 949                     try {
 950                         // do not stay here too long if state != ESTABLISHED
 951                         // Bug 4752117
 952                         while (state == ESTABLISHED && writeLocked) {
 953                             writeEvent.wait(100);
 954                         }
 955                     } catch (InterruptedException ie) {
 956                         if (orb.transportDebugFlag) {
 957                             dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
 958                         }
 959                     }
 960                 }
 961                 // Loop back
 962                 break;
 963 
 964                 //
 965                 // XXX
 966                 // Need to distinguish between client and server roles
 967                 // here probably.
 968                 //
 969             case ABORT:
 970                 synchronized ( stateEvent ){
 971                     if (state != ABORT) {
 972                         break;
 973                     }
 974                     throw wrapper.writeErrorSend() ;
 975                 }
 976 
 977             case CLOSE_RECVD:
 978                 // the connection has been closed or closing
 979                 // ==> throw rebind exception
 980                 synchronized ( stateEvent ){
 981                     if (state != CLOSE_RECVD) {
 982                         break;
 983                     }
 984                     throw wrapper.connectionCloseRebind() ;
 985                 }
 986 
 987             default:
 988                 if (orb.transportDebugFlag) {
 989                     dprint(".writeLock: default: " + this);
 990                 }
 991                 // REVISIT
 992                 throw new RuntimeException(".writeLock: bad state");
 993             }
 994         }
 995       } finally {
 996         if (dprintWriteLocks && orb.transportDebugFlag) {
 997             dprint(".writeLock<-: " + this);
 998         }
 999       }
1000     }
1001 
1002     public void writeUnlock()
1003     {
1004         try {
1005             if (dprintWriteLocks && orb.transportDebugFlag) {
1006                 dprint(".writeUnlock->: " + this);
1007             }
1008             synchronized (writeEvent) {
1009                 writeLocked = false;
1010                 writeEvent.notify(); // wake up one guy waiting to write
1011             }
1012         } finally {
1013             if (dprintWriteLocks && orb.transportDebugFlag) {
1014                 dprint(".writeUnlock<-: " + this);
1015             }
1016         }
1017     }
1018 
1019     // Assumes the caller handles writeLock and writeUnlock
1020     public void sendWithoutLock(OutputObject outputObject)
1021     {
1022         // Don't we need to check for CloseConnection
1023         // here?  REVISIT
1024 
1025         // XREVISIT - Shouldn't the MessageMediator
1026         // be the one to handle writing the data here?
1027 
1028         try {
1029 
1030             // Write the fragment/message
1031 
1032             CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
1033             cdrOutputObject.writeTo(this);
1034             // REVISIT - no flush?
1035             //socket.getOutputStream().flush();
1036 
1037         } catch (IOException e1) {
1038 
1039             /*
1040              * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1041              * sending a CancelRequest for regular requests / locate requests
1042              */
1043 
1044             // Since IIOPOutputStream's msgheader is set only once, and not
1045             // altered during sending multiple fragments, the original
1046             // msgheader will always have the requestId.
1047             // REVISIT This could be optimized to send a CancelRequest only
1048             // if any fragments had been sent already.
1049 
1050             /* REVISIT: MOVE TO SUBCONTRACT
1051             Message msg = os.getMessage();
1052             if (msg.getType() == Message.GIOPRequest ||
1053                     msg.getType() == Message.GIOPLocateRequest) {
1054                 GIOPVersion requestVersion = msg.getGIOPVersion();
1055                 int requestId = MessageBase.getRequestId(msg);
1056                 try {
1057                     sendCancelRequest(requestVersion, requestId);
1058                 } catch (IOException e2) {
1059                     // most likely an abortive connection closure.
1060                     // ignore, since nothing more can be done.
1061                     if (orb.transportDebugFlag) {
1062 
1063                 }
1064             }
1065             */
1066 
1067             // REVISIT When a send failure happens, purgeCalls() need to be
1068             // called to ensure that the connection is properly removed from
1069             // further usage (ie., cancelling pending requests with COMM_FAILURE
1070             // with an appropriate minor_code CompletionStatus.MAY_BE).
1071 
1072             // Relying on the IIOPOutputStream (as noted below) is not
1073             // sufficient as it handles COMM_FAILURE only for the final
1074             // fragment (during invoke processing). Note that COMM_FAILURE could
1075             // happen while sending the initial fragments.
1076             // Also the IIOPOutputStream does not properly close the connection.
1077             // It simply removes the connection from the table. An orderly
1078             // closure is needed (ie., cancel pending requests on the connection
1079             // COMM_FAILURE as well.
1080 
1081             // IIOPOutputStream will cleanup the connection info when it
1082             // sees this exception.
1083             SystemException exc = wrapper.writeErrorSend(e1);
1084             purgeCalls(exc, false, true);
1085             throw exc;
1086         }
1087     }
1088 
1089     public void registerWaiter(MessageMediator messageMediator)
1090     {
1091         responseWaitingRoom.registerWaiter(messageMediator);
1092     }
1093 
1094     public void unregisterWaiter(MessageMediator messageMediator)
1095     {
1096         responseWaitingRoom.unregisterWaiter(messageMediator);
1097     }
1098 
1099     public InputObject waitForResponse(MessageMediator messageMediator)
1100     {
1101         return responseWaitingRoom.waitForResponse(messageMediator);
1102     }
1103 
1104     public void setConnectionCache(ConnectionCache connectionCache)
1105     {
1106         this.connectionCache = connectionCache;
1107     }
1108 
1109     public ConnectionCache getConnectionCache()
1110     {
1111         return connectionCache;
1112     }
1113 
1114     ////////////////////////////////////////////////////
1115     //
1116     // EventHandler methods
1117     //
1118 
1119     public void setUseSelectThreadToWait(boolean x)
1120     {
1121         useSelectThreadToWait = x;
1122         // REVISIT - Reading of a GIOP header only is information
1123         //           that should be passed into the constructor
1124         //           from the SocketOrChannelConnection factory.
1125         setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1126     }
1127 
1128     public void handleEvent()
1129     {
1130         if (orb.transportDebugFlag) {
1131             dprint(".handleEvent->: " + this);
1132         }
1133         getSelectionKey().interestOps(getSelectionKey().interestOps() &
1134                                       (~ getInterestOps()));
1135 
1136         if (shouldUseWorkerThreadForEvent()) {
1137             Throwable throwable = null;
1138             try {
1139                 int poolToUse = 0;
1140                 if (shouldReadGiopHeaderOnly()) {
1141                     partialMessageMediator = readBits();
1142                     poolToUse =
1143                         partialMessageMediator.getThreadPoolToUse();
1144                 }
1145 
1146                 if (orb.transportDebugFlag) {
1147                     dprint(".handleEvent: addWork to pool: " + poolToUse);
1148                 }
1149                 orb.getThreadPoolManager().getThreadPool(poolToUse)
1150                     .getWorkQueue(0).addWork(getWork());
1151             } catch (NoSuchThreadPoolException e) {
1152                 throwable = e;
1153             } catch (NoSuchWorkQueueException e) {
1154                 throwable = e;
1155             }
1156             // REVISIT: need to close connection.
1157             if (throwable != null) {
1158                 if (orb.transportDebugFlag) {
1159                     dprint(".handleEvent: " + throwable);
1160                 }
1161                 INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
1162                 i.initCause(throwable);
1163                 throw i;
1164             }
1165         } else {
1166             if (orb.transportDebugFlag) {
1167                 dprint(".handleEvent: doWork");
1168             }
1169             getWork().doWork();
1170         }
1171         if (orb.transportDebugFlag) {
1172             dprint(".handleEvent<-: " + this);
1173         }
1174     }
1175 
1176     public SelectableChannel getChannel()
1177     {
1178         return socketChannel;
1179     }
1180 
1181     public int getInterestOps()
1182     {
1183         return SelectionKey.OP_READ;
1184     }
1185 
1186     //    public Acceptor getAcceptor() - already defined above.
1187 
1188     public Connection getConnection()
1189     {
1190         return this;
1191     }
1192 
1193     ////////////////////////////////////////////////////
1194     //
1195     // Work methods.
1196     //
1197 
1198     public String getName()
1199     {
1200         return this.toString();
1201     }
1202 
1203     public void doWork()
1204     {
1205         try {
1206             if (orb.transportDebugFlag) {
1207                 dprint(".doWork->: " + this);
1208             }
1209 
1210             // IMPORTANT: Sanity checks on SelectionKeys such as
1211             //            SelectorKey.isValid() should not be done
1212             //            here.
1213             //
1214 
1215             if (!shouldReadGiopHeaderOnly()) {
1216                 read();
1217             }
1218             else {
1219                 // get the partialMessageMediator
1220                 // created by SelectorThread
1221                 CorbaMessageMediator messageMediator =
1222                                          this.getPartialMessageMediator();
1223 
1224                 // read remaining info needed in a MessageMediator
1225                 messageMediator = finishReadingBits(messageMediator);
1226 
1227                 if (messageMediator != null) {
1228                     // Null can happen when client closes stream
1229                     // causing purgecalls.
1230                     dispatch(messageMediator);
1231                 }
1232             }
1233         } catch (Throwable t) {
1234             if (orb.transportDebugFlag) {
1235                 dprint(".doWork: ignoring Throwable: "
1236                        + t
1237                        + " " + this);
1238             }
1239         } finally {
1240             if (orb.transportDebugFlag) {
1241                 dprint(".doWork<-: " + this);
1242             }
1243         }
1244     }
1245 
1246     public void setEnqueueTime(long timeInMillis)
1247     {
1248         enqueueTime = timeInMillis;
1249     }
1250 
1251     public long getEnqueueTime()
1252     {
1253         return enqueueTime;
1254     }
1255 
1256     ////////////////////////////////////////////////////
1257     //
1258     // spi.transport.CorbaConnection.
1259     //
1260 
1261     // IMPORTANT: Reader Threads must NOT read Giop header only.
1262     public boolean shouldReadGiopHeaderOnly() {
1263         return shouldReadGiopHeaderOnly;
1264     }
1265 
1266     protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1267         shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1268     }
1269 
1270     public ResponseWaitingRoom getResponseWaitingRoom()
1271     {
1272         return responseWaitingRoom;
1273     }
1274 
    // REVISIT - interface defines isServer but already defined in
    // higher interface.
1277 
1278     public void serverRequestMapPut(int requestId,
1279                                     CorbaMessageMediator messageMediator)
1280     {
1281         serverRequestMap.put(new Integer(requestId), messageMediator);
1282     }
1283 
1284     public CorbaMessageMediator serverRequestMapGet(int requestId)
1285     {
1286         return (CorbaMessageMediator)
1287             serverRequestMap.get(new Integer(requestId));
1288     }
1289 
1290     public void serverRequestMapRemove(int requestId)
1291     {
1292         serverRequestMap.remove(new Integer(requestId));
1293     }
1294 
1295 
1296     // REVISIT: this is also defined in:
1297     // com.sun.corba.se.spi.legacy.connection.Connection
1298     public java.net.Socket getSocket()
1299     {
1300         return socket;
1301     }
1302 
1303     /** It is possible for a Close Connection to have been
1304      ** sent here, but we will not check for this. A "lazy"
1305      ** Exception will be thrown in the Worker thread after the
1306      ** incoming request has been processed even though the connection
1307      ** is closed before the request is processed. This is o.k because
1308      ** it is a boundary condition. To prevent it we would have to add
1309      ** more locks which would reduce performance in the normal case.
1310      **/
1311     public synchronized void serverRequestProcessingBegins()
1312     {
1313         serverRequestCount++;
1314     }
1315 
1316     public synchronized void serverRequestProcessingEnds()
1317     {
1318         serverRequestCount--;
1319     }
1320 
1321     //
1322     //
1323     //
1324 
1325     public synchronized int getNextRequestId()
1326     {
1327         return requestId++;
1328     }
1329 
1330     // Negotiated code sets for char and wchar data
1331     protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1332 
1333     public ORB getBroker()
1334     {
1335         return orb;
1336     }
1337 
1338     public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1339         // Needs to be synchronized for the following case when the client
1340         // doesn't send the code set context twice, and we have two threads
1341         // in ServerRequestDispatcher processCodeSetContext.
1342         //
1343         // Thread A checks to see if there is a context, there is none, so
1344         //     it calls setCodeSetContext, getting the synch lock.
1345         // Thread B checks to see if there is a context.  If we didn't synch,
1346         //     it might decide to outlaw wchar/wstring.
1347         if (codeSetContext == null) {
1348             synchronized(this) {
1349                 return codeSetContext;
1350             }
1351         }
1352 
1353         return codeSetContext;
1354     }
1355 
1356     public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1357         // Double check whether or not we need to do this
1358         if (codeSetContext == null) {
1359 
1360             if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1361                 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1362                 // If the client says it's negotiated a code set that
1363                 // isn't a fallback and we never said we support, then
1364                 // it has a bug.
1365                 throw wrapper.badCodesetsFromClient() ;
1366             }
1367 
1368             codeSetContext = csc;
1369         }
1370     }
1371 
1372     //
1373     // from iiop.IIOPConnection.java
1374     //
1375 
1376     // Map request ID to an InputObject.
1377     // This is so the client thread can start unmarshaling
1378     // the reply and remove it from the out_calls map while the
1379     // ReaderThread can still obtain the input stream to give
1380     // new fragments.  Only the ReaderThread touches the clientReplyMap,
1381     // so it doesn't incur synchronization overhead.
1382 
1383     public MessageMediator clientRequestMapGet(int requestId)
1384     {
1385         return responseWaitingRoom.getMessageMediator(requestId);
1386     }
1387 
1388     protected MessageMediator clientReply_1_1;
1389 
1390     public void clientReply_1_1_Put(MessageMediator x)
1391     {
1392         clientReply_1_1 = x;
1393     }
1394 
1395     public MessageMediator clientReply_1_1_Get()
1396     {
1397         return  clientReply_1_1;
1398     }
1399 
1400     public void clientReply_1_1_Remove()
1401     {
1402         clientReply_1_1 = null;
1403     }
1404 
1405     protected MessageMediator serverRequest_1_1;
1406 
1407     public void serverRequest_1_1_Put(MessageMediator x)
1408     {
1409         serverRequest_1_1 = x;
1410     }
1411 
1412     public MessageMediator serverRequest_1_1_Get()
1413     {
1414         return  serverRequest_1_1;
1415     }
1416 
1417     public void serverRequest_1_1_Remove()
1418     {
1419         serverRequest_1_1 = null;
1420     }
1421 
1422     protected String getStateString( int state )
1423     {
1424         synchronized ( stateEvent ){
1425             switch (state) {
1426             case OPENING : return "OPENING" ;
1427             case ESTABLISHED : return "ESTABLISHED" ;
1428             case CLOSE_SENT : return "CLOSE_SENT" ;
1429             case CLOSE_RECVD : return "CLOSE_RECVD" ;
1430             case ABORT : return "ABORT" ;
1431             default : return "???" ;
1432             }
1433         }
1434     }
1435 
1436     public synchronized boolean isPostInitialContexts() {
1437         return postInitialContexts;
1438     }
1439 
1440     // Can never be unset...
1441     public synchronized void setPostInitialContexts(){
1442         postInitialContexts = true;
1443     }
1444 
1445     /**
1446      * Wake up the outstanding requests on the connection, and hand them
1447      * COMM_FAILURE exception with a given minor code.
1448      *
1449      * Also, delete connection from connection table and
1450      * stop the reader thread.
1451 
1452      * Note that this should only ever be called by the Reader thread for
1453      * this connection.
1454      *
1455      * @param minor_code The minor code for the COMM_FAILURE major code.
1456      * @param die Kill the reader thread (this thread) before exiting.
1457      */
1458     public void purgeCalls(SystemException systemException,
1459                            boolean die, boolean lockHeld)
1460     {
1461         int minor_code = systemException.minor;
1462 
1463         try{
1464             if (orb.transportDebugFlag) {
1465                 dprint(".purgeCalls->: "
1466                        + minor_code + "/" + die + "/" + lockHeld
1467                        + " " + this);
1468             }
1469 
1470             // If this invocation is a result of ThreadDeath caused
1471             // by a previous execution of this routine, just exit.
1472 
1473             synchronized ( stateEvent ){
1474                 if ((state == ABORT) || (state == CLOSE_RECVD)) {
1475                     if (orb.transportDebugFlag) {
1476                         dprint(".purgeCalls: exiting since state is: "
1477                                + getStateString(state)
1478                                + " " + this);
1479                     }
1480                     return;
1481                 }
1482             }
1483 
1484             // Grab the writeLock (freeze the calls)
1485             try {
1486                 if (!lockHeld) {
1487                     writeLock();
1488                 }
1489             } catch (SystemException ex) {
1490                 if (orb.transportDebugFlag)
1491                     dprint(".purgeCalls: SystemException" + ex
1492                            + "; continuing " + this);
1493             }
1494 
1495             // Mark the state of the connection
1496             // and determine the request status
1497             org.omg.CORBA.CompletionStatus completion_status;
1498             synchronized ( stateEvent ){
1499                 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1500                     state = CLOSE_RECVD;
1501                     systemException.completed = CompletionStatus.COMPLETED_NO;
1502                 } else {
1503                     state = ABORT;
1504                     systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1505                 }
1506                 stateEvent.notifyAll();
1507             }
1508 
1509             try {
1510                 socket.getInputStream().close();
1511                 socket.getOutputStream().close();
1512                 socket.close();
1513             } catch (Exception ex) {
1514                 if (orb.transportDebugFlag) {
1515                     dprint(".purgeCalls: Exception closing socket: " + ex
1516                            + " " + this);
1517                 }
1518             }
1519 
1520             // Signal all threads with outstanding requests on this
1521             // connection and give them the SystemException;
1522 
1523             responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1524 
1525             if (contactInfo != null) {
1526                 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1527             } else if (acceptor != null) {
1528                 ((InboundConnectionCache)getConnectionCache()).remove(this);
1529             }
1530 
1531             //
1532             // REVISIT: Stop the reader thread
1533             //
1534 
1535             // Signal all the waiters of the writeLock.
1536             // There are 4 types of writeLock waiters:
1537             // 1. Send waiters:
1538             // 2. SendReply waiters:
1539             // 3. cleanUp waiters:
1540             // 4. purge_call waiters:
1541             //
1542 
1543             writeUnlock();
1544 
1545         } finally {
1546             if (orb.transportDebugFlag) {
1547                 dprint(".purgeCalls<-: "
1548                        + minor_code + "/" + die + "/" + lockHeld
1549                        + " " + this);
1550             }
1551         }
1552     }
1553 
1554     /*************************************************************************
1555     * The following methods are for dealing with Connection cleaning for
1556     * better scalability of servers in high network load conditions.
1557     **************************************************************************/
1558 
1559     public void sendCloseConnection(GIOPVersion giopVersion)
1560         throws IOException
1561     {
1562         Message msg = MessageBase.createCloseConnection(giopVersion);
1563         sendHelper(giopVersion, msg);
1564     }
1565 
1566     public void sendMessageError(GIOPVersion giopVersion)
1567         throws IOException
1568     {
1569         Message msg = MessageBase.createMessageError(giopVersion);
1570         sendHelper(giopVersion, msg);
1571     }
1572 
1573     /**
1574      * Send a CancelRequest message. This does not lock the connection, so the
1575      * caller needs to ensure this method is called appropriately.
1576      * @exception IOException - could be due to abortive connection closure.
1577      */
1578     public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1579         throws IOException
1580     {
1581 
1582         Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1583         sendHelper(giopVersion, msg);
1584     }
1585 
1586     protected void sendHelper(GIOPVersion giopVersion, Message msg)
1587         throws IOException
1588     {
1589         // REVISIT: See comments in CDROutputObject constructor.
1590         CDROutputObject outputObject =
1591             new CDROutputObject((ORB)orb, null, giopVersion, this, msg,
1592                                 ORBConstants.STREAM_FORMAT_VERSION_1);
1593         msg.write(outputObject);
1594 
1595         outputObject.writeTo(this);
1596     }
1597 
1598     public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1599                                           int requestId)
1600         throws IOException
1601     {
1602         writeLock();
1603         try {
1604             sendCancelRequest(giopVersion, requestId);
1605         } finally {
1606             writeUnlock();
1607         }
1608     }
1609 
1610     // Begin Code Base methods ---------------------------------------
1611     //
1612     // Set this connection's code base IOR.  The IOR comes from the
1613     // SendingContext.  This is an optional service context, but all
1614     // JavaSoft ORBs send it.
1615     //
1616     // The set and get methods don't need to be synchronized since the
1617     // first possible get would occur during reading a valuetype, and
1618     // that would be after the set.
1619 
1620     // Sets this connection's code base IOR.  This is done after
1621     // getting the IOR out of the SendingContext service context.
1622     // Our ORBs always send this, but it's optional in CORBA.
1623 
1624     public final void setCodeBaseIOR(IOR ior) {
1625         codeBaseServerIOR = ior;
1626     }
1627 
1628     public final IOR getCodeBaseIOR() {
1629         return codeBaseServerIOR;
1630     }
1631 
1632     // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
1633     // won't connect to the remote codebase unless it's necessary.
1634     public final CodeBase getCodeBase() {
1635         return cachedCodeBase;
1636     }
1637 
1638     // End Code Base methods -----------------------------------------
1639 
1640     // set transport read thresholds
1641     protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1642         this.readTimeouts = readTimeouts;
1643     }
1644 
1645     protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1646         partialMessageMediator = messageMediator;
1647     }
1648 
1649     protected CorbaMessageMediator getPartialMessageMediator() {
1650         return partialMessageMediator;
1651     }
1652 
1653     public String toString()
1654     {
1655         synchronized ( stateEvent ){
1656             return
1657                 "SocketOrChannelConnectionImpl[" + " "
1658                 + (socketChannel == null ?
1659                    socket.toString() : socketChannel.toString()) + " "
1660                 + getStateString( state ) + " "
1661                 + shouldUseSelectThreadToWait() + " "
1662                 + shouldUseWorkerThreadForEvent() + " "
1663                 + shouldReadGiopHeaderOnly()
1664                 + "]" ;
1665         }
1666     }
1667 
1668     // Must be public - used in encoding.
1669     public void dprint(String msg)
1670     {
1671         ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1672     }
1673 
1674     protected void dprint(String msg, Throwable t)
1675     {
1676         dprint(msg);
1677         t.printStackTrace(System.out);
1678     }
1679 }
1680 
1681 // End of file.