src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
Print this page
@@ -36,20 +36,22 @@
package java.util.concurrent;
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.RejectedExecutionException;
/**
- * A thread managed by a {@link ForkJoinPool}. This class is
- * subclassable solely for the sake of adding functionality -- there
- * are no overridable methods dealing with scheduling or execution.
- * However, you can override initialization and termination methods
- * surrounding the main task processing loop. If you do create such a
- * subclass, you will also need to supply a custom {@link
- * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
- * ForkJoinPool}.
+ * A thread managed by a {@link ForkJoinPool}, which executes
+ * {@link ForkJoinTask}s.
+ * This class is subclassable solely for the sake of adding
+ * functionality -- there are no overridable methods dealing with
+ * scheduling or execution. However, you can override initialization
+ * and termination methods surrounding the main task processing loop.
+ * If you do create such a subclass, you will also need to supply a
+ * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
+ * in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
@@ -374,19 +376,19 @@
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
- * invoke @code{super.onStart()} at the beginning of the method.
+ * invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
- seed = rs == 0? 1 : rs; // seed must be nonzero
+ seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
String wid = Integer.toString(poolIndex);
setName("ForkJoinPool-" + pid + "-worker-" + wid);
@@ -424,11 +426,11 @@
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
- * ForkJoinTasks.
+ * {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
@@ -626,10 +628,23 @@
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * Note: here and in related methods, as a
+ * performance (not correctness) issue, we'd like
+ * to encourage the compiler not to arbitrarily
+ * postpone setting sp after successful CAS.
+ * Currently there is no intrinsic for arranging
+ * this, but using Unsafe putOrderedInt may be a
+ * preferable strategy on some compilers even
+ * though its main effect is a pre-, not post-
+ * fence. To simplify possible changes, the option
+ * is left in comments next to the associated
+ * assignments.
+ */
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
return t;
}
}
@@ -882,23 +897,20 @@
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
- if (cj != null) {
- currentJoin = null;
+ if (cj != null && cj.status >= 0) {
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
} catch (SecurityException ignore) {
}
}
ForkJoinTask<?> cs = currentSteal;
- if (cs != null) {
- currentSteal = null;
+ if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions();
- }
while (base != sp) {
ForkJoinTask<?> t = deqTask();
if (t != null)
t.cancelIgnoringExceptions();
}
@@ -957,142 +969,158 @@
/**
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
+ * @param timed true if a timed wait should be used
+ * @param nanos wait time if timed
*/
- final void joinTask(ForkJoinTask<?> joinMe) {
+ final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
- if (sp != base)
- localHelpJoinTask(joinMe);
- if (joinMe.status >= 0)
- pool.awaitJoin(joinMe, this);
+ pool.awaitJoin(joinMe, this, timed, nanos);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
- * Run tasks in local queue until given task is done.
+ * Tries to locate and help perform tasks for a stealer of the
+ * given task, or in turn one of its stealers. Traces
+ * currentSteal->currentJoin links looking for a thread working on
+ * a descendant of the given task and with a non-empty queue to
+ * steal back and execute tasks from.
*
+ * The implementation is very branchy to cope with potential
+ * inconsistencies or loops encountering chains that are stale,
+ * unknown, or of length greater than MAX_HELP_DEPTH links. All
+ * of these cases are dealt with by just returning back to the
+ * caller, who is expected to retry if other join mechanisms also
+ * don't work out.
+ *
* @param joinMe the task to join
+ * @param running if false, then must update pool count upon
+ * running a task
+ * @return value of running on exit
*/
- private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
- int s;
+ final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
+ /*
+ * Initial checks to (1) abort if terminating; (2) clean out
+ * old cancelled tasks from local queue; (3) if joinMe is next
+ * task, run it; (4) omit scan if local queue nonempty (since
+ * it may contain non-descendants of joinMe).
+ */
+ ForkJoinPool p = pool;
+ for (;;) {
ForkJoinTask<?>[] q;
- while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
+ int s;
+ if (joinMe.status < 0)
+ return running;
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ return running;
+ }
+ else if ((s = sp) == base || (q = queue) == null)
+ break; // queue empty
+ else {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
- if (t == null) // lost to a stealer
- break;
- if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
- /*
- * This recheck (and similarly in helpJoinTask)
- * handles cases where joinMe is independently
- * cancelled or forced even though there is other work
- * available. Back out of the pop by putting t back
- * into slot before we commit by writing sp.
- */
- if (joinMe.status < 0) {
- UNSAFE.putObjectVolatile(q, u, t);
- break;
- }
- sp = s;
+ if (t == null)
+ break; // lost to a stealer
+ else if (t != joinMe && t.status >= 0)
+ return running; // cannot safely help
+ else if ((running ||
+ (running = p.tryIncrementRunningCount())) &&
+ UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
- /**
- * Unless terminating, tries to locate and help perform tasks for
- * a stealer of the given task, or in turn one of its stealers.
- * Traces currentSteal->currentJoin links looking for a thread
- * working on a descendant of the given task and with a non-empty
- * queue to steal back and execute tasks from.
- *
- * The implementation is very branchy to cope with potential
- * inconsistencies or loops encountering chains that are stale,
- * unknown, or of length greater than MAX_HELP_DEPTH links. All
- * of these cases are dealt with by just returning back to the
- * caller, who is expected to retry if other join mechanisms also
- * don't work out.
- *
- * @param joinMe the task to join
- */
- final void helpJoinTask(ForkJoinTask<?> joinMe) {
- ForkJoinWorkerThread[] ws;
- int n;
- if (joinMe.status < 0) // already done
- return;
- if ((runState & TERMINATING) != 0) { // cancel if shutting down
- joinMe.cancelIgnoringExceptions();
- return;
- }
- if ((ws = pool.workers) == null || (n = ws.length) <= 1)
- return; // need at least 2 workers
-
+ int n; // worker array size
+ ForkJoinWorkerThread[] ws = p.workers;
+ if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task
- for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
+
+ outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) {
for (int j = 0; ; ++j) { // search array
if (j < n) {
ForkJoinTask<?> vs;
if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) {
- if (joinMe.status < 0 || task.status < 0)
- return; // stale or done
+ if (joinMe.status < 0)
+ break outer;
if (vs == task) {
+ if (task.status < 0)
+ break outer; // stale
thread.stealHint = j;
break; // save hint for next time
}
}
}
else
- return; // no stealer
+ break outer; // no stealer
}
}
- for (;;) { // Try to help v, using specialized form of deqTask
+
+ // Try to help v, using specialized form of deqTask
+ for (;;) {
if (joinMe.status < 0)
- return;
+ break outer;
int b = v.base;
ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null)
- break;
+ break; // empty
int i = (q.length - 1) & b;
long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i];
- int pid = poolIndex;
- ForkJoinTask<?> ps = currentSteal;
if (task.status < 0)
- return; // stale or done
- if (t != null && v.base == b++ &&
+ break outer; // stale
+ if (t != null &&
+ (running ||
+ (running = p.tryIncrementRunningCount())) &&
+ v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
- if (joinMe.status < 0) {
+ if (t != joinMe && joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
- return; // back out on cancel
+ break outer; // joinMe cancelled; back out
}
v.base = b;
+ if (t.status >= 0) {
+ ForkJoinTask<?> ps = currentSteal;
+ int pid = poolIndex;
v.stealHint = pid;
- UNSAFE.putOrderedObject(this, currentStealOffset, t);
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, t);
t.quietlyExec();
- UNSAFE.putOrderedObject(this, currentStealOffset, ps);
+ UNSAFE.putOrderedObject(this,
+ currentStealOffset, ps);
}
}
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ break outer;
+ }
+ }
+
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task ||
joinMe.status < 0)
- return;
+ break; // done, stale, dead-end, or cyclic
task = next;
thread = v;
}
}
+ return running;
+ }
/**
* Implements ForkJoinTask.getSurplusQueuedTaskCount().
* Returns an estimate of the number of tasks, offset by a
* function of number of idle workers.