src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java

Print this page




  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.Random;
  39 import java.util.Collection;
  40 import java.util.concurrent.locks.LockSupport;

  41 
  42 /**
  43  * A thread managed by a {@link ForkJoinPool}.  This class is
  44  * subclassable solely for the sake of adding functionality -- there
  45  * are no overridable methods dealing with scheduling or execution.
  46  * However, you can override initialization and termination methods
  47  * surrounding the main task processing loop.  If you do create such a
  48  * subclass, you will also need to supply a custom {@link
  49  * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
  50  * ForkJoinPool}.

  51  *
  52  * @since 1.7
  53  * @author Doug Lea
  54  */
  55 public class ForkJoinWorkerThread extends Thread {
  56     /*
  57      * Overview:
  58      *
  59      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  60      * ForkJoinTasks. This class includes bookkeeping in support of
  61      * worker activation, suspension, and lifecycle control described
  62      * in more detail in the internal documentation of class
  63      * ForkJoinPool. And as described further below, this class also
  64      * includes special-cased support for some ForkJoinTask
  65      * methods. But the main mechanics involve work-stealing:
  66      *
  67      * Work-stealing queues are special forms of Deques that support
  68      * only three of the four possible end-operations -- push, pop,
  69      * and deq (aka steal), under the further constraints that push
  70      * and pop are called only from the owning thread, while deq may


 359     public ForkJoinPool getPool() {
             // Simple accessor: hands back the pool field unchanged.
 360         return pool;
 361     }
 362 
 363     /**
 364      * Returns the index number of this thread in its pool.  The
 365      * returned value ranges from zero to the maximum number of
 366      * threads (minus one) that have ever been created in the pool.
 367      * This method may be useful for applications that track status or
 368      * collect results per-worker rather than per-task.
 369      *
 370      * @return the index number
 371      */
 372     public int getPoolIndex() {
             // Simple accessor: hands back the poolIndex field unchanged.
 373         return poolIndex;
 374     }
 375 
 376     /**
 377      * Initializes internal state after construction but before
 378      * processing any tasks. If you override this method, you must
 379      * invoke {@code super.onStart()} at the beginning of the method.
 380      * Initialization requires care: Most fields must have legal
 381      * default values, to ensure that attempted accesses from other
 382      * threads work correctly even before this thread starts
 383      * processing tasks.
 384      */
 385     protected void onStart() {
             // Draw this worker's seed from seedGenerator; a drawn value of 0
             // is replaced by 1 so the seed field is never zero.
 386         int rs = seedGenerator.nextInt();
 387         seed = rs == 0? 1 : rs; // seed must be nonzero
 388 
 389         // Allocate name string and arrays in this thread
             // Resulting thread name: "ForkJoinPool-<pool number>-worker-<pool index>".
 390         String pid = Integer.toString(pool.getPoolNumber());
 391         String wid = Integer.toString(poolIndex);
 392         setName("ForkJoinPool-" + pid + "-worker-" + wid);
 393 
             // The task queue array is created here (not at construction) with
             // INITIAL_QUEUE_CAPACITY slots.
 394         queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 395     }
 396 
 397     /**
 398      * Performs cleanup associated with termination of this worker
 399      * thread.  If you override this method, you must invoke
 400      * {@code super.onTermination} at the end of the overridden method.
 401      *
 402      * @param exception the exception causing this thread to abort due
 403      * to an unrecoverable error, or {@code null} if completed normally
 404      */
 405     protected void onTermination(Throwable exception) {
 406         try {
 407             ForkJoinPool p = pool;


 409                 int a; // inline p.tryDecrementActiveCount
 410                 active = false;
 411                 do {} while (!UNSAFE.compareAndSwapInt
 412                              (p, poolRunStateOffset, a = p.runState, a - 1));
 413             }
 414             cancelTasks();
 415             setTerminated();
 416             p.workerTerminated(this);
 417         } catch (Throwable ex) {        // Shouldn't ever happen
 418             if (exception == null)      // but if so, at least rethrown
 419                 exception = ex;
 420         } finally {
 421             if (exception != null)
 422                 UNSAFE.throwException(exception);
 423         }
 424     }
 425 
 426     /**
 427      * This method is required to be public, but should never be
 428      * called explicitly. It performs the main run loop to execute
 429      * ForkJoinTasks.
 430      */
 431     public void run() {
             // Holds whatever Throwable escapes onStart() or mainLoop(), if any.
 432         Throwable exception = null;
 433         try {
 434             onStart();
 435             mainLoop();
 436         } catch (Throwable ex) {
 437             exception = ex;
 438         } finally {
                 // Runs on every exit path; receives null when nothing was caught.
 439             onTermination(exception);
 440         }
 441     }
 442 
 443     // helpers for run()
 444 
 445     /**
 446      * Finds and executes tasks, and checks status while running.
 447      */
 448     private void mainLoop() {
 449         boolean ran = false; // true if ran a task on last step


 611             }
 612         }
 613         return null;
 614     }
 615 
 616     /**
 617      * Returns a popped task, or null if empty. Assumes active status.
 618      * Called only by this thread.
 619      */
 620     private ForkJoinTask<?> popTask() {
             // Work on a local snapshot of the queue array reference.
 621         ForkJoinTask<?>[] q = queue;
 622         if (q != null) {
 623             int s;
                 // Loop while sp differs from base; both fields are re-read on
                 // every iteration, so a failed CAS below simply retries.
 624             while ((s = sp) != base) {
                     // Slot just below sp, wrapped by masking with (length - 1).
                     // NOTE(review): masking presumes q.length is a power of two -- confirm.
 625                 int i = (q.length - 1) & --s;
 626                 long u = (i << qShift) + qBase; // raw offset
 627                 ForkJoinTask<?> t = q[i];
 628                 if (t == null)   // lost to stealer
 629                     break;
                     // Claim the task by CASing its slot from t to null; sp is
                     // only lowered after the CAS succeeds.
 630                 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {













 631                     sp = s; // putOrderedInt may encourage more timely write
 632                     // UNSAFE.putOrderedInt(this, spOffset, s);
 633                     return t;
 634                 }
 635             }
 636         }
             // No queue array, sp == base, or the top slot was already null.
 637         return null;
 638     }
 639 
 640     /**
 641      * Specialized version of popTask to pop only if topmost element
 642      * is the given task. Called only by this thread while active.
 643      *
 644      * @param t the task. Caller must ensure non-null.
 645      */
 646     final boolean unpushTask(ForkJoinTask<?> t) {
 647         int s;
 648         ForkJoinTask<?>[] q = queue;
 649         if ((s = sp) != base && q != null &&
 650             UNSAFE.compareAndSwapObject


 867         }
 868     }
 869 
 870     // Misc support methods for ForkJoinPool
 871 
 872     /**
 873      * Returns an estimate of the number of tasks in the queue.  Also
 874      * used by ForkJoinTask.
 875      */
 876     final int getQueueSize() {
             // Computes sp - base, written as "-base + sp" so that base is the
             // first field read (Java evaluates operands left to right).
             // A non-positive difference is reported as 0.
 877         int n; // external calls must read base first
 878         return (n = -base + sp) <= 0 ? 0 : n;
 879     }
 880 
 881     /**
 882      * Removes and cancels all tasks in queue.  Can be called from any
 883      * thread.
 884      */
 885     final void cancelTasks() {
             // Step 1: clear and cancel the task recorded in currentJoin, then
             // interrupt this worker thread object.
 886         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
 887         if (cj != null) {
 888             currentJoin = null;
 889             cj.cancelIgnoringExceptions();
 890             try {
 891                 this.interrupt(); // awaken wait
 892             } catch (SecurityException ignore) {
                     // Deliberately ignored: interruption here is best-effort.
 893             }
 894         }
             // Step 2: clear and cancel the task recorded in currentSteal.
 895         ForkJoinTask<?> cs = currentSteal;
 896         if (cs != null) {
 897             currentSteal = null;
 898             cs.cancelIgnoringExceptions();
 899         }
             // Step 3: drain the queue via deqTask() until base == sp,
             // cancelling each task obtained; a null result just re-checks
             // the loop condition.
 900         while (base != sp) {
 901             ForkJoinTask<?> t = deqTask();
 902             if (t != null)
 903                 t.cancelIgnoringExceptions();
 904         }
 905     }
 906 
 907     /**
 908      * Drains tasks to given collection c.
 909      *
 910      * @return the number of tasks drained
 911      */
 912     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
 913         int n = 0;
 914         while (base != sp) {
 915             ForkJoinTask<?> t = deqTask();
 916             if (t != null) {
 917                 c.add(t);
 918                 ++n;
 919             }


 942 
 943     /**
 944      * Gets and removes a local or stolen task.
 945      *
 946      * @return a task, if available
 947      */
 948     final ForkJoinTask<?> pollTask() {
             // Prefer a task from pollLocalTask(); only when that yields null
             // fall back to scan().
 949         ForkJoinTask<?> t = pollLocalTask();
 950         if (t == null) {
 951             t = scan();
 952             // cannot retain/track/help steal
                 // Reset currentSteal to null with an ordered store after the
                 // scan, whether or not scan() returned a task.
 953             UNSAFE.putOrderedObject(this, currentStealOffset, null);
 954         }
 955         return t;
 956     }
 957 
 958     /**
 959      * Possibly runs some tasks and/or blocks, until task is done.
 960      *
 961      * @param joinMe the task to join


 962      */
 963     final void joinTask(ForkJoinTask<?> joinMe) {
 964         // currentJoin only written by this thread; only need ordered store
             // Save the previous currentJoin so it can be restored on exit
             // (last statement of this method).
 965         ForkJoinTask<?> prevJoin = currentJoin;
 966         UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
             // Local queue looks non-empty: try running local tasks first.
 967         if (sp != base)
 968             localHelpJoinTask(joinMe);
             // Non-negative status after local helping: hand off to the pool.
             // NOTE(review): status < 0 is treated as "done" elsewhere in this
             // file; the exact status encoding lives in ForkJoinTask -- confirm.
 969         if (joinMe.status >= 0)
 970             pool.awaitJoin(joinMe, this);
 971         UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
 972     }
 973 
 974     /**
 975      * Runs tasks in the local queue until the given task is done.




 976      *







 977      * @param joinMe the task to join



 978      */
 979     private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
 980         int s;







 981         ForkJoinTask<?>[] q;
             // Keep popping while joinMe.status is non-negative, sp != base, and
             // a queue array exists; all three are re-read each iteration.
 982         while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {









                 // Same slot/offset computation as popTask: slot just below sp,
                 // wrapped by masking with (length - 1).
 983             int i = (q.length - 1) & --s;
 984             long u = (i << qShift) + qBase; // raw offset
 985             ForkJoinTask<?> t = q[i];
 986             if (t == null)  // lost to a stealer
 987                 break;
 988             if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
 989                 /*
 990                  * This recheck (and similarly in helpJoinTask)
 991                  * handles cases where joinMe is independently
 992                  * cancelled or forced even though there is other work
 993                  * available. Back out of the pop by putting t back
 994                  * into slot before we commit by writing sp.
 995                  */
 996                 if (joinMe.status < 0) {
 997                     UNSAFE.putObjectVolatile(q, u, t);
 998                     break;
 999                 }
                     // Commit the pop by lowering sp, then run the task.
1000                 sp = s;
1001                 // UNSAFE.putOrderedInt(this, spOffset, s);
1002                 t.quietlyExec();
1003             }
1004         }
1005     }
1006 
1007     /**
1008      * Unless terminating, tries to locate and help perform tasks for
1009      * a stealer of the given task, or in turn one of its stealers.
1010      * Traces currentSteal->currentJoin links looking for a thread
1011      * working on a descendant of the given task and with a non-empty
1012      * queue to steal back and execute tasks from.
1013      *
1014      * The implementation is very branchy to cope with potential
1015      * inconsistencies or loops encountering chains that are stale,
1016      * unknown, or of length greater than MAX_HELP_DEPTH links.  All
1017      * of these cases are dealt with by just returning back to the
1018      * caller, who is expected to retry if other join mechanisms also
1019      * don't work out.
1020      *
1021      * @param joinMe the task to join
1022      */
1023     final void helpJoinTask(ForkJoinTask<?> joinMe) {
1024         ForkJoinWorkerThread[] ws;
1025         int n;
1026         if (joinMe.status < 0)                // already done
1027             return;
1028         if ((runState & TERMINATING) != 0) {  // cancel if shutting down
1029             joinMe.cancelIgnoringExceptions();
1030             return;
1031         }
1032         if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1033             return;                           // need at least 2 workers
1034 
             // Each pass of the outer loop handles one (task, thread) link of
             // the chain; task/thread are advanced at the bottom of the loop.
1035         ForkJoinTask<?> task = joinMe;        // base of chain
1036         ForkJoinWorkerThread thread = this;   // thread with stolen task
1037         for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length

1038             // Try to find v, the stealer of task, by first using hint
                 // The hint is masked with (n - 1) before indexing ws.
                 // NOTE(review): masking presumes ws.length is a power of two -- confirm.
1039             ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1040             if (v == null || v.currentSteal != task) {
                     // Hint missed: linear scan of ws for a worker whose
                     // currentSteal is task; running off the end means no
                     // stealer was found and the method returns.
1041                 for (int j = 0; ; ++j) {      // search array
1042                     if (j < n) {
1043                         ForkJoinTask<?> vs;
1044                         if ((v = ws[j]) != null &&
1045                             (vs = v.currentSteal) != null) {
1046                             if (joinMe.status < 0 || task.status < 0)
1047                                 return;       // stale or done
1048                             if (vs == task) {


1049                                 thread.stealHint = j;
1050                                 break;        // save hint for next time
1051                             }
1052                         }
1053                     }
1054                     else
1055                         return;               // no stealer
1056                 }
1057             }
1058             for (;;) { // Try to help v, using specialized form of deqTask


1059                 if (joinMe.status < 0)
1060                     return;
                     // Snapshot v's base and queue; stop helping v once its
                     // queue looks empty (base == sp) or has no array.
1061                 int b = v.base;
1062                 ForkJoinTask<?>[] q = v.queue;
1063                 if (b == v.sp || q == null)
1064                     break;
1065                 int i = (q.length - 1) & b;
1066                 long u = (i << qShift) + qBase;
1067                 ForkJoinTask<?> t = q[i];
                     // pid and ps are captured before the CAS: pid is written to
                     // v.stealHint and ps is restored into currentSteal after a
                     // successfully taken task has been executed.
1068                 int pid = poolIndex;
1069                 ForkJoinTask<?> ps = currentSteal;
1070                 if (task.status < 0)
1071                     return;                   // stale or done
                     // v.base is compared against the snapshot b, after which
                     // b++ leaves b holding the value later stored into v.base.
1072                 if (t != null && v.base == b++ &&



1073                     UNSAFE.compareAndSwapObject(q, u, t, null)) {
1074                     if (joinMe.status < 0) {
1075                         UNSAFE.putObjectVolatile(q, u, t);
1076                         return;               // back out on cancel
1077                     }
                         // Commit the take by advancing v.base.
1078                     v.base = b;



1079                     v.stealHint = pid;
                         // Record t as this worker's currentSteal while it runs.
1080                     UNSAFE.putOrderedObject(this, currentStealOffset, t);

1081                     t.quietlyExec();
1082                     UNSAFE.putOrderedObject(this, currentStealOffset, ps);

1083                 }
1084             }






1085             // Try to descend to find v's stealer
                 // Follow v.currentJoin as the next task in the chain; return
                 // when there is no link, the link points back at task, or
                 // either task or joinMe has a negative status.
1086             ForkJoinTask<?> next = v.currentJoin;
1087             if (task.status < 0 || next == null || next == task ||
1088                 joinMe.status < 0)
1089                 return;
1090             task = next;
1091             thread = v;
1092         }
1093     }


1094 
1095     /**
1096      * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1097      * Returns an estimate of the number of tasks, offset by a
1098      * function of number of idle workers.
1099      *
1100      * This method provides a cheap heuristic guide for task
1101      * partitioning when programmers, frameworks, tools, or languages
1102      * have little or no idea about task granularity.  In essence by
1103      * offering this method, we ask users only about tradeoffs in
1104      * overhead vs expected throughput and its variance, rather than
1105      * how finely to partition tasks.
1106      *
1107      * In a steady state strict (tree-structured) computation, each
1108      * thread makes available for stealing enough tasks for other
1109      * threads to remain active. Inductively, if all threads play by
1110      * the same rules, each thread should make available only a
1111      * constant number of tasks.
1112      *
1113      * The minimum useful constant is just 1. But using a value of 1




  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/licenses/publicdomain
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.Random;
  39 import java.util.Collection;
  40 import java.util.concurrent.locks.LockSupport;
  41 import java.util.concurrent.RejectedExecutionException;
  42 
  43 /**
  44  * A thread managed by a {@link ForkJoinPool}, which executes
  45  * {@link ForkJoinTask}s.
  46  * This class is subclassable solely for the sake of adding
  47  * functionality -- there are no overridable methods dealing with
  48  * scheduling or execution.  However, you can override initialization
  49  * and termination methods surrounding the main task processing loop.
  50  * If you do create such a subclass, you will also need to supply a
  51  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
  52  * in a {@code ForkJoinPool}.
  53  *
  54  * @since 1.7
  55  * @author Doug Lea
  56  */
  57 public class ForkJoinWorkerThread extends Thread {
  58     /*
  59      * Overview:
  60      *
  61      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  62      * ForkJoinTasks. This class includes bookkeeping in support of
  63      * worker activation, suspension, and lifecycle control described
  64      * in more detail in the internal documentation of class
  65      * ForkJoinPool. And as described further below, this class also
  66      * includes special-cased support for some ForkJoinTask
  67      * methods. But the main mechanics involve work-stealing:
  68      *
  69      * Work-stealing queues are special forms of Deques that support
  70      * only three of the four possible end-operations -- push, pop,
  71      * and deq (aka steal), under the further constraints that push
  72      * and pop are called only from the owning thread, while deq may


 361     public ForkJoinPool getPool() {
             // Simple accessor: hands back the pool field unchanged.
 362         return pool;
 363     }
 364 
 365     /**
 366      * Returns the index number of this thread in its pool.  The
 367      * returned value ranges from zero to the maximum number of
 368      * threads (minus one) that have ever been created in the pool.
 369      * This method may be useful for applications that track status or
 370      * collect results per-worker rather than per-task.
 371      *
 372      * @return the index number
 373      */
 374     public int getPoolIndex() {
             // Simple accessor: hands back the poolIndex field unchanged.
 375         return poolIndex;
 376     }
 377 
 378     /**
 379      * Initializes internal state after construction but before
 380      * processing any tasks. If you override this method, you must
 381      * invoke {@code super.onStart()} at the beginning of the method.
 382      * Initialization requires care: Most fields must have legal
 383      * default values, to ensure that attempted accesses from other
 384      * threads work correctly even before this thread starts
 385      * processing tasks.
 386      */
 387     protected void onStart() {
             // Draw this worker's seed from seedGenerator; a drawn value of 0
             // is replaced by 1 so the seed field is never zero.
 388         int rs = seedGenerator.nextInt();
 389         seed = (rs == 0) ? 1 : rs; // seed must be nonzero
 390 
 391         // Allocate name string and arrays in this thread
             // Resulting thread name: "ForkJoinPool-<pool number>-worker-<pool index>".
 392         String pid = Integer.toString(pool.getPoolNumber());
 393         String wid = Integer.toString(poolIndex);
 394         setName("ForkJoinPool-" + pid + "-worker-" + wid);
 395 
             // The task queue array is created here (not at construction) with
             // INITIAL_QUEUE_CAPACITY slots.
 396         queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 397     }
 398 
 399     /**
 400      * Performs cleanup associated with termination of this worker
 401      * thread.  If you override this method, you must invoke
 402      * {@code super.onTermination} at the end of the overridden method.
 403      *
 404      * @param exception the exception causing this thread to abort due
 405      * to an unrecoverable error, or {@code null} if completed normally
 406      */
 407     protected void onTermination(Throwable exception) {
 408         try {
 409             ForkJoinPool p = pool;


 411                 int a; // inline p.tryDecrementActiveCount
 412                 active = false;
 413                 do {} while (!UNSAFE.compareAndSwapInt
 414                              (p, poolRunStateOffset, a = p.runState, a - 1));
 415             }
 416             cancelTasks();
 417             setTerminated();
 418             p.workerTerminated(this);
 419         } catch (Throwable ex) {        // Shouldn't ever happen
 420             if (exception == null)      // but if so, at least rethrown
 421                 exception = ex;
 422         } finally {
 423             if (exception != null)
 424                 UNSAFE.throwException(exception);
 425         }
 426     }
 427 
 428     /**
 429      * This method is required to be public, but should never be
 430      * called explicitly. It performs the main run loop to execute
 431      * {@link ForkJoinTask}s.
 432      */
 433     public void run() {
             // Holds whatever Throwable escapes onStart() or mainLoop(), if any.
 434         Throwable exception = null;
 435         try {
 436             onStart();
 437             mainLoop();
 438         } catch (Throwable ex) {
 439             exception = ex;
 440         } finally {
                 // Runs on every exit path; receives null when nothing was caught.
 441             onTermination(exception);
 442         }
 443     }
 444 
 445     // helpers for run()
 446 
 447     /**
 448      * Finds and executes tasks, and checks status while running.
 449      */
 450     private void mainLoop() {
 451         boolean ran = false; // true if ran a task on last step


 613             }
 614         }
 615         return null;
 616     }
 617 
 618     /**
 619      * Returns a popped task, or null if empty. Assumes active status.
 620      * Called only by this thread.
 621      */
 622     private ForkJoinTask<?> popTask() {
             // Work on a local snapshot of the queue array reference.
 623         ForkJoinTask<?>[] q = queue;
 624         if (q != null) {
 625             int s;
                 // Loop while sp differs from base; both fields are re-read on
                 // every iteration, so a failed CAS below simply retries.
 626             while ((s = sp) != base) {
                     // Slot just below sp, wrapped by masking with (length - 1).
                     // NOTE(review): masking presumes q.length is a power of two -- confirm.
 627                 int i = (q.length - 1) & --s;
 628                 long u = (i << qShift) + qBase; // raw offset
 629                 ForkJoinTask<?> t = q[i];
 630                 if (t == null)   // lost to stealer
 631                     break;
                     // Claim the task by CASing its slot from t to null; sp is
                     // only lowered after the CAS succeeds.
 632                 if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
 633                     /*
 634                      * Note: here and in related methods, as a
 635                      * performance (not correctness) issue, we'd like
 636                      * to encourage compiler not to arbitrarily
 637                      * postpone setting sp after successful CAS.
 638                      * Currently there is no intrinsic for arranging
 639                      * this, but using Unsafe putOrderedInt may be a
 640                      * preferable strategy on some compilers even
 641                      * though its main effect is a pre-, not post-
 642                      * fence. To simplify possible changes, the option
 643                      * is left in comments next to the associated
 644                      * assignments.
 645                      */
 646                     sp = s; // putOrderedInt may encourage more timely write
 647                     // UNSAFE.putOrderedInt(this, spOffset, s);
 648                     return t;
 649                 }
 650             }
 651         }
             // No queue array, sp == base, or the top slot was already null.
 652         return null;
 653     }
 654 
 655     /**
 656      * Specialized version of popTask to pop only if topmost element
 657      * is the given task. Called only by this thread while active.
 658      *
 659      * @param t the task. Caller must ensure non-null.
 660      */
 661     final boolean unpushTask(ForkJoinTask<?> t) {
 662         int s;
 663         ForkJoinTask<?>[] q = queue;
 664         if ((s = sp) != base && q != null &&
 665             UNSAFE.compareAndSwapObject


 882         }
 883     }
 884 
 885     // Misc support methods for ForkJoinPool
 886 
 887     /**
 888      * Returns an estimate of the number of tasks in the queue.  Also
 889      * used by ForkJoinTask.
 890      */
 891     final int getQueueSize() {
             // Computes sp - base, written as "-base + sp" so that base is the
             // first field read (Java evaluates operands left to right).
             // A non-positive difference is reported as 0.
 892         int n; // external calls must read base first
 893         return (n = -base + sp) <= 0 ? 0 : n;
 894     }
 895 
 896     /**
 897      * Removes and cancels all tasks in queue.  Can be called from any
 898      * thread.
 899      */
 900     final void cancelTasks() {
             // Step 1: if currentJoin names a task whose status is still
             // non-negative, cancel it and interrupt this worker thread object.
             // The currentJoin field itself is left untouched here.
 901         ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
 902         if (cj != null && cj.status >= 0) {

 903             cj.cancelIgnoringExceptions();
 904             try {
 905                 this.interrupt(); // awaken wait
 906             } catch (SecurityException ignore) {
                     // Deliberately ignored: interruption here is best-effort.
 907             }
 908         }
             // Step 2: same treatment for currentSteal, without the interrupt.
 909         ForkJoinTask<?> cs = currentSteal;
 910         if (cs != null && cs.status >= 0)

 911             cs.cancelIgnoringExceptions();

             // Step 3: drain the queue via deqTask() until base == sp,
             // cancelling each task obtained; a null result just re-checks
             // the loop condition.
 912         while (base != sp) {
 913             ForkJoinTask<?> t = deqTask();
 914             if (t != null)
 915                 t.cancelIgnoringExceptions();
 916         }
 917     }
 918 
 919     /**
 920      * Drains tasks to given collection c.
 921      *
 922      * @return the number of tasks drained
 923      */
 924     final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
 925         int n = 0;
 926         while (base != sp) {
 927             ForkJoinTask<?> t = deqTask();
 928             if (t != null) {
 929                 c.add(t);
 930                 ++n;
 931             }


 954 
 955     /**
 956      * Gets and removes a local or stolen task.
 957      *
 958      * @return a task, if available
 959      */
 960     final ForkJoinTask<?> pollTask() {
             // Prefer a task from pollLocalTask(); only when that yields null
             // fall back to scan().
 961         ForkJoinTask<?> t = pollLocalTask();
 962         if (t == null) {
 963             t = scan();
 964             // cannot retain/track/help steal
                 // Reset currentSteal to null with an ordered store after the
                 // scan, whether or not scan() returned a task.
 965             UNSAFE.putOrderedObject(this, currentStealOffset, null);
 966         }
 967         return t;
 968     }
 969 
 970     /**
 971      * Possibly runs some tasks and/or blocks, until task is done.
 972      *
 973      * @param joinMe the task to join
 974      * @param timed true if use timed wait
 975      * @param nanos wait time if timed
 976      */
 977     final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
 978         // currentJoin only written by this thread; only need ordered store
             // Save the previous currentJoin so it can be restored on exit
             // (last statement of this method).
 979         ForkJoinTask<?> prevJoin = currentJoin;
 980         UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
             // All waiting is delegated to the pool; timed and nanos are
             // passed through unchanged.
 981         pool.awaitJoin(joinMe, this, timed, nanos);



 982         UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
 983     }
 984 
 985     /**
 986      * Tries to locate and help perform tasks for a stealer of the
 987      * given task, or in turn one of its stealers.  Traces
 988      * currentSteal->currentJoin links looking for a thread working on
 989      * a descendant of the given task and with a non-empty queue to
 990      * steal back and execute tasks from.
 991      *
 992      * The implementation is very branchy to cope with potential
 993      * inconsistencies or loops encountering chains that are stale,
 994      * unknown, or of length greater than MAX_HELP_DEPTH links.  All
 995      * of these cases are dealt with by just returning back to the
 996      * caller, who is expected to retry if other join mechanisms also
 997      * don't work out.
 998      *
 999      * @param joinMe the task to join
1000      * @param running if false, then must update pool count upon
1001      *  running a task
1002      * @return value of running on exit
1003      */
1004     final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
1005         /*
1006          * Initial checks to (1) abort if terminating; (2) clean out
1007          * old cancelled tasks from local queue; (3) if joinMe is next
1008          * task, run it; (4) omit scan if local queue nonempty (since
1009          * it may contain non-descendants of joinMe).
1010          */
1011         ForkJoinPool p = pool;
1012         for (;;) {
1013             ForkJoinTask<?>[] q;
1014             int s;
1015             if (joinMe.status < 0)
1016                 return running;
1017             else if ((runState & TERMINATING) != 0) {
1018                 joinMe.cancelIgnoringExceptions();
1019                 return running;
1020             }
1021             else if ((s = sp) == base || (q = queue) == null)
1022                 break;                            // queue empty
1023             else {
1024                 int i = (q.length - 1) & --s;
1025                 long u = (i << qShift) + qBase;   // raw offset
1026                 ForkJoinTask<?> t = q[i];
1027                 if (t == null)
1028                     break;                        // lost to a stealer
1029                 else if (t != joinMe && t.status >= 0)
1030                     return running;               // cannot safely help
1031                 else if ((running ||
1032                           (running = p.tryIncrementRunningCount())) &&
1033                          UNSAFE.compareAndSwapObject(q, u, t, null)) {
1034                     sp = s; // putOrderedInt may encourage more timely write







1035                     // UNSAFE.putOrderedInt(this, spOffset, s);
1036                     t.quietlyExec();
1037                 }
1038             }
1039         }
1040 
1041         int n;                                    // worker array size
1042         ForkJoinWorkerThread[] ws = p.workers;
1043         if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers

























1044             ForkJoinTask<?> task = joinMe;        // base of chain
1045             ForkJoinWorkerThread thread = this;   // thread with stolen task
1046 
1047             outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1048                 // Try to find v, the stealer of task, by first using hint
1049                 ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1050                 if (v == null || v.currentSteal != task) {
1051                     for (int j = 0; ; ++j) {      // search array
1052                         if (j < n) {
1053                             ForkJoinTask<?> vs;
1054                             if ((v = ws[j]) != null &&
1055                                 (vs = v.currentSteal) != null) {
1056                                 if (joinMe.status < 0)
1057                                     break outer;
1058                                 if (vs == task) {
1059                                     if (task.status < 0)
1060                                         break outer; // stale
1061                                     thread.stealHint = j;
1062                                     break;        // save hint for next time
1063                                 }
1064                             }
1065                         }
1066                         else
1067                             break outer;          // no stealer
1068                     }
1069                 }
1070 
1071                 // Try to help v, using specialized form of deqTask
1072                 for (;;) {
1073                     if (joinMe.status < 0)
1074                         break outer;
1075                     int b = v.base;
1076                     ForkJoinTask<?>[] q = v.queue;
1077                     if (b == v.sp || q == null)
1078                         break;                    // empty
1079                     int i = (q.length - 1) & b;
1080                     long u = (i << qShift) + qBase;
1081                     ForkJoinTask<?> t = q[i];


1082                     if (task.status < 0)
1083                         break outer;              // stale
1084                     if (t != null &&
1085                         (running ||
1086                          (running = p.tryIncrementRunningCount())) &&
1087                         v.base == b++ &&
1088                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
1089                         if (t != joinMe && joinMe.status < 0) {
1090                             UNSAFE.putObjectVolatile(q, u, t);
1091                             break outer;          // joinMe cancelled; back out
1092                         }
1093                         v.base = b;
1094                         if (t.status >= 0) {
1095                             ForkJoinTask<?> ps = currentSteal;
1096                             int pid = poolIndex;
1097                             v.stealHint = pid;
1098                             UNSAFE.putOrderedObject(this,
1099                                                     currentStealOffset, t);
1100                             t.quietlyExec();
1101                             UNSAFE.putOrderedObject(this,
1102                                                     currentStealOffset, ps);
1103                         }
1104                     }
1105                     else if ((runState & TERMINATING) != 0) {
1106                         joinMe.cancelIgnoringExceptions();
1107                         break outer;
1108                     }
1109                 }
1110 
1111                 // Try to descend to find v's stealer
1112                 ForkJoinTask<?> next = v.currentJoin;
1113                 if (task.status < 0 || next == null || next == task ||
1114                     joinMe.status < 0)
1115                     break;                 // done, stale, dead-end, or cyclic
1116                 task = next;
1117                 thread = v;
1118             }
1119         }
1120         return running;
1121     }
1122 
1123     /**
1124      * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1125      * Returns an estimate of the number of tasks, offset by a
1126      * function of number of idle workers.
1127      *
1128      * This method provides a cheap heuristic guide for task
1129      * partitioning when programmers, frameworks, tools, or languages
1130      * have little or no idea about task granularity.  In essence, by
1131      * offering this method, we ask users only about tradeoffs in
1132      * overhead vs expected throughput and its variance, rather than
1133      * how finely to partition tasks.
1134      *
1135      * In a steady state strict (tree-structured) computation, each
1136      * thread makes available for stealing enough tasks for other
1137      * threads to remain active. Inductively, if all threads play by
1138      * the same rules, each thread should make available only a
1139      * constant number of tasks.
1140      *
1141      * The minimum useful constant is just 1. But using a value of 1