1 /* 2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package com.sun.corba.se.impl.encoding; 26 27 import java.nio.ByteBuffer; 28 import com.sun.corba.se.pept.transport.ByteBufferPool; 29 import com.sun.corba.se.spi.logging.CORBALogDomains; 30 import com.sun.corba.se.spi.orb.ORB; 31 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 32 import com.sun.corba.se.impl.orbutil.ORBUtility; 33 import com.sun.corba.se.impl.protocol.RequestCanceledException; 34 import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage; 35 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 36 import java.util.*; 37 38 public class BufferManagerReadStream 39 implements BufferManagerRead, MarkAndResetHandler 40 { 41 private boolean receivedCancel = false; 42 private int cancelReqId = 0; 43 44 // We should convert endOfStream to a final static dummy end node 45 private boolean endOfStream = true; 46 private BufferQueue fragmentQueue = new BufferQueue(); 47 private long FRAGMENT_TIMEOUT = 60000; 48 49 // REVISIT - This should go in BufferManagerRead. But, since 50 // BufferManagerRead is an interface. BufferManagerRead 51 // might ought to be an abstract class instead of an 52 // interface. 53 private ORB orb ; 54 private ORBUtilSystemException wrapper ; 55 private boolean debug = false; 56 57 BufferManagerReadStream( ORB orb ) 58 { 59 this.orb = orb ; 60 this.wrapper = ORBUtilSystemException.get( orb, 61 CORBALogDomains.RPC_ENCODING ) ; 62 debug = orb.transportDebugFlag; 63 } 64 65 public void cancelProcessing(int requestId) { 66 synchronized(fragmentQueue) { 67 receivedCancel = true; 68 cancelReqId = requestId; 69 fragmentQueue.notify(); 70 } 71 } 72 73 public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg) 74 { 75 ByteBufferWithInfo bbwi = 76 new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength()); 77 78 synchronized (fragmentQueue) { 79 if (debug) 80 { 81 // print address of ByteBuffer being queued 82 int bbAddress = System.identityHashCode(byteBuffer); 83 StringBuffer sb = new StringBuffer(80); 84 sb.append("processFragment() - queueing ByteBuffer id ("); 85 sb.append(bbAddress).append(") to fragment queue."); 86 String strMsg = sb.toString(); 87 dprint(strMsg); 88 } 89 fragmentQueue.enqueue(bbwi); 90 endOfStream = !msg.moreFragmentsToFollow(); 91 fragmentQueue.notify(); 92 } 93 } 94 95 public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi) 96 { 97 98 ByteBufferWithInfo result = null; 99 100 try { 101 //System.out.println("ENTER underflow"); 102 103 synchronized (fragmentQueue) { 104 105 if (receivedCancel) { 106 throw new RequestCanceledException(cancelReqId); 107 } 108 109 while (fragmentQueue.size() == 0) { 110 111 if (endOfStream) { 112 throw wrapper.endOfStream() ; 113 } 114 115 boolean interrupted = false; 116 try { 117 fragmentQueue.wait(FRAGMENT_TIMEOUT); 118 } catch (InterruptedException e) { 119 interrupted = true; 120 } 121 122 if (!interrupted && fragmentQueue.size() == 0) { 123 throw wrapper.bufferReadManagerTimeout(); 124 } 125 126 if (receivedCancel) { 127 throw new RequestCanceledException(cancelReqId); 128 } 129 } 130 131 result = fragmentQueue.dequeue(); 132 result.fragmented = true; 133 134 if (debug) 135 { 136 // print address of ByteBuffer being dequeued 137 int bbAddr = System.identityHashCode(result.byteBuffer); 138 StringBuffer sb1 = new StringBuffer(80); 139 sb1.append("underflow() - dequeued ByteBuffer id ("); 140 sb1.append(bbAddr).append(") from fragment queue."); 141 String msg1 = sb1.toString(); 142 dprint(msg1); 143 } 144 145 // VERY IMPORTANT 146 // Release bbwi.byteBuffer to the ByteBufferPool only if 147 // this BufferManagerStream is not marked for potential restore. 148 if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null) 149 { 150 ByteBufferPool byteBufferPool = getByteBufferPool(); 151 152 if (debug) 153 { 154 // print address of ByteBuffer being released 155 int bbAddress = System.identityHashCode(bbwi.byteBuffer); 156 StringBuffer sb = new StringBuffer(80); 157 sb.append("underflow() - releasing ByteBuffer id ("); 158 sb.append(bbAddress).append(") to ByteBufferPool."); 159 String msg = sb.toString(); 160 dprint(msg); 161 } 162 163 byteBufferPool.releaseByteBuffer(bbwi.byteBuffer); 164 bbwi.byteBuffer = null; 165 bbwi = null; 166 } 167 } 168 return result; 169 } finally { 170 //System.out.println("EXIT underflow"); 171 } 172 } 173 174 public void init(Message msg) { 175 if (msg != null) 176 endOfStream = !msg.moreFragmentsToFollow(); 177 } 178 179 // Release any queued ByteBufferWithInfo's byteBuffers to the 180 // ByteBufferPoool 181 public void close(ByteBufferWithInfo bbwi) 182 { 183 int inputBbAddress = 0; 184 185 // release ByteBuffers on fragmentQueue 186 if (fragmentQueue != null) 187 { 188 synchronized (fragmentQueue) 189 { 190 // IMPORTANT: The fragment queue may have one ByteBuffer 191 // on it that's also on the CDRInputStream if 192 // this method is called when the stream is 'marked'. 193 // Thus, we'll compare the ByteBuffer passed 194 // in (from a CDRInputStream) with all ByteBuffers 195 // on the stack. If one is found to equal, it will 196 // not be released to the ByteBufferPool. 197 if (bbwi != null) 198 { 199 inputBbAddress = System.identityHashCode(bbwi.byteBuffer); 200 } 201 202 ByteBufferWithInfo abbwi = null; 203 ByteBufferPool byteBufferPool = getByteBufferPool(); 204 while (fragmentQueue.size() != 0) 205 { 206 abbwi = fragmentQueue.dequeue(); 207 if (abbwi != null && abbwi.byteBuffer != null) 208 { 209 int bbAddress = System.identityHashCode(abbwi.byteBuffer); 210 if (inputBbAddress != bbAddress) 211 { 212 if (debug) 213 { 214 // print address of ByteBuffer released 215 StringBuffer sb = new StringBuffer(80); 216 sb.append("close() - fragmentQueue is ") 217 .append("releasing ByteBuffer id (") 218 .append(bbAddress).append(") to ") 219 .append("ByteBufferPool."); 220 String msg = sb.toString(); 221 dprint(msg); 222 } 223 } 224 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); 225 } 226 } 227 } 228 fragmentQueue = null; 229 } 230 231 // release ByteBuffers on fragmentStack 232 if (fragmentStack != null && fragmentStack.size() != 0) 233 { 234 // IMPORTANT: The fragment stack may have one ByteBuffer 235 // on it that's also on the CDRInputStream if 236 // this method is called when the stream is 'marked'. 237 // Thus, we'll compare the ByteBuffer passed 238 // in (from a CDRInputStream) with all ByteBuffers 239 // on the stack. If one is found to equal, it will 240 // not be released to the ByteBufferPool. 241 if (bbwi != null) 242 { 243 inputBbAddress = System.identityHashCode(bbwi.byteBuffer); 244 } 245 246 ByteBufferWithInfo abbwi = null; 247 ByteBufferPool byteBufferPool = getByteBufferPool(); 248 ListIterator itr = fragmentStack.listIterator(); 249 while (itr.hasNext()) 250 { 251 abbwi = (ByteBufferWithInfo)itr.next(); 252 253 if (abbwi != null && abbwi.byteBuffer != null) 254 { 255 int bbAddress = System.identityHashCode(abbwi.byteBuffer); 256 if (inputBbAddress != bbAddress) 257 { 258 if (debug) 259 { 260 // print address of ByteBuffer being released 261 StringBuffer sb = new StringBuffer(80); 262 sb.append("close() - fragmentStack - releasing ") 263 .append("ByteBuffer id (" + bbAddress + ") to ") 264 .append("ByteBufferPool."); 265 String msg = sb.toString(); 266 dprint(msg); 267 } 268 byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); 269 } 270 } 271 } 272 fragmentStack = null; 273 } 274 275 } 276 277 protected ByteBufferPool getByteBufferPool() 278 { 279 return orb.getByteBufferPool(); 280 } 281 282 private void dprint(String msg) 283 { 284 ORBUtility.dprint("BufferManagerReadStream", msg); 285 } 286 287 // Mark and reset handler ---------------------------------------- 288 289 private boolean markEngaged = false; 290 291 // List of fragment ByteBufferWithInfos received since 292 // the mark was engaged. 293 private LinkedList fragmentStack = null; 294 private RestorableInputStream inputStream = null; 295 296 // Original state of the stream 297 private Object streamMemento = null; 298 299 public void mark(RestorableInputStream inputStream) 300 { 301 this.inputStream = inputStream; 302 markEngaged = true; 303 304 // Get the magic Object that the stream will use to 305 // reconstruct it's state when reset is called 306 streamMemento = inputStream.createStreamMemento(); 307 308 if (fragmentStack != null) { 309 fragmentStack.clear(); 310 } 311 } 312 313 // Collects fragments received since the mark was engaged. 314 public void fragmentationOccured(ByteBufferWithInfo newFragment) 315 { 316 if (!markEngaged) 317 return; 318 319 if (fragmentStack == null) 320 fragmentStack = new LinkedList(); 321 322 fragmentStack.addFirst(new ByteBufferWithInfo(newFragment)); 323 } 324 325 public void reset() 326 { 327 if (!markEngaged) { 328 // REVISIT - call to reset without call to mark 329 return; 330 } 331 332 markEngaged = false; 333 334 // If we actually did peek across fragments, we need 335 // to push those fragments onto the front of the 336 // buffer queue. 337 if (fragmentStack != null && fragmentStack.size() != 0) { 338 ListIterator iter = fragmentStack.listIterator(); 339 340 synchronized(fragmentQueue) { 341 while (iter.hasNext()) { 342 fragmentQueue.push((ByteBufferWithInfo)iter.next()); 343 } 344 } 345 346 fragmentStack.clear(); 347 } 348 349 // Give the stream the magic Object to restore 350 // it's state. 351 inputStream.restoreInternalState(streamMemento); 352 } 353 354 public MarkAndResetHandler getMarkAndResetHandler() { 355 return this; 356 } 357 }