1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.websocket; 27 28 import java.util.concurrent.atomic.AtomicInteger; 29 import java.util.function.Consumer; 30 31 import static java.util.Objects.requireNonNull; 32 33 /* 34 * A synchronization aid that assists a number of parties in running a task 35 * in a mutually exclusive fashion. 36 * 37 * To run the task, a party invokes `handle`. To permanently prevent the task 38 * from subsequent runs, the party invokes `stop`. 39 * 40 * The parties do not have to operate in different threads. 41 * 42 * The task can be either synchronous or asynchronous. 43 * 44 * If the task is synchronous, it is represented with `Runnable`. 45 * The handler invokes `Runnable.run` to run the task. 46 * 47 * If the task is asynchronous, it is represented with `Consumer<Runnable>`. 48 * The handler invokes `Consumer.accept(end)` to begin the task. The task 49 * invokes `end.run()` when it has ended. 50 * 51 * The next run of the task will not begin until the previous run has finished. 52 * 53 * The task may invoke `handle()` by itself, it's a normal situation. 54 */ 55 public final class CooperativeHandler { 56 57 /* 58 Since the task is fixed and known beforehand, no blocking synchronization 59 (locks, queues, etc.) is required. The job can be done solely using 60 nonblocking primitives. 61 62 The machinery below addresses two problems: 63 64 1. Running the task in a sequential order (no concurrent runs): 65 66 begin, end, begin, end... 67 68 2. Avoiding indefinite recursion: 69 70 begin 71 end 72 begin 73 end 74 ... 75 76 Problem #1 is solved with a finite state machine with 4 states: 77 78 BEGIN, AGAIN, END, and STOP. 79 80 Problem #2 is solved with a "state modifier" OFFLOAD. 81 82 Parties invoke `handle()` to signal the task must run. A party that has 83 invoked `handle()` either begins the task or exploits the party that is 84 either beginning the task or ending it. 85 86 The party that is trying to end the task either ends it or begins it 87 again. 88 89 To avoid indefinite recursion, before re-running the task tryEnd() sets 90 OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent") 91 tryEnd() is available and the "child" must offload the task on to the 92 "parent". Then a race begins. Whichever invocation of tryEnd() manages 93 to unset OFFLOAD bit first does not do the work. 94 95 There is at most 1 thread that is beginning the task and at most 2 96 threads that are trying to end it: "parent" and "child". In case of a 97 synchronous task "parent" and "child" are the same thread. 98 */ 99 100 private static final int OFFLOAD = 1; 101 private static final int AGAIN = 2; 102 private static final int BEGIN = 4; 103 private static final int STOP = 8; 104 private static final int END = 16; 105 106 private final AtomicInteger state = new AtomicInteger(END); 107 private final Consumer<Runnable> begin; 108 109 public CooperativeHandler(Runnable task) { 110 this(asyncOf(task)); 111 } 112 113 public CooperativeHandler(Consumer<Runnable> begin) { 114 this.begin = requireNonNull(begin); 115 } 116 117 /* 118 * Runs the task (though maybe by a different party). 119 * 120 * The recursion which is possible here will have the maximum depth of 1: 121 * 122 * this.handle() 123 * begin.accept() 124 * this.handle() 125 */ 126 public void handle() { 127 while (true) { 128 int s = state.get(); 129 if (s == END) { 130 if (state.compareAndSet(END, BEGIN)) { 131 break; 132 } 133 } else if ((s & BEGIN) != 0) { 134 // Tries to change the state to AGAIN, preserving OFFLOAD bit 135 if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) { 136 return; 137 } 138 } else if ((s & AGAIN) != 0 || s == STOP) { 139 return; 140 } else { 141 throw new InternalError(String.valueOf(s)); 142 } 143 } 144 begin.accept(this::tryEnd); 145 } 146 147 private void tryEnd() { 148 while (true) { 149 int s; 150 while (((s = state.get()) & OFFLOAD) != 0) { 151 // Tries to offload ending of the task to the parent 152 if (state.compareAndSet(s, s & ~OFFLOAD)) { 153 return; 154 } 155 } 156 while (true) { 157 if (s == BEGIN) { 158 if (state.compareAndSet(BEGIN, END)) { 159 return; 160 } 161 } else if (s == AGAIN) { 162 if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) { 163 break; 164 } 165 } else if (s == STOP) { 166 return; 167 } else { 168 throw new InternalError(String.valueOf(s)); 169 } 170 s = state.get(); 171 } 172 begin.accept(this::tryEnd); 173 } 174 } 175 176 /* 177 * Checks whether or not this handler has been permanently stopped. 178 * 179 * Should be used from inside the task to poll the status of the handler, 180 * pretty much the same way as it is done for threads: 181 * 182 * if (!Thread.currentThread().isInterrupted()) { 183 * ... 184 * } 185 */ 186 public boolean isStopped() { 187 return state.get() == STOP; 188 } 189 190 /* 191 * Signals this handler to ignore subsequent invocations to `handle()`. 192 * 193 * If the task has already begun, this invocation will not affect it, 194 * unless the task itself uses `isStopped()` method to check the state 195 * of the handler. 196 */ 197 public void stop() { 198 state.set(STOP); 199 } 200 201 private static Consumer<Runnable> asyncOf(Runnable task) { 202 requireNonNull(task); 203 return ender -> { 204 task.run(); 205 ender.run(); 206 }; 207 } 208 }