1 /*
   2  * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.Socket;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.SelectableChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.SocketChannel;
  35 import java.security.AccessController;
  36 import java.security.PrivilegedAction;
  37 import java.util.Collections;
  38 import java.util.Hashtable;
  39 import java.util.HashMap;
  40 import java.util.Map;
  41 
  42 import org.omg.CORBA.COMM_FAILURE;
  43 import org.omg.CORBA.CompletionStatus;
  44 import org.omg.CORBA.DATA_CONVERSION;
  45 import org.omg.CORBA.INTERNAL;
  46 import org.omg.CORBA.MARSHAL;
  47 import org.omg.CORBA.OBJECT_NOT_EXIST;
  48 import org.omg.CORBA.SystemException;
  49 
  50 import com.sun.org.omg.SendingContext.CodeBase;
  51 
  52 import com.sun.corba.se.pept.broker.Broker;
  53 import com.sun.corba.se.pept.encoding.InputObject;
  54 import com.sun.corba.se.pept.encoding.OutputObject;
  55 import com.sun.corba.se.pept.protocol.MessageMediator;
  56 import com.sun.corba.se.pept.transport.Acceptor;
  57 import com.sun.corba.se.pept.transport.Connection;
  58 import com.sun.corba.se.pept.transport.ConnectionCache;
  59 import com.sun.corba.se.pept.transport.ContactInfo;
  60 import com.sun.corba.se.pept.transport.EventHandler;
  61 import com.sun.corba.se.pept.transport.InboundConnectionCache;
  62 import com.sun.corba.se.pept.transport.OutboundConnectionCache;
  63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
  64 import com.sun.corba.se.pept.transport.Selector;
  65 
  66 import com.sun.corba.se.spi.ior.IOR;
  67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
  68 import com.sun.corba.se.spi.logging.CORBALogDomains;
  69 import com.sun.corba.se.spi.orb.ORB ;
  70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  72 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  73 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  74 import com.sun.corba.se.spi.transport.CorbaContactInfo;
  75 import com.sun.corba.se.spi.transport.CorbaConnection;
  76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  77 import com.sun.corba.se.spi.transport.ReadTimeouts;
  78 
  79 import com.sun.corba.se.impl.encoding.CachedCodeBase;
  80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
  81 import com.sun.corba.se.impl.encoding.CDROutputObject;
  82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
  83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
  84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
  85 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  86 import com.sun.corba.se.impl.orbutil.ORBConstants;
  87 import com.sun.corba.se.impl.orbutil.ORBUtility;
  88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
  90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
  91 
  92 /**
  93  * @author Harold Carr
  94  */
  95 public class SocketOrChannelConnectionImpl
  96     extends
  97         EventHandlerBase
  98     implements
  99         CorbaConnection,
 100         Work
 101 {
    // Static debug switch. Nothing in the visible part of this file reads
    // it; presumably it enables tracing of write-lock activity -- confirm
    // at the use sites.
    public static boolean dprintWriteLocks = false;

    //
    // New transport.
    //

    // NOTE(review): not referenced in the visible part of this file;
    // presumably records when this connection was queued as Work -- confirm.
    protected long enqueueTime;

    // NIO channel of the socket, obtained via socket.getChannel() in the
    // socket-taking constructors. Those constructors treat null as
    // "socket is not channel-backed".
    protected SocketChannel socketChannel;
 111     public SocketChannel getSocketChannel()
 112     {
 113         return socketChannel;
 114     }
 115 
    // REVISIT:
    // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
    // Client-side contact info: assigned by the client constructor and used
    // first when creating message mediators for incoming data.
    protected CorbaContactInfo contactInfo;
    // Server-side acceptor: assigned by the server constructor and used to
    // create message mediators when contactInfo is null.
    protected Acceptor acceptor;
    // NOTE(review): not assigned in the visible part of this file.
    protected ConnectionCache connectionCache;

    //
    // From iiop.Connection.java
    //

    protected Socket socket;    // The socket used for this connection.
    protected long timeStamp = 0;
    // Set to true only by the server-side constructor.
    protected boolean isServer = false;

    // Start at some value other than zero since this is a magic
    // value in some protocols.
    protected int requestId = 5;
    // Created in the ORB-only constructor as a CorbaResponseWaitingRoomImpl.
    protected CorbaResponseWaitingRoom responseWaitingRoom;
    // Connection state: OPENING after the client constructor, ESTABLISHED
    // after the server constructor, and set to CLOSE_SENT by close().
    protected int state;
    // Monitor held while close() changes 'state'; waiters are woken with
    // notifyAll().
    protected java.lang.Object stateEvent = new java.lang.Object();
    // NOTE(review): presumably the monitor behind writeLock()/writeUnlock(),
    // together with 'writeLocked' -- confirm against those methods.
    protected java.lang.Object writeEvent = new java.lang.Object();
    protected boolean writeLocked;
    protected int serverRequestCount = 0;

    // Server request map: used on the server side of Connection
    // Maps request ID to IIOPInputStream.
    // Created as a synchronized HashMap by the server-side constructor;
    // the other constructors visible here leave it null.
    Map serverRequestMap = null;

    // This is a flag associated per connection telling us if the
    // initial set of sending contexts were sent to the receiver
    // already...
    protected boolean postInitialContexts = false;

    // Remote reference to CodeBase server (supplies
    // FullValueDescription, among other things)
    protected IOR codeBaseServerIOR;

    // CodeBase cache for this connection.  This will cache remote operations,
    // handle connecting, and ensure we don't do any remote operations until
    // necessary.
    // NOTE: the initializer hands 'this' to CachedCodeBase before any
    // constructor body of this class has run.
    protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);

    // Log wrapper for the RPC_TRANSPORT domain; created in the ORB-only
    // constructor and used to build the system exceptions thrown here.
    protected ORBUtilSystemException wrapper ;

    // transport read timeout values
    // Supplies the initial wait and the backoff factor used by readFully().
    protected ReadTimeouts readTimeouts;

    protected boolean shouldReadGiopHeaderOnly;

    // A message mediator used when shouldReadGiopHeaderOnly is
    // true to maintain request message state across execution in a
    // SelectorThread and WorkerThread.
    protected CorbaMessageMediator partialMessageMediator = null;
 169 
 170     // Used in genericRPCMSGFramework test.
 171     protected SocketOrChannelConnectionImpl(ORB orb)
 172     {
 173         this.orb = orb;
 174         wrapper = ORBUtilSystemException.get( orb,
 175             CORBALogDomains.RPC_TRANSPORT ) ;
 176 
 177         setWork(this);
 178         responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
 179         setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
 180     }
 181 
 182     // Both client and servers.
 183     protected SocketOrChannelConnectionImpl(ORB orb,
 184                                             boolean useSelectThreadToWait,
 185                                             boolean useWorkerThread)
 186     {
 187         this(orb) ;
 188         setUseSelectThreadToWait(useSelectThreadToWait);
 189         setUseWorkerThreadForEvent(useWorkerThread);
 190     }
 191 
 192     // Client constructor.
 193     public SocketOrChannelConnectionImpl(ORB orb,
 194                                          CorbaContactInfo contactInfo,
 195                                          boolean useSelectThreadToWait,
 196                                          boolean useWorkerThread,
 197                                          String socketType,
 198                                          String hostname,
 199                                          int port)
 200     {
 201         this(orb, useSelectThreadToWait, useWorkerThread);
 202 
 203         this.contactInfo = contactInfo;
 204 
 205         try {
 206             socket = orb.getORBData().getSocketFactory()
 207                 .createSocket(socketType,
 208                               new InetSocketAddress(hostname, port));
 209             socketChannel = socket.getChannel();
 210 
 211             if (socketChannel != null) {
 212                 boolean isBlocking = !useSelectThreadToWait;
 213                 socketChannel.configureBlocking(isBlocking);
 214             } else {
 215                 // IMPORTANT: non-channel-backed sockets must use
 216                 // dedicated reader threads.
 217                 setUseSelectThreadToWait(false);
 218             }
 219             if (orb.transportDebugFlag) {
 220                 dprint(".initialize: connection created: " + socket);
 221             }
 222         } catch (Throwable t) {
 223             throw wrapper.connectFailure(t, socketType, hostname,
 224                                          Integer.toString(port));
 225         }
 226         state = OPENING;
 227     }
 228 
 229     // Client-side convenience.
 230     public SocketOrChannelConnectionImpl(ORB orb,
 231                                          CorbaContactInfo contactInfo,
 232                                          String socketType,
 233                                          String hostname,
 234                                          int port)
 235     {
 236         this(orb, contactInfo,
 237              orb.getORBData().connectionSocketUseSelectThreadToWait(),
 238              orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
 239              socketType, hostname, port);
 240     }
 241 
 242     // Server-side constructor.
 243     public SocketOrChannelConnectionImpl(ORB orb,
 244                                          Acceptor acceptor,
 245                                          Socket socket,
 246                                          boolean useSelectThreadToWait,
 247                                          boolean useWorkerThread)
 248     {
 249         this(orb, useSelectThreadToWait, useWorkerThread);
 250 
 251         this.socket = socket;
 252         socketChannel = socket.getChannel();
 253         if (socketChannel != null) {
 254             // REVISIT
 255             try {
 256                 boolean isBlocking = !useSelectThreadToWait;
 257                 socketChannel.configureBlocking(isBlocking);
 258             } catch (IOException e) {
 259                 RuntimeException rte = new RuntimeException();
 260                 rte.initCause(e);
 261                 throw rte;
 262             }
 263         }
 264         this.acceptor = acceptor;
 265 
 266         serverRequestMap = Collections.synchronizedMap(new HashMap());
 267         isServer = true;
 268 
 269         state = ESTABLISHED;
 270     }
 271 
 272     // Server-side convenience
 273     public SocketOrChannelConnectionImpl(ORB orb,
 274                                          Acceptor acceptor,
 275                                          Socket socket)
 276     {
 277         this(orb, acceptor, socket,
 278              (socket.getChannel() == null
 279               ? false
 280               : orb.getORBData().connectionSocketUseSelectThreadToWait()),
 281              (socket.getChannel() == null
 282               ? false
 283               : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
 284     }
 285 
 286     ////////////////////////////////////////////////////
 287     //
 288     // framework.transport.Connection
 289     //
 290 
 291     public boolean shouldRegisterReadEvent()
 292     {
 293         return true;
 294     }
 295 
 296     public boolean shouldRegisterServerReadEvent()
 297     {
 298         return true;
 299     }
 300 
 301     public boolean read()
 302     {
 303         try {
 304             if (orb.transportDebugFlag) {
 305                 dprint(".read->: " + this);
 306             }
 307             CorbaMessageMediator messageMediator = readBits();
 308             if (messageMediator != null) {
 309                 // Null can happen when client closes stream
 310                 // causing purgecalls.
 311                 return dispatch(messageMediator);
 312             }
 313             return true;
 314         } finally {
 315             if (orb.transportDebugFlag) {
 316                 dprint(".read<-: " + this);
 317             }
 318         }
 319     }
 320 
 321     protected CorbaMessageMediator readBits()
 322     {
 323         try {
 324 
 325             if (orb.transportDebugFlag) {
 326                 dprint(".readBits->: " + this);
 327             }
 328 
 329             MessageMediator messageMediator;
 330             // REVISIT - use common factory base class.
 331             if (contactInfo != null) {
 332                 messageMediator =
 333                     contactInfo.createMessageMediator(orb, this);
 334             } else if (acceptor != null) {
 335                 messageMediator = acceptor.createMessageMediator(orb, this);
 336             } else {
 337                 throw
 338                     new RuntimeException("SocketOrChannelConnectionImpl.readBits");
 339             }
 340             return (CorbaMessageMediator) messageMediator;
 341 
 342         } catch (ThreadDeath td) {
 343             if (orb.transportDebugFlag) {
 344                 dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
 345             }
 346             try {
 347                 purgeCalls(wrapper.connectionAbort(td), false, false);
 348             } catch (Throwable t) {
 349                 if (orb.transportDebugFlag) {
 350                     dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
 351                 }
 352             }
 353             throw td;
 354         } catch (Throwable ex) {
 355             if (orb.transportDebugFlag) {
 356                 dprint(".readBits: " + this + ": Throwable: " + ex, ex);
 357             }
 358 
 359             try {
 360                 if (ex instanceof INTERNAL) {
 361                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 362                 }
 363             } catch (IOException e) {
 364                 if (orb.transportDebugFlag) {
 365                     dprint(".readBits: " + this +
 366                            ": sendMessageError: IOException: " + e, e);
 367                 }
 368             }
 369             // REVISIT - make sure reader thread is killed.
 370             Selector selector = orb.getTransportManager().getSelector(0);
 371             if (selector != null) {
 372                 selector.unregisterForEvent(this);
 373             }
 374             // Notify anyone waiting.
 375             purgeCalls(wrapper.connectionAbort(ex), true, false);
 376             // REVISIT
 377             //keepRunning = false;
 378             // REVISIT - if this is called after purgeCalls then
 379             // the state of the socket is ABORT so the writeLock
 380             // in close throws an exception.  It is ignored but
 381             // causes IBM (screen scraping) tests to fail.
 382             //close();
 383         } finally {
 384             if (orb.transportDebugFlag) {
 385                 dprint(".readBits<-: " + this);
 386             }
 387         }
 388         return null;
 389     }
 390 
 391     protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
 392     {
 393         try {
 394 
 395             if (orb.transportDebugFlag) {
 396                 dprint(".finishReadingBits->: " + this);
 397             }
 398 
 399             // REVISIT - use common factory base class.
 400             if (contactInfo != null) {
 401                 messageMediator =
 402                     contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
 403             } else if (acceptor != null) {
 404                 messageMediator =
 405                     acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
 406             } else {
 407                 throw
 408                     new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
 409             }
 410             return (CorbaMessageMediator) messageMediator;
 411 
 412         } catch (ThreadDeath td) {
 413             if (orb.transportDebugFlag) {
 414                 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
 415             }
 416             try {
 417                 purgeCalls(wrapper.connectionAbort(td), false, false);
 418             } catch (Throwable t) {
 419                 if (orb.transportDebugFlag) {
 420                     dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
 421                 }
 422             }
 423             throw td;
 424         } catch (Throwable ex) {
 425             if (orb.transportDebugFlag) {
 426                 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
 427             }
 428 
 429             try {
 430                 if (ex instanceof INTERNAL) {
 431                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 432                 }
 433             } catch (IOException e) {
 434                 if (orb.transportDebugFlag) {
 435                     dprint(".finishReadingBits: " + this +
 436                            ": sendMessageError: IOException: " + e, e);
 437                 }
 438             }
 439             // REVISIT - make sure reader thread is killed.
 440             orb.getTransportManager().getSelector(0).unregisterForEvent(this);
 441             // Notify anyone waiting.
 442             purgeCalls(wrapper.connectionAbort(ex), true, false);
 443             // REVISIT
 444             //keepRunning = false;
 445             // REVISIT - if this is called after purgeCalls then
 446             // the state of the socket is ABORT so the writeLock
 447             // in close throws an exception.  It is ignored but
 448             // causes IBM (screen scraping) tests to fail.
 449             //close();
 450         } finally {
 451             if (orb.transportDebugFlag) {
 452                 dprint(".finishReadingBits<-: " + this);
 453             }
 454         }
 455         return null;
 456     }
 457 
 458     protected boolean dispatch(CorbaMessageMediator messageMediator)
 459     {
 460         try {
 461             if (orb.transportDebugFlag) {
 462                 dprint(".dispatch->: " + this);
 463             }
 464 
 465             //
 466             // NOTE:
 467             //
 468             // This call is the transition from the tranport block
 469             // to the protocol block.
 470             //
 471 
 472             boolean result =
 473                 messageMediator.getProtocolHandler()
 474                 .handleRequest(messageMediator);
 475 
 476             return result;
 477 
 478         } catch (ThreadDeath td) {
 479             if (orb.transportDebugFlag) {
 480                 dprint(".dispatch: ThreadDeath", td );
 481             }
 482             try {
 483                 purgeCalls(wrapper.connectionAbort(td), false, false);
 484             } catch (Throwable t) {
 485                 if (orb.transportDebugFlag) {
 486                     dprint(".dispatch: purgeCalls: Throwable", t);
 487                 }
 488             }
 489             throw td;
 490         } catch (Throwable ex) {
 491             if (orb.transportDebugFlag) {
 492                 dprint(".dispatch: Throwable", ex ) ;
 493             }
 494 
 495             try {
 496                 if (ex instanceof INTERNAL) {
 497                     sendMessageError(GIOPVersion.DEFAULT_VERSION);
 498                 }
 499             } catch (IOException e) {
 500                 if (orb.transportDebugFlag) {
 501                     dprint(".dispatch: sendMessageError: IOException", e);
 502                 }
 503             }
 504             purgeCalls(wrapper.connectionAbort(ex), false, false);
 505             // REVISIT
 506             //keepRunning = false;
 507         } finally {
 508             if (orb.transportDebugFlag) {
 509                 dprint(".dispatch<-: " + this);
 510             }
 511         }
 512 
 513         return true;
 514     }
 515 
 516     public boolean shouldUseDirectByteBuffers()
 517     {
 518         return getSocketChannel() != null;
 519     }
 520 
 521     public ByteBuffer read(int size, int offset, int length, long max_wait_time)
 522         throws IOException
 523     {
 524         if (shouldUseDirectByteBuffers()) {
 525 
 526             ByteBuffer byteBuffer =
 527                 orb.getByteBufferPool().getByteBuffer(size);
 528 
 529             if (orb.transportDebugFlag) {
 530                 // print address of ByteBuffer gotten from pool
 531                 int bbAddress = System.identityHashCode(byteBuffer);
 532                 StringBuffer sb = new StringBuffer(80);
 533                 sb.append(".read: got ByteBuffer id (");
 534                 sb.append(bbAddress).append(") from ByteBufferPool.");
 535                 String msgStr = sb.toString();
 536                 dprint(msgStr);
 537             }
 538 
 539             byteBuffer.position(offset);
 540             byteBuffer.limit(size);
 541 
 542             readFully(byteBuffer, length, max_wait_time);
 543 
 544             return byteBuffer;
 545         }
 546 
 547         byte[] buf = new byte[size];
 548         readFully(getSocket().getInputStream(), buf,
 549                   offset, length, max_wait_time);
 550         ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
 551         byteBuffer.limit(size);
 552         return byteBuffer;
 553     }
 554 
 555     public ByteBuffer read(ByteBuffer byteBuffer, int offset,
 556                            int length, long max_wait_time)
 557         throws IOException
 558     {
 559         int size = offset + length;
 560         if (shouldUseDirectByteBuffers()) {
 561 
 562             if (! byteBuffer.isDirect()) {
 563                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
 564             }
 565             if (size > byteBuffer.capacity()) {
 566                 if (orb.transportDebugFlag) {
 567                     // print address of ByteBuffer being released
 568                     int bbAddress = System.identityHashCode(byteBuffer);
 569                     StringBuffer bbsb = new StringBuffer(80);
 570                     bbsb.append(".read: releasing ByteBuffer id (")
 571                         .append(bbAddress).append(") to ByteBufferPool.");
 572                     String bbmsg = bbsb.toString();
 573                     dprint(bbmsg);
 574                 }
 575                 orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
 576                 byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
 577             }
 578             byteBuffer.position(offset);
 579             byteBuffer.limit(size);
 580             readFully(byteBuffer, length, max_wait_time);
 581             byteBuffer.position(0);
 582             byteBuffer.limit(size);
 583             return byteBuffer;
 584         }
 585         if (byteBuffer.isDirect()) {
 586             throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
 587         }
 588         byte[] buf = new byte[size];
 589         readFully(getSocket().getInputStream(), buf,
 590                   offset, length, max_wait_time);
 591         return ByteBuffer.wrap(buf);
 592     }
 593 
 594     public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
 595         throws IOException
 596     {
 597         int n = 0;
 598         int bytecount = 0;
 599         long time_to_wait = readTimeouts.get_initial_time_to_wait();
 600         long total_time_in_wait = 0;
 601 
 602         // The reading of data incorporates a strategy to detect a
 603         // rogue client. The strategy is implemented as follows. As
 604         // long as data is being read, at least 1 byte or more, we
 605         // assume we have a well behaved client. If no data is read,
 606         // then we sleep for a time to wait, re-calculate a new time to
 607         // wait which is lengthier than the previous time spent waiting.
 608         // Then, if the total time spent waiting does not exceed a
 609         // maximum time we are willing to wait, we attempt another
 610         // read. If the maximum amount of time we are willing to
 611         // spend waiting for more data is exceeded, we throw an
 612         // IOException.
 613 
 614         // NOTE: Reading of GIOP headers are treated with a smaller
 615         //       maximum time to wait threshold. Based on extensive
 616         //       performance testing, all GIOP headers are being
 617         //       read in 1 read access.
 618 
 619         do {
 620             bytecount = getSocketChannel().read(byteBuffer);
 621 
 622             if (bytecount < 0) {
 623                 throw new IOException("End-of-stream");
 624             }
 625             else if (bytecount == 0) {
 626                 try {
 627                     Thread.sleep(time_to_wait);
 628                     total_time_in_wait += time_to_wait;
 629                     time_to_wait =
 630                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
 631                 }
 632                 catch (InterruptedException ie) {
 633                     // ignore exception
 634                     if (orb.transportDebugFlag) {
 635                         dprint("readFully(): unexpected exception "
 636                                 + ie.toString());
 637                     }
 638                 }
 639             }
 640             else {
 641                 n += bytecount;
 642             }
 643         }
 644         while (n < size && total_time_in_wait < max_wait_time);
 645 
 646         if (n < size && total_time_in_wait >= max_wait_time)
 647         {
 648             // failed to read entire message
 649             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
 650                                       new Integer(n), new Long(max_wait_time),
 651                                       new Long(total_time_in_wait));
 652         }
 653 
 654         getConnectionCache().stampTime(this);
 655     }
 656 
 657     // To support non-channel connections.
 658     public void readFully(java.io.InputStream is, byte[] buf,
 659                           int offset, int size, long max_wait_time)
 660         throws IOException
 661     {
 662         int n = 0;
 663         int bytecount = 0;
 664         long time_to_wait = readTimeouts.get_initial_time_to_wait();
 665         long total_time_in_wait = 0;
 666 
 667         // The reading of data incorporates a strategy to detect a
 668         // rogue client. The strategy is implemented as follows. As
 669         // long as data is being read, at least 1 byte or more, we
 670         // assume we have a well behaved client. If no data is read,
 671         // then we sleep for a time to wait, re-calculate a new time to
 672         // wait which is lengthier than the previous time spent waiting.
 673         // Then, if the total time spent waiting does not exceed a
 674         // maximum time we are willing to wait, we attempt another
 675         // read. If the maximum amount of time we are willing to
 676         // spend waiting for more data is exceeded, we throw an
 677         // IOException.
 678 
 679         // NOTE: Reading of GIOP headers are treated with a smaller
 680         //       maximum time to wait threshold. Based on extensive
 681         //       performance testing, all GIOP headers are being
 682         //       read in 1 read access.
 683 
 684         do {
 685             bytecount = is.read(buf, offset + n, size - n);
 686             if (bytecount < 0) {
 687                 throw new IOException("End-of-stream");
 688             }
 689             else if (bytecount == 0) {
 690                 try {
 691                     Thread.sleep(time_to_wait);
 692                     total_time_in_wait += time_to_wait;
 693                     time_to_wait =
 694                         (long)(time_to_wait*readTimeouts.get_backoff_factor());
 695                 }
 696                 catch (InterruptedException ie) {
 697                     // ignore exception
 698                     if (orb.transportDebugFlag) {
 699                         dprint("readFully(): unexpected exception "
 700                                 + ie.toString());
 701                     }
 702                 }
 703             }
 704             else {
 705                 n += bytecount;
 706             }
 707         }
 708         while (n < size && total_time_in_wait < max_wait_time);
 709 
 710         if (n < size && total_time_in_wait >= max_wait_time)
 711         {
 712             // failed to read entire message
 713             throw wrapper.transportReadTimeoutExceeded(new Integer(size),
 714                                       new Integer(n), new Long(max_wait_time),
 715                                       new Long(total_time_in_wait));
 716         }
 717 
 718         getConnectionCache().stampTime(this);
 719     }
 720 
 721     public void write(ByteBuffer byteBuffer)
 722         throws IOException
 723     {
 724         if (shouldUseDirectByteBuffers()) {
 725             /* NOTE: cannot perform this test.  If one ask for a
 726                ByteBuffer from the pool which is bigger than the size
 727                of ByteBuffers managed by the pool, then the pool will
 728                return a HeapByteBuffer.
 729             if (byteBuffer.hasArray()) {
 730                 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
 731             }
 732             */
 733             // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
 734             //            all bytes are written on first write attempt.
 735             do {
 736                 getSocketChannel().write(byteBuffer);
 737             }
 738             while (byteBuffer.hasRemaining());
 739 
 740         } else {
 741             if (! byteBuffer.hasArray()) {
 742                 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
 743             }
 744             byte[] tmpBuf = byteBuffer.array();
 745             getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
 746             getSocket().getOutputStream().flush();
 747         }
 748 
 749         // TimeStamp connection to indicate it has been used
 750         // Note granularity of connection usage is assumed for
 751         // now to be that of a IIOP packet.
 752         getConnectionCache().stampTime(this);
 753     }
 754 
 755     /**
     * Note: it is possible for this to be called more than once.
 757      */
 758     public synchronized void close()
 759     {
 760         try {
 761             if (orb.transportDebugFlag) {
 762                 dprint(".close->: " + this);
 763             }
 764             writeLock();
 765 
 766             // REVISIT It will be good to have a read lock on the reader thread
 767             // before we proceed further, to avoid the reader thread (server side)
 768             // from processing requests. This avoids the risk that a new request
 769             // will be accepted by ReaderThread while the ListenerThread is
 770             // attempting to close this connection.
 771 
 772             if (isBusy()) { // we are busy!
 773                 writeUnlock();
 774                 if (orb.transportDebugFlag) {
 775                     dprint(".close: isBusy so no close: " + this);
 776                 }
 777                 return;
 778             }
 779 
 780             try {
 781                 try {
 782                     sendCloseConnection(GIOPVersion.V1_0);
 783                 } catch (Throwable t) {
 784                     wrapper.exceptionWhenSendingCloseConnection(t);
 785                 }
 786 
 787                 synchronized ( stateEvent ){
 788                     state = CLOSE_SENT;
 789                     stateEvent.notifyAll();
 790                 }
 791 
 792                 // stop the reader without causing it to do purgeCalls
 793                 //Exception ex = new Exception();
 794                 //reader.stop(ex); // REVISIT
 795 
 796                 // NOTE: !!!!!!
 797                 // This does writeUnlock().
 798                 purgeCalls(wrapper.connectionRebind(), false, true);
 799 
 800             } catch (Exception ex) {
 801                 if (orb.transportDebugFlag) {
 802                     dprint(".close: exception: " + this, ex);
 803                 }
 804             }
 805             try {
 806                 Selector selector = orb.getTransportManager().getSelector(0);
 807                 if (selector != null) {
 808                     selector.unregisterForEvent(this);
 809                 }
 810                 if (socketChannel != null) {
 811                     socketChannel.close();
 812                 }
 813                 socket.close();
 814             } catch (IOException e) {
 815                 if (orb.transportDebugFlag) {
 816                     dprint(".close: " + this, e);
 817                 }
 818             }
 819             closeConnectionResources();
 820         } finally {
 821             if (orb.transportDebugFlag) {
 822                 dprint(".close<-: " + this);
 823             }
 824         }
 825     }
 826 
 827     public void closeConnectionResources() {
 828            if (orb.transportDebugFlag) {
 829                dprint(".closeConnectionResources->: " + this);
 830            }
 831            Selector selector = orb.getTransportManager().getSelector(0);
 832            if (selector != null) {
 833                selector.unregisterForEvent(this);
 834            }
 835            try {
 836              if (socketChannel != null)
 837               socketChannel.close() ;
 838                 if (socket != null && !socket.isClosed())
 839                 socket.close() ;
 840            } catch (IOException e) {
 841              if (orb.transportDebugFlag) {
 842                  dprint( ".closeConnectionResources: " + this, e ) ;
 843              }
 844            }
 845            if (orb.transportDebugFlag) {
 846                dprint(".closeConnectionResources<-: " + this);
 847            }
 848      }
 849 
 850 
 851     public Acceptor getAcceptor()
 852     {
 853         return acceptor;
 854     }
 855 
 856     public ContactInfo getContactInfo()
 857     {
 858         return contactInfo;
 859     }
 860 
 861     public EventHandler getEventHandler()
 862     {
 863         return this;
 864     }
 865 
 866     public OutputObject createOutputObject(MessageMediator messageMediator)
 867     {
 868         // REVISIT - remove this method from Connection and all it subclasses.
 869         throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
 870     }
 871 
 872     // This is used by the GIOPOutputObject in order to
 873     // throw the correct error when handling code sets.
 874     // Can we determine if we are on the server side by
 875     // other means?  XREVISIT
 876     public boolean isServer()
 877     {
 878         return isServer;
 879     }
 880 
 881     public boolean isBusy()
 882     {
 883         if (serverRequestCount > 0 ||
 884             getResponseWaitingRoom().numberRegistered() > 0)
 885         {
 886             return true;
 887         } else {
 888             return false;
 889         }
 890     }
 891 
 892     public long getTimeStamp()
 893     {
 894         return timeStamp;
 895     }
 896 
 897     public void setTimeStamp(long time)
 898     {
 899         timeStamp = time;
 900     }
 901 
 902     public void setState(String stateString)
 903     {
 904         synchronized (stateEvent) {
 905             if (stateString.equals("ESTABLISHED")) {
 906                 state =  ESTABLISHED;
 907                 stateEvent.notifyAll();
 908             } else {
 909                 // REVISIT: ASSERT
 910             }
 911         }
 912     }
 913 
    /**
     * Sets the writeLock for this connection.
     * If the writeLock is already set by someone else, block till the
     * writeLock is released and can set by us.
     * IMPORTANT: this connection's lock must be acquired before
     * setting the writeLock and must be unlocked after setting the writeLock.
     *
     * Behavior per connection state, re-evaluated on every loop iteration:
     * OPENING waits on stateEvent until notified; ESTABLISHED takes the
     * lock if it is free, otherwise waits on writeEvent in 100 ms slices;
     * ABORT throws wrapper.writeErrorSend(); CLOSE_RECVD throws
     * wrapper.connectionCloseRebind(); any other state (CLOSE_SENT has no
     * case here) throws a RuntimeException.
     *
     * An InterruptedException during either wait is only reported through
     * dprint when debugging is on; the interrupt status is not restored.
     */
    public void writeLock()
    {
      try {
        if (dprintWriteLocks && orb.transportDebugFlag) {
            dprint(".writeLock->: " + this);
        }
        // Keep looping till we can set the writeLock.
        while ( true ) {
            // Snapshot taken without holding stateEvent; each case below
            // re-checks 'state' under the monitor before acting on it.
            int localState = state;
            switch ( localState ) {

            case OPENING:
                synchronized (stateEvent) {
                    if (state != OPENING) {
                        // somebody has changed 'state' so be careful
                        // (this break leaves the switch; the loop re-reads state)
                        break;
                    }
                    try {
                        // Woken by notifyAll() on stateEvent when the state changes.
                        stateEvent.wait();
                    } catch (InterruptedException ie) {
                        if (orb.transportDebugFlag) {
                            dprint(".writeLock: OPENING InterruptedException: " + this);
                        }
                    }
                }
                // Loop back
                break;

            case ESTABLISHED:
                synchronized (writeEvent) {
                    if (!writeLocked) {
                        // Lock is free: take it and return to the caller.
                        writeLocked = true;
                        return;
                    }

                    try {
                        // do not stay here too long if state != ESTABLISHED
                        // Bug 4752117
                        while (state == ESTABLISHED && writeLocked) {
                            writeEvent.wait(100);
                        }
                    } catch (InterruptedException ie) {
                        if (orb.transportDebugFlag) {
                            dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
                        }
                    }
                }
                // Loop back
                // (the lock is not taken here; the next iteration retries)
                break;

                //
                // XXX
                // Need to distinguish between client and server roles
                // here probably.
                //
            case ABORT:
                synchronized ( stateEvent ){
                    if (state != ABORT) {
                        // State moved on since the snapshot: re-evaluate.
                        break;
                    }
                    throw wrapper.writeErrorSend() ;
                }

            case CLOSE_RECVD:
                // the connection has been closed or closing
                // ==> throw rebind exception
                synchronized ( stateEvent ){
                    if (state != CLOSE_RECVD) {
                        // State moved on since the snapshot: re-evaluate.
                        break;
                    }
                    throw wrapper.connectionCloseRebind() ;
                }

            default:
                if (orb.transportDebugFlag) {
                    dprint(".writeLock: default: " + this);
                }
                // REVISIT
                throw new RuntimeException(".writeLock: bad state");
            }
        }
      } finally {
        // Runs on every exit path, including the exceptions thrown above.
        if (dprintWriteLocks && orb.transportDebugFlag) {
            dprint(".writeLock<-: " + this);
        }
      }
    }
1008 
1009     public void writeUnlock()
1010     {
1011         try {
1012             if (dprintWriteLocks && orb.transportDebugFlag) {
1013                 dprint(".writeUnlock->: " + this);
1014             }
1015             synchronized (writeEvent) {
1016                 writeLocked = false;
1017                 writeEvent.notify(); // wake up one guy waiting to write
1018             }
1019         } finally {
1020             if (dprintWriteLocks && orb.transportDebugFlag) {
1021                 dprint(".writeUnlock<-: " + this);
1022             }
1023         }
1024     }
1025 
1026     // Assumes the caller handles writeLock and writeUnlock
1027     public void sendWithoutLock(OutputObject outputObject)
1028     {
1029         // Don't we need to check for CloseConnection
1030         // here?  REVISIT
1031 
1032         // XREVISIT - Shouldn't the MessageMediator
1033         // be the one to handle writing the data here?
1034 
1035         try {
1036 
1037             // Write the fragment/message
1038 
1039             CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
1040             cdrOutputObject.writeTo(this);
1041             // REVISIT - no flush?
1042             //socket.getOutputStream().flush();
1043 
1044         } catch (IOException e1) {
1045 
1046             /*
1047              * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1048              * sending a CancelRequest for regular requests / locate requests
1049              */
1050 
1051             // Since IIOPOutputStream's msgheader is set only once, and not
1052             // altered during sending multiple fragments, the original
1053             // msgheader will always have the requestId.
1054             // REVISIT This could be optimized to send a CancelRequest only
1055             // if any fragments had been sent already.
1056 
1057             /* REVISIT: MOVE TO SUBCONTRACT
1058             Message msg = os.getMessage();
1059             if (msg.getType() == Message.GIOPRequest ||
1060                     msg.getType() == Message.GIOPLocateRequest) {
1061                 GIOPVersion requestVersion = msg.getGIOPVersion();
1062                 int requestId = MessageBase.getRequestId(msg);
1063                 try {
1064                     sendCancelRequest(requestVersion, requestId);
1065                 } catch (IOException e2) {
1066                     // most likely an abortive connection closure.
1067                     // ignore, since nothing more can be done.
1068                     if (orb.transportDebugFlag) {
1069 
1070                 }
1071             }
1072             */
1073 
1074             // REVISIT When a send failure happens, purgeCalls() need to be
1075             // called to ensure that the connection is properly removed from
1076             // further usage (ie., cancelling pending requests with COMM_FAILURE
1077             // with an appropriate minor_code CompletionStatus.MAY_BE).
1078 
1079             // Relying on the IIOPOutputStream (as noted below) is not
1080             // sufficient as it handles COMM_FAILURE only for the final
1081             // fragment (during invoke processing). Note that COMM_FAILURE could
1082             // happen while sending the initial fragments.
1083             // Also the IIOPOutputStream does not properly close the connection.
1084             // It simply removes the connection from the table. An orderly
1085             // closure is needed (ie., cancel pending requests on the connection
1086             // COMM_FAILURE as well.
1087 
1088             // IIOPOutputStream will cleanup the connection info when it
1089             // sees this exception.
1090             SystemException exc = wrapper.writeErrorSend(e1);
1091             purgeCalls(exc, false, true);
1092             throw exc;
1093         }
1094     }
1095 
1096     public void registerWaiter(MessageMediator messageMediator)
1097     {
1098         responseWaitingRoom.registerWaiter(messageMediator);
1099     }
1100 
1101     public void unregisterWaiter(MessageMediator messageMediator)
1102     {
1103         responseWaitingRoom.unregisterWaiter(messageMediator);
1104     }
1105 
1106     public InputObject waitForResponse(MessageMediator messageMediator)
1107     {
1108         return responseWaitingRoom.waitForResponse(messageMediator);
1109     }
1110 
1111     public void setConnectionCache(ConnectionCache connectionCache)
1112     {
1113         this.connectionCache = connectionCache;
1114     }
1115 
1116     public ConnectionCache getConnectionCache()
1117     {
1118         return connectionCache;
1119     }
1120 
1121     ////////////////////////////////////////////////////
1122     //
1123     // EventHandler methods
1124     //
1125 
1126     public void setUseSelectThreadToWait(boolean x)
1127     {
1128         useSelectThreadToWait = x;
1129         // REVISIT - Reading of a GIOP header only is information
1130         //           that should be passed into the constructor
1131         //           from the SocketOrChannelConnection factory.
1132         setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1133     }
1134 
1135     public void handleEvent()
1136     {
1137         if (orb.transportDebugFlag) {
1138             dprint(".handleEvent->: " + this);
1139         }
1140         getSelectionKey().interestOps(getSelectionKey().interestOps() &
1141                                       (~ getInterestOps()));
1142 
1143         if (shouldUseWorkerThreadForEvent()) {
1144             Throwable throwable = null;
1145             try {
1146                 int poolToUse = 0;
1147                 if (shouldReadGiopHeaderOnly()) {
1148                     partialMessageMediator = readBits();
1149                     poolToUse =
1150                         partialMessageMediator.getThreadPoolToUse();
1151                 }
1152 
1153                 if (orb.transportDebugFlag) {
1154                     dprint(".handleEvent: addWork to pool: " + poolToUse);
1155                 }
1156                 orb.getThreadPoolManager().getThreadPool(poolToUse)
1157                     .getWorkQueue(0).addWork(getWork());
1158             } catch (NoSuchThreadPoolException e) {
1159                 throwable = e;
1160             } catch (NoSuchWorkQueueException e) {
1161                 throwable = e;
1162             }
1163             // REVISIT: need to close connection.
1164             if (throwable != null) {
1165                 if (orb.transportDebugFlag) {
1166                     dprint(".handleEvent: " + throwable);
1167                 }
1168                 INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
1169                 i.initCause(throwable);
1170                 throw i;
1171             }
1172         } else {
1173             if (orb.transportDebugFlag) {
1174                 dprint(".handleEvent: doWork");
1175             }
1176             getWork().doWork();
1177         }
1178         if (orb.transportDebugFlag) {
1179             dprint(".handleEvent<-: " + this);
1180         }
1181     }
1182 
1183     public SelectableChannel getChannel()
1184     {
1185         return socketChannel;
1186     }
1187 
1188     public int getInterestOps()
1189     {
1190         return SelectionKey.OP_READ;
1191     }
1192 
1193     //    public Acceptor getAcceptor() - already defined above.
1194 
1195     public Connection getConnection()
1196     {
1197         return this;
1198     }
1199 
1200     ////////////////////////////////////////////////////
1201     //
1202     // Work methods.
1203     //
1204 
1205     public String getName()
1206     {
1207         return this.toString();
1208     }
1209 
1210     public void doWork()
1211     {
1212         try {
1213             if (orb.transportDebugFlag) {
1214                 dprint(".doWork->: " + this);
1215             }
1216 
1217             // IMPORTANT: Sanity checks on SelectionKeys such as
1218             //            SelectorKey.isValid() should not be done
1219             //            here.
1220             //
1221 
1222             if (!shouldReadGiopHeaderOnly()) {
1223                 read();
1224             }
1225             else {
1226                 // get the partialMessageMediator
1227                 // created by SelectorThread
1228                 CorbaMessageMediator messageMediator =
1229                                          this.getPartialMessageMediator();
1230 
1231                 // read remaining info needed in a MessageMediator
1232                 messageMediator = finishReadingBits(messageMediator);
1233 
1234                 if (messageMediator != null) {
1235                     // Null can happen when client closes stream
1236                     // causing purgecalls.
1237                     dispatch(messageMediator);
1238                 }
1239             }
1240         } catch (Throwable t) {
1241             if (orb.transportDebugFlag) {
1242                 dprint(".doWork: ignoring Throwable: "
1243                        + t
1244                        + " " + this);
1245             }
1246         } finally {
1247             if (orb.transportDebugFlag) {
1248                 dprint(".doWork<-: " + this);
1249             }
1250         }
1251     }
1252 
1253     public void setEnqueueTime(long timeInMillis)
1254     {
1255         enqueueTime = timeInMillis;
1256     }
1257 
1258     public long getEnqueueTime()
1259     {
1260         return enqueueTime;
1261     }
1262 
1263     ////////////////////////////////////////////////////
1264     //
1265     // spi.transport.CorbaConnection.
1266     //
1267 
1268     // IMPORTANT: Reader Threads must NOT read Giop header only.
1269     public boolean shouldReadGiopHeaderOnly() {
1270         return shouldReadGiopHeaderOnly;
1271     }
1272 
1273     protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1274         shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1275     }
1276 
1277     public ResponseWaitingRoom getResponseWaitingRoom()
1278     {
1279         return responseWaitingRoom;
1280     }
1281 
    // REVISIT - this interface defines isServer, but it is already
    // defined in a higher interface.
1284 
1285     public void serverRequestMapPut(int requestId,
1286                                     CorbaMessageMediator messageMediator)
1287     {
1288         serverRequestMap.put(new Integer(requestId), messageMediator);
1289     }
1290 
1291     public CorbaMessageMediator serverRequestMapGet(int requestId)
1292     {
1293         return (CorbaMessageMediator)
1294             serverRequestMap.get(new Integer(requestId));
1295     }
1296 
1297     public void serverRequestMapRemove(int requestId)
1298     {
1299         serverRequestMap.remove(new Integer(requestId));
1300     }
1301 
1302 
1303     // REVISIT: this is also defined in:
1304     // com.sun.corba.se.spi.legacy.connection.Connection
1305     public java.net.Socket getSocket()
1306     {
1307         return socket;
1308     }
1309 
1310     /** It is possible for a Close Connection to have been
1311      ** sent here, but we will not check for this. A "lazy"
1312      ** Exception will be thrown in the Worker thread after the
1313      ** incoming request has been processed even though the connection
1314      ** is closed before the request is processed. This is o.k because
1315      ** it is a boundary condition. To prevent it we would have to add
1316      ** more locks which would reduce performance in the normal case.
1317      **/
1318     public synchronized void serverRequestProcessingBegins()
1319     {
1320         serverRequestCount++;
1321     }
1322 
1323     public synchronized void serverRequestProcessingEnds()
1324     {
1325         serverRequestCount--;
1326     }
1327 
1328     //
1329     //
1330     //
1331 
1332     public synchronized int getNextRequestId()
1333     {
1334         return requestId++;
1335     }
1336 
    // Negotiated code sets for char and wchar data.
    // Starts out null; setCodeSetContext stores a value only while this
    // is still null.
    protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1339 
1340     public ORB getBroker()
1341     {
1342         return orb;
1343     }
1344 
1345     public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1346         // Needs to be synchronized for the following case when the client
1347         // doesn't send the code set context twice, and we have two threads
1348         // in ServerRequestDispatcher processCodeSetContext.
1349         //
1350         // Thread A checks to see if there is a context, there is none, so
1351         //     it calls setCodeSetContext, getting the synch lock.
1352         // Thread B checks to see if there is a context.  If we didn't synch,
1353         //     it might decide to outlaw wchar/wstring.
1354         if (codeSetContext == null) {
1355             synchronized(this) {
1356                 return codeSetContext;
1357             }
1358         }
1359 
1360         return codeSetContext;
1361     }
1362 
1363     public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1364         // Double check whether or not we need to do this
1365         if (codeSetContext == null) {
1366 
1367             if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1368                 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1369                 // If the client says it's negotiated a code set that
1370                 // isn't a fallback and we never said we support, then
1371                 // it has a bug.
1372                 throw wrapper.badCodesetsFromClient() ;
1373             }
1374 
1375             codeSetContext = csc;
1376         }
1377     }
1378 
1379     //
1380     // from iiop.IIOPConnection.java
1381     //
1382 
1383     // Map request ID to an InputObject.
1384     // This is so the client thread can start unmarshaling
1385     // the reply and remove it from the out_calls map while the
1386     // ReaderThread can still obtain the input stream to give
1387     // new fragments.  Only the ReaderThread touches the clientReplyMap,
1388     // so it doesn't incur synchronization overhead.
1389 
1390     public MessageMediator clientRequestMapGet(int requestId)
1391     {
1392         return responseWaitingRoom.getMessageMediator(requestId);
1393     }
1394 
    // Single-slot holder managed by clientReply_1_1_Put/_Get/_Remove;
    // null when nothing is stored.
    protected MessageMediator clientReply_1_1;
1396 
1397     public void clientReply_1_1_Put(MessageMediator x)
1398     {
1399         clientReply_1_1 = x;
1400     }
1401 
1402     public MessageMediator clientReply_1_1_Get()
1403     {
1404         return  clientReply_1_1;
1405     }
1406 
1407     public void clientReply_1_1_Remove()
1408     {
1409         clientReply_1_1 = null;
1410     }
1411 
    // Single-slot holder managed by serverRequest_1_1_Put/_Get/_Remove;
    // null when nothing is stored.
    protected MessageMediator serverRequest_1_1;
1413 
1414     public void serverRequest_1_1_Put(MessageMediator x)
1415     {
1416         serverRequest_1_1 = x;
1417     }
1418 
1419     public MessageMediator serverRequest_1_1_Get()
1420     {
1421         return  serverRequest_1_1;
1422     }
1423 
1424     public void serverRequest_1_1_Remove()
1425     {
1426         serverRequest_1_1 = null;
1427     }
1428 
1429     protected String getStateString( int state )
1430     {
1431         synchronized ( stateEvent ){
1432             switch (state) {
1433             case OPENING : return "OPENING" ;
1434             case ESTABLISHED : return "ESTABLISHED" ;
1435             case CLOSE_SENT : return "CLOSE_SENT" ;
1436             case CLOSE_RECVD : return "CLOSE_RECVD" ;
1437             case ABORT : return "ABORT" ;
1438             default : return "???" ;
1439             }
1440         }
1441     }
1442 
1443     public synchronized boolean isPostInitialContexts() {
1444         return postInitialContexts;
1445     }
1446 
1447     // Can never be unset...
1448     public synchronized void setPostInitialContexts(){
1449         postInitialContexts = true;
1450     }
1451 
1452     /**
1453      * Wake up the outstanding requests on the connection, and hand them
1454      * COMM_FAILURE exception with a given minor code.
1455      *
1456      * Also, delete connection from connection table and
1457      * stop the reader thread.
1458 
1459      * Note that this should only ever be called by the Reader thread for
1460      * this connection.
1461      *
1462      * @param minor_code The minor code for the COMM_FAILURE major code.
1463      * @param die Kill the reader thread (this thread) before exiting.
1464      */
1465     public void purgeCalls(SystemException systemException,
1466                            boolean die, boolean lockHeld)
1467     {
1468         int minor_code = systemException.minor;
1469 
1470         try{
1471             if (orb.transportDebugFlag) {
1472                 dprint(".purgeCalls->: "
1473                        + minor_code + "/" + die + "/" + lockHeld
1474                        + " " + this);
1475             }
1476 
1477             // If this invocation is a result of ThreadDeath caused
1478             // by a previous execution of this routine, just exit.
1479 
1480             synchronized ( stateEvent ){
1481                 if ((state == ABORT) || (state == CLOSE_RECVD)) {
1482                     if (orb.transportDebugFlag) {
1483                         dprint(".purgeCalls: exiting since state is: "
1484                                + getStateString(state)
1485                                + " " + this);
1486                     }
1487                     return;
1488                 }
1489             }
1490 
1491             // Grab the writeLock (freeze the calls)
1492             try {
1493                 if (!lockHeld) {
1494                     writeLock();
1495                 }
1496             } catch (SystemException ex) {
1497                 if (orb.transportDebugFlag)
1498                     dprint(".purgeCalls: SystemException" + ex
1499                            + "; continuing " + this);
1500             }
1501 
1502             // Mark the state of the connection
1503             // and determine the request status
1504             org.omg.CORBA.CompletionStatus completion_status;
1505             synchronized ( stateEvent ){
1506                 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1507                     state = CLOSE_RECVD;
1508                     systemException.completed = CompletionStatus.COMPLETED_NO;
1509                 } else {
1510                     state = ABORT;
1511                     systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1512                 }
1513                 stateEvent.notifyAll();
1514             }
1515 
1516             try {
1517                 socket.getInputStream().close();
1518                 socket.getOutputStream().close();
1519                 socket.close();
1520             } catch (Exception ex) {
1521                 if (orb.transportDebugFlag) {
1522                     dprint(".purgeCalls: Exception closing socket: " + ex
1523                            + " " + this);
1524                 }
1525             }
1526 
1527             // Signal all threads with outstanding requests on this
1528             // connection and give them the SystemException;
1529 
1530             responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1531         } finally {
1532             if (contactInfo != null) {
1533                 ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1534             } else if (acceptor != null) {
1535                 ((InboundConnectionCache)getConnectionCache()).remove(this);
1536             }
1537 
1538             //
1539             // REVISIT: Stop the reader thread
1540             //
1541 
1542             // Signal all the waiters of the writeLock.
1543             // There are 4 types of writeLock waiters:
1544             // 1. Send waiters:
1545             // 2. SendReply waiters:
1546             // 3. cleanUp waiters:
1547             // 4. purge_call waiters:
1548             //
1549 
1550             writeUnlock();
1551 
1552             if (orb.transportDebugFlag) {
1553                 dprint(".purgeCalls<-: "
1554                        + minor_code + "/" + die + "/" + lockHeld
1555                        + " " + this);
1556             }
1557         }
1558     }
1559 
1560     /*************************************************************************
1561     * The following methods are for dealing with Connection cleaning for
1562     * better scalability of servers in high network load conditions.
1563     **************************************************************************/
1564 
1565     public void sendCloseConnection(GIOPVersion giopVersion)
1566         throws IOException
1567     {
1568         Message msg = MessageBase.createCloseConnection(giopVersion);
1569         sendHelper(giopVersion, msg);
1570     }
1571 
1572     public void sendMessageError(GIOPVersion giopVersion)
1573         throws IOException
1574     {
1575         Message msg = MessageBase.createMessageError(giopVersion);
1576         sendHelper(giopVersion, msg);
1577     }
1578 
1579     /**
1580      * Send a CancelRequest message. This does not lock the connection, so the
1581      * caller needs to ensure this method is called appropriately.
1582      * @exception IOException - could be due to abortive connection closure.
1583      */
1584     public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1585         throws IOException
1586     {
1587 
1588         Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1589         sendHelper(giopVersion, msg);
1590     }
1591 
1592     protected void sendHelper(GIOPVersion giopVersion, Message msg)
1593         throws IOException
1594     {
1595         // REVISIT: See comments in CDROutputObject constructor.
1596         CDROutputObject outputObject =
1597             sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion,
1598                                 this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
1599         msg.write(outputObject);
1600 
1601         outputObject.writeTo(this);
1602     }
1603 
1604     public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1605                                           int requestId)
1606         throws IOException
1607     {
1608         writeLock();
1609         try {
1610             sendCancelRequest(giopVersion, requestId);
1611         } finally {
1612             writeUnlock();
1613         }
1614     }
1615 
1616     // Begin Code Base methods ---------------------------------------
1617     //
1618     // Set this connection's code base IOR.  The IOR comes from the
1619     // SendingContext.  This is an optional service context, but all
1620     // JavaSoft ORBs send it.
1621     //
1622     // The set and get methods don't need to be synchronized since the
1623     // first possible get would occur during reading a valuetype, and
1624     // that would be after the set.
1625 
1626     // Sets this connection's code base IOR.  This is done after
1627     // getting the IOR out of the SendingContext service context.
1628     // Our ORBs always send this, but it's optional in CORBA.
1629 
1630     public final void setCodeBaseIOR(IOR ior) {
1631         codeBaseServerIOR = ior;
1632     }
1633 
1634     public final IOR getCodeBaseIOR() {
1635         return codeBaseServerIOR;
1636     }
1637 
1638     // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
1639     // won't connect to the remote codebase unless it's necessary.
1640     public final CodeBase getCodeBase() {
1641         return cachedCodeBase;
1642     }
1643 
1644     // End Code Base methods -----------------------------------------
1645 
1646     // set transport read thresholds
1647     protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1648         this.readTimeouts = readTimeouts;
1649     }
1650 
1651     protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1652         partialMessageMediator = messageMediator;
1653     }
1654 
1655     protected CorbaMessageMediator getPartialMessageMediator() {
1656         return partialMessageMediator;
1657     }
1658 
1659     public String toString()
1660     {
1661         synchronized ( stateEvent ){
1662             return
1663                 "SocketOrChannelConnectionImpl[" + " "
1664                 + (socketChannel == null ?
1665                    socket.toString() : socketChannel.toString()) + " "
1666                 + getStateString( state ) + " "
1667                 + shouldUseSelectThreadToWait() + " "
1668                 + shouldUseWorkerThreadForEvent() + " "
1669                 + shouldReadGiopHeaderOnly()
1670                 + "]" ;
1671         }
1672     }
1673 
1674     // Must be public - used in encoding.
1675     public void dprint(String msg)
1676     {
1677         ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1678     }
1679 
1680     protected void dprint(String msg, Throwable t)
1681     {
1682         dprint(msg);
1683         t.printStackTrace(System.out);
1684     }
1685 }
1686 
1687 // End of file.