src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
Print this page
*** 36,55 ****
package java.util.concurrent;
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
/**
! * A thread managed by a {@link ForkJoinPool}. This class is
! * subclassable solely for the sake of adding functionality -- there
! * are no overridable methods dealing with scheduling or execution.
! * However, you can override initialization and termination methods
! * surrounding the main task processing loop. If you do create such a
! * subclass, you will also need to supply a custom {@link
! * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
! * ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
--- 36,57 ----
package java.util.concurrent;
import java.util.Random;
import java.util.Collection;
import java.util.concurrent.locks.LockSupport;
+ import java.util.concurrent.RejectedExecutionException;
/**
! * A thread managed by a {@link ForkJoinPool}, which executes
! * {@link ForkJoinTask}s.
! * This class is subclassable solely for the sake of adding
! * functionality -- there are no overridable methods dealing with
! * scheduling or execution. However, you can override initialization
! * and termination methods surrounding the main task processing loop.
! * If you do create such a subclass, you will also need to supply a
! * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
! * in a {@code ForkJoinPool}.
*
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinWorkerThread extends Thread {
*** 374,392 ****
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
! * invoke @code{super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
! seed = rs == 0? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
String wid = Integer.toString(poolIndex);
setName("ForkJoinPool-" + pid + "-worker-" + wid);
--- 376,394 ----
}
/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
! * invoke {@code super.onStart()} at the beginning of the method.
* Initialization requires care: Most fields must have legal
* default values, to ensure that attempted accesses from other
* threads work correctly even before this thread starts
* processing tasks.
*/
protected void onStart() {
int rs = seedGenerator.nextInt();
! seed = (rs == 0) ? 1 : rs; // seed must be nonzero
// Allocate name string and arrays in this thread
String pid = Integer.toString(pool.getPoolNumber());
String wid = Integer.toString(poolIndex);
setName("ForkJoinPool-" + pid + "-worker-" + wid);
*** 424,434 ****
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
! * ForkJoinTasks.
*/
public void run() {
Throwable exception = null;
try {
onStart();
--- 426,436 ----
}
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
! * {@link ForkJoinTask}s.
*/
public void run() {
Throwable exception = null;
try {
onStart();
*** 626,635 ****
--- 628,650 ----
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+ /*
+ * Note: here and in related methods, as a
+ * performance (not correctness) issue, we'd like
+ * to encourage compiler not to arbitrarily
+ * postpone setting sp after successful CAS.
+ * Currently there is no intrinsic for arranging
+ * this, but using Unsafe putOrderedInt may be a
+ * preferable strategy on some compilers even
+ * though its main effect is a pre-, not post-
+ * fence. To simplify possible changes, the option
+ * is left in comments next to the associated
+ * assignments.
+ */
sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
return t;
}
}
*** 882,904 ****
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
! if (cj != null) {
! currentJoin = null;
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
} catch (SecurityException ignore) {
}
}
ForkJoinTask<?> cs = currentSteal;
! if (cs != null) {
! currentSteal = null;
cs.cancelIgnoringExceptions();
- }
while (base != sp) {
ForkJoinTask<?> t = deqTask();
if (t != null)
t.cancelIgnoringExceptions();
}
--- 897,916 ----
* Removes and cancels all tasks in queue. Can be called from any
* thread.
*/
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
! if (cj != null && cj.status >= 0) {
cj.cancelIgnoringExceptions();
try {
this.interrupt(); // awaken wait
} catch (SecurityException ignore) {
}
}
ForkJoinTask<?> cs = currentSteal;
! if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions();
while (base != sp) {
ForkJoinTask<?> t = deqTask();
if (t != null)
t.cancelIgnoringExceptions();
}
*** 957,1098 ****
/**
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
*/
! final void joinTask(ForkJoinTask<?> joinMe) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
! if (sp != base)
! localHelpJoinTask(joinMe);
! if (joinMe.status >= 0)
! pool.awaitJoin(joinMe, this);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
! * Run tasks in local queue until given task is done.
*
* @param joinMe the task to join
*/
! private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
! int s;
ForkJoinTask<?>[] q;
! while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
! if (t == null) // lost to a stealer
! break;
! if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
! /*
! * This recheck (and similarly in helpJoinTask)
! * handles cases where joinMe is independently
! * cancelled or forced even though there is other work
! * available. Back out of the pop by putting t back
! * into slot before we commit by writing sp.
! */
! if (joinMe.status < 0) {
! UNSAFE.putObjectVolatile(q, u, t);
! break;
! }
! sp = s;
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
! /**
! * Unless terminating, tries to locate and help perform tasks for
! * a stealer of the given task, or in turn one of its stealers.
! * Traces currentSteal->currentJoin links looking for a thread
! * working on a descendant of the given task and with a non-empty
! * queue to steal back and execute tasks from.
! *
! * The implementation is very branchy to cope with potential
! * inconsistencies or loops encountering chains that are stale,
! * unknown, or of length greater than MAX_HELP_DEPTH links. All
! * of these cases are dealt with by just returning back to the
! * caller, who is expected to retry if other join mechanisms also
! * don't work out.
! *
! * @param joinMe the task to join
! */
! final void helpJoinTask(ForkJoinTask<?> joinMe) {
! ForkJoinWorkerThread[] ws;
! int n;
! if (joinMe.status < 0) // already done
! return;
! if ((runState & TERMINATING) != 0) { // cancel if shutting down
! joinMe.cancelIgnoringExceptions();
! return;
! }
! if ((ws = pool.workers) == null || (n = ws.length) <= 1)
! return; // need at least 2 workers
!
ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task
! for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) {
for (int j = 0; ; ++j) { // search array
if (j < n) {
ForkJoinTask<?> vs;
if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) {
! if (joinMe.status < 0 || task.status < 0)
! return; // stale or done
if (vs == task) {
thread.stealHint = j;
break; // save hint for next time
}
}
}
else
! return; // no stealer
}
}
! for (;;) { // Try to help v, using specialized form of deqTask
if (joinMe.status < 0)
! return;
int b = v.base;
ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null)
! break;
int i = (q.length - 1) & b;
long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i];
- int pid = poolIndex;
- ForkJoinTask<?> ps = currentSteal;
if (task.status < 0)
! return; // stale or done
! if (t != null && v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
! if (joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
! return; // back out on cancel
}
v.base = b;
v.stealHint = pid;
! UNSAFE.putOrderedObject(this, currentStealOffset, t);
t.quietlyExec();
! UNSAFE.putOrderedObject(this, currentStealOffset, ps);
}
}
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task ||
joinMe.status < 0)
! return;
task = next;
thread = v;
}
}
/**
* Implements ForkJoinTask.getSurplusQueuedTaskCount().
* Returns an estimate of the number of tasks, offset by a
* function of number of idle workers.
--- 969,1126 ----
/**
* Possibly runs some tasks and/or blocks, until task is done.
*
* @param joinMe the task to join
+ * @param timed true if use timed wait
+ * @param nanos wait time if timed
*/
! final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
// currentJoin only written by this thread; only need ordered store
ForkJoinTask<?> prevJoin = currentJoin;
UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
! pool.awaitJoin(joinMe, this, timed, nanos);
UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
}
/**
! * Tries to locate and help perform tasks for a stealer of the
! * given task, or in turn one of its stealers. Traces
! * currentSteal->currentJoin links looking for a thread working on
! * a descendant of the given task and with a non-empty queue to
! * steal back and execute tasks from.
*
+ * The implementation is very branchy to cope with potential
+ * inconsistencies or loops encountering chains that are stale,
+ * unknown, or of length greater than MAX_HELP_DEPTH links. All
+ * of these cases are dealt with by just returning back to the
+ * caller, who is expected to retry if other join mechanisms also
+ * don't work out.
+ *
* @param joinMe the task to join
+ * @param running if false, then must update pool count upon
+ * running a task
+ * @return value of running on exit
*/
! final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
! /*
! * Initial checks to (1) abort if terminating; (2) clean out
! * old cancelled tasks from local queue; (3) if joinMe is next
! * task, run it; (4) omit scan if local queue nonempty (since
! * it may contain non-descendents of joinMe).
! */
! ForkJoinPool p = pool;
! for (;;) {
ForkJoinTask<?>[] q;
! int s;
! if (joinMe.status < 0)
! return running;
! else if ((runState & TERMINATING) != 0) {
! joinMe.cancelIgnoringExceptions();
! return running;
! }
! else if ((s = sp) == base || (q = queue) == null)
! break; // queue empty
! else {
int i = (q.length - 1) & --s;
long u = (i << qShift) + qBase; // raw offset
ForkJoinTask<?> t = q[i];
! if (t == null)
! break; // lost to a stealer
! else if (t != joinMe && t.status >= 0)
! return running; // cannot safely help
! else if ((running ||
! (running = p.tryIncrementRunningCount())) &&
! UNSAFE.compareAndSwapObject(q, u, t, null)) {
! sp = s; // putOrderedInt may encourage more timely write
// UNSAFE.putOrderedInt(this, spOffset, s);
t.quietlyExec();
}
}
}
! int n; // worker array size
! ForkJoinWorkerThread[] ws = p.workers;
! if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
ForkJoinTask<?> task = joinMe; // base of chain
ForkJoinWorkerThread thread = this; // thread with stolen task
!
! outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
if (v == null || v.currentSteal != task) {
for (int j = 0; ; ++j) { // search array
if (j < n) {
ForkJoinTask<?> vs;
if ((v = ws[j]) != null &&
(vs = v.currentSteal) != null) {
! if (joinMe.status < 0)
! break outer;
if (vs == task) {
+ if (task.status < 0)
+ break outer; // stale
thread.stealHint = j;
break; // save hint for next time
}
}
}
else
! break outer; // no stealer
}
}
!
! // Try to help v, using specialized form of deqTask
! for (;;) {
if (joinMe.status < 0)
! break outer;
int b = v.base;
ForkJoinTask<?>[] q = v.queue;
if (b == v.sp || q == null)
! break; // empty
int i = (q.length - 1) & b;
long u = (i << qShift) + qBase;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
! break outer; // stale
! if (t != null &&
! (running ||
! (running = p.tryIncrementRunningCount())) &&
! v.base == b++ &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
! if (t != joinMe && joinMe.status < 0) {
UNSAFE.putObjectVolatile(q, u, t);
! break outer; // joinMe cancelled; back out
}
v.base = b;
+ if (t.status >= 0) {
+ ForkJoinTask<?> ps = currentSteal;
+ int pid = poolIndex;
v.stealHint = pid;
! UNSAFE.putOrderedObject(this,
! currentStealOffset, t);
t.quietlyExec();
! UNSAFE.putOrderedObject(this,
! currentStealOffset, ps);
}
}
+ else if ((runState & TERMINATING) != 0) {
+ joinMe.cancelIgnoringExceptions();
+ break outer;
+ }
+ }
+
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (task.status < 0 || next == null || next == task ||
joinMe.status < 0)
! break; // done, stale, dead-end, or cyclic
task = next;
thread = v;
}
}
+ return running;
+ }
/**
* Implements ForkJoinTask.getSurplusQueuedTaskCount().
* Returns an estimate of the number of tasks, offset by a
* function of number of idle workers.