1 /*
   2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http.internal.websocket;
  27 
  28 import java.util.concurrent.atomic.AtomicInteger;
  29 import java.util.function.Consumer;
  30 
  31 import static java.util.Objects.requireNonNull;
  32 
  33 /*
  34  * A synchronization aid that assists a number of parties in running a task
  35  * in a mutually exclusive fashion.
  36  *
  37  * To run the task, a party invokes `handle`. To permanently prevent the task
  38  * from subsequent runs, the party invokes `stop`.
  39  *
  40  * The parties do not have to operate in different threads.
  41  *
  42  * The task can be either synchronous or asynchronous.
  43  *
  44  * If the task is synchronous, it is represented with `Runnable`.
  45  * The handler invokes `Runnable.run` to run the task.
  46  *
  47  * If the task is asynchronous, it is represented with `Consumer<Runnable>`.
  48  * The handler invokes `Consumer.accept(end)` to begin the task. The task
  49  * invokes `end.run()` when it has ended.
  50  *
  51  * The next run of the task will not begin until the previous run has finished.
  52  *
  53  * The task may invoke `handle()` by itself, it's a normal situation.
  54  */
  55 public final class CooperativeHandler {
  56 
  57     /*
  58        Since the task is fixed and known beforehand, no blocking synchronization
  59        (locks, queues, etc.) is required. The job can be done solely using
  60        nonblocking primitives.
  61 
  62        The machinery below addresses two problems:
  63 
  64          1. Running the task in a sequential order (no concurrent runs):
  65 
  66                 begin, end, begin, end...
  67 
  68          2. Avoiding indefinite recursion:
  69 
  70                 begin
  71                   end
  72                     begin
  73                       end
  74                         ...
  75 
  76        Problem #1 is solved with a finite state machine with 4 states:
  77 
  78            BEGIN, AGAIN, END, and STOP.
  79 
  80        Problem #2 is solved with a "state modifier" OFFLOAD.
  81 
  82        Parties invoke `handle()` to signal the task must run. A party that has
  83        invoked `handle()` either begins the task or exploits the party that is
  84        either beginning the task or ending it.
  85 
  86        The party that is trying to end the task either ends it or begins it
  87        again.
  88 
  89        To avoid indefinite recursion, before re-running the task tryEnd() sets
  90        OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent")
  91        tryEnd() is available and the "child" must offload the task on to the
  92        "parent". Then a race begins. Whichever invocation of tryEnd() manages
  93        to unset OFFLOAD bit first does not do the work.
  94 
  95        There is at most 1 thread that is beginning the task and at most 2
  96        threads that are trying to end it: "parent" and "child". In case of a
  97        synchronous task "parent" and "child" are the same thread.
  98      */
  99 
 100     private static final int OFFLOAD =  1;
 101     private static final int AGAIN   =  2;
 102     private static final int BEGIN   =  4;
 103     private static final int STOP    =  8;
 104     private static final int END     = 16;
 105 
 106     private final AtomicInteger state = new AtomicInteger(END);
 107     private final Consumer<Runnable> begin;
 108 
 109     public CooperativeHandler(Runnable task) {
 110         this(asyncOf(task));
 111     }
 112 
 113     public CooperativeHandler(Consumer<Runnable> begin) {
 114         this.begin = requireNonNull(begin);
 115     }
 116 
 117     /*
 118      * Runs the task (though maybe by a different party).
 119      *
 120      * The recursion which is possible here will have the maximum depth of 1:
 121      *
 122      *     this.handle()
 123      *         begin.accept()
 124      *             this.handle()
 125      */
 126     public void handle() {
 127         while (true) {
 128             int s = state.get();
 129             if (s == END) {
 130                 if (state.compareAndSet(END, BEGIN)) {
 131                     break;
 132                 }
 133             } else if ((s & BEGIN) != 0) {
 134                 // Tries to change the state to AGAIN, preserving OFFLOAD bit
 135                 if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
 136                     return;
 137                 }
 138             } else if ((s & AGAIN) != 0 || s == STOP) {
 139                 return;
 140             } else {
 141                 throw new InternalError(String.valueOf(s));
 142             }
 143         }
 144         begin.accept(this::tryEnd);
 145     }
 146 
 147     private void tryEnd() {
 148         while (true) {
 149             int s;
 150             while (((s = state.get()) & OFFLOAD) != 0) {
 151                 // Tries to offload ending of the task to the parent
 152                 if (state.compareAndSet(s, s & ~OFFLOAD)) {
 153                     return;
 154                 }
 155             }
 156             while (true) {
 157                 if (s == BEGIN) {
 158                     if (state.compareAndSet(BEGIN, END)) {
 159                         return;
 160                     }
 161                 } else if (s == AGAIN) {
 162                     if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
 163                         break;
 164                     }
 165                 } else if (s == STOP) {
 166                     return;
 167                 } else {
 168                     throw new InternalError(String.valueOf(s));
 169                 }
 170                 s = state.get();
 171             }
 172             begin.accept(this::tryEnd);
 173         }
 174     }
 175 
 176     /*
 177      * Checks whether or not this handler has been permanently stopped.
 178      *
 179      * Should be used from inside the task to poll the status of the handler,
 180      * pretty much the same way as it is done for threads:
 181      *
 182      *     if (!Thread.currentThread().isInterrupted()) {
 183      *         ...
 184      *     }
 185      */
 186     public boolean isStopped() {
 187         return state.get() == STOP;
 188     }
 189 
 190     /*
 191      * Signals this handler to ignore subsequent invocations to `handle()`.
 192      *
 193      * If the task has already begun, this invocation will not affect it,
 194      * unless the task itself uses `isStopped()` method to check the state
 195      * of the handler.
 196      */
 197     public void stop() {
 198         state.set(STOP);
 199     }
 200 
 201     private static Consumer<Runnable> asyncOf(Runnable task) {
 202         requireNonNull(task);
 203         return ender -> {
 204             task.run();
 205             ender.run();
 206         };
 207     }
 208 }