1 /* 2 * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.util.concurrent.Phaser; 25 import java.util.concurrent.TimeUnit; 26 import java.util.concurrent.TimeoutException; 27 import java.util.concurrent.atomic.AtomicInteger; 28 import java.util.concurrent.locks.LockSupport; 29 30 /** 31 * ThreadStateController allows a thread to request this thread to transition 32 * to a specific thread state. The {@linkplain #transitionTo request} is 33 * a blocking call that the calling thread will wait until this thread is about 34 * going to the new state. Only one request of state transition at a time 35 * is supported (the Phaser expects only parties of 2 to arrive and advance 36 * to next phase). 37 */ 38 public class ThreadStateController extends Thread { 39 // used to achieve waiting states 40 private final Object lock; 41 public ThreadStateController(String name, Object lock) { 42 super(name); 43 this.lock = lock; 44 } 45 46 public void checkThreadState(Thread.State expected) { 47 // maximum number of retries when checking for thread state. 48 final int MAX_RETRY = 500; 49 50 // wait for the thread to transition to the expected state. 51 // There is a small window between the thread checking the state 52 // and the thread actual entering that state. 53 Thread.State state; 54 int retryCount=0; 55 while ((state = getState()) != expected && retryCount < MAX_RETRY) { 56 goSleep(10); 57 retryCount++; 58 } 59 60 if (state == null) { 61 throw new RuntimeException(getName() + " expected to have " + 62 expected + " but got null."); 63 } 64 65 if (state != expected) { 66 throw new RuntimeException(String.format("%s expected in %s state but got %s " + 67 "(iterations %d interrupted %d)%n", 68 getName(), expected, state, iterations.get(), interrupted.get())); 69 } 70 } 71 72 public void goSleep(long ms) { 73 try { 74 Thread.sleep(ms); 75 } catch (InterruptedException e) { 76 throw new RuntimeException(e); 77 } 78 } 79 80 // Phaser to sync between the main thread putting 81 // this thread into various states 82 private final Phaser phaser = new Phaser(2); 83 private volatile int newState = S_RUNNABLE; 84 private volatile int state = 0; 85 private boolean done = false; 86 87 private static final int S_RUNNABLE = 1; 88 private static final int S_BLOCKED = 2; 89 private static final int S_WAITING = 3; 90 private static final int S_TIMED_WAITING = 4; 91 private static final int S_PARKED = 5; 92 private static final int S_TIMED_PARKED = 6; 93 private static final int S_SLEEPING = 7; 94 private static final int S_TERMINATE = 8; 95 96 // for debugging 97 private AtomicInteger iterations = new AtomicInteger(); 98 private AtomicInteger interrupted = new AtomicInteger(); 99 public void run() { 100 // this thread has started 101 while (!done) { 102 // state transition 103 int nextState = state; 104 if (newState != state) { 105 nextState = newState; 106 iterations.set(0); 107 interrupted.set(0); 108 } 109 iterations.incrementAndGet(); 110 switch (nextState) { 111 case S_RUNNABLE: { 112 stateChange(nextState); 113 double sum = 0; 114 for (int i = 0; i < 1000; i++) { 115 double r = Math.random(); 116 double x = Math.pow(3, r); 117 sum += x - r; 118 } 119 break; 120 } 121 case S_BLOCKED: { 122 System.out.format("%d: %s is going to block (interations %d)%n", 123 getId(), getName(), iterations.get()); 124 stateChange(nextState); 125 // going to block on lock 126 synchronized (lock) { 127 System.out.format("%d: %s acquired the lock (interations %d)%n", 128 getId(), getName(), iterations.get()); 129 try { 130 // this thread has escaped the BLOCKED state 131 // release the lock and a short wait before continue 132 lock.wait(10); 133 } catch (InterruptedException e) { 134 // ignore 135 interrupted.incrementAndGet(); 136 } 137 } 138 break; 139 } 140 case S_WAITING: { 141 synchronized (lock) { 142 System.out.format("%d: %s is going to waiting (interations %d interrupted %d)%n", 143 getId(), getName(), iterations.get(), interrupted.get()); 144 try { 145 stateChange(nextState); 146 lock.wait(); 147 System.out.format("%d: %s wakes up from waiting (interations %d interrupted %d)%n", 148 getId(), getName(), iterations.get(), interrupted.get()); 149 } catch (InterruptedException e) { 150 // ignore 151 interrupted.incrementAndGet(); 152 } 153 } 154 break; 155 } 156 case S_TIMED_WAITING: { 157 synchronized (lock) { 158 System.out.format("%d: %s is going to timed waiting (interations %d interrupted %d)%n", 159 getId(), getName(), iterations.get(), interrupted.get()); 160 try { 161 stateChange(nextState); 162 lock.wait(10000); 163 System.out.format("%d: %s wakes up from timed waiting (interations %d interrupted %d)%n", 164 getId(), getName(), iterations.get(), interrupted.get()); 165 } catch (InterruptedException e) { 166 // ignore 167 interrupted.incrementAndGet(); 168 } 169 } 170 break; 171 } 172 case S_PARKED: { 173 System.out.format("%d: %s is going to park (interations %d)%n", 174 getId(), getName(), iterations.get()); 175 stateChange(nextState); 176 LockSupport.park(); 177 break; 178 } 179 case S_TIMED_PARKED: { 180 System.out.format("%d: %s is going to timed park (interations %d)%n", 181 getId(), getName(), iterations.get()); 182 long deadline = System.currentTimeMillis() + 10000*1000; 183 stateChange(nextState); 184 LockSupport.parkUntil(deadline); 185 break; 186 } 187 case S_SLEEPING: { 188 System.out.format("%d: %s is going to sleep (interations %d interrupted %d)%n", 189 getId(), getName(), iterations.get(), interrupted.get()); 190 try { 191 stateChange(nextState); 192 Thread.sleep(1000000); 193 } catch (InterruptedException e) { 194 // finish sleeping 195 interrupted.incrementAndGet(); 196 } 197 break; 198 } 199 case S_TERMINATE: { 200 done = true; 201 stateChange(nextState); 202 break; 203 } 204 default: 205 break; 206 } 207 } 208 } 209 210 /** 211 * Change the state if it matches newState. 212 */ 213 private void stateChange(int nextState) { 214 // no state change 215 if (state == nextState) 216 return; 217 218 // transition to the new state 219 if (newState == nextState) { 220 state = nextState; 221 phaser.arrive(); 222 System.out.format("%d: state change: %s %s%n", 223 getId(), toStateName(nextState), phaserToString(phaser)); 224 return; 225 } 226 227 // should never reach here 228 throw new RuntimeException("current " + state + " next " + nextState + 229 " new state " + newState); 230 } 231 232 /** 233 * Blocks until this thread transitions to the given state 234 */ 235 public void transitionTo(Thread.State tstate) throws InterruptedException { 236 switch (tstate) { 237 case RUNNABLE: 238 nextState(S_RUNNABLE); 239 break; 240 case BLOCKED: 241 nextState(S_BLOCKED); 242 break; 243 case WAITING: 244 nextState(S_WAITING); 245 break; 246 case TIMED_WAITING: 247 nextState(S_TIMED_WAITING); 248 break; 249 case TERMINATED: 250 nextState(S_TERMINATE); 251 break; 252 default: 253 break; 254 } 255 } 256 257 /** 258 * Blocks until this thread transitions to sleeping 259 */ 260 public void transitionToSleep() throws InterruptedException { 261 nextState(S_SLEEPING); 262 } 263 264 /** 265 * Blocks until this thread transitions to park or timed park 266 */ 267 public void transitionToPark(boolean timed) throws InterruptedException { 268 nextState(timed ? S_TIMED_PARKED : S_PARKED); 269 } 270 271 private void nextState(int s) throws InterruptedException { 272 final long id = Thread.currentThread().getId(); 273 System.out.format("%d: wait until the thread transitions to %s %s%n", 274 id, toStateName(s), phaserToString(phaser)); 275 this.newState = s; 276 int phase = phaser.arrive(); 277 System.out.format("%d: awaiting party arrive %s %s%n", 278 id, toStateName(s), phaserToString(phaser)); 279 for (;;) { 280 // when this thread has changed its state before it waits or parks 281 // on a lock, a potential race might happen if it misses the notify 282 // or unpark. Hence await for the phaser to advance with timeout 283 // to cope with this race condition. 284 switch (state) { 285 case S_WAITING: 286 case S_TIMED_WAITING: 287 synchronized (lock) { 288 lock.notify(); 289 } 290 break; 291 case S_PARKED: 292 case S_TIMED_PARKED: 293 LockSupport.unpark(this); 294 break; 295 case S_SLEEPING: 296 this.interrupt(); 297 break; 298 case S_BLOCKED: 299 default: 300 break; 301 } 302 try { 303 phaser.awaitAdvanceInterruptibly(phase, 100, TimeUnit.MILLISECONDS); 304 System.out.format("%d: arrived at %s %s%n", 305 id, toStateName(s), phaserToString(phaser)); 306 return; 307 } catch (TimeoutException ex) { 308 // this thread hasn't arrived at this phase 309 System.out.format("%d: Timeout: %s%n", id, phaser); 310 } 311 } 312 } 313 private String phaserToString(Phaser p) { 314 return "[phase = " + p.getPhase() + 315 " parties = " + p.getRegisteredParties() + 316 " arrived = " + p.getArrivedParties() + "]"; 317 } 318 private String toStateName(int state) { 319 switch (state) { 320 case S_RUNNABLE: 321 return "runnable"; 322 case S_WAITING: 323 return "waiting"; 324 case S_TIMED_WAITING: 325 return "timed waiting"; 326 case S_PARKED: 327 return "parked"; 328 case S_TIMED_PARKED: 329 return "timed parked"; 330 case S_SLEEPING: 331 return "sleeping"; 332 case S_BLOCKED: 333 return "blocked"; 334 case S_TERMINATE: 335 return "terminated"; 336 default: 337 return "unknown " + state; 338 } 339 } 340 }