1 /*
   2  * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.util.Collections;
  29 import java.util.HashMap;
  30 import java.util.Iterator;
  31 import java.util.Map;
  32 
  33 import org.omg.CORBA.CompletionStatus;
  34 import org.omg.CORBA.SystemException;
  35 
  36 import com.sun.corba.se.pept.encoding.InputObject;
  37 import com.sun.corba.se.pept.encoding.OutputObject;
  38 import com.sun.corba.se.pept.protocol.MessageMediator;
  39 
  40 import com.sun.corba.se.spi.logging.CORBALogDomains;
  41 import com.sun.corba.se.spi.orb.ORB;
  42 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  43 import com.sun.corba.se.spi.transport.CorbaConnection;
  44 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  45 
  46 import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
  47 import com.sun.corba.se.impl.encoding.CDRInputObject;
  48 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  49 import com.sun.corba.se.impl.orbutil.ORBUtility;
  50 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
  51 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
  52 
  53 /**
  54  * @author Harold Carr
  55  */
  56 public class CorbaResponseWaitingRoomImpl
  57     implements
  58         CorbaResponseWaitingRoom
  59 {
  60     final static class OutCallDesc
  61     {
  62         java.lang.Object done = new java.lang.Object();
  63         Thread thread;
  64         MessageMediator messageMediator;
  65         SystemException exception;
  66         InputObject inputObject;
  67     }
  68 
  69     private ORB orb;
  70     private ORBUtilSystemException wrapper ;
  71 
  72     private CorbaConnection connection;
  73     // Maps requestId to an OutCallDesc.
  74     final private Map<Integer, OutCallDesc> out_calls;
  75 
  76     public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
  77     {
  78         this.orb = orb;
  79         wrapper = ORBUtilSystemException.get( orb,
  80             CORBALogDomains.RPC_TRANSPORT ) ;
  81         this.connection = connection;
  82         out_calls =
  83             Collections.synchronizedMap(new HashMap<Integer, OutCallDesc>());
  84     }
  85 
  86     ////////////////////////////////////////////////////
  87     //
  88     // pept.transport.ResponseWaitingRoom
  89     //
  90 
  91     public void registerWaiter(MessageMediator mediator)
  92     {
  93         CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
  94 
  95         if (orb.transportDebugFlag) {
  96             dprint(".registerWaiter: " + opAndId(messageMediator));
  97         }
  98 
  99         Integer requestId = messageMediator.getRequestIdInteger();
 100 
 101         OutCallDesc call = new OutCallDesc();
 102         call.thread = Thread.currentThread();
 103         call.messageMediator = messageMediator;
 104         out_calls.put(requestId, call);
 105     }
 106 
 107     public void unregisterWaiter(MessageMediator mediator)
 108     {
 109         CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
 110 
 111         if (orb.transportDebugFlag) {
 112             dprint(".unregisterWaiter: " + opAndId(messageMediator));
 113         }
 114 
 115         Integer requestId = messageMediator.getRequestIdInteger();
 116 
 117         out_calls.remove(requestId);
 118     }
 119 
 120     public InputObject waitForResponse(MessageMediator mediator)
 121     {
 122       CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
 123 
 124       try {
 125 
 126         InputObject returnStream = null;
 127 
 128         if (orb.transportDebugFlag) {
 129             dprint(".waitForResponse->: " + opAndId(messageMediator));
 130         }
 131 
 132         Integer requestId = messageMediator.getRequestIdInteger();
 133 
 134         if (messageMediator.isOneWay()) {
 135             // The waiter is removed in releaseReply in the same
 136             // way as a normal request.
 137 
 138             if (orb.transportDebugFlag) {
 139                 dprint(".waitForResponse: one way - not waiting: "
 140                        + opAndId(messageMediator));
 141             }
 142 
 143             return null;
 144         }
 145 
 146         OutCallDesc call = out_calls.get(requestId);
 147         if (call == null) {
 148             throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
 149         }
 150 
 151         synchronized(call.done) {
 152 
 153             while (call.inputObject == null && call.exception == null) {
 154                 // Wait for the reply from the server.
 155                 // The ReaderThread reads in the reply IIOP message
 156                 // and signals us.
 157                 try {
 158                     if (orb.transportDebugFlag) {
 159                         dprint(".waitForResponse: waiting: "
 160                                + opAndId(messageMediator));
 161                     }
 162                     call.done.wait();
 163                 } catch (InterruptedException ie) {};
 164             }
 165 
 166             if (call.exception != null) {
 167                 if (orb.transportDebugFlag) {
 168                     dprint(".waitForResponse: exception: "
 169                            + opAndId(messageMediator));
 170                 }
 171                 throw call.exception;
 172             }
 173 
 174             returnStream = call.inputObject;
 175         }
 176 
 177         // REVISIT -- exceptions from unmarshaling code will
 178         // go up through this client thread!
 179 
 180         if (returnStream != null) {
 181             // On fragmented streams the header MUST be unmarshaled here
 182             // (in the client thread) in case it blocks.
 183             // If the header was already unmarshaled, this won't
 184             // do anything
 185             // REVISIT: cast - need interface method.
 186             ((CDRInputObject)returnStream).unmarshalHeader();
 187         }
 188 
 189         return returnStream;
 190 
 191       } finally {
 192         if (orb.transportDebugFlag) {
 193             dprint(".waitForResponse<-: " + opAndId(messageMediator));
 194         }
 195       }
 196     }
 197 
 198     public void responseReceived(InputObject is)
 199     {
 200         CDRInputObject inputObject = (CDRInputObject) is;
 201         LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
 202             inputObject.getMessageHeader();
 203         Integer requestId = new Integer(header.getRequestId());
 204         OutCallDesc call = out_calls.get(requestId);
 205 
 206         if (orb.transportDebugFlag) {
 207             dprint(".responseReceived: id/"
 208                    + requestId  + ": "
 209                    + header);
 210         }
 211 
 212         // This is an interesting case.  It could mean that someone sent us a
 213         // reply message, but we don't know what request it was for.  That
 214         // would probably call for an error.  However, there's another case
 215         // that's normal and we should think about --
 216         //
 217         // If the unmarshaling thread does all of its work inbetween the time
 218         // the ReaderThread gives it the last fragment and gets to the
 219         // out_calls.get line, then it will also be null, so just return;
 220         if (call == null) {
 221             if (orb.transportDebugFlag) {
 222                 dprint(".responseReceived: id/"
 223                        + requestId
 224                        + ": no waiter: "
 225                        + header);
 226             }
 227             return;
 228         }
 229 
 230         // Set the reply InputObject and signal the client thread
 231         // that the reply has been received.
 232         // The thread signalled will remove outcall descriptor if appropriate.
 233         // Otherwise, it'll be removed when last fragment for it has been put on
 234         // BufferManagerRead's queue.
 235         synchronized (call.done) {
 236             CorbaMessageMediator messageMediator = (CorbaMessageMediator)
 237                 call.messageMediator;
 238 
 239             if (orb.transportDebugFlag) {
 240                 dprint(".responseReceived: "
 241                        + opAndId(messageMediator)
 242                        + ": notifying waiters");
 243             }
 244 
 245             messageMediator.setReplyHeader(header);
 246             messageMediator.setInputObject(is);
 247             inputObject.setMessageMediator(messageMediator);
 248             call.inputObject = is;
 249             call.done.notify();
 250         }
 251     }
 252 
 253     public int numberRegistered()
 254     {
 255         return out_calls.size();
 256     }
 257 
 258     //////////////////////////////////////////////////
 259     //
 260     // CorbaResponseWaitingRoom
 261     //
 262 
 263     public void signalExceptionToAllWaiters(SystemException systemException)
 264     {
 265 
 266         if (orb.transportDebugFlag) {
 267             dprint(".signalExceptionToAllWaiters: " + systemException);
 268         }
 269 
 270         synchronized (out_calls) {
 271             if (orb.transportDebugFlag) {
 272                 dprint(".signalExceptionToAllWaiters: out_calls size :" +
 273                        out_calls.size());
 274             }
 275 
 276             for (OutCallDesc call : out_calls.values()) {
 277                 if (orb.transportDebugFlag) {
 278                     dprint(".signalExceptionToAllWaiters: signaling " +
 279                             call);
 280                 }
 281                 synchronized(call.done) {
 282                     try {
 283                         // anything waiting for BufferManagerRead's fragment queue
 284                         // needs to be cancelled
 285                         CorbaMessageMediator corbaMsgMediator =
 286                                      (CorbaMessageMediator)call.messageMediator;
 287                         CDRInputObject inputObject =
 288                                    (CDRInputObject)corbaMsgMediator.getInputObject();
 289                         // IMPORTANT: If inputObject is null, then no need to tell
 290                         //            BufferManagerRead to cancel request processing.
 291                         if (inputObject != null) {
 292                             BufferManagerReadStream bufferManager =
 293                                 (BufferManagerReadStream)inputObject.getBufferManager();
 294                             int requestId = corbaMsgMediator.getRequestId();
 295                             bufferManager.cancelProcessing(requestId);
 296                         }
 297                     } catch (Exception e) {
 298                     } finally {
 299                         // attempt to wake up waiting threads in all cases
 300                         call.inputObject = null;
 301                         call.exception = systemException;
 302                         call.done.notifyAll();
 303                     }
 304                 }
 305             }
 306         }
 307     }
 308 
 309     public MessageMediator getMessageMediator(int requestId)
 310     {
 311         Integer id = new Integer(requestId);
 312         OutCallDesc call = out_calls.get(id);
 313         if (call == null) {
 314             // This can happen when getting early reply fragments for a
 315             // request which has completed (e.g., client marshaling error).
 316             return null;
 317         }
 318         return call.messageMediator;
 319     }
 320 
 321     ////////////////////////////////////////////////////
 322     //
 323     // Implementation.
 324     //
 325 
 326     protected void dprint(String msg)
 327     {
 328         ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
 329     }
 330 
 331     protected String opAndId(CorbaMessageMediator mediator)
 332     {
 333         return ORBUtility.operationNameAndRequestId(mediator);
 334     }
 335 }
 336 
 337 // End of file.