/* * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.lang; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.lang.reflect.Constructor; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.security.ProtectionDomain; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import static java.util.concurrent.TimeUnit.NANOSECONDS; /** * A lightweight thread. A Fiber is a user mode thread, it is always * scheduled by the Java virtual machine rather than the operating system. * *
While a {@code Fiber} is a {@code Thread} object, Fibers do not support * all features of regular threads. In particular, a Fiber is not an active * thread in its {@link ThreadGroup thread group} and so it not enumerated * or acted on by thread group operations. A Fiber does not inherit the initial * values of {@link InheritableThreadLocal inheritable thread-local variables}. * Finally, Fibers do not support setting an {@link * Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler) uncaught exception * handler}, and operations such as {@link Thread#stop() Thread.stop}, {@link * Thread#suspend() Thread.suspend}, and {@link Thread#resume() Thread.resume}. */ public final class Fiber extends Thread { private static final ContinuationScope FIBER_SCOPE = new ContinuationScope() { }; private static final ThreadGroup DEFAULT_GROUP = defaultThreadGroup(); private static final Executor DEFAULT_SCHEDULER = defaultScheduler(); private static final ScheduledExecutorService UNPARKER = delayedTaskScheduler(); private static final AccessControlContext INNOCUOUS_ACC = innocuousACC(); private static final VarHandle STATE; private static final VarHandle PARK_PERMIT; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(Fiber.class, "state", short.class); PARK_PERMIT = l.findVarHandle(Fiber.class, "parkPermit", boolean.class); } catch (Exception e) { throw new InternalError(e); } } // scheduler and continuation private final Executor scheduler; private final Continuation cont; // fiber state private static final short ST_NEW = 0; private static final short ST_STARTED = 1; private static final short ST_RUNNING = 2; private static final short ST_PARKING1 = 3; private static final short ST_PARKING2 = 4; private static final short ST_PARKED = 5; private static final short ST_PINNED = 6; private static final short ST_TERMINATED = 99; private volatile short state; // park/unpark and join/await support private volatile boolean parkPermit; private final ReentrantLock lock = new ReentrantLock(); private Condition parking; // created lazily private Condition termination; // created lazily // Thread.interrupt support // // Lock operations ignore interrupts and so may call Thread.interrupted() // or Thread.currentThread().interrupt() to clear or reassert the interrupt // status. Changes to the following fields that require synchronization must // therefore synchronize on the lock object and not use explicit lock and // unlock operations. private volatile Thread kernelThread; // kernel thread when running or pinned private volatile boolean interrupted; // interrupt status /** * Creates a new {@code Fiber} to run the given task with the default * scheduler. The {@link #start() start} method must be invoked to start its * execution. * * @param task the task to execute * @throws NullPointerException if task is {@code null} */ public Fiber(Runnable task) { this(DEFAULT_SCHEDULER, task); } /** * Creates a new {@code Fiber} to run the given task with the given scheduler. * The {@link #start() start} method must be invoked to start its execution. * * @param scheduler the scheduler * @param task the task to execute * @throws SecurityManager if a security manager is set and it denies * {@link RuntimePermission}{@code ("fiberScheduler")} * @throws NullPointerException if the scheduler or task is {@code null} */ public Fiber(Executor scheduler, Runnable task) { super(DEFAULT_GROUP, "Fiber", threadACC(scheduler), /*inheritThreadLocals*/false); Objects.requireNonNull(scheduler); Objects.requireNonNull(task); SecurityManager sm; if (scheduler != DEFAULT_SCHEDULER && (sm = System.getSecurityManager()) != null) { sm.checkPermission(new RuntimePermission("fiberScheduler")); } this.scheduler = scheduler; this.cont = new Continuation(FIBER_SCOPE, task) { @Override protected void onPinned(int reason) { yieldFailed(); } }; } /** * Creates a new {@code Fiber} to run the given task with the default * scheduler and starts its execution. * * @param task the task to execute * @return the fiber * @throws NullPointerException if task is {@code null} */ public static Fiber execute(Runnable task) { Fiber f = new Fiber(task); f.start(); return f; } /** * Creates a new {@code Fiber} to run the given task with the given * scheduler and starts its execution. * * @param scheduler the scheduler * @param task the task to execute * @return the fiber * @throws RejectedExecutionException if the scheduler cannot accept a task * @throws SecurityManager if a security manager is set and it denies * {@link RuntimePermission}{@code ("fiberScheduler")} * @throws NullPointerException if the scheduler or task is {@code null} */ public static Fiber execute(Executor scheduler, Runnable task) { Fiber f = new Fiber(scheduler, task); f.start(); return f; } /** * Causes this fiber to be scheduled for execution. * * @throws IllegalStateException if already started * @throws RejectedExecutionException if using a scheduler and it cannot * accept a task */ @Override public void start() { if (!stateCompareAndSet(ST_NEW, ST_STARTED)) throw new IllegalStateException(); scheduler.execute(this::runContinuation); } /** * Runs or continues execution of the continuation on the current kernel thread. */ private void runContinuation() { assert Thread.currentKernelThread().getFiber() == null; // set state to RUNNING if not already started if (!stateCompareAndSet(ST_STARTED, ST_RUNNING)) { // already started // If this fiber is parking on another kernel thread then wait for // the continuation to yield before it continues on the current kernel // thread. If the parking fails, due to the continuation being pinned, // then execution will continue on the original kernel thread. boolean pinned = waitIfParking(); if (pinned) return; // continue on this kernel thread if fiber was parked if (stateCompareAndSet(ST_PARKED, ST_RUNNING)) { parkPermitGetAndSet(false); // consume parking permit } else { return; } } attachKernelThread(); try { cont.run(); } finally { detachKernelThread(); if (cont.isDone()) { afterTerminate(); } else { afterYield(); } } } /** * Invoke before continuation is started or continued to attach this fiber * to the current kernel thread. */ private void attachKernelThread() { Thread t = Thread.currentKernelThread(); // set kernel thread before forwarding interrupt status kernelThread = t; if (interrupted) t.interrupt(); // set the fiber so that Thread.currentThread() returns the Fiber object t.setFiber(this); } /** * Invoke after a continuation yields or terminates to detach this fiber * from the current kernel thread */ private void detachKernelThread() { Thread t = Thread.currentKernelThread(); t.setFiber(null); // synchronize with Thread.interrupt to ensure that the kernel thread // is not interrupted after it has been detached. synchronized (lock) { kernelThread = null; } t.getAndClearInterrupt(); } /** * Invoke after yielding to set the state to ST_PARKED and notify any * threads waiting for the fiber to park. */ private void afterYield() { int oldState = stateGetAndSet(ST_PARKED); assert oldState == ST_PARKING2; // notify in case another thread is attempt to continue lock.lock(); try { Condition parking = this.parking; if (parking != null) { parking.signalAll(); } } finally { lock.unlock(); } } /** * Invoke after the continuation completes to set the state to ST_TERMINATED * and notify anyone waiting via the join method. */ private void afterTerminate() { int oldState = stateGetAndSet(ST_TERMINATED); assert oldState == ST_RUNNING; // notify everyone waiting for this fiber to terminate lock.lock(); try { Condition termination = this.termination; if (termination != null) { termination.signalAll(); } } finally { lock.unlock(); } } /** * If this fiber is parking then wait for the continuation to yield before * it continues on this kernel thread. If the yield fails then the kernel * thread executing it will park and needs to be signalled so that execution * continues on the original kernel thread. * * @return true if pinned */ private boolean waitIfParking() { short s; while ((s = stateGet()) == ST_PARKING1) { Thread.yield(); } if (s == ST_PARKING2 || s == ST_PINNED) { lock.lock(); try { Condition parking = parkingCondition(); while ((s = stateGet()) == ST_PARKING2) { parking.awaitUninterruptibly(); } if (s == ST_PINNED) { // signal so that execution continues on original thread parking.signalAll(); return true; } } finally { lock.unlock(); } } return false; } /** * Invoked by onPinned when the continuation cannot yield due to a * synchronized or native frame on the continuation stack. This method sets * the fiber state to ST_PINNED and parks the kernel thread. */ private void yieldFailed() { // switch to kernel thread Thread t = Thread.currentKernelThread(); t.setFiber(null); if (!stateCompareAndSet(ST_PARKING2, ST_PINNED)) throw new InternalError(); boolean parkInterrupted = false; lock.lock(); try { parkingCondition().await(); } catch (InterruptedException e) { parkInterrupted = true; } finally { lock.unlock(); // continue running on the kernel thread if (!stateCompareAndSet(ST_PINNED, ST_RUNNING)) throw new InternalError(); // consume parking permit parkPermitGetAndSet(false); // switch back to fiber t.setFiber(this); } // restore interrupt status if (parkInterrupted) { // no need to synchronize here as the interrupt is never cleared // asynchronously interrupted = true; t.interrupt(); } } /** * Disables the current fiber for scheduling purposes. * *
If this fiber has already been {@link #unpark() unparked} then the * parking permit is consumed and this method completes immediately; * otherwise the current fiber is disabled for scheduling purposes and lies * dormant until it is {@link #unpark() unparked} or {@link #interrupt() * interrupted}. * * @throws IllegalCallerException if not called from a fiber */ public static void park() { Fiber fiber = Thread.currentKernelThread().getFiber(); if (fiber == null) throw new IllegalCallerException(); fiber.maybePark(); } /** * Disables the current fiber for scheduling purposes for up to the * specified waiting time. * *
If this fiber has already been {@link #unpark() unparked} then the * parking permit is consumed and this method completes immediately; * otherwise if the time to wait is greater than zero then the current fiber * is disabled for scheduling purposes and lies dormant until it is {@link * #unpark unparked}, {@link #interrupt() interrupted}, or the waiting time * elapses. * * @param nanos the maximum number of nanoseconds to wait. * * @throws IllegalCallerException if not called from a fiber */ public static void parkNanos(long nanos) { Thread t = Thread.currentKernelThread(); Fiber fiber = t.getFiber(); if (fiber == null) throw new IllegalCallerException("not a fiber"); if (nanos > 0) { // switch to kernel thread when submitting task to unpark t.setFiber(null); Future> unparker; try { unparker = UNPARKER.schedule(fiber::unpark, nanos, NANOSECONDS); } finally { t.setFiber(fiber); } // now park try { fiber.maybePark(); } finally { unparker.cancel(false); } } else { // consume permit when not parking fiber.parkPermitGetAndSet(false); } } /** * Re-enables this fiber for scheduling. If the fiber was {@link #park() * parked} then it will be unblocked, otherwise its next call to {@code park} * or {@link #parkNanos(long) parkNanos} is guaranteed not to block. * * @throws IllegalStateException if the fiber has not started * @throws RejectedExecutionException if using a scheduler and it cannot * accept a task * @return this fiber */ public Fiber unpark() { if (stateGet() == ST_NEW) throw new IllegalStateException("fiber not started"); Thread t = Thread.currentKernelThread(); Fiber fiber = t.getFiber(); if (!parkPermitGetAndSet(true) && fiber != this) { // switch to kernel thread when submitting task to continue if (fiber != null) { t.setFiber(null); } try { scheduler.execute(this::runContinuation); } finally { if (fiber != null) { t.setFiber(fiber); } } } return this; } /** * Park or complete immediately. * *
If this fiber has already been unparked or its interrupt status is * set then this method completes immediately; otherwise it yields. */ private void maybePark() { assert Thread.currentKernelThread().getFiber() == this; // prepare to park; important to do this before consuming the parking // permit and yielding if (!stateCompareAndSet(ST_RUNNING, ST_PARKING1)) throw new InternalError(); // consume permit if available, and continue rather than park if (parkPermitGetAndSet(false) || interrupted) { if (!stateCompareAndSet(ST_PARKING1, ST_RUNNING)) throw new InternalError(); return; } // attempt to yield if (!stateCompareAndSet(ST_PARKING1, ST_PARKING2)) throw new InternalError(); Continuation.yield(FIBER_SCOPE); // continued assert stateGet() == ST_RUNNING; } /** * Waits for this fiber to terminate. * *
If the current thread is interrupted while waiting then it will * continue to wait. When the thread does return from this method then its * interrupt status will be set. * * @throws IllegalStateException if the fiber has not started * @return this fiber */ public Fiber await() { boolean joinInterrupted = false; boolean terminated = false; while (!terminated) { try { terminated = joinNanos(0); } catch (InterruptedException e) { joinInterrupted = true; } } if (joinInterrupted) Thread.currentThread().interrupt(); return this; } /** * Waits for this fiber to terminate. This method does not wait if the time * to wait is less than or equal to zero. * *
If the current thread is interrupted while waiting then it will
* continue to wait. When the thread does return from this method then its
* interrupt status will be set.
*
* @param nanos the maximum time to wait, in nanoseconds
* @throws IllegalStateException if the fiber has not started
* @return this fiber
*/
public Fiber awaitNanos(long nanos) {
if (stateGet() == ST_NEW) {
throw new IllegalStateException("fiber not started");
}
if (nanos > 0) {
boolean joinInterrupted = false;
boolean terminated = false;
// wait until the fiber terminates or timeout elapses
while (!terminated && nanos > 0) {
long startTime = System.nanoTime();
try {
terminated = joinNanos(nanos);
} catch (InterruptedException e) {
joinInterrupted = true;
}
nanos -= (System.nanoTime() - startTime);
}
// restore interrupt status
if (joinInterrupted) {
Thread.currentThread().interrupt();
}
}
return this;
}
/**
* Waits up to {@code nanos} nanoseconds for this fiber to terminate.
* A timeout of {@code 0} means to wait forever.
*
* @throws InterruptedException if interrupted while waiting
* @throws IllegalArgumentException if nanos is negative
* @throws IllegalStateException if the fiber has not been started
* @return true if the fiber has terminated
*/
boolean joinNanos(long nanos) throws InterruptedException {
if (nanos < 0) {
throw new IllegalArgumentException();
}
lock.lock();
try {
short s = stateGet();
if (s == ST_NEW) {
throw new IllegalStateException("fiber not started");
} else if (s == ST_TERMINATED) {
// already terminated
return true;
}
if (nanos == 0) {
terminationCondition().await();
} else {
terminationCondition().await(nanos, NANOSECONDS);
}
} finally {
lock.unlock();
}
return (stateGet() == ST_TERMINATED);
}
@Override
public void interrupt() {
// synchronize with detachKernelThread to ensure the kernel
// thread is not interrupted after it has been detached
synchronized (lock) {
// set fiber interrupt status and close channel if fiber is
// blocked in an I/O operation on an InterruptibleChannel.
super.interrupt();
// interrupt kernel thread
Thread t = kernelThread;
if (t != null) t.interrupt();
}
unpark();
}
@Override
public boolean isInterrupted() {
return interrupted;
}
@Override
void doInterrupt() {
assert Thread.holdsLock(lock);
interrupted = true;
}
/**
* Clears the interrupt status and returns the old value. If set, this
* method clears the fiber's interrupt status and the interrupt status of
* the kernel thread.
*/
@Override
boolean getAndClearInterrupt() {
assert Thread.currentThread() == this && kernelThread != null;
boolean oldValue = interrupted;
if (oldValue) {
synchronized (lock) {
interrupted = false;
kernelThread.getAndClearInterrupt();
}
}
return oldValue;
}
@Override
public Thread.State getState() {
switch (stateGet()) {
case ST_NEW:
return State.NEW;
case ST_STARTED:
return State.RUNNABLE;
case ST_RUNNING:
Thread t = kernelThread;
if (t != null) {
return t.getState();
} else {
return State.RUNNABLE;
}
case ST_PARKING1:
case ST_PARKING2:
return State.RUNNABLE; // not yet waiting
case ST_PARKED:
case ST_PINNED:
return State.WAITING;
case ST_TERMINATED:
return State.TERMINATED;
default:
throw new InternalError();
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("Fiber[");
Thread t = kernelThread;
if (t != null) {
sb.append(t.getName());
ThreadGroup g = t.getThreadGroup();
if (g != null) {
sb.append(",");
sb.append(g.getName());
}
} else {
sb.append("