340 * 0, array.length).invoke();
341 * }
342 * }}</pre>
343 *
344 * Here, method {@code onCompletion} takes a form common to many
345 * completion designs that combine results. This callback-style method
346 * is triggered once per task, in either of the two different contexts
347 * in which the pending count is, or becomes, zero: (1) by a task
348 * itself, if its pending count is zero upon invocation of {@code
349 * tryComplete}, or (2) by any of its subtasks when they complete and
350 * decrement the pending count to zero. The {@code caller} argument
351 * distinguishes cases. Most often, when the caller is {@code this},
352 * no action is necessary. Otherwise the caller argument can be used
353 * (usually via a cast) to supply a value (and/or links to other
354 * values) to be combined. Assuming proper use of pending counts, the
355 * actions inside {@code onCompletion} occur (once) upon completion of
356 * a task and its subtasks. No additional synchronization is required
357 * within this method to ensure thread safety of accesses to fields of
358 * this task or other completed tasks.
359 *
360 * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
361 * process completions is inapplicable or inconvenient, you can use
362 * methods {@link #firstComplete} and {@link #nextComplete} to create
363 * custom traversals. For example, to define a MapReducer that only
364 * splits out right-hand tasks in the form of the third ForEach
365 * example, the completions must cooperatively reduce along
366 * unexhausted subtask links, which can be done as follows:
367 *
368 * <pre> {@code
369 * class MapReducer<E> extends CountedCompleter<E> { // version 2
370 * final E[] array; final MyMapper<E> mapper;
371 * final MyReducer<E> reducer; final int lo, hi;
372 * MapReducer<E> forks, next; // record subtask forks in list
373 * E result;
374 * MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
375 * MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
376 * super(p);
377 * this.array = array; this.mapper = mapper;
378 * this.reducer = reducer; this.lo = lo; this.hi = hi;
379 * this.next = next;
380 * }
536 * Adds (atomically) the given value to the pending count.
537 *
538 * @param delta the value to add
539 */
540 public final void addToPendingCount(int delta) {
541 PENDING.getAndAdd(this, delta);
542 }
543
544 /**
545 * Sets (atomically) the pending count to the given count only if
546 * it currently holds the given expected value.
547 *
548 * @param expected the expected value
549 * @param count the new value
550 * @return {@code true} if successful
551 */
552 public final boolean compareAndSetPendingCount(int expected, int count) {
553 return PENDING.compareAndSet(this, expected, count);
554 }
555
556 /**
557 * If the pending count is nonzero, (atomically) decrements it.
558 *
559 * @return the initial (undecremented) pending count holding on entry
560 * to this method
561 */
562 public final int decrementPendingCountUnlessZero() {
563 int c;
564 do {} while ((c = pending) != 0 &&
565 !PENDING.weakCompareAndSet(this, c, c - 1));
566 return c;
567 }
568
569 /**
570 * Returns the root of the current computation; i.e., this
571 * task if it has no completer, else its completer's root.
572 *
573 * @return the root of the current computation
574 */
575 public final CountedCompleter<?> getRoot() {
576 CountedCompleter<?> a = this, p;
577 while ((p = a.completer) != null)
578 a = p;
579 return a;
580 }
581
582 /**
583 * If the pending count is nonzero, decrements the count;
584 * otherwise invokes {@link #onCompletion(CountedCompleter)}
585 * and then similarly tries to complete this task's completer,
586 * if one exists, else marks this task as complete.
587 */
588 public final void tryComplete() {
589 CountedCompleter<?> a = this, s = a;
590 for (int c;;) {
591 if ((c = a.pending) == 0) {
592 a.onCompletion(s);
593 if ((a = (s = a).completer) == null) {
594 s.quietlyComplete();
595 return;
596 }
597 }
598 else if (PENDING.weakCompareAndSet(a, c, c - 1))
599 return;
600 }
601 }
602
603 /**
604 * Equivalent to {@link #tryComplete} but does not invoke {@link
605 * #onCompletion(CountedCompleter)} along the completion path:
606 * If the pending count is nonzero, decrements the count;
607 * otherwise, similarly tries to complete this task's completer, if
608 * one exists, else marks this task as complete. This method may be
609 * useful in cases where {@code onCompletion} should not, or need
610 * not, be invoked for each completer in a computation.
611 */
612 public final void propagateCompletion() {
613 CountedCompleter<?> a = this, s;
614 for (int c;;) {
615 if ((c = a.pending) == 0) {
616 if ((a = (s = a).completer) == null) {
617 s.quietlyComplete();
618 return;
619 }
620 }
621 else if (PENDING.weakCompareAndSet(a, c, c - 1))
622 return;
623 }
624 }
625
626 /**
627 * Regardless of pending count, invokes
628 * {@link #onCompletion(CountedCompleter)}, marks this task as
629 * complete and further triggers {@link #tryComplete} on this
630 * task's completer, if one exists. The given rawResult is
631 * used as an argument to {@link #setRawResult} before invoking
632 * {@link #onCompletion(CountedCompleter)} or marking this task
633 * as complete; its value is meaningful only for classes
634 * overriding {@code setRawResult}. This method does not modify
635 * the pending count.
636 *
637 * <p>This method may be useful when forcing completion as soon as
638 * any one (versus all) of several subtask results are obtained.
639 * However, in the common (and recommended) case in which {@code
640 * setRawResult} is not overridden, this effect can be obtained
641 * more simply using {@link #quietlyCompleteRoot()}.
646 CountedCompleter<?> p;
647 setRawResult(rawResult);
648 onCompletion(this);
649 quietlyComplete();
650 if ((p = completer) != null)
651 p.tryComplete();
652 }
653
654 /**
655 * If this task's pending count is zero, returns this task;
656 * otherwise decrements its pending count and returns {@code null}.
657 * This method is designed to be used with {@link #nextComplete} in
658 * completion traversal loops.
659 *
660 * @return this task, if pending count was zero, else {@code null}
661 */
662 public final CountedCompleter<?> firstComplete() {
663 for (int c;;) {
664 if ((c = pending) == 0)
665 return this;
666 else if (PENDING.weakCompareAndSet(this, c, c - 1))
667 return null;
668 }
669 }
670
671 /**
672 * If this task does not have a completer, invokes {@link
673 * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
674 * the completer's pending count is non-zero, decrements that
675 * pending count and returns {@code null}. Otherwise, returns the
676 * completer. This method can be used as part of a completion
677 * traversal loop for homogeneous task hierarchies:
678 *
679 * <pre> {@code
680 * for (CountedCompleter<?> c = firstComplete();
681 * c != null;
682 * c = c.nextComplete()) {
683 * // ... process c ...
684 * }}</pre>
685 *
686 * @return the completer, or {@code null} if none
701 public final void quietlyCompleteRoot() {
702 for (CountedCompleter<?> a = this, p;;) {
703 if ((p = a.completer) == null) {
704 a.quietlyComplete();
705 return;
706 }
707 a = p;
708 }
709 }
710
711 /**
712 * If this task has not completed, attempts to process at most the
713 * given number of other unprocessed tasks for which this task is
714 * on the completion path, if any are known to exist.
715 *
716 * @param maxTasks the maximum number of tasks to process. If
717 * less than or equal to zero, then no tasks are
718 * processed.
719 */
720 public final void helpComplete(int maxTasks) {
721 Thread t; ForkJoinWorkerThread wt;
722 if (maxTasks > 0 && status >= 0) {
723 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
724 (wt = (ForkJoinWorkerThread)t).pool.
725 helpComplete(wt.workQueue, this, maxTasks);
726 else
727 ForkJoinPool.common.externalHelpComplete(this, maxTasks);
728 }
729 }
730
731 /**
732 * Supports ForkJoinTask exception propagation.
733 */
734 void internalPropagateException(Throwable ex) {
735 CountedCompleter<?> a = this, s = a;
736 while (a.onExceptionalCompletion(ex, s) &&
737 (a = (s = a).completer) != null && a.status >= 0 &&
738 isExceptionalStatus(a.recordExceptionalCompletion(ex)))
739 ;
740 }
741
742 /**
743 * Implements execution conventions for CountedCompleters.
744 */
745 protected final boolean exec() {
746 compute();
747 return false;
748 }
749
750 /**
751 * Returns the result of the computation. By default,
752 * returns {@code null}, which is appropriate for {@code Void}
753 * actions, but in other cases should be overridden, almost
754 * always to return a field or function of a field that
755 * holds the result upon completion.
756 *
757 * @return the result of the computation
758 */
759 public T getRawResult() { return null; }
760
761 /**
762 * A method that result-bearing CountedCompleters may optionally
763 * use to help maintain result data. By default, does nothing.
764 * Overrides are not recommended. However, if this method is
765 * overridden to update existing objects or fields, then it must
766 * in general be defined to be thread-safe.
767 */
768 protected void setRawResult(T t) { }
769
// VarHandle mechanics: PENDING gives atomic access to the int field
// named "pending" declared on CountedCompleter.
private static final VarHandle PENDING;

static {
    try {
        PENDING = MethodHandles.lookup()
            .findVarHandle(CountedCompleter.class, "pending", int.class);
    } catch (ReflectiveOperationException e) {
        // Surface a failed lookup as a class-initialization error.
        throw new ExceptionInInitializerError(e);
    }
}
781 }
|
340 * 0, array.length).invoke();
341 * }
342 * }}</pre>
343 *
344 * Here, method {@code onCompletion} takes a form common to many
345 * completion designs that combine results. This callback-style method
346 * is triggered once per task, in either of the two different contexts
347 * in which the pending count is, or becomes, zero: (1) by a task
348 * itself, if its pending count is zero upon invocation of {@code
349 * tryComplete}, or (2) by any of its subtasks when they complete and
350 * decrement the pending count to zero. The {@code caller} argument
351 * distinguishes cases. Most often, when the caller is {@code this},
352 * no action is necessary. Otherwise the caller argument can be used
353 * (usually via a cast) to supply a value (and/or links to other
354 * values) to be combined. Assuming proper use of pending counts, the
355 * actions inside {@code onCompletion} occur (once) upon completion of
356 * a task and its subtasks. No additional synchronization is required
357 * within this method to ensure thread safety of accesses to fields of
358 * this task or other completed tasks.
359 *
360 * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
361 * process completions is inapplicable or inconvenient, you can use
362 * methods {@link #firstComplete} and {@link #nextComplete} to create
363 * custom traversals. For example, to define a MapReducer that only
364 * splits out right-hand tasks in the form of the third ForEach
365 * example, the completions must cooperatively reduce along
366 * unexhausted subtask links, which can be done as follows:
367 *
368 * <pre> {@code
369 * class MapReducer<E> extends CountedCompleter<E> { // version 2
370 * final E[] array; final MyMapper<E> mapper;
371 * final MyReducer<E> reducer; final int lo, hi;
372 * MapReducer<E> forks, next; // record subtask forks in list
373 * E result;
374 * MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
375 * MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
376 * super(p);
377 * this.array = array; this.mapper = mapper;
378 * this.reducer = reducer; this.lo = lo; this.hi = hi;
379 * this.next = next;
380 * }
536 * Adds (atomically) the given value to the pending count.
537 *
538 * @param delta the value to add
539 */
540 public final void addToPendingCount(int delta) {
541 PENDING.getAndAdd(this, delta);
542 }
543
544 /**
545 * Sets (atomically) the pending count to the given count only if
546 * it currently holds the given expected value.
547 *
548 * @param expected the expected value
549 * @param count the new value
550 * @return {@code true} if successful
551 */
552 public final boolean compareAndSetPendingCount(int expected, int count) {
553 return PENDING.compareAndSet(this, expected, count);
554 }
555
556 // internal-only weak version
557 final boolean weakCompareAndSetPendingCount(int expected, int count) {
558 return PENDING.weakCompareAndSet(this, expected, count);
559 }
560
561 /**
562 * If the pending count is nonzero, (atomically) decrements it.
563 *
564 * @return the initial (undecremented) pending count holding on entry
565 * to this method
566 */
567 public final int decrementPendingCountUnlessZero() {
568 int c;
569 do {} while ((c = pending) != 0 &&
570 !weakCompareAndSetPendingCount(c, c - 1));
571 return c;
572 }
573
574 /**
575 * Returns the root of the current computation; i.e., this
576 * task if it has no completer, else its completer's root.
577 *
578 * @return the root of the current computation
579 */
580 public final CountedCompleter<?> getRoot() {
581 CountedCompleter<?> a = this, p;
582 while ((p = a.completer) != null)
583 a = p;
584 return a;
585 }
586
587 /**
588 * If the pending count is nonzero, decrements the count;
589 * otherwise invokes {@link #onCompletion(CountedCompleter)}
590 * and then similarly tries to complete this task's completer,
591 * if one exists, else marks this task as complete.
592 */
593 public final void tryComplete() {
594 CountedCompleter<?> a = this, s = a;
595 for (int c;;) {
596 if ((c = a.pending) == 0) {
597 a.onCompletion(s);
598 if ((a = (s = a).completer) == null) {
599 s.quietlyComplete();
600 return;
601 }
602 }
603 else if (a.weakCompareAndSetPendingCount(c, c - 1))
604 return;
605 }
606 }
607
608 /**
609 * Equivalent to {@link #tryComplete} but does not invoke {@link
610 * #onCompletion(CountedCompleter)} along the completion path:
611 * If the pending count is nonzero, decrements the count;
612 * otherwise, similarly tries to complete this task's completer, if
613 * one exists, else marks this task as complete. This method may be
614 * useful in cases where {@code onCompletion} should not, or need
615 * not, be invoked for each completer in a computation.
616 */
617 public final void propagateCompletion() {
618 CountedCompleter<?> a = this, s;
619 for (int c;;) {
620 if ((c = a.pending) == 0) {
621 if ((a = (s = a).completer) == null) {
622 s.quietlyComplete();
623 return;
624 }
625 }
626 else if (a.weakCompareAndSetPendingCount(c, c - 1))
627 return;
628 }
629 }
630
631 /**
632 * Regardless of pending count, invokes
633 * {@link #onCompletion(CountedCompleter)}, marks this task as
634 * complete and further triggers {@link #tryComplete} on this
635 * task's completer, if one exists. The given rawResult is
636 * used as an argument to {@link #setRawResult} before invoking
637 * {@link #onCompletion(CountedCompleter)} or marking this task
638 * as complete; its value is meaningful only for classes
639 * overriding {@code setRawResult}. This method does not modify
640 * the pending count.
641 *
642 * <p>This method may be useful when forcing completion as soon as
643 * any one (versus all) of several subtask results are obtained.
644 * However, in the common (and recommended) case in which {@code
645 * setRawResult} is not overridden, this effect can be obtained
646 * more simply using {@link #quietlyCompleteRoot()}.
651 CountedCompleter<?> p;
652 setRawResult(rawResult);
653 onCompletion(this);
654 quietlyComplete();
655 if ((p = completer) != null)
656 p.tryComplete();
657 }
658
659 /**
660 * If this task's pending count is zero, returns this task;
661 * otherwise decrements its pending count and returns {@code null}.
662 * This method is designed to be used with {@link #nextComplete} in
663 * completion traversal loops.
664 *
665 * @return this task, if pending count was zero, else {@code null}
666 */
667 public final CountedCompleter<?> firstComplete() {
668 for (int c;;) {
669 if ((c = pending) == 0)
670 return this;
671 else if (weakCompareAndSetPendingCount(c, c - 1))
672 return null;
673 }
674 }
675
676 /**
677 * If this task does not have a completer, invokes {@link
678 * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
679 * the completer's pending count is non-zero, decrements that
680 * pending count and returns {@code null}. Otherwise, returns the
681 * completer. This method can be used as part of a completion
682 * traversal loop for homogeneous task hierarchies:
683 *
684 * <pre> {@code
685 * for (CountedCompleter<?> c = firstComplete();
686 * c != null;
687 * c = c.nextComplete()) {
688 * // ... process c ...
689 * }}</pre>
690 *
691 * @return the completer, or {@code null} if none
706 public final void quietlyCompleteRoot() {
707 for (CountedCompleter<?> a = this, p;;) {
708 if ((p = a.completer) == null) {
709 a.quietlyComplete();
710 return;
711 }
712 a = p;
713 }
714 }
715
716 /**
717 * If this task has not completed, attempts to process at most the
718 * given number of other unprocessed tasks for which this task is
719 * on the completion path, if any are known to exist.
720 *
721 * @param maxTasks the maximum number of tasks to process. If
722 * less than or equal to zero, then no tasks are
723 * processed.
724 */
725 public final void helpComplete(int maxTasks) {
726 ForkJoinPool.WorkQueue q; Thread t; boolean owned;
727 if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
728 q = ((ForkJoinWorkerThread)t).workQueue;
729 else
730 q = ForkJoinPool.commonQueue();
731 if (q != null && maxTasks > 0)
732 q.helpComplete(this, owned, maxTasks);
733 }
734
735 // ForkJoinTask overrides
736
737 /**
738 * Supports ForkJoinTask exception propagation.
739 */
740 @Override
741 final int trySetException(Throwable ex) {
742 CountedCompleter<?> a = this, p = a;
743 do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
744 a.onExceptionalCompletion(ex, p) &&
745 (a = (p = a).completer) != null && a.status >= 0);
746 return status;
747 }
748
749 /**
750 * Implements execution conventions for CountedCompleters.
751 */
752 @Override
753 protected final boolean exec() {
754 compute();
755 return false;
756 }
757
758 /**
759 * Returns the result of the computation. By default,
760 * returns {@code null}, which is appropriate for {@code Void}
761 * actions, but in other cases should be overridden, almost
762 * always to return a field or function of a field that
763 * holds the result upon completion.
764 *
765 * @return the result of the computation
766 */
767 @Override
768 public T getRawResult() { return null; }
769
770 /**
771 * A method that result-bearing CountedCompleters may optionally
772 * use to help maintain result data. By default, does nothing.
773 * Overrides are not recommended. However, if this method is
774 * overridden to update existing objects or fields, then it must
775 * in general be defined to be thread-safe.
776 */
777 @Override
778 protected void setRawResult(T t) { }
779
// VarHandle mechanics: PENDING gives atomic access to the int field
// named "pending" declared on CountedCompleter.
private static final VarHandle PENDING;

static {
    try {
        PENDING = MethodHandles.lookup()
            .findVarHandle(CountedCompleter.class, "pending", int.class);
    } catch (ReflectiveOperationException e) {
        // Surface a failed lookup as a class-initialization error.
        throw new ExceptionInInitializerError(e);
    }
}
791 }
|