1 /*
   2  * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package com.sun.corba.se.impl.encoding;
  26 
  27 import java.nio.ByteBuffer;
  28 import com.sun.corba.se.pept.transport.ByteBufferPool;
  29 import com.sun.corba.se.spi.logging.CORBALogDomains;
  30 import com.sun.corba.se.spi.orb.ORB;
  31 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  32 import com.sun.corba.se.impl.orbutil.ORBUtility;
  33 import com.sun.corba.se.impl.protocol.RequestCanceledException;
  34 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
  35 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  36 import java.util.*;
  37 
  38 public class BufferManagerReadStream
  39     implements BufferManagerRead, MarkAndResetHandler
  40 {
  41     private boolean receivedCancel = false;
  42     private int cancelReqId = 0;
  43 
  44     // We should convert endOfStream to a final static dummy end node
  45     private boolean endOfStream = true;
  46     private BufferQueue fragmentQueue = new BufferQueue();
  47     private long FRAGMENT_TIMEOUT = 60000;
  48 
  49     // REVISIT - This should go in BufferManagerRead. But, since
  50     //           BufferManagerRead is an interface. BufferManagerRead
  51     //           might ought to be an abstract class instead of an
  52     //           interface.
  53     private ORB orb ;
  54     private ORBUtilSystemException wrapper ;
  55     private boolean debug = false;
  56 
  57     BufferManagerReadStream( ORB orb )
  58     {
  59         this.orb = orb ;
  60         this.wrapper = ORBUtilSystemException.get( orb,
  61             CORBALogDomains.RPC_ENCODING ) ;
  62         debug = orb.transportDebugFlag;
  63     }
  64 
  65     public void cancelProcessing(int requestId) {
  66         synchronized(fragmentQueue) {
  67             receivedCancel = true;
  68             cancelReqId = requestId;
  69             fragmentQueue.notify();
  70         }
  71     }
  72 
  73     public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
  74     {
  75         ByteBufferWithInfo bbwi =
  76             new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
  77 
  78         synchronized (fragmentQueue) {
  79             if (debug)
  80             {
  81                 // print address of ByteBuffer being queued
  82                 int bbAddress = System.identityHashCode(byteBuffer);
  83                 StringBuffer sb = new StringBuffer(80);
  84                 sb.append("processFragment() - queueing ByteBuffer id (");
  85                 sb.append(bbAddress).append(") to fragment queue.");
  86                 String strMsg = sb.toString();
  87                 dprint(strMsg);
  88             }
  89             fragmentQueue.enqueue(bbwi);
  90             endOfStream = !msg.moreFragmentsToFollow();
  91             fragmentQueue.notify();
  92         }
  93     }
  94 
  95     public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
  96     {
  97 
  98       ByteBufferWithInfo result = null;
  99 
 100       try {
 101           //System.out.println("ENTER underflow");
 102 
 103         synchronized (fragmentQueue) {
 104 
 105             if (receivedCancel) {
 106                 throw new RequestCanceledException(cancelReqId);
 107             }
 108 
 109             while (fragmentQueue.size() == 0) {
 110 
 111                 if (endOfStream) {
 112                     throw wrapper.endOfStream() ;
 113                 }
 114 
 115                 boolean interrupted = false;
 116                 try {
 117                     fragmentQueue.wait(FRAGMENT_TIMEOUT);
 118                 } catch (InterruptedException e) {
 119                     interrupted = true;
 120                 }
 121 
 122                 if (!interrupted && fragmentQueue.size() == 0) {
 123                     throw wrapper.bufferReadManagerTimeout();
 124                 }
 125 
 126                 if (receivedCancel) {
 127                     throw new RequestCanceledException(cancelReqId);
 128                 }
 129             }
 130 
 131             result = fragmentQueue.dequeue();
 132             result.fragmented = true;
 133 
 134             if (debug)
 135             {
 136                 // print address of ByteBuffer being dequeued
 137                 int bbAddr = System.identityHashCode(result.byteBuffer);
 138                 StringBuffer sb1 = new StringBuffer(80);
 139                 sb1.append("underflow() - dequeued ByteBuffer id (");
 140                 sb1.append(bbAddr).append(") from fragment queue.");
 141                 String msg1 = sb1.toString();
 142                 dprint(msg1);
 143             }
 144 
 145             // VERY IMPORTANT
 146             // Release bbwi.byteBuffer to the ByteBufferPool only if
 147             // this BufferManagerStream is not marked for potential restore.
 148             if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
 149             {
 150                 ByteBufferPool byteBufferPool = getByteBufferPool();
 151 
 152                 if (debug)
 153                 {
 154                     // print address of ByteBuffer being released
 155                     int bbAddress = System.identityHashCode(bbwi.byteBuffer);
 156                     StringBuffer sb = new StringBuffer(80);
 157                     sb.append("underflow() - releasing ByteBuffer id (");
 158                     sb.append(bbAddress).append(") to ByteBufferPool.");
 159                     String msg = sb.toString();
 160                     dprint(msg);
 161                 }
 162 
 163                 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
 164                 bbwi.byteBuffer = null;
 165                 bbwi = null;
 166             }
 167         }
 168         return result;
 169       } finally {
 170           //System.out.println("EXIT underflow");
 171       }
 172     }
 173 
 174     public void init(Message msg) {
 175         if (msg != null)
 176             endOfStream = !msg.moreFragmentsToFollow();
 177     }
 178 
 179     // Release any queued ByteBufferWithInfo's byteBuffers to the
 180     // ByteBufferPoool
 181     public void close(ByteBufferWithInfo bbwi)
 182     {
 183         int inputBbAddress = 0;
 184 
 185         // release ByteBuffers on fragmentQueue
 186         if (fragmentQueue != null)
 187         {
 188             synchronized (fragmentQueue)
 189             {
 190                 // IMPORTANT: The fragment queue may have one ByteBuffer
 191                 //            on it that's also on the CDRInputStream if
 192                 //            this method is called when the stream is 'marked'.
 193                 //            Thus, we'll compare the ByteBuffer passed
 194                 //            in (from a CDRInputStream) with all ByteBuffers
 195                 //            on the stack. If one is found to equal, it will
 196                 //            not be released to the ByteBufferPool.
 197                 if (bbwi != null)
 198                 {
 199                     inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
 200                 }
 201 
 202                 ByteBufferWithInfo abbwi = null;
 203                 ByteBufferPool byteBufferPool = getByteBufferPool();
 204                 while (fragmentQueue.size() != 0)
 205                 {
 206                     abbwi = fragmentQueue.dequeue();
 207                     if (abbwi != null && abbwi.byteBuffer != null)
 208                     {
 209                         int bbAddress = System.identityHashCode(abbwi.byteBuffer);
 210                         if (inputBbAddress != bbAddress)
 211                         {
 212                             if (debug)
 213                             {
 214                                  // print address of ByteBuffer released
 215                                  StringBuffer sb = new StringBuffer(80);
 216                                  sb.append("close() - fragmentQueue is ")
 217                                    .append("releasing ByteBuffer id (")
 218                                    .append(bbAddress).append(") to ")
 219                                    .append("ByteBufferPool.");
 220                                  String msg = sb.toString();
 221                                  dprint(msg);
 222                             }
 223                         }
 224                         byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
 225                     }
 226                 }
 227             }
 228             fragmentQueue = null;
 229         }
 230 
 231         // release ByteBuffers on fragmentStack
 232         if (fragmentStack != null && fragmentStack.size() != 0)
 233         {
 234             // IMPORTANT: The fragment stack may have one ByteBuffer
 235             //            on it that's also on the CDRInputStream if
 236             //            this method is called when the stream is 'marked'.
 237             //            Thus, we'll compare the ByteBuffer passed
 238             //            in (from a CDRInputStream) with all ByteBuffers
 239             //            on the stack. If one is found to equal, it will
 240             //            not be released to the ByteBufferPool.
 241             if (bbwi != null)
 242             {
 243                 inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
 244             }
 245 
 246             ByteBufferWithInfo abbwi = null;
 247             ByteBufferPool byteBufferPool = getByteBufferPool();
 248             ListIterator itr = fragmentStack.listIterator();
 249             while (itr.hasNext())
 250             {
 251                 abbwi = (ByteBufferWithInfo)itr.next();
 252 
 253                 if (abbwi != null && abbwi.byteBuffer != null)
 254                 {
 255                    int bbAddress = System.identityHashCode(abbwi.byteBuffer);
 256                    if (inputBbAddress != bbAddress)
 257                    {
 258                        if (debug)
 259                        {
 260                             // print address of ByteBuffer being released
 261                             StringBuffer sb = new StringBuffer(80);
 262                             sb.append("close() - fragmentStack - releasing ")
 263                               .append("ByteBuffer id (" + bbAddress + ") to ")
 264                               .append("ByteBufferPool.");
 265                             String msg = sb.toString();
 266                             dprint(msg);
 267                        }
 268                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
 269                    }
 270                 }
 271             }
 272             fragmentStack = null;
 273         }
 274 
 275     }
 276 
 277     protected ByteBufferPool getByteBufferPool()
 278     {
 279         return orb.getByteBufferPool();
 280     }
 281 
 282     private void dprint(String msg)
 283     {
 284         ORBUtility.dprint("BufferManagerReadStream", msg);
 285     }
 286 
 287     // Mark and reset handler ----------------------------------------
 288 
 289     private boolean markEngaged = false;
 290 
 291     // List of fragment ByteBufferWithInfos received since
 292     // the mark was engaged.
 293     private LinkedList fragmentStack = null;
 294     private RestorableInputStream inputStream = null;
 295 
 296     // Original state of the stream
 297     private Object streamMemento = null;
 298 
 299     public void mark(RestorableInputStream inputStream)
 300     {
 301         this.inputStream = inputStream;
 302         markEngaged = true;
 303 
 304         // Get the magic Object that the stream will use to
 305         // reconstruct it's state when reset is called
 306         streamMemento = inputStream.createStreamMemento();
 307 
 308         if (fragmentStack != null) {
 309             fragmentStack.clear();
 310         }
 311     }
 312 
 313     // Collects fragments received since the mark was engaged.
 314     public void fragmentationOccured(ByteBufferWithInfo newFragment)
 315     {
 316         if (!markEngaged)
 317             return;
 318 
 319         if (fragmentStack == null)
 320             fragmentStack = new LinkedList();
 321 
 322         fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
 323     }
 324 
 325     public void reset()
 326     {
 327         if (!markEngaged) {
 328             // REVISIT - call to reset without call to mark
 329             return;
 330         }
 331 
 332         markEngaged = false;
 333 
 334         // If we actually did peek across fragments, we need
 335         // to push those fragments onto the front of the
 336         // buffer queue.
 337         if (fragmentStack != null && fragmentStack.size() != 0) {
 338             ListIterator iter = fragmentStack.listIterator();
 339 
 340             synchronized(fragmentQueue) {
 341                 while (iter.hasNext()) {
 342                     fragmentQueue.push((ByteBufferWithInfo)iter.next());
 343                 }
 344             }
 345 
 346             fragmentStack.clear();
 347         }
 348 
 349         // Give the stream the magic Object to restore
 350         // it's state.
 351         inputStream.restoreInternalState(streamMemento);
 352     }
 353 
 354     public MarkAndResetHandler getMarkAndResetHandler() {
 355         return this;
 356     }
 357 }