1 /* 2 * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.util.Collections; 29 import java.util.HashMap; 30 import java.util.Iterator; 31 import java.util.Map; 32 33 import org.omg.CORBA.CompletionStatus; 34 import org.omg.CORBA.SystemException; 35 36 import com.sun.corba.se.pept.encoding.InputObject; 37 import com.sun.corba.se.pept.encoding.OutputObject; 38 import com.sun.corba.se.pept.protocol.MessageMediator; 39 40 import com.sun.corba.se.spi.logging.CORBALogDomains; 41 import com.sun.corba.se.spi.orb.ORB; 42 import com.sun.corba.se.spi.protocol.CorbaMessageMediator; 43 import com.sun.corba.se.spi.transport.CorbaConnection; 44 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 45 46 import com.sun.corba.se.impl.encoding.BufferManagerReadStream; 47 import com.sun.corba.se.impl.encoding.CDRInputObject; 48 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 49 import com.sun.corba.se.impl.orbutil.ORBUtility; 50 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage; 51 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage; 52 53 /** 54 * @author Harold Carr 55 */ 56 public class CorbaResponseWaitingRoomImpl 57 implements 58 CorbaResponseWaitingRoom 59 { 60 final static class OutCallDesc 61 { 62 java.lang.Object done = new java.lang.Object(); 63 Thread thread; 64 MessageMediator messageMediator; 65 SystemException exception; 66 InputObject inputObject; 67 } 68 69 private ORB orb; 70 private ORBUtilSystemException wrapper ; 71 72 private CorbaConnection connection; 73 // Maps requestId to an OutCallDesc. 74 final private Map<Integer, OutCallDesc> out_calls; 75 76 public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection) 77 { 78 this.orb = orb; 79 wrapper = ORBUtilSystemException.get( orb, 80 CORBALogDomains.RPC_TRANSPORT ) ; 81 this.connection = connection; 82 out_calls = 83 Collections.synchronizedMap(new HashMap<Integer, OutCallDesc>()); 84 } 85 86 //////////////////////////////////////////////////// 87 // 88 // pept.transport.ResponseWaitingRoom 89 // 90 91 public void registerWaiter(MessageMediator mediator) 92 { 93 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 94 95 if (orb.transportDebugFlag) { 96 dprint(".registerWaiter: " + opAndId(messageMediator)); 97 } 98 99 Integer requestId = messageMediator.getRequestIdInteger(); 100 101 OutCallDesc call = new OutCallDesc(); 102 call.thread = Thread.currentThread(); 103 call.messageMediator = messageMediator; 104 out_calls.put(requestId, call); 105 } 106 107 public void unregisterWaiter(MessageMediator mediator) 108 { 109 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 110 111 if (orb.transportDebugFlag) { 112 dprint(".unregisterWaiter: " + opAndId(messageMediator)); 113 } 114 115 Integer requestId = messageMediator.getRequestIdInteger(); 116 117 out_calls.remove(requestId); 118 } 119 120 public InputObject waitForResponse(MessageMediator mediator) 121 { 122 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 123 124 try { 125 126 InputObject returnStream = null; 127 128 if (orb.transportDebugFlag) { 129 dprint(".waitForResponse->: " + opAndId(messageMediator)); 130 } 131 132 Integer requestId = messageMediator.getRequestIdInteger(); 133 134 if (messageMediator.isOneWay()) { 135 // The waiter is removed in releaseReply in the same 136 // way as a normal request. 137 138 if (orb.transportDebugFlag) { 139 dprint(".waitForResponse: one way - not waiting: " 140 + opAndId(messageMediator)); 141 } 142 143 return null; 144 } 145 146 OutCallDesc call = out_calls.get(requestId); 147 if (call == null) { 148 throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE); 149 } 150 151 synchronized(call.done) { 152 153 while (call.inputObject == null && call.exception == null) { 154 // Wait for the reply from the server. 155 // The ReaderThread reads in the reply IIOP message 156 // and signals us. 157 try { 158 if (orb.transportDebugFlag) { 159 dprint(".waitForResponse: waiting: " 160 + opAndId(messageMediator)); 161 } 162 call.done.wait(); 163 } catch (InterruptedException ie) {}; 164 } 165 166 if (call.exception != null) { 167 if (orb.transportDebugFlag) { 168 dprint(".waitForResponse: exception: " 169 + opAndId(messageMediator)); 170 } 171 throw call.exception; 172 } 173 174 returnStream = call.inputObject; 175 } 176 177 // REVISIT -- exceptions from unmarshaling code will 178 // go up through this client thread! 179 180 if (returnStream != null) { 181 // On fragmented streams the header MUST be unmarshaled here 182 // (in the client thread) in case it blocks. 183 // If the header was already unmarshaled, this won't 184 // do anything 185 // REVISIT: cast - need interface method. 186 ((CDRInputObject)returnStream).unmarshalHeader(); 187 } 188 189 return returnStream; 190 191 } finally { 192 if (orb.transportDebugFlag) { 193 dprint(".waitForResponse<-: " + opAndId(messageMediator)); 194 } 195 } 196 } 197 198 public void responseReceived(InputObject is) 199 { 200 CDRInputObject inputObject = (CDRInputObject) is; 201 LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage) 202 inputObject.getMessageHeader(); 203 Integer requestId = new Integer(header.getRequestId()); 204 OutCallDesc call = out_calls.get(requestId); 205 206 if (orb.transportDebugFlag) { 207 dprint(".responseReceived: id/" 208 + requestId + ": " 209 + header); 210 } 211 212 // This is an interesting case. It could mean that someone sent us a 213 // reply message, but we don't know what request it was for. That 214 // would probably call for an error. However, there's another case 215 // that's normal and we should think about -- 216 // 217 // If the unmarshaling thread does all of its work inbetween the time 218 // the ReaderThread gives it the last fragment and gets to the 219 // out_calls.get line, then it will also be null, so just return; 220 if (call == null) { 221 if (orb.transportDebugFlag) { 222 dprint(".responseReceived: id/" 223 + requestId 224 + ": no waiter: " 225 + header); 226 } 227 return; 228 } 229 230 // Set the reply InputObject and signal the client thread 231 // that the reply has been received. 232 // The thread signalled will remove outcall descriptor if appropriate. 233 // Otherwise, it'll be removed when last fragment for it has been put on 234 // BufferManagerRead's queue. 235 synchronized (call.done) { 236 CorbaMessageMediator messageMediator = (CorbaMessageMediator) 237 call.messageMediator; 238 239 if (orb.transportDebugFlag) { 240 dprint(".responseReceived: " 241 + opAndId(messageMediator) 242 + ": notifying waiters"); 243 } 244 245 messageMediator.setReplyHeader(header); 246 messageMediator.setInputObject(is); 247 inputObject.setMessageMediator(messageMediator); 248 call.inputObject = is; 249 call.done.notify(); 250 } 251 } 252 253 public int numberRegistered() 254 { 255 return out_calls.size(); 256 } 257 258 ////////////////////////////////////////////////// 259 // 260 // CorbaResponseWaitingRoom 261 // 262 263 public void signalExceptionToAllWaiters(SystemException systemException) 264 { 265 266 if (orb.transportDebugFlag) { 267 dprint(".signalExceptionToAllWaiters: " + systemException); 268 } 269 270 synchronized (out_calls) { 271 if (orb.transportDebugFlag) { 272 dprint(".signalExceptionToAllWaiters: out_calls size :" + 273 out_calls.size()); 274 } 275 276 for (OutCallDesc call : out_calls.values()) { 277 if (orb.transportDebugFlag) { 278 dprint(".signalExceptionToAllWaiters: signaling " + 279 call); 280 } 281 synchronized(call.done) { 282 try { 283 // anything waiting for BufferManagerRead's fragment queue 284 // needs to be cancelled 285 CorbaMessageMediator corbaMsgMediator = 286 (CorbaMessageMediator)call.messageMediator; 287 CDRInputObject inputObject = 288 (CDRInputObject)corbaMsgMediator.getInputObject(); 289 // IMPORTANT: If inputObject is null, then no need to tell 290 // BufferManagerRead to cancel request processing. 291 if (inputObject != null) { 292 BufferManagerReadStream bufferManager = 293 (BufferManagerReadStream)inputObject.getBufferManager(); 294 int requestId = corbaMsgMediator.getRequestId(); 295 bufferManager.cancelProcessing(requestId); 296 } 297 } catch (Exception e) { 298 } finally { 299 // attempt to wake up waiting threads in all cases 300 call.inputObject = null; 301 call.exception = systemException; 302 call.done.notifyAll(); 303 } 304 } 305 } 306 } 307 } 308 309 public MessageMediator getMessageMediator(int requestId) 310 { 311 Integer id = new Integer(requestId); 312 OutCallDesc call = out_calls.get(id); 313 if (call == null) { 314 // This can happen when getting early reply fragments for a 315 // request which has completed (e.g., client marshaling error). 316 return null; 317 } 318 return call.messageMediator; 319 } 320 321 //////////////////////////////////////////////////// 322 // 323 // Implementation. 324 // 325 326 protected void dprint(String msg) 327 { 328 ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg); 329 } 330 331 protected String opAndId(CorbaMessageMediator mediator) 332 { 333 return ORBUtility.operationNameAndRequestId(mediator); 334 } 335 } 336 337 // End of file.