509 /**
510 * Creation factory for worker threads.
511 */
512 private final ForkJoinWorkerThreadFactory factory;
513
514 /**
515 * Sum of per-thread steal counts, updated only when threads are
516 * idle or terminating.
517 */
518 private volatile long stealCount;
519
520 /**
521 * Encoded record of top of Treiber stack of threads waiting for
522 * events. The top 32 bits contain the count being waited for. The
523      * bottom 16 bits contain one plus the pool index of the waiting
524      * worker thread. (Bits 16-31 are unused.)
525 */
526 private volatile long eventWaiters;
527
528 private static final int EVENT_COUNT_SHIFT = 32;
529 private static final long WAITER_ID_MASK = (1L << 16) - 1L;
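    // Illustrative decode of eventWaiters, matching its uses in
    // releaseEventWaiters and eventSync below (local names here are sketches,
    // not fields of this class):
    //   int waiterId  = ((int)(eventWaiters & WAITER_ID_MASK)) - 1; // -1 if no waiter
    //   int waitedFor = (int)(eventWaiters >>> EVENT_COUNT_SHIFT);  // count waited on
    // The spareWaiters field below uses the same one-plus-pool-index scheme
    // in its low 16 bits.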
530
531 /**
532 * A counter for events that may wake up worker threads:
533 * - Submission of a new task to the pool
534 * - A worker pushing a task on an empty queue
535 * - termination
536 */
537 private volatile int eventCount;
538
539 /**
540 * Encoded record of top of Treiber stack of spare threads waiting
541 * for resumption. The top 16 bits contain an arbitrary count to
542      * avoid ABA effects. The bottom 16 bits contain one plus the pool
543      * index of the waiting worker thread.
544 */
545 private volatile int spareWaiters;
546
547 private static final int SPARE_COUNT_SHIFT = 16;
548 private static final int SPARE_ID_MASK = (1 << 16) - 1;
549
598      * True if using local FIFO, rather than the default LIFO, for local polling.
599      * Read by, and replicated by, ForkJoinWorkerThreads.
600 */
601 final boolean locallyFifo;
602
603 /**
604 * The uncaught exception handler used when any worker abruptly
605 * terminates.
606 */
607 private final Thread.UncaughtExceptionHandler ueh;
608
609 /**
610 * Pool number, just for assigning useful names to worker threads
611 */
612 private final int poolNumber;
613
614 // Utilities for CASing fields. Note that most of these
615 // are usually manually inlined by callers
616
617 /**
618 * Increments running count part of workerCounts
619 */
620 final void incrementRunningCount() {
621 int c;
622 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
623 c = workerCounts,
624 c + ONE_RUNNING));
625 }
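    // (The empty-bodied loop above is the spin-CAS idiom used throughout this
    // class: workerCounts is re-read inside the CAS argument expression, so the
    // loop simply retries until the compare-and-swap succeeds.)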
626
627 /**
628 * Tries to decrement running count unless already zero
629 */
630 final boolean tryDecrementRunningCount() {
631 int wc = workerCounts;
632 if ((wc & RUNNING_COUNT_MASK) == 0)
633 return false;
634 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
635 wc, wc - ONE_RUNNING);
636 }
637
638 /**
639 * Forces decrement of encoded workerCounts, awaiting nonzero if
640 * (rarely) necessary when other count updates lag.
641 *
642 * @param dr -- either zero or ONE_RUNNING
643 * @param dt -- either zero or ONE_TOTAL
644 */
645 private void decrementWorkerCounts(int dr, int dt) {
646 for (;;) {
647 int wc = workerCounts;
648 if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
649 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
681 }
682 }
683
684 // workers array maintenance
685
686 /**
687 * Records and returns a workers array index for new worker.
688 */
689 private int recordWorker(ForkJoinWorkerThread w) {
690 // Try using slot totalCount-1. If not available, scan and/or resize
691 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
692 final ReentrantLock lock = this.workerLock;
693 lock.lock();
694 try {
695 ForkJoinWorkerThread[] ws = workers;
696 int n = ws.length;
697 if (k < 0 || k >= n || ws[k] != null) {
698 for (k = 0; k < n && ws[k] != null; ++k)
699 ;
700 if (k == n)
701 ws = Arrays.copyOf(ws, n << 1);
702 }
703 ws[k] = w;
704 workers = ws; // volatile array write ensures slot visibility
705 } finally {
706 lock.unlock();
707 }
708 return k;
709 }
710
711 /**
712 * Nulls out record of worker in workers array.
713 */
714 private void forgetWorker(ForkJoinWorkerThread w) {
715 int idx = w.poolIndex;
716 // Locking helps method recordWorker avoid unnecessary expansion
717 final ReentrantLock lock = this.workerLock;
718 lock.lock();
719 try {
720 ForkJoinWorkerThread[] ws = workers;
721 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
722 ws[idx] = null;
723 } finally {
724 lock.unlock();
725 }
726 }
727
728 /**
729 * Final callback from terminating worker. Removes record of
730 * worker from array, and adjusts counts. If pool is shutting
731 * down, tries to complete termination.
732 *
733 * @param w the worker
734 */
735 final void workerTerminated(ForkJoinWorkerThread w) {
736 forgetWorker(w);
737         decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
738 while (w.stealCount != 0) // collect final count
739 tryAccumulateStealCount(w);
740 tryTerminate(false);
741 }
742
743 // Waiting for and signalling events
744
745 /**
746 * Releases workers blocked on a count not equal to current count.
747 * Normally called after precheck that eventWaiters isn't zero to
748 * avoid wasted array checks. Gives up upon a change in count or
749 * upon releasing two workers, letting others take over.
750 */
751 private void releaseEventWaiters() {
752 ForkJoinWorkerThread[] ws = workers;
753 int n = ws.length;
754 long h = eventWaiters;
755 int ec = eventCount;
756 boolean releasedOne = false;
757 ForkJoinWorkerThread w; int id;
758 while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
759 (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
760 id < n && (w = ws[id]) != null) {
761 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
762 h, w.nextWaiter)) {
763 LockSupport.unpark(w);
764 if (releasedOne) // exit on second release
765 break;
766 releasedOne = true;
767 }
768 if (eventCount != ec)
769 break;
770 h = eventWaiters;
771 }
772 }
773
774 /**
775 * Tries to advance eventCount and releases waiters. Called only
776 * from workers.
777 */
778 final void signalWork() {
779 int c; // try to increment event count -- CAS failure OK
780 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
781 if (eventWaiters != 0L)
782 releaseEventWaiters();
783 }
784
785 /**
786      * Adds the given worker to the event queue and blocks until
787      * terminating or the event count advances from the given value.
788 *
789 * @param w the calling worker thread
790 * @param ec the count
791 */
792 private void eventSync(ForkJoinWorkerThread w, int ec) {
793 long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
794 long h;
795 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
796 (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
797 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
798 eventCount == ec) {
799 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
800 w.nextWaiter = h, nh)) {
801 awaitEvent(w, ec);
802 break;
803 }
804 }
805 }
806
807 /**
808 * Blocks the given worker (that has already been entered as an
809 * event waiter) until terminating or event count advances from
810 * the given value. The oldest (first) waiter uses a timed wait to
811 * occasionally one-by-one shrink the number of workers (to a
812 * minimum of one) if the pool has not been used for extended
813 * periods.
814 *
815 * @param w the calling worker thread
816 * @param ec the count
817 */
818 private void awaitEvent(ForkJoinWorkerThread w, int ec) {
819 while (eventCount == ec) {
820 if (tryAccumulateStealCount(w)) { // transfer while idle
821 boolean untimed = (w.nextWaiter != 0L ||
822 (workerCounts & RUNNING_COUNT_MASK) <= 1);
823                 long startTime = untimed ? 0 : System.nanoTime();
824 Thread.interrupted(); // clear/ignore interrupt
825 if (eventCount != ec || w.isTerminating())
826 break; // recheck after clear
827 if (untimed)
828 LockSupport.park(w);
829 else {
830 LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
831 if (eventCount != ec || w.isTerminating())
832 break;
833 if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
834 tryShutdownUnusedWorker(ec);
835 }
836 }
837 }
838 }
839
840 // Maintaining parallelism
841
842 /**
843 * Pushes worker onto the spare stack.
844 */
845 final void pushSpare(ForkJoinWorkerThread w) {
846 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
847 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
848                                                w.nextSpare = spareWaiters, ns));
849 }
850
851 /**
852 * Tries (once) to resume a spare if the number of running
853 * threads is less than target.
854 */
855 private void tryResumeSpare() {
856 int sw, id;
857 ForkJoinWorkerThread[] ws = workers;
858 int n = ws.length;
859 ForkJoinWorkerThread w;
860 if ((sw = spareWaiters) != 0 &&
861 (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
862 id < n && (w = ws[id]) != null &&
863 (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
864 spareWaiters == sw &&
865 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
866 sw, w.nextSpare)) {
867 int c; // increment running count before resume
868 do {} while (!UNSAFE.compareAndSwapInt
869 (this, workerCountsOffset,
870 c = workerCounts, c + ONE_RUNNING));
871 if (w.tryUnsuspend())
872 LockSupport.unpark(w);
873 else // back out if w was shutdown
874 decrementWorkerCounts(ONE_RUNNING, 0);
875 }
876 }
877
878 /**
879 * Tries to increase the number of running workers if below target
880      * parallelism: If a spare exists, tries to resume it via
881 * tryResumeSpare. Otherwise, if not enough total workers or all
882 * existing workers are busy, adds a new worker. In all cases also
883 * helps wake up releasable workers waiting for work.
897 wc + (ONE_RUNNING|ONE_TOTAL))) {
898 ForkJoinWorkerThread w = null;
899 Throwable fail = null;
900 try {
901 w = factory.newThread(this);
902 } catch (Throwable ex) {
903 fail = ex;
904 }
905 if (w == null) { // null or exceptional factory return
906 decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
907 tryTerminate(false); // handle failure during shutdown
908 // If originating from an external caller,
909 // propagate exception, else ignore
910 if (fail != null && runState < TERMINATING &&
911 !(Thread.currentThread() instanceof
912 ForkJoinWorkerThread))
913 UNSAFE.throwException(fail);
914 break;
915 }
916 w.start(recordWorker(w), ueh);
917 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
918 int c; // advance event count
919 UNSAFE.compareAndSwapInt(this, eventCountOffset,
920 c = eventCount, c+1);
921 break; // add at most one unless total below target
922 }
923 }
924 }
925 if (eventWaiters != 0L)
926 releaseEventWaiters();
927 }
928
929 /**
930 * Callback from the oldest waiter in awaitEvent waking up after a
931 * period of non-use. If all workers are idle, tries (once) to
932      * shut down an event waiter or a spare, if one exists. Note that
933 * we don't need CAS or locks here because the method is called
934 * only from one thread occasionally waking (and even misfires are
935 * OK). Note that until the shutdown worker fully terminates,
936 * workerCounts will overestimate total count, which is tolerable.
937 *
938 * @param ec the event count waited on by caller (to abort
939 * attempt if count has since changed).
940 */
941 private void tryShutdownUnusedWorker(int ec) {
942 if (runState == 0 && eventCount == ec) { // only trigger if all idle
943 ForkJoinWorkerThread[] ws = workers;
944 int n = ws.length;
945 ForkJoinWorkerThread w = null;
946 boolean shutdown = false;
947 int sw;
948 long h;
949 if ((sw = spareWaiters) != 0) { // prefer killing spares
950 int id = (sw & SPARE_ID_MASK) - 1;
951 if (id >= 0 && id < n && (w = ws[id]) != null &&
952 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
953 sw, w.nextSpare))
954 shutdown = true;
955 }
956 else if ((h = eventWaiters) != 0L) {
957 long nh;
958 int id = ((int)(h & WAITER_ID_MASK)) - 1;
959 if (id >= 0 && id < n && (w = ws[id]) != null &&
960 (nh = w.nextWaiter) != 0L && // keep at least one worker
961 UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
962 shutdown = true;
963 }
964 if (w != null && shutdown) {
965 w.shutdown();
966 LockSupport.unpark(w);
967 }
968 }
969 releaseEventWaiters(); // in case of interference
970 }
971
972 /**
973 * Callback from workers invoked upon each top-level action (i.e.,
974 * stealing a task or taking a submission and running it).
975 * Performs one or more of the following:
976 *
977 * 1. If the worker is active and either did not run a task
978 * or there are too many workers, try to set its active status
991 * eventSync if necessary (first forcing inactivation), upon
992 * which the worker may be shutdown via
993 * tryShutdownUnusedWorker. Otherwise, help release any
994 * existing event waiters that are now releasable,
995 *
996 * @param w the worker
997 * @param ran true if worker ran a task since last call to this method
998 */
999 final void preStep(ForkJoinWorkerThread w, boolean ran) {
1000 int wec = w.lastEventCount;
1001 boolean active = w.active;
1002 boolean inactivate = false;
1003 int pc = parallelism;
1004 while (w.runState == 0) {
1005 int rs = runState;
1006 if (rs >= TERMINATING) { // propagate shutdown
1007 w.shutdown();
1008 break;
1009 }
1010 if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1011 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
1012 inactivate = active = w.active = false;
1013 int wc = workerCounts;
1014 if ((wc & RUNNING_COUNT_MASK) > pc) {
1015 if (!(inactivate |= active) && // must inactivate to suspend
1016 workerCounts == wc && // try to suspend as spare
1017 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1018 wc, wc - ONE_RUNNING))
1019 w.suspendAsSpare();
1020 }
1021 else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1022 helpMaintainParallelism(); // not enough workers
1023 else if (!ran) {
1024 long h = eventWaiters;
1025 int ec = eventCount;
1026 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1027 releaseEventWaiters(); // release others before waiting
1028 else if (ec != wec) {
1029 w.lastEventCount = ec; // no need to wait
1030 break;
1031 }
1032 else if (!(inactivate |= active))
1033 eventSync(w, wec); // must inactivate before sync
1034 }
1035 else
1036 break;
1037 }
1038 }
1039
1040 /**
1041 * Helps and/or blocks awaiting join of the given task.
1042 * See above for explanation.
1043 *
1044 * @param joinMe the task to join
1045 * @param worker the current worker thread
1046 */
1047 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
1048 int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1049 while (joinMe.status >= 0) {
1050 int wc;
1051 worker.helpJoinTask(joinMe);
1052 if (joinMe.status < 0)
1053 break;
1054 else if (retries > 0)
1055 --retries;
1056 else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
1057 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1058 wc, wc - ONE_RUNNING)) {
1059 int stat, c; long h;
1060 while ((stat = joinMe.status) >= 0 &&
1061 (h = eventWaiters) != 0L && // help release others
1062 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1063 releaseEventWaiters();
1064 if (stat >= 0 &&
1065 ((workerCounts & RUNNING_COUNT_MASK) == 0 ||
1066 (stat =
1067 joinMe.internalAwaitDone(JOIN_TIMEOUT_MILLIS)) >= 0))
1068 helpMaintainParallelism(); // timeout or no running workers
1069 do {} while (!UNSAFE.compareAndSwapInt
1070 (this, workerCountsOffset,
1071 c = workerCounts, c + ONE_RUNNING));
1072 if (stat < 0)
1073 break; // else restart
1074 }
1075 }
1076 }
1077
1078 /**
1079 * Same idea as awaitJoin, but no helping, retries, or timeouts.
1080 */
1081 final void awaitBlocker(ManagedBlocker blocker)
1082 throws InterruptedException {
1083 while (!blocker.isReleasable()) {
1084 int wc = workerCounts;
1085 if ((wc & RUNNING_COUNT_MASK) != 0 &&
1086 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1087 wc, wc - ONE_RUNNING)) {
1088 try {
1089 while (!blocker.isReleasable()) {
1090 long h = eventWaiters;
1091 if (h != 0L &&
1092 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1093 releaseEventWaiters();
1094 else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1095 runState < TERMINATING)
1096 helpMaintainParallelism();
1097 else if (blocker.block())
1098 break;
1099 }
1100 } finally {
1101 int c;
1102 do {} while (!UNSAFE.compareAndSwapInt
1103 (this, workerCountsOffset,
1104 c = workerCounts, c + ONE_RUNNING));
1105 }
1106 break;
1112 * Possibly initiates and/or completes termination.
1113 *
1114 * @param now if true, unconditionally terminate, else only
1115 * if shutdown and empty queue and no active workers
1116 * @return true if now terminating or terminated
1117 */
1118 private boolean tryTerminate(boolean now) {
1119 if (now)
1120 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1121 else if (runState < SHUTDOWN ||
1122 !submissionQueue.isEmpty() ||
1123 (runState & ACTIVE_COUNT_MASK) != 0)
1124 return false;
1125
1126 if (advanceRunLevel(TERMINATING))
1127 startTerminating();
1128
1129 // Finish now if all threads terminated; else in some subsequent call
1130 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1131 advanceRunLevel(TERMINATED);
1132 termination.arrive();
1133 }
1134 return true;
1135 }
1136
1137
1138 /**
1139 * Actions on transition to TERMINATING
1140 *
1141     * Runs up to four passes through workers: (0) shutting down each
1142     * (without waking up if parked) to quickly spread notifications
1143     * without unnecessary bouncing around event queues etc.; (1) wake
1144     * up and help cancel tasks; (2) interrupt; (3) mop up races with
1145     * interrupted workers.
1146 */
1147 private void startTerminating() {
1148 cancelSubmissions();
1149 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1150 int c; // advance event count
1151 UNSAFE.compareAndSwapInt(this, eventCountOffset,
1152 c = eventCount, c+1);
1153 eventWaiters = 0L; // clobber lists
1154 spareWaiters = 0;
1155 for (ForkJoinWorkerThread w : workers) {
1156 if (w != null) {
1157 w.shutdown();
1308 }
1309
1310 /**
1311 * Returns initial power of two size for workers array.
1312 * @param pc the initial parallelism level
1313 */
1314 private static int initialArraySizeFor(int pc) {
1315 // If possible, initially allocate enough space for one spare
1316 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1317        // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1318 size |= size >>> 1;
1319 size |= size >>> 2;
1320 size |= size >>> 4;
1321 size |= size >>> 8;
1322 return size + 1;
1323 }
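    // For illustration of the method above: with pc = 7, size starts at 8 and
    // the shifts smear its high bit down to give 15, so 16 is returned; with
    // pc = 2, size starts at 3, stays 3 after the shifts, and 4 is returned.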
1324
1325 // Execution methods
1326
1327 /**
1328 * Common code for execute, invoke and submit
1329 */
1330 private <T> void doSubmit(ForkJoinTask<T> task) {
1331 if (task == null)
1332 throw new NullPointerException();
1333 if (runState >= SHUTDOWN)
1334 throw new RejectedExecutionException();
1335 submissionQueue.offer(task);
1336 int c; // try to increment event count -- CAS failure OK
1337 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1338 helpMaintainParallelism(); // create, start, or resume some workers
1339 }
1340
1341 /**
1342 * Performs the given task, returning its result upon completion.
1343 *
1344 * @param task the task
1345 * @return the task's result
1346 * @throws NullPointerException if the task is null
1347 * @throws RejectedExecutionException if the task cannot be
1348 * scheduled for execution
1349 */
1350 public <T> T invoke(ForkJoinTask<T> task) {
1351 doSubmit(task);
1352 return task.join();
1353 }
1354
1355 /**
1356 * Arranges for (asynchronous) execution of the given task.
1357 *
1358 * @param task the task
1359 * @throws NullPointerException if the task is null
1360 * @throws RejectedExecutionException if the task cannot be
1361 * scheduled for execution
1362 */
1363 public void execute(ForkJoinTask<?> task) {
1364 doSubmit(task);
1365 }
1366
1367 // AbstractExecutorService methods
1368
1369 /**
1370 * @throws NullPointerException if the task is null
1371 * @throws RejectedExecutionException if the task cannot be
1372 * scheduled for execution
1373 */
1374 public void execute(Runnable task) {
1375 ForkJoinTask<?> job;
1376 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1377 job = (ForkJoinTask<?>) task;
1378 else
1379 job = ForkJoinTask.adapt(task, null);
1380 doSubmit(job);
1381 }
1382
1383 /**
1384 * Submits a ForkJoinTask for execution.
1385 *
1386 * @param task the task to submit
1387 * @return the task
1388 * @throws NullPointerException if the task is null
1389 * @throws RejectedExecutionException if the task cannot be
1390 * scheduled for execution
1391 */
1392 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1393 doSubmit(task);
1394 return task;
1395 }
1396
1397 /**
1398 * @throws NullPointerException if the task is null
1399 * @throws RejectedExecutionException if the task cannot be
1400 * scheduled for execution
1401 */
1402 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1403 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1404 doSubmit(job);
1405 return job;
1406 }
1407
1408 /**
1409 * @throws NullPointerException if the task is null
1410 * @throws RejectedExecutionException if the task cannot be
1411 * scheduled for execution
1412 */
1413 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1414 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1415 doSubmit(job);
1416 return job;
1417 }
1418
1419 /**
1420 * @throws NullPointerException if the task is null
1421 * @throws RejectedExecutionException if the task cannot be
1422 * scheduled for execution
1423 */
1424 public ForkJoinTask<?> submit(Runnable task) {
1425 ForkJoinTask<?> job;
1426 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1427 job = (ForkJoinTask<?>) task;
1428 else
1429 job = ForkJoinTask.adapt(task, null);
1430 doSubmit(job);
1431 return job;
1432 }
1433
1434 /**
1435 * @throws NullPointerException {@inheritDoc}
1436 * @throws RejectedExecutionException {@inheritDoc}
1437 */
1438 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1439 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1440 new ArrayList<ForkJoinTask<T>>(tasks.size());
1441 for (Callable<T> task : tasks)
1442 forkJoinTasks.add(ForkJoinTask.adapt(task));
1443 invoke(new InvokeAll<T>(forkJoinTasks));
1444
1445 @SuppressWarnings({"unchecked", "rawtypes"})
1446 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1447 return futures;
1448 }
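    // Illustrative use of invokeAll above (names hypothetical): each returned
    // future is already completed when invokeAll returns, so get() does not block.
    //   List<Callable<Integer>> calls = ...;
    //   int total = 0;
    //   for (Future<Integer> f : pool.invokeAll(calls))
    //     total += f.get();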
1449
1450 static final class InvokeAll<T> extends RecursiveAction {
1708 public List<Runnable> shutdownNow() {
1709 checkPermission();
1710 tryTerminate(true);
1711 return Collections.emptyList();
1712 }
1713
1714 /**
1715 * Returns {@code true} if all tasks have completed following shut down.
1716 *
1717 * @return {@code true} if all tasks have completed following shut down
1718 */
1719 public boolean isTerminated() {
1720 return runState >= TERMINATED;
1721 }
1722
1723 /**
1724 * Returns {@code true} if the process of termination has
1725 * commenced but not yet completed. This method may be useful for
1726 * debugging. A return of {@code true} reported a sufficient
1727 * period after shutdown may indicate that submitted tasks have
1728 * ignored or suppressed interruption, causing this executor not
1729 * to properly terminate.
1730 *
1731 * @return {@code true} if terminating but not yet terminated
1732 */
1733 public boolean isTerminating() {
1734 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1735 }
1736
1737 /**
1738 * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
1739 */
1740 final boolean isAtLeastTerminating() {
1741 return runState >= TERMINATING;
1742 }
1743
1744 /**
1745 * Returns {@code true} if this pool has been shut down.
1746 *
1747 * @return {@code true} if this pool has been shut down
1748 */
1749 public boolean isShutdown() {
1750 return runState >= SHUTDOWN;
1751 }
1752
1753 /**
1754 * Blocks until all tasks have completed execution after a shutdown
1755 * request, or the timeout occurs, or the current thread is
1756 * interrupted, whichever happens first.
1757 *
1758 * @param timeout the maximum time to wait
1759 * @param unit the time unit of the timeout argument
1760 * @return {@code true} if this executor terminated and
1761 * {@code false} if the timeout elapsed before termination
1762 * @throws InterruptedException if interrupted while waiting
1763 */
1764 public boolean awaitTermination(long timeout, TimeUnit unit)
1765 throws InterruptedException {
1766 try {
1767 return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
1768 } catch (TimeoutException ex) {
1769 return false;
1770 }
1771 }
1772
1773 /**
1774 * Interface for extending managed parallelism for tasks running
1775 * in {@link ForkJoinPool}s.
1776 *
1777 * <p>A {@code ManagedBlocker} provides two methods. Method
1778 * {@code isReleasable} must return {@code true} if blocking is
1779 * not necessary. Method {@code block} blocks the current thread
1780 * if necessary (perhaps internally invoking {@code isReleasable}
1781 * before actually blocking). The unusual methods in this API
1782 * accommodate synchronizers that may, but don't usually, block
1783 * for long periods. Similarly, they allow more efficient internal
1784 * handling of cases in which additional workers may be, but
1785 * usually are not, needed to ensure sufficient parallelism.
1786 * Toward this end, implementations of method {@code isReleasable}
1787 * must be amenable to repeated invocation.
1788 *
1789 * <p>For example, here is a ManagedBlocker based on a
1790 * ReentrantLock:
|
509 /**
510 * Creation factory for worker threads.
511 */
512 private final ForkJoinWorkerThreadFactory factory;
513
514 /**
515 * Sum of per-thread steal counts, updated only when threads are
516 * idle or terminating.
517 */
518 private volatile long stealCount;
519
520 /**
521 * Encoded record of top of Treiber stack of threads waiting for
522 * events. The top 32 bits contain the count being waited for. The
523      * bottom 16 bits contain one plus the pool index of the waiting
524      * worker thread. (Bits 16-31 are unused.)
525 */
526 private volatile long eventWaiters;
527
528 private static final int EVENT_COUNT_SHIFT = 32;
529 private static final int WAITER_ID_MASK = (1 << 16) - 1;
530
531 /**
532 * A counter for events that may wake up worker threads:
533 * - Submission of a new task to the pool
534 * - A worker pushing a task on an empty queue
535 * - termination
536 */
537 private volatile int eventCount;
538
539 /**
540 * Encoded record of top of Treiber stack of spare threads waiting
541 * for resumption. The top 16 bits contain an arbitrary count to
542      * avoid ABA effects. The bottom 16 bits contain one plus the pool
543      * index of the waiting worker thread.
544 */
545 private volatile int spareWaiters;
546
547 private static final int SPARE_COUNT_SHIFT = 16;
548 private static final int SPARE_ID_MASK = (1 << 16) - 1;
549
598      * True if using local FIFO, rather than the default LIFO, for local polling.
599      * Read by, and replicated by, ForkJoinWorkerThreads.
600 */
601 final boolean locallyFifo;
602
603 /**
604 * The uncaught exception handler used when any worker abruptly
605 * terminates.
606 */
607 private final Thread.UncaughtExceptionHandler ueh;
608
609 /**
610 * Pool number, just for assigning useful names to worker threads
611 */
612 private final int poolNumber;
613
614 // Utilities for CASing fields. Note that most of these
615 // are usually manually inlined by callers
616
617 /**
618 * Increments running count part of workerCounts.
619 */
620 final void incrementRunningCount() {
621 int c;
622 do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
623 c = workerCounts,
624 c + ONE_RUNNING));
625 }
626
627 /**
628 * Tries to increment running count part of workerCounts.
629 */
630 final boolean tryIncrementRunningCount() {
631 int c;
632 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
633 c = workerCounts,
634 c + ONE_RUNNING);
635 }
636
637 /**
638 * Tries to decrement running count unless already zero.
639 */
640 final boolean tryDecrementRunningCount() {
641 int wc = workerCounts;
642 if ((wc & RUNNING_COUNT_MASK) == 0)
643 return false;
644 return UNSAFE.compareAndSwapInt(this, workerCountsOffset,
645 wc, wc - ONE_RUNNING);
646 }
647
648 /**
649 * Forces decrement of encoded workerCounts, awaiting nonzero if
650 * (rarely) necessary when other count updates lag.
651 *
652 * @param dr -- either zero or ONE_RUNNING
653 * @param dt -- either zero or ONE_TOTAL
654 */
655 private void decrementWorkerCounts(int dr, int dt) {
656 for (;;) {
657 int wc = workerCounts;
658 if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
659 (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
691 }
692 }
693
694 // workers array maintenance
695
696 /**
697 * Records and returns a workers array index for new worker.
698 */
699 private int recordWorker(ForkJoinWorkerThread w) {
700 // Try using slot totalCount-1. If not available, scan and/or resize
701 int k = (workerCounts >>> TOTAL_COUNT_SHIFT) - 1;
702 final ReentrantLock lock = this.workerLock;
703 lock.lock();
704 try {
705 ForkJoinWorkerThread[] ws = workers;
706 int n = ws.length;
707 if (k < 0 || k >= n || ws[k] != null) {
708 for (k = 0; k < n && ws[k] != null; ++k)
709 ;
710 if (k == n)
711 ws = workers = Arrays.copyOf(ws, n << 1);
712 }
713 ws[k] = w;
714 int c = eventCount; // advance event count to ensure visibility
715 UNSAFE.compareAndSwapInt(this, eventCountOffset, c, c+1);
716 } finally {
717 lock.unlock();
718 }
719 return k;
720 }
721
722 /**
723 * Nulls out record of worker in workers array.
724 */
725 private void forgetWorker(ForkJoinWorkerThread w) {
726 int idx = w.poolIndex;
727 // Locking helps method recordWorker avoid unnecessary expansion
728 final ReentrantLock lock = this.workerLock;
729 lock.lock();
730 try {
731 ForkJoinWorkerThread[] ws = workers;
732 if (idx >= 0 && idx < ws.length && ws[idx] == w) // verify
733 ws[idx] = null;
734 } finally {
735 lock.unlock();
736 }
737 }
738
739 /**
740 * Final callback from terminating worker. Removes record of
741 * worker from array, and adjusts counts. If pool is shutting
742 * down, tries to complete termination.
743 *
744 * @param w the worker
745 */
746 final void workerTerminated(ForkJoinWorkerThread w) {
747 forgetWorker(w);
748 decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
749 while (w.stealCount != 0) // collect final count
750 tryAccumulateStealCount(w);
751 tryTerminate(false);
752 }
753
754 // Waiting for and signalling events
755
756 /**
757 * Releases workers blocked on a count not equal to current count.
758 * Normally called after precheck that eventWaiters isn't zero to
759 * avoid wasted array checks. Gives up upon a change in count or
760 * upon releasing four workers, letting others take over.
761 */
762 private void releaseEventWaiters() {
763 ForkJoinWorkerThread[] ws = workers;
764 int n = ws.length;
765 long h = eventWaiters;
766 int ec = eventCount;
767 int releases = 4;
768 ForkJoinWorkerThread w; int id;
769 while ((id = (((int)h) & WAITER_ID_MASK) - 1) >= 0 &&
770 (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
771 id < n && (w = ws[id]) != null) {
772 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
773 h, w.nextWaiter)) {
774 LockSupport.unpark(w);
775 if (--releases == 0)
776 break;
777 }
778 if (eventCount != ec)
779 break;
780 h = eventWaiters;
781 }
782 }
783
784 /**
785 * Tries to advance eventCount and releases waiters. Called only
786 * from workers.
787 */
788 final void signalWork() {
789 int c; // try to increment event count -- CAS failure OK
790 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
791 if (eventWaiters != 0L)
792 releaseEventWaiters();
793 }
794
795 /**
796      * Adds the given worker to the event queue and blocks until
797      * terminating or the event count advances from the given value.
798 *
799 * @param w the calling worker thread
800 * @param ec the count
801 */
802 private void eventSync(ForkJoinWorkerThread w, int ec) {
803 long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
804 long h;
805 while ((runState < SHUTDOWN || !tryTerminate(false)) &&
806 (((int)(h = eventWaiters) & WAITER_ID_MASK) == 0 ||
807 (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
808 eventCount == ec) {
809 if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
810 w.nextWaiter = h, nh)) {
811 awaitEvent(w, ec);
812 break;
813 }
814 }
815 }
816
817 /**
818 * Blocks the given worker (that has already been entered as an
819 * event waiter) until terminating or event count advances from
820 * the given value. The oldest (first) waiter uses a timed wait to
821 * occasionally one-by-one shrink the number of workers (to a
822 * minimum of one) if the pool has not been used for extended
823 * periods.
824 *
825 * @param w the calling worker thread
826 * @param ec the count
827 */
828 private void awaitEvent(ForkJoinWorkerThread w, int ec) {
829 while (eventCount == ec) {
830 if (tryAccumulateStealCount(w)) { // transfer while idle
831 boolean untimed = (w.nextWaiter != 0L ||
832 (workerCounts & RUNNING_COUNT_MASK) <= 1);
833 long startTime = untimed ? 0 : System.nanoTime();
834 Thread.interrupted(); // clear/ignore interrupt
835 if (w.isTerminating() || eventCount != ec)
836 break; // recheck after clear
837 if (untimed)
838 LockSupport.park(w);
839 else {
840 LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
841 if (eventCount != ec || w.isTerminating())
842 break;
843 if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
844 tryShutdownUnusedWorker(ec);
845 }
846 }
847 }
848 }
849
850 // Maintaining parallelism
851
852 /**
853 * Pushes worker onto the spare stack.
854 */
855 final void pushSpare(ForkJoinWorkerThread w) {
856 int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
857 do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
858                                                w.nextSpare = spareWaiters, ns));
859 }
860
861 /**
862 * Tries (once) to resume a spare if the number of running
863 * threads is less than target.
864 */
865 private void tryResumeSpare() {
866 int sw, id;
867 ForkJoinWorkerThread[] ws = workers;
868 int n = ws.length;
869 ForkJoinWorkerThread w;
870 if ((sw = spareWaiters) != 0 &&
871 (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
872 id < n && (w = ws[id]) != null &&
873 (runState >= TERMINATING ||
874 (workerCounts & RUNNING_COUNT_MASK) < parallelism) &&
875 spareWaiters == sw &&
876 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
877 sw, w.nextSpare)) {
878 int c; // increment running count before resume
879 do {} while (!UNSAFE.compareAndSwapInt
880 (this, workerCountsOffset,
881 c = workerCounts, c + ONE_RUNNING));
882 if (w.tryUnsuspend())
883 LockSupport.unpark(w);
884 else // back out if w was shutdown
885 decrementWorkerCounts(ONE_RUNNING, 0);
886 }
887 }
888
889 /**
890 * Tries to increase the number of running workers if below target
891      * parallelism: If a spare exists, tries to resume it via
892 * tryResumeSpare. Otherwise, if not enough total workers or all
893 * existing workers are busy, adds a new worker. In all cases also
894 * helps wake up releasable workers waiting for work.
908 wc + (ONE_RUNNING|ONE_TOTAL))) {
909 ForkJoinWorkerThread w = null;
910 Throwable fail = null;
911 try {
912 w = factory.newThread(this);
913 } catch (Throwable ex) {
914 fail = ex;
915 }
916 if (w == null) { // null or exceptional factory return
917 decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
918 tryTerminate(false); // handle failure during shutdown
919 // If originating from an external caller,
920 // propagate exception, else ignore
921 if (fail != null && runState < TERMINATING &&
922 !(Thread.currentThread() instanceof
923 ForkJoinWorkerThread))
924 UNSAFE.throwException(fail);
925 break;
926 }
927 w.start(recordWorker(w), ueh);
928 if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc)
929 break; // add at most one unless total below target
930 }
931 }
932 if (eventWaiters != 0L)
933 releaseEventWaiters();
934 }
935
936 /**
937 * Callback from the oldest waiter in awaitEvent waking up after a
938 * period of non-use. If all workers are idle, tries (once) to
939      * shut down an event waiter or a spare, if one exists. Note that
940 * we don't need CAS or locks here because the method is called
941 * only from one thread occasionally waking (and even misfires are
942 * OK). Note that until the shutdown worker fully terminates,
943 * workerCounts will overestimate total count, which is tolerable.
944 *
945 * @param ec the event count waited on by caller (to abort
946 * attempt if count has since changed).
947 */
948 private void tryShutdownUnusedWorker(int ec) {
949 if (runState == 0 && eventCount == ec) { // only trigger if all idle
950 ForkJoinWorkerThread[] ws = workers;
951 int n = ws.length;
952 ForkJoinWorkerThread w = null;
953 boolean shutdown = false;
954 int sw;
955 long h;
956 if ((sw = spareWaiters) != 0) { // prefer killing spares
957 int id = (sw & SPARE_ID_MASK) - 1;
958 if (id >= 0 && id < n && (w = ws[id]) != null &&
959 UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
960 sw, w.nextSpare))
961 shutdown = true;
962 }
963 else if ((h = eventWaiters) != 0L) {
964 long nh;
965 int id = (((int)h) & WAITER_ID_MASK) - 1;
966 if (id >= 0 && id < n && (w = ws[id]) != null &&
967 (nh = w.nextWaiter) != 0L && // keep at least one worker
968 UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
969 shutdown = true;
970 }
971 if (w != null && shutdown) {
972 w.shutdown();
973 LockSupport.unpark(w);
974 }
975 }
976 releaseEventWaiters(); // in case of interference
977 }
978
979 /**
980 * Callback from workers invoked upon each top-level action (i.e.,
981 * stealing a task or taking a submission and running it).
982 * Performs one or more of the following:
983 *
984 * 1. If the worker is active and either did not run a task
985 * or there are too many workers, try to set its active status
998 * eventSync if necessary (first forcing inactivation), upon
999 * which the worker may be shutdown via
1000 * tryShutdownUnusedWorker. Otherwise, help release any
1001     * existing event waiters that are now releasable.
1002 *
1003 * @param w the worker
1004 * @param ran true if worker ran a task since last call to this method
1005 */
1006 final void preStep(ForkJoinWorkerThread w, boolean ran) {
1007 int wec = w.lastEventCount;
1008 boolean active = w.active;
1009 boolean inactivate = false;
1010 int pc = parallelism;
1011 while (w.runState == 0) {
1012 int rs = runState;
1013 if (rs >= TERMINATING) { // propagate shutdown
1014 w.shutdown();
1015 break;
1016 }
1017 if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
1018 UNSAFE.compareAndSwapInt(this, runStateOffset, rs, --rs)) {
1019 inactivate = active = w.active = false;
1020 if (rs == SHUTDOWN) { // all inactive and shut down
1021 tryTerminate(false);
1022 continue;
1023 }
1024 }
1025 int wc = workerCounts; // try to suspend as spare
1026 if ((wc & RUNNING_COUNT_MASK) > pc) {
1027 if (!(inactivate |= active) && // must inactivate to suspend
1028 workerCounts == wc &&
1029 UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1030 wc, wc - ONE_RUNNING))
1031 w.suspendAsSpare();
1032 }
1033 else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
1034 helpMaintainParallelism(); // not enough workers
1035 else if (ran)
1036 break;
1037 else {
1038 long h = eventWaiters;
1039 int ec = eventCount;
1040 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
1041 releaseEventWaiters(); // release others before waiting
1042 else if (ec != wec) {
1043 w.lastEventCount = ec; // no need to wait
1044 break;
1045 }
1046 else if (!(inactivate |= active))
1047 eventSync(w, wec); // must inactivate before sync
1048 }
1049 }
1050 }
1051
1052 /**
1053 * Helps and/or blocks awaiting join of the given task.
1054 * See above for explanation.
1055 *
1056 * @param joinMe the task to join
1057 * @param worker the current worker thread
1058 * @param timed true if wait should time out
1059 * @param nanos timeout value if timed
1060 */
1061 final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
1062 boolean timed, long nanos) {
1063 long startTime = timed ? System.nanoTime() : 0L;
1064 int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
1065 boolean running = true; // false when count decremented
1066 while (joinMe.status >= 0) {
1067 if (runState >= TERMINATING) {
1068 joinMe.cancelIgnoringExceptions();
1069 break;
1070 }
1071 running = worker.helpJoinTask(joinMe, running);
1072 if (joinMe.status < 0)
1073 break;
1074 if (retries > 0) {
1075 --retries;
1076 continue;
1077 }
1078 int wc = workerCounts;
1079 if ((wc & RUNNING_COUNT_MASK) != 0) {
1080 if (running) {
1081 if (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1082 wc, wc - ONE_RUNNING))
1083 continue;
1084 running = false;
1085 }
1086 long h = eventWaiters;
1087 if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1088 releaseEventWaiters();
1089 if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
1090 long ms; int ns;
1091 if (!timed) {
1092 ms = JOIN_TIMEOUT_MILLIS;
1093 ns = 0;
1094 }
1095 else { // at most JOIN_TIMEOUT_MILLIS per wait
1096 long nt = nanos - (System.nanoTime() - startTime);
1097 if (nt <= 0L)
1098 break;
1099 ms = nt / 1000000;
1100 if (ms > JOIN_TIMEOUT_MILLIS) {
1101 ms = JOIN_TIMEOUT_MILLIS;
1102 ns = 0;
1103 }
1104 else
1105 ns = (int) (nt % 1000000);
1106 }
1107 joinMe.internalAwaitDone(ms, ns);
1108 }
1109 if (joinMe.status < 0)
1110 break;
1111 }
1112 helpMaintainParallelism();
1113 }
1114 if (!running) {
1115 int c;
1116 do {} while (!UNSAFE.compareAndSwapInt
1117 (this, workerCountsOffset,
1118 c = workerCounts, c + ONE_RUNNING));
1119 }
1120 }
1121
1122 /**
1123 * Same idea as awaitJoin, but no helping, retries, or timeouts.
1124 */
1125 final void awaitBlocker(ManagedBlocker blocker)
1126 throws InterruptedException {
1127 while (!blocker.isReleasable()) {
1128 int wc = workerCounts;
1129 if ((wc & RUNNING_COUNT_MASK) == 0)
1130 helpMaintainParallelism();
1131 else if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
1132 wc, wc - ONE_RUNNING)) {
1133 try {
1134 while (!blocker.isReleasable()) {
1135 long h = eventWaiters;
1136 if (h != 0L &&
1137 (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
1138 releaseEventWaiters();
1139 else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
1140 runState < TERMINATING)
1141 helpMaintainParallelism();
1142 else if (blocker.block())
1143 break;
1144 }
1145 } finally {
1146 int c;
1147 do {} while (!UNSAFE.compareAndSwapInt
1148 (this, workerCountsOffset,
1149 c = workerCounts, c + ONE_RUNNING));
1150 }
1151 break;
1157 * Possibly initiates and/or completes termination.
1158 *
1159 * @param now if true, unconditionally terminate, else only
1160 * if shutdown and empty queue and no active workers
1161 * @return true if now terminating or terminated
1162 */
1163 private boolean tryTerminate(boolean now) {
1164 if (now)
1165 advanceRunLevel(SHUTDOWN); // ensure at least SHUTDOWN
1166 else if (runState < SHUTDOWN ||
1167 !submissionQueue.isEmpty() ||
1168 (runState & ACTIVE_COUNT_MASK) != 0)
1169 return false;
1170
1171 if (advanceRunLevel(TERMINATING))
1172 startTerminating();
1173
1174 // Finish now if all threads terminated; else in some subsequent call
1175 if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
1176 advanceRunLevel(TERMINATED);
1177 termination.forceTermination();
1178 }
1179 return true;
1180 }
1181
1182 /**
1183 * Actions on transition to TERMINATING
1184 *
1185     * Runs up to four passes through workers: (0) shutting down each
1186     * (without waking up if parked) to quickly spread notifications
1187     * without unnecessary bouncing around event queues etc.; (1) wake
1188     * up and help cancel tasks; (2) interrupt; (3) mop up races with
1189     * interrupted workers.
1190 */
1191 private void startTerminating() {
1192 cancelSubmissions();
1193 for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
1194 int c; // advance event count
1195 UNSAFE.compareAndSwapInt(this, eventCountOffset,
1196 c = eventCount, c+1);
1197 eventWaiters = 0L; // clobber lists
1198 spareWaiters = 0;
1199 for (ForkJoinWorkerThread w : workers) {
1200 if (w != null) {
1201 w.shutdown();
1352 }
1353
1354 /**
1355 * Returns initial power of two size for workers array.
1356 * @param pc the initial parallelism level
1357 */
1358 private static int initialArraySizeFor(int pc) {
1359 // If possible, initially allocate enough space for one spare
1360 int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
1361        // See Hacker's Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
1362 size |= size >>> 1;
1363 size |= size >>> 2;
1364 size |= size >>> 4;
1365 size |= size >>> 8;
1366 return size + 1;
1367 }
1368
1369 // Execution methods
1370
1371 /**
1372 * Submits task and creates, starts, or resumes some workers if necessary
1373 */
1374 private <T> void doSubmit(ForkJoinTask<T> task) {
1375 submissionQueue.offer(task);
1376 int c; // try to increment event count -- CAS failure OK
1377 UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
1378 helpMaintainParallelism();
1379 }
1380
1381 /**
1382 * Performs the given task, returning its result upon completion.
1383 *
1384 * @param task the task
1385 * @return the task's result
1386 * @throws NullPointerException if the task is null
1387 * @throws RejectedExecutionException if the task cannot be
1388 * scheduled for execution
1389 */
1390 public <T> T invoke(ForkJoinTask<T> task) {
1391 if (task == null)
1392 throw new NullPointerException();
1393 if (runState >= SHUTDOWN)
1394 throw new RejectedExecutionException();
1395 Thread t = Thread.currentThread();
1396 if ((t instanceof ForkJoinWorkerThread) &&
1397 ((ForkJoinWorkerThread)t).pool == this)
1398 return task.invoke(); // bypass submit if in same pool
1399 else {
1400 doSubmit(task);
1401 return task.join();
1402 }
1403 }
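    // Illustrative use of invoke above (SumTask is a hypothetical
    // RecursiveTask<Long> that splits itself via fork/join):
    //   ForkJoinPool pool = new ForkJoinPool();
    //   long sum = pool.invoke(new SumTask(array, 0, array.length));
    // From a non-worker thread this submits and joins the task; from a worker
    // thread of this same pool it simply invokes the task directly, as coded above.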
1404
1405 /**
1406 * Unless terminating, forks task if within an ongoing FJ
1407 * computation in the current pool, else submits as external task.
1408 */
1409 private <T> void forkOrSubmit(ForkJoinTask<T> task) {
1410 if (runState >= SHUTDOWN)
1411 throw new RejectedExecutionException();
1412 Thread t = Thread.currentThread();
1413 if ((t instanceof ForkJoinWorkerThread) &&
1414 ((ForkJoinWorkerThread)t).pool == this)
1415 task.fork();
1416 else
1417 doSubmit(task);
1418 }
1419
1420 /**
1421 * Arranges for (asynchronous) execution of the given task.
1422 *
1423 * @param task the task
1424 * @throws NullPointerException if the task is null
1425 * @throws RejectedExecutionException if the task cannot be
1426 * scheduled for execution
1427 */
1428 public void execute(ForkJoinTask<?> task) {
1429 if (task == null)
1430 throw new NullPointerException();
1431 forkOrSubmit(task);
1432 }
1433
1434 // AbstractExecutorService methods
1435
1436 /**
1437 * @throws NullPointerException if the task is null
1438 * @throws RejectedExecutionException if the task cannot be
1439 * scheduled for execution
1440 */
1441 public void execute(Runnable task) {
1442 if (task == null)
1443 throw new NullPointerException();
1444 ForkJoinTask<?> job;
1445 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1446 job = (ForkJoinTask<?>) task;
1447 else
1448 job = ForkJoinTask.adapt(task, null);
1449 forkOrSubmit(job);
1450 }
1451
1452 /**
1453 * Submits a ForkJoinTask for execution.
1454 *
1455 * @param task the task to submit
1456 * @return the task
1457 * @throws NullPointerException if the task is null
1458 * @throws RejectedExecutionException if the task cannot be
1459 * scheduled for execution
1460 */
1461 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
1462 if (task == null)
1463 throw new NullPointerException();
1464 forkOrSubmit(task);
1465 return task;
1466 }
1467
1468 /**
1469 * @throws NullPointerException if the task is null
1470 * @throws RejectedExecutionException if the task cannot be
1471 * scheduled for execution
1472 */
1473 public <T> ForkJoinTask<T> submit(Callable<T> task) {
1474 if (task == null)
1475 throw new NullPointerException();
1476 ForkJoinTask<T> job = ForkJoinTask.adapt(task);
1477 forkOrSubmit(job);
1478 return job;
1479 }
1480
1481 /**
1482 * @throws NullPointerException if the task is null
1483 * @throws RejectedExecutionException if the task cannot be
1484 * scheduled for execution
1485 */
1486 public <T> ForkJoinTask<T> submit(Runnable task, T result) {
1487 if (task == null)
1488 throw new NullPointerException();
1489 ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
1490 forkOrSubmit(job);
1491 return job;
1492 }
1493
1494 /**
1495 * @throws NullPointerException if the task is null
1496 * @throws RejectedExecutionException if the task cannot be
1497 * scheduled for execution
1498 */
1499 public ForkJoinTask<?> submit(Runnable task) {
1500 if (task == null)
1501 throw new NullPointerException();
1502 ForkJoinTask<?> job;
1503 if (task instanceof ForkJoinTask<?>) // avoid re-wrap
1504 job = (ForkJoinTask<?>) task;
1505 else
1506 job = ForkJoinTask.adapt(task, null);
1507 forkOrSubmit(job);
1508 return job;
1509 }
1510
1511 /**
1512 * @throws NullPointerException {@inheritDoc}
1513 * @throws RejectedExecutionException {@inheritDoc}
1514 */
1515 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
1516 ArrayList<ForkJoinTask<T>> forkJoinTasks =
1517 new ArrayList<ForkJoinTask<T>>(tasks.size());
1518 for (Callable<T> task : tasks)
1519 forkJoinTasks.add(ForkJoinTask.adapt(task));
1520 invoke(new InvokeAll<T>(forkJoinTasks));
1521
1522 @SuppressWarnings({"unchecked", "rawtypes"})
1523 List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
1524 return futures;
1525 }
1526
1527 static final class InvokeAll<T> extends RecursiveAction {
1785 public List<Runnable> shutdownNow() {
1786 checkPermission();
1787 tryTerminate(true);
1788 return Collections.emptyList();
1789 }
1790
1791 /**
1792 * Returns {@code true} if all tasks have completed following shut down.
1793 *
1794 * @return {@code true} if all tasks have completed following shut down
1795 */
1796 public boolean isTerminated() {
1797 return runState >= TERMINATED;
1798 }
1799
1800 /**
1801 * Returns {@code true} if the process of termination has
1802 * commenced but not yet completed. This method may be useful for
1803 * debugging. A return of {@code true} reported a sufficient
1804 * period after shutdown may indicate that submitted tasks have
1805 * ignored or suppressed interruption, or are waiting for IO,
1806 * causing this executor not to properly terminate. (See the
1807 * advisory notes for class {@link ForkJoinTask} stating that
1808 * tasks should not normally entail blocking operations. But if
1809 * they do, they must abort them on interrupt.)
1810 *
1811 * @return {@code true} if terminating but not yet terminated
1812 */
1813 public boolean isTerminating() {
1814 return (runState & (TERMINATING|TERMINATED)) == TERMINATING;
1815 }
1816
1817 /**
1818 * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
1819 */
1820 final boolean isAtLeastTerminating() {
1821 return runState >= TERMINATING;
1822 }
1823
1824 /**
1825 * Returns {@code true} if this pool has been shut down.
1826 *
1827 * @return {@code true} if this pool has been shut down
1828 */
1829 public boolean isShutdown() {
1830 return runState >= SHUTDOWN;
1831 }
1832
1833 /**
1834 * Blocks until all tasks have completed execution after a shutdown
1835 * request, or the timeout occurs, or the current thread is
1836 * interrupted, whichever happens first.
1837 *
1838 * @param timeout the maximum time to wait
1839 * @param unit the time unit of the timeout argument
1840 * @return {@code true} if this executor terminated and
1841 * {@code false} if the timeout elapsed before termination
1842 * @throws InterruptedException if interrupted while waiting
1843 */
1844 public boolean awaitTermination(long timeout, TimeUnit unit)
1845 throws InterruptedException {
1846 try {
1847 termination.awaitAdvanceInterruptibly(0, timeout, unit);
1848 } catch (TimeoutException ex) {
1849 return false;
1850 }
1851 return true;
1852 }
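    // Illustrative shutdown sequence (a sketch, not prescribed by this class):
    //   pool.shutdown();                                   // disable new submissions
    //   if (!pool.awaitTermination(60, TimeUnit.SECONDS))  // wait for quiescence
    //     pool.shutdownNow();                              // then cancel lingering tasks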
1853
1854 /**
1855 * Interface for extending managed parallelism for tasks running
1856 * in {@link ForkJoinPool}s.
1857 *
1858 * <p>A {@code ManagedBlocker} provides two methods. Method
1859 * {@code isReleasable} must return {@code true} if blocking is
1860 * not necessary. Method {@code block} blocks the current thread
1861 * if necessary (perhaps internally invoking {@code isReleasable}
1862 * before actually blocking). The unusual methods in this API
1863 * accommodate synchronizers that may, but don't usually, block
1864 * for long periods. Similarly, they allow more efficient internal
1865 * handling of cases in which additional workers may be, but
1866 * usually are not, needed to ensure sufficient parallelism.
1867 * Toward this end, implementations of method {@code isReleasable}
1868 * must be amenable to repeated invocation.
1869 *
1870 * <p>For example, here is a ManagedBlocker based on a
1871 * ReentrantLock:
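 * (The following is a minimal sketch; the class and field names are
 * illustrative.)
 *
 *  <pre> {@code
 * class ManagedLocker implements ManagedBlocker {
 *   final ReentrantLock lock;
 *   boolean hasLock = false;
 *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
 *   public boolean block() {
 *     if (!hasLock)
 *       lock.lock();
 *     return true;
 *   }
 *   public boolean isReleasable() {
 *     return hasLock || (hasLock = lock.tryLock());
 *   }
 * }}</pre>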
|