Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
          +++ new/src/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
↓ open down ↓ 30 lines elided ↑ open up ↑
  31   31   * Written by Doug Lea with assistance from members of JCP JSR-166
  32   32   * Expert Group and released to the public domain, as explained at
  33   33   * http://creativecommons.org/licenses/publicdomain
  34   34   */
  35   35  
  36   36  package java.util.concurrent;
  37   37  
  38   38  import java.util.Random;
  39   39  import java.util.Collection;
  40   40  import java.util.concurrent.locks.LockSupport;
       41 +import java.util.concurrent.RejectedExecutionException;
  41   42  
  42   43  /**
  43      - * A thread managed by a {@link ForkJoinPool}.  This class is
  44      - * subclassable solely for the sake of adding functionality -- there
  45      - * are no overridable methods dealing with scheduling or execution.
  46      - * However, you can override initialization and termination methods
  47      - * surrounding the main task processing loop.  If you do create such a
  48      - * subclass, you will also need to supply a custom {@link
  49      - * ForkJoinPool.ForkJoinWorkerThreadFactory} to use it in a {@code
  50      - * ForkJoinPool}.
       44 + * A thread managed by a {@link ForkJoinPool}, which executes
       45 + * {@link ForkJoinTask}s.
       46 + * This class is subclassable solely for the sake of adding
       47 + * functionality -- there are no overridable methods dealing with
       48 + * scheduling or execution.  However, you can override initialization
       49 + * and termination methods surrounding the main task processing loop.
       50 + * If you do create such a subclass, you will also need to supply a
       51 + * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
       52 + * in a {@code ForkJoinPool}.
  51   53   *
  52   54   * @since 1.7
  53   55   * @author Doug Lea
  54   56   */
  55   57  public class ForkJoinWorkerThread extends Thread {
  56   58      /*
  57   59       * Overview:
  58   60       *
  59   61       * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  60   62       * ForkJoinTasks. This class includes bookkeeping in support of
↓ open down ↓ 308 lines elided ↑ open up ↑
 369  371       *
 370  372       * @return the index number
 371  373       */
 372  374      public int getPoolIndex() {
 373  375          return poolIndex;
 374  376      }
 375  377  
 376  378      /**
 377  379       * Initializes internal state after construction but before
 378  380       * processing any tasks. If you override this method, you must
 379      -     * invoke @code{super.onStart()} at the beginning of the method.
      381 +     * invoke {@code super.onStart()} at the beginning of the method.
 380  382       * Initialization requires care: Most fields must have legal
 381  383       * default values, to ensure that attempted accesses from other
 382  384       * threads work correctly even before this thread starts
 383  385       * processing tasks.
 384  386       */
 385  387      protected void onStart() {
 386  388          int rs = seedGenerator.nextInt();
 387      -        seed = rs == 0? 1 : rs; // seed must be nonzero
      389 +        seed = (rs == 0) ? 1 : rs; // seed must be nonzero
 388  390  
 389  391          // Allocate name string and arrays in this thread
 390  392          String pid = Integer.toString(pool.getPoolNumber());
 391  393          String wid = Integer.toString(poolIndex);
 392  394          setName("ForkJoinPool-" + pid + "-worker-" + wid);
 393  395  
 394  396          queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
 395  397      }
 396  398  
 397  399      /**
↓ open down ↓ 21 lines elided ↑ open up ↑
 419  421                  exception = ex;
 420  422          } finally {
 421  423              if (exception != null)
 422  424                  UNSAFE.throwException(exception);
 423  425          }
 424  426      }
 425  427  
 426  428      /**
 427  429       * This method is required to be public, but should never be
 428  430       * called explicitly. It performs the main run loop to execute
 429      -     * ForkJoinTasks.
      431 +     * {@link ForkJoinTask}s.
 430  432       */
 431  433      public void run() {
 432  434          Throwable exception = null;
 433  435          try {
 434  436              onStart();
 435  437              mainLoop();
 436  438          } catch (Throwable ex) {
 437  439              exception = ex;
 438  440          } finally {
 439  441              onTermination(exception);
↓ open down ↓ 181 lines elided ↑ open up ↑
 621  623          ForkJoinTask<?>[] q = queue;
 622  624          if (q != null) {
 623  625              int s;
 624  626              while ((s = sp) != base) {
 625  627                  int i = (q.length - 1) & --s;
 626  628                  long u = (i << qShift) + qBase; // raw offset
 627  629                  ForkJoinTask<?> t = q[i];
 628  630                  if (t == null)   // lost to stealer
 629  631                      break;
 630  632                  if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
      633 +                    /*
      634 +                     * Note: here and in related methods, as a
      635 +                     * performance (not correctness) issue, we'd like
      636 +                     * to encourage the compiler not to arbitrarily
      637 +                     * postpone setting sp after successful CAS.
      638 +                     * Currently there is no intrinsic for arranging
      639 +                     * this, but using Unsafe putOrderedInt may be a
      640 +                     * preferable strategy on some compilers even
      641 +                     * though its main effect is a pre-, not post-
      642 +                     * fence. To simplify possible changes, the option
      643 +                     * is left in comments next to the associated
      644 +                     * assignments.
      645 +                     */
 631  646                      sp = s; // putOrderedInt may encourage more timely write
 632  647                      // UNSAFE.putOrderedInt(this, spOffset, s);
 633  648                      return t;
 634  649                  }
 635  650              }
 636  651          }
 637  652          return null;
 638  653      }
 639  654  
 640  655      /**
↓ open down ↓ 129 lines elided ↑ open up ↑
 770  785                  else
 771  786                      break;
 772  787              }
 773  788          }
 774  789          return null;
 775  790      }
 776  791  
 777  792      // Run State management
 778  793  
 779  794      // status check methods used mainly by ForkJoinPool
 780      -    final boolean isRunning()     { return runState == 0; }
 781      -    final boolean isTerminated()  { return (runState & TERMINATED) != 0; }
 782      -    final boolean isSuspended()   { return (runState & SUSPENDED) != 0; }
 783      -    final boolean isTrimmed()     { return (runState & TRIMMED) != 0; }
      795 +    final boolean isRunning()    { return runState == 0; }
      796 +    final boolean isTerminated() { return (runState & TERMINATED) != 0; }
      797 +    final boolean isSuspended()  { return (runState & SUSPENDED) != 0; }
      798 +    final boolean isTrimmed()    { return (runState & TRIMMED) != 0; }
 784  799  
 785  800      final boolean isTerminating() {
 786  801          if ((runState & TERMINATING) != 0)
 787  802              return true;
 788  803          if (pool.isAtLeastTerminating()) { // propagate pool state
 789  804              shutdown();
 790  805              return true;
 791  806          }
 792  807          return false;
 793  808      }
↓ open down ↓ 83 lines elided ↑ open up ↑
 877  892          int n; // external calls must read base first
 878  893          return (n = -base + sp) <= 0 ? 0 : n;
 879  894      }
 880  895  
 881  896      /**
 882  897       * Removes and cancels all tasks in queue.  Can be called from any
 883  898       * thread.
 884  899       */
 885  900      final void cancelTasks() {
 886  901          ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
 887      -        if (cj != null) {
 888      -            currentJoin = null;
      902 +        if (cj != null && cj.status >= 0) {
 889  903              cj.cancelIgnoringExceptions();
 890  904              try {
 891  905                  this.interrupt(); // awaken wait
 892  906              } catch (SecurityException ignore) {
 893  907              }
 894  908          }
 895  909          ForkJoinTask<?> cs = currentSteal;
 896      -        if (cs != null) {
 897      -            currentSteal = null;
      910 +        if (cs != null && cs.status >= 0)
 898  911              cs.cancelIgnoringExceptions();
 899      -        }
 900  912          while (base != sp) {
 901  913              ForkJoinTask<?> t = deqTask();
 902  914              if (t != null)
 903  915                  t.cancelIgnoringExceptions();
 904  916          }
 905  917      }
 906  918  
 907  919      /**
 908  920       * Drains tasks to given collection c.
 909  921       *
↓ open down ↓ 42 lines elided ↑ open up ↑
 952  964              // cannot retain/track/help steal
 953  965              UNSAFE.putOrderedObject(this, currentStealOffset, null);
 954  966          }
 955  967          return t;
 956  968      }
 957  969  
 958  970      /**
 959  971       * Possibly runs some tasks and/or blocks, until task is done.
 960  972       *
 961  973       * @param joinMe the task to join
      974 +     * @param timed true if a timed wait should be used
      975 +     * @param nanos wait time if timed
 962  976       */
 963      -    final void joinTask(ForkJoinTask<?> joinMe) {
      977 +    final void joinTask(ForkJoinTask<?> joinMe, boolean timed, long nanos) {
 964  978          // currentJoin only written by this thread; only need ordered store
 965  979          ForkJoinTask<?> prevJoin = currentJoin;
 966  980          UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
 967      -        if (sp != base)
 968      -            localHelpJoinTask(joinMe);
 969      -        if (joinMe.status >= 0)
 970      -            pool.awaitJoin(joinMe, this);
      981 +        pool.awaitJoin(joinMe, this, timed, nanos);
 971  982          UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
 972  983      }
 973  984  
 974  985      /**
 975      -     * Run tasks in local queue until given task is done.
      986 +     * Tries to locate and help perform tasks for a stealer of the
      987 +     * given task, or in turn one of its stealers.  Traces
      988 +     * currentSteal->currentJoin links looking for a thread working on
      989 +     * a descendant of the given task and with a non-empty queue to
      990 +     * steal back and execute tasks from.
 976  991       *
 977      -     * @param joinMe the task to join
 978      -     */
 979      -    private void localHelpJoinTask(ForkJoinTask<?> joinMe) {
 980      -        int s;
 981      -        ForkJoinTask<?>[] q;
 982      -        while (joinMe.status >= 0 && (s = sp) != base && (q = queue) != null) {
 983      -            int i = (q.length - 1) & --s;
 984      -            long u = (i << qShift) + qBase; // raw offset
 985      -            ForkJoinTask<?> t = q[i];
 986      -            if (t == null)  // lost to a stealer
 987      -                break;
 988      -            if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
 989      -                /*
 990      -                 * This recheck (and similarly in helpJoinTask)
 991      -                 * handles cases where joinMe is independently
 992      -                 * cancelled or forced even though there is other work
 993      -                 * available. Back out of the pop by putting t back
 994      -                 * into slot before we commit by writing sp.
 995      -                 */
 996      -                if (joinMe.status < 0) {
 997      -                    UNSAFE.putObjectVolatile(q, u, t);
 998      -                    break;
 999      -                }
1000      -                sp = s;
1001      -                // UNSAFE.putOrderedInt(this, spOffset, s);
1002      -                t.quietlyExec();
1003      -            }
1004      -        }
1005      -    }
1006      -
1007      -    /**
1008      -     * Unless terminating, tries to locate and help perform tasks for
1009      -     * a stealer of the given task, or in turn one of its stealers.
1010      -     * Traces currentSteal->currentJoin links looking for a thread
1011      -     * working on a descendant of the given task and with a non-empty
1012      -     * queue to steal back and execute tasks from.
1013      -     *
1014  992       * The implementation is very branchy to cope with potential
1015  993       * inconsistencies or loops encountering chains that are stale,
1016  994       * unknown, or of length greater than MAX_HELP_DEPTH links.  All
1017  995       * of these cases are dealt with by just returning back to the
1018  996       * caller, who is expected to retry if other join mechanisms also
1019  997       * don't work out.
1020  998       *
1021  999       * @param joinMe the task to join
     1000 +     * @param running if false, then must update pool count upon
     1001 +     *  running a task
     1002 +     * @return value of running on exit
1022 1003       */
1023      -    final void helpJoinTask(ForkJoinTask<?> joinMe) {
1024      -        ForkJoinWorkerThread[] ws;
1025      -        int n;
1026      -        if (joinMe.status < 0)                // already done
1027      -            return;
1028      -        if ((runState & TERMINATING) != 0) {  // cancel if shutting down
1029      -            joinMe.cancelIgnoringExceptions();
1030      -            return;
     1004 +    final boolean helpJoinTask(ForkJoinTask<?> joinMe, boolean running) {
     1005 +        /*
     1006 +         * Initial checks to (1) abort if terminating; (2) clean out
     1007 +         * old cancelled tasks from local queue; (3) if joinMe is next
     1008 +         * task, run it; (4) omit scan if local queue nonempty (since
     1009 +         * it may contain non-descendants of joinMe).
     1010 +         */
     1011 +        ForkJoinPool p = pool;
     1012 +        for (;;) {
     1013 +            ForkJoinTask<?>[] q;
     1014 +            int s;
     1015 +            if (joinMe.status < 0)
     1016 +                return running;
     1017 +            else if ((runState & TERMINATING) != 0) {
     1018 +                joinMe.cancelIgnoringExceptions();
     1019 +                return running;
     1020 +            }
     1021 +            else if ((s = sp) == base || (q = queue) == null)
     1022 +                break;                            // queue empty
     1023 +            else {
     1024 +                int i = (q.length - 1) & --s;
     1025 +                long u = (i << qShift) + qBase;   // raw offset
     1026 +                ForkJoinTask<?> t = q[i];
     1027 +                if (t == null)
     1028 +                    break;                        // lost to a stealer
     1029 +                else if (t != joinMe && t.status >= 0)
     1030 +                    return running;               // cannot safely help
     1031 +                else if ((running ||
     1032 +                          (running = p.tryIncrementRunningCount())) &&
     1033 +                         UNSAFE.compareAndSwapObject(q, u, t, null)) {
     1034 +                    sp = s; // putOrderedInt may encourage more timely write
     1035 +                    // UNSAFE.putOrderedInt(this, spOffset, s);
     1036 +                    t.quietlyExec();
     1037 +                }
     1038 +            }
1031 1039          }
1032      -        if ((ws = pool.workers) == null || (n = ws.length) <= 1)
1033      -            return;                           // need at least 2 workers
1034 1040  
1035      -        ForkJoinTask<?> task = joinMe;        // base of chain
1036      -        ForkJoinWorkerThread thread = this;   // thread with stolen task
1037      -        for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
1038      -            // Try to find v, the stealer of task, by first using hint
1039      -            ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
1040      -            if (v == null || v.currentSteal != task) {
1041      -                for (int j = 0; ; ++j) {      // search array
1042      -                    if (j < n) {
1043      -                        ForkJoinTask<?> vs;
1044      -                        if ((v = ws[j]) != null &&
1045      -                            (vs = v.currentSteal) != null) {
1046      -                            if (joinMe.status < 0 || task.status < 0)
1047      -                                return;       // stale or done
1048      -                            if (vs == task) {
1049      -                                thread.stealHint = j;
1050      -                                break;        // save hint for next time
     1041 +        int n;                                    // worker array size
     1042 +        ForkJoinWorkerThread[] ws = p.workers;
     1043 +        if (ws != null && (n = ws.length) > 1) {  // need at least 2 workers
     1044 +            ForkJoinTask<?> task = joinMe;        // base of chain
     1045 +            ForkJoinWorkerThread thread = this;   // thread with stolen task
     1046 +
     1047 +            outer:for (int d = 0; d < MAX_HELP_DEPTH; ++d) { // chain length
     1048 +                // Try to find v, the stealer of task, by first using hint
     1049 +                ForkJoinWorkerThread v = ws[thread.stealHint & (n - 1)];
     1050 +                if (v == null || v.currentSteal != task) {
     1051 +                    for (int j = 0; ; ++j) {      // search array
     1052 +                        if (j < n) {
     1053 +                            ForkJoinTask<?> vs;
     1054 +                            if ((v = ws[j]) != null &&
     1055 +                                (vs = v.currentSteal) != null) {
     1056 +                                if (joinMe.status < 0)
     1057 +                                    break outer;
     1058 +                                if (vs == task) {
     1059 +                                    if (task.status < 0)
     1060 +                                        break outer; // stale
     1061 +                                    thread.stealHint = j;
     1062 +                                    break;        // save hint for next time
     1063 +                                }
1051 1064                              }
1052 1065                          }
     1066 +                        else
     1067 +                            break outer;          // no stealer
1053 1068                      }
1054      -                    else
1055      -                        return;               // no stealer
1056 1069                  }
1057      -            }
1058      -            for (;;) { // Try to help v, using specialized form of deqTask
1059      -                if (joinMe.status < 0)
1060      -                    return;
1061      -                int b = v.base;
1062      -                ForkJoinTask<?>[] q = v.queue;
1063      -                if (b == v.sp || q == null)
1064      -                    break;
1065      -                int i = (q.length - 1) & b;
1066      -                long u = (i << qShift) + qBase;
1067      -                ForkJoinTask<?> t = q[i];
1068      -                int pid = poolIndex;
1069      -                ForkJoinTask<?> ps = currentSteal;
1070      -                if (task.status < 0)
1071      -                    return;                   // stale or done
1072      -                if (t != null && v.base == b++ &&
1073      -                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
1074      -                    if (joinMe.status < 0) {
1075      -                        UNSAFE.putObjectVolatile(q, u, t);
1076      -                        return;               // back out on cancel
     1070 +
     1071 +                // Try to help v, using specialized form of deqTask
     1072 +                for (;;) {
     1073 +                    if (joinMe.status < 0)
     1074 +                        break outer;
     1075 +                    int b = v.base;
     1076 +                    ForkJoinTask<?>[] q = v.queue;
     1077 +                    if (b == v.sp || q == null)
     1078 +                        break;                    // empty
     1079 +                    int i = (q.length - 1) & b;
     1080 +                    long u = (i << qShift) + qBase;
     1081 +                    ForkJoinTask<?> t = q[i];
     1082 +                    if (task.status < 0)
     1083 +                        break outer;              // stale
     1084 +                    if (t != null &&
     1085 +                        (running ||
     1086 +                         (running = p.tryIncrementRunningCount())) &&
     1087 +                        v.base == b++ &&
     1088 +                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
     1089 +                        if (t != joinMe && joinMe.status < 0) {
     1090 +                            UNSAFE.putObjectVolatile(q, u, t);
     1091 +                            break outer;          // joinMe cancelled; back out
     1092 +                        }
     1093 +                        v.base = b;
     1094 +                        if (t.status >= 0) {
     1095 +                            ForkJoinTask<?> ps = currentSteal;
     1096 +                            int pid = poolIndex;
     1097 +                            v.stealHint = pid;
     1098 +                            UNSAFE.putOrderedObject(this,
     1099 +                                                    currentStealOffset, t);
     1100 +                            t.quietlyExec();
     1101 +                            UNSAFE.putOrderedObject(this,
     1102 +                                                    currentStealOffset, ps);
     1103 +                        }
1077 1104                      }
1078      -                    v.base = b;
1079      -                    v.stealHint = pid;
1080      -                    UNSAFE.putOrderedObject(this, currentStealOffset, t);
1081      -                    t.quietlyExec();
1082      -                    UNSAFE.putOrderedObject(this, currentStealOffset, ps);
     1105 +                    else if ((runState & TERMINATING) != 0) {
     1106 +                        joinMe.cancelIgnoringExceptions();
     1107 +                        break outer;
     1108 +                    }
1083 1109                  }
     1110 +
     1111 +                // Try to descend to find v's stealer
     1112 +                ForkJoinTask<?> next = v.currentJoin;
     1113 +                if (task.status < 0 || next == null || next == task ||
     1114 +                    joinMe.status < 0)
     1115 +                    break;                 // done, stale, dead-end, or cyclic
     1116 +                task = next;
     1117 +                thread = v;
1084 1118              }
1085      -            // Try to descend to find v's stealer
1086      -            ForkJoinTask<?> next = v.currentJoin;
1087      -            if (task.status < 0 || next == null || next == task ||
1088      -                joinMe.status < 0)
1089      -                return;
1090      -            task = next;
1091      -            thread = v;
1092 1119          }
     1120 +        return running;
1093 1121      }
1094 1122  
1095 1123      /**
1096 1124       * Implements ForkJoinTask.getSurplusQueuedTaskCount().
1097 1125       * Returns an estimate of the number of tasks, offset by a
1098 1126       * function of number of idle workers.
1099 1127       *
1100 1128       * This method provides a cheap heuristic guide for task
1101 1129       * partitioning when programmers, frameworks, tools, or languages
1102 1130       * have little or no idea about task granularity.  In essence by
↓ open down ↓ 110 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX