Print this page


Split Close
Expand all
Collapse all
          --- old/src/share/classes/java/util/concurrent/ForkJoinPool.java
          +++ new/src/share/classes/java/util/concurrent/ForkJoinPool.java
↓ open down ↓ 517 lines elided ↑ open up ↑
 518  518      private volatile long stealCount;
 519  519  
 520  520      /**
 521  521       * Encoded record of top of Treiber stack of threads waiting for
 522  522       * events. The top 32 bits contain the count being waited for. The
 523  523       * bottom 16 bits contain one plus the pool index of waiting
 524  524       * worker thread. (Bits 16-31 are unused.)
 525  525       */
 526  526      private volatile long eventWaiters;
 527  527  
 528      -    private static final int  EVENT_COUNT_SHIFT = 32;
 529      -    private static final long WAITER_ID_MASK    = (1L << 16) - 1L;
      528 +    private static final int EVENT_COUNT_SHIFT = 32;
      529 +    private static final int WAITER_ID_MASK    = (1 << 16) - 1;
 530  530  
 531  531      /**
 532  532       * A counter for events that may wake up worker threads:
 533  533       *   - Submission of a new task to the pool
 534  534       *   - A worker pushing a task on an empty queue
 535  535       *   - termination
 536  536       */
 537  537      private volatile int eventCount;
 538  538  
 539  539      /**
↓ open down ↓ 68 lines elided ↑ open up ↑
 608  608  
 609  609      /**
 610  610       * Pool number, just for assigning useful names to worker threads
 611  611       */
 612  612      private final int poolNumber;
 613  613  
 614  614      // Utilities for CASing fields. Note that most of these
 615  615      // are usually manually inlined by callers
 616  616  
 617  617      /**
 618      -     * Increments running count part of workerCounts
      618 +     * Increments running count part of workerCounts.
 619  619       */
 620  620      final void incrementRunningCount() {
 621  621          int c;
 622  622          do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 623  623                                                 c = workerCounts,
 624  624                                                 c + ONE_RUNNING));
 625  625      }
 626  626  
 627  627      /**
 628      -     * Tries to decrement running count unless already zero
      628 +     * Tries to increment running count part of workerCounts.
 629  629       */
      630 +    final boolean tryIncrementRunningCount() {
      631 +        int c;
      632 +        return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
      633 +                                        c = workerCounts,
      634 +                                        c + ONE_RUNNING);
      635 +    }
      636 +
      637 +    /**
      638 +     * Tries to decrement running count unless already zero.
      639 +     */
 630  640      final boolean tryDecrementRunningCount() {
 631  641          int wc = workerCounts;
 632  642          if ((wc & RUNNING_COUNT_MASK) == 0)
 633  643              return false;
 634  644          return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 635  645                                          wc, wc - ONE_RUNNING);
 636  646      }
 637  647  
 638  648      /**
 639  649       * Forces decrement of encoded workerCounts, awaiting nonzero if
↓ open down ↓ 51 lines elided ↑ open up ↑
 691  701          int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
 692  702          final ReentrantLock lock = this.workerLock;
 693  703          lock.lock();
 694  704          try {
 695  705              ForkJoinWorkerThread[] ws = workers;
 696  706              int n = ws.length;
 697  707              if (k < 0 || k >= n || ws[k] != null) {
 698  708                  for (k = 0; k < n && ws[k] != null; ++k)
 699  709                      ;
 700  710                  if (k == n)
 701      -                    ws = Arrays.copyOf(ws, n << 1);
      711 +                    ws = workers = Arrays.copyOf(ws, n << 1);
 702  712              }
 703  713              ws[k] = w;
 704      -            workers = ws; // volatile array write ensures slot visibility
      714 +            int c = eventCount; // advance event count to ensure visibility
      715 +            UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
 705  716          } finally {
 706  717              lock.unlock();
 707  718          }
 708  719          return k;
 709  720      }
 710  721  
 711  722      /**
 712  723       * Nulls out record of worker in workers array.
 713  724       */
 714  725      private void forgetWorker(ForkJoinWorkerThread w) {
↓ open down ↓ 12 lines elided ↑ open up ↑
 727  738  
 728  739      /**
 729  740       * Final callback from terminating worker.  Removes record of
 730  741       * worker from array, and adjusts counts. If pool is shutting
 731  742       * down, tries to complete termination.
 732  743       *
 733  744       * @param w the worker
 734  745       */
 735  746      final void workerTerminated(ForkJoinWorkerThread w) {
 736  747          forgetWorker(w);
 737      -        decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
      748 +        decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
 738  749          while (w.stealCount != 0) // collect final count
 739  750              tryAccumulateStealCount(w);
 740  751          tryTerminate(false);
 741  752      }
 742  753  
 743  754      // Waiting for and signalling events
 744  755  
 745  756      /**
 746  757       * Releases workers blocked on a count not equal to current count.
 747  758       * Normally called after precheck that eventWaiters isn't zero to
 748  759       * avoid wasted array checks. Gives up upon a change in count or
 749      -     * upon releasing two workers, letting others take over.
      760 +     * upon releasing four workers, letting others take over.
 750  761       */
 751  762      private void releaseEventWaiters() {
 752  763          ForkJoinWorkerThread[] ws = workers;
 753  764          int n = ws.length;
 754  765          long h = eventWaiters;
 755  766          int ec = eventCount;
 756      -        boolean releasedOne = false;
      767 +        int releases = 4;
 757  768          ForkJoinWorkerThread w; int id;
 758      -        while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
      769 +        while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
 759  770                 (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
 760  771                 id < n && (w = ws[id]) != null) {
 761  772              if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 762  773                                            h,  w.nextWaiter)) {
 763  774                  LockSupport.unpark(w);
 764      -                if (releasedOne) // exit on second release
      775 +                if (--releases == 0)
 765  776                      break;
 766      -                releasedOne = true;
 767  777              }
 768  778              if (eventCount != ec)
 769  779                  break;
 770  780              h = eventWaiters;
 771  781          }
 772  782      }
 773  783  
 774  784      /**
 775  785       * Tries to advance eventCount and releases waiters. Called only
 776  786       * from workers.
↓ open down ↓ 9 lines elided ↑ open up ↑
 786  796       * Adds the given worker to event queue and blocks until
 787  797       * terminating or event count advances from the given value.
 788  798       *
 789  799       * @param w the calling worker thread
 790  800       * @param ec the count
 791  801       */
 792  802      private void eventSync(ForkJoinWorkerThread w, int ec) {
 793  803          long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
 794  804          long h;
 795  805          while ((runState < SHUTDOWN || !tryTerminate(false)) &&
 796      -               (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
      806 +               (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
 797  807                  (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
 798  808                 eventCount == ec) {
 799  809              if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 800  810                                            w.nextWaiter = h, nh)) {
 801  811                  awaitEvent(w, ec);
 802  812                  break;
 803  813              }
 804  814          }
 805  815      }
 806  816  
↓ open down ↓ 6 lines elided ↑ open up ↑
 813  823       * periods.
 814  824       *
 815  825       * @param w the calling worker thread
 816  826       * @param ec the count
 817  827       */
 818  828      private void awaitEvent(ForkJoinWorkerThread w, int ec) {
 819  829          while (eventCount == ec) {
 820  830              if (tryAccumulateStealCount(w)) { // transfer while idle
 821  831                  boolean untimed = (w.nextWaiter != 0L ||
 822  832                                     (workerCounts & RUNNING_COUNT_MASK) <= 1);
 823      -                long startTime = untimed? 0 : System.nanoTime();
      833 +                long startTime = untimed ? 0 : System.nanoTime();
 824  834                  Thread.interrupted();         // clear/ignore interrupt
 825      -                if (eventCount != ec || w.isTerminating())
      835 +                if (w.isTerminating() || eventCount != ec)
 826  836                      break;                    // recheck after clear
 827  837                  if (untimed)
 828  838                      LockSupport.park(w);
 829  839                  else {
 830  840                      LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
 831  841                      if (eventCount != ec || w.isTerminating())
 832  842                          break;
 833  843                      if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
 834  844                          tryShutdownUnusedWorker(ec);
 835  845                  }
↓ open down ↓ 17 lines elided ↑ open up ↑
 853  863       * threads is less than target.
 854  864       */
 855  865      private void tryResumeSpare() {
 856  866          int sw, id;
 857  867          ForkJoinWorkerThread[] ws = workers;
 858  868          int n = ws.length;
 859  869          ForkJoinWorkerThread w;
 860  870          if ((sw = spareWaiters) != 0 &&
 861  871              (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
 862  872              id < n && (w = ws[id]) != null &&
 863      -            (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
      873 +            (runState >= TERMINATING ||
      874 +             (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
 864  875              spareWaiters == sw &&
 865  876              UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 866  877                                       sw, w.nextSpare)) {
 867  878              int c; // increment running count before resume
 868  879              do {} while (!UNSAFE.compareAndSwapInt
 869  880                           (this, workerCountsOffset,
 870  881                            c = workerCounts, c + ONE_RUNNING));
 871  882              if (w.tryUnsuspend())
 872  883                  LockSupport.unpark(w);
 873  884              else   // back out if w was shutdown
↓ open down ↓ 33 lines elided ↑ open up ↑
 907  918                      tryTerminate(false); // handle failure during shutdown
 908  919                      // If originating from an external caller,
 909  920                      // propagate exception, else ignore
 910  921                      if (fail != null && runState < TERMINATING &&
 911  922                          !(Thread.currentThread() instanceof
 912  923                            ForkJoinWorkerThread))
 913  924                          UNSAFE.throwException(fail);
 914  925                      break;
 915  926                  }
 916  927                  w.start(recordWorker(w), ueh);
 917      -                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
 918      -                    int c; // advance event count
 919      -                    UNSAFE.compareAndSwapInt(this, eventCountOffset,
 920      -                                             c = eventCount, c+1);
      928 +                if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
 921  929                      break; // add at most one unless total below target
 922      -                }
 923  930              }
 924  931          }
 925  932          if (eventWaiters != 0L)
 926  933              releaseEventWaiters();
 927  934      }
 928  935  
 929  936      /**
 930  937       * Callback from the oldest waiter in awaitEvent waking up after a
 931  938       * period of non-use. If all workers are idle, tries (once) to
 932  939       * shutdown an event waiter or a spare, if one exists. Note that
↓ open down ↓ 15 lines elided ↑ open up ↑
 948  955              long h;
 949  956              if ((sw = spareWaiters) != 0) { // prefer killing spares
 950  957                  int id = (sw & SPARE_ID_MASK) - 1;
 951  958                  if (id >= 0 && id < n && (w = ws[id]) != null &&
 952  959                      UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 953  960                                               sw, w.nextSpare))
 954  961                      shutdown = true;
 955  962              }
 956  963              else if ((h = eventWaiters) != 0L) {
 957  964                  long nh;
 958      -                int id = ((int)(h & WAITER_ID_MASK)) - 1;
      965 +                int id = (((int)h) & WAITER_ID_MASK) - 1;
 959  966                  if (id >= 0 && id < n && (w = ws[id]) != null &&
 960  967                      (nh = w.nextWaiter) != 0L && // keep at least one worker
 961  968                      UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
 962  969                      shutdown = true;
 963  970              }
 964  971              if (w != null && shutdown) {
 965  972                  w.shutdown();
 966  973                  LockSupport.unpark(w);
 967  974              }
 968  975          }
↓ open down ↓ 27 lines elided ↑ open up ↑
 996 1003       * @param w the worker
 997 1004       * @param ran true if worker ran a task since last call to this method
 998 1005       */
 999 1006      final void preStep(ForkJoinWorkerThread w, boolean ran) {
1000 1007          int wec = w.lastEventCount;
1001 1008          boolean active = w.active;
1002 1009          boolean inactivate = false;
1003 1010          int pc = parallelism;
1004 1011          while (w.runState == 0) {
1005 1012              int rs = runState;
1006      -            if (rs >= TERMINATING) { // propagate shutdown
     1013 +            if (rs >= TERMINATING) {           // propagate shutdown
1007 1014                  w.shutdown();
1008 1015                  break;
1009 1016              }
1010 1017              if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1011      -                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
     1018 +                UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
1012 1019                  inactivate = active = w.active = false;
1013      -            int wc = workerCounts;
     1020 +                if (rs == SHUTDOWN) {          // all inactive and shut down
     1021 +                    tryTerminate(false);
     1022 +                    continue;
     1023 +                }
     1024 +            }
     1025 +            int wc = workerCounts;             // try to suspend as spare
1014 1026              if ((wc & RUNNING_COUNT_MASK) > pc) {
1015 1027                  if (!(inactivate |= active) && // must inactivate to suspend
1016      -                    workerCounts == wc &&      // try to suspend as spare
     1028 +                    workerCounts == wc &&
1017 1029                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1018 1030                                               wc, wc - ONE_RUNNING))
1019 1031                      w.suspendAsSpare();
1020 1032              }
1021 1033              else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1022 1034                  helpMaintainParallelism();     // not enough workers
1023      -            else if (!ran) {
     1035 +            else if (ran)
     1036 +                break;
     1037 +            else {
1024 1038                  long h = eventWaiters;
1025 1039                  int ec = eventCount;
1026 1040                  if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1027 1041                      releaseEventWaiters();     // release others before waiting
1028 1042                  else if (ec != wec) {
1029 1043                      w.lastEventCount = ec;     // no need to wait
1030 1044                      break;
1031 1045                  }
1032 1046                  else if (!(inactivate |= active))
1033 1047                      eventSync(w, wec);         // must inactivate before sync
1034 1048              }
1035      -            else
1036      -                break;
1037 1049          }
1038 1050      }
1039 1051  
1040 1052      /**
1041 1053       * Helps and/or blocks awaiting join of the given task.
1042 1054       * See above for explanation.
1043 1055       *
1044 1056       * @param joinMe the task to join
1045 1057       * @param worker the current worker thread
     1058 +     * @param timed true if wait should time out
     1059 +     * @param nanos timeout value if timed
1046 1060       */
1047      -    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
     1061 +    final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
     1062 +                         boolean timed, long nanos) {
     1063 +        long startTime = timed ? System.nanoTime() : 0L;
1048 1064          int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
     1065 +        boolean running = true;               // false when count decremented
1049 1066          while (joinMe.status >= 0) {
1050      -            int wc;
1051      -            worker.helpJoinTask(joinMe);
     1067 +            if (runState >= TERMINATING) {
     1068 +                joinMe.cancelIgnoringExceptions();
     1069 +                break;
     1070 +            }
     1071 +            running = worker.helpJoinTask(joinMe, running);
1052 1072              if (joinMe.status < 0)
1053 1073                  break;
1054      -            else if (retries > 0)
     1074 +            if (retries > 0) {
1055 1075                  --retries;
1056      -            else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1057      -                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058      -                                              wc, wc - ONE_RUNNING)) {
1059      -                int stat, c; long h;
1060      -                while ((stat = joinMe.status) >= 0 &&
1061      -                       (h = eventWaiters) != 0L && // help release others
1062      -                       (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
     1076 +                continue;
     1077 +            }
     1078 +            int wc = workerCounts;
     1079 +            if ((wc & RUNNING_COUNT_MASK) != 0) {
     1080 +                if (running) {
     1081 +                    if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
     1082 +                                                  wc, wc - ONE_RUNNING))
     1083 +                        continue;
     1084 +                    running = false;
     1085 +                }
     1086 +                long h = eventWaiters;
     1087 +                if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1063 1088                      releaseEventWaiters();
1064      -                if (stat >= 0 &&
1065      -                    ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1066      -                     (stat =
1067      -                      joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1068      -                    helpMaintainParallelism(); // timeout or no running workers
1069      -                do {} while (!UNSAFE.compareAndSwapInt
1070      -                             (this, workerCountsOffset,
1071      -                              c = workerCounts, c + ONE_RUNNING));
1072      -                if (stat < 0)
1073      -                    break;   // else restart
     1089 +                if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
     1090 +                    long ms; int ns;
     1091 +                    if (!timed) {
     1092 +                        ms = JOIN_TIMEOUT_MILLIS;
     1093 +                        ns = 0;
     1094 +                    }
     1095 +                    else { // at most JOIN_TIMEOUT_MILLIS per wait
     1096 +                        long nt = nanos - (System.nanoTime() - startTime);
     1097 +                        if (nt <= 0L)
     1098 +                            break;
     1099 +                        ms = nt / 1000000;
     1100 +                        if (ms > JOIN_TIMEOUT_MILLIS) {
     1101 +                            ms = JOIN_TIMEOUT_MILLIS;
     1102 +                            ns = 0;
     1103 +                        }
     1104 +                        else
     1105 +                            ns = (int) (nt % 1000000);
     1106 +                    }
     1107 +                    joinMe.internalAwaitDone(ms, ns);
     1108 +                }
     1109 +                if (joinMe.status < 0)
     1110 +                    break;
1074 1111              }
     1112 +            helpMaintainParallelism();
1075 1113          }
     1114 +        if (!running) {
     1115 +            int c;
     1116 +            do {} while (!UNSAFE.compareAndSwapInt
     1117 +                         (this, workerCountsOffset,
     1118 +                          c = workerCounts, c + ONE_RUNNING));
     1119 +        }
1076 1120      }
1077 1121  
1078 1122      /**
1079 1123       * Same idea as awaitJoin, but no helping, retries, or timeouts.
1080 1124       */
1081 1125      final void awaitBlocker(ManagedBlocker blocker)
1082 1126          throws InterruptedException {
1083 1127          while (!blocker.isReleasable()) {
1084 1128              int wc = workerCounts;
1085      -            if ((wc & RUNNING_COUNT_MASK) != 0 &&
1086      -                UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1087      -                                         wc, wc - ONE_RUNNING)) {
     1129 +            if ((wc & RUNNING_COUNT_MASK) == 0)
     1130 +                helpMaintainParallelism();
     1131 +            else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
     1132 +                                              wc, wc - ONE_RUNNING)) {
1088 1133                  try {
1089 1134                      while (!blocker.isReleasable()) {
1090 1135                          long h = eventWaiters;
1091 1136                          if (h != 0L &&
1092 1137                              (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1093 1138                              releaseEventWaiters();
1094 1139                          else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1095 1140                                   runState < TERMINATING)
1096 1141                              helpMaintainParallelism();
1097 1142                          else if (blocker.block())
↓ open down ↓ 24 lines elided ↑ open up ↑
1122 1167                   !submissionQueue.isEmpty() ||
1123 1168                   (runState & ACTIVE_COUNT_MASK) != 0)
1124 1169              return false;
1125 1170  
1126 1171          if (advanceRunLevel(TERMINATING))
1127 1172              startTerminating();
1128 1173  
1129 1174          // Finish now if all threads terminated; else in some subsequent call
1130 1175          if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1131 1176              advanceRunLevel(TERMINATED);
1132      -            termination.arrive();
     1177 +            termination.forceTermination();
1133 1178          }
1134 1179          return true;
1135 1180      }
1136 1181  
1137      -
1138 1182      /**
 1139 1183       * Actions on transition to TERMINATING.
1140 1184       *
1141 1185       * Runs up to four passes through workers: (0) shutting down each
1142 1186       * (without waking up if parked) to quickly spread notifications
 1143 1187       * without unnecessary bouncing around event queues etc.; (1) wake
 1144 1188       * up and help cancel tasks; (2) interrupt; (3) mop up races with
 1145 1189       * interrupted workers.
1146 1190       */
1147 1191      private void startTerminating() {
↓ open down ↓ 170 lines elided ↑ open up ↑
1318 1362          size |= size >>> 1;
1319 1363          size |= size >>> 2;
1320 1364          size |= size >>> 4;
1321 1365          size |= size >>> 8;
1322 1366          return size + 1;
1323 1367      }
1324 1368  
1325 1369      // Execution methods
1326 1370  
1327 1371      /**
1328      -     * Common code for execute, invoke and submit
     1372 +     * Submits task and creates, starts, or resumes some workers if necessary.
1329 1373       */
1330 1374      private <T> void doSubmit(ForkJoinTask<T> task) {
1331      -        if (task == null)
1332      -            throw new NullPointerException();
1333      -        if (runState >= SHUTDOWN)
1334      -            throw new RejectedExecutionException();
1335 1375          submissionQueue.offer(task);
1336 1376          int c; // try to increment event count -- CAS failure OK
1337 1377          UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1338      -        helpMaintainParallelism(); // create, start, or resume some workers
     1378 +        helpMaintainParallelism();
1339 1379      }
1340 1380  
1341 1381      /**
1342 1382       * Performs the given task, returning its result upon completion.
1343 1383       *
1344 1384       * @param task the task
1345 1385       * @return the task's result
1346 1386       * @throws NullPointerException if the task is null
1347 1387       * @throws RejectedExecutionException if the task cannot be
1348 1388       *         scheduled for execution
1349 1389       */
1350 1390      public <T> T invoke(ForkJoinTask<T> task) {
1351      -        doSubmit(task);
1352      -        return task.join();
     1391 +        if (task == null)
     1392 +            throw new NullPointerException();
     1393 +        if (runState >= SHUTDOWN)
     1394 +            throw new RejectedExecutionException();
     1395 +        Thread t = Thread.currentThread();
     1396 +        if ((t instanceof ForkJoinWorkerThread) &&
     1397 +            ((ForkJoinWorkerThread)t).pool == this)
     1398 +            return task.invoke();  // bypass submit if in same pool
     1399 +        else {
     1400 +            doSubmit(task);
     1401 +            return task.join();
     1402 +        }
1353 1403      }
1354 1404  
1355 1405      /**
     1406 +     * Unless terminating, forks task if within an ongoing FJ
     1407 +     * computation in the current pool, else submits as external task.
     1408 +     */
     1409 +    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
     1410 +        if (runState >= SHUTDOWN)
     1411 +            throw new RejectedExecutionException();
     1412 +        Thread t = Thread.currentThread();
     1413 +        if ((t instanceof ForkJoinWorkerThread) &&
     1414 +            ((ForkJoinWorkerThread)t).pool == this)
     1415 +            task.fork();
     1416 +        else
     1417 +            doSubmit(task);
     1418 +    }
     1419 +
     1420 +    /**
1356 1421       * Arranges for (asynchronous) execution of the given task.
1357 1422       *
1358 1423       * @param task the task
1359 1424       * @throws NullPointerException if the task is null
1360 1425       * @throws RejectedExecutionException if the task cannot be
1361 1426       *         scheduled for execution
1362 1427       */
1363 1428      public void execute(ForkJoinTask<?> task) {
1364      -        doSubmit(task);
     1429 +        if (task == null)
     1430 +            throw new NullPointerException();
     1431 +        forkOrSubmit(task);
1365 1432      }
1366 1433  
1367 1434      // AbstractExecutorService methods
1368 1435  
1369 1436      /**
1370 1437       * @throws NullPointerException if the task is null
1371 1438       * @throws RejectedExecutionException if the task cannot be
1372 1439       *         scheduled for execution
1373 1440       */
1374 1441      public void execute(Runnable task) {
     1442 +        if (task == null)
     1443 +            throw new NullPointerException();
1375 1444          ForkJoinTask<?> job;
1376 1445          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377 1446              job = (ForkJoinTask<?>) task;
1378 1447          else
1379 1448              job = ForkJoinTask.adapt(task, null);
1380      -        doSubmit(job);
     1449 +        forkOrSubmit(job);
1381 1450      }
1382 1451  
1383 1452      /**
1384 1453       * Submits a ForkJoinTask for execution.
1385 1454       *
1386 1455       * @param task the task to submit
1387 1456       * @return the task
1388 1457       * @throws NullPointerException if the task is null
1389 1458       * @throws RejectedExecutionException if the task cannot be
1390 1459       *         scheduled for execution
1391 1460       */
1392 1461      public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1393      -        doSubmit(task);
     1462 +        if (task == null)
     1463 +            throw new NullPointerException();
     1464 +        forkOrSubmit(task);
1394 1465          return task;
1395 1466      }
1396 1467  
1397 1468      /**
1398 1469       * @throws NullPointerException if the task is null
1399 1470       * @throws RejectedExecutionException if the task cannot be
1400 1471       *         scheduled for execution
1401 1472       */
1402 1473      public <T> ForkJoinTask<T> submit(Callable<T> task) {
     1474 +        if (task == null)
     1475 +            throw new NullPointerException();
1403 1476          ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1404      -        doSubmit(job);
     1477 +        forkOrSubmit(job);
1405 1478          return job;
1406 1479      }
1407 1480  
1408 1481      /**
1409 1482       * @throws NullPointerException if the task is null
1410 1483       * @throws RejectedExecutionException if the task cannot be
1411 1484       *         scheduled for execution
1412 1485       */
1413 1486      public <T> ForkJoinTask<T> submit(Runnable task, T result) {
     1487 +        if (task == null)
     1488 +            throw new NullPointerException();
1414 1489          ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1415      -        doSubmit(job);
     1490 +        forkOrSubmit(job);
1416 1491          return job;
1417 1492      }
1418 1493  
1419 1494      /**
1420 1495       * @throws NullPointerException if the task is null
1421 1496       * @throws RejectedExecutionException if the task cannot be
1422 1497       *         scheduled for execution
1423 1498       */
1424 1499      public ForkJoinTask<?> submit(Runnable task) {
     1500 +        if (task == null)
     1501 +            throw new NullPointerException();
1425 1502          ForkJoinTask<?> job;
1426 1503          if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1427 1504              job = (ForkJoinTask<?>) task;
1428 1505          else
1429 1506              job = ForkJoinTask.adapt(task, null);
1430      -        doSubmit(job);
     1507 +        forkOrSubmit(job);
1431 1508          return job;
1432 1509      }
1433 1510  
1434 1511      /**
1435 1512       * @throws NullPointerException       {@inheritDoc}
1436 1513       * @throws RejectedExecutionException {@inheritDoc}
1437 1514       */
1438 1515      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1439 1516          ArrayList<ForkJoinTask<T>> forkJoinTasks =
1440 1517              new ArrayList<ForkJoinTask<T>>(tasks.size());
↓ open down ↓ 277 lines elided ↑ open up ↑
1718 1795       */
1719 1796      public boolean isTerminated() {
1720 1797          return runState >= TERMINATED;
1721 1798      }
1722 1799  
1723 1800      /**
1724 1801       * Returns {@code true} if the process of termination has
1725 1802       * commenced but not yet completed.  This method may be useful for
1726 1803       * debugging. A return of {@code true} reported a sufficient
1727 1804       * period after shutdown may indicate that submitted tasks have
1728      -     * ignored or suppressed interruption, causing this executor not
1729      -     * to properly terminate.
     1805 +     * ignored or suppressed interruption, or are waiting for IO,
     1806 +     * causing this executor not to properly terminate. (See the
     1807 +     * advisory notes for class {@link ForkJoinTask} stating that
     1808 +     * tasks should not normally entail blocking operations.  But if
     1809 +     * they do, they must abort them on interrupt.)
1730 1810       *
1731 1811       * @return {@code true} if terminating but not yet terminated
1732 1812       */
1733 1813      public boolean isTerminating() {
1734 1814          return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1735 1815      }
1736 1816  
1737 1817      /**
1738 1818       * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
1739 1819       */
↓ open down ↓ 17 lines elided ↑ open up ↑
1757 1837       *
1758 1838       * @param timeout the maximum time to wait
1759 1839       * @param unit the time unit of the timeout argument
1760 1840       * @return {@code true} if this executor terminated and
1761 1841       *         {@code false} if the timeout elapsed before termination
1762 1842       * @throws InterruptedException if interrupted while waiting
1763 1843       */
1764 1844      public boolean awaitTermination(long timeout, TimeUnit unit)
1765 1845          throws InterruptedException {
1766 1846          try {
1767      -            return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
     1847 +            termination.awaitAdvanceInterruptibly(0, timeout, unit);
1768 1848          } catch (TimeoutException ex) {
1769 1849              return false;
1770 1850          }
     1851 +        return true;
1771 1852      }
1772 1853  
1773 1854      /**
1774 1855       * Interface for extending managed parallelism for tasks running
1775 1856       * in {@link ForkJoinPool}s.
1776 1857       *
1777 1858       * <p>A {@code ManagedBlocker} provides two methods.  Method
1778 1859       * {@code isReleasable} must return {@code true} if blocking is
1779 1860       * not necessary. Method {@code block} blocks the current thread
1780 1861       * if necessary (perhaps internally invoking {@code isReleasable}
↓ open down ↓ 134 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX