21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.util.Random;
39 import java.util.Collection;
40 import java.util.concurrent.locks.LockSupport;
41
42 /**
43 * A thread managed by a {@link ForkJoinPool}. This class is
44 * subclassable solely for the sake of adding functionality -- there
45 * are no overridable methods dealing with scheduling or execution.
46 * However, you can override initialization and termination methods
47 * surrounding the main task processing loop. If you do create such a
48 * subclass, you will also need to supply a custom {@link
49 * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
50 * ForkJoinPool}.
51 *
52 * @since 1.7
53 * @author Doug Lea
54 */
55 public class ForkJoinWorkerThread extends Thread {
56 /*
57 * Overview:
58 *
59 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
60 * ForkJoinTasks. This class includes bookkeeping in support of
61 * worker activation, suspension, and lifecycle control described
62 * in more detail in the internal documentation of class
63 * ForkJoinPool. And as described further below, this class also
64 * includes special-cased support for some ForkJoinTask
65 * methods. But the main mechanics involve work-stealing:
66 *
67 * Work-stealing queues are special forms of Deques that support
68 * only three of the four possible end-operations -- push, pop,
69 * and deq (aka steal), under the further constraints that push
70 * and pop are called only from the owning thread, while deq may
359 public ForkJoinPool getPool() {
360 return pool;
361 }
362
363 /**
364 * Returns the index number of this thread in its pool. The
365 * returned value ranges from zero to the maximum number of
366 * threads (minus one) that have ever been created in the pool.
367 * This method may be useful for applications that track status or
368 * collect results per-worker rather than per-task.
369 *
370 * @return the index number
371 */
372 public int getPoolIndex() {
373 return poolIndex;
374 }
375
376 /**
377 * Initializes internal state after construction but before
378 * processing any tasks. If you override this method, you must
379 * invoke @code{super.onStart()} at the beginning of the method.
380 * Initialization requires care: Most fields must have legal
381 * default values, to ensure that attempted accesses from other
382 * threads work correctly even before this thread starts
383 * processing tasks.
384 */
385 protected void onStart() {
386 int rs = seedGenerator.nextInt();
387 seed = rs == 0? 1 : rs; // seed must be nonzero
388
389 // Allocate name string and arrays in this thread
390 String pid = Integer.toString(pool.getPoolNumber());
391 String wid = Integer.toString(poolIndex);
392 setName("ForkJoinPool-" + pid + "-worker-" + wid);
393
394 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
395 }
396
397 /**
398 * Performs cleanup associated with termination of this worker
399 * thread. If you override this method, you must invoke
400 * {@code super.onTermination} at the end of the overridden method.
401 *
402 * @param exception the exception causing this thread to abort due
403 * to an unrecoverable error, or {@code null} if completed normally
404 */
405 protected void onTermination(Throwable exception) {
406 try {
407 ForkJoinPool p = pool;
409 int a; // inline p.tryDecrementActiveCount
410 active = false;
411 do {} while (!UNSAFE.compareAndSwapInt
412 (p, poolRunStateOffset, a = p.runState, a - 1));
413 }
414 cancelTasks();
415 setTerminated();
416 p.workerTerminated(this);
417 } catch (Throwable ex) { // Shouldn't ever happen
418 if (exception == null) // but if so, at least rethrown
419 exception = ex;
420 } finally {
421 if (exception != null)
422 UNSAFE.throwException(exception);
423 }
424 }
425
426 /**
427 * This method is required to be public, but should never be
428 * called explicitly. It performs the main run loop to execute
429 * ForkJoinTasks.
430 */
431 public void run() {
432 Throwable exception = null;
433 try {
434 onStart();
435 mainLoop();
436 } catch (Throwable ex) {
437 exception = ex;
438 } finally {
439 onTermination(exception);
440 }
441 }
442
443 // helpers for run()
444
445 /**
446 * Finds and executes tasks, and checks status while running.
447 */
448 private void mainLoop() {
449 boolean ran = false; // true if ran a task on last step
611 }
612 }
613 return null;
614 }
615
616 /**
617 * Returns a popped task, or null if empty. Assumes active status.
618 * Called only by this thread.
619 */
620 private ForkJoinTask<?> popTask() {
621 ForkJoinTask<?>[] q = queue;
622 if (q != null) {
623 int s;
624 while ((s = sp) != base) {
625 int i = (q.length - 1) & --s;
626 long u = (i << qShift) + qBase; // raw offset
627 ForkJoinTask<?> t = q[i];
628 if (t == null) // lost to stealer
629 break;
630 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
631 sp = s; // putOrderedInt may encourage more timely write
632 // UNSAFE.putOrderedInt(this, spOffset, s);
633 return t;
634 }
635 }
636 }
637 return null;
638 }
639
640 /**
641 * Specialized version of popTask to pop only if topmost element
642 * is the given task. Called only by this thread while active.
643 *
644 * @param t the task. Caller must ensure non-null.
645 */
646 final boolean unpushTask(ForkJoinTask<?> t) {
647 int s;
648 ForkJoinTask<?>[] q = queue;
649 if ((s = sp) != base && q != null &&
650 UNSAFE.compareAndSwapObject
867 }
868 }
869
870 // Misc support methods for ForkJoinPool
871
872 /**
873 * Returns an estimate of the number of tasks in the queue. Also
874 * used by ForkJoinTask.
875 */
876 final int getQueueSize() {
877 int n; // external calls must read base first
878 return (n = -base + sp) <= 0 ? 0 : n;
879 }
880
881 /**
882 * Removes and cancels all tasks in queue. Can be called from any
883 * thread.
884 */
885 final void cancelTasks() {
886 ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
887 if (cj != null) {
888 currentJoin = null;
889 cj.cancelIgnoringExceptions();
890 try {
891 this.interrupt(); // awaken wait
892 } catch (SecurityException ignore) {
893 }
894 }
895 ForkJoinTask<?> cs = currentSteal;
896 if (cs != null) {
897 currentSteal = null;
898 cs.cancelIgnoringExceptions();
899 }
900 while (base != sp) {
901 ForkJoinTask<?> t = deqTask();
902 if (t != null)
903 t.cancelIgnoringExceptions();
904 }
905 }
906
907 /**
908 * Drains tasks to given collection c.
909 *
910 * @return the number of tasks drained
911 */
912 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
913 int n = 0;
914 while (base != sp) {
915 ForkJoinTask<?> t = deqTask();
916 if (t != null) {
917 c.add(t);
918 ++n;
919 }
942
943 /**
944 * Gets and removes a local or stolen task.
945 *
946 * @return a task, if available
947 */
948 final ForkJoinTask<?> pollTask() {
949 ForkJoinTask<?> t = pollLocalTask();
950 if (t == null) {
951 t = scan();
952 // cannot retain/track/help steal
953 UNSAFE.putOrderedObject(this, currentStealOffset, null);
954 }
955 return t;
956 }
957
958 /**
959 * Possibly runs some tasks and/or blocks, until task is done.
960 *
961 * @param joinMe the task to join
962 */
963 final void joinTask(ForkJoinTask<?> joinMe) {
964 // currentJoin only written by this thread; only need ordered store
965 ForkJoinTask<?> prevJoin = currentJoin;
966 UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
967 if (sp != base)
968 localHelpJoinTask(joinMe);
969 if (joinMe.status >= 0)
970 pool.awaitJoin(joinMe, this);
971 UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
972 }
973
974 /**
975 * Run tasks in local queue until given task is done.
976 *
977 * @param joinMe the task to join
978 */
979 private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
980 int s;
981 ForkJoinTask<?>[] q;
982 while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
983 int i = (q.length - 1) & --s;
984 long u = (i << qShift) + qBase; // raw offset
985 ForkJoinTask<?> t = q[i];
986 if (t == null) // lost to a stealer
987 break;
988 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
989 /*
990 * This recheck (and similarly in helpJoinTask)
991 * handles cases where joinMe is independently
992 * cancelled or forced even though there is other work
993 * available. Back out of the pop by putting t back
994 * into slot before we commit by writing sp.
995 */
996 if (joinMe.status < 0) {
997 UNSAFE.putObjectVolatile(q, u, t);
998 break;
999 }
1000 sp = s;
1001 // UNSAFE.putOrderedInt(this, spOffset, s);
1002 t.quietlyExec();
1003 }
1004 }
1005 }
1006
1007 /**
1008 * Unless terminating, tries to locate and help perform tasks for
1009 * a stealer of the given task, or in turn one of its stealers.
1010 * Traces currentSteal->currentJoin links looking for a thread
1011 * working on a descendant of the given task and with a non-empty
1012 * queue to steal back and execute tasks from.
1013 *
1014 * The implementation is very branchy to cope with potential
1015 * inconsistencies or loops encountering chains that are stale,
1016 * unknown, or of length greater than MAX_HELP_DEPTH links. All
1017 * of these cases are dealt with by just returning back to the
1018 * caller, who is expected to retry if other join mechanisms also
1019 * don't work out.
1020 *
1021 * @param joinMe the task to join
1022 */
1023 final void helpJoinTask(ForkJoinTask<?> joinMe) {
1024 ForkJoinWorkerThread[] ws;
1025 int n;
1026 if (joinMe.status < 0) // already done
1027 return;
1028 if ((runState & TERMINATING) != 0) { // cancel if shutting down
1029 joinMe.cancelIgnoringExceptions();
1030 return;
1031 }
1032 if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1033 return; // need at least 2 workers
1034
1035 ForkJoinTask<?> task = joinMe; // base of chain
1036 ForkJoinWorkerThread thread = this; // thread with stolen task
1037 for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1038 // Try to find v, the stealer of task, by first using hint
1039 ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1040 if (v == null || v.currentSteal != task) {
1041 for (int j = 0; ; ++j) { // search array
1042 if (j < n) {
1043 ForkJoinTask<?> vs;
1044 if ((v = ws[j]) != null &&
1045 (vs = v.currentSteal) != null) {
1046 if (joinMe.status < 0 || task.status < 0)
1047 return; // stale or done
1048 if (vs == task) {
1049 thread.stealHint = j;
1050 break; // save hint for next time
1051 }
1052 }
1053 }
1054 else
1055 return; // no stealer
1056 }
1057 }
1058 for (;;) { // Try to help v, using specialized form of deqTask
1059 if (joinMe.status < 0)
1060 return;
1061 int b = v.base;
1062 ForkJoinTask<?>[] q = v.queue;
1063 if (b == v.sp || q == null)
1064 break;
1065 int i = (q.length - 1) & b;
1066 long u = (i << qShift) + qBase;
1067 ForkJoinTask<?> t = q[i];
1068 int pid = poolIndex;
1069 ForkJoinTask<?> ps = currentSteal;
1070 if (task.status < 0)
1071 return; // stale or done
1072 if (t != null && v.base == b++ &&
1073 UNSAFE.compareAndSwapObject(q, u, t, null)) {
1074 if (joinMe.status < 0) {
1075 UNSAFE.putObjectVolatile(q, u, t);
1076 return; // back out on cancel
1077 }
1078 v.base = b;
1079 v.stealHint = pid;
1080 UNSAFE.putOrderedObject(this, currentStealOffset, t);
1081 t.quietlyExec();
1082 UNSAFE.putOrderedObject(this, currentStealOffset, ps);
1083 }
1084 }
1085 // Try to descend to find v's stealer
1086 ForkJoinTask<?> next = v.currentJoin;
1087 if (task.status < 0 || next == null || next == task ||
1088 joinMe.status < 0)
1089 return;
1090 task = next;
1091 thread = v;
1092 }
1093 }
1094
1095 /**
1096 * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1097 * Returns an estimate of the number of tasks, offset by a
1098 * function of number of idle workers.
1099 *
1100 * This method provides a cheap heuristic guide for task
1101 * partitioning when programmers, frameworks, tools, or languages
1102 * have little or no idea about task granularity. In essence by
1103 * offering this method, we ask users only about tradeoffs in
1104 * overhead vs expected throughput and its variance, rather than
1105 * how finely to partition tasks.
1106 *
1107 * In a steady state strict (tree-structured) computation, each
1108 * thread makes available for stealing enough tasks for other
1109 * threads to remain active. Inductively, if all threads play by
1110 * the same rules, each thread should make available only a
1111 * constant number of tasks.
1112 *
1113 * The minimum useful constant is just 1. But using a value of 1
|
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/licenses/publicdomain
34 */
35
36 package java.util.concurrent;
37
38 import java.util.Random;
39 import java.util.Collection;
40 import java.util.concurrent.locks.LockSupport;
41 import java.util.concurrent.RejectedExecutionException;
42
43 /**
44 * A thread managed by a {@link ForkJoinPool}, which executes
45 * {@link ForkJoinTask}s.
46 * This class is subclassable solely for the sake of adding
47 * functionality -- there are no overridable methods dealing with
48 * scheduling or execution. However, you can override initialization
49 * and termination methods surrounding the main task processing loop.
50 * If you do create such a subclass, you will also need to supply a
51 * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
52 * in a {@code ForkJoinPool}.
53 *
54 * @since 1.7
55 * @author Doug Lea
56 */
57 public class ForkJoinWorkerThread extends Thread {
58 /*
59 * Overview:
60 *
61 * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
62 * ForkJoinTasks. This class includes bookkeeping in support of
63 * worker activation, suspension, and lifecycle control described
64 * in more detail in the internal documentation of class
65 * ForkJoinPool. And as described further below, this class also
66 * includes special-cased support for some ForkJoinTask
67 * methods. But the main mechanics involve work-stealing:
68 *
69 * Work-stealing queues are special forms of Deques that support
70 * only three of the four possible end-operations -- push, pop,
71 * and deq (aka steal), under the further constraints that push
72 * and pop are called only from the owning thread, while deq may
361 public ForkJoinPool getPool() {
362 return pool;
363 }
364
365 /**
366 * Returns the index number of this thread in its pool. The
367 * returned value ranges from zero to the maximum number of
368 * threads (minus one) that have ever been created in the pool.
369 * This method may be useful for applications that track status or
370 * collect results per-worker rather than per-task.
371 *
372 * @return the index number
373 */
374 public int getPoolIndex() {
375 return poolIndex;
376 }
377
378 /**
379 * Initializes internal state after construction but before
380 * processing any tasks. If you override this method, you must
381 * invoke {@code super.onStart()} at the beginning of the method.
382 * Initialization requires care: Most fields must have legal
383 * default values, to ensure that attempted accesses from other
384 * threads work correctly even before this thread starts
385 * processing tasks.
386 */
387 protected void onStart() {
388 int rs = seedGenerator.nextInt();
389 seed = (rs == 0) ? 1 : rs; // seed must be nonzero
390
391 // Allocate name string and arrays in this thread
392 String pid = Integer.toString(pool.getPoolNumber());
393 String wid = Integer.toString(poolIndex);
394 setName("ForkJoinPool-" + pid + "-worker-" + wid);
395
396 queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
397 }
398
399 /**
400 * Performs cleanup associated with termination of this worker
401 * thread. If you override this method, you must invoke
402 * {@code super.onTermination} at the end of the overridden method.
403 *
404 * @param exception the exception causing this thread to abort due
405 * to an unrecoverable error, or {@code null} if completed normally
406 */
407 protected void onTermination(Throwable exception) {
408 try {
409 ForkJoinPool p = pool;
411 int a; // inline p.tryDecrementActiveCount
412 active = false;
413 do {} while (!UNSAFE.compareAndSwapInt
414 (p, poolRunStateOffset, a = p.runState, a - 1));
415 }
416 cancelTasks();
417 setTerminated();
418 p.workerTerminated(this);
419 } catch (Throwable ex) { // Shouldn't ever happen
420 if (exception == null) // but if so, at least rethrown
421 exception = ex;
422 } finally {
423 if (exception != null)
424 UNSAFE.throwException(exception);
425 }
426 }
427
428 /**
429 * This method is required to be public, but should never be
430 * called explicitly. It performs the main run loop to execute
431 * {@link ForkJoinTask}s.
432 */
433 public void run() {
434 Throwable exception = null;
435 try {
436 onStart();
437 mainLoop();
438 } catch (Throwable ex) {
439 exception = ex;
440 } finally {
441 onTermination(exception);
442 }
443 }
444
445 // helpers for run()
446
447 /**
448 * Finds and executes tasks, and checks status while running.
449 */
450 private void mainLoop() {
451 boolean ran = false; // true if ran a task on last step
613 }
614 }
615 return null;
616 }
617
618 /**
619 * Returns a popped task, or null if empty. Assumes active status.
620 * Called only by this thread.
621 */
622 private ForkJoinTask<?> popTask() {
623 ForkJoinTask<?>[] q = queue;
624 if (q != null) {
625 int s;
626 while ((s = sp) != base) {
627 int i = (q.length - 1) & --s;
628 long u = (i << qShift) + qBase; // raw offset
629 ForkJoinTask<?> t = q[i];
630 if (t == null) // lost to stealer
631 break;
632 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
633 /*
634 * Note: here and in related methods, as a
635 * performance (not correctness) issue, we'd like
636 * to encourage compiler not to arbitrarily
637 * postpone setting sp after successful CAS.
638 * Currently there is no intrinsic for arranging
639 * this, but using Unsafe putOrderedInt may be a
640 * preferable strategy on some compilers even
641 * though its main effect is a pre-, not post-
642 * fence. To simplify possible changes, the option
643 * is left in comments next to the associated
644 * assignments.
645 */
646 sp = s; // putOrderedInt may encourage more timely write
647 // UNSAFE.putOrderedInt(this, spOffset, s);
648 return t;
649 }
650 }
651 }
652 return null;
653 }
654
655 /**
656 * Specialized version of popTask to pop only if topmost element
657 * is the given task. Called only by this thread while active.
658 *
659 * @param t the task. Caller must ensure non-null.
660 */
661 final boolean unpushTask(ForkJoinTask<?> t) {
662 int s;
663 ForkJoinTask<?>[] q = queue;
664 if ((s = sp) != base && q != null &&
665 UNSAFE.compareAndSwapObject
882 }
883 }
884
885 // Misc support methods for ForkJoinPool
886
887 /**
888 * Returns an estimate of the number of tasks in the queue. Also
889 * used by ForkJoinTask.
890 */
891 final int getQueueSize() {
892 int n; // external calls must read base first
893 return (n = -base + sp) <= 0 ? 0 : n;
894 }
895
896 /**
897 * Removes and cancels all tasks in queue. Can be called from any
898 * thread.
899 */
900 final void cancelTasks() {
901 ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
902 if (cj != null && cj.status >= 0) {
903 cj.cancelIgnoringExceptions();
904 try {
905 this.interrupt(); // awaken wait
906 } catch (SecurityException ignore) {
907 }
908 }
909 ForkJoinTask<?> cs = currentSteal;
910 if (cs != null && cs.status >= 0)
911 cs.cancelIgnoringExceptions();
912 while (base != sp) {
913 ForkJoinTask<?> t = deqTask();
914 if (t != null)
915 t.cancelIgnoringExceptions();
916 }
917 }
918
919 /**
920 * Drains tasks to given collection c.
921 *
922 * @return the number of tasks drained
923 */
924 final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
925 int n = 0;
926 while (base != sp) {
927 ForkJoinTask<?> t = deqTask();
928 if (t != null) {
929 c.add(t);
930 ++n;
931 }
954
955 /**
956 * Gets and removes a local or stolen task.
957 *
958 * @return a task, if available
959 */
960 final ForkJoinTask<?> pollTask() {
961 ForkJoinTask<?> t = pollLocalTask();
962 if (t == null) {
963 t = scan();
964 // cannot retain/track/help steal
965 UNSAFE.putOrderedObject(this, currentStealOffset, null);
966 }
967 return t;
968 }
969
970 /**
971 * Possibly runs some tasks and/or blocks, until task is done.
972 *
973 * @param joinMe the task to join
974 * @param timed true if use timed wait
975 * @param nanos wait time if timed
976 */
977 final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
978 // currentJoin only written by this thread; only need ordered store
979 ForkJoinTask<?> prevJoin = currentJoin;
980 UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
981 pool.awaitJoin(joinMe, this, timed, nanos);
982 UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
983 }
984
985 /**
986 * Tries to locate and help perform tasks for a stealer of the
987 * given task, or in turn one of its stealers. Traces
988 * currentSteal->currentJoin links looking for a thread working on
989 * a descendant of the given task and with a non-empty queue to
990 * steal back and execute tasks from.
991 *
992 * The implementation is very branchy to cope with potential
993 * inconsistencies or loops encountering chains that are stale,
994 * unknown, or of length greater than MAX_HELP_DEPTH links. All
995 * of these cases are dealt with by just returning back to the
996 * caller, who is expected to retry if other join mechanisms also
997 * don't work out.
998 *
999 * @param joinMe the task to join
1000 * @param running if false, then must update pool count upon
1001 * running a task
1002 * @return value of running on exit
1003 */
1004 final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
1005 /*
1006 * Initial checks to (1) abort if terminating; (2) clean out
1007 * old cancelled tasks from local queue; (3) if joinMe is next
1008 * task, run it; (4) omit scan if local queue nonempty (since
1009 * it may contain non-descendents of joinMe).
1010 */
1011 ForkJoinPool p = pool;
1012 for (;;) {
1013 ForkJoinTask<?>[] q;
1014 int s;
1015 if (joinMe.status < 0)
1016 return running;
1017 else if ((runState & TERMINATING) != 0) {
1018 joinMe.cancelIgnoringExceptions();
1019 return running;
1020 }
1021 else if ((s = sp) == base || (q = queue) == null)
1022 break; // queue empty
1023 else {
1024 int i = (q.length - 1) & --s;
1025 long u = (i << qShift) + qBase; // raw offset
1026 ForkJoinTask<?> t = q[i];
1027 if (t == null)
1028 break; // lost to a stealer
1029 else if (t != joinMe && t.status >= 0)
1030 return running; // cannot safely help
1031 else if ((running ||
1032 (running = p.tryIncrementRunningCount())) &&
1033 UNSAFE.compareAndSwapObject(q, u, t, null)) {
1034 sp = s; // putOrderedInt may encourage more timely write
1035 // UNSAFE.putOrderedInt(this, spOffset, s);
1036 t.quietlyExec();
1037 }
1038 }
1039 }
1040
1041 int n; // worker array size
1042 ForkJoinWorkerThread[] ws = p.workers;
1043 if (ws != null && (n = ws.length) > 1) { // need at least 2 workers
1044 ForkJoinTask<?> task = joinMe; // base of chain
1045 ForkJoinWorkerThread thread = this; // thread with stolen task
1046
1047 outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1048 // Try to find v, the stealer of task, by first using hint
1049 ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1050 if (v == null || v.currentSteal != task) {
1051 for (int j = 0; ; ++j) { // search array
1052 if (j < n) {
1053 ForkJoinTask<?> vs;
1054 if ((v = ws[j]) != null &&
1055 (vs = v.currentSteal) != null) {
1056 if (joinMe.status < 0)
1057 break outer;
1058 if (vs == task) {
1059 if (task.status < 0)
1060 break outer; // stale
1061 thread.stealHint = j;
1062 break; // save hint for next time
1063 }
1064 }
1065 }
1066 else
1067 break outer; // no stealer
1068 }
1069 }
1070
1071 // Try to help v, using specialized form of deqTask
1072 for (;;) {
1073 if (joinMe.status < 0)
1074 break outer;
1075 int b = v.base;
1076 ForkJoinTask<?>[] q = v.queue;
1077 if (b == v.sp || q == null)
1078 break; // empty
1079 int i = (q.length - 1) & b;
1080 long u = (i << qShift) + qBase;
1081 ForkJoinTask<?> t = q[i];
1082 if (task.status < 0)
1083 break outer; // stale
1084 if (t != null &&
1085 (running ||
1086 (running = p.tryIncrementRunningCount())) &&
1087 v.base == b++ &&
1088 UNSAFE.compareAndSwapObject(q, u, t, null)) {
1089 if (t != joinMe && joinMe.status < 0) {
1090 UNSAFE.putObjectVolatile(q, u, t);
1091 break outer; // joinMe cancelled; back out
1092 }
1093 v.base = b;
1094 if (t.status >= 0) {
1095 ForkJoinTask<?> ps = currentSteal;
1096 int pid = poolIndex;
1097 v.stealHint = pid;
1098 UNSAFE.putOrderedObject(this,
1099 currentStealOffset, t);
1100 t.quietlyExec();
1101 UNSAFE.putOrderedObject(this,
1102 currentStealOffset, ps);
1103 }
1104 }
1105 else if ((runState & TERMINATING) != 0) {
1106 joinMe.cancelIgnoringExceptions();
1107 break outer;
1108 }
1109 }
1110
1111 // Try to descend to find v's stealer
1112 ForkJoinTask<?> next = v.currentJoin;
1113 if (task.status < 0 || next == null || next == task ||
1114 joinMe.status < 0)
1115 break; // done, stale, dead-end, or cyclic
1116 task = next;
1117 thread = v;
1118 }
1119 }
1120 return running;
1121 }
1122
1123 /**
1124 * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1125 * Returns an estimate of the number of tasks, offset by a
1126 * function of number of idle workers.
1127 *
1128 * This method provides a cheap heuristic guide for task
1129 * partitioning when programmers, frameworks, tools, or languages
1130 * have little or no idea about task granularity. In essence by
1131 * offering this method, we ask users only about tradeoffs in
1132 * overhead vs expected throughput and its variance, rather than
1133 * how finely to partition tasks.
1134 *
1135 * In a steady state strict (tree-structured) computation, each
1136 * thread makes available for stealing enough tasks for other
1137 * threads to remain active. Inductively, if all threads play by
1138 * the same rules, each thread should make available only a
1139 * constant number of tasks.
1140 *
1141 * The minimum useful constant is just 1. But using a value of 1
|