1 /* 2 * Copyright (c) 2001, 2004, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package com.sun.corba.se.impl.protocol; 27 28 import java.io.ByteArrayOutputStream; 29 import java.io.IOException; 30 import java.io.PrintWriter; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.SelectionKey; 33 import java.util.EmptyStackException; 34 import java.util.Iterator; 35 36 import org.omg.CORBA.Any; 37 import org.omg.CORBA.CompletionStatus; 38 import org.omg.CORBA.ExceptionList; 39 import org.omg.CORBA.INTERNAL; 40 import org.omg.CORBA.Principal; 41 import org.omg.CORBA.SystemException; 42 import org.omg.CORBA.TypeCode; 43 import org.omg.CORBA.UnknownUserException; 44 import org.omg.CORBA.UNKNOWN; 45 import org.omg.CORBA.portable.ResponseHandler; 46 import org.omg.CORBA.portable.UnknownException; 47 import org.omg.CORBA_2_3.portable.InputStream; 48 import org.omg.CORBA_2_3.portable.OutputStream; 49 import org.omg.IOP.ExceptionDetailMessage; 50 import org.omg.IOP.TAG_RMI_CUSTOM_MAX_STREAM_FORMAT; 51 52 import com.sun.corba.se.pept.broker.Broker; 53 import com.sun.corba.se.pept.encoding.InputObject; 54 import com.sun.corba.se.pept.encoding.OutputObject; 55 import com.sun.corba.se.pept.protocol.MessageMediator; 56 import com.sun.corba.se.pept.protocol.ProtocolHandler; 57 import com.sun.corba.se.pept.transport.ByteBufferPool; 58 import com.sun.corba.se.pept.transport.Connection; 59 import com.sun.corba.se.pept.transport.ContactInfo; 60 import com.sun.corba.se.pept.transport.EventHandler; 61 62 import com.sun.corba.se.spi.ior.IOR; 63 import com.sun.corba.se.spi.ior.ObjectKey; 64 import com.sun.corba.se.spi.ior.ObjectKeyTemplate; 65 import com.sun.corba.se.spi.ior.iiop.GIOPVersion; 66 import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate; 67 import com.sun.corba.se.spi.ior.iiop.IIOPProfile; 68 import com.sun.corba.se.spi.ior.iiop.MaxStreamFormatVersionComponent; 69 import com.sun.corba.se.spi.oa.OAInvocationInfo; 70 import com.sun.corba.se.spi.oa.ObjectAdapter; 71 import com.sun.corba.se.spi.orb.ORB; 72 import 
com.sun.corba.se.spi.orb.ORBVersionFactory; 73 import com.sun.corba.se.spi.protocol.CorbaMessageMediator; 74 import com.sun.corba.se.spi.protocol.CorbaProtocolHandler; 75 import com.sun.corba.se.spi.protocol.CorbaServerRequestDispatcher; 76 import com.sun.corba.se.spi.protocol.ForwardException; 77 import com.sun.corba.se.spi.transport.CorbaConnection; 78 import com.sun.corba.se.spi.transport.CorbaContactInfo; 79 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 80 import com.sun.corba.se.spi.logging.CORBALogDomains; 81 82 import com.sun.corba.se.spi.servicecontext.ORBVersionServiceContext; 83 import com.sun.corba.se.spi.servicecontext.ServiceContexts; 84 import com.sun.corba.se.spi.servicecontext.UEInfoServiceContext; 85 import com.sun.corba.se.spi.servicecontext.MaxStreamFormatVersionServiceContext; 86 import com.sun.corba.se.spi.servicecontext.SendingContextServiceContext; 87 import com.sun.corba.se.spi.servicecontext.UnknownServiceContext; 88 89 import com.sun.corba.se.impl.corba.RequestImpl; 90 import com.sun.corba.se.impl.encoding.BufferManagerFactory; 91 import com.sun.corba.se.impl.encoding.BufferManagerReadStream; 92 import com.sun.corba.se.impl.encoding.CDRInputObject; 93 import com.sun.corba.se.impl.encoding.CDROutputObject; 94 import com.sun.corba.se.impl.encoding.EncapsOutputStream; 95 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 96 import com.sun.corba.se.impl.logging.InterceptorsSystemException; 97 import com.sun.corba.se.impl.orbutil.ORBConstants; 98 import com.sun.corba.se.impl.orbutil.ORBUtility; 99 import com.sun.corba.se.impl.ior.iiop.JavaSerializationComponent; 100 import com.sun.corba.se.impl.protocol.AddressingDispositionException; 101 import com.sun.corba.se.impl.protocol.RequestCanceledException; 102 import com.sun.corba.se.impl.protocol.giopmsgheaders.AddressingDispositionHelper; 103 import com.sun.corba.se.impl.protocol.giopmsgheaders.CancelRequestMessage; 104 import 
com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_1; 105 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_2; 106 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage; 107 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_0; 108 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_1; 109 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_2; 110 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage; 111 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage; 112 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_0; 113 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_1; 114 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_2; 115 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 116 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; 117 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageHandler; 118 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage; 119 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_0; 120 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_1; 121 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_2; 122 import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage; 123 import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_0 ; 124 import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_1 ; 125 import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_2 ; 126 127 // REVISIT: make sure no memory leaks in client/server request/reply maps. 128 // REVISIT: normalize requestHeader, replyHeader, messageHeader. 
129 130 /** 131 * @author Harold Carr 132 */ 133 public class CorbaMessageMediatorImpl 134 implements 135 CorbaMessageMediator, 136 CorbaProtocolHandler, 137 MessageHandler 138 { 139 protected ORB orb; 140 protected ORBUtilSystemException wrapper ; 141 protected InterceptorsSystemException interceptorWrapper ; 142 protected CorbaContactInfo contactInfo; 143 protected CorbaConnection connection; 144 protected short addrDisposition; 145 protected CDROutputObject outputObject; 146 protected CDRInputObject inputObject; 147 protected Message messageHeader; 148 protected RequestMessage requestHeader; 149 protected LocateReplyOrReplyMessage replyHeader; 150 protected String replyExceptionDetailMessage; 151 protected IOR replyIOR; 152 protected Integer requestIdInteger; 153 protected Message dispatchHeader; 154 protected ByteBuffer dispatchByteBuffer; 155 protected byte streamFormatVersion; 156 protected boolean streamFormatVersionSet = false; 157 158 protected org.omg.CORBA.Request diiRequest; 159 160 protected boolean cancelRequestAlreadySent = false; 161 162 protected ProtocolHandler protocolHandler; 163 protected boolean _executeReturnServantInResponseConstructor = false; 164 protected boolean _executeRemoveThreadInfoInResponseConstructor = false; 165 protected boolean _executePIInResponseConstructor = false; 166 167 // 168 // Client-side constructor. 
169 // 170 171 public CorbaMessageMediatorImpl(ORB orb, 172 ContactInfo contactInfo, 173 Connection connection, 174 GIOPVersion giopVersion, 175 IOR ior, 176 int requestId, 177 short addrDisposition, 178 String operationName, 179 boolean isOneWay) 180 { 181 this( orb, connection ) ; 182 183 this.contactInfo = (CorbaContactInfo) contactInfo; 184 this.addrDisposition = addrDisposition; 185 186 streamFormatVersion = 187 getStreamFormatVersionForThisRequest( 188 ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(), 189 giopVersion); 190 streamFormatVersionSet = true; 191 192 requestHeader = (RequestMessage) MessageBase.createRequest( 193 this.orb, 194 giopVersion, 195 ORBUtility.getEncodingVersion(orb, ior), 196 requestId, 197 !isOneWay, 198 ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(), 199 this.addrDisposition, 200 operationName, 201 new ServiceContexts(orb), 202 null); 203 } 204 205 // 206 // Acceptor constructor. 207 // 208 209 public CorbaMessageMediatorImpl(ORB orb, 210 Connection connection) 211 { 212 this.orb = orb; 213 this.connection = (CorbaConnection)connection; 214 this.wrapper = ORBUtilSystemException.get( orb, 215 CORBALogDomains.RPC_PROTOCOL ) ; 216 this.interceptorWrapper = InterceptorsSystemException.get( orb, 217 CORBALogDomains.RPC_PROTOCOL ) ; 218 } 219 220 // 221 // Dispatcher constructor. 222 // 223 224 // Note: in some cases (e.g., a reply message) this message 225 // mediator will only be used for dispatch. Then the original 226 // request side mediator will take over. 
227 public CorbaMessageMediatorImpl(ORB orb, 228 CorbaConnection connection, 229 Message dispatchHeader, 230 ByteBuffer byteBuffer) 231 { 232 this( orb, connection ) ; 233 this.dispatchHeader = dispatchHeader; 234 this.dispatchByteBuffer = byteBuffer; 235 } 236 237 //////////////////////////////////////////////////// 238 // 239 // MessageMediator 240 // 241 242 public Broker getBroker() 243 { 244 return orb; 245 } 246 247 public ContactInfo getContactInfo() 248 { 249 return contactInfo; 250 } 251 252 public Connection getConnection() 253 { 254 return connection; 255 } 256 257 public void initializeMessage() 258 { 259 getRequestHeader().write(outputObject); 260 } 261 262 public void finishSendingRequest() 263 { 264 // REVISIT: probably move logic in outputObject to here. 265 outputObject.finishSendingMessage(); 266 } 267 268 public InputObject waitForResponse() 269 { 270 if (getRequestHeader().isResponseExpected()) { 271 return connection.waitForResponse(this); 272 } 273 return null; 274 } 275 276 public void setOutputObject(OutputObject outputObject) 277 { 278 this.outputObject = (CDROutputObject) outputObject; 279 } 280 281 public OutputObject getOutputObject() 282 { 283 return outputObject; 284 } 285 286 public void setInputObject(InputObject inputObject) 287 { 288 this.inputObject = (CDRInputObject) inputObject; 289 } 290 291 public InputObject getInputObject() 292 { 293 return inputObject; 294 } 295 296 //////////////////////////////////////////////////// 297 // 298 // CorbaMessageMediator 299 // 300 301 public void setReplyHeader(LocateReplyOrReplyMessage header) 302 { 303 this.replyHeader = header; 304 this.replyIOR = header.getIOR(); // REVISIT - need separate field? 
305 } 306 307 public LocateReplyMessage getLocateReplyHeader() 308 { 309 return (LocateReplyMessage) replyHeader; 310 } 311 312 public ReplyMessage getReplyHeader() 313 { 314 return (ReplyMessage) replyHeader; 315 } 316 317 public void setReplyExceptionDetailMessage(String message) 318 { 319 replyExceptionDetailMessage = message; 320 } 321 322 public RequestMessage getRequestHeader() 323 { 324 return requestHeader; 325 } 326 327 public GIOPVersion getGIOPVersion() 328 { 329 if (messageHeader != null) { 330 return messageHeader.getGIOPVersion(); 331 } 332 return getRequestHeader().getGIOPVersion(); 333 } 334 335 public byte getEncodingVersion() { 336 if (messageHeader != null) { 337 return messageHeader.getEncodingVersion(); 338 } 339 return getRequestHeader().getEncodingVersion(); 340 } 341 342 public int getRequestId() 343 { 344 return getRequestHeader().getRequestId(); 345 } 346 347 public Integer getRequestIdInteger() 348 { 349 if (requestIdInteger == null) { 350 requestIdInteger = new Integer(getRequestHeader().getRequestId()); 351 } 352 return requestIdInteger; 353 } 354 355 public boolean isOneWay() 356 { 357 return ! 
getRequestHeader().isResponseExpected(); 358 } 359 360 public short getAddrDisposition() 361 { 362 return addrDisposition; 363 } 364 365 public String getOperationName() 366 { 367 return getRequestHeader().getOperation(); 368 } 369 370 public ServiceContexts getRequestServiceContexts() 371 { 372 return getRequestHeader().getServiceContexts(); 373 } 374 375 public ServiceContexts getReplyServiceContexts() 376 { 377 return getReplyHeader().getServiceContexts(); 378 } 379 380 public void sendCancelRequestIfFinalFragmentNotSent() 381 { 382 if ((!sentFullMessage()) && sentFragment() && 383 (!cancelRequestAlreadySent)) 384 { 385 try { 386 if (orb.subcontractDebugFlag) { 387 dprint(".sendCancelRequestIfFinalFragmentNotSent->: " 388 + opAndId(this)); 389 } 390 connection.sendCancelRequestWithLock(getGIOPVersion(), 391 getRequestId()); 392 // Case: first a location forward, then a marshaling 393 // exception (e.g., non-serializable object). Only 394 // send cancel once. 395 cancelRequestAlreadySent = true; 396 } catch (IOException e) { 397 if (orb.subcontractDebugFlag) { 398 dprint(".sendCancelRequestIfFinalFragmentNotSent: !ERROR : " + opAndId(this), 399 e); 400 } 401 402 // REVISIT: we could attempt to send a final incomplete 403 // fragment in this case. 
404 throw interceptorWrapper.ioexceptionDuringCancelRequest( 405 CompletionStatus.COMPLETED_MAYBE, e ); 406 } finally { 407 if (orb.subcontractDebugFlag) { 408 dprint(".sendCancelRequestIfFinalFragmentNotSent<-: " 409 + opAndId(this)); 410 } 411 } 412 } 413 } 414 415 public boolean sentFullMessage() 416 { 417 return outputObject.getBufferManager().sentFullMessage(); 418 } 419 420 public boolean sentFragment() 421 { 422 return outputObject.getBufferManager().sentFragment(); 423 } 424 425 public void setDIIInfo(org.omg.CORBA.Request diiRequest) 426 { 427 this.diiRequest = diiRequest; 428 } 429 430 public boolean isDIIRequest() 431 { 432 return diiRequest != null; 433 } 434 435 public Exception unmarshalDIIUserException(String repoId, InputStream is) 436 { 437 if (! isDIIRequest()) { 438 return null; 439 } 440 441 ExceptionList _exceptions = diiRequest.exceptions(); 442 443 try { 444 // Find the typecode for the exception 445 for (int i=0; i<_exceptions.count() ; i++) { 446 TypeCode tc = _exceptions.item(i); 447 if ( tc.id().equals(repoId) ) { 448 // Since we dont have the actual user exception 449 // class, the spec says we have to create an 450 // UnknownUserException and put it in the 451 // environment. 452 Any eany = orb.create_any(); 453 eany.read_value(is, (TypeCode)tc); 454 455 return new UnknownUserException(eany); 456 } 457 } 458 } catch (Exception b) { 459 throw wrapper.unexpectedDiiException(b); 460 } 461 462 // must be a truly unknown exception 463 return wrapper.unknownCorbaExc( CompletionStatus.COMPLETED_MAYBE); 464 } 465 466 public void setDIIException(Exception exception) 467 { 468 diiRequest.env().exception(exception); 469 } 470 471 public void handleDIIReply(InputStream inputStream) 472 { 473 if (! 
isDIIRequest()) { 474 return; 475 } 476 ((RequestImpl)diiRequest).unmarshalReply(inputStream); 477 } 478 479 public Message getDispatchHeader() 480 { 481 return dispatchHeader; 482 } 483 484 public void setDispatchHeader(Message msg) 485 { 486 dispatchHeader = msg; 487 } 488 489 public ByteBuffer getDispatchBuffer() 490 { 491 return dispatchByteBuffer; 492 } 493 494 public void setDispatchBuffer(ByteBuffer byteBuffer) 495 { 496 dispatchByteBuffer = byteBuffer; 497 } 498 499 public int getThreadPoolToUse() { 500 int poolToUse = 0; 501 Message msg = getDispatchHeader(); 502 // A null msg should never happen. But, we'll be 503 // defensive just in case. 504 if (msg != null) { 505 poolToUse = msg.getThreadPoolToUse(); 506 } 507 return poolToUse; 508 } 509 510 public byte getStreamFormatVersion() 511 { 512 // REVISIT: ContactInfo/Acceptor output object factories 513 // just use this. Maybe need to distinguish: 514 // createOutputObjectForRequest 515 // createOutputObjectForReply 516 // then do getStreamFormatVersionForRequest/ForReply here. 517 if (streamFormatVersionSet) { 518 return streamFormatVersion; 519 } 520 return getStreamFormatVersionForReply(); 521 } 522 523 /** 524 * If the RMI-IIOP maximum stream format version service context 525 * is present, it indicates the maximum stream format version we 526 * could use for the reply. If it isn't present, the default is 527 * 2 for GIOP 1.3 or greater, 1 for lower. 528 * 529 * This is only sent on requests. Clients can find out the 530 * server's maximum by looking for a tagged component in the IOR. 531 */ 532 public byte getStreamFormatVersionForReply() { 533 534 // NOTE: The request service contexts may indicate the max. 
535 ServiceContexts svc = getRequestServiceContexts(); 536 537 MaxStreamFormatVersionServiceContext msfvsc 538 = (MaxStreamFormatVersionServiceContext)svc.get( 539 MaxStreamFormatVersionServiceContext.SERVICE_CONTEXT_ID); 540 541 if (msfvsc != null) { 542 byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion(); 543 byte remoteMaxVersion = msfvsc.getMaximumStreamFormatVersion(); 544 545 return (byte)Math.min(localMaxVersion, remoteMaxVersion); 546 } else { 547 // Defaults to 1 for GIOP 1.2 or less, 2 for 548 // GIOP 1.3 or higher. 549 if (getGIOPVersion().lessThan(GIOPVersion.V1_3)) 550 return ORBConstants.STREAM_FORMAT_VERSION_1; 551 else 552 return ORBConstants.STREAM_FORMAT_VERSION_2; 553 } 554 } 555 556 public boolean isSystemExceptionReply() 557 { 558 return replyHeader.getReplyStatus() == ReplyMessage.SYSTEM_EXCEPTION; 559 } 560 561 public boolean isUserExceptionReply() 562 { 563 return replyHeader.getReplyStatus() == ReplyMessage.USER_EXCEPTION; 564 } 565 566 public boolean isLocationForwardReply() 567 { 568 return ( (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD) || 569 (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD_PERM) ); 570 //return replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD; 571 } 572 573 public boolean isDifferentAddrDispositionRequestedReply() 574 { 575 return replyHeader.getReplyStatus() == ReplyMessage.NEEDS_ADDRESSING_MODE; 576 } 577 578 public short getAddrDispositionReply() 579 { 580 return replyHeader.getAddrDisposition(); 581 } 582 583 public IOR getForwardedIOR() 584 { 585 return replyHeader.getIOR(); 586 } 587 588 public SystemException getSystemExceptionReply() 589 { 590 return replyHeader.getSystemException(replyExceptionDetailMessage); 591 } 592 593 //////////////////////////////////////////////////// 594 // 595 // Used by server side. 
596 // 597 598 public ObjectKey getObjectKey() 599 { 600 return getRequestHeader().getObjectKey(); 601 } 602 603 public void setProtocolHandler(CorbaProtocolHandler protocolHandler) 604 { 605 throw wrapper.methodShouldNotBeCalled() ; 606 } 607 608 public CorbaProtocolHandler getProtocolHandler() 609 { 610 // REVISIT: should look up in orb registry. 611 return this; 612 } 613 614 //////////////////////////////////////////////////// 615 // 616 // ResponseHandler 617 // 618 619 public org.omg.CORBA.portable.OutputStream createReply() 620 { 621 // Note: relies on side-effect of setting mediator output field. 622 // REVISIT - cast - need interface 623 getProtocolHandler().createResponse(this, (ServiceContexts) null); 624 return (OutputStream) getOutputObject(); 625 } 626 627 public org.omg.CORBA.portable.OutputStream createExceptionReply() 628 { 629 // Note: relies on side-effect of setting mediator output field. 630 // REVISIT - cast - need interface 631 getProtocolHandler().createUserExceptionResponse(this, (ServiceContexts) null); 632 return (OutputStream) getOutputObject(); 633 } 634 635 public boolean executeReturnServantInResponseConstructor() 636 { 637 return _executeReturnServantInResponseConstructor; 638 639 } 640 641 public void setExecuteReturnServantInResponseConstructor(boolean b) 642 { 643 _executeReturnServantInResponseConstructor = b; 644 } 645 646 public boolean executeRemoveThreadInfoInResponseConstructor() 647 { 648 return _executeRemoveThreadInfoInResponseConstructor; 649 } 650 651 public void setExecuteRemoveThreadInfoInResponseConstructor(boolean b) 652 { 653 _executeRemoveThreadInfoInResponseConstructor = b; 654 } 655 656 public boolean executePIInResponseConstructor() 657 { 658 return _executePIInResponseConstructor; 659 } 660 661 public void setExecutePIInResponseConstructor( boolean b ) 662 { 663 _executePIInResponseConstructor = b; 664 } 665 666 private byte getStreamFormatVersionForThisRequest(IOR ior, 667 GIOPVersion giopVersion) 668 { 669 
670 byte localMaxVersion 671 = ORBUtility.getMaxStreamFormatVersion(); 672 673 IOR effectiveTargetIOR = 674 ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(); 675 IIOPProfileTemplate temp = 676 (IIOPProfileTemplate)effectiveTargetIOR.getProfile().getTaggedProfileTemplate(); 677 Iterator iter = temp.iteratorById(TAG_RMI_CUSTOM_MAX_STREAM_FORMAT.value); 678 if (!iter.hasNext()) { 679 // Didn't have the max stream format version tagged 680 // component. 681 if (giopVersion.lessThan(GIOPVersion.V1_3)) 682 return ORBConstants.STREAM_FORMAT_VERSION_1; 683 else 684 return ORBConstants.STREAM_FORMAT_VERSION_2; 685 } 686 687 byte remoteMaxVersion 688 = ((MaxStreamFormatVersionComponent)iter.next()).getMaxStreamFormatVersion(); 689 690 return (byte)Math.min(localMaxVersion, remoteMaxVersion); 691 } 692 693 //////////////////////////////////////////////////////////////////////// 694 //////////////////////////////////////////////////////////////////////// 695 //////////////////////////////////////////////////////////////////////// 696 697 // REVISIT - This could be a separate implementation object looked 698 // up in a registry. However it needs some state in the message 699 // mediator so combine for now. 700 701 702 protected boolean isThreadDone = false; 703 704 //////////////////////////////////////////////////// 705 // 706 // pept.protocol.ProtocolHandler 707 // 708 709 public boolean handleRequest(MessageMediator messageMediator) 710 { 711 try { 712 dispatchHeader.callback(this); 713 } catch (IOException e) { 714 // REVISIT - this should be handled internally. 715 ; 716 } 717 return isThreadDone; 718 } 719 720 //////////////////////////////////////////////////// 721 // 722 // iiop.messages.MessageHandler 723 // 724 725 private void setWorkThenPoolOrResumeSelect(Message header) 726 { 727 if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { 728 resumeSelect(header); 729 } else { 730 // Leader/Follower when using reader thread. 
731 // When this thread is done working it will go back in pool. 732 733 isThreadDone = true; 734 735 // First unregister current registration. 736 orb.getTransportManager().getSelector(0) 737 .unregisterForEvent(getConnection().getEventHandler()); 738 // Have another thread become the reader. 739 orb.getTransportManager().getSelector(0) 740 .registerForEvent(getConnection().getEventHandler()); 741 } 742 } 743 744 private void setWorkThenReadOrResumeSelect(Message header) 745 { 746 if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { 747 resumeSelect(header); 748 } else { 749 // When using reader thread then wen this thread is 750 // done working it will continue reading. 751 isThreadDone = false; 752 } 753 } 754 755 private void resumeSelect(Message header) 756 { 757 // NOTE: VERY IMPORTANT: 758 // Only participate in select after getting to the point 759 // that proper serialization of fragments is ensured. 760 761 if (transportDebug()) { 762 dprint(".resumeSelect:->"); 763 // REVISIT: not-OO: 764 String requestId = "?"; 765 if (header instanceof RequestMessage) { 766 requestId = 767 new Integer(((RequestMessage)header) 768 .getRequestId()).toString(); 769 } else if (header instanceof ReplyMessage) { 770 requestId = 771 new Integer(((ReplyMessage)header) 772 .getRequestId()).toString(); 773 } else if (header instanceof FragmentMessage_1_2) { 774 requestId = 775 new Integer(((FragmentMessage_1_2)header) 776 .getRequestId()).toString(); 777 } 778 dprint(".resumeSelect: id/" 779 + requestId 780 + " " + getConnection() 781 ); 782 783 } 784 785 // IMPORTANT: To avoid bug (4953599), we force the Thread that does the NIO select 786 // to also do the enable/disable of Ops using SelectionKey.interestOps(Ops of Interest). 787 // Otherwise, the SelectionKey.interestOps(Ops of Interest) may block indefinitely in 788 // this thread. 
789 EventHandler eventHandler = getConnection().getEventHandler(); 790 orb.getTransportManager().getSelector(0).registerInterestOps(eventHandler); 791 792 if (transportDebug()) { 793 dprint(".resumeSelect:<-"); 794 } 795 } 796 797 private void setInputObject() 798 { 799 // REVISIT: refactor createInputObject (and createMessageMediator) 800 // into base PlugInFactory. Get via connection (either ContactInfo 801 // or Acceptor). 802 if (getConnection().getContactInfo() != null) { 803 inputObject = (CDRInputObject) 804 getConnection().getContactInfo() 805 .createInputObject(orb, this); 806 } else if (getConnection().getAcceptor() != null) { 807 inputObject = (CDRInputObject) 808 getConnection().getAcceptor() 809 .createInputObject(orb, this); 810 } else { 811 throw new RuntimeException("CorbaMessageMediatorImpl.setInputObject"); 812 } 813 inputObject.setMessageMediator(this); 814 setInputObject(inputObject); 815 } 816 817 private void signalResponseReceived() 818 { 819 // This will end up using the MessageMediator associated with 820 // the original request instead of the current mediator (which 821 // need to be constructed to hold the dispatchBuffer and connection). 822 connection.getResponseWaitingRoom() 823 .responseReceived((InputObject)inputObject); 824 } 825 826 // This handles message types for which we don't create classes. 
827 public void handleInput(Message header) throws IOException 828 { 829 try { 830 messageHeader = header; 831 832 if (transportDebug()) 833 dprint(".handleInput->: " 834 + MessageBase.typeToString(header.getType())); 835 836 setWorkThenReadOrResumeSelect(header); 837 838 switch(header.getType()) 839 { 840 case Message.GIOPCloseConnection: 841 if (transportDebug()) { 842 dprint(".handleInput: CloseConnection: purging"); 843 } 844 connection.purgeCalls(wrapper.connectionRebind(), true, false); 845 break; 846 case Message.GIOPMessageError: 847 if (transportDebug()) { 848 dprint(".handleInput: MessageError: purging"); 849 } 850 connection.purgeCalls(wrapper.recvMsgError(), true, false); 851 break; 852 default: 853 if (transportDebug()) { 854 dprint(".handleInput: ERROR: " 855 + MessageBase.typeToString(header.getType())); 856 } 857 throw wrapper.badGiopRequestType() ; 858 } 859 releaseByteBufferToPool(); 860 } finally { 861 if (transportDebug()) { 862 dprint(".handleInput<-: " 863 + MessageBase.typeToString(header.getType())); 864 } 865 } 866 } 867 868 public void handleInput(RequestMessage_1_0 header) throws IOException 869 { 870 try { 871 if (transportDebug()) dprint(".REQUEST 1.0->: " + header); 872 try { 873 messageHeader = requestHeader = (RequestMessage) header; 874 setInputObject(); 875 } finally { 876 setWorkThenPoolOrResumeSelect(header); 877 } 878 getProtocolHandler().handleRequest(header, this); 879 } catch (Throwable t) { 880 if (transportDebug()) 881 dprint(".REQUEST 1.0: !!ERROR!!: " + header, t); 882 // Mask the exception from thread.; 883 } finally { 884 if (transportDebug()) dprint(".REQUEST 1.0<-: " + header); 885 } 886 } 887 888 public void handleInput(RequestMessage_1_1 header) throws IOException 889 { 890 try { 891 if (transportDebug()) dprint(".REQUEST 1.1->: " + header); 892 try { 893 messageHeader = requestHeader = (RequestMessage) header; 894 setInputObject(); 895 connection.serverRequest_1_1_Put(this); 896 } finally { 897 
setWorkThenPoolOrResumeSelect(header); 898 } 899 getProtocolHandler().handleRequest(header, this); 900 } catch (Throwable t) { 901 if (transportDebug()) 902 dprint(".REQUEST 1.1: !!ERROR!!: " + header, t); 903 // Mask the exception from thread.; 904 } finally { 905 if (transportDebug()) dprint(".REQUEST 1.1<-: " + header); 906 } 907 } 908 909 // REVISIT: this is identical to 1_0 except for fragment part. 910 public void handleInput(RequestMessage_1_2 header) throws IOException 911 { 912 try { 913 try { 914 915 messageHeader = requestHeader = (RequestMessage) header; 916 917 header.unmarshalRequestID(dispatchByteBuffer); 918 setInputObject(); 919 920 if (transportDebug()) dprint(".REQUEST 1.2->: id/" 921 + header.getRequestId() 922 + ": " 923 + header); 924 925 // NOTE: in the old code this used to be done conditionally: 926 // if (header.moreFragmentsToFollow()). 927 // Now we always put it in. We take it out when 928 // the response is done. 929 // This must happen now so if a header is fragmented the stream 930 // may be found. 931 connection.serverRequestMapPut(header.getRequestId(), this); 932 } finally { 933 // Leader/Follower. 934 // Note: This *MUST* come after putting stream in above map 935 // since the header may be fragmented and you do not want to 936 // start reading again until the map above is set. 937 setWorkThenPoolOrResumeSelect(header); 938 } 939 //inputObject.unmarshalHeader(); // done in subcontract. 
940 getProtocolHandler().handleRequest(header, this); 941 } catch (Throwable t) { 942 if (transportDebug()) dprint(".REQUEST 1.2: id/" 943 + header.getRequestId() 944 + ": !!ERROR!!: " 945 + header, 946 t); 947 // Mask the exception from thread.; 948 } finally { 949 connection.serverRequestMapRemove(header.getRequestId()); 950 951 if (transportDebug()) dprint(".REQUEST 1.2<-: id/" 952 + header.getRequestId() 953 + ": " 954 + header); 955 } 956 } 957 958 public void handleInput(ReplyMessage_1_0 header) throws IOException 959 { 960 try { 961 try { 962 if (transportDebug()) dprint(".REPLY 1.0->: " + header); 963 messageHeader = replyHeader = (ReplyMessage) header; 964 setInputObject(); 965 966 // REVISIT: this should be done by waiting thread. 967 inputObject.unmarshalHeader(); 968 969 signalResponseReceived(); 970 } finally{ 971 setWorkThenReadOrResumeSelect(header); 972 } 973 } catch (Throwable t) { 974 if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t); 975 // Mask the exception from thread.; 976 } finally { 977 if (transportDebug()) dprint(".REPLY 1.0<-: " + header); 978 } 979 } 980 981 public void handleInput(ReplyMessage_1_1 header) throws IOException 982 { 983 try { 984 if (transportDebug()) dprint(".REPLY 1.1->: " + header); 985 messageHeader = replyHeader = (ReplyMessage) header; 986 setInputObject(); 987 988 if (header.moreFragmentsToFollow()) { 989 990 // More fragments are coming to complete this reply, so keep 991 // a reference to the InputStream so we can add the fragments 992 connection.clientReply_1_1_Put(this); 993 994 // In 1.1, we can't assume that we have the request ID in the 995 // first fragment. Thus, another thread is used 996 // to be the reader while this thread unmarshals 997 // the extended header and wakes up the client thread. 998 setWorkThenPoolOrResumeSelect(header); 999 1000 // REVISIT - error handling. 1001 // This must be done now. 
1002 inputObject.unmarshalHeader(); 1003 1004 signalResponseReceived(); 1005 1006 } else { 1007 1008 // Not fragmented, therefore we know the request 1009 // ID is here. Thus, we can unmarshal the extended header 1010 // and wake up the client thread without using a third 1011 // thread as above. 1012 1013 // REVISIT - error handling during unmarshal. 1014 // This must be done now to get the request id. 1015 inputObject.unmarshalHeader(); 1016 1017 signalResponseReceived(); 1018 1019 setWorkThenReadOrResumeSelect(header); 1020 } 1021 } catch (Throwable t) { 1022 if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header); 1023 // Mask the exception from thread.; 1024 } finally { 1025 if (transportDebug()) dprint(".REPLY 1.1<-: " + header); 1026 } 1027 } 1028 1029 public void handleInput(ReplyMessage_1_2 header) throws IOException 1030 { 1031 try { 1032 try { 1033 messageHeader = replyHeader = (ReplyMessage) header; 1034 1035 // We know that the request ID is in the first fragment 1036 header.unmarshalRequestID(dispatchByteBuffer); 1037 1038 if (transportDebug()) { 1039 dprint(".REPLY 1.2->: id/" 1040 + + header.getRequestId() 1041 + ": more?: " + header.moreFragmentsToFollow() 1042 + ": " + header); 1043 } 1044 1045 setInputObject(); 1046 1047 signalResponseReceived(); 1048 } finally { 1049 setWorkThenReadOrResumeSelect(header); 1050 } 1051 } catch (Throwable t) { 1052 if (transportDebug()) dprint(".REPLY 1.2: id/" 1053 + header.getRequestId() 1054 + ": !!ERROR!!: " 1055 + header, t); 1056 // Mask the exception from thread.; 1057 } finally { 1058 if (transportDebug()) dprint(".REPLY 1.2<-: id/" 1059 + header.getRequestId() 1060 + ": " 1061 + header); 1062 } 1063 } 1064 1065 public void handleInput(LocateRequestMessage_1_0 header) throws IOException 1066 { 1067 try { 1068 if (transportDebug()) 1069 dprint(".LOCATE_REQUEST 1.0->: " + header); 1070 try { 1071 messageHeader = header; 1072 setInputObject(); 1073 } finally { 1074 
setWorkThenPoolOrResumeSelect(header);
        }
        getProtocolHandler().handleRequest(header, this);
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.0<-: " + header);
    }

}

/**
 * Handles an incoming GIOP 1.1 LocateRequest header: records it, creates
 * the input object, calls setWorkThenPoolOrResumeSelect(header) and then
 * passes the request to the protocol handler.
 *
 * @param header the 1.1 locate-request message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here and only traced when transport
 *         debugging is on
 */
public void handleInput(LocateRequestMessage_1_1 header) throws IOException
{
    try {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.1->: " + header);
        try {
            messageHeader = header;
            setInputObject();
        } finally {
            setWorkThenPoolOrResumeSelect(header);
        }
        getProtocolHandler().handleRequest(header, this);
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.1<-:" + header);
    }
}

/**
 * Handles an incoming GIOP 1.2 LocateRequest header.
 *
 * The request id is read from dispatchByteBuffer first. When more
 * fragments are announced, this mediator is registered in the
 * connection's server request map under that id before the request is
 * passed to the protocol handler.
 *
 * @param header the 1.2 locate-request message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here and only traced when transport
 *         debugging is on
 */
public void handleInput(LocateRequestMessage_1_2 header) throws IOException
{
    try {
        try {
            messageHeader = header;

            header.unmarshalRequestID(dispatchByteBuffer);
            setInputObject();

            if (transportDebug())
                dprint(".LOCATE_REQUEST 1.2->: id/"
                       + header.getRequestId()
                       + ": "
                       + header);

            if (header.moreFragmentsToFollow()) {
                connection.serverRequestMapPut(header.getRequestId(),this);
            }
        } finally {
            setWorkThenPoolOrResumeSelect(header);
        }
        getProtocolHandler().handleRequest(header, this);
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.2: id/"
                   + header.getRequestId()
                   + ": !!ERROR!!: "
                   + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug())
            dprint(".LOCATE_REQUEST 1.2<-: id/"
                   + header.getRequestId()
                   + ": "
                   + header);
    }
}

/**
 * Handles an incoming GIOP 1.0 LocateReply header: records it, creates
 * the input object, unmarshals the header and calls
 * signalResponseReceived().
 */
public void
handleInput(LocateReplyMessage_1_0 header) throws IOException
{
    try {
        if (transportDebug())
            dprint(".LOCATE_REPLY 1.0->:" + header);
        try {
            messageHeader = header;
            setInputObject();
            inputObject.unmarshalHeader(); // REVISIT Put in subcontract.
            signalResponseReceived();
        } finally {
            // Runs whether or not the steps above succeeded.
            setWorkThenReadOrResumeSelect(header);
        }
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug())
            dprint(".LOCATE_REPLY 1.0<-: " + header);
    }
}

/**
 * Handles an incoming GIOP 1.1 LocateReply header: records it, creates
 * the input object, unmarshals the header and calls
 * signalResponseReceived().
 *
 * @param header the 1.1 locate-reply message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here and only traced when transport
 *         debugging is on
 */
public void handleInput(LocateReplyMessage_1_1 header) throws IOException
{
    try {
        if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header);
        try {
            messageHeader = header;
            setInputObject();
            // Fragmented LocateReplies are not allowed in 1.1.
            inputObject.unmarshalHeader();
            signalResponseReceived();
        } finally {
            setWorkThenReadOrResumeSelect(header);
        }
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header);
    }
}

/**
 * Handles an incoming GIOP 1.2 LocateReply header; the request id is
 * read directly from dispatchByteBuffer.
 *
 * @param header the 1.2 locate-reply message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here
 */
public void handleInput(LocateReplyMessage_1_2 header) throws IOException
{
    try {
        try {
            messageHeader = header;

            // No need to put in client reply map - already there.
header.unmarshalRequestID(dispatchByteBuffer);

            setInputObject();

            if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/"
                                         + header.getRequestId()
                                         + ": "
                                         + header);

            signalResponseReceived();
        } finally {
            setWorkThenPoolOrResumeSelect(header); // REVISIT
        }
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".LOCATE_REPLY 1.2: id/"
                   + header.getRequestId()
                   + ": !!ERROR!!: "
                   + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/"
                                     + header.getRequestId()
                                     + ": "
                                     + header);
    }
}

/**
 * Handles an incoming GIOP 1.1 Fragment message.
 *
 * Looks up the mediator currently registered on the connection for 1.1
 * fragments (server request or client reply, depending on
 * connection.isServer()) and takes its input object as the target for
 * this fragment.
 *
 * @param header the 1.1 fragment message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here
 */
public void handleInput(FragmentMessage_1_1 header) throws IOException
{
    try {
        if (transportDebug()) {
            dprint(".FRAGMENT 1.1->: "
                   + "more?: " + header.moreFragmentsToFollow()
                   + ": " + header);
        }
        try {
            messageHeader = header;
            MessageMediator mediator = null;
            // Local variable; shadows the inputObject member used by the
            // other handlers.
            CDRInputObject inputObject = null;

            if (connection.isServer()) {
                mediator = connection.serverRequest_1_1_Get();
            } else {
                mediator = connection.clientReply_1_1_Get();
            }
            if (mediator != null) {
                inputObject = (CDRInputObject) mediator.getInputObject();
            }

            // If no input stream available, then discard the fragment.
            // This can happen:
            // 1. if a fragment message is received prior to receiving
            //    the original request/reply message. Very unlikely.
            // 2. if a fragment message is received after the
            //    reply has been sent (early replies)
            // Note: In the case of early replies, the fragments received
            // during the request processing (which are never unmarshaled),
            // will eventually be discarded by the GC.
if (inputObject == null) {
                if (transportDebug())
                    dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header);
                // need to release dispatchByteBuffer to pool if
                // we are discarding
                releaseByteBufferToPool();
                return;
            }

            // Hand the fragment's bytes to the target stream's buffer manager.
            inputObject.getBufferManager()
                .processFragment(dispatchByteBuffer, header);

            // Last fragment: drop the connection's 1.1 mediator reference.
            if (! header.moreFragmentsToFollow()) {
                if (connection.isServer()) {
                    connection.serverRequest_1_1_Remove();
                } else {
                    connection.clientReply_1_1_Remove();
                }
            }
        } finally {
            // NOTE: This *must* come after queuing the fragment
            // when using the selector to ensure fragments stay in order.
            // Being in a finally block, it also runs on the early
            // "discard" return above.
            setWorkThenReadOrResumeSelect(header);
        }
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header);
    }
}

/**
 * Handles an incoming GIOP 1.2 Fragment message.
 *
 * @param header the 1.2 fragment message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here
 */
public void handleInput(FragmentMessage_1_2 header) throws IOException
{
    try {
        try {
            messageHeader = header;

            // Note: We know it's a 1.2 fragment, we have the data, but
            // we need the IIOPInputStream instance to unmarshal the
            // request ID... but we need the request ID to get the
            // IIOPInputStream instance. So we peek at the raw bytes.
header.unmarshalRequestID(dispatchByteBuffer);

            if (transportDebug()) {
                dprint(".FRAGMENT 1.2->: id/"
                       + header.getRequestId()
                       + ": more?: " + header.moreFragmentsToFollow()
                       + ": " + header);
            }

            MessageMediator mediator = null;
            InputObject inputObject = null;

            // Find the mediator that owns this request id: the server
            // request map on the server side, the client request map
            // otherwise.
            if (connection.isServer()) {
                mediator =
                    connection.serverRequestMapGet(header.getRequestId());
            } else {
                mediator =
                    connection.clientRequestMapGet(header.getRequestId());
            }
            if (mediator != null) {
                inputObject = mediator.getInputObject();
            }
            // No target stream: discard the fragment (same reasoning as
            // in the 1.1 fragment handler).
            if (inputObject == null) {
                if (transportDebug()) {
                    dprint(".FRAGMENT 1.2: id/"
                           + header.getRequestId()
                           + ": ++++DISCARDING++++: "
                           + header);
                }
                // need to release dispatchByteBuffer to pool if
                // we are discarding
                releaseByteBufferToPool();
                return;
            }
            ((CDRInputObject)inputObject)
                .getBufferManager().processFragment(
                                 dispatchByteBuffer, header);

            // REVISIT: but if it is a server don't you have to remove the
            // stream from the map?
            if (! connection.isServer()) {
                /* REVISIT
                 * No need to do anything.
                 * Should we mark that last was received?
                 if (! header.moreFragmentsToFollow()) {
                 // Last fragment.
                 }
                */
            }
        } finally {
            // NOTE: This *must* come after queuing the fragment
            // when using the selector to ensure fragments stay in order.
setWorkThenReadOrResumeSelect(header);
        }
    } catch (Throwable t) {
        if (transportDebug())
            dprint(".FRAGMENT 1.2: id/"
                   + header.getRequestId()
                   + ": !!ERROR!!: "
                   + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/"
                                     + header.getRequestId()
                                     + ": "
                                     + header);
    }
}

/**
 * Handles an incoming GIOP CancelRequest message.
 *
 * Records the header, creates the input object, unmarshals the header,
 * passes the request id to processCancelRequest() and then releases the
 * byte buffer to the pool. setWorkThenReadOrResumeSelect(header) always
 * runs afterwards (inner finally).
 *
 * @param header the cancel-request message header
 * @throws IOException declared by the signature; every Throwable raised
 *         in the body is caught here and only traced when transport
 *         debugging is on
 */
public void handleInput(CancelRequestMessage header) throws IOException
{
    try {
        try {
            messageHeader = header;
            setInputObject();

            // REVISIT: Move these two to subcontract.
            inputObject.unmarshalHeader();

            if (transportDebug()) dprint(".CANCEL->: id/"
                                         + header.getRequestId() + ": "
                                         + header.getGIOPVersion() + ": "
                                         + header);

            processCancelRequest(header.getRequestId());
            releaseByteBufferToPool();
        } finally {
            setWorkThenReadOrResumeSelect(header);
        }
    } catch (Throwable t) {
        if (transportDebug()) dprint(".CANCEL: id/"
                                     + header.getRequestId()
                                     + ": !!ERROR!!: "
                                     + header, t);
        // Mask the exception from the thread.
    } finally {
        if (transportDebug()) dprint(".CANCEL<-: id/"
                                     + header.getRequestId() + ": "
                                     + header.getGIOPVersion() + ": "
                                     + header);
    }
}

/**
 * Resets isThreadDone to false and then throws the "not implemented"
 * RuntimeException with an empty detail suffix.
 */
private void throwNotImplemented()
{
    isThreadDone = false;
    throwNotImplemented("");
}

/**
 * Always throws a RuntimeException whose message names this class and
 * ends with the given text.
 *
 * @param msg text appended to the exception message
 */
private void throwNotImplemented(String msg)
{
    throw new RuntimeException("CorbaMessageMediatorImpl: not implemented " + msg);
}

/**
 * Prints a debug message followed by the throwable's stack trace on
 * System.out.
 *
 * @param msg the debug message
 * @param t the throwable whose stack trace is printed; must not be null
 */
private void dprint(String msg, Throwable t)
{
    dprint(msg);
    t.printStackTrace(System.out);
}

/**
 * Forwards a debug message to ORBUtility.dprint, tagged with this
 * class's name.
 *
 * @param msg the debug message
 */
private void dprint(String msg)
{
    ORBUtility.dprint("CorbaMessageMediatorImpl", msg);
}

/**
 * Returns the result of ORBUtility.operationNameAndRequestId for the
 * given mediator; used as a prefix in the debug traces of this class.
 *
 * @param mediator the mediator to describe
 */
protected String opAndId(CorbaMessageMediator mediator)
{
    return ORBUtility.operationNameAndRequestId(mediator);
}

/** Returns the ORB's transportDebugFlag. */
private boolean
transportDebug()
{
    return orb.transportDebugFlag;
}

/**
 * Processes a CancelRequest for the given request id; does nothing
 * unless this connection is the server side.
 *
 * @param cancelReqId the request id carried by the CancelRequest
 */
// REVISIT: move this to subcontract (but both client and server need it).
private final void processCancelRequest(int cancelReqId) {

    // The GIOP version of CancelRequest does not matter, since
    // CancelRequest_1_0 could be sent to cancel a request which
    // has a different GIOP version.

    /*
     * CancelRequest processing logic :
     *
     *  - find the request with matching requestId
     *
     *  - call cancelProcessing() in BufferManagerRead [BMR]
     *
     *  - the hope is that worker thread would call BMR.underflow()
     *    to wait for more fragments to come in. When BMR.underflow() is
     *    called, if a CancelRequest had already arrived,
     *    the worker thread would throw ThreadDeath,
     *    else the thread would wait to be notified of the
     *    arrival of a new fragment or CancelRequest. Upon notification,
     *    the woken up thread would check to see if a CancelRequest had
     *    arrived and if so throw a ThreadDeath or it will continue to
     *    process the received fragment.
     *
     *  - if all the fragments had been received prior to CancelRequest
     *    then the worker thread would never block in BMR.underflow().
     *    So, setting the abort flag in BMR has no effect. The request
     *    processing will complete normally.
     *
     *  - in the case where the server has received enough fragments to
     *    start processing the request and the server sends out
     *    an early reply. In such a case if the CancelRequest arrives
     *    after the reply has been sent, it has no effect.
     */

    if (!connection.isServer()) {
        return; // we do not support bi-directional giop yet, ignore.
    }

    // Try to get hold of the InputStream buffer.
    // In the case of 1.0 requests there is no way to get hold of
    // InputStream. Try out the 1.1 and 1.2 cases.

    // was the request 1.2 ?
MessageMediator mediator = connection.serverRequestMapGet(cancelReqId);
    int requestId ;
    if (mediator == null) {
        // was the request 1.1 ?
        mediator = connection.serverRequest_1_1_Get();
        if (mediator == null) {
            // XXX log this!
            // either the request was 1.0
            // or an early reply has already been sent
            // or request processing is over
            // or it's a spurious CancelRequest
            return; // do nothing.
        }

        requestId = ((CorbaMessageMediator) mediator).getRequestId();

        if (requestId != cancelReqId) {
            // A spurious 1.1 CancelRequest has been received.
            // XXX log this!
            return; // do nothing
        }

        if (requestId == 0) { // special case
            // XXX log this
            // this means that
            // 1. the 1.1 request's requestId has not been received
            //    i.e., a CancelRequest was received even before the
            //    1.1 request was received. The spec disallows this.
            // 2. or the 1.1 request has a requestId 0.
            //
            // It is a little tricky to distinguish these two. So, be
            // conservative and do not cancel the request. Downside is that
            // 1.1 requests with requestId of 0 will never be cancelled.
            return; // do nothing
        }
    } else {
        // Found in the server request map; the value assigned here is not
        // read again in the remainder of this method.
        requestId = ((CorbaMessageMediator) mediator).getRequestId();
    }

    Message msg = ((CorbaMessageMediator)mediator).getRequestHeader();
    if (msg.getType() != Message.GIOPRequest) {
        // Any mediator obtained here should only ever be for a GIOP
        // request.
        // NOTE(review): unlike the "throw wrapper...()" calls elsewhere in
        // this file, nothing is thrown here, so processing continues
        // below - confirm that this is intended.
        wrapper.badMessageTypeForCancel() ;
    }

    // At this point we have a valid message mediator that contains
    // a valid requestId.

    // at this point we have chosen a request to be cancelled. But we
    // do not know if the target object's method has been invoked or not.
    // Request input stream being available simply means that the request
    // processing is not over yet.
// Simply set the abort flag in the
    // BMRS and hope that the worker thread would notice it (this can
    // happen only if the request stream is being unmarshalled and the
    // target's method has not been invoked yet). This guarantees
    // that the requests which have been dispatched to the
    // target's method will never be cancelled.

    BufferManagerReadStream bufferManager = (BufferManagerReadStream)
        ((CDRInputObject)mediator.getInputObject()).getBufferManager();
    bufferManager.cancelProcessing(cancelReqId);
}

////////////////////////////////////////////////////
//
// spi.protocol.CorbaProtocolHandler
//

/**
 * Server-side processing of a GIOP Request.
 *
 * Dispatches the request and then sends the response. For a one-way
 * request nothing is sent back, whether dispatch succeeded or threw.
 * For other requests a Throwable from dispatch is turned into a
 * response via handleThrowableDuringServerDispatch(). endRequest()
 * always runs (outer finally).
 *
 * @param msg the request header (not read by this method)
 * @param messageMediator the mediator carrying the request
 */
public void handleRequest(RequestMessage msg,
                          CorbaMessageMediator messageMediator)
{
    try {
        beginRequest(messageMediator);
        try {
            handleRequestRequest(messageMediator);
            if (messageMediator.isOneWay()) {
                return;
            }
        } catch (Throwable t) {
            if (messageMediator.isOneWay()) {
                return;
            }
            handleThrowableDuringServerDispatch(
                messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
        }
        sendResponse(messageMediator);
    } catch (Throwable t) {
        // Failures while building or sending the response end up here.
        dispatchError(messageMediator, "RequestMessage", t);
    } finally {
        endRequest(messageMediator);
    }
}

/**
 * Server-side processing of a GIOP LocateRequest.
 *
 * Same flow as the RequestMessage overload, without the one-way
 * short-circuit: a response is always attempted.
 *
 * @param msg the locate-request header (not read by this method)
 * @param messageMediator the mediator carrying the request
 */
public void handleRequest(LocateRequestMessage msg,
                          CorbaMessageMediator messageMediator)
{
    try {
        beginRequest(messageMediator);
        try {
            handleLocateRequest(messageMediator);
        } catch (Throwable t) {
            handleThrowableDuringServerDispatch(
                messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
        }
        sendResponse(messageMediator);
    } catch (Throwable t) {
        dispatchError(messageMediator, "LocateRequestMessage", t);
    } finally {
        endRequest(messageMediator);
    }
}

/**
 * Marks the start of server request processing on the connection, after
 * an optional subcontract debug trace.
 *
 * @param messageMediator the mediator carrying the request
 */
private void beginRequest(CorbaMessageMediator messageMediator)
{
    ORB orb = (ORB)
messageMediator.getBroker();
    if (orb.subcontractDebugFlag) {
        dprint(".handleRequest->:");
    }
    connection.serverRequestProcessingBegins();
}

/**
 * Reports a failure that escaped request handling. Only traces the
 * error when subcontract debugging is on; the connection is left open.
 *
 * @param messageMediator the mediator of the failed request
 * @param msg label identifying the kind of message being handled
 * @param t the throwable that was caught
 */
private void dispatchError(CorbaMessageMediator messageMediator,
                           String msg, Throwable t)
{
    if (orb.subcontractDebugFlag) {
        dprint(".handleRequest: " + opAndId(messageMediator)
               + ": !!ERROR!!: "
               + msg,
               t);
    }
    // REVISIT - this makes hcks sendTwoObjects fail
    // messageMediator.getConnection().close();
}

/**
 * Finishes sending the mediator's output object, if it has one.
 *
 * @param messageMediator the mediator whose response is sent
 */
private void sendResponse(CorbaMessageMediator messageMediator)
{
    if (orb.subcontractDebugFlag) {
        dprint(".handleRequest: " + opAndId(messageMediator)
               + ": sending response");
    }
    // REVISIT - type and location
    CDROutputObject outputObject = (CDROutputObject)
        messageMediator.getOutputObject();
    if (outputObject != null) {
        // REVISIT - can be null for TRANSIENT below.
        outputObject.finishSendingMessage();
    }
}

/**
 * Ends request processing: closes the mediator's output and input
 * objects (when present) and then tells the connection that server
 * request processing has ended.
 *
 * @param messageMediator the mediator of the finished request
 */
private void endRequest(CorbaMessageMediator messageMediator)
{
    ORB orb = (ORB) messageMediator.getBroker();
    if (orb.subcontractDebugFlag) {
        dprint(".handleRequest<-: " + opAndId(messageMediator));
    }

    // release NIO ByteBuffers to ByteBufferPool

    try {
        OutputObject outputObj = messageMediator.getOutputObject();
        if (outputObj != null) {
            outputObj.close();
        }
        // Not reached if closing the output object throws.
        InputObject inputObj = messageMediator.getInputObject();
        if (inputObj != null) {
            inputObj.close();
        }
    } catch (IOException ex) {
        // Given what close() does, this catch shouldn't ever happen.
        // See CDRInput/OutputObject.close() for more info.
        // It also won't result in a Corba error if an IOException happens.
if (orb.subcontractDebugFlag) {
            dprint(".endRequest: IOException:" + ex.getMessage(), ex);
        }
    } finally {
        // Always balance the serverRequestProcessingBegins() call.
        ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds();
    }
}

/**
 * Dispatches a GIOP Request to the server request dispatcher selected by
 * the request's object key.
 *
 * Unmarshals the request header, checks the ORB shutdown state, looks up
 * the dispatcher and calls its dispatch() method between
 * orb.startingDispatch() and orb.finishedDispatch().
 *
 * @param messageMediator the mediator carrying the request
 */
protected void handleRequestRequest(CorbaMessageMediator messageMediator)
{
    // Does nothing if already unmarshaled.
    ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();

    ORB orb = (ORB)messageMediator.getBroker();
    orb.checkShutdownState();

    ObjectKey okey = messageMediator.getObjectKey();
    if (orb.subcontractDebugFlag) {
        ObjectKeyTemplate oktemp = okey.getTemplate() ;
        dprint( ".handleRequest: " + opAndId(messageMediator)
                + ": dispatching to scid: " + oktemp.getSubcontractId());
    }

    CorbaServerRequestDispatcher sc = okey.getServerRequestDispatcher(orb);

    if (orb.subcontractDebugFlag) {
        dprint(".handleRequest: " + opAndId(messageMediator)
               + ": dispatching to sc: " + sc);
    }

    // No dispatcher for this object key: raise the wrapper's exception.
    if (sc == null) {
        throw wrapper.noServerScInDispatch() ;
    }

    // NOTE:
    // This is necessary so mediator can act as ResponseHandler
    // and pass necessary info to response constructors located
    // in the subcontract.
    // REVISIT - same class right now.
//messageMediator.setProtocolHandler(this);

    try {
        orb.startingDispatch();
        sc.dispatch(messageMediator);
    } finally {
        // Paired with startingDispatch(), even when dispatch throws.
        orb.finishedDispatch();
    }
}

/**
 * Builds the LocateReply for a GIOP LocateRequest and writes it to a
 * newly created output object.
 *
 * Reply status chosen by the visible code:
 * OBJECT_HERE when the dispatcher's locate() returns null,
 * OBJECT_FORWARD (followed by the IOR) when it returns an IOR,
 * LOC_NEEDS_ADDRESSING_MODE on AddressingDispositionException, and
 * UNKNOWN_OBJECT on any other Exception. No reply is built when no
 * dispatcher is found or when the request was cancelled.
 *
 * @param messageMediator the mediator carrying the locate request
 */
protected void handleLocateRequest(CorbaMessageMediator messageMediator)
{
    ORB orb = (ORB)messageMediator.getBroker();
    LocateRequestMessage msg = (LocateRequestMessage)
        messageMediator.getDispatchHeader();
    IOR ior = null;
    LocateReplyMessage reply = null;
    // -1 means "no addressing disposition to write".
    short addrDisp = -1;

    try {
        ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();
        CorbaServerRequestDispatcher sc =
            msg.getObjectKey().getServerRequestDispatcher( orb ) ;
        if (sc == null) {
            return;
        }

        ior = sc.locate(msg.getObjectKey());

        if ( ior == null ) {
            reply = MessageBase.createLocateReply(
                        orb, msg.getGIOPVersion(),
                        msg.getEncodingVersion(),
                        msg.getRequestId(),
                        LocateReplyMessage.OBJECT_HERE, null);

        } else {
            reply = MessageBase.createLocateReply(
                        orb, msg.getGIOPVersion(),
                        msg.getEncodingVersion(),
                        msg.getRequestId(),
                        LocateReplyMessage.OBJECT_FORWARD, ior);
        }
        // REVISIT: Should we catch SystemExceptions?

    } catch (AddressingDispositionException ex) {

        // create a response containing the expected target
        // addressing disposition.

        reply = MessageBase.createLocateReply(
                    orb, msg.getGIOPVersion(),
                    msg.getEncodingVersion(),
                    msg.getRequestId(),
                    LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE, null);

        addrDisp = ex.expectedAddrDisp();

    } catch (RequestCanceledException ex) {

        return; // no need to send reply

    } catch ( Exception ex ) {

        // REVISIT If exception is not OBJECT_NOT_EXIST, it should
        // have a different reply

        // This handles OBJECT_NOT_EXIST exceptions thrown in
        // the subcontract or obj manager.
// Send back UNKNOWN_OBJECT.

        reply = MessageBase.createLocateReply(
                    orb, msg.getGIOPVersion(),
                    msg.getEncodingVersion(),
                    msg.getRequestId(),
                    LocateReplyMessage.UNKNOWN_OBJECT, null);
    }

    CDROutputObject outputObject =
        createAppropriateOutputObject(messageMediator,
                                      msg, reply);
    messageMediator.setOutputObject(outputObject);
    outputObject.setMessageMediator(messageMediator);

    // Reply header first, then the optional IOR and the optional
    // addressing disposition.
    reply.write(outputObject);
    // outputObject.setMessage(reply); // REVISIT - not necessary
    if (ior != null) {
        ior.write(outputObject);
    }
    if (addrDisp != -1) {
        AddressingDispositionHelper.write(outputObject, addrDisp);
    }
}

/**
 * Creates the output object used for a LocateReply.
 *
 * Two different CDROutputObject constructors are used, depending on
 * whether the request's GIOP version is below 1.2 or not.
 *
 * @param messageMediator the mediator carrying the locate request
 * @param msg the request message whose GIOP version selects the variant
 * @param reply the locate reply that will be written
 * @return the new output object
 */
private CDROutputObject createAppropriateOutputObject(
    CorbaMessageMediator messageMediator,
    Message msg, LocateReplyMessage reply)
{
    CDROutputObject outputObject;

    if (msg.getGIOPVersion().lessThan(GIOPVersion.V1_2)) {
        // locate msgs 1.0 & 1.1 :=> grow,
        // REVISIT - build from factory
        outputObject = new CDROutputObject(
                         (ORB) messageMediator.getBroker(),
                         this,
                         GIOPVersion.V1_0,
                         (CorbaConnection) messageMediator.getConnection(),
                         reply,
                         ORBConstants.STREAM_FORMAT_VERSION_1);
    } else {
        // 1.2 :=> stream
        // REVISIT - build from factory
        outputObject = new CDROutputObject(
                         (ORB) messageMediator.getBroker(),
                         messageMediator,
                         reply,
                         ORBConstants.STREAM_FORMAT_VERSION_1);
    }
    return outputObject;
}

/**
 * Entry point for turning a Throwable raised during server dispatch into
 * a response; delegates to the four-argument overload with an iteration
 * count of 1.
 *
 * @param messageMediator the mediator of the failed request
 * @param throwable the throwable raised during dispatch
 * @param completionStatus completion status to report
 */
public void handleThrowableDuringServerDispatch(
    CorbaMessageMediator messageMediator,
    Throwable throwable,
    CompletionStatus completionStatus)
{
    if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
        dprint(".handleThrowableDuringServerDispatch: "
               + opAndId(messageMediator) + ": "
               + throwable);
    }

    // If we haven't unmarshaled the header, we probably don't
    // have enough
// information to even send back a reply.

    // REVISIT
    // Cannot do this check.  When target addressing disposition does
    // not match (during header unmarshaling) it throws an exception
    // to be handled here.
    /*
    if (! ((CDRInputObject)messageMediator.getInputObject())
        .unmarshaledHeader()) {
        return;
    }
    */
    handleThrowableDuringServerDispatch(messageMediator,
                                        throwable,
                                        completionStatus,
                                        1);
}


// REVISIT - catch and ignore RequestCanceledException.

/**
 * Builds the response for a Throwable raised during server dispatch.
 *
 * ForwardException produces a location-forward reply,
 * AddressingDispositionException an addressing-disposition reply, and
 * anything else a system exception reply. If building the response
 * itself throws, the method calls itself with the new throwable and
 * iteration + 1; once iteration exceeds 10 it gives up with a
 * RuntimeException whose cause is the last throwable.
 *
 * @param messageMediator the mediator of the failed request
 * @param throwable the throwable to report
 * @param completionStatus completion status to report
 * @param iteration 1 for the first attempt, incremented on each retry
 */
protected void handleThrowableDuringServerDispatch(
    CorbaMessageMediator messageMediator,
    Throwable throwable,
    CompletionStatus completionStatus,
    int iteration)
{
    if (iteration > 10) {
        if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
            dprint(".handleThrowableDuringServerDispatch: "
                   + opAndId(messageMediator)
                   + ": cannot handle: "
                   + throwable);
        }

        // REVISIT - should we close connection?
        RuntimeException rte =
            new RuntimeException("handleThrowableDuringServerDispatch: " +
                                 "cannot create response.");
        rte.initCause(throwable);
        throw rte;
    }

    try {
        if (throwable instanceof ForwardException) {
            ForwardException fex = (ForwardException)throwable ;
            createLocationForward( messageMediator, fex.getIOR(), null ) ;
            return;
        }

        if (throwable instanceof AddressingDispositionException) {
            handleAddressingDisposition(
                messageMediator,
                (AddressingDispositionException)throwable);
            return;
        }

        // Else.

        SystemException sex =
            convertThrowableToSystemException(throwable, completionStatus);

        createSystemExceptionResponse(messageMediator, sex, null);
        return;

    } catch (Throwable throwable2) {

        // User code (e.g., postinvoke, interceptors) may change
        // the exception, so we end up back here.
// Report the changed exception.

        handleThrowableDuringServerDispatch(messageMediator,
                                            throwable2,
                                            completionStatus,
                                            iteration + 1);
        return;
    }
}

/**
 * Maps an arbitrary Throwable to the SystemException to report.
 *
 * A SystemException is returned unchanged; a RequestCanceledException
 * becomes wrapper.requestCanceled(...); everything else becomes
 * wrapper.runtimeexception(...) with COMPLETED_MAYBE.
 *
 * NOTE(review): the completionStatus parameter is not read by this
 * method; the last-resort branch always uses COMPLETED_MAYBE. Confirm
 * whether that is intended.
 *
 * @param throwable the throwable to convert
 * @param completionStatus completion status supplied by the caller
 * @return the system exception to send back
 */
protected SystemException convertThrowableToSystemException(
    Throwable throwable,
    CompletionStatus completionStatus)
{
    if (throwable instanceof SystemException) {
        return (SystemException)throwable;
    }

    if (throwable instanceof RequestCanceledException) {
        // Reporting an exception response causes the
        // poa current stack, the interceptor stacks, etc.
        // to be balanced.  It also notifies interceptors
        // that the request was cancelled.

        return wrapper.requestCanceled( throwable ) ;
    }

    // NOTE: We do not trap ThreadDeath above Throwable.
    // There is no reason to stop the thread.  It is
    // just a worker thread.  The ORB never throws
    // ThreadDeath.  Client code may (e.g., in ServantManagers,
    // interceptors, or servants) but that should not
    // affect the ORB threads.  So it is just handled
    // generically.

    //
    // Last resort.
    // If user code throws a non-SystemException report it generically.
    //

    return wrapper.runtimeexception( CompletionStatus.COMPLETED_MAYBE, throwable ) ;
}

/**
 * Builds the reply that tells the client which target addressing
 * disposition is expected, for either a Request or a LocateRequest.
 *
 * @param messageMediator the mediator of the request being answered
 * @param ex the exception carrying the expected addressing disposition
 */
protected void handleAddressingDisposition(
    CorbaMessageMediator messageMediator,
    AddressingDispositionException ex)
{

    // -1 means "no addressing disposition to write".
    short addrDisp = -1;

    // from iiop.RequestProcessor.

    // Respond with expected target addressing disposition.
switch (messageMediator.getRequestHeader().getType()) {
    case Message.GIOPRequest :
        // Local variable; shadows the replyHeader member assigned by the
        // reply handlers.
        ReplyMessage replyHeader = MessageBase.createReply(
                      (ORB)messageMediator.getBroker(),
                      messageMediator.getGIOPVersion(),
                      messageMediator.getEncodingVersion(),
                      messageMediator.getRequestId(),
                      ReplyMessage.NEEDS_ADDRESSING_MODE,
                      null, null);
        // REVISIT: via acceptor factory.
        CDROutputObject outputObject = new CDROutputObject(
            (ORB)messageMediator.getBroker(),
            this,
            messageMediator.getGIOPVersion(),
            (CorbaConnection)messageMediator.getConnection(),
            replyHeader,
            ORBConstants.STREAM_FORMAT_VERSION_1);
        messageMediator.setOutputObject(outputObject);
        outputObject.setMessageMediator(messageMediator);
        // Reply header first, then the expected addressing disposition.
        replyHeader.write(outputObject);
        AddressingDispositionHelper.write(outputObject,
                                          ex.expectedAddrDisp());
        return;

    case Message.GIOPLocateRequest :
        LocateReplyMessage locateReplyHeader = MessageBase.createLocateReply(
            (ORB)messageMediator.getBroker(),
            messageMediator.getGIOPVersion(),
            messageMediator.getEncodingVersion(),
            messageMediator.getRequestId(),
            LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE,
            null);

        addrDisp = ex.expectedAddrDisp();

        // REVISIT: via acceptor factory.
        // outputObject is the variable declared in the GIOPRequest case
        // above; it is in scope for the whole switch block.
1986 outputObject = 1987 createAppropriateOutputObject(messageMediator, 1988 messageMediator.getRequestHeader(), 1989 locateReplyHeader); 1990 messageMediator.setOutputObject(outputObject); 1991 outputObject.setMessageMediator(messageMediator); 1992 locateReplyHeader.write(outputObject); 1993 IOR ior = null; 1994 if (ior != null) { 1995 ior.write(outputObject); 1996 } 1997 if (addrDisp != -1) { 1998 AddressingDispositionHelper.write(outputObject, addrDisp); 1999 } 2000 return; 2001 } 2002 } 2003 2004 public CorbaMessageMediator createResponse( 2005 CorbaMessageMediator messageMediator, 2006 ServiceContexts svc) 2007 { 2008 // REVISIT: ignore service contexts during framework transition. 2009 // They are set in SubcontractResponseHandler to the wrong connection. 2010 // Then they would be set again here and a duplicate contexts 2011 // exception occurs. 2012 return createResponseHelper( 2013 messageMediator, 2014 getServiceContextsForReply(messageMediator, null)); 2015 } 2016 2017 public CorbaMessageMediator createUserExceptionResponse( 2018 CorbaMessageMediator messageMediator, ServiceContexts svc) 2019 { 2020 // REVISIT - same as above 2021 return createResponseHelper( 2022 messageMediator, 2023 getServiceContextsForReply(messageMediator, null), 2024 true); 2025 } 2026 2027 public CorbaMessageMediator createUnknownExceptionResponse( 2028 CorbaMessageMediator messageMediator, UnknownException ex) 2029 { 2030 // NOTE: This service context container gets augmented in 2031 // tail call. 
ServiceContexts contexts = null;
    // The UNKNOWN system exception (minor code 0) is what gets reported;
    // it is also what the UEInfoServiceContext below is built from.
    SystemException sys = new UNKNOWN( 0,
        CompletionStatus.COMPLETED_MAYBE);
    contexts = new ServiceContexts( (ORB)messageMediator.getBroker() );
    UEInfoServiceContext uei = new UEInfoServiceContext(sys);
    contexts.put( uei ) ;
    return createSystemExceptionResponse(messageMediator, sys, contexts);
}

/**
 * Creates a SYSTEM_EXCEPTION response carrying the given exception.
 *
 * If the mediator registered for this request id has already sent a
 * fragment but not the full message, that mediator is returned as is and
 * no new response is built.
 *
 * @param messageMediator the mediator of the request being answered
 * @param ex the system exception to report
 * @param svc service contexts passed on to getServiceContextsForReply
 * @return the mediator holding the response
 */
public CorbaMessageMediator createSystemExceptionResponse(
    CorbaMessageMediator messageMediator,
    SystemException ex,
    ServiceContexts svc)
{
    if (messageMediator.getConnection() != null) {
        // It is possible that fragments of response have already been
        // sent.  Then an error may occur (e.g. marshaling error like
        // non serializable object).  In that case it is too late
        // to send the exception.  We just return the existing fragmented
        // stream here.  This will cause an incomplete last fragment
        // to be sent.  Then the other side will get a marshaling error
        // when attempting to unmarshal.

        // REVISIT: Impl - make interface method to do the following.
        CorbaMessageMediatorImpl mediator = (CorbaMessageMediatorImpl)
            ((CorbaConnection)messageMediator.getConnection())
            .serverRequestMapGet(messageMediator.getRequestId());

        OutputObject existingOutputObject = null;
        if (mediator != null) {
            existingOutputObject = mediator.getOutputObject();
        }

        // REVISIT: need to think about messageMediator containing correct
        // pointer to output object.
        // existingOutputObject is only non-null when mediator is non-null,
        // so the mediator calls below cannot hit a null reference.
        if (existingOutputObject != null &&
            mediator.sentFragment() &&
            ! mediator.sentFullMessage())
        {
            return mediator;
        }
    }

    // Only do this if interceptors have been initialized on this request
    // and have not completed their lifecycle (otherwise the info stack
    // may be empty or have a different request's entry on top).
if (messageMediator.executePIInResponseConstructor()) {
        // REVISIT: not necessary in framework now?
        // Inform Portable Interceptors of the SystemException.  This is
        // required to be done here because the ending interception point
        // is called when creating the response below
        // but we do not currently write the SystemException into the
        // response until after the ending point is called.
        ((ORB)messageMediator.getBroker()).getPIHandler().setServerPIInfo( ex );
    }

    // NOTE(review): the null check on ex only guards this trace;
    // addExceptionDetailMessage() below calls ex.printStackTrace()
    // without a check.
    if (((ORB)messageMediator.getBroker()).subcontractDebugFlag &&
        ex != null)
    {
        dprint(".createSystemExceptionResponse: "
               + opAndId(messageMediator),
               ex);
    }

    ServiceContexts serviceContexts =
        getServiceContextsForReply(messageMediator, svc);

    // NOTE: We MUST add the service context before creating
    // the response since service contexts are written to the
    // stream when the response object is created.

    addExceptionDetailMessage(messageMediator, ex, serviceContexts);

    CorbaMessageMediator response =
        createResponseHelper(messageMediator, serviceContexts, false);

    // NOTE: From here on, it is too late to add more service contexts.
    // They have already been serialized to the stream (and maybe fragments
    // sent).

    ORBUtility.writeSystemException(
        ex, (OutputStream)response.getOutputObject());

    return response;
}

/**
 * Adds a service context with id ExceptionDetailMessage.value that
 * carries the exception's stack trace text.
 *
 * @param mediator the mediator whose broker is used for the encapsulation
 * @param ex the system exception whose stack trace is encoded
 * @param serviceContexts the container the new context is added to
 */
private void addExceptionDetailMessage(CorbaMessageMediator mediator,
                                       SystemException ex,
                                       ServiceContexts serviceContexts)
{
    // Capture the printed stack trace in memory.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    PrintWriter pw = new PrintWriter(baos);
    ex.printStackTrace(pw);
    pw.flush(); // NOTE: you must flush or baos will be empty.
2126 EncapsOutputStream encapsOutputStream = 2127 new EncapsOutputStream((ORB)mediator.getBroker()); 2128 encapsOutputStream.putEndian(); 2129 encapsOutputStream.write_wstring(baos.toString()); 2130 UnknownServiceContext serviceContext = 2131 new UnknownServiceContext(ExceptionDetailMessage.value, 2132 encapsOutputStream.toByteArray()); 2133 serviceContexts.put(serviceContext); 2134 } 2135 2136 public CorbaMessageMediator createLocationForward( 2137 CorbaMessageMediator messageMediator, IOR ior, ServiceContexts svc) 2138 { 2139 ReplyMessage reply 2140 = MessageBase.createReply( 2141 (ORB)messageMediator.getBroker(), 2142 messageMediator.getGIOPVersion(), 2143 messageMediator.getEncodingVersion(), 2144 messageMediator.getRequestId(), 2145 ReplyMessage.LOCATION_FORWARD, 2146 getServiceContextsForReply(messageMediator, svc), 2147 ior); 2148 2149 return createResponseHelper(messageMediator, reply, ior); 2150 } 2151 2152 protected CorbaMessageMediator createResponseHelper( 2153 CorbaMessageMediator messageMediator, ServiceContexts svc) 2154 { 2155 ReplyMessage message = 2156 MessageBase.createReply( 2157 (ORB)messageMediator.getBroker(), 2158 messageMediator.getGIOPVersion(), 2159 messageMediator.getEncodingVersion(), 2160 messageMediator.getRequestId(), 2161 ReplyMessage.NO_EXCEPTION, 2162 svc, 2163 null); 2164 return createResponseHelper(messageMediator, message, null); 2165 } 2166 2167 protected CorbaMessageMediator createResponseHelper( 2168 CorbaMessageMediator messageMediator, ServiceContexts svc,boolean user) 2169 { 2170 ReplyMessage message = 2171 MessageBase.createReply( 2172 (ORB)messageMediator.getBroker(), 2173 messageMediator.getGIOPVersion(), 2174 messageMediator.getEncodingVersion(), 2175 messageMediator.getRequestId(), 2176 user ? ReplyMessage.USER_EXCEPTION : 2177 ReplyMessage.SYSTEM_EXCEPTION, 2178 svc, 2179 null); 2180 return createResponseHelper(messageMediator, message, null); 2181 } 2182 2183 // REVISIT - IOR arg is ignored. 
2184 protected CorbaMessageMediator createResponseHelper( 2185 CorbaMessageMediator messageMediator, ReplyMessage reply, IOR ior) 2186 { 2187 // REVISIT - these should be invoked from subcontract. 2188 runServantPostInvoke(messageMediator); 2189 runInterceptors(messageMediator, reply); 2190 runRemoveThreadInfo(messageMediator); 2191 2192 if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) { 2193 dprint(".createResponseHelper: " 2194 + opAndId(messageMediator) + ": " 2195 + reply); 2196 } 2197 2198 messageMediator.setReplyHeader(reply); 2199 2200 OutputObject replyOutputObject; 2201 // REVISIT = do not use null. 2202 // 2203 if (messageMediator.getConnection() == null) { 2204 // REVISIT - needs factory 2205 replyOutputObject = 2206 new CDROutputObject(orb, messageMediator, 2207 messageMediator.getReplyHeader(), 2208 messageMediator.getStreamFormatVersion(), 2209 BufferManagerFactory.GROW); 2210 } else { 2211 replyOutputObject = messageMediator.getConnection().getAcceptor() 2212 .createOutputObject(messageMediator.getBroker(), messageMediator); 2213 } 2214 messageMediator.setOutputObject(replyOutputObject); 2215 messageMediator.getOutputObject().setMessageMediator(messageMediator); 2216 2217 reply.write((OutputStream) messageMediator.getOutputObject()); 2218 if (reply.getIOR() != null) { 2219 reply.getIOR().write((OutputStream) messageMediator.getOutputObject()); 2220 } 2221 // REVISIT - not necessary? 2222 //messageMediator.this.replyIOR = reply.getIOR(); 2223 2224 // NOTE: The mediator holds onto output object so return value 2225 // not really necessary. 2226 return messageMediator; 2227 } 2228 2229 protected void runServantPostInvoke(CorbaMessageMediator messageMediator) 2230 { 2231 // Run ServantLocator::postinvoke. This may cause a SystemException 2232 // which will throw out of the constructor and return later 2233 // to construct a reply for that exception. The internal logic 2234 // of returnServant makes sure that postinvoke is only called once. 
2235 // REVISIT: instead of instanceof, put method on all orbs. 2236 ORB orb = null; 2237 // This flag is to deal with BootstrapServer use of reply streams, 2238 // with ServerRequestDispatcher's use of reply streams, etc. 2239 if (messageMediator.executeReturnServantInResponseConstructor()) { 2240 // It is possible to get marshaling errors in the skeleton after 2241 // postinvoke has completed. We must set this to false so that 2242 // when the error exception reply is constructed we don't try 2243 // to incorrectly access poa current (which will be the wrong 2244 // one or an empty stack. 2245 messageMediator.setExecuteReturnServantInResponseConstructor(false); 2246 messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(true); 2247 2248 try { 2249 orb = (ORB)messageMediator.getBroker(); 2250 OAInvocationInfo info = orb.peekInvocationInfo() ; 2251 ObjectAdapter oa = info.oa(); 2252 try { 2253 oa.returnServant() ; 2254 } catch (Throwable thr) { 2255 wrapper.unexpectedException( thr ) ; 2256 2257 if (thr instanceof Error) 2258 throw (Error)thr ; 2259 else if (thr instanceof RuntimeException) 2260 throw (RuntimeException)thr ; 2261 } finally { 2262 oa.exit(); 2263 } 2264 } catch (EmptyStackException ese) { 2265 throw wrapper.emptyStackRunServantPostInvoke( ese ) ; 2266 } 2267 } 2268 } 2269 2270 protected void runInterceptors(CorbaMessageMediator messageMediator, 2271 ReplyMessage reply) 2272 { 2273 if( messageMediator.executePIInResponseConstructor() ) { 2274 // Invoke server request ending interception points (send_*): 2275 // Note: this may end up with a SystemException or an internal 2276 // Runtime ForwardRequest 2277 ((ORB)messageMediator.getBroker()).getPIHandler(). 2278 invokeServerPIEndingPoint( reply ); 2279 2280 // Note this will be executed even if a ForwardRequest or 2281 // SystemException is thrown by a Portable Interceptors ending 2282 // point since we end up in this constructor again anyway. 
2283 ((ORB)messageMediator.getBroker()).getPIHandler(). 2284 cleanupServerPIRequest(); 2285 2286 // See createSystemExceptionResponse for why this is necesary. 2287 messageMediator.setExecutePIInResponseConstructor(false); 2288 } 2289 } 2290 2291 protected void runRemoveThreadInfo(CorbaMessageMediator messageMediator) 2292 { 2293 // Once you get here then the final reply is available (i.e., 2294 // postinvoke and interceptors have completed. 2295 if (messageMediator.executeRemoveThreadInfoInResponseConstructor()) { 2296 messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(false); 2297 ((ORB)messageMediator.getBroker()).popInvocationInfo() ; 2298 } 2299 } 2300 2301 protected ServiceContexts getServiceContextsForReply( 2302 CorbaMessageMediator messageMediator, ServiceContexts contexts) 2303 { 2304 CorbaConnection c = (CorbaConnection) messageMediator.getConnection(); 2305 2306 if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) { 2307 dprint(".getServiceContextsForReply: " 2308 + opAndId(messageMediator) 2309 + ": " + c); 2310 } 2311 2312 if (contexts == null) { 2313 contexts = new ServiceContexts(((ORB)messageMediator.getBroker())); 2314 } 2315 2316 // NOTE : We only want to send the runtime context the first time 2317 2318 if (c != null && !c.isPostInitialContexts()) { 2319 c.setPostInitialContexts(); 2320 SendingContextServiceContext scsc = 2321 new SendingContextServiceContext( 2322 ((ORB)messageMediator.getBroker()).getFVDCodeBaseIOR()) ; 2323 2324 if (contexts.get( scsc.getId() ) != null) 2325 throw wrapper.duplicateSendingContextServiceContext() ; 2326 2327 contexts.put( scsc ) ; 2328 2329 if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag) 2330 dprint(".getServiceContextsForReply: " 2331 + opAndId(messageMediator) 2332 + ": added SendingContextServiceContext" ) ; 2333 } 2334 2335 // send ORBVersion servicecontext as part of the Reply 2336 2337 ORBVersionServiceContext ovsc 2338 = new 
ORBVersionServiceContext(ORBVersionFactory.getORBVersion()); 2339 2340 if (contexts.get( ovsc.getId() ) != null) 2341 throw wrapper.duplicateOrbVersionServiceContext() ; 2342 2343 contexts.put( ovsc ) ; 2344 2345 if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag) 2346 dprint(".getServiceContextsForReply: " 2347 + opAndId(messageMediator) 2348 + ": added ORB version service context"); 2349 2350 return contexts; 2351 } 2352 2353 // REVISIT - this method should be migrated to orbutil.ORBUtility 2354 // since all locations that release ByteBuffers use 2355 // very similar logic and debug information. 2356 private void releaseByteBufferToPool() { 2357 if (dispatchByteBuffer != null) { 2358 orb.getByteBufferPool().releaseByteBuffer(dispatchByteBuffer); 2359 if (transportDebug()) { 2360 int bbId = System.identityHashCode(dispatchByteBuffer); 2361 StringBuffer sb = new StringBuffer(); 2362 sb.append(".handleInput: releasing ByteBuffer (" + bbId + 2363 ") to ByteBufferPool"); 2364 dprint(sb.toString()); 2365 } 2366 } 2367 } 2368 } 2369 2370 // End of file.