src/share/classes/java/util/concurrent/ForkJoinPool.java

Print this page




 509     /**
 510      * Creation factory for worker threads.
 511      */
 512     private final ForkJoinWorkerThreadFactory factory;
 513 
 514     /**
 515      * Sum of per-thread steal counts, updated only when threads are
 516      * idle or terminating.
 517      */
 518     private volatile long stealCount;
 519 
 520     /**
 521      * Encoded record of top of Treiber stack of threads waiting for
 522      * events. The top 32 bits contain the count being waited for. The
 523      * bottom 16 bits contain one plus the pool index of the waiting
 524      * worker thread. (Bits 16-31 are unused.) A value of zero means
          * that no worker is waiting.
 525      */
 526     private volatile long eventWaiters;
 527 
         // Shift and mask used to unpack the two parts of eventWaiters
 528     private static final int  EVENT_COUNT_SHIFT = 32;
 529     private static final long WAITER_ID_MASK    = (1L << 16) - 1L;
 530 
 531     /**
 532      * A counter for events that may wake up worker threads:
 533      *   - Submission of a new task to the pool
 534      *   - A worker pushing a task on an empty queue
 535      *   - termination
 536      */
 537     private volatile int eventCount;
 538 
 539     /**
 540      * Encoded record of top of Treiber stack of spare threads waiting
 541      * for resumption. The top 16 bits contain an arbitrary count to
 542      * avoid ABA effects. The bottom 16 bits contain one plus the pool
 543      * index of the waiting worker thread. A value of zero means that
          * no spare is waiting.
 544      */
 545     private volatile int spareWaiters;
 546 
         // Shift and mask used to unpack the two parts of spareWaiters
 547     private static final int SPARE_COUNT_SHIFT = 16;
 548     private static final int SPARE_ID_MASK     = (1 << 16) - 1;
 549 


 598      * True if using local FIFO, not the default LIFO, for local polling.
 599      * Read by, and replicated by, ForkJoinWorkerThreads.
 600      */
 601     final boolean locallyFifo;
 602 
 603     /**
 604      * The uncaught exception handler used when any worker abruptly
 605      * terminates. It is handed to each new worker when the worker
          * is started.
 606      */
 607     private final Thread.UncaughtExceptionHandler ueh;
 608 
 609     /**
 610      * Pool number, just for assigning useful names to worker threads.
 611      */
 612     private final int poolNumber;
 613 
 614     // Utilities for CASing fields. Note that most of these
 615     // are usually manually inlined by callers
 616 
 617     /**
 618      * Increments the running count part of workerCounts, retrying
          * the CAS until it succeeds.
 619      */
 620     final void incrementRunningCount() {
 621         int c;
             // Empty loop body: each pass re-reads workerCounts and retries the CAS
 622         do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 623                                                c = workerCounts,
 624                                                c + ONE_RUNNING));
 625     }
 626 
 627     /**
 628      * Tries (once) to decrement the running count unless already zero.
          *
          * @return true if the running count was decremented; false if it
          * was already zero or the single CAS attempt failed
 629      */










 630     final boolean tryDecrementRunningCount() {
 631         int wc = workerCounts;
 632         if ((wc & RUNNING_COUNT_MASK) == 0)
 633             return false;
             // Single attempt only; a lost race is reported as false
 634         return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 635                                         wc, wc - ONE_RUNNING);
 636     }
 637 
 638     /**
 639      * Forces decrement of encoded workerCounts, awaiting nonzero if
 640      * (rarely) necessary when other count updates lag.
 641      *
 642      * @param dr -- either zero or ONE_RUNNING
 643      * @param dt -- either zero or ONE_TOTAL
 644      */
 645     private void decrementWorkerCounts(int dr, int dt) {
 646         for (;;) {
 647             int wc = workerCounts;
 648             if ((wc & RUNNING_COUNT_MASK)  - dr < 0 ||
 649                 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {


 681         }
 682     }
 683 
 684     // workers array maintenance
 685 
 686     /**
 687      * Records and returns a workers array index for new worker.
          * Runs under workerLock.
          *
          * @param w the worker to record
          * @return the index of the workers array slot now holding w
 688      */
 689     private int recordWorker(ForkJoinWorkerThread w) {
 690         // Try using slot totalCount-1. If not available, scan and/or resize
 691         int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
 692         final ReentrantLock lock = this.workerLock;
 693         lock.lock();
 694         try {
 695             ForkJoinWorkerThread[] ws = workers;
 696             int n = ws.length;
 697             if (k < 0 || k >= n || ws[k] != null) {
                     // Preferred slot unusable: look for the first null slot
 698                 for (k = 0; k < n && ws[k] != null; ++k)
 699                     ;
                     // Array full: double it; k == n is then the first new slot
 700                 if (k == n)
 701                     ws = Arrays.copyOf(ws, n << 1);
 702             }
 703             ws[k] = w;
 704             workers = ws; // volatile array write ensures slot visibility

 705         } finally {
 706             lock.unlock();
 707         }
 708         return k;
 709     }
 710 
 711     /**
 712      * Nulls out record of worker in workers array. The slot at the
          * worker's poolIndex is cleared only if it still refers to w.
          *
          * @param w the worker to remove
 713      */
 714     private void forgetWorker(ForkJoinWorkerThread w) {
 715         int idx = w.poolIndex;
 716         // Locking helps method recordWorker avoid unnecessary expansion
 717         final ReentrantLock lock = this.workerLock;
 718         lock.lock();
 719         try {
 720             ForkJoinWorkerThread[] ws = workers;
 721             if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
 722                 ws[idx] = null;
 723         } finally {
 724             lock.unlock();
 725         }
 726     }
 727 
 728     /**
 729      * Final callback from terminating worker.  Removes record of
 730      * worker from array, and adjusts counts. If pool is shutting
 731      * down, tries to complete termination.
 732      *
 733      * @param w the worker
 734      */
 735     final void workerTerminated(ForkJoinWorkerThread w) {
 736         forgetWorker(w);
             // Total count always drops by one; running count drops only if
             // the worker was not trimmed.
             // NOTE(review): presumably a trimmed worker already gave up its
             // running count when it was suspended -- confirm
 737         decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
 738         while (w.stealCount != 0) // collect final count
 739             tryAccumulateStealCount(w);
             // Non-forced: tryTerminate(false) returns early unless shut down
 740         tryTerminate(false);
 741     }
 742 
 743     // Waiting for and signalling events
 744 
 745     /**
 746      * Releases workers blocked on a count not equal to current count.
 747      * Normally called after precheck that eventWaiters isn't zero to
 748      * avoid wasted array checks. Gives up upon a change in count or
 749      * upon releasing two workers, letting others take over.
 750      */
 751     private void releaseEventWaiters() {
 752         ForkJoinWorkerThread[] ws = workers;
 753         int n = ws.length;
 754         long h = eventWaiters;
 755         int ec = eventCount;
 756         boolean releasedOne = false;
 757         ForkJoinWorkerThread w; int id;
             // Continue while the stack is non-empty (id >= 0), its top entry
             // waits on a count other than ec, and the decoded index resolves
             // to a non-null worker in the snapshot of the workers array
 758         while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
 759                (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
 760                id < n && (w = ws[id]) != null) {
                 // Pop: replace the head with the successor saved in w.nextWaiter
 761             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 762                                           h,  w.nextWaiter)) {
 763                 LockSupport.unpark(w);
 764                 if (releasedOne) // exit on second release
 765                     break;
 766                 releasedOne = true;
 767             }
                 // Stop if the event count moved since it was sampled
 768             if (eventCount != ec)
 769                 break;
 770             h = eventWaiters;
 771         }
 772     }
 773 
 774     /**
 775      * Tries to advance eventCount and releases waiters. Called only
 776      * from workers. The count is advanced with a single CAS attempt
          * whose failure is ignored.
 777      */
 778     final void signalWork() {
 779         int c; // try to increment event count -- CAS failure OK
 780         UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
             // Skip the release scan entirely when no worker is waiting
 781         if (eventWaiters != 0L)
 782             releaseEventWaiters();
 783     }
 784 
 785     /**
 786      * Adds the given worker to event queue and blocks until
 787      * terminating or event count advances from the given value.
 788      *
 789      * @param w the calling worker thread
 790      * @param ec the count
 791      */
 792     private void eventSync(ForkJoinWorkerThread w, int ec) {
             // Proposed new head: ec in the top 32 bits, poolIndex+1 in the low bits
 793         long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
 794         long h;
             // Keep trying to enqueue only while: the pool is not shut down (or
             // tryTerminate declines), the stack is empty or its top waits on
             // the same count, and the event count is still ec
 795         while ((runState < SHUTDOWN || !tryTerminate(false)) &&
 796                (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
 797                 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
 798                eventCount == ec) {
                 // Save the old head in w.nextWaiter, then try to install nh
 799             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 800                                           w.nextWaiter = h, nh)) {
 801                 awaitEvent(w, ec);
 802                 break;
 803             }
 804         }
 805     }
 806 
 807     /**
 808      * Blocks the given worker (that has already been entered as an
 809      * event waiter) until terminating or event count advances from
 810      * the given value. The oldest (first) waiter uses a timed wait to
 811      * occasionally one-by-one shrink the number of workers (to a
 812      * minimum of one) if the pool has not been used for extended
 813      * periods.
 814      *
 815      * @param w the calling worker thread
 816      * @param ec the count
 817      */
 818     private void awaitEvent(ForkJoinWorkerThread w, int ec) {
 819         while (eventCount == ec) {
 820             if (tryAccumulateStealCount(w)) { // transfer while idle
                     // Wait without timeout if another waiter is linked below
                     // this one, or if at most one worker counts as running
 821                 boolean untimed = (w.nextWaiter != 0L ||
 822                                    (workerCounts & RUNNING_COUNT_MASK) <= 1);
 823                 long startTime = untimed? 0 : System.nanoTime();
 824                 Thread.interrupted();         // clear/ignore interrupt
 825                 if (eventCount != ec || w.isTerminating())
 826                     break;                    // recheck after clear
 827                 if (untimed)
 828                     LockSupport.park(w);
 829                 else {
 830                     LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
 831                     if (eventCount != ec || w.isTerminating())
 832                         break;
                         // Act only if the whole interval really elapsed; the
                         // park call can return before its timeout
 833                     if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
 834                         tryShutdownUnusedWorker(ec);
 835                 }
 836             }
 837         }
 838     }
 839 
 840     // Maintaining parallelism
 841 
 842     /**
 843      * Pushes worker onto the spare stack, retrying until the CAS on
          * spareWaiters succeeds.
          *
          * @param w the worker to push
 844      */
 845     final void pushSpare(ForkJoinWorkerThread w) {
             // New head: the worker's incremented spareCount in the upper bits
             // and poolIndex+1 in the lower bits
 846         int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
             // Each pass records the current head in w.nextSpare before the CAS
 847         do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 848                                                w.nextSpare = spareWaiters,ns));
 849     }
 850 
 851     /**
 852      * Tries (once) to resume a spare if the number of running
 853      * threads is less than target.
 854      */
 855     private void tryResumeSpare() {
 856         int sw, id;
 857         ForkJoinWorkerThread[] ws = workers;
 858         int n = ws.length;
 859         ForkJoinWorkerThread w;
             // Pop the top spare only if: the stack is non-empty, its index
             // resolves to a worker, the running count is below parallelism,
             // the head has not changed since it was read, and the CAS wins
 860         if ((sw = spareWaiters) != 0 &&
 861             (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
 862             id < n && (w = ws[id]) != null &&
 863             (workerCounts & RUNNING_COUNT_MASK) < parallelism &&

 864             spareWaiters == sw &&
 865             UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 866                                      sw, w.nextSpare)) {
 867             int c; // increment running count before resume
 868             do {} while (!UNSAFE.compareAndSwapInt
 869                          (this, workerCountsOffset,
 870                           c = workerCounts, c + ONE_RUNNING));
 871             if (w.tryUnsuspend())
 872                 LockSupport.unpark(w);
 873             else   // back out if w was shutdown
 874                 decrementWorkerCounts(ONE_RUNNING, 0);
 875         }
 876     }
 877 
 878     /**
 879      * Tries to increase the number of running workers if below target
 880      * parallelism: If a spare exists tries to resume it via
 881      * tryResumeSpare.  Otherwise, if not enough total workers or all
 882      * existing workers are busy, adds a new worker. In all cases also
 883      * helps wake up releasable workers waiting for work.


 897                                               wc + (ONE_RUNNING|ONE_TOTAL))) {
 898                 ForkJoinWorkerThread w = null;
 899                 Throwable fail = null;
 900                 try {
 901                     w = factory.newThread(this);
 902                 } catch (Throwable ex) {
 903                     fail = ex;
 904                 }
 905                 if (w == null) { // null or exceptional factory return
 906                     decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
 907                     tryTerminate(false); // handle failure during shutdown
 908                     // If originating from an external caller,
 909                     // propagate exception, else ignore
 910                     if (fail != null && runState < TERMINATING &&
 911                         !(Thread.currentThread() instanceof
 912                           ForkJoinWorkerThread))
 913                         UNSAFE.throwException(fail);
 914                     break;
 915                 }
 916                 w.start(recordWorker(w), ueh);
 917                 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
 918                     int c; // advance event count
 919                     UNSAFE.compareAndSwapInt(this, eventCountOffset,
 920                                              c = eventCount, c+1);
 921                     break; // add at most one unless total below target
 922                 }
 923             }
 924         }
 925         if (eventWaiters != 0L)
 926             releaseEventWaiters();
 927     }
 928 
 929     /**
 930      * Callback from the oldest waiter in awaitEvent waking up after a
 931      * period of non-use. If all workers are idle, tries (once) to
 932      * shutdown an event waiter or a spare, if one exists. Note that
 933      * we don't need CAS or locks here because the method is called
 934      * only from one thread occasionally waking (and even misfires are
 935      * OK). Note that until the shutdown worker fully terminates,
 936      * workerCounts will overestimate total count, which is tolerable.
          * NOTE(review): despite the remark above, both pops in the body
          * are performed with CAS.
 937      *
 938      * @param ec the event count waited on by caller (to abort
 939      * attempt if count has since changed).
 940      */
 941     private void tryShutdownUnusedWorker(int ec) {
 942         if (runState == 0 && eventCount == ec) { // only trigger if all idle
 943             ForkJoinWorkerThread[] ws = workers;
 944             int n = ws.length;
 945             ForkJoinWorkerThread w = null;
 946             boolean shutdown = false;
 947             int sw;
 948             long h;
 949             if ((sw = spareWaiters) != 0) { // prefer killing spares
 950                 int id = (sw & SPARE_ID_MASK) - 1;
                     // Pop the top spare; shutdown is set only if the CAS succeeds
 951                 if (id >= 0 && id < n && (w = ws[id]) != null &&
 952                     UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 953                                              sw, w.nextSpare))
 954                     shutdown = true;
 955             }
 956             else if ((h = eventWaiters) != 0L) {
 957                 long nh;
 958                 int id = ((int)(h & WAITER_ID_MASK)) - 1;
                     // Pop the top event waiter, but only when it has a
                     // successor, so the stack never becomes empty here
 959                 if (id >= 0 && id < n && (w = ws[id]) != null &&
 960                     (nh = w.nextWaiter) != 0L && // keep at least one worker
 961                     UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
 962                     shutdown = true;
 963             }
                 // w may be non-null even when the pop failed, hence the flag
 964             if (w != null && shutdown) {
 965                 w.shutdown();
 966                 LockSupport.unpark(w);
 967             }
 968         }
 969         releaseEventWaiters(); // in case of interference
 970     }
 971 
 972     /**
 973      * Callback from workers invoked upon each top-level action (i.e.,
 974      * stealing a task or taking a submission and running it).
 975      * Performs one or more of the following:
 976      *
 977      * 1. If the worker is active and either did not run a task
 978      *    or there are too many workers, try to set its active status


 991      *    eventSync if necessary (first forcing inactivation), upon
 992      *    which the worker may be shutdown via
 993      *    tryShutdownUnusedWorker.  Otherwise, help release any
 994      *    existing event waiters that are now releasable,
 995      *
 996      * @param w the worker
 997      * @param ran true if worker ran a task since last call to this method
 998      */
 999     final void preStep(ForkJoinWorkerThread w, boolean ran) {
1000         int wec = w.lastEventCount;
1001         boolean active = w.active;
1002         boolean inactivate = false;
1003         int pc = parallelism;
             // Loop until the worker is shut down or one of the branches breaks
1004         while (w.runState == 0) {
1005             int rs = runState;
1006             if (rs >= TERMINATING) { // propagate shutdown
1007                 w.shutdown();
1008                 break;
1009             }
                 // Decrement the active count held in runState when inactivation
                 // was requested, or when active and the active count is >= pc;
                 // the local flags change only if the CAS succeeds
1010             if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1011                 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
1012                 inactivate = active = w.active = false;
1013             int wc = workerCounts;





                 // Case 1: more running workers than the parallelism target
1014             if ((wc & RUNNING_COUNT_MASK) > pc) {
                     // If still active, only request inactivation this pass;
                     // the suspend is attempted on a later pass
1015                 if (!(inactivate |= active) && // must inactivate to suspend
1016                     workerCounts == wc &&      // try to suspend as spare
1017                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1018                                              wc, wc - ONE_RUNNING))
1019                     w.suspendAsSpare();
1020             }
                 // Case 2: fewer total workers than the parallelism target
1021             else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1022                 helpMaintainParallelism();     // not enough workers
                 // Case 3: no task was run since the last call
1023             else if (!ran) {


1024                 long h = eventWaiters;
1025                 int ec = eventCount;
1026                 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1027                     releaseEventWaiters();     // release others before waiting
1028                 else if (ec != wec) {
1029                     w.lastEventCount = ec;     // no need to wait
1030                     break;
1031                 }
1032                 else if (!(inactivate |= active))
1033                     eventSync(w, wec);         // must inactivate before sync
1034             }
                 // Case 4: a task was run and counts are within target
1035             else
1036                 break;
1037         }
1038     }
1039 
1040     /**
1041      * Helps and/or blocks awaiting join of the given task.
1042      * See above for explanation.
1043      *
1044      * @param joinMe the task to join
1045      * @param worker the current worker thread


1046      */
1047     final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {


1048         int retries = 2 + (parallelism >> 2); // #helpJoins before blocking

             // The loops below treat a negative status as "task is done"
1049         while (joinMe.status >= 0) {
1050             int wc;
1051             worker.helpJoinTask(joinMe);



1052             if (joinMe.status < 0)
1053                 break;
1054             else if (retries > 0)
1055                 --retries;
                 // Out of retries: give up one running count before blocking.
                 // If the count is zero or the CAS fails, the outer loop retries
1056             else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1057                      UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058                                               wc, wc - ONE_RUNNING)) {
1059                 int stat, c; long h;
1060                 while ((stat = joinMe.status) >= 0 &&
1061                        (h = eventWaiters) != 0L && // help release others
1062                        (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)





1063                     releaseEventWaiters();
                     // Skip the timed wait when no worker counts as running;
                     // otherwise wait, and help if the task is still not done
1064                 if (stat >= 0 &&
1065                     ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1066                      (stat =
1067                       joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1068                     helpMaintainParallelism(); // timeout or no running workers






















                     // Restore the running count given up above; retried until
                     // the CAS succeeds
1069                 do {} while (!UNSAFE.compareAndSwapInt
1070                              (this, workerCountsOffset,
1071                               c = workerCounts, c + ONE_RUNNING));
1072                 if (stat < 0)
1073                     break;   // else restart
1074             }
1075         }
1076     }
1077 
1078     /**
1079      * Same idea as awaitJoin, but no helping, retries, or timeouts.
1080      */
1081     final void awaitBlocker(ManagedBlocker blocker)
1082         throws InterruptedException {
1083         while (!blocker.isReleasable()) {
1084             int wc = workerCounts;
1085             if ((wc & RUNNING_COUNT_MASK) != 0 &&
1086                 UNSAFE.compareAndSwapInt(this, workerCountsOffset,

1087                                          wc, wc - ONE_RUNNING)) {
1088                 try {
1089                     while (!blocker.isReleasable()) {
1090                         long h = eventWaiters;
1091                         if (h != 0L &&
1092                             (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1093                             releaseEventWaiters();
1094                         else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1095                                  runState < TERMINATING)
1096                             helpMaintainParallelism();
1097                         else if (blocker.block())
1098                             break;
1099                     }
1100                 } finally {
1101                     int c;
1102                     do {} while (!UNSAFE.compareAndSwapInt
1103                                  (this, workerCountsOffset,
1104                                   c = workerCounts, c + ONE_RUNNING));
1105                 }
1106                 break;


1112      * Possibly initiates and/or completes termination.
1113      *
1114      * @param now if true, unconditionally terminate, else only
1115      * if shutdown and empty queue and no active workers
1116      * @return true if now terminating or terminated
1117      */
1118     private boolean tryTerminate(boolean now) {
1119         if (now)
1120             advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
             // Non-forced: give up unless shut down, the submission queue is
             // empty, and the active count held in runState is zero
1121         else if (runState < SHUTDOWN ||
1122                  !submissionQueue.isEmpty() ||
1123                  (runState & ACTIVE_COUNT_MASK) != 0)
1124             return false;
1125 
             // NOTE(review): advanceRunLevel presumably returns true only for
             // the caller that performs the transition -- confirm
1126         if (advanceRunLevel(TERMINATING))
1127             startTerminating();
1128 
1129         // Finish now if all threads terminated; else in some subsequent call
1130         if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1131             advanceRunLevel(TERMINATED);
1132             termination.arrive();
1133         }
1134         return true;
1135     }
1136 
1137 
1138     /**
1139      * Actions on transition to TERMINATING
1140      *
1141      * Runs up to four passes through workers: (0) shutting down each
1142      * (without waking up if parked) to quickly spread notifications
1143      * without unnecessary bouncing around event queues etc (1) wake
1144      * up and help cancel tasks (2) interrupt (3) mop up races with
1145      * interrupted workers
1146      */
1147     private void startTerminating() {
1148         cancelSubmissions();
1149         for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1150             int c; // advance event count
1151             UNSAFE.compareAndSwapInt(this, eventCountOffset,
1152                                      c = eventCount, c+1);
1153             eventWaiters = 0L; // clobber lists
1154             spareWaiters = 0;
1155             for (ForkJoinWorkerThread w : workers) {
1156                 if (w != null) {
1157                     w.shutdown();


1308     }
1309 
1310     /**
1311      * Returns initial power of two size for workers array.
1312      * @param pc the initial parallelism level
          * @return a power of two strictly greater than the size requested
          * below (pc + 1, capped at MAX_WORKERS)
1313      */
1314     private static int initialArraySizeFor(int pc) {
1315         // If possible, initially allocate enough space for one spare
1316         int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1317         // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
             // Copy the highest set bit into every lower bit; shifts of
             // 1, 2, 4 and 8 are enough for a value that fits in 16 bits
1318         size |= size >>> 1;
1319         size |= size >>> 2;
1320         size |= size >>> 4;
1321         size |= size >>> 8;
             // All ones up to the top bit, plus one, is the next power of two
1322         return size + 1;
1323     }
1324 
1325     // Execution methods
1326 
1327     /**
1328      * Common code for execute, invoke and submit. Enqueues the task
          * on the submission queue, tries to advance the event count, and
          * then calls helpMaintainParallelism.
          *
          * @param task the task to enqueue
          * @throws NullPointerException if the task is null
          * @throws RejectedExecutionException if runState is at least
          *         SHUTDOWN
1329      */
1330     private <T> void doSubmit(ForkJoinTask<T> task) {
1331         if (task == null)
1332             throw new NullPointerException();
1333         if (runState >= SHUTDOWN)
1334             throw new RejectedExecutionException();
1335         submissionQueue.offer(task);
1336         int c; // try to increment event count -- CAS failure OK
1337         UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1338         helpMaintainParallelism(); // create, start, or resume some workers
1339     }
1340 
1341     /**
1342      * Performs the given task, returning its result upon completion.
          * The task is submitted and the caller then joins it.
1343      *
1344      * @param task the task
1345      * @return the task's result
1346      * @throws NullPointerException if the task is null
1347      * @throws RejectedExecutionException if the task cannot be
1348      *         scheduled for execution
1349      */
1350     public <T> T invoke(ForkJoinTask<T> task) {









1351         doSubmit(task);
             // Wait for the submitted task and hand back its result
1352         return task.join();
1353     }

1354 
1355     /**















1356      * Arranges for (asynchronous) execution of the given task.
          * Returns as soon as the task has been submitted.
1357      *
1358      * @param task the task
1359      * @throws NullPointerException if the task is null
1360      * @throws RejectedExecutionException if the task cannot be
1361      *         scheduled for execution
1362      */
1363     public void execute(ForkJoinTask<?> task) {
1364         doSubmit(task);


1365     }
1366 
1367     // AbstractExecutorService methods
1368 
1369     /**
          * Arranges for execution of the given Runnable. A Runnable that
          * is already a ForkJoinTask is submitted as is; any other is
          * first wrapped via ForkJoinTask.adapt.
          *
          * @param task the task
1370      * @throws NullPointerException if the task is null
1371      * @throws RejectedExecutionException if the task cannot be
1372      *         scheduled for execution
1373      */
1374     public void execute(Runnable task) {


1375         ForkJoinTask<?> job;
1376         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377             job = (ForkJoinTask<?>) task;
1378         else
1379             job = ForkJoinTask.adapt(task, null);
1380         doSubmit(job);
1381     }
1382 
1383     /**
1384      * Submits a ForkJoinTask for execution.
1385      *
1386      * @param task the task to submit
1387      * @return the task
1388      * @throws NullPointerException if the task is null
1389      * @throws RejectedExecutionException if the task cannot be
1390      *         scheduled for execution
1391      */
1392     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1393         doSubmit(task);


             // The same instance is returned so the caller can join it
1394         return task;
1395     }
1396 
1397     /**
          * Submits a Callable for execution after wrapping it via
          * ForkJoinTask.adapt.
          *
          * @param task the callable to run
          * @return the ForkJoinTask wrapping the callable
1398      * @throws NullPointerException if the task is null
1399      * @throws RejectedExecutionException if the task cannot be
1400      *         scheduled for execution
1401      */
1402     public <T> ForkJoinTask<T> submit(Callable<T> task) {


1403         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1404         doSubmit(job);
1405         return job;
1406     }
1407 
1408     /**
          * Submits a Runnable for execution after wrapping it, together
          * with the given result, via ForkJoinTask.adapt.
          *
          * @param task the runnable to run
          * @param result the value passed to ForkJoinTask.adapt along
          *        with the runnable
          * @return the ForkJoinTask wrapping the runnable
1409      * @throws NullPointerException if the task is null
1410      * @throws RejectedExecutionException if the task cannot be
1411      *         scheduled for execution
1412      */
1413     public <T> ForkJoinTask<T> submit(Runnable task, T result) {


1414         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1415         doSubmit(job);
1416         return job;
1417     }
1418 
1419     /**
          * Submits a Runnable for execution. A Runnable that is already
          * a ForkJoinTask is submitted as is; any other is first wrapped
          * via ForkJoinTask.adapt.
          *
          * @param task the runnable to run
          * @return the submitted ForkJoinTask (the argument itself when
          *         it already was one)
1420      * @throws NullPointerException if the task is null
1421      * @throws RejectedExecutionException if the task cannot be
1422      *         scheduled for execution
1423      */
1424     public ForkJoinTask<?> submit(Runnable task) {


1425         ForkJoinTask<?> job;
1426         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1427             job = (ForkJoinTask<?>) task;
1428         else
1429             job = ForkJoinTask.adapt(task, null);
1430         doSubmit(job);
1431         return job;
1432     }
1433 
1434     /**
          * Adapts every callable to a ForkJoinTask, runs them all through
          * a single InvokeAll action via invoke, and returns the adapted
          * tasks as a list of Futures in iteration order of the input.
          *
1435      * @throws NullPointerException       {@inheritDoc}
1436      * @throws RejectedExecutionException {@inheritDoc}
1437      */
1438     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1439         ArrayList<ForkJoinTask<T>> forkJoinTasks =
1440             new ArrayList<ForkJoinTask<T>>(tasks.size());
1441         for (Callable<T> task : tasks)
1442             forkJoinTasks.add(ForkJoinTask.adapt(task));
             // invoke returns only after the InvokeAll action has been joined
1443         invoke(new InvokeAll<T>(forkJoinTasks));
1444 
             // The list is re-typed through a raw List cast instead of being
             // copied element by element
1445         @SuppressWarnings({"unchecked", "rawtypes"})
1446             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1447         return futures;
1448     }
1449 
1450     static final class InvokeAll<T> extends RecursiveAction {


1708     public List<Runnable> shutdownNow() {
1709         checkPermission();
             // Forced termination: skips the "shut down and idle" precheck
1710         tryTerminate(true);
             // No list of unexecuted tasks is built; the result is always empty
1711         return Collections.emptyList();
1712     }
1713 
1714     /**
1715      * Returns {@code true} if all tasks have completed following shut down.
1716      *
1717      * @return {@code true} if all tasks have completed following shut down
1718      */
1719     public boolean isTerminated() {
             // NOTE(review): relies on the run-level bits of runState ordering
             // numerically above the active count -- confirm against the
             // constant definitions
1720         return runState >= TERMINATED;
1721     }
1722 
1723     /**
1724      * Returns {@code true} if the process of termination has
1725      * commenced but not yet completed.  This method may be useful for
1726      * debugging. A return of {@code true} reported a sufficient
1727      * period after shutdown may indicate that submitted tasks have
1728      * ignored or suppressed interruption, causing this executor not
1729      * to properly terminate.



1730      *
1731      * @return {@code true} if terminating but not yet terminated
1732      */
1733     public boolean isTerminating() {
             // True only when the TERMINATING bit is set and TERMINATED is not
1734         return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1735     }
1736 
1737     /**
1738      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
          *
          * @return true if runState is at least TERMINATING
1739      */
1740     final boolean isAtLeastTerminating() {
1741         return runState >= TERMINATING;
1742     }
1743 
1744     /**
1745      * Returns {@code true} if this pool has been shut down.
1746      *
1747      * @return {@code true} if this pool has been shut down
1748      */
1749     public boolean isShutdown() {
             // Also true once the pool has moved on to terminating or terminated,
             // since isAtLeastTerminating and isTerminated use higher thresholds
             // (NOTE(review): assumes SHUTDOWN is the lowest of the three levels
             // -- confirm against the constant definitions)
1750         return runState >= SHUTDOWN;
1751     }
1752 
1753     /**
1754      * Blocks until all tasks have completed execution after a shutdown
1755      * request, or the timeout occurs, or the current thread is
1756      * interrupted, whichever happens first.
1757      *
1758      * @param timeout the maximum time to wait
1759      * @param unit the time unit of the timeout argument
1760      * @return {@code true} if this executor terminated and
1761      *         {@code false} if the timeout elapsed before termination
1762      * @throws InterruptedException if interrupted while waiting
1763      */
1764     public boolean awaitTermination(long timeout, TimeUnit unit)
1765         throws InterruptedException {
1766         try {
                 // NOTE(review): termination looks like a Phaser whose phase 0
                 // ends when tryTerminate calls arrive() -- confirm against
                 // its declaration
1767             return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1768         } catch (TimeoutException ex) {
                 // A timeout is reported through the return value, not thrown
1769             return false;
1770         }

1771     }
1772 
1773     /**
1774      * Interface for extending managed parallelism for tasks running
1775      * in {@link ForkJoinPool}s.
1776      *
1777      * <p>A {@code ManagedBlocker} provides two methods.  Method
1778      * {@code isReleasable} must return {@code true} if blocking is
1779      * not necessary. Method {@code block} blocks the current thread
1780      * if necessary (perhaps internally invoking {@code isReleasable}
1781      * before actually blocking). The unusual methods in this API
1782      * accommodate synchronizers that may, but don't usually, block
1783      * for long periods. Similarly, they allow more efficient internal
1784      * handling of cases in which additional workers may be, but
1785      * usually are not, needed to ensure sufficient parallelism.
1786      * Toward this end, implementations of method {@code isReleasable}
1787      * must be amenable to repeated invocation.
1788      *
1789      * <p>For example, here is a ManagedBlocker based on a
1790      * ReentrantLock:




 509     /**
 510      * Creation factory for worker threads.
 511      */
 512     private final ForkJoinWorkerThreadFactory factory;
 513 
 514     /**
 515      * Sum of per-thread steal counts, updated only when threads are
 516      * idle or terminating.
 517      */
 518     private volatile long stealCount;
 519 
 520     /**
 521      * Encoded record of top of Treiber stack of threads waiting for
 522      * events. The top 32 bits contain the count being waited for. The
 523      * bottom 16 bits contain one plus the pool index of the waiting
 524      * worker thread. (Bits 16-31 are unused.) A value of zero means
          * that no worker is waiting.
 525      */
 526     private volatile long eventWaiters;
 527 
         // Right-shift that extracts the waited-for event count from eventWaiters.
 528     private static final int EVENT_COUNT_SHIFT = 32;
         // Mask that extracts (pool index + 1) from the low 16 bits of eventWaiters.
 529     private static final int WAITER_ID_MASK    = (1 << 16) - 1;
 530 
 531     /**
 532      * A counter for events that may wake up worker threads:
 533      *   - Submission of a new task to the pool
 534      *   - A worker pushing a task on an empty queue
 535      *   - termination
 536      */
 537     private volatile int eventCount;
 538 
 539     /**
 540      * Encoded record of top of Treiber stack of spare threads waiting
 541      * for resumption. The top 16 bits contain an arbitrary count to
 542      * avoid ABA effects. The bottom 16 bits contain one plus the pool
 543      * index of the waiting worker thread.
 544      */
 545     private volatile int spareWaiters;
 546 
         // Left-shift that places the ABA-avoidance count in the top 16 bits of spareWaiters.
 547     private static final int SPARE_COUNT_SHIFT = 16;
         // Mask that extracts (pool index + 1) from the low 16 bits of spareWaiters.
 548     private static final int SPARE_ID_MASK     = (1 << 16) - 1;
 549 


 598      * True if use local fifo, not default lifo, for local polling
 599      * Read by, and replicated by ForkJoinWorkerThreads
 600      */
 601     final boolean locallyFifo;
 602 
 603     /**
 604      * The uncaught exception handler used when any worker abruptly
 605      * terminates. Handed to each new worker when it is started.
 606      */
 607     private final Thread.UncaughtExceptionHandler ueh;
 608 
 609     /**
 610      * Pool number, just for assigning useful names to worker threads.
 611      */
 612     private final int poolNumber;
 613 
 614     // Utilities for CASing fields. Note that most of these
 615     // are usually manually inlined by callers
 616 
 617     /**
 618      * Increments running count part of workerCounts.
          * Retries the CAS until it succeeds.
 619      */
 620     final void incrementRunningCount() {
 621         int c;
             // Empty-bodied spin: re-read workerCounts and retry until the CAS wins.
 622         do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 623                                                c = workerCounts,
 624                                                c + ONE_RUNNING));
 625     }
 626 
 627     /**
 628      * Tries to increment running count part of workerCounts.
          * Makes a single CAS attempt and does not retry.
          *
          * @return {@code true} if the CAS succeeded
 629      */
 630     final boolean tryIncrementRunningCount() {
 631         int c;
 632         return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 633                                         c = workerCounts,
 634                                         c + ONE_RUNNING);
 635     }
 636 
 637     /**
 638      * Tries to decrement running count unless already zero.
          * Makes a single CAS attempt and does not retry.
          *
          * @return {@code false} if the running count was zero or the
          *         CAS failed, else {@code true}
 639      */
 640     final boolean tryDecrementRunningCount() {
 641         int wc = workerCounts;
 642         if ((wc & RUNNING_COUNT_MASK) == 0)
 643             return false;
 644         return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
 645                                         wc, wc - ONE_RUNNING);
 646     }
 647 
 648     /**
 649      * Forces decrement of encoded workerCounts, awaiting nonzero if
 650      * (rarely) necessary when other count updates lag.
 651      *
 652      * @param dr -- either zero or ONE_RUNNING
 653      * @param dt -- either zero or ONE_TOTAL
 654      */
 655     private void decrementWorkerCounts(int dr, int dt) {
 656         for (;;) {
 657             int wc = workerCounts;
 658             if ((wc & RUNNING_COUNT_MASK)  - dr < 0 ||
 659                 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {


 691         }
 692     }
 693 
 694     // workers array maintenance
 695 
 696     /**
 697      * Records and returns a workers array index for new worker.
          * Runs under workerLock. The array is doubled when no free slot
          * exists.
          *
          * @param w the worker to record
          * @return the index at which {@code w} was stored
 698      */
 699     private int recordWorker(ForkJoinWorkerThread w) {
 700         // Try using slot totalCount-1. If not available, scan and/or resize
 701         int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
 702         final ReentrantLock lock = this.workerLock;
 703         lock.lock();
 704         try {
 705             ForkJoinWorkerThread[] ws = workers;
 706             int n = ws.length;
 707             if (k < 0 || k >= n || ws[k] != null) {
                     // Preferred slot unusable: linear scan for the first null slot.
 708                 for (k = 0; k < n && ws[k] != null; ++k)
 709                     ;
                     // No free slot found: double the array; k == n is the first new slot.
 710                 if (k == n)
 711                     ws = workers = Arrays.copyOf(ws, n << 1);
 712             }
 713             ws[k] = w;
 714             int c = eventCount; // advance event count to ensure visibility
                 // Single CAS attempt; the result is not checked.
 715             UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
 716         } finally {
 717             lock.unlock();
 718         }
 719         return k;
 720     }
 721 
 722     /**
 723      * Nulls out record of worker in workers array.
          * The slot is cleared only if it still holds {@code w}.
          *
          * @param w the worker whose slot should be cleared
 724      */
 725     private void forgetWorker(ForkJoinWorkerThread w) {
 726         int idx = w.poolIndex;
 727         // Locking helps method recordWorker avoid unnecessary expansion
 728         final ReentrantLock lock = this.workerLock;
 729         lock.lock();
 730         try {
 731             ForkJoinWorkerThread[] ws = workers;
 732             if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
 733                 ws[idx] = null;
 734         } finally {
 735             lock.unlock();
 736         }
 737     }
 738 
 739     /**
 740      * Final callback from terminating worker.  Removes record of
 741      * worker from array, and adjusts counts. If pool is shutting
 742      * down, tries to complete termination.
 743      *
 744      * @param w the worker
 745      */
 746     final void workerTerminated(ForkJoinWorkerThread w) {
 747         forgetWorker(w);
             // A trimmed worker contributes no running-count decrement here
             // (presumably it was already removed from the running count -- confirm).
 748         decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
             // Keep retrying until the worker's own stealCount reads zero.
 749         while (w.stealCount != 0) // collect final count
 750             tryAccumulateStealCount(w);
 751         tryTerminate(false);
 752     }
 753 
 754     // Waiting for and signalling events
 755 
 756     /**
 757      * Releases workers blocked on a count not equal to current count.
 758      * Normally called after precheck that eventWaiters isn't zero to
 759      * avoid wasted array checks. Gives up upon a change in count or
 760      * upon releasing four workers, letting others take over.
 761      */
 762     private void releaseEventWaiters() {
 763         ForkJoinWorkerThread[] ws = workers;
 764         int n = ws.length;
 765         long h = eventWaiters;
 766         int ec = eventCount;
 767         int releases = 4;
 768         ForkJoinWorkerThread w; int id;
             // Continue while the stack has a top waiter (id >= 0) that waits on a
             // count other than ec and that can be found in the workers snapshot.
 769         while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
 770                (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
 771                id < n && (w = ws[id]) != null) {
                 // Pop the top waiter by installing its successor, then wake it.
 772             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 773                                           h,  w.nextWaiter)) {
 774                 LockSupport.unpark(w);
 775                 if (--releases == 0)
 776                     break;

 777             }
                 // Stop as soon as the event count moves; otherwise re-read the stack top.
 778             if (eventCount != ec)
 779                 break;
 780             h = eventWaiters;
 781         }
 782     }
 783 
 784     /**
 785      * Tries to advance eventCount and releases waiters. Called only
 786      * from workers. The count is advanced with a single CAS attempt
          * whose failure is ignored.
 787      */
 788     final void signalWork() {
 789         int c; // try to increment event count -- CAS failure OK
 790         UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
             // Skip the release pass entirely when no worker is queued.
 791         if (eventWaiters != 0L)
 792             releaseEventWaiters();
 793     }
 794 
 795     /**
 796      * Adds the given worker to event queue and blocks until
 797      * terminating or event count advances from the given value.
          * The worker is not enqueued if the current top waiter is waiting
          * on a different count or if eventCount no longer equals
          * {@code ec}.
 798      *
 799      * @param w the calling worker thread
 800      * @param ec the count
 801      */
 802     private void eventSync(ForkJoinWorkerThread w, int ec) {
             // New stack head: ec in the upper 32 bits, (pool index + 1) in the low bits.
 803         long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
 804         long h;
             // Retry while not terminating, the stack is empty or its top waits on
             // the same count, and the event count has not advanced.
 805         while ((runState < SHUTDOWN || !tryTerminate(false)) &&
 806                (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
 807                 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
 808                eventCount == ec) {
                 // Link w in front of the old head and push it; block only if the push won.
 809             if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
 810                                           w.nextWaiter = h, nh)) {
 811                 awaitEvent(w, ec);
 812                 break;
 813             }
 814         }
 815     }
 816 
 817     /**
 818      * Blocks the given worker (that has already been entered as an
 819      * event waiter) until terminating or event count advances from
 820      * the given value. The oldest (first) waiter uses a timed wait to
 821      * occasionally one-by-one shrink the number of workers (to a
 822      * minimum of one) if the pool has not been used for extended
 823      * periods.
 824      *
 825      * @param w the calling worker thread
 826      * @param ec the count
 827      */
 828     private void awaitEvent(ForkJoinWorkerThread w, int ec) {
 829         while (eventCount == ec) {
                 // Park only after the steal count was transferred; otherwise loop and retry.
 830             if (tryAccumulateStealCount(w)) { // transfer while idle
                     // Wait without a timeout unless w has no successor on the
                     // stack and more than one worker is counted as running.
 831                 boolean untimed = (w.nextWaiter != 0L ||
 832                                    (workerCounts & RUNNING_COUNT_MASK) <= 1);
 833                 long startTime = untimed ? 0 : System.nanoTime();
 834                 Thread.interrupted();         // clear/ignore interrupt
 835                 if (w.isTerminating() || eventCount != ec)
 836                     break;                    // recheck after clear
 837                 if (untimed)
 838                     LockSupport.park(w);
 839                 else {
 840                     LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
 841                     if (eventCount != ec || w.isTerminating())
 842                         break;
                         // Shrink only if the full period really elapsed, not on an early wakeup.
 843                     if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
 844                         tryShutdownUnusedWorker(ec);
 845                 }
 846             }
 847         }
 848     }
 849 
 850     // Maintaining parallelism
 851 
 852     /**
 853      * Pushes worker onto the spare stack. Retries the CAS until it
          * succeeds.
          *
          * @param w the worker to push
 854      */
 855     final void pushSpare(ForkJoinWorkerThread w) {
             // New head: the worker's incremented spareCount in the upper bits,
             // (pool index + 1) in the low bits.
 856         int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
             // Each attempt re-links w.nextSpare to the freshly read head.
 857         do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 858                                                w.nextSpare = spareWaiters,ns));
 859     }
 860 
 861     /**
 862      * Tries (once) to resume a spare if the number of running
 863      * threads is less than target. The running-count check is
          * bypassed when runState is at least TERMINATING.
 864      */
 865     private void tryResumeSpare() {
 866         int sw, id;
 867         ForkJoinWorkerThread[] ws = workers;
 868         int n = ws.length;
 869         ForkJoinWorkerThread w;
             // All conditions must hold, ending with a successful CAS that pops
             // the top spare by installing its successor as the new head.
 870         if ((sw = spareWaiters) != 0 &&
 871             (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
 872             id < n && (w = ws[id]) != null &&
 873             (runState >= TERMINATING ||
 874              (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
 875             spareWaiters == sw &&
 876             UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 877                                      sw, w.nextSpare)) {
 878             int c; // increment running count before resume
 879             do {} while (!UNSAFE.compareAndSwapInt
 880                          (this, workerCountsOffset,
 881                           c = workerCounts, c + ONE_RUNNING));
 882             if (w.tryUnsuspend())
 883                 LockSupport.unpark(w);
 884             else   // back out if w was shutdown
 885                 decrementWorkerCounts(ONE_RUNNING, 0);
 886         }
 887     }
 888 
 889     /**
 890      * Tries to increase the number of running workers if below target
 891      * parallelism: If a spare exists tries to resume it via
 892      * tryResumeSpare.  Otherwise, if not enough total workers or all
 893      * existing workers are busy, adds a new worker. In all cases also
 894      * helps wake up releasable workers waiting for work.


 908                                               wc + (ONE_RUNNING|ONE_TOTAL))) {
 909                 ForkJoinWorkerThread w = null;
 910                 Throwable fail = null;
 911                 try {
 912                     w = factory.newThread(this);
 913                 } catch (Throwable ex) {
 914                     fail = ex;
 915                 }
 916                 if (w == null) { // null or exceptional factory return
 917                     decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
 918                     tryTerminate(false); // handle failure during shutdown
 919                     // If originating from an external caller,
 920                     // propagate exception, else ignore
 921                     if (fail != null && runState < TERMINATING &&
 922                         !(Thread.currentThread() instanceof
 923                           ForkJoinWorkerThread))
 924                         UNSAFE.throwException(fail);
 925                     break;
 926                 }
 927                 w.start(recordWorker(w), ueh);
 928                 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)



 929                     break; // add at most one unless total below target
 930             }
 931         }

 932         if (eventWaiters != 0L)
 933             releaseEventWaiters();
 934     }
 935 
 936     /**
 937      * Callback from the oldest waiter in awaitEvent waking up after a
 938      * period of non-use. If all workers are idle, tries (once) to
 939      * shutdown an event waiter or a spare, if one exists. Note that
 940      * we don't need CAS or locks here because the method is called
 941      * only from one thread occasionally waking (and even misfires are
 942      * OK). Note that until the shutdown worker fully terminates,
 943      * workerCounts will overestimate total count, which is tolerable.
 944      *
 945      * @param ec the event count waited on by caller (to abort
 946      * attempt if count has since changed).
 947      */
 948     private void tryShutdownUnusedWorker(int ec) {
 949         if (runState == 0 && eventCount == ec) { // only trigger if all idle
 950             ForkJoinWorkerThread[] ws = workers;
 951             int n = ws.length;
 952             ForkJoinWorkerThread w = null;
 953             boolean shutdown = false;
 954             int sw;
 955             long h;
 956             if ((sw = spareWaiters) != 0) { // prefer killing spares
 957                 int id = (sw & SPARE_ID_MASK) - 1;
                     // Pop the top spare; it is shut down only if the pop succeeds.
 958                 if (id >= 0 && id < n && (w = ws[id]) != null &&
 959                     UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
 960                                              sw, w.nextSpare))
 961                     shutdown = true;
 962             }
 963             else if ((h = eventWaiters) != 0L) {
 964                 long nh;
 965                 int id = (((int)h) & WAITER_ID_MASK) - 1;
                     // Pop the top event waiter, but never the last one on the stack.
 966                 if (id >= 0 && id < n && (w = ws[id]) != null &&
 967                     (nh = w.nextWaiter) != 0L && // keep at least one worker
 968                     UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
 969                     shutdown = true;
 970             }
                 // w may be non-null even when its pop failed, hence the separate flag.
 971             if (w != null && shutdown) {
 972                 w.shutdown();
 973                 LockSupport.unpark(w);
 974             }
 975         }
 976         releaseEventWaiters(); // in case of interference
 977     }
 978 
 979     /**
 980      * Callback from workers invoked upon each top-level action (i.e.,
 981      * stealing a task or taking a submission and running it).
 982      * Performs one or more of the following:
 983      *
 984      * 1. If the worker is active and either did not run a task
 985      *    or there are too many workers, try to set its active status


 998      *    eventSync if necessary (first forcing inactivation), upon
 999      *    which the worker may be shutdown via
1000      *    tryShutdownUnusedWorker.  Otherwise, help release any
1001      *    existing event waiters that are now releasable,
1002      *
1003      * @param w the worker
1004      * @param ran true if worker ran a task since last call to this method
1005      */
1006     final void preStep(ForkJoinWorkerThread w, boolean ran) {
1007         int wec = w.lastEventCount;
1008         boolean active = w.active;
             // Set when the worker must give up active status on the next pass
             // before it may suspend or wait.
1009         boolean inactivate = false;
1010         int pc = parallelism;
             // Loop until the worker is shut down or one of the branches breaks out.
1011         while (w.runState == 0) {
1012             int rs = runState;
1013             if (rs >= TERMINATING) {           // propagate shutdown
1014                 w.shutdown();
1015                 break;
1016             }
                 // Drop active status when requested, or when the active count is
                 // already at or above the parallelism target; the CAS decrements runState.
1017             if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1018                 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
1019                 inactivate = active = w.active = false;
1020                 if (rs == SHUTDOWN) {          // all inactive and shut down
1021                     tryTerminate(false);
1022                     continue;
1023                 }
1024             }
1025             int wc = workerCounts;             // try to suspend as spare
1026             if ((wc & RUNNING_COUNT_MASK) > pc) {
                     // If still active, only request inactivation now and retry the loop.
1027                 if (!(inactivate |= active) && // must inactivate to suspend
1028                     workerCounts == wc &&
1029                     UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1030                                              wc, wc - ONE_RUNNING))
1031                     w.suspendAsSpare();
1032             }
1033             else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1034                 helpMaintainParallelism();     // not enough workers
1035             else if (ran)
1036                 break;
1037             else {
1038                 long h = eventWaiters;
1039                 int ec = eventCount;
1040                 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1041                     releaseEventWaiters();     // release others before waiting
1042                 else if (ec != wec) {
1043                     w.lastEventCount = ec;     // no need to wait
1044                     break;
1045                 }
1046                 else if (!(inactivate |= active))
1047                     eventSync(w, wec);         // must inactivate before sync
1048             }


1049         }
1050     }
1051 
1052     /**
1053      * Helps and/or blocks awaiting join of the given task.
1054      * See above for explanation. If the pool is terminating, the task
          * is cancelled instead of awaited. A running count given up while
          * blocked is restored before returning.
1055      *
1056      * @param joinMe the task to join
1057      * @param worker the current worker thread
1058      * @param timed true if wait should time out
1059      * @param nanos timeout value if timed
1060      */
1061     final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
1062                          boolean timed, long nanos) {
1063         long startTime = timed ? System.nanoTime() : 0L;
1064         int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1065         boolean running = true;               // false when count decremented
             // A negative status ends the loop; nonnegative means keep waiting.
1066         while (joinMe.status >= 0) {
1067             if (runState >= TERMINATING) {
1068                 joinMe.cancelIgnoringExceptions();
1069                 break;
1070             }
1071             running = worker.helpJoinTask(joinMe, running);
1072             if (joinMe.status < 0)
1073                 break;
                 // Spend the helping budget before considering a blocking wait.
1074             if (retries > 0) {
1075                 --retries;
1076                 continue;
1077             }
1078             int wc = workerCounts;
1079             if ((wc & RUNNING_COUNT_MASK) != 0) {
1080                 if (running) {
                         // Give up this thread's running count; on CAS failure restart the loop.
1081                     if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1082                                                   wc, wc - ONE_RUNNING))
1083                         continue;
1084                     running = false;
1085                 }
1086                 long h = eventWaiters;
1087                 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1088                     releaseEventWaiters();
                     // Block only while some other worker is still counted as running.
1089                 if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
1090                     long ms; int ns;
1091                     if (!timed) {
1092                         ms = JOIN_TIMEOUT_MILLIS;
1093                         ns = 0;
1094                     }
1095                     else { // at most JOIN_TIMEOUT_MILLIS per wait
                             // Remaining budget = requested nanos minus time already elapsed.
1096                         long nt = nanos - (System.nanoTime() - startTime);
1097                         if (nt <= 0L)
1098                             break;
1099                         ms = nt / 1000000;
1100                         if (ms > JOIN_TIMEOUT_MILLIS) {
1101                             ms = JOIN_TIMEOUT_MILLIS;
1102                             ns = 0;
1103                         }
1104                         else
1105                             ns = (int) (nt % 1000000);
1106                     }
1107                     joinMe.internalAwaitDone(ms, ns);
1108                 }
1109                 if (joinMe.status < 0)
1110                     break;
1111             }
1112             helpMaintainParallelism();
1113         }
             // Restore the running count if it was surrendered above; spin until the CAS wins.
1114         if (!running) {
1115             int c;
1116             do {} while (!UNSAFE.compareAndSwapInt
1117                          (this, workerCountsOffset,
1118                           c = workerCounts, c + ONE_RUNNING));


1119         }
1120     }

1121 
1122     /**
1123      * Same idea as awaitJoin, but no helping, retries, or timeouts.
1124      */
1125     final void awaitBlocker(ManagedBlocker blocker)
1126         throws InterruptedException {
1127         while (!blocker.isReleasable()) {
1128             int wc = workerCounts;
1129             if ((wc & RUNNING_COUNT_MASK) == 0)
1130                 helpMaintainParallelism();
1131             else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1132                                               wc, wc - ONE_RUNNING)) {
1133                 try {
1134                     while (!blocker.isReleasable()) {
1135                         long h = eventWaiters;
1136                         if (h != 0L &&
1137                             (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1138                             releaseEventWaiters();
1139                         else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1140                                  runState < TERMINATING)
1141                             helpMaintainParallelism();
1142                         else if (blocker.block())
1143                             break;
1144                     }
1145                 } finally {
1146                     int c;
1147                     do {} while (!UNSAFE.compareAndSwapInt
1148                                  (this, workerCountsOffset,
1149                                   c = workerCounts, c + ONE_RUNNING));
1150                 }
1151                 break;


1157      * Possibly initiates and/or completes termination.
1158      *
1159      * @param now if true, unconditionally terminate, else only
1160      * if shutdown and empty queue and no active workers
1161      * @return true if now terminating or terminated
1162      */
1163     private boolean tryTerminate(boolean now) {
1164         if (now)
1165             advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
             // Not forced: refuse unless shut down, no queued submissions, and no active workers.
1166         else if (runState < SHUTDOWN ||
1167                  !submissionQueue.isEmpty() ||
1168                  (runState & ACTIVE_COUNT_MASK) != 0)
1169             return false;
1170 
             // Run the TERMINATING actions only when this call's advanceRunLevel returns true.
1171         if (advanceRunLevel(TERMINATING))
1172             startTerminating();
1173 
1174         // Finish now if all threads terminated; else in some subsequent call
1175         if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1176             advanceRunLevel(TERMINATED);
1177             termination.forceTermination();
1178         }
1179         return true;
1180     }
1181 

1182     /**
1183      * Actions on transition to TERMINATING
1184      *
1185      * Runs up to four passes through workers: (0) shutting down each
1186      * (without waking up if parked) to quickly spread notifications
1187      * without unnecessary bouncing around event queues etc (1) wake
1188      * up and help cancel tasks (2) interrupt (3) mop up races with
1189      * interrupted workers
1190      */
1191     private void startTerminating() {
1192         cancelSubmissions();
1193         for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1194             int c; // advance event count
1195             UNSAFE.compareAndSwapInt(this, eventCountOffset,
1196                                      c = eventCount, c+1);
1197             eventWaiters = 0L; // clobber lists
1198             spareWaiters = 0;
1199             for (ForkJoinWorkerThread w : workers) {
1200                 if (w != null) {
1201                     w.shutdown();


1352     }
1353 
1354     /**
1355      * Returns initial power of two size for workers array.
          * The result is the smallest power of two strictly greater than
          * {@code min(pc + 1, MAX_WORKERS)}.
          *
1356      * @param pc the initial parallelism level
          * @return the initial array length
1357      */
1358     private static int initialArraySizeFor(int pc) {
1359         // If possible, initially allocate enough space for one spare
1360         int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1361         // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
             // Smear the highest set bit into every lower position (16 bits suffice).
1362         size |= size >>> 1;
1363         size |= size >>> 2;
1364         size |= size >>> 4;
1365         size |= size >>> 8;
             // All-ones below the top bit, plus one, is the next power of two.
1366         return size + 1;
1367     }
1368 
1369     // Execution methods
1370 
1371     /**
1372      * Submits task and creates, starts, or resumes some workers if necessary.
          * Does not check the run state itself; the callers visible in this
          * file do so before calling.
          *
          * @param task the task to enqueue on the submission queue
1373      */
1374     private <T> void doSubmit(ForkJoinTask<T> task) {




1375         submissionQueue.offer(task);
1376         int c; // try to increment event count -- CAS failure OK
1377         UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1378         helpMaintainParallelism();
1379     }
1380 
1381     /**
1382      * Performs the given task, returning its result upon completion.
          * When called from a worker thread of this pool, the task is
          * invoked directly instead of being submitted.
1383      *
1384      * @param task the task
1385      * @return the task's result
1386      * @throws NullPointerException if the task is null
1387      * @throws RejectedExecutionException if the task cannot be
1388      *         scheduled for execution
1389      */
1390     public <T> T invoke(ForkJoinTask<T> task) {
1391         if (task == null)
1392             throw new NullPointerException();
             // Reject once shutdown has begun, even for callers inside the pool.
1393         if (runState >= SHUTDOWN)
1394             throw new RejectedExecutionException();
1395         Thread t = Thread.currentThread();
1396         if ((t instanceof ForkJoinWorkerThread) &&
1397             ((ForkJoinWorkerThread)t).pool == this)
1398             return task.invoke();  // bypass submit if in same pool
1399         else {
                 // External caller: enqueue the task, then wait for its result.
1400             doSubmit(task);
1401             return task.join();
1402         }
1403     }
1404 
1405     /**
1406      * Unless terminating, forks task if within an ongoing FJ
1407      * computation in the current pool, else submits as external task.
          * Performs no null check; the callers visible in this file check
          * for null first.
          *
          * @param task the task to fork or submit
          * @throws RejectedExecutionException if runState is at or above
          *         SHUTDOWN
1408      */
1409     private <T> void forkOrSubmit(ForkJoinTask<T> task) {
1410         if (runState >= SHUTDOWN)
1411             throw new RejectedExecutionException();
1412         Thread t = Thread.currentThread();
             // A worker of this same pool forks; any other thread submits externally.
1413         if ((t instanceof ForkJoinWorkerThread) &&
1414             ((ForkJoinWorkerThread)t).pool == this)
1415             task.fork();
1416         else
1417             doSubmit(task);
1418     }
1419 
1420     /**
1421      * Arranges for (asynchronous) execution of the given task.
          * Delegates to forkOrSubmit after the null check.
1422      *
1423      * @param task the task
1424      * @throws NullPointerException if the task is null
1425      * @throws RejectedExecutionException if the task cannot be
1426      *         scheduled for execution
1427      */
1428     public void execute(ForkJoinTask<?> task) {
1429         if (task == null)
1430             throw new NullPointerException();
1431         forkOrSubmit(task);
1432     }
1433 
1434     // AbstractExecutorService methods
1435 
1436     /**
          * Arranges for execution of the given Runnable. A Runnable that
          * is already a ForkJoinTask is used as is; any other is wrapped
          * via {@code ForkJoinTask.adapt(task, null)}.
          *
          * @param task the task
1437      * @throws NullPointerException if the task is null
1438      * @throws RejectedExecutionException if the task cannot be
1439      *         scheduled for execution
1440      */
1441     public void execute(Runnable task) {
1442         if (task == null)
1443             throw new NullPointerException();
1444         ForkJoinTask<?> job;
1445         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1446             job = (ForkJoinTask<?>) task;
1447         else
1448             job = ForkJoinTask.adapt(task, null);
1449         forkOrSubmit(job);
1450     }
1451 
1452     /**
1453      * Submits a ForkJoinTask for execution.
          * Delegates to forkOrSubmit and returns the same task instance.
1454      *
1455      * @param task the task to submit
1456      * @return the task
1457      * @throws NullPointerException if the task is null
1458      * @throws RejectedExecutionException if the task cannot be
1459      *         scheduled for execution
1460      */
1461     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1462         if (task == null)
1463             throw new NullPointerException();
1464         forkOrSubmit(task);
1465         return task;
1466     }
1467 
1468     /**
          * Wraps the Callable via {@code ForkJoinTask.adapt(task)} and
          * arranges for its execution.
          *
          * @param task the task to submit
          * @return the adapted task
1469      * @throws NullPointerException if the task is null
1470      * @throws RejectedExecutionException if the task cannot be
1471      *         scheduled for execution
1472      */
1473     public <T> ForkJoinTask<T> submit(Callable<T> task) {
1474         if (task == null)
1475             throw new NullPointerException();
1476         ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1477         forkOrSubmit(job);
1478         return job;
1479     }
1480 
1481     /**
          * Wraps the Runnable and result via
          * {@code ForkJoinTask.adapt(task, result)} and arranges for its
          * execution.
          *
          * @param task the task to submit
          * @param result the value passed to the adapter along with the task
          * @return the adapted task
1482      * @throws NullPointerException if the task is null
1483      * @throws RejectedExecutionException if the task cannot be
1484      *         scheduled for execution
1485      */
1486     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1487         if (task == null)
1488             throw new NullPointerException();
1489         ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1490         forkOrSubmit(job);
1491         return job;
1492     }
1493 
1494     /**
          * Arranges for execution of the given Runnable. A Runnable that
          * is already a ForkJoinTask is used as is; any other is wrapped
          * via {@code ForkJoinTask.adapt(task, null)}.
          *
          * @param task the task to submit
          * @return the task that was forked or submitted
1495      * @throws NullPointerException if the task is null
1496      * @throws RejectedExecutionException if the task cannot be
1497      *         scheduled for execution
1498      */
1499     public ForkJoinTask<?> submit(Runnable task) {
1500         if (task == null)
1501             throw new NullPointerException();
1502         ForkJoinTask<?> job;
1503         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1504             job = (ForkJoinTask<?>) task;
1505         else
1506             job = ForkJoinTask.adapt(task, null);
1507         forkOrSubmit(job);
1508         return job;
1509     }
1510 
1511     /**
1512      * @throws NullPointerException       {@inheritDoc}
1513      * @throws RejectedExecutionException {@inheritDoc}
1514      */
1515     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1516         ArrayList<ForkJoinTask<T>> forkJoinTasks =
1517             new ArrayList<ForkJoinTask<T>>(tasks.size());
1518         for (Callable<T> task : tasks)
1519             forkJoinTasks.add(ForkJoinTask.adapt(task));
1520         invoke(new InvokeAll<T>(forkJoinTasks));
1521 
1522         @SuppressWarnings({"unchecked", "rawtypes"})
1523             List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1524         return futures;
1525     }
1526 
1527     static final class InvokeAll<T> extends RecursiveAction {


1785     public List<Runnable> shutdownNow() {
1786         checkPermission();
1787         tryTerminate(true);
1788         return Collections.emptyList();
1789     }
1790 
1791     /**
1792      * Returns {@code true} if all tasks have completed following shut down.
1793      *
1794      * @return {@code true} if all tasks have completed following shut down
1795      */
1796     public boolean isTerminated() {
1797         return runState >= TERMINATED;
1798     }
1799 
1800     /**
1801      * Returns {@code true} if the process of termination has
1802      * commenced but not yet completed.  This method may be useful for
1803      * debugging. A return of {@code true} reported a sufficient
1804      * period after shutdown may indicate that submitted tasks have
1805      * ignored or suppressed interruption, or are waiting for IO,
1806      * causing this executor not to properly terminate. (See the
1807      * advisory notes for class {@link ForkJoinTask} stating that
1808      * tasks should not normally entail blocking operations.  But if
1809      * they do, they must abort them on interrupt.)
1810      *
1811      * @return {@code true} if terminating but not yet terminated
1812      */
1813     public boolean isTerminating() {
1814         return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1815     }
1816 
1817     /**
1818      * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
1819      */
1820     final boolean isAtLeastTerminating() {
1821         return runState >= TERMINATING;
1822     }
1823 
1824     /**
1825      * Returns {@code true} if this pool has been shut down.
1826      *
1827      * @return {@code true} if this pool has been shut down
1828      */
1829     public boolean isShutdown() {
1830         return runState >= SHUTDOWN;
1831     }
1832 
1833     /**
1834      * Blocks until all tasks have completed execution after a shutdown
1835      * request, or the timeout occurs, or the current thread is
1836      * interrupted, whichever happens first.
1837      *
1838      * @param timeout the maximum time to wait
1839      * @param unit the time unit of the timeout argument
1840      * @return {@code true} if this executor terminated and
1841      *         {@code false} if the timeout elapsed before termination
1842      * @throws InterruptedException if interrupted while waiting
1843      */
1844     public boolean awaitTermination(long timeout, TimeUnit unit)
1845         throws InterruptedException {
1846         try {
1847             termination.awaitAdvanceInterruptibly(0, timeout, unit);
1848         } catch (TimeoutException ex) {
1849             return false;
1850         }
1851         return true;
1852     }
1853 
1854     /**
1855      * Interface for extending managed parallelism for tasks running
1856      * in {@link ForkJoinPool}s.
1857      *
1858      * <p>A {@code ManagedBlocker} provides two methods.  Method
1859      * {@code isReleasable} must return {@code true} if blocking is
1860      * not necessary. Method {@code block} blocks the current thread
1861      * if necessary (perhaps internally invoking {@code isReleasable}
1862      * before actually blocking). The unusual methods in this API
1863      * accommodate synchronizers that may, but don't usually, block
1864      * for long periods. Similarly, they allow more efficient internal
1865      * handling of cases in which additional workers may be, but
1866      * usually are not, needed to ensure sufficient parallelism.
1867      * Toward this end, implementations of method {@code isReleasable}
1868      * must be amenable to repeated invocation.
1869      *
1870      * <p>For example, here is a ManagedBlocker based on a
1871      * ReentrantLock: