src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java

Print this page

        

*** 36,55 **** package java.util.concurrent; import java.util.Random; import java.util.Collection; import java.util.concurrent.locks.LockSupport; /** ! * A thread managed by a {@link ForkJoinPool}. This class is ! * subclassable solely for the sake of adding functionality -- there ! * are no overridable methods dealing with scheduling or execution. ! * However, you can override initialization and termination methods ! * surrounding the main task processing loop. If you do create such a ! * subclass, you will also need to supply a custom {@link ! * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code ! * ForkJoinPool}. * * @since 1.7 * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread { --- 36,57 ---- package java.util.concurrent; import java.util.Random; import java.util.Collection; import java.util.concurrent.locks.LockSupport; + import java.util.concurrent.RejectedExecutionException; /** ! * A thread managed by a {@link ForkJoinPool}, which executes ! * {@link ForkJoinTask}s. ! * This class is subclassable solely for the sake of adding ! * functionality -- there are no overridable methods dealing with ! * scheduling or execution. However, you can override initialization ! * and termination methods surrounding the main task processing loop. ! * If you do create such a subclass, you will also need to supply a ! * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it ! * in a {@code ForkJoinPool}. * * @since 1.7 * @author Doug Lea */ public class ForkJoinWorkerThread extends Thread {
*** 374,392 **** } /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must ! * invoke @code{super.onStart()} at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts * processing tasks. */ protected void onStart() { int rs = seedGenerator.nextInt(); ! seed = rs == 0? 1 : rs; // seed must be nonzero // Allocate name string and arrays in this thread String pid = Integer.toString(pool.getPoolNumber()); String wid = Integer.toString(poolIndex); setName("ForkJoinPool-" + pid + "-worker-" + wid); --- 376,394 ---- } /** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must ! * invoke {@code super.onStart()} at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts * processing tasks. */ protected void onStart() { int rs = seedGenerator.nextInt(); ! seed = (rs == 0) ? 1 : rs; // seed must be nonzero // Allocate name string and arrays in this thread String pid = Integer.toString(pool.getPoolNumber()); String wid = Integer.toString(poolIndex); setName("ForkJoinPool-" + pid + "-worker-" + wid);
*** 424,434 **** } /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute ! * ForkJoinTasks. */ public void run() { Throwable exception = null; try { onStart(); --- 426,436 ---- } /** * This method is required to be public, but should never be * called explicitly. It performs the main run loop to execute ! * {@link ForkJoinTask}s. */ public void run() { Throwable exception = null; try { onStart();
*** 626,635 **** --- 628,650 ---- long u = (i << qShift) + qBase; // raw offset ForkJoinTask<?> t = q[i]; if (t == null) // lost to stealer break; if (UNSAFE.compareAndSwapObject(q, u, t, null)) { + /* + * Note: here and in related methods, as a + * performance (not correctness) issue, we'd like + * to encourage compiler not to arbitrarily + * postpone setting sp after successful CAS. + * Currently there is no intrinsic for arranging + * this, but using Unsafe putOrderedInt may be a + * preferable strategy on some compilers even + * though its main effect is a pre-, not post- + * fence. To simplify possible changes, the option + * is left in comments next to the associated + * assignments. + */ sp = s; // putOrderedInt may encourage more timely write // UNSAFE.putOrderedInt(this, spOffset, s); return t; } }
*** 882,904 **** * Removes and cancels all tasks in queue. Can be called from any * thread. */ final void cancelTasks() { ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks ! if (cj != null) { ! currentJoin = null; cj.cancelIgnoringExceptions(); try { this.interrupt(); // awaken wait } catch (SecurityException ignore) { } } ForkJoinTask<?> cs = currentSteal; ! if (cs != null) { ! currentSteal = null; cs.cancelIgnoringExceptions(); - } while (base != sp) { ForkJoinTask<?> t = deqTask(); if (t != null) t.cancelIgnoringExceptions(); } --- 897,916 ---- * Removes and cancels all tasks in queue. Can be called from any * thread. */ final void cancelTasks() { ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks ! if (cj != null && cj.status >= 0) { cj.cancelIgnoringExceptions(); try { this.interrupt(); // awaken wait } catch (SecurityException ignore) { } } ForkJoinTask<?> cs = currentSteal; ! if (cs != null && cs.status >= 0) cs.cancelIgnoringExceptions(); while (base != sp) { ForkJoinTask<?> t = deqTask(); if (t != null) t.cancelIgnoringExceptions(); }
*** 957,1098 **** /** * Possibly runs some tasks and/or blocks, until task is done. * * @param joinMe the task to join */ ! final void joinTask(ForkJoinTask<?> joinMe) { // currentJoin only written by this thread; only need ordered store ForkJoinTask<?> prevJoin = currentJoin; UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); ! if (sp != base) ! localHelpJoinTask(joinMe); ! if (joinMe.status >= 0) ! pool.awaitJoin(joinMe, this); UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); } /** ! * Run tasks in local queue until given task is done. * * @param joinMe the task to join */ ! private void localHelpJoinTask(ForkJoinTask<?> joinMe) { ! int s; ForkJoinTask<?>[] q; ! while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) { int i = (q.length - 1) & --s; long u = (i << qShift) + qBase; // raw offset ForkJoinTask<?> t = q[i]; ! if (t == null) // lost to a stealer ! break; ! if (UNSAFE.compareAndSwapObject(q, u, t, null)) { ! /* ! * This recheck (and similarly in helpJoinTask) ! * handles cases where joinMe is independently ! * cancelled or forced even though there is other work ! * available. Back out of the pop by putting t back ! * into slot before we commit by writing sp. ! */ ! if (joinMe.status < 0) { ! UNSAFE.putObjectVolatile(q, u, t); ! break; ! } ! sp = s; // UNSAFE.putOrderedInt(this, spOffset, s); t.quietlyExec(); } } } ! /** ! * Unless terminating, tries to locate and help perform tasks for ! * a stealer of the given task, or in turn one of its stealers. ! * Traces currentSteal->currentJoin links looking for a thread ! * working on a descendant of the given task and with a non-empty ! * queue to steal back and execute tasks from. ! * ! * The implementation is very branchy to cope with potential ! * inconsistencies or loops encountering chains that are stale, ! * unknown, or of length greater than MAX_HELP_DEPTH links. All ! * of these cases are dealt with by just returning back to the ! * caller, who is expected to retry if other join mechanisms also ! * don't work out. ! * ! * @param joinMe the task to join ! */ ! final void helpJoinTask(ForkJoinTask<?> joinMe) { ! ForkJoinWorkerThread[] ws; ! int n; ! if (joinMe.status < 0) // already done ! return; ! if ((runState & TERMINATING) != 0) { // cancel if shutting down ! joinMe.cancelIgnoringExceptions(); ! return; ! } ! if ((ws = pool.workers) == null || (n = ws.length) <= 1) ! return; // need at least 2 workers ! ForkJoinTask<?> task = joinMe; // base of chain ForkJoinWorkerThread thread = this; // thread with stolen task ! for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length // Try to find v, the stealer of task, by first using hint ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; if (v == null || v.currentSteal != task) { for (int j = 0; ; ++j) { // search array if (j < n) { ForkJoinTask<?> vs; if ((v = ws[j]) != null && (vs = v.currentSteal) != null) { ! if (joinMe.status < 0 || task.status < 0) ! return; // stale or done if (vs == task) { thread.stealHint = j; break; // save hint for next time } } } else ! return; // no stealer } } ! for (;;) { // Try to help v, using specialized form of deqTask if (joinMe.status < 0) ! return; int b = v.base; ForkJoinTask<?>[] q = v.queue; if (b == v.sp || q == null) ! break; int i = (q.length - 1) & b; long u = (i << qShift) + qBase; ForkJoinTask<?> t = q[i]; - int pid = poolIndex; - ForkJoinTask<?> ps = currentSteal; if (task.status < 0) ! return; // stale or done ! if (t != null && v.base == b++ && UNSAFE.compareAndSwapObject(q, u, t, null)) { ! if (joinMe.status < 0) { UNSAFE.putObjectVolatile(q, u, t); ! return; // back out on cancel } v.base = b; v.stealHint = pid; ! UNSAFE.putOrderedObject(this, currentStealOffset, t); t.quietlyExec(); ! UNSAFE.putOrderedObject(this, currentStealOffset, ps); } } // Try to descend to find v's stealer ForkJoinTask<?> next = v.currentJoin; if (task.status < 0 || next == null || next == task || joinMe.status < 0) ! return; task = next; thread = v; } } /** * Implements ForkJoinTask.getSurplusQueuedTaskCount(). * Returns an estimate of the number of tasks, offset by a * function of number of idle workers. --- 969,1126 ---- /** * Possibly runs some tasks and/or blocks, until task is done. * * @param joinMe the task to join + * @param timed true if use timed wait + * @param nanos wait time if timed */ ! final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) { // currentJoin only written by this thread; only need ordered store ForkJoinTask<?> prevJoin = currentJoin; UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe); ! pool.awaitJoin(joinMe, this, timed, nanos); UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin); } /** ! * Tries to locate and help perform tasks for a stealer of the ! * given task, or in turn one of its stealers. Traces ! * currentSteal->currentJoin links looking for a thread working on ! * a descendant of the given task and with a non-empty queue to ! * steal back and execute tasks from. * + * The implementation is very branchy to cope with potential + * inconsistencies or loops encountering chains that are stale, + * unknown, or of length greater than MAX_HELP_DEPTH links. All + * of these cases are dealt with by just returning back to the + * caller, who is expected to retry if other join mechanisms also + * don't work out. + * * @param joinMe the task to join + * @param running if false, then must update pool count upon + * running a task + * @return value of running on exit */ ! final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) { ! /* ! * Initial checks to (1) abort if terminating; (2) clean out ! * old cancelled tasks from local queue; (3) if joinMe is next ! * task, run it; (4) omit scan if local queue nonempty (since ! * it may contain non-descendents of joinMe). ! */ ! ForkJoinPool p = pool; ! for (;;) { ForkJoinTask<?>[] q; ! int s; ! if (joinMe.status < 0) ! return running; ! else if ((runState & TERMINATING) != 0) { ! joinMe.cancelIgnoringExceptions(); ! return running; ! } ! else if ((s = sp) == base || (q = queue) == null) ! break; // queue empty ! else { int i = (q.length - 1) & --s; long u = (i << qShift) + qBase; // raw offset ForkJoinTask<?> t = q[i]; ! if (t == null) ! break; // lost to a stealer ! else if (t != joinMe && t.status >= 0) ! return running; // cannot safely help ! else if ((running || ! (running = p.tryIncrementRunningCount())) && ! UNSAFE.compareAndSwapObject(q, u, t, null)) { ! sp = s; // putOrderedInt may encourage more timely write // UNSAFE.putOrderedInt(this, spOffset, s); t.quietlyExec(); } } } ! int n; // worker array size ! ForkJoinWorkerThread[] ws = p.workers; ! if (ws != null && (n = ws.length) > 1) { // need at least 2 workers ForkJoinTask<?> task = joinMe; // base of chain ForkJoinWorkerThread thread = this; // thread with stolen task ! ! outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length // Try to find v, the stealer of task, by first using hint ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)]; if (v == null || v.currentSteal != task) { for (int j = 0; ; ++j) { // search array if (j < n) { ForkJoinTask<?> vs; if ((v = ws[j]) != null && (vs = v.currentSteal) != null) { ! if (joinMe.status < 0) ! break outer; if (vs == task) { + if (task.status < 0) + break outer; // stale thread.stealHint = j; break; // save hint for next time } } } else ! break outer; // no stealer } } ! ! // Try to help v, using specialized form of deqTask ! for (;;) { if (joinMe.status < 0) ! break outer; int b = v.base; ForkJoinTask<?>[] q = v.queue; if (b == v.sp || q == null) ! break; // empty int i = (q.length - 1) & b; long u = (i << qShift) + qBase; ForkJoinTask<?> t = q[i]; if (task.status < 0) ! break outer; // stale ! if (t != null && ! (running || ! (running = p.tryIncrementRunningCount())) && ! v.base == b++ && UNSAFE.compareAndSwapObject(q, u, t, null)) { ! if (t != joinMe && joinMe.status < 0) { UNSAFE.putObjectVolatile(q, u, t); ! break outer; // joinMe cancelled; back out } v.base = b; + if (t.status >= 0) { + ForkJoinTask<?> ps = currentSteal; + int pid = poolIndex; v.stealHint = pid; ! UNSAFE.putOrderedObject(this, ! currentStealOffset, t); t.quietlyExec(); ! UNSAFE.putOrderedObject(this, ! currentStealOffset, ps); } } + else if ((runState & TERMINATING) != 0) { + joinMe.cancelIgnoringExceptions(); + break outer; + } + } + // Try to descend to find v's stealer ForkJoinTask<?> next = v.currentJoin; if (task.status < 0 || next == null || next == task || joinMe.status < 0) ! break; // done, stale, dead-end, or cyclic task = next; thread = v; } } + return running; + } /** * Implements ForkJoinTask.getSurplusQueuedTaskCount(). * Returns an estimate of the number of tasks, offset by a * function of number of idle workers.