/* * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package com.sun.corba.se.impl.transport; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.omg.CORBA.CompletionStatus; import org.omg.CORBA.SystemException; import com.sun.corba.se.pept.encoding.InputObject; import com.sun.corba.se.pept.encoding.OutputObject; import com.sun.corba.se.pept.protocol.MessageMediator; import com.sun.corba.se.spi.logging.CORBALogDomains; import com.sun.corba.se.spi.orb.ORB; import com.sun.corba.se.spi.protocol.CorbaMessageMediator; import com.sun.corba.se.spi.transport.CorbaConnection; import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; import com.sun.corba.se.impl.encoding.BufferManagerReadStream; import com.sun.corba.se.impl.encoding.CDRInputObject; import com.sun.corba.se.impl.logging.ORBUtilSystemException; import com.sun.corba.se.impl.orbutil.ORBUtility; import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage; import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage; /** * @author Harold Carr */ public class CorbaResponseWaitingRoomImpl implements CorbaResponseWaitingRoom { final static class OutCallDesc { java.lang.Object done = new java.lang.Object(); Thread thread; MessageMediator messageMediator; SystemException exception; InputObject inputObject; } private ORB orb; private ORBUtilSystemException wrapper ; private CorbaConnection connection; // Maps requestId to an OutCallDesc. final private Map out_calls; public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection) { this.orb = orb; wrapper = ORBUtilSystemException.get( orb, CORBALogDomains.RPC_TRANSPORT ) ; this.connection = connection; out_calls = Collections.synchronizedMap(new HashMap()); } //////////////////////////////////////////////////// // // pept.transport.ResponseWaitingRoom // public void registerWaiter(MessageMediator mediator) { CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; if (orb.transportDebugFlag) { dprint(".registerWaiter: " + opAndId(messageMediator)); } Integer requestId = messageMediator.getRequestIdInteger(); OutCallDesc call = new OutCallDesc(); call.thread = Thread.currentThread(); call.messageMediator = messageMediator; out_calls.put(requestId, call); } public void unregisterWaiter(MessageMediator mediator) { CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; if (orb.transportDebugFlag) { dprint(".unregisterWaiter: " + opAndId(messageMediator)); } Integer requestId = messageMediator.getRequestIdInteger(); out_calls.remove(requestId); } public InputObject waitForResponse(MessageMediator mediator) { CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator; try { InputObject returnStream = null; if (orb.transportDebugFlag) { dprint(".waitForResponse->: " + opAndId(messageMediator)); } Integer requestId = messageMediator.getRequestIdInteger(); if (messageMediator.isOneWay()) { // The waiter is removed in releaseReply in the same // way as a normal request. if (orb.transportDebugFlag) { dprint(".waitForResponse: one way - not waiting: " + opAndId(messageMediator)); } return null; } OutCallDesc call = out_calls.get(requestId); if (call == null) { throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE); } synchronized(call.done) { while (call.inputObject == null && call.exception == null) { // Wait for the reply from the server. // The ReaderThread reads in the reply IIOP message // and signals us. try { if (orb.transportDebugFlag) { dprint(".waitForResponse: waiting: " + opAndId(messageMediator)); } call.done.wait(); } catch (InterruptedException ie) {}; } if (call.exception != null) { if (orb.transportDebugFlag) { dprint(".waitForResponse: exception: " + opAndId(messageMediator)); } throw call.exception; } returnStream = call.inputObject; } // REVISIT -- exceptions from unmarshaling code will // go up through this client thread! if (returnStream != null) { // On fragmented streams the header MUST be unmarshaled here // (in the client thread) in case it blocks. // If the header was already unmarshaled, this won't // do anything // REVISIT: cast - need interface method. ((CDRInputObject)returnStream).unmarshalHeader(); } return returnStream; } finally { if (orb.transportDebugFlag) { dprint(".waitForResponse<-: " + opAndId(messageMediator)); } } } public void responseReceived(InputObject is) { CDRInputObject inputObject = (CDRInputObject) is; LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage) inputObject.getMessageHeader(); Integer requestId = new Integer(header.getRequestId()); OutCallDesc call = out_calls.get(requestId); if (orb.transportDebugFlag) { dprint(".responseReceived: id/" + requestId + ": " + header); } // This is an interesting case. It could mean that someone sent us a // reply message, but we don't know what request it was for. That // would probably call for an error. However, there's another case // that's normal and we should think about -- // // If the unmarshaling thread does all of its work inbetween the time // the ReaderThread gives it the last fragment and gets to the // out_calls.get line, then it will also be null, so just return; if (call == null) { if (orb.transportDebugFlag) { dprint(".responseReceived: id/" + requestId + ": no waiter: " + header); } return; } // Set the reply InputObject and signal the client thread // that the reply has been received. // The thread signalled will remove outcall descriptor if appropriate. // Otherwise, it'll be removed when last fragment for it has been put on // BufferManagerRead's queue. synchronized (call.done) { CorbaMessageMediator messageMediator = (CorbaMessageMediator) call.messageMediator; if (orb.transportDebugFlag) { dprint(".responseReceived: " + opAndId(messageMediator) + ": notifying waiters"); } messageMediator.setReplyHeader(header); messageMediator.setInputObject(is); inputObject.setMessageMediator(messageMediator); call.inputObject = is; call.done.notify(); } } public int numberRegistered() { return out_calls.size(); } ////////////////////////////////////////////////// // // CorbaResponseWaitingRoom // public void signalExceptionToAllWaiters(SystemException systemException) { if (orb.transportDebugFlag) { dprint(".signalExceptionToAllWaiters: " + systemException); } synchronized (out_calls) { if (orb.transportDebugFlag) { dprint(".signalExceptionToAllWaiters: out_calls size :" + out_calls.size()); } for (OutCallDesc call : out_calls.values()) { if (orb.transportDebugFlag) { dprint(".signalExceptionToAllWaiters: signaling " + call); } synchronized(call.done) { try { // anything waiting for BufferManagerRead's fragment queue // needs to be cancelled CorbaMessageMediator corbaMsgMediator = (CorbaMessageMediator)call.messageMediator; CDRInputObject inputObject = (CDRInputObject)corbaMsgMediator.getInputObject(); // IMPORTANT: If inputObject is null, then no need to tell // BufferManagerRead to cancel request processing. if (inputObject != null) { BufferManagerReadStream bufferManager = (BufferManagerReadStream)inputObject.getBufferManager(); int requestId = corbaMsgMediator.getRequestId(); bufferManager.cancelProcessing(requestId); } } catch (Exception e) { } finally { // attempt to wake up waiting threads in all cases call.inputObject = null; call.exception = systemException; call.done.notifyAll(); } } } } } public MessageMediator getMessageMediator(int requestId) { Integer id = new Integer(requestId); OutCallDesc call = out_calls.get(id); if (call == null) { // This can happen when getting early reply fragments for a // request which has completed (e.g., client marshaling error). return null; } return call.messageMediator; } //////////////////////////////////////////////////// // // Implementation. // protected void dprint(String msg) { ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg); } protected String opAndId(CorbaMessageMediator mediator) { return ORBUtility.operationNameAndRequestId(mediator); } } // End of file.