1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.common; 27 28 import java.util.concurrent.Executor; 29 import java.util.concurrent.atomic.AtomicInteger; 30 31 import static java.util.Objects.requireNonNull; 32 33 /** 34 * A scheduler of ( repeatable ) tasks that MUST be run sequentially. 35 * 36 * <p> This class can be used as a synchronization aid that assists a number of 37 * parties in running a task in a mutually exclusive fashion. 38 * 39 * <p> To run the task, a party invokes {@code runOrSchedule}. To permanently 40 * prevent the task from subsequent runs, the party invokes {@code stop}. 41 * 42 * <p> The parties can, but do not have to, operate in different threads. 43 * 44 * <p> The task can be either synchronous ( completes when its {@code run} 45 * method returns ), or asynchronous ( completed when its 46 * {@code DeferredCompleter} is explicitly completed ). 47 * 48 * <p> The next run of the task will not begin until the previous run has 49 * finished. 50 * 51 * <p> The task may invoke {@code runOrSchedule} itself, which may be a normal 52 * situation. 53 */ 54 public final class SequentialScheduler { 55 56 /* 57 Since the task is fixed and known beforehand, no blocking synchronization 58 (locks, queues, etc.) is required. The job can be done solely using 59 nonblocking primitives. 60 61 The machinery below addresses two problems: 62 63 1. Running the task in a sequential order (no concurrent runs): 64 65 begin, end, begin, end... 66 67 2. Avoiding indefinite recursion: 68 69 begin 70 end 71 begin 72 end 73 ... 74 75 Problem #1 is solved with a finite state machine with 4 states: 76 77 BEGIN, AGAIN, END, and STOP. 78 79 Problem #2 is solved with a "state modifier" OFFLOAD. 80 81 Parties invoke `runOrSchedule()` to signal the task must run. A party 82 that has invoked `runOrSchedule()` either begins the task or exploits the 83 party that is either beginning the task or ending it. 84 85 The party that is trying to end the task either ends it or begins it 86 again. 87 88 To avoid indefinite recursion, before re-running the task the 89 TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child" 90 TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is 91 available and the "child" must offload the task on to the "parent". Then 92 a race begins. Whichever invocation of TryEndDeferredCompleter.complete 93 manages to unset OFFLOAD bit first does not do the work. 94 95 There is at most 1 thread that is beginning the task and at most 2 96 threads that are trying to end it: "parent" and "child". In case of a 97 synchronous task "parent" and "child" are the same thread. 98 */ 99 100 /** 101 * An interface to signal the completion of a {@link RestartableTask}. 102 * 103 * <p> The invocation of {@code complete} completes the task. The invocation 104 * of {@code complete} may restart the task, if an attempt has previously 105 * been made to run the task while it was already running. 106 * 107 * @apiNote {@code DeferredCompleter} is useful when a task is not necessary 108 * complete when its {@code run} method returns, but will complete at a 109 * later time, and maybe in different thread. This type exists for 110 * readability purposes at use-sites only. 111 */ 112 public static abstract class DeferredCompleter { 113 114 /** Extensible from this (outer) class ONLY. */ 115 private DeferredCompleter() { } 116 117 /** Completes the task. Must be called once, and once only. */ 118 public abstract void complete(); 119 } 120 121 /** 122 * A restartable task. 123 */ 124 @FunctionalInterface 125 public interface RestartableTask { 126 127 /** 128 * The body of the task. 129 * 130 * @param taskCompleter 131 * A completer that must be invoked once, and only once, 132 * when this task is logically finished 133 */ 134 void run(DeferredCompleter taskCompleter); 135 } 136 137 /** 138 * A complete restartable task is one which is simple and self-contained. 139 * It completes once its {@code run} method returns. 140 */ 141 public static abstract class CompleteRestartableTask 142 implements RestartableTask 143 { 144 @Override 145 public final void run(DeferredCompleter taskCompleter) { 146 try { 147 run(); 148 } finally { 149 taskCompleter.complete(); 150 } 151 } 152 153 /** The body of the task. */ 154 protected abstract void run(); 155 } 156 157 /** 158 * A RestartableTask that runs its main loop within a 159 * synchronized block to place a memory barrier around it. 160 * Because the main loop can't run concurrently in two treads, 161 * then the lock shouldn't be contended and no deadlock should 162 * ever be possible. 163 */ 164 public static final class SynchronizedRestartableTask 165 extends CompleteRestartableTask { 166 private final Runnable mainLoop; 167 private final Object lock = new Object(); 168 public SynchronizedRestartableTask(Runnable mainLoop) { 169 this.mainLoop = mainLoop; 170 } 171 172 @Override 173 protected void run() { 174 synchronized(lock) { 175 mainLoop.run(); 176 } 177 } 178 } 179 180 private static final int OFFLOAD = 1; 181 private static final int AGAIN = 2; 182 private static final int BEGIN = 4; 183 private static final int STOP = 8; 184 private static final int END = 16; 185 186 private final AtomicInteger state = new AtomicInteger(END); 187 private final RestartableTask restartableTask; 188 private final DeferredCompleter completer; 189 private final SchedulableTask schedulableTask; 190 191 /** 192 * A simple task that can be pushed on an executor to execute 193 * {@code restartableTask.run(completer)}. 194 */ 195 private final class SchedulableTask implements Runnable { 196 @Override 197 public void run() { 198 restartableTask.run(completer); 199 } 200 } 201 202 public SequentialScheduler(RestartableTask restartableTask) { 203 this.restartableTask = requireNonNull(restartableTask); 204 this.completer = new TryEndDeferredCompleter(); 205 this.schedulableTask = new SchedulableTask(); 206 } 207 208 /** 209 * Runs or schedules the task to be run. 210 * 211 * @implSpec The recursion which is possible here must be bounded: 212 * 213 * <pre>{@code 214 * this.runOrSchedule() 215 * completer.complete() 216 * this.runOrSchedule() 217 * ... 218 * }</pre> 219 * 220 * @implNote The recursion in this implementation has the maximum 221 * depth of 1. 222 */ 223 public void runOrSchedule() { 224 runOrSchedule(schedulableTask, null); 225 } 226 227 /** 228 * Runs or schedules the task to be run in the provided executor. 229 * 230 * <p> This method can be used when potential executing from a calling 231 * thread is not desirable. 232 * 233 * @param executor 234 * An executor in which to execute the task, if the task needs 235 * to be executed. 236 * 237 * @apiNote The given executor can be {@code null} in which case calling 238 * {@code deferOrSchedule(null)} is strictly equivalent to calling 239 * {@code runOrSchedule()}. 240 */ 241 public void deferOrSchedule(Executor executor) { // TODO: why this name? why not runOrSchedule? 242 runOrSchedule(schedulableTask, executor); 243 } 244 245 private void runOrSchedule(SchedulableTask task, Executor executor) { 246 while (true) { 247 int s = state.get(); 248 if (s == END) { 249 if (state.compareAndSet(END, BEGIN)) { 250 break; 251 } 252 } else if ((s & BEGIN) != 0) { 253 // Tries to change the state to AGAIN, preserving OFFLOAD bit 254 if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) { 255 return; 256 } 257 } else if ((s & AGAIN) != 0 || s == STOP) { 258 /* In the case of AGAIN the scheduler does not provide 259 happens-before relationship between actions prior to 260 runOrSchedule() and actions that happen in task.run(). 261 The reason is that no volatile write is done in this case, 262 and the call piggybacks on the call that has actually set 263 AGAIN state. */ 264 return; 265 } else { 266 // Non-existent state, or the one that cannot be offloaded 267 throw new InternalError(String.valueOf(s)); 268 } 269 } 270 if (executor == null) { 271 task.run(); 272 } else { 273 executor.execute(task); 274 } 275 } 276 277 /** The only concrete {@code DeferredCompleter} implementation. */ 278 private class TryEndDeferredCompleter extends DeferredCompleter { 279 280 @Override 281 public void complete() { 282 while (true) { 283 int s; 284 while (((s = state.get()) & OFFLOAD) != 0) { 285 // Tries to offload ending of the task to the parent 286 if (state.compareAndSet(s, s & ~OFFLOAD)) { 287 return; 288 } 289 } 290 while (true) { 291 if ((s & OFFLOAD) != 0) { 292 /* OFFLOAD bit can never be observed here. Otherwise 293 it would mean there is another invocation of 294 "complete" that can run the task. */ 295 throw new InternalError(String.valueOf(s)); 296 } 297 if (s == BEGIN) { 298 if (state.compareAndSet(BEGIN, END)) { 299 return; 300 } 301 } else if (s == AGAIN) { 302 if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) { 303 break; 304 } 305 } else if (s == STOP) { 306 return; 307 } else if (s == END) { 308 throw new IllegalStateException("Duplicate completion"); 309 } else { 310 // Non-existent state 311 throw new InternalError(String.valueOf(s)); 312 } 313 s = state.get(); 314 } 315 restartableTask.run(completer); 316 } 317 } 318 } 319 320 /** 321 * Tells whether, or not, this scheduler has been permanently stopped. 322 * 323 * <p> Should be used from inside the task to poll the status of the 324 * scheduler, pretty much the same way as it is done for threads: 325 * <pre>{@code 326 * if (!Thread.currentThread().isInterrupted()) { 327 * ... 328 * } 329 * }</pre> 330 */ 331 public boolean isStopped() { 332 return state.get() == STOP; 333 } 334 335 /** 336 * Stops this scheduler. Subsequent invocations of {@code runOrSchedule} 337 * are effectively no-ops. 338 * 339 * <p> If the task has already begun, this invocation will not affect it, 340 * unless the task itself uses {@code isStopped()} method to check the state 341 * of the handler. 342 */ 343 public void stop() { 344 state.set(STOP); 345 } 346 347 /** 348 * Returns a new {@code SequentialScheduler} that executes the provided 349 * {@code mainLoop} from within a {@link SynchronizedRestartableTask}. 350 * 351 * @apiNote 352 * This is equivalent to calling 353 * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainloop));} 354 * The main loop must not do any blocking operation. 355 * 356 * @param mainloop The main loop of the new sequential scheduler. 357 * @return a new {@code SequentialScheduler} that executes the provided 358 * {@code mainLoop} from within a {@link SynchronizedRestartableTask}. 359 */ 360 public static SequentialScheduler synchronizedScheduler(Runnable mainloop) { 361 return new SequentialScheduler(new SynchronizedRestartableTask(mainloop)); 362 } 363 364 }