1 /* 2 * Copyright (c) 2001, 2002, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 File: ConditionVariable.java 28 29 Originally written by Doug Lea and released into the public domain. 30 This may be used for any purposes whatsoever without acknowledgment. 31 Thanks for the assistance and support of Sun Microsystems Labs, 32 and everyone contributing, testing, and using this code. 33 34 History: 35 Date Who What 36 11Jun1998 dl Create public version 37 08dec2001 kmc Added support for Reentrant Mutexes 38 */ 39 40 package com.sun.corba.se.impl.orbutil.concurrent; 41 42 import com.sun.corba.se.impl.orbutil.ORBUtility ; 43 44 /** 45 * This class is designed for fans of POSIX pthreads programming. 46 * If you restrict yourself to Mutexes and CondVars, you can 47 * use most of your favorite constructions. Don't randomly mix them 48 * with synchronized methods or blocks though. 49 * <p> 50 * Method names and behavior are as close as is reasonable to 51 * those in POSIX. 52 * <p> 53 * <b>Sample Usage.</b> Here is a full version of a bounded buffer 54 * that implements the BoundedChannel interface, written in 55 * a style reminscent of that in POSIX programming books. 56 * <pre> 57 * class CVBuffer implements BoundedChannel { 58 * private final Mutex mutex; 59 * private final CondVar notFull; 60 * private final CondVar notEmpty; 61 * private int count = 0; 62 * private int takePtr = 0; 63 * private int putPtr = 0; 64 * private final Object[] array; 65 * 66 * public CVBuffer(int capacity) { 67 * array = new Object[capacity]; 68 * mutex = new Mutex(); 69 * notFull = new CondVar(mutex); 70 * notEmpty = new CondVar(mutex); 71 * } 72 * 73 * public int capacity() { return array.length; } 74 * 75 * public void put(Object x) throws InterruptedException { 76 * mutex.acquire(); 77 * try { 78 * while (count == array.length) { 79 * notFull.await(); 80 * } 81 * array[putPtr] = x; 82 * putPtr = (putPtr + 1) % array.length; 83 * ++count; 84 * notEmpty.signal(); 85 * } 86 * finally { 87 * mutex.release(); 88 * } 89 * } 90 * 91 * public Object take() throws InterruptedException { 92 * Object x = null; 93 * mutex.acquire(); 94 * try { 95 * while (count == 0) { 96 * notEmpty.await(); 97 * } 98 * x = array[takePtr]; 99 * array[takePtr] = null; 100 * takePtr = (takePtr + 1) % array.length; 101 * --count; 102 * notFull.signal(); 103 * } 104 * finally { 105 * mutex.release(); 106 * } 107 * return x; 108 * } 109 * 110 * public boolean offer(Object x, long msecs) throws InterruptedException { 111 * mutex.acquire(); 112 * try { 113 * if (count == array.length) { 114 * notFull.timedwait(msecs); 115 * if (count == array.length) 116 * return false; 117 * } 118 * array[putPtr] = x; 119 * putPtr = (putPtr + 1) % array.length; 120 * ++count; 121 * notEmpty.signal(); 122 * return true; 123 * } 124 * finally { 125 * mutex.release(); 126 * } 127 * } 128 * 129 * public Object poll(long msecs) throws InterruptedException { 130 * Object x = null; 131 * mutex.acquire(); 132 * try { 133 * if (count == 0) { 134 * notEmpty.timedwait(msecs); 135 * if (count == 0) 136 * return null; 137 * } 138 * x = array[takePtr]; 139 * array[takePtr] = null; 140 * takePtr = (takePtr + 1) % array.length; 141 * --count; 142 * notFull.signal(); 143 * } 144 * finally { 145 * mutex.release(); 146 * } 147 * return x; 148 * } 149 * } 150 * 151 * </pre> 152 * @see Mutex 153 * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] 154 **/ 155 156 public class CondVar { 157 158 protected boolean debug_ ; 159 160 /** The mutex **/ 161 protected final Sync mutex_; 162 protected final ReentrantMutex remutex_; 163 164 private int releaseMutex() 165 { 166 int count = 1 ; 167 168 if (remutex_!=null) 169 count = remutex_.releaseAll() ; 170 else 171 mutex_.release() ; 172 173 return count ; 174 } 175 176 private void acquireMutex( int count ) throws InterruptedException 177 { 178 if (remutex_!=null) 179 remutex_.acquireAll( count ) ; 180 else 181 mutex_.acquire() ; 182 } 183 184 /** 185 * Create a new CondVar that relies on the given mutual 186 * exclusion lock. 187 * @param mutex A mutual exclusion lock which must either be non-reentrant, 188 * or else be ReentrantMutex. 189 * Standard usage is to supply an instance of <code>Mutex</code>, 190 * but, for example, a Semaphore initialized to 1 also works. 191 * On the other hand, many other Sync implementations would not 192 * work here, so some care is required to supply a sensible 193 * synchronization object. 194 * In normal use, the mutex should be one that is used for <em>all</em> 195 * synchronization of the object using the CondVar. Generally, 196 * to prevent nested monitor lockouts, this 197 * object should not use any native Java synchronized blocks. 198 **/ 199 200 public CondVar(Sync mutex, boolean debug) { 201 debug_ = debug ; 202 mutex_ = mutex; 203 if (mutex instanceof ReentrantMutex) 204 remutex_ = (ReentrantMutex)mutex; 205 else 206 remutex_ = null; 207 } 208 209 public CondVar( Sync mutex ) { 210 this( mutex, false ) ; 211 } 212 213 /** 214 * Wait for notification. This operation at least momentarily 215 * releases the mutex. The mutex is always held upon return, 216 * even if interrupted. 217 * @exception InterruptedException if the thread was interrupted 218 * before or during the wait. However, if the thread is interrupted 219 * after the wait but during mutex re-acquisition, the interruption 220 * is ignored, while still ensuring 221 * that the currentThread's interruption state stays true, so can 222 * be probed by callers. 223 **/ 224 public void await() throws InterruptedException { 225 int count = 0 ; 226 if (Thread.interrupted()) 227 throw new InterruptedException(); 228 229 try { 230 if (debug_) 231 ORBUtility.dprintTrace( this, "await enter" ) ; 232 233 synchronized(this) { 234 count = releaseMutex() ; 235 try { 236 wait(); 237 } catch (InterruptedException ex) { 238 notify(); 239 throw ex; 240 } 241 } 242 } finally { 243 // Must ignore interrupt on re-acquire 244 boolean interrupted = false; 245 for (;;) { 246 try { 247 acquireMutex( count ); 248 break; 249 } catch (InterruptedException ex) { 250 interrupted = true; 251 } 252 } 253 254 if (interrupted) { 255 Thread.currentThread().interrupt(); 256 } 257 258 if (debug_) 259 ORBUtility.dprintTrace( this, "await exit" ) ; 260 } 261 } 262 263 /** 264 * Wait for at most msecs for notification. 265 * This operation at least momentarily 266 * releases the mutex. The mutex is always held upon return, 267 * even if interrupted. 268 * @param msecs The time to wait. A value less than or equal to zero 269 * causes a momentarily release 270 * and re-acquire of the mutex, and always returns false. 271 * @return false if at least msecs have elapsed 272 * upon resumption; else true. A 273 * false return does NOT necessarily imply that the thread was 274 * not notified. For example, it might have been notified 275 * after the time elapsed but just before resuming. 276 * @exception InterruptedException if the thread was interrupted 277 * before or during the wait. 278 **/ 279 280 public boolean timedwait(long msecs) throws InterruptedException { 281 282 if (Thread.interrupted()) 283 throw new InterruptedException(); 284 285 boolean success = false; 286 int count = 0; 287 288 try { 289 if (debug_) 290 ORBUtility.dprintTrace( this, "timedwait enter" ) ; 291 292 synchronized(this) { 293 count = releaseMutex() ; 294 try { 295 if (msecs > 0) { 296 long start = System.currentTimeMillis(); 297 wait(msecs); 298 success = System.currentTimeMillis() - start <= msecs; 299 } 300 } catch (InterruptedException ex) { 301 notify(); 302 throw ex; 303 } 304 } 305 } finally { 306 // Must ignore interrupt on re-acquire 307 boolean interrupted = false; 308 for (;;) { 309 try { 310 acquireMutex( count ) ; 311 break; 312 } catch (InterruptedException ex) { 313 interrupted = true; 314 } 315 } 316 317 if (interrupted) { 318 Thread.currentThread().interrupt(); 319 } 320 321 if (debug_) 322 ORBUtility.dprintTrace( this, "timedwait exit" ) ; 323 } 324 return success; 325 } 326 327 /** 328 * Notify a waiting thread. 329 * If one exists, a non-interrupted thread will return 330 * normally (i.e., not via InterruptedException) from await or timedwait. 331 **/ 332 public synchronized void signal() { 333 notify(); 334 } 335 336 /** Notify all waiting threads **/ 337 public synchronized void broadcast() { 338 notifyAll(); 339 } 340 }