1 /* 2 * Copyright (c) 2001, 2004, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.transport; 27 28 import java.util.Hashtable; 29 30 import org.omg.CORBA.CompletionStatus; 31 import org.omg.CORBA.SystemException; 32 33 import com.sun.corba.se.pept.encoding.InputObject; 34 import com.sun.corba.se.pept.encoding.OutputObject; 35 import com.sun.corba.se.pept.protocol.MessageMediator; 36 37 import com.sun.corba.se.spi.logging.CORBALogDomains; 38 import com.sun.corba.se.spi.orb.ORB; 39 import com.sun.corba.se.spi.protocol.CorbaMessageMediator; 40 import com.sun.corba.se.spi.transport.CorbaConnection; 41 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 42 43 import com.sun.corba.se.impl.encoding.BufferManagerReadStream; 44 import com.sun.corba.se.impl.encoding.CDRInputObject; 45 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 46 import com.sun.corba.se.impl.orbutil.ORBUtility; 47 import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage; 48 import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage; 49 50 /** 51 * @author Harold Carr 52 */ 53 public class CorbaResponseWaitingRoomImpl 54 implements 55 CorbaResponseWaitingRoom 56 { 57 final static class OutCallDesc 58 { 59 java.lang.Object done = new java.lang.Object(); 60 Thread thread; 61 MessageMediator messageMediator; 62 SystemException exception; 63 InputObject inputObject; 64 } 65 66 private ORB orb; 67 private ORBUtilSystemException wrapper ; 68 69 private CorbaConnection connection; 70 // Maps requestId to an OutCallDesc. 71 private Hashtable out_calls = null; // REVISIT - use int hastable/map 72 73 public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection) 74 { 75 this.orb = orb; 76 wrapper = ORBUtilSystemException.get( orb, 77 CORBALogDomains.RPC_TRANSPORT ) ; 78 this.connection = connection; 79 out_calls = new Hashtable(); 80 } 81 82 //////////////////////////////////////////////////// 83 // 84 // pept.transport.ResponseWaitingRoom 85 // 86 87 public void registerWaiter(MessageMediator mediator) 88 { 89 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 90 91 if (orb.transportDebugFlag) { 92 dprint(".registerWaiter: " + opAndId(messageMediator)); 93 } 94 95 Integer requestId = messageMediator.getRequestIdInteger(); 96 97 OutCallDesc call = new OutCallDesc(); 98 call.thread = Thread.currentThread(); 99 call.messageMediator = messageMediator; 100 out_calls.put(requestId, call); 101 } 102 103 public void unregisterWaiter(MessageMediator mediator) 104 { 105 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 106 107 if (orb.transportDebugFlag) { 108 dprint(".unregisterWaiter: " + opAndId(messageMediator)); 109 } 110 111 Integer requestId = messageMediator.getRequestIdInteger(); 112 113 out_calls.remove(requestId); 114 } 115 116 public InputObject waitForResponse(MessageMediator mediator) 117 { 118 CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; 119 120 try { 121 122 InputObject returnStream = null; 123 124 if (orb.transportDebugFlag) { 125 dprint(".waitForResponse->: " + opAndId(messageMediator)); 126 } 127 128 Integer requestId = messageMediator.getRequestIdInteger(); 129 130 if (messageMediator.isOneWay()) { 131 // The waiter is removed in releaseReply in the same 132 // way as a normal request. 133 134 if (orb.transportDebugFlag) { 135 dprint(".waitForResponse: one way - not waiting: " 136 + opAndId(messageMediator)); 137 } 138 139 return null; 140 } 141 142 OutCallDesc call = (OutCallDesc)out_calls.get(requestId); 143 if (call == null) { 144 throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE); 145 } 146 147 synchronized(call.done) { 148 149 while (call.inputObject == null && call.exception == null) { 150 // Wait for the reply from the server. 151 // The ReaderThread reads in the reply IIOP message 152 // and signals us. 153 try { 154 if (orb.transportDebugFlag) { 155 dprint(".waitForResponse: waiting: " 156 + opAndId(messageMediator)); 157 } 158 call.done.wait(); 159 } catch (InterruptedException ie) {}; 160 } 161 162 if (call.exception != null) { 163 if (orb.transportDebugFlag) { 164 dprint(".waitForResponse: exception: " 165 + opAndId(messageMediator)); 166 } 167 throw call.exception; 168 } 169 170 returnStream = call.inputObject; 171 } 172 173 // REVISIT -- exceptions from unmarshaling code will 174 // go up through this client thread! 175 176 if (returnStream != null) { 177 // On fragmented streams the header MUST be unmarshaled here 178 // (in the client thread) in case it blocks. 179 // If the header was already unmarshaled, this won't 180 // do anything 181 // REVISIT: cast - need interface method. 182 ((CDRInputObject)returnStream).unmarshalHeader(); 183 } 184 185 return returnStream; 186 187 } finally { 188 if (orb.transportDebugFlag) { 189 dprint(".waitForResponse<-: " + opAndId(messageMediator)); 190 } 191 } 192 } 193 194 public void responseReceived(InputObject is) 195 { 196 CDRInputObject inputObject = (CDRInputObject) is; 197 LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage) 198 inputObject.getMessageHeader(); 199 Integer requestId = new Integer(header.getRequestId()); 200 OutCallDesc call = (OutCallDesc) out_calls.get(requestId); 201 202 if (orb.transportDebugFlag) { 203 dprint(".responseReceived: id/" 204 + requestId + ": " 205 + header); 206 } 207 208 // This is an interesting case. It could mean that someone sent us a 209 // reply message, but we don't know what request it was for. That 210 // would probably call for an error. However, there's another case 211 // that's normal and we should think about -- 212 // 213 // If the unmarshaling thread does all of its work inbetween the time 214 // the ReaderThread gives it the last fragment and gets to the 215 // out_calls.get line, then it will also be null, so just return; 216 if (call == null) { 217 if (orb.transportDebugFlag) { 218 dprint(".responseReceived: id/" 219 + requestId 220 + ": no waiter: " 221 + header); 222 } 223 return; 224 } 225 226 // Set the reply InputObject and signal the client thread 227 // that the reply has been received. 228 // The thread signalled will remove outcall descriptor if appropriate. 229 // Otherwise, it'll be removed when last fragment for it has been put on 230 // BufferManagerRead's queue. 231 synchronized (call.done) { 232 CorbaMessageMediator messageMediator = (CorbaMessageMediator) 233 call.messageMediator; 234 235 if (orb.transportDebugFlag) { 236 dprint(".responseReceived: " 237 + opAndId(messageMediator) 238 + ": notifying waiters"); 239 } 240 241 messageMediator.setReplyHeader(header); 242 messageMediator.setInputObject(is); 243 inputObject.setMessageMediator(messageMediator); 244 call.inputObject = is; 245 call.done.notify(); 246 } 247 } 248 249 public int numberRegistered() 250 { 251 // Note: Hashtable.size() is not synchronized 252 return out_calls.size(); 253 } 254 255 ////////////////////////////////////////////////// 256 // 257 // CorbaResponseWaitingRoom 258 // 259 260 public void signalExceptionToAllWaiters(SystemException systemException) 261 { 262 263 if (orb.transportDebugFlag) { 264 dprint(".signalExceptionToAllWaiters: " + systemException); 265 } 266 267 OutCallDesc call; 268 java.util.Enumeration e = out_calls.elements(); 269 while(e.hasMoreElements()) { 270 call = (OutCallDesc) e.nextElement(); 271 272 synchronized(call.done){ 273 // anything waiting for BufferManagerRead's fragment queue 274 // needs to be cancelled 275 CorbaMessageMediator corbaMsgMediator = 276 (CorbaMessageMediator)call.messageMediator; 277 CDRInputObject inputObject = 278 (CDRInputObject)corbaMsgMediator.getInputObject(); 279 // IMPORTANT: If inputObject is null, then no need to tell 280 // BufferManagerRead to cancel request processing. 281 if (inputObject != null) { 282 BufferManagerReadStream bufferManager = 283 (BufferManagerReadStream)inputObject.getBufferManager(); 284 int requestId = corbaMsgMediator.getRequestId(); 285 bufferManager.cancelProcessing(requestId); 286 } 287 call.inputObject = null; 288 call.exception = systemException; 289 call.done.notify(); 290 } 291 } 292 } 293 294 public MessageMediator getMessageMediator(int requestId) 295 { 296 Integer id = new Integer(requestId); 297 OutCallDesc call = (OutCallDesc) out_calls.get(id); 298 if (call == null) { 299 // This can happen when getting early reply fragments for a 300 // request which has completed (e.g., client marshaling error). 301 return null; 302 } 303 return call.messageMediator; 304 } 305 306 //////////////////////////////////////////////////// 307 // 308 // Implementation. 309 // 310 311 protected void dprint(String msg) 312 { 313 ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg); 314 } 315 316 protected String opAndId(CorbaMessageMediator mediator) 317 { 318 return ORBUtility.operationNameAndRequestId(mediator); 319 } 320 } 321 322 // End of file.