1 /*
   2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http.internal.common;
  27 
  28 import java.util.concurrent.Executor;
  29 import java.util.concurrent.atomic.AtomicInteger;
  30 
  31 import static java.util.Objects.requireNonNull;
  32 
  33 /**
  34  * A scheduler of ( repeatable ) tasks that MUST be run sequentially.
  35  *
  36  * <p> This class can be used as a synchronization aid that assists a number of
  37  * parties in running a task in a mutually exclusive fashion.
  38  *
  39  * <p> To run the task, a party invokes {@code runOrSchedule}. To permanently
  40  * prevent the task from subsequent runs, the party invokes {@code stop}.
  41  *
  42  * <p> The parties can, but do not have to, operate in different threads.
  43  *
  44  * <p> The task can be either synchronous ( completes when its {@code run}
  45  * method returns ), or asynchronous ( completed when its
  46  * {@code DeferredCompleter} is explicitly completed ).
  47  *
  48  * <p> The next run of the task will not begin until the previous run has
  49  * finished.
  50  *
  51  * <p> The task may invoke {@code runOrSchedule} itself, which may be a normal
  52  * situation.
  53  */
  54 public final class SequentialScheduler {
  55 
  56     /*
  57        Since the task is fixed and known beforehand, no blocking synchronization
  58        (locks, queues, etc.) is required. The job can be done solely using
  59        nonblocking primitives.
  60 
  61        The machinery below addresses two problems:
  62 
  63          1. Running the task in a sequential order (no concurrent runs):
  64 
  65                 begin, end, begin, end...
  66 
  67          2. Avoiding indefinite recursion:
  68 
  69                 begin
  70                   end
  71                     begin
  72                       end
  73                         ...
  74 
  75        Problem #1 is solved with a finite state machine with 4 states:
  76 
  77            BEGIN, AGAIN, END, and STOP.
  78 
  79        Problem #2 is solved with a "state modifier" OFFLOAD.
  80 
  81        Parties invoke `runOrSchedule()` to signal the task must run. A party
  82        that has invoked `runOrSchedule()` either begins the task or exploits the
  83        party that is either beginning the task or ending it.
  84 
  85        The party that is trying to end the task either ends it or begins it
  86        again.
  87 
  88        To avoid indefinite recursion, before re-running the task the
  89        TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child"
  90        TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is
  91        available and the "child" must offload the task on to the "parent". Then
  92        a race begins. Whichever invocation of TryEndDeferredCompleter.complete
  93        manages to unset OFFLOAD bit first does not do the work.
  94 
  95        There is at most 1 thread that is beginning the task and at most 2
  96        threads that are trying to end it: "parent" and "child". In case of a
  97        synchronous task "parent" and "child" are the same thread.
  98      */
  99 
 100     /**
 101      * An interface to signal the completion of a {@link RestartableTask}.
 102      *
 103      * <p> The invocation of {@code complete} completes the task. The invocation
 104      * of {@code complete} may restart the task, if an attempt has previously
 105      * been made to run the task while it was already running.
 106      *
 107      * @apiNote {@code DeferredCompleter} is useful when a task is not necessary
 108      * complete when its {@code run} method returns, but will complete at a
 109      * later time, and maybe in different thread. This type exists for
 110      * readability purposes at use-sites only.
 111      */
 112     public static abstract class DeferredCompleter {
 113 
 114         /** Extensible from this (outer) class ONLY. */
 115         private DeferredCompleter() { }
 116 
 117         /** Completes the task. Must be called once, and once only. */
 118         public abstract void complete();
 119     }
 120 
 121     /**
 122      * A restartable task.
 123      */
 124     @FunctionalInterface
 125     public interface RestartableTask {
 126 
 127         /**
 128          * The body of the task.
 129          *
 130          * @param taskCompleter
 131          *         A completer that must be invoked once, and only once,
 132          *         when this task is logically finished
 133          */
 134         void run(DeferredCompleter taskCompleter);
 135     }
 136 
 137     /**
 138      * A complete restartable task is one which is simple and self-contained.
 139      * It completes once its {@code run} method returns.
 140      */
 141     public static abstract class CompleteRestartableTask
 142         implements RestartableTask
 143     {
 144         @Override
 145         public final void run(DeferredCompleter taskCompleter) {
 146             try {
 147                 run();
 148             } finally {
 149                 taskCompleter.complete();
 150             }
 151         }
 152 
 153         /** The body of the task. */
 154         protected abstract void run();
 155     }
 156 
 157     /**
 158      * A RestartableTask that runs its main loop within a
 159      * synchronized block to place a memory barrier around it.
 160      * Because the main loop can't run concurrently in two treads,
 161      * then the lock shouldn't be contended and no deadlock should
 162      * ever be possible.
 163      */
 164     public static final class SynchronizedRestartableTask
 165             extends CompleteRestartableTask {
 166         private final Runnable mainLoop;
 167         private final Object lock = new Object();
 168         public SynchronizedRestartableTask(Runnable mainLoop) {
 169             this.mainLoop = mainLoop;
 170         }
 171 
 172         @Override
 173         protected void run() {
 174             synchronized(lock) {
 175                 mainLoop.run();
 176             }
 177         }
 178     }
 179 
 180     private static final int OFFLOAD =  1;
 181     private static final int AGAIN   =  2;
 182     private static final int BEGIN   =  4;
 183     private static final int STOP    =  8;
 184     private static final int END     = 16;
 185 
 186     private final AtomicInteger state = new AtomicInteger(END);
 187     private final RestartableTask restartableTask;
 188     private final DeferredCompleter completer;
 189     private final SchedulableTask schedulableTask;
 190 
 191     /**
 192      * A simple task that can be pushed on an executor to execute
 193      * {@code restartableTask.run(completer)}.
 194      */
 195     private final class SchedulableTask implements Runnable {
 196         @Override
 197         public void run() {
 198             restartableTask.run(completer);
 199         }
 200     }
 201 
 202     public SequentialScheduler(RestartableTask restartableTask) {
 203         this.restartableTask = requireNonNull(restartableTask);
 204         this.completer = new TryEndDeferredCompleter();
 205         this.schedulableTask = new SchedulableTask();
 206     }
 207 
 208     /**
 209      * Runs or schedules the task to be run.
 210      *
 211      * @implSpec The recursion which is possible here must be bounded:
 212      *
 213      *  <pre>{@code
 214      *     this.runOrSchedule()
 215      *         completer.complete()
 216      *             this.runOrSchedule()
 217      *                 ...
 218      * }</pre>
 219      *
 220      * @implNote The recursion in this implementation has the maximum
 221      * depth of 1.
 222      */
 223     public void runOrSchedule() {
 224         runOrSchedule(schedulableTask, null);
 225     }
 226 
 227     /**
 228      * Runs or schedules the task to be run in the provided executor.
 229      *
 230      * <p> This method can be used when potential executing from a calling
 231      * thread is not desirable.
 232      *
 233      * @param executor
 234      *         An executor in which to execute the task, if the task needs
 235      *         to be executed.
 236      *
 237      * @apiNote The given executor can be {@code null} in which case calling
 238      * {@code deferOrSchedule(null)} is strictly equivalent to calling
 239      * {@code runOrSchedule()}.
 240      */
 241     public void deferOrSchedule(Executor executor) { // TODO: why this name? why not runOrSchedule?
 242         runOrSchedule(schedulableTask, executor);
 243     }
 244 
 245     private void runOrSchedule(SchedulableTask task, Executor executor) {
 246         while (true) {
 247             int s = state.get();
 248             if (s == END) {
 249                 if (state.compareAndSet(END, BEGIN)) {
 250                     break;
 251                 }
 252             } else if ((s & BEGIN) != 0) {
 253                 // Tries to change the state to AGAIN, preserving OFFLOAD bit
 254                 if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
 255                     return;
 256                 }
 257             } else if ((s & AGAIN) != 0 || s == STOP) {
 258                 /* In the case of AGAIN the scheduler does not provide
 259                    happens-before relationship between actions prior to
 260                    runOrSchedule() and actions that happen in task.run().
 261                    The reason is that no volatile write is done in this case,
 262                    and the call piggybacks on the call that has actually set
 263                    AGAIN state. */
 264                 return;
 265             } else {
 266                 // Non-existent state, or the one that cannot be offloaded
 267                 throw new InternalError(String.valueOf(s));
 268             }
 269         }
 270         if (executor == null) {
 271             task.run();
 272         } else {
 273             executor.execute(task);
 274         }
 275     }
 276 
 277     /** The only concrete {@code DeferredCompleter} implementation. */
 278     private class TryEndDeferredCompleter extends DeferredCompleter {
 279 
 280         @Override
 281         public void complete() {
 282             while (true) {
 283                 int s;
 284                 while (((s = state.get()) & OFFLOAD) != 0) {
 285                     // Tries to offload ending of the task to the parent
 286                     if (state.compareAndSet(s, s & ~OFFLOAD)) {
 287                         return;
 288                     }
 289                 }
 290                 while (true) {
 291                     if ((s & OFFLOAD) != 0) {
 292                         /* OFFLOAD bit can never be observed here. Otherwise
 293                            it would mean there is another invocation of
 294                            "complete" that can run the task. */
 295                         throw new InternalError(String.valueOf(s));
 296                     }
 297                     if (s == BEGIN) {
 298                         if (state.compareAndSet(BEGIN, END)) {
 299                             return;
 300                         }
 301                     } else if (s == AGAIN) {
 302                         if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
 303                             break;
 304                         }
 305                     } else if (s == STOP) {
 306                         return;
 307                     } else if (s == END) {
 308                         throw new IllegalStateException("Duplicate completion");
 309                     } else {
 310                         // Non-existent state
 311                         throw new InternalError(String.valueOf(s));
 312                     }
 313                     s = state.get();
 314                 }
 315                 restartableTask.run(completer);
 316             }
 317         }
 318     }
 319 
 320     /**
 321      * Tells whether, or not, this scheduler has been permanently stopped.
 322      *
 323      * <p> Should be used from inside the task to poll the status of the
 324      * scheduler, pretty much the same way as it is done for threads:
 325      * <pre>{@code
 326      *     if (!Thread.currentThread().isInterrupted()) {
 327      *         ...
 328      *     }
 329      * }</pre>
 330      */
 331     public boolean isStopped() {
 332         return state.get() == STOP;
 333     }
 334 
 335     /**
 336      * Stops this scheduler.  Subsequent invocations of {@code runOrSchedule}
 337      * are effectively no-ops.
 338      *
 339      * <p> If the task has already begun, this invocation will not affect it,
 340      * unless the task itself uses {@code isStopped()} method to check the state
 341      * of the handler.
 342      */
 343     public void stop() {
 344         state.set(STOP);
 345     }
 346 
 347     /**
 348      * Returns a new {@code SequentialScheduler} that executes the provided
 349      * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
 350      *
 351      * @apiNote
 352      * This is equivalent to calling
 353      * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainloop));}
 354      * The main loop must not do any blocking operation.
 355      *
 356      * @param mainloop The main loop of the new sequential scheduler.
 357      * @return a new {@code SequentialScheduler} that executes the provided
 358      * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
 359      */
 360     public static SequentialScheduler synchronizedScheduler(Runnable mainloop) {
 361         return new SequentialScheduler(new SynchronizedRestartableTask(mainloop));
 362     }
 363 
 364 }