--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/CooperativeHandler.java 2017-11-30 04:06:33.704576347 -0800 +++ /dev/null 2017-10-28 22:49:55.551349757 -0700 @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http.internal.websocket; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import static java.util.Objects.requireNonNull; - -/* - * A synchronization aid that assists a number of parties in running a task - * in a mutually exclusive fashion. - * - * To run the task, a party invokes `handle`. To permanently prevent the task - * from subsequent runs, the party invokes `stop`. - * - * The parties do not have to operate in different threads. - * - * The task can be either synchronous or asynchronous. - * - * If the task is synchronous, it is represented with `Runnable`. - * The handler invokes `Runnable.run` to run the task. - * - * If the task is asynchronous, it is represented with `Consumer`. - * The handler invokes `Consumer.accept(end)` to begin the task. The task - * invokes `end.run()` when it has ended. - * - * The next run of the task will not begin until the previous run has finished. - * - * The task may invoke `handle()` by itself, it's a normal situation. - */ -public final class CooperativeHandler { - - /* - Since the task is fixed and known beforehand, no blocking synchronization - (locks, queues, etc.) is required. The job can be done solely using - nonblocking primitives. - - The machinery below addresses two problems: - - 1. Running the task in a sequential order (no concurrent runs): - - begin, end, begin, end... - - 2. Avoiding indefinite recursion: - - begin - end - begin - end - ... - - Problem #1 is solved with a finite state machine with 4 states: - - BEGIN, AGAIN, END, and STOP. - - Problem #2 is solved with a "state modifier" OFFLOAD. - - Parties invoke `handle()` to signal the task must run. A party that has - invoked `handle()` either begins the task or exploits the party that is - either beginning the task or ending it. - - The party that is trying to end the task either ends it or begins it - again. - - To avoid indefinite recursion, before re-running the task tryEnd() sets - OFFLOAD bit, signalling to its "child" tryEnd() that this ("parent") - tryEnd() is available and the "child" must offload the task on to the - "parent". Then a race begins. Whichever invocation of tryEnd() manages - to unset OFFLOAD bit first does not do the work. - - There is at most 1 thread that is beginning the task and at most 2 - threads that are trying to end it: "parent" and "child". In case of a - synchronous task "parent" and "child" are the same thread. - */ - - private static final int OFFLOAD = 1; - private static final int AGAIN = 2; - private static final int BEGIN = 4; - private static final int STOP = 8; - private static final int END = 16; - - private final AtomicInteger state = new AtomicInteger(END); - private final Consumer begin; - - public CooperativeHandler(Runnable task) { - this(asyncOf(task)); - } - - public CooperativeHandler(Consumer begin) { - this.begin = requireNonNull(begin); - } - - /* - * Runs the task (though maybe by a different party). - * - * The recursion which is possible here will have the maximum depth of 1: - * - * this.handle() - * begin.accept() - * this.handle() - */ - public void handle() { - while (true) { - int s = state.get(); - if (s == END) { - if (state.compareAndSet(END, BEGIN)) { - break; - } - } else if ((s & BEGIN) != 0) { - // Tries to change the state to AGAIN, preserving OFFLOAD bit - if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) { - return; - } - } else if ((s & AGAIN) != 0 || s == STOP) { - return; - } else { - throw new InternalError(String.valueOf(s)); - } - } - begin.accept(this::tryEnd); - } - - private void tryEnd() { - while (true) { - int s; - while (((s = state.get()) & OFFLOAD) != 0) { - // Tries to offload ending of the task to the parent - if (state.compareAndSet(s, s & ~OFFLOAD)) { - return; - } - } - while (true) { - if (s == BEGIN) { - if (state.compareAndSet(BEGIN, END)) { - return; - } - } else if (s == AGAIN) { - if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) { - break; - } - } else if (s == STOP) { - return; - } else { - throw new InternalError(String.valueOf(s)); - } - s = state.get(); - } - begin.accept(this::tryEnd); - } - } - - /* - * Checks whether or not this handler has been permanently stopped. - * - * Should be used from inside the task to poll the status of the handler, - * pretty much the same way as it is done for threads: - * - * if (!Thread.currentThread().isInterrupted()) { - * ... - * } - */ - public boolean isStopped() { - return state.get() == STOP; - } - - /* - * Signals this handler to ignore subsequent invocations to `handle()`. - * - * If the task has already begun, this invocation will not affect it, - * unless the task itself uses `isStopped()` method to check the state - * of the handler. - */ - public void stop() { - state.set(STOP); - } - - private static Consumer asyncOf(Runnable task) { - requireNonNull(task); - return ender -> { - task.run(); - ender.run(); - }; - } -}