1 /* 2 * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.Socket; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectableChannel; 33 import java.nio.channels.SelectionKey; 34 import java.nio.channels.SocketChannel; 35 import java.security.AccessController; 36 import java.security.PrivilegedAction; 37 import java.util.Collections; 38 import java.util.Hashtable; 39 import java.util.HashMap; 40 import java.util.Map; 41 42 import org.omg.CORBA.COMM_FAILURE; 43 import org.omg.CORBA.CompletionStatus; 44 import org.omg.CORBA.DATA_CONVERSION; 45 import org.omg.CORBA.INTERNAL; 46 import org.omg.CORBA.MARSHAL; 47 import org.omg.CORBA.OBJECT_NOT_EXIST; 48 import org.omg.CORBA.SystemException; 49 50 import com.sun.org.omg.SendingContext.CodeBase; 51 52 import com.sun.corba.se.pept.broker.Broker; 53 import com.sun.corba.se.pept.encoding.InputObject; 54 import com.sun.corba.se.pept.encoding.OutputObject; 55 import com.sun.corba.se.pept.protocol.MessageMediator; 56 import com.sun.corba.se.pept.transport.Acceptor; 57 import com.sun.corba.se.pept.transport.Connection; 58 import com.sun.corba.se.pept.transport.ConnectionCache; 59 import com.sun.corba.se.pept.transport.ContactInfo; 60 import com.sun.corba.se.pept.transport.EventHandler; 61 import com.sun.corba.se.pept.transport.InboundConnectionCache; 62 import com.sun.corba.se.pept.transport.OutboundConnectionCache; 63 import com.sun.corba.se.pept.transport.ResponseWaitingRoom; 64 import com.sun.corba.se.pept.transport.Selector; 65 66 import com.sun.corba.se.spi.ior.IOR; 67 import com.sun.corba.se.spi.ior.iiop.GIOPVersion; 68 import com.sun.corba.se.spi.logging.CORBALogDomains; 69 import com.sun.corba.se.spi.orb.ORB ; 70 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 71 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 72 import com.sun.corba.se.spi.orbutil.threadpool.Work; 73 import 
com.sun.corba.se.spi.protocol.CorbaMessageMediator; 74 import com.sun.corba.se.spi.transport.CorbaContactInfo; 75 import com.sun.corba.se.spi.transport.CorbaConnection; 76 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 77 import com.sun.corba.se.spi.transport.ReadTimeouts; 78 79 import com.sun.corba.se.impl.encoding.CachedCodeBase; 80 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0; 81 import com.sun.corba.se.impl.encoding.CDROutputObject; 82 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0; 83 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo; 84 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry; 85 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 86 import com.sun.corba.se.impl.orbutil.ORBConstants; 87 import com.sun.corba.se.impl.orbutil.ORBUtility; 88 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 89 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; 90 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl; 91 92 /** 93 * @author Harold Carr 94 */ 95 public class SocketOrChannelConnectionImpl 96 extends 97 EventHandlerBase 98 implements 99 CorbaConnection, 100 Work 101 { 102 public static boolean dprintWriteLocks = false; 103 104 // 105 // New transport. 106 // 107 108 protected long enqueueTime; 109 110 protected SocketChannel socketChannel; 111 public SocketChannel getSocketChannel() 112 { 113 return socketChannel; 114 } 115 116 // REVISIT: 117 // protected for test: genericRPCMSGFramework.IIOPConnection constructor. 118 protected CorbaContactInfo contactInfo; 119 protected Acceptor acceptor; 120 protected ConnectionCache connectionCache; 121 122 // 123 // From iiop.Connection.java 124 // 125 126 protected Socket socket; // The socket used for this connection. 127 protected long timeStamp = 0; 128 protected boolean isServer = false; 129 130 // Start at some value other than zero since this is a magic 131 // value in some protocols. 
// Request id counter for this connection; initialised to 5.
protected int requestId = 5;
// Waiting room that registerWaiter/unregisterWaiter/waitForResponse
// delegate to; created in the ORB-only constructor.
protected CorbaResponseWaitingRoom responseWaitingRoom;
// Current connection state. Compared against OPENING, ESTABLISHED, ABORT
// and CLOSE_RECVD in writeLock(), and set to CLOSE_SENT in close().
protected int state;
// Monitor used to wait for / announce changes of 'state'.
protected java.lang.Object stateEvent = new java.lang.Object();
// Monitor guarding 'writeLocked'.
protected java.lang.Object writeEvent = new java.lang.Object();
// True while some caller holds the write lock (see writeLock/writeUnlock).
protected boolean writeLocked;
// A value greater than zero makes isBusy() report true.
protected int serverRequestCount = 0;

// Server request map: used on the server side of Connection
// Maps request ID to IIOPInputStream.
Map serverRequestMap = null;

// This is a flag associated per connection telling us if the
// initial set of sending contexts were sent to the receiver
// already...
protected boolean postInitialContexts = false;

// Remote reference to CodeBase server (supplies
// FullValueDescription, among other things)
protected IOR codeBaseServerIOR;

// CodeBase cache for this connection.  This will cache remote operations,
// handle connecting, and ensure we don't do any remote operations until
// necessary.
protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);

// Factory for the system exceptions thrown by this class; assigned in
// the ORB-only constructor.
protected ORBUtilSystemException wrapper ;

// transport read timeout values
protected ReadTimeouts readTimeouts;

// Set through setUseSelectThreadToWait(); consulted by handleEvent().
protected boolean shouldReadGiopHeaderOnly;

// A message mediator used when shouldReadGiopHeaderOnly is
// true to maintain request message state across execution in a
// SelectorThread and WorkerThread.
protected CorbaMessageMediator partialMessageMediator = null;

/**
 * Base constructor shared by all other constructors.
 *
 * Stores the ORB, obtains the RPC_TRANSPORT exception wrapper, registers
 * this object as its own unit of work, creates the response waiting room
 * and installs the ORB's configured TCP read timeouts. It does not
 * assign {@code socket} or {@code socketChannel}.
 *
 * @param orb the ORB this connection belongs to
 */
// Used in genericRPCMSGFramework test.
protected SocketOrChannelConnectionImpl(ORB orb)
{
    this.orb = orb;
    wrapper = ORBUtilSystemException.get( orb,
        CORBALogDomains.RPC_TRANSPORT ) ;

    // NOTE(review): 'this' is handed out before construction completes.
    setWork(this);
    responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
    setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
}

// Both client and servers.
/**
 * Common constructor for client and server connections: runs the
 * ORB-only constructor and then records the two threading options.
 *
 * @param orb                   the ORB this connection belongs to
 * @param useSelectThreadToWait passed to {@code setUseSelectThreadToWait}
 * @param useWorkerThread       passed to {@code setUseWorkerThreadForEvent}
 */
protected SocketOrChannelConnectionImpl(ORB orb,
                                        boolean useSelectThreadToWait,
                                        boolean useWorkerThread)
{
    this(orb) ;
    setUseSelectThreadToWait(useSelectThreadToWait);
    setUseWorkerThreadForEvent(useWorkerThread);
}

/**
 * Client constructor: creates the socket through the ORB's socket factory
 * and leaves the connection in the OPENING state.
 *
 * If the socket has a channel, the channel is made blocking exactly when
 * the select thread is NOT used; otherwise use of the select thread is
 * switched off.
 *
 * @param orb                   the ORB this connection belongs to
 * @param contactInfo           contact info stored on the connection
 * @param useSelectThreadToWait whether the selector thread waits for input
 * @param useWorkerThread       whether events are handed to a worker thread
 * @param socketType            socket type passed to the socket factory
 * @param hostname              host to connect to
 * @param port                  port to connect to
 * @throws org.omg.CORBA.SystemException the {@code connectFailure}
 *         exception from the wrapper when anything goes wrong while
 *         creating or configuring the socket
 */
// Client constructor.
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     boolean useSelectThreadToWait,
                                     boolean useWorkerThread,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.contactInfo = contactInfo;

    try {
        socket = orb.getORBData().getSocketFactory()
            .createSocket(socketType,
                          new InetSocketAddress(hostname, port));
        socketChannel = socket.getChannel();

        if (socketChannel != null) {
            boolean isBlocking = !useSelectThreadToWait;
            socketChannel.configureBlocking(isBlocking);
        } else {
            // IMPORTANT: non-channel-backed sockets must use
            // dedicated reader threads.
            setUseSelectThreadToWait(false);
        }
        if (orb.transportDebugFlag) {
            dprint(".initialize: connection created: " + socket);
        }
    } catch (Throwable t) {
        // The constructor is about to fail, so nobody else will ever close
        // a socket that was already created: release it here. Closing the
        // socket also closes its associated channel, if any.
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception closeFailure) {
                // Best effort only; the original failure is reported below.
                if (orb.transportDebugFlag) {
                    dprint(".initialize: close after failure: " + closeFailure);
                }
            }
        }
        throw wrapper.connectFailure(t, socketType, hostname,
                                     Integer.toString(port));
    }
    state = OPENING;
}

/**
 * Client-side convenience constructor that takes both threading options
 * from the ORB's configuration data.
 *
 * @param orb         the ORB this connection belongs to
 * @param contactInfo contact info stored on the connection
 * @param socketType  socket type passed to the socket factory
 * @param hostname    host to connect to
 * @param port        port to connect to
 */
// Client-side convenience.
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb, contactInfo,
         orb.getORBData().connectionSocketUseSelectThreadToWait(),
         orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
         socketType, hostname, port);
}

// Server-side constructor.
243 public SocketOrChannelConnectionImpl(ORB orb, 244 Acceptor acceptor, 245 Socket socket, 246 boolean useSelectThreadToWait, 247 boolean useWorkerThread) 248 { 249 this(orb, useSelectThreadToWait, useWorkerThread); 250 251 this.socket = socket; 252 socketChannel = socket.getChannel(); 253 if (socketChannel != null) { 254 // REVISIT 255 try { 256 boolean isBlocking = !useSelectThreadToWait; 257 socketChannel.configureBlocking(isBlocking); 258 } catch (IOException e) { 259 RuntimeException rte = new RuntimeException(); 260 rte.initCause(e); 261 throw rte; 262 } 263 } 264 this.acceptor = acceptor; 265 266 serverRequestMap = Collections.synchronizedMap(new HashMap()); 267 isServer = true; 268 269 state = ESTABLISHED; 270 } 271 272 // Server-side convenience 273 public SocketOrChannelConnectionImpl(ORB orb, 274 Acceptor acceptor, 275 Socket socket) 276 { 277 this(orb, acceptor, socket, 278 (socket.getChannel() == null 279 ? false 280 : orb.getORBData().connectionSocketUseSelectThreadToWait()), 281 (socket.getChannel() == null 282 ? false 283 : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); 284 } 285 286 //////////////////////////////////////////////////// 287 // 288 // framework.transport.Connection 289 // 290 291 public boolean shouldRegisterReadEvent() 292 { 293 return true; 294 } 295 296 public boolean shouldRegisterServerReadEvent() 297 { 298 return true; 299 } 300 301 public boolean read() 302 { 303 try { 304 if (orb.transportDebugFlag) { 305 dprint(".read->: " + this); 306 } 307 CorbaMessageMediator messageMediator = readBits(); 308 if (messageMediator != null) { 309 // Null can happen when client closes stream 310 // causing purgecalls. 
311 return dispatch(messageMediator); 312 } 313 return true; 314 } finally { 315 if (orb.transportDebugFlag) { 316 dprint(".read<-: " + this); 317 } 318 } 319 } 320 321 protected CorbaMessageMediator readBits() 322 { 323 try { 324 325 if (orb.transportDebugFlag) { 326 dprint(".readBits->: " + this); 327 } 328 329 MessageMediator messageMediator; 330 // REVISIT - use common factory base class. 331 if (contactInfo != null) { 332 messageMediator = 333 contactInfo.createMessageMediator(orb, this); 334 } else if (acceptor != null) { 335 messageMediator = acceptor.createMessageMediator(orb, this); 336 } else { 337 throw 338 new RuntimeException("SocketOrChannelConnectionImpl.readBits"); 339 } 340 return (CorbaMessageMediator) messageMediator; 341 342 } catch (ThreadDeath td) { 343 if (orb.transportDebugFlag) { 344 dprint(".readBits: " + this + ": ThreadDeath: " + td, td); 345 } 346 try { 347 purgeCalls(wrapper.connectionAbort(td), false, false); 348 } catch (Throwable t) { 349 if (orb.transportDebugFlag) { 350 dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); 351 } 352 } 353 throw td; 354 } catch (Throwable ex) { 355 if (orb.transportDebugFlag) { 356 dprint(".readBits: " + this + ": Throwable: " + ex, ex); 357 } 358 359 try { 360 if (ex instanceof INTERNAL) { 361 sendMessageError(GIOPVersion.DEFAULT_VERSION); 362 } 363 } catch (IOException e) { 364 if (orb.transportDebugFlag) { 365 dprint(".readBits: " + this + 366 ": sendMessageError: IOException: " + e, e); 367 } 368 } 369 // REVISIT - make sure reader thread is killed. 370 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 371 // Notify anyone waiting. 372 purgeCalls(wrapper.connectionAbort(ex), true, false); 373 // REVISIT 374 //keepRunning = false; 375 // REVISIT - if this is called after purgeCalls then 376 // the state of the socket is ABORT so the writeLock 377 // in close throws an exception. It is ignored but 378 // causes IBM (screen scraping) tests to fail. 
379 //close(); 380 } finally { 381 if (orb.transportDebugFlag) { 382 dprint(".readBits<-: " + this); 383 } 384 } 385 return null; 386 } 387 388 protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) 389 { 390 try { 391 392 if (orb.transportDebugFlag) { 393 dprint(".finishReadingBits->: " + this); 394 } 395 396 // REVISIT - use common factory base class. 397 if (contactInfo != null) { 398 messageMediator = 399 contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); 400 } else if (acceptor != null) { 401 messageMediator = 402 acceptor.finishCreatingMessageMediator(orb, this, messageMediator); 403 } else { 404 throw 405 new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits"); 406 } 407 return (CorbaMessageMediator) messageMediator; 408 409 } catch (ThreadDeath td) { 410 if (orb.transportDebugFlag) { 411 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); 412 } 413 try { 414 purgeCalls(wrapper.connectionAbort(td), false, false); 415 } catch (Throwable t) { 416 if (orb.transportDebugFlag) { 417 dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); 418 } 419 } 420 throw td; 421 } catch (Throwable ex) { 422 if (orb.transportDebugFlag) { 423 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); 424 } 425 426 try { 427 if (ex instanceof INTERNAL) { 428 sendMessageError(GIOPVersion.DEFAULT_VERSION); 429 } 430 } catch (IOException e) { 431 if (orb.transportDebugFlag) { 432 dprint(".finishReadingBits: " + this + 433 ": sendMessageError: IOException: " + e, e); 434 } 435 } 436 // REVISIT - make sure reader thread is killed. 437 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 438 // Notify anyone waiting. 439 purgeCalls(wrapper.connectionAbort(ex), true, false); 440 // REVISIT 441 //keepRunning = false; 442 // REVISIT - if this is called after purgeCalls then 443 // the state of the socket is ABORT so the writeLock 444 // in close throws an exception. 
It is ignored but 445 // causes IBM (screen scraping) tests to fail. 446 //close(); 447 } finally { 448 if (orb.transportDebugFlag) { 449 dprint(".finishReadingBits<-: " + this); 450 } 451 } 452 return null; 453 } 454 455 protected boolean dispatch(CorbaMessageMediator messageMediator) 456 { 457 try { 458 if (orb.transportDebugFlag) { 459 dprint(".dispatch->: " + this); 460 } 461 462 // 463 // NOTE: 464 // 465 // This call is the transition from the tranport block 466 // to the protocol block. 467 // 468 469 boolean result = 470 messageMediator.getProtocolHandler() 471 .handleRequest(messageMediator); 472 473 return result; 474 475 } catch (ThreadDeath td) { 476 if (orb.transportDebugFlag) { 477 dprint(".dispatch: ThreadDeath", td ); 478 } 479 try { 480 purgeCalls(wrapper.connectionAbort(td), false, false); 481 } catch (Throwable t) { 482 if (orb.transportDebugFlag) { 483 dprint(".dispatch: purgeCalls: Throwable", t); 484 } 485 } 486 throw td; 487 } catch (Throwable ex) { 488 if (orb.transportDebugFlag) { 489 dprint(".dispatch: Throwable", ex ) ; 490 } 491 492 try { 493 if (ex instanceof INTERNAL) { 494 sendMessageError(GIOPVersion.DEFAULT_VERSION); 495 } 496 } catch (IOException e) { 497 if (orb.transportDebugFlag) { 498 dprint(".dispatch: sendMessageError: IOException", e); 499 } 500 } 501 purgeCalls(wrapper.connectionAbort(ex), false, false); 502 // REVISIT 503 //keepRunning = false; 504 } finally { 505 if (orb.transportDebugFlag) { 506 dprint(".dispatch<-: " + this); 507 } 508 } 509 510 return true; 511 } 512 513 public boolean shouldUseDirectByteBuffers() 514 { 515 return getSocketChannel() != null; 516 } 517 518 public ByteBuffer read(int size, int offset, int length, long max_wait_time) 519 throws IOException 520 { 521 if (shouldUseDirectByteBuffers()) { 522 523 ByteBuffer byteBuffer = 524 orb.getByteBufferPool().getByteBuffer(size); 525 526 if (orb.transportDebugFlag) { 527 // print address of ByteBuffer gotten from pool 528 int bbAddress = 
System.identityHashCode(byteBuffer); 529 StringBuffer sb = new StringBuffer(80); 530 sb.append(".read: got ByteBuffer id ("); 531 sb.append(bbAddress).append(") from ByteBufferPool."); 532 String msgStr = sb.toString(); 533 dprint(msgStr); 534 } 535 536 byteBuffer.position(offset); 537 byteBuffer.limit(size); 538 539 readFully(byteBuffer, length, max_wait_time); 540 541 return byteBuffer; 542 } 543 544 byte[] buf = new byte[size]; 545 readFully(getSocket().getInputStream(), buf, 546 offset, length, max_wait_time); 547 ByteBuffer byteBuffer = ByteBuffer.wrap(buf); 548 byteBuffer.limit(size); 549 return byteBuffer; 550 } 551 552 public ByteBuffer read(ByteBuffer byteBuffer, int offset, 553 int length, long max_wait_time) 554 throws IOException 555 { 556 int size = offset + length; 557 if (shouldUseDirectByteBuffers()) { 558 559 if (! byteBuffer.isDirect()) { 560 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 561 } 562 if (size > byteBuffer.capacity()) { 563 if (orb.transportDebugFlag) { 564 // print address of ByteBuffer being released 565 int bbAddress = System.identityHashCode(byteBuffer); 566 StringBuffer bbsb = new StringBuffer(80); 567 bbsb.append(".read: releasing ByteBuffer id (") 568 .append(bbAddress).append(") to ByteBufferPool."); 569 String bbmsg = bbsb.toString(); 570 dprint(bbmsg); 571 } 572 orb.getByteBufferPool().releaseByteBuffer(byteBuffer); 573 byteBuffer = orb.getByteBufferPool().getByteBuffer(size); 574 } 575 byteBuffer.position(offset); 576 byteBuffer.limit(size); 577 readFully(byteBuffer, length, max_wait_time); 578 byteBuffer.position(0); 579 byteBuffer.limit(size); 580 return byteBuffer; 581 } 582 if (byteBuffer.isDirect()) { 583 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 584 } 585 byte[] buf = new byte[size]; 586 readFully(getSocket().getInputStream(), buf, 587 offset, length, max_wait_time); 588 return ByteBuffer.wrap(buf); 589 } 590 591 public void readFully(ByteBuffer byteBuffer, int size, long 
max_wait_time) 592 throws IOException 593 { 594 int n = 0; 595 int bytecount = 0; 596 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 597 long total_time_in_wait = 0; 598 599 // The reading of data incorporates a strategy to detect a 600 // rogue client. The strategy is implemented as follows. As 601 // long as data is being read, at least 1 byte or more, we 602 // assume we have a well behaved client. If no data is read, 603 // then we sleep for a time to wait, re-calculate a new time to 604 // wait which is lengthier than the previous time spent waiting. 605 // Then, if the total time spent waiting does not exceed a 606 // maximum time we are willing to wait, we attempt another 607 // read. If the maximum amount of time we are willing to 608 // spend waiting for more data is exceeded, we throw an 609 // IOException. 610 611 // NOTE: Reading of GIOP headers are treated with a smaller 612 // maximum time to wait threshold. Based on extensive 613 // performance testing, all GIOP headers are being 614 // read in 1 read access. 
615 616 do { 617 bytecount = getSocketChannel().read(byteBuffer); 618 619 if (bytecount < 0) { 620 throw new IOException("End-of-stream"); 621 } 622 else if (bytecount == 0) { 623 try { 624 Thread.sleep(time_to_wait); 625 total_time_in_wait += time_to_wait; 626 time_to_wait = 627 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 628 } 629 catch (InterruptedException ie) { 630 // ignore exception 631 if (orb.transportDebugFlag) { 632 dprint("readFully(): unexpected exception " 633 + ie.toString()); 634 } 635 } 636 } 637 else { 638 n += bytecount; 639 } 640 } 641 while (n < size && total_time_in_wait < max_wait_time); 642 643 if (n < size && total_time_in_wait >= max_wait_time) 644 { 645 // failed to read entire message 646 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 647 new Integer(n), new Long(max_wait_time), 648 new Long(total_time_in_wait)); 649 } 650 651 getConnectionCache().stampTime(this); 652 } 653 654 // To support non-channel connections. 655 public void readFully(java.io.InputStream is, byte[] buf, 656 int offset, int size, long max_wait_time) 657 throws IOException 658 { 659 int n = 0; 660 int bytecount = 0; 661 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 662 long total_time_in_wait = 0; 663 664 // The reading of data incorporates a strategy to detect a 665 // rogue client. The strategy is implemented as follows. As 666 // long as data is being read, at least 1 byte or more, we 667 // assume we have a well behaved client. If no data is read, 668 // then we sleep for a time to wait, re-calculate a new time to 669 // wait which is lengthier than the previous time spent waiting. 670 // Then, if the total time spent waiting does not exceed a 671 // maximum time we are willing to wait, we attempt another 672 // read. If the maximum amount of time we are willing to 673 // spend waiting for more data is exceeded, we throw an 674 // IOException. 
675 676 // NOTE: Reading of GIOP headers are treated with a smaller 677 // maximum time to wait threshold. Based on extensive 678 // performance testing, all GIOP headers are being 679 // read in 1 read access. 680 681 do { 682 bytecount = is.read(buf, offset + n, size - n); 683 if (bytecount < 0) { 684 throw new IOException("End-of-stream"); 685 } 686 else if (bytecount == 0) { 687 try { 688 Thread.sleep(time_to_wait); 689 total_time_in_wait += time_to_wait; 690 time_to_wait = 691 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 692 } 693 catch (InterruptedException ie) { 694 // ignore exception 695 if (orb.transportDebugFlag) { 696 dprint("readFully(): unexpected exception " 697 + ie.toString()); 698 } 699 } 700 } 701 else { 702 n += bytecount; 703 } 704 } 705 while (n < size && total_time_in_wait < max_wait_time); 706 707 if (n < size && total_time_in_wait >= max_wait_time) 708 { 709 // failed to read entire message 710 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 711 new Integer(n), new Long(max_wait_time), 712 new Long(total_time_in_wait)); 713 } 714 715 getConnectionCache().stampTime(this); 716 } 717 718 public void write(ByteBuffer byteBuffer) 719 throws IOException 720 { 721 if (shouldUseDirectByteBuffers()) { 722 /* NOTE: cannot perform this test. If one ask for a 723 ByteBuffer from the pool which is bigger than the size 724 of ByteBuffers managed by the pool, then the pool will 725 return a HeapByteBuffer. 726 if (byteBuffer.hasArray()) { 727 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 728 } 729 */ 730 // IMPORTANT: For non-blocking SocketChannels, there's no guarantee 731 // all bytes are written on first write attempt. 732 do { 733 getSocketChannel().write(byteBuffer); 734 } 735 while (byteBuffer.hasRemaining()); 736 737 } else { 738 if (! 
byteBuffer.hasArray()) { 739 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 740 } 741 byte[] tmpBuf = byteBuffer.array(); 742 getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); 743 getSocket().getOutputStream().flush(); 744 } 745 746 // TimeStamp connection to indicate it has been used 747 // Note granularity of connection usage is assumed for 748 // now to be that of a IIOP packet. 749 getConnectionCache().stampTime(this); 750 } 751 752 /** 753 * Note:it is possible for this to be called more than once 754 */ 755 public synchronized void close() 756 { 757 try { 758 if (orb.transportDebugFlag) { 759 dprint(".close->: " + this); 760 } 761 writeLock(); 762 763 // REVISIT It will be good to have a read lock on the reader thread 764 // before we proceed further, to avoid the reader thread (server side) 765 // from processing requests. This avoids the risk that a new request 766 // will be accepted by ReaderThread while the ListenerThread is 767 // attempting to close this connection. 768 769 if (isBusy()) { // we are busy! 770 writeUnlock(); 771 if (orb.transportDebugFlag) { 772 dprint(".close: isBusy so no close: " + this); 773 } 774 return; 775 } 776 777 try { 778 try { 779 sendCloseConnection(GIOPVersion.V1_0); 780 } catch (Throwable t) { 781 wrapper.exceptionWhenSendingCloseConnection(t); 782 } 783 784 synchronized ( stateEvent ){ 785 state = CLOSE_SENT; 786 stateEvent.notifyAll(); 787 } 788 789 // stop the reader without causing it to do purgeCalls 790 //Exception ex = new Exception(); 791 //reader.stop(ex); // REVISIT 792 793 // NOTE: !!!!!! 794 // This does writeUnlock(). 
795 purgeCalls(wrapper.connectionRebind(), false, true); 796 797 } catch (Exception ex) { 798 if (orb.transportDebugFlag) { 799 dprint(".close: exception: " + this, ex); 800 } 801 } 802 try { 803 Selector selector = orb.getTransportManager().getSelector(0); 804 selector.unregisterForEvent(this); 805 if (socketChannel != null) { 806 socketChannel.close(); 807 } 808 socket.close(); 809 } catch (IOException e) { 810 if (orb.transportDebugFlag) { 811 dprint(".close: " + this, e); 812 } 813 } 814 closeConnectionResources(); 815 } finally { 816 if (orb.transportDebugFlag) { 817 dprint(".close<-: " + this); 818 } 819 } 820 } 821 822 public void closeConnectionResources() { 823 if (orb.transportDebugFlag) { 824 dprint(".closeConnectionResources->: " + this); 825 } 826 Selector selector = orb.getTransportManager().getSelector(0); 827 selector.unregisterForEvent(this); 828 try { 829 if (socketChannel != null) 830 socketChannel.close() ; 831 if (socket != null && !socket.isClosed()) 832 socket.close() ; 833 } catch (IOException e) { 834 if (orb.transportDebugFlag) { 835 dprint( ".closeConnectionResources: " + this, e ) ; 836 } 837 } 838 if (orb.transportDebugFlag) { 839 dprint(".closeConnectionResources<-: " + this); 840 } 841 } 842 843 844 public Acceptor getAcceptor() 845 { 846 return acceptor; 847 } 848 849 public ContactInfo getContactInfo() 850 { 851 return contactInfo; 852 } 853 854 public EventHandler getEventHandler() 855 { 856 return this; 857 } 858 859 public OutputObject createOutputObject(MessageMediator messageMediator) 860 { 861 // REVISIT - remove this method from Connection and all it subclasses. 862 throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); 863 } 864 865 // This is used by the GIOPOutputObject in order to 866 // throw the correct error when handling code sets. 867 // Can we determine if we are on the server side by 868 // other means? 
XREVISIT 869 public boolean isServer() 870 { 871 return isServer; 872 } 873 874 public boolean isBusy() 875 { 876 if (serverRequestCount > 0 || 877 getResponseWaitingRoom().numberRegistered() > 0) 878 { 879 return true; 880 } else { 881 return false; 882 } 883 } 884 885 public long getTimeStamp() 886 { 887 return timeStamp; 888 } 889 890 public void setTimeStamp(long time) 891 { 892 timeStamp = time; 893 } 894 895 public void setState(String stateString) 896 { 897 synchronized (stateEvent) { 898 if (stateString.equals("ESTABLISHED")) { 899 state = ESTABLISHED; 900 stateEvent.notifyAll(); 901 } else { 902 // REVISIT: ASSERT 903 } 904 } 905 } 906 907 /** 908 * Sets the writeLock for this connection. 909 * If the writeLock is already set by someone else, block till the 910 * writeLock is released and can set by us. 911 * IMPORTANT: this connection's lock must be acquired before 912 * setting the writeLock and must be unlocked after setting the writeLock. 913 */ 914 public void writeLock() 915 { 916 try { 917 if (dprintWriteLocks && orb.transportDebugFlag) { 918 dprint(".writeLock->: " + this); 919 } 920 // Keep looping till we can set the writeLock. 
921 while ( true ) { 922 int localState = state; 923 switch ( localState ) { 924 925 case OPENING: 926 synchronized (stateEvent) { 927 if (state != OPENING) { 928 // somebody has changed 'state' so be careful 929 break; 930 } 931 try { 932 stateEvent.wait(); 933 } catch (InterruptedException ie) { 934 if (orb.transportDebugFlag) { 935 dprint(".writeLock: OPENING InterruptedException: " + this); 936 } 937 } 938 } 939 // Loop back 940 break; 941 942 case ESTABLISHED: 943 synchronized (writeEvent) { 944 if (!writeLocked) { 945 writeLocked = true; 946 return; 947 } 948 949 try { 950 // do not stay here too long if state != ESTABLISHED 951 // Bug 4752117 952 while (state == ESTABLISHED && writeLocked) { 953 writeEvent.wait(100); 954 } 955 } catch (InterruptedException ie) { 956 if (orb.transportDebugFlag) { 957 dprint(".writeLock: ESTABLISHED InterruptedException: " + this); 958 } 959 } 960 } 961 // Loop back 962 break; 963 964 // 965 // XXX 966 // Need to distinguish between client and server roles 967 // here probably. 
968 // 969 case ABORT: 970 synchronized ( stateEvent ){ 971 if (state != ABORT) { 972 break; 973 } 974 throw wrapper.writeErrorSend() ; 975 } 976 977 case CLOSE_RECVD: 978 // the connection has been closed or closing 979 // ==> throw rebind exception 980 synchronized ( stateEvent ){ 981 if (state != CLOSE_RECVD) { 982 break; 983 } 984 throw wrapper.connectionCloseRebind() ; 985 } 986 987 default: 988 if (orb.transportDebugFlag) { 989 dprint(".writeLock: default: " + this); 990 } 991 // REVISIT 992 throw new RuntimeException(".writeLock: bad state"); 993 } 994 } 995 } finally { 996 if (dprintWriteLocks && orb.transportDebugFlag) { 997 dprint(".writeLock<-: " + this); 998 } 999 } 1000 } 1001 1002 public void writeUnlock() 1003 { 1004 try { 1005 if (dprintWriteLocks && orb.transportDebugFlag) { 1006 dprint(".writeUnlock->: " + this); 1007 } 1008 synchronized (writeEvent) { 1009 writeLocked = false; 1010 writeEvent.notify(); // wake up one guy waiting to write 1011 } 1012 } finally { 1013 if (dprintWriteLocks && orb.transportDebugFlag) { 1014 dprint(".writeUnlock<-: " + this); 1015 } 1016 } 1017 } 1018 1019 // Assumes the caller handles writeLock and writeUnlock 1020 public void sendWithoutLock(OutputObject outputObject) 1021 { 1022 // Don't we need to check for CloseConnection 1023 // here? REVISIT 1024 1025 // XREVISIT - Shouldn't the MessageMediator 1026 // be the one to handle writing the data here? 1027 1028 try { 1029 1030 // Write the fragment/message 1031 1032 CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; 1033 cdrOutputObject.writeTo(this); 1034 // REVISIT - no flush? 
1035 //socket.getOutputStream().flush(); 1036 1037 } catch (IOException e1) { 1038 1039 /* 1040 * ADDED(Ram J) 10/13/2000 In the event of an IOException, try 1041 * sending a CancelRequest for regular requests / locate requests 1042 */ 1043 1044 // Since IIOPOutputStream's msgheader is set only once, and not 1045 // altered during sending multiple fragments, the original 1046 // msgheader will always have the requestId. 1047 // REVISIT This could be optimized to send a CancelRequest only 1048 // if any fragments had been sent already. 1049 1050 /* REVISIT: MOVE TO SUBCONTRACT 1051 Message msg = os.getMessage(); 1052 if (msg.getType() == Message.GIOPRequest || 1053 msg.getType() == Message.GIOPLocateRequest) { 1054 GIOPVersion requestVersion = msg.getGIOPVersion(); 1055 int requestId = MessageBase.getRequestId(msg); 1056 try { 1057 sendCancelRequest(requestVersion, requestId); 1058 } catch (IOException e2) { 1059 // most likely an abortive connection closure. 1060 // ignore, since nothing more can be done. 1061 if (orb.transportDebugFlag) { 1062 1063 } 1064 } 1065 */ 1066 1067 // REVISIT When a send failure happens, purgeCalls() need to be 1068 // called to ensure that the connection is properly removed from 1069 // further usage (ie., cancelling pending requests with COMM_FAILURE 1070 // with an appropriate minor_code CompletionStatus.MAY_BE). 1071 1072 // Relying on the IIOPOutputStream (as noted below) is not 1073 // sufficient as it handles COMM_FAILURE only for the final 1074 // fragment (during invoke processing). Note that COMM_FAILURE could 1075 // happen while sending the initial fragments. 1076 // Also the IIOPOutputStream does not properly close the connection. 1077 // It simply removes the connection from the table. An orderly 1078 // closure is needed (ie., cancel pending requests on the connection 1079 // COMM_FAILURE as well. 1080 1081 // IIOPOutputStream will cleanup the connection info when it 1082 // sees this exception. 
1083 SystemException exc = wrapper.writeErrorSend(e1); 1084 purgeCalls(exc, false, true); 1085 throw exc; 1086 } 1087 } 1088 1089 public void registerWaiter(MessageMediator messageMediator) 1090 { 1091 responseWaitingRoom.registerWaiter(messageMediator); 1092 } 1093 1094 public void unregisterWaiter(MessageMediator messageMediator) 1095 { 1096 responseWaitingRoom.unregisterWaiter(messageMediator); 1097 } 1098 1099 public InputObject waitForResponse(MessageMediator messageMediator) 1100 { 1101 return responseWaitingRoom.waitForResponse(messageMediator); 1102 } 1103 1104 public void setConnectionCache(ConnectionCache connectionCache) 1105 { 1106 this.connectionCache = connectionCache; 1107 } 1108 1109 public ConnectionCache getConnectionCache() 1110 { 1111 return connectionCache; 1112 } 1113 1114 //////////////////////////////////////////////////// 1115 // 1116 // EventHandler methods 1117 // 1118 1119 public void setUseSelectThreadToWait(boolean x) 1120 { 1121 useSelectThreadToWait = x; 1122 // REVISIT - Reading of a GIOP header only is information 1123 // that should be passed into the constructor 1124 // from the SocketOrChannelConnection factory. 
1125 setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); 1126 } 1127 1128 public void handleEvent() 1129 { 1130 if (orb.transportDebugFlag) { 1131 dprint(".handleEvent->: " + this); 1132 } 1133 getSelectionKey().interestOps(getSelectionKey().interestOps() & 1134 (~ getInterestOps())); 1135 1136 if (shouldUseWorkerThreadForEvent()) { 1137 Throwable throwable = null; 1138 try { 1139 int poolToUse = 0; 1140 if (shouldReadGiopHeaderOnly()) { 1141 partialMessageMediator = readBits(); 1142 poolToUse = 1143 partialMessageMediator.getThreadPoolToUse(); 1144 } 1145 1146 if (orb.transportDebugFlag) { 1147 dprint(".handleEvent: addWork to pool: " + poolToUse); 1148 } 1149 orb.getThreadPoolManager().getThreadPool(poolToUse) 1150 .getWorkQueue(0).addWork(getWork()); 1151 } catch (NoSuchThreadPoolException e) { 1152 throwable = e; 1153 } catch (NoSuchWorkQueueException e) { 1154 throwable = e; 1155 } 1156 // REVISIT: need to close connection. 1157 if (throwable != null) { 1158 if (orb.transportDebugFlag) { 1159 dprint(".handleEvent: " + throwable); 1160 } 1161 INTERNAL i = new INTERNAL("NoSuchThreadPoolException"); 1162 i.initCause(throwable); 1163 throw i; 1164 } 1165 } else { 1166 if (orb.transportDebugFlag) { 1167 dprint(".handleEvent: doWork"); 1168 } 1169 getWork().doWork(); 1170 } 1171 if (orb.transportDebugFlag) { 1172 dprint(".handleEvent<-: " + this); 1173 } 1174 } 1175 1176 public SelectableChannel getChannel() 1177 { 1178 return socketChannel; 1179 } 1180 1181 public int getInterestOps() 1182 { 1183 return SelectionKey.OP_READ; 1184 } 1185 1186 // public Acceptor getAcceptor() - already defined above. 1187 1188 public Connection getConnection() 1189 { 1190 return this; 1191 } 1192 1193 //////////////////////////////////////////////////// 1194 // 1195 // Work methods. 
1196 // 1197 1198 public String getName() 1199 { 1200 return this.toString(); 1201 } 1202 1203 public void doWork() 1204 { 1205 try { 1206 if (orb.transportDebugFlag) { 1207 dprint(".doWork->: " + this); 1208 } 1209 1210 // IMPORTANT: Sanity checks on SelectionKeys such as 1211 // SelectorKey.isValid() should not be done 1212 // here. 1213 // 1214 1215 if (!shouldReadGiopHeaderOnly()) { 1216 read(); 1217 } 1218 else { 1219 // get the partialMessageMediator 1220 // created by SelectorThread 1221 CorbaMessageMediator messageMediator = 1222 this.getPartialMessageMediator(); 1223 1224 // read remaining info needed in a MessageMediator 1225 messageMediator = finishReadingBits(messageMediator); 1226 1227 if (messageMediator != null) { 1228 // Null can happen when client closes stream 1229 // causing purgecalls. 1230 dispatch(messageMediator); 1231 } 1232 } 1233 } catch (Throwable t) { 1234 if (orb.transportDebugFlag) { 1235 dprint(".doWork: ignoring Throwable: " 1236 + t 1237 + " " + this); 1238 } 1239 } finally { 1240 if (orb.transportDebugFlag) { 1241 dprint(".doWork<-: " + this); 1242 } 1243 } 1244 } 1245 1246 public void setEnqueueTime(long timeInMillis) 1247 { 1248 enqueueTime = timeInMillis; 1249 } 1250 1251 public long getEnqueueTime() 1252 { 1253 return enqueueTime; 1254 } 1255 1256 //////////////////////////////////////////////////// 1257 // 1258 // spi.transport.CorbaConnection. 1259 // 1260 1261 // IMPORTANT: Reader Threads must NOT read Giop header only. 1262 public boolean shouldReadGiopHeaderOnly() { 1263 return shouldReadGiopHeaderOnly; 1264 } 1265 1266 protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) { 1267 shouldReadGiopHeaderOnly = shouldReadHeaderOnly; 1268 } 1269 1270 public ResponseWaitingRoom getResponseWaitingRoom() 1271 { 1272 return responseWaitingRoom; 1273 } 1274 1275 // REVISIT - inteface defines isServer but already defined in 1276 // higher interface. 
1277 1278 public void serverRequestMapPut(int requestId, 1279 CorbaMessageMediator messageMediator) 1280 { 1281 serverRequestMap.put(new Integer(requestId), messageMediator); 1282 } 1283 1284 public CorbaMessageMediator serverRequestMapGet(int requestId) 1285 { 1286 return (CorbaMessageMediator) 1287 serverRequestMap.get(new Integer(requestId)); 1288 } 1289 1290 public void serverRequestMapRemove(int requestId) 1291 { 1292 serverRequestMap.remove(new Integer(requestId)); 1293 } 1294 1295 1296 // REVISIT: this is also defined in: 1297 // com.sun.corba.se.spi.legacy.connection.Connection 1298 public java.net.Socket getSocket() 1299 { 1300 return socket; 1301 } 1302 1303 /** It is possible for a Close Connection to have been 1304 ** sent here, but we will not check for this. A "lazy" 1305 ** Exception will be thrown in the Worker thread after the 1306 ** incoming request has been processed even though the connection 1307 ** is closed before the request is processed. This is o.k because 1308 ** it is a boundary condition. To prevent it we would have to add 1309 ** more locks which would reduce performance in the normal case. 1310 **/ 1311 public synchronized void serverRequestProcessingBegins() 1312 { 1313 serverRequestCount++; 1314 } 1315 1316 public synchronized void serverRequestProcessingEnds() 1317 { 1318 serverRequestCount--; 1319 } 1320 1321 // 1322 // 1323 // 1324 1325 public synchronized int getNextRequestId() 1326 { 1327 return requestId++; 1328 } 1329 1330 // Negotiated code sets for char and wchar data 1331 protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; 1332 1333 public ORB getBroker() 1334 { 1335 return orb; 1336 } 1337 1338 public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { 1339 // Needs to be synchronized for the following case when the client 1340 // doesn't send the code set context twice, and we have two threads 1341 // in ServerRequestDispatcher processCodeSetContext. 
1342 // 1343 // Thread A checks to see if there is a context, there is none, so 1344 // it calls setCodeSetContext, getting the synch lock. 1345 // Thread B checks to see if there is a context. If we didn't synch, 1346 // it might decide to outlaw wchar/wstring. 1347 if (codeSetContext == null) { 1348 synchronized(this) { 1349 return codeSetContext; 1350 } 1351 } 1352 1353 return codeSetContext; 1354 } 1355 1356 public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { 1357 // Double check whether or not we need to do this 1358 if (codeSetContext == null) { 1359 1360 if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || 1361 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { 1362 // If the client says it's negotiated a code set that 1363 // isn't a fallback and we never said we support, then 1364 // it has a bug. 1365 throw wrapper.badCodesetsFromClient() ; 1366 } 1367 1368 codeSetContext = csc; 1369 } 1370 } 1371 1372 // 1373 // from iiop.IIOPConnection.java 1374 // 1375 1376 // Map request ID to an InputObject. 1377 // This is so the client thread can start unmarshaling 1378 // the reply and remove it from the out_calls map while the 1379 // ReaderThread can still obtain the input stream to give 1380 // new fragments. Only the ReaderThread touches the clientReplyMap, 1381 // so it doesn't incur synchronization overhead. 
1382 1383 public MessageMediator clientRequestMapGet(int requestId) 1384 { 1385 return responseWaitingRoom.getMessageMediator(requestId); 1386 } 1387 1388 protected MessageMediator clientReply_1_1; 1389 1390 public void clientReply_1_1_Put(MessageMediator x) 1391 { 1392 clientReply_1_1 = x; 1393 } 1394 1395 public MessageMediator clientReply_1_1_Get() 1396 { 1397 return clientReply_1_1; 1398 } 1399 1400 public void clientReply_1_1_Remove() 1401 { 1402 clientReply_1_1 = null; 1403 } 1404 1405 protected MessageMediator serverRequest_1_1; 1406 1407 public void serverRequest_1_1_Put(MessageMediator x) 1408 { 1409 serverRequest_1_1 = x; 1410 } 1411 1412 public MessageMediator serverRequest_1_1_Get() 1413 { 1414 return serverRequest_1_1; 1415 } 1416 1417 public void serverRequest_1_1_Remove() 1418 { 1419 serverRequest_1_1 = null; 1420 } 1421 1422 protected String getStateString( int state ) 1423 { 1424 synchronized ( stateEvent ){ 1425 switch (state) { 1426 case OPENING : return "OPENING" ; 1427 case ESTABLISHED : return "ESTABLISHED" ; 1428 case CLOSE_SENT : return "CLOSE_SENT" ; 1429 case CLOSE_RECVD : return "CLOSE_RECVD" ; 1430 case ABORT : return "ABORT" ; 1431 default : return "???" ; 1432 } 1433 } 1434 } 1435 1436 public synchronized boolean isPostInitialContexts() { 1437 return postInitialContexts; 1438 } 1439 1440 // Can never be unset... 1441 public synchronized void setPostInitialContexts(){ 1442 postInitialContexts = true; 1443 } 1444 1445 /** 1446 * Wake up the outstanding requests on the connection, and hand them 1447 * COMM_FAILURE exception with a given minor code. 1448 * 1449 * Also, delete connection from connection table and 1450 * stop the reader thread. 1451 1452 * Note that this should only ever be called by the Reader thread for 1453 * this connection. 1454 * 1455 * @param minor_code The minor code for the COMM_FAILURE major code. 1456 * @param die Kill the reader thread (this thread) before exiting. 
1457 */ 1458 public void purgeCalls(SystemException systemException, 1459 boolean die, boolean lockHeld) 1460 { 1461 int minor_code = systemException.minor; 1462 1463 try{ 1464 if (orb.transportDebugFlag) { 1465 dprint(".purgeCalls->: " 1466 + minor_code + "/" + die + "/" + lockHeld 1467 + " " + this); 1468 } 1469 1470 // If this invocation is a result of ThreadDeath caused 1471 // by a previous execution of this routine, just exit. 1472 1473 synchronized ( stateEvent ){ 1474 if ((state == ABORT) || (state == CLOSE_RECVD)) { 1475 if (orb.transportDebugFlag) { 1476 dprint(".purgeCalls: exiting since state is: " 1477 + getStateString(state) 1478 + " " + this); 1479 } 1480 return; 1481 } 1482 } 1483 1484 // Grab the writeLock (freeze the calls) 1485 try { 1486 if (!lockHeld) { 1487 writeLock(); 1488 } 1489 } catch (SystemException ex) { 1490 if (orb.transportDebugFlag) 1491 dprint(".purgeCalls: SystemException" + ex 1492 + "; continuing " + this); 1493 } 1494 1495 // Mark the state of the connection 1496 // and determine the request status 1497 org.omg.CORBA.CompletionStatus completion_status; 1498 synchronized ( stateEvent ){ 1499 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { 1500 state = CLOSE_RECVD; 1501 systemException.completed = CompletionStatus.COMPLETED_NO; 1502 } else { 1503 state = ABORT; 1504 systemException.completed = CompletionStatus.COMPLETED_MAYBE; 1505 } 1506 stateEvent.notifyAll(); 1507 } 1508 1509 try { 1510 socket.getInputStream().close(); 1511 socket.getOutputStream().close(); 1512 socket.close(); 1513 } catch (Exception ex) { 1514 if (orb.transportDebugFlag) { 1515 dprint(".purgeCalls: Exception closing socket: " + ex 1516 + " " + this); 1517 } 1518 } 1519 1520 // Signal all threads with outstanding requests on this 1521 // connection and give them the SystemException; 1522 1523 responseWaitingRoom.signalExceptionToAllWaiters(systemException); 1524 } finally { 1525 if (contactInfo != null) { 1526 
((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); 1527 } else if (acceptor != null) { 1528 ((InboundConnectionCache)getConnectionCache()).remove(this); 1529 } 1530 1531 // 1532 // REVISIT: Stop the reader thread 1533 // 1534 1535 // Signal all the waiters of the writeLock. 1536 // There are 4 types of writeLock waiters: 1537 // 1. Send waiters: 1538 // 2. SendReply waiters: 1539 // 3. cleanUp waiters: 1540 // 4. purge_call waiters: 1541 // 1542 1543 writeUnlock(); 1544 1545 if (orb.transportDebugFlag) { 1546 dprint(".purgeCalls<-: " 1547 + minor_code + "/" + die + "/" + lockHeld 1548 + " " + this); 1549 } 1550 } 1551 } 1552 1553 /************************************************************************* 1554 * The following methods are for dealing with Connection cleaning for 1555 * better scalability of servers in high network load conditions. 1556 **************************************************************************/ 1557 1558 public void sendCloseConnection(GIOPVersion giopVersion) 1559 throws IOException 1560 { 1561 Message msg = MessageBase.createCloseConnection(giopVersion); 1562 sendHelper(giopVersion, msg); 1563 } 1564 1565 public void sendMessageError(GIOPVersion giopVersion) 1566 throws IOException 1567 { 1568 Message msg = MessageBase.createMessageError(giopVersion); 1569 sendHelper(giopVersion, msg); 1570 } 1571 1572 /** 1573 * Send a CancelRequest message. This does not lock the connection, so the 1574 * caller needs to ensure this method is called appropriately. 1575 * @exception IOException - could be due to abortive connection closure. 
1576 */ 1577 public void sendCancelRequest(GIOPVersion giopVersion, int requestId) 1578 throws IOException 1579 { 1580 1581 Message msg = MessageBase.createCancelRequest(giopVersion, requestId); 1582 sendHelper(giopVersion, msg); 1583 } 1584 1585 protected void sendHelper(GIOPVersion giopVersion, Message msg) 1586 throws IOException 1587 { 1588 // REVISIT: See comments in CDROutputObject constructor. 1589 CDROutputObject outputObject = 1590 new CDROutputObject((ORB)orb, null, giopVersion, this, msg, 1591 ORBConstants.STREAM_FORMAT_VERSION_1); 1592 msg.write(outputObject); 1593 1594 outputObject.writeTo(this); 1595 } 1596 1597 public void sendCancelRequestWithLock(GIOPVersion giopVersion, 1598 int requestId) 1599 throws IOException 1600 { 1601 writeLock(); 1602 try { 1603 sendCancelRequest(giopVersion, requestId); 1604 } finally { 1605 writeUnlock(); 1606 } 1607 } 1608 1609 // Begin Code Base methods --------------------------------------- 1610 // 1611 // Set this connection's code base IOR. The IOR comes from the 1612 // SendingContext. This is an optional service context, but all 1613 // JavaSoft ORBs send it. 1614 // 1615 // The set and get methods don't need to be synchronized since the 1616 // first possible get would occur during reading a valuetype, and 1617 // that would be after the set. 1618 1619 // Sets this connection's code base IOR. This is done after 1620 // getting the IOR out of the SendingContext service context. 1621 // Our ORBs always send this, but it's optional in CORBA. 1622 1623 public final void setCodeBaseIOR(IOR ior) { 1624 codeBaseServerIOR = ior; 1625 } 1626 1627 public final IOR getCodeBaseIOR() { 1628 return codeBaseServerIOR; 1629 } 1630 1631 // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase 1632 // won't connect to the remote codebase unless it's necessary. 
public final CodeBase getCodeBase() {
    return this.cachedCodeBase;
}

// End Code Base methods -----------------------------------------

/**
 * Set transport read thresholds.
 *
 * @param readTimeouts the value stored in the readTimeouts field
 */
protected void setReadTimeouts(ReadTimeouts readTimeouts) {
    this.readTimeouts = readTimeouts;
}

/**
 * Stores the partially read message mediator.
 *
 * @param messageMediator the value stored in partialMessageMediator
 */
protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
    this.partialMessageMediator = messageMediator;
}

/**
 * @return the current value of partialMessageMediator
 */
protected CorbaMessageMediator getPartialMessageMediator() {
    return this.partialMessageMediator;
}

/**
 * Builds a one-line description of this connection: the socket channel
 * (or the socket when there is no channel), the state name and the
 * three threading flags, separated by single spaces. The string is
 * assembled while holding the stateEvent monitor.
 */
public String toString()
{
    synchronized ( stateEvent ){
        String endpoint;
        if (socketChannel != null) {
            endpoint = socketChannel.toString();
        } else {
            endpoint = socket.toString();
        }

        StringBuilder text = new StringBuilder();
        text.append("SocketOrChannelConnectionImpl[").append(" ");
        text.append(endpoint).append(" ");
        text.append(getStateString( state )).append(" ");
        text.append(shouldUseSelectThreadToWait()).append(" ");
        text.append(shouldUseWorkerThreadForEvent()).append(" ");
        text.append(shouldReadGiopHeaderOnly());
        text.append("]");
        return text.toString();
    }
}

/**
 * Forwards a debug message to ORBUtility.dprint, tagged with this
 * class's name. Must be public - used in encoding.
 *
 * @param msg the message to print
 */
public void dprint(String msg)
{
    ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
}

/**
 * Prints a debug message followed by the stack trace of t on
 * System.out.
 *
 * @param msg the message to print
 * @param t the throwable whose stack trace is printed
 */
protected void dprint(String msg, Throwable t)
{
    dprint(msg);
    t.printStackTrace(System.out);
}
}

// End of file.