1 /*
   2  * Copyright (c) 2001, 2004, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.util.Hashtable;
  29 
  30 import org.omg.CORBA.CompletionStatus;
  31 import org.omg.CORBA.SystemException;
  32 
  33 import com.sun.corba.se.pept.encoding.InputObject;
  34 import com.sun.corba.se.pept.encoding.OutputObject;
  35 import com.sun.corba.se.pept.protocol.MessageMediator;
  36 
  37 import com.sun.corba.se.spi.logging.CORBALogDomains;
  38 import com.sun.corba.se.spi.orb.ORB;
  39 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  40 import com.sun.corba.se.spi.transport.CorbaConnection;
  41 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  42 
  43 import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
  44 import com.sun.corba.se.impl.encoding.CDRInputObject;
  45 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  46 import com.sun.corba.se.impl.orbutil.ORBUtility;
  47 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
  48 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
  49 
  50 /**
  51  * @author Harold Carr
  52  */
  53 public class CorbaResponseWaitingRoomImpl
  54     implements
  55         CorbaResponseWaitingRoom
  56 {
  57     final static class OutCallDesc
  58     {
  59         java.lang.Object done = new java.lang.Object();
  60         Thread thread;
  61         MessageMediator messageMediator;
  62         SystemException exception;
  63         InputObject inputObject;
  64     }
  65 
  66     private ORB orb;
  67     private ORBUtilSystemException wrapper ;
  68 
  69     private CorbaConnection connection;
  70     // Maps requestId to an OutCallDesc.
  71     private Hashtable out_calls = null; // REVISIT - use int hastable/map
  72 
  73     public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
  74     {
  75         this.orb = orb;
  76         wrapper = ORBUtilSystemException.get( orb,
  77             CORBALogDomains.RPC_TRANSPORT ) ;
  78         this.connection = connection;
  79         out_calls = new Hashtable();
  80     }
  81 
  82     ////////////////////////////////////////////////////
  83     //
  84     // pept.transport.ResponseWaitingRoom
  85     //
  86 
  87     public void registerWaiter(MessageMediator mediator)
  88     {
  89         CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
  90 
  91         if (orb.transportDebugFlag) {
  92             dprint(".registerWaiter: " + opAndId(messageMediator));
  93         }
  94 
  95         Integer requestId = messageMediator.getRequestIdInteger();
  96 
  97         OutCallDesc call = new OutCallDesc();
  98         call.thread = Thread.currentThread();
  99         call.messageMediator = messageMediator;
 100         out_calls.put(requestId, call);
 101     }
 102 
 103     public void unregisterWaiter(MessageMediator mediator)
 104     {
 105         CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
 106 
 107         if (orb.transportDebugFlag) {
 108             dprint(".unregisterWaiter: " + opAndId(messageMediator));
 109         }
 110 
 111         Integer requestId = messageMediator.getRequestIdInteger();
 112 
 113         out_calls.remove(requestId);
 114     }
 115 
 116     public InputObject waitForResponse(MessageMediator mediator)
 117     {
 118       CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
 119 
 120       try {
 121 
 122         InputObject returnStream = null;
 123 
 124         if (orb.transportDebugFlag) {
 125             dprint(".waitForResponse->: " + opAndId(messageMediator));
 126         }
 127 
 128         Integer requestId = messageMediator.getRequestIdInteger();
 129 
 130         if (messageMediator.isOneWay()) {
 131             // The waiter is removed in releaseReply in the same
 132             // way as a normal request.
 133 
 134             if (orb.transportDebugFlag) {
 135                 dprint(".waitForResponse: one way - not waiting: "
 136                        + opAndId(messageMediator));
 137             }
 138 
 139             return null;
 140         }
 141 
 142         OutCallDesc call = (OutCallDesc)out_calls.get(requestId);
 143         if (call == null) {
 144             throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
 145         }
 146 
 147         synchronized(call.done) {
 148 
 149             while (call.inputObject == null && call.exception == null) {
 150                 // Wait for the reply from the server.
 151                 // The ReaderThread reads in the reply IIOP message
 152                 // and signals us.
 153                 try {
 154                     if (orb.transportDebugFlag) {
 155                         dprint(".waitForResponse: waiting: "
 156                                + opAndId(messageMediator));
 157                     }
 158                     call.done.wait();
 159                 } catch (InterruptedException ie) {};
 160             }
 161 
 162             if (call.exception != null) {
 163                 if (orb.transportDebugFlag) {
 164                     dprint(".waitForResponse: exception: "
 165                            + opAndId(messageMediator));
 166                 }
 167                 throw call.exception;
 168             }
 169 
 170             returnStream = call.inputObject;
 171         }
 172 
 173         // REVISIT -- exceptions from unmarshaling code will
 174         // go up through this client thread!
 175 
 176         if (returnStream != null) {
 177             // On fragmented streams the header MUST be unmarshaled here
 178             // (in the client thread) in case it blocks.
 179             // If the header was already unmarshaled, this won't
 180             // do anything
 181             // REVISIT: cast - need interface method.
 182             ((CDRInputObject)returnStream).unmarshalHeader();
 183         }
 184 
 185         return returnStream;
 186 
 187       } finally {
 188         if (orb.transportDebugFlag) {
 189             dprint(".waitForResponse<-: " + opAndId(messageMediator));
 190         }
 191       }
 192     }
 193 
 194     public void responseReceived(InputObject is)
 195     {
 196         CDRInputObject inputObject = (CDRInputObject) is;
 197         LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
 198             inputObject.getMessageHeader();
 199         Integer requestId = new Integer(header.getRequestId());
 200         OutCallDesc call = (OutCallDesc) out_calls.get(requestId);
 201 
 202         if (orb.transportDebugFlag) {
 203             dprint(".responseReceived: id/"
 204                    + requestId  + ": "
 205                    + header);
 206         }
 207 
 208         // This is an interesting case.  It could mean that someone sent us a
 209         // reply message, but we don't know what request it was for.  That
 210         // would probably call for an error.  However, there's another case
 211         // that's normal and we should think about --
 212         //
 213         // If the unmarshaling thread does all of its work inbetween the time
 214         // the ReaderThread gives it the last fragment and gets to the
 215         // out_calls.get line, then it will also be null, so just return;
 216         if (call == null) {
 217             if (orb.transportDebugFlag) {
 218                 dprint(".responseReceived: id/"
 219                        + requestId
 220                        + ": no waiter: "
 221                        + header);
 222             }
 223             return;
 224         }
 225 
 226         // Set the reply InputObject and signal the client thread
 227         // that the reply has been received.
 228         // The thread signalled will remove outcall descriptor if appropriate.
 229         // Otherwise, it'll be removed when last fragment for it has been put on
 230         // BufferManagerRead's queue.
 231         synchronized (call.done) {
 232             CorbaMessageMediator messageMediator = (CorbaMessageMediator)
 233                 call.messageMediator;
 234 
 235             if (orb.transportDebugFlag) {
 236                 dprint(".responseReceived: "
 237                        + opAndId(messageMediator)
 238                        + ": notifying waiters");
 239             }
 240 
 241             messageMediator.setReplyHeader(header);
 242             messageMediator.setInputObject(is);
 243             inputObject.setMessageMediator(messageMediator);
 244             call.inputObject = is;
 245             call.done.notify();
 246         }
 247     }
 248 
 249     public int numberRegistered()
 250     {
 251         // Note: Hashtable.size() is not synchronized
 252         return out_calls.size();
 253     }
 254 
 255     //////////////////////////////////////////////////
 256     //
 257     // CorbaResponseWaitingRoom
 258     //
 259 
 260     public void signalExceptionToAllWaiters(SystemException systemException)
 261     {
 262 
 263         if (orb.transportDebugFlag) {
 264             dprint(".signalExceptionToAllWaiters: " + systemException);
 265         }
 266 
 267         OutCallDesc call;
 268         java.util.Enumeration e = out_calls.elements();
 269         while(e.hasMoreElements()) {
 270             call = (OutCallDesc) e.nextElement();
 271 
 272             synchronized(call.done){
 273                 // anything waiting for BufferManagerRead's fragment queue
 274                 // needs to be cancelled
 275                 CorbaMessageMediator corbaMsgMediator =
 276                              (CorbaMessageMediator)call.messageMediator;
 277                 CDRInputObject inputObject =
 278                            (CDRInputObject)corbaMsgMediator.getInputObject();
 279                 // IMPORTANT: If inputObject is null, then no need to tell
 280                 //            BufferManagerRead to cancel request processing.
 281                 if (inputObject != null) {
 282                     BufferManagerReadStream bufferManager =
 283                         (BufferManagerReadStream)inputObject.getBufferManager();
 284                     int requestId = corbaMsgMediator.getRequestId();
 285                     bufferManager.cancelProcessing(requestId);
 286                 }
 287                 call.inputObject = null;
 288                 call.exception = systemException;
 289                 call.done.notify();
 290             }
 291         }
 292     }
 293 
 294     public MessageMediator getMessageMediator(int requestId)
 295     {
 296         Integer id = new Integer(requestId);
 297         OutCallDesc call = (OutCallDesc) out_calls.get(id);
 298         if (call == null) {
 299             // This can happen when getting early reply fragments for a
 300             // request which has completed (e.g., client marshaling error).
 301             return null;
 302         }
 303         return call.messageMediator;
 304     }
 305 
 306     ////////////////////////////////////////////////////
 307     //
 308     // Implementation.
 309     //
 310 
 311     protected void dprint(String msg)
 312     {
 313         ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
 314     }
 315 
 316     protected String opAndId(CorbaMessageMediator mediator)
 317     {
 318         return ORBUtility.operationNameAndRequestId(mediator);
 319     }
 320 }
 321 
 322 // End of file.