< prev index next >

src/java.base/share/classes/java/util/concurrent/CountedCompleter.java

Print this page
8246585: ForkJoin updates
Reviewed-by: martin


 340  *                              0, array.length).invoke();
 341  *   }
 342  * }}</pre>
 343  *
 344  * Here, method {@code onCompletion} takes a form common to many
 345  * completion designs that combine results. This callback-style method
 346  * is triggered once per task, in either of the two different contexts
 347  * in which the pending count is, or becomes, zero: (1) by a task
 348  * itself, if its pending count is zero upon invocation of {@code
 349  * tryComplete}, or (2) by any of its subtasks when they complete and
 350  * decrement the pending count to zero. The {@code caller} argument
 351  * distinguishes cases.  Most often, when the caller is {@code this},
 352  * no action is necessary. Otherwise the caller argument can be used
 353  * (usually via a cast) to supply a value (and/or links to other
 354  * values) to be combined.  Assuming proper use of pending counts, the
 355  * actions inside {@code onCompletion} occur (once) upon completion of
 356  * a task and its subtasks. No additional synchronization is required
 357  * within this method to ensure thread safety of accesses to fields of
 358  * this task or other completed tasks.
 359  *
 360  * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
 361  * process completions is inapplicable or inconvenient, you can use
 362  * methods {@link #firstComplete} and {@link #nextComplete} to create
 363  * custom traversals.  For example, to define a MapReducer that only
 364  * splits out right-hand tasks in the form of the third ForEach
 365  * example, the completions must cooperatively reduce along
 366  * unexhausted subtask links, which can be done as follows:
 367  *
 368  * <pre> {@code
 369  * class MapReducer<E> extends CountedCompleter<E> { // version 2
 370  *   final E[] array; final MyMapper<E> mapper;
 371  *   final MyReducer<E> reducer; final int lo, hi;
 372  *   MapReducer<E> forks, next; // record subtask forks in list
 373  *   E result;
 374  *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
 375  *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
 376  *     super(p);
 377  *     this.array = array; this.mapper = mapper;
 378  *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 379  *     this.next = next;
 380  *   }


 536      * Adds (atomically) the given value to the pending count.
 537      *
 538      * @param delta the value to add
 539      */
 540     public final void addToPendingCount(int delta) {
 541         PENDING.getAndAdd(this, delta);
 542     }
 543 
 544     /**
 545      * Sets (atomically) the pending count to the given count only if
 546      * it currently holds the given expected value.
 547      *
 548      * @param expected the expected value
 549      * @param count the new value
 550      * @return {@code true} if successful
 551      */
 552     public final boolean compareAndSetPendingCount(int expected, int count) {
 553         return PENDING.compareAndSet(this, expected, count);
 554     }
 555 





 556     /**
 557      * If the pending count is nonzero, (atomically) decrements it.
 558      *
 559      * @return the initial (undecremented) pending count holding on entry
 560      * to this method
 561      */
 562     public final int decrementPendingCountUnlessZero() {
 563         int c;
 564         do {} while ((c = pending) != 0 &&
 565                      !PENDING.weakCompareAndSet(this, c, c - 1));
 566         return c;
 567     }
 568 
 569     /**
 570      * Returns the root of the current computation; i.e., this
 571      * task if it has no completer, else its completer's root.
 572      *
 573      * @return the root of the current computation
 574      */
 575     public final CountedCompleter<?> getRoot() {
 576         CountedCompleter<?> a = this, p;
 577         while ((p = a.completer) != null)
 578             a = p;
 579         return a;
 580     }
 581 
 582     /**
 583      * If the pending count is nonzero, decrements the count;
 584      * otherwise invokes {@link #onCompletion(CountedCompleter)}
 585      * and then similarly tries to complete this task's completer,
 586      * if one exists, else marks this task as complete.
 587      */
 588     public final void tryComplete() {
 589         CountedCompleter<?> a = this, s = a;
 590         for (int c;;) {
 591             if ((c = a.pending) == 0) {
 592                 a.onCompletion(s);
 593                 if ((a = (s = a).completer) == null) {
 594                     s.quietlyComplete();
 595                     return;
 596                 }
 597             }
 598             else if (PENDING.weakCompareAndSet(a, c, c - 1))
 599                 return;
 600         }
 601     }
 602 
 603     /**
 604      * Equivalent to {@link #tryComplete} but does not invoke {@link
 605      * #onCompletion(CountedCompleter)} along the completion path:
 606      * If the pending count is nonzero, decrements the count;
 607      * otherwise, similarly tries to complete this task's completer, if
 608      * one exists, else marks this task as complete. This method may be
 609      * useful in cases where {@code onCompletion} should not, or need
 610      * not, be invoked for each completer in a computation.
 611      */
 612     public final void propagateCompletion() {
 613         CountedCompleter<?> a = this, s;
 614         for (int c;;) {
 615             if ((c = a.pending) == 0) {
 616                 if ((a = (s = a).completer) == null) {
 617                     s.quietlyComplete();
 618                     return;
 619                 }
 620             }
 621             else if (PENDING.weakCompareAndSet(a, c, c - 1))
 622                 return;
 623         }
 624     }
 625 
 626     /**
 627      * Regardless of pending count, invokes
 628      * {@link #onCompletion(CountedCompleter)}, marks this task as
 629      * complete and further triggers {@link #tryComplete} on this
 630      * task's completer, if one exists.  The given rawResult is
 631      * used as an argument to {@link #setRawResult} before invoking
 632      * {@link #onCompletion(CountedCompleter)} or marking this task
 633      * as complete; its value is meaningful only for classes
 634      * overriding {@code setRawResult}.  This method does not modify
 635      * the pending count.
 636      *
 637      * <p>This method may be useful when forcing completion as soon as
 638      * any one (versus all) of several subtask results are obtained.
 639      * However, in the common (and recommended) case in which {@code
 640      * setRawResult} is not overridden, this effect can be obtained
 641      * more simply using {@link #quietlyCompleteRoot()}.


 646         CountedCompleter<?> p;
 647         setRawResult(rawResult);
 648         onCompletion(this);
 649         quietlyComplete();
 650         if ((p = completer) != null)
 651             p.tryComplete();
 652     }
 653 
 654     /**
 655      * If this task's pending count is zero, returns this task;
 656      * otherwise decrements its pending count and returns {@code null}.
 657      * This method is designed to be used with {@link #nextComplete} in
 658      * completion traversal loops.
 659      *
 660      * @return this task, if pending count was zero, else {@code null}
 661      */
 662     public final CountedCompleter<?> firstComplete() {
 663         for (int c;;) {
 664             if ((c = pending) == 0)
 665                 return this;
 666             else if (PENDING.weakCompareAndSet(this, c, c - 1))
 667                 return null;
 668         }
 669     }
 670 
 671     /**
 672      * If this task does not have a completer, invokes {@link
 673      * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
 674      * the completer's pending count is non-zero, decrements that
 675      * pending count and returns {@code null}.  Otherwise, returns the
 676      * completer.  This method can be used as part of a completion
 677      * traversal loop for homogeneous task hierarchies:
 678      *
 679      * <pre> {@code
 680      * for (CountedCompleter<?> c = firstComplete();
 681      *      c != null;
 682      *      c = c.nextComplete()) {
 683      *   // ... process c ...
 684      * }}</pre>
 685      *
 686      * @return the completer, or {@code null} if none


 701     public final void quietlyCompleteRoot() {
 702         for (CountedCompleter<?> a = this, p;;) {
 703             if ((p = a.completer) == null) {
 704                 a.quietlyComplete();
 705                 return;
 706             }
 707             a = p;
 708         }
 709     }
 710 
 711     /**
 712      * If this task has not completed, attempts to process at most the
 713      * given number of other unprocessed tasks for which this task is
 714      * on the completion path, if any are known to exist.
 715      *
 716      * @param maxTasks the maximum number of tasks to process.  If
 717      *                 less than or equal to zero, then no tasks are
 718      *                 processed.
 719      */
 720     public final void helpComplete(int maxTasks) {
 721         Thread t; ForkJoinWorkerThread wt;
 722         if (maxTasks > 0 && status >= 0) {
 723             if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
 724                 (wt = (ForkJoinWorkerThread)t).pool.
 725                     helpComplete(wt.workQueue, this, maxTasks);
 726             else
 727                 ForkJoinPool.common.externalHelpComplete(this, maxTasks);
 728         }

 729     }
 730 


 731     /**
 732      * Supports ForkJoinTask exception propagation.
 733      */
 734     void internalPropagateException(Throwable ex) {
 735         CountedCompleter<?> a = this, s = a;
 736         while (a.onExceptionalCompletion(ex, s) &&
 737                (a = (s = a).completer) != null && a.status >= 0 &&
 738                isExceptionalStatus(a.recordExceptionalCompletion(ex)))
 739             ;

 740     }
 741 
 742     /**
 743      * Implements execution conventions for CountedCompleters.
 744      */

 745     protected final boolean exec() {
 746         compute();
 747         return false;
 748     }
 749 
 750     /**
 751      * Returns the result of the computation.  By default,
 752      * returns {@code null}, which is appropriate for {@code Void}
 753      * actions, but in other cases should be overridden, almost
 754      * always to return a field or function of a field that
 755      * holds the result upon completion.
 756      *
 757      * @return the result of the computation
 758      */

 759     public T getRawResult() { return null; }
 760 
 761     /**
 762      * A method that result-bearing CountedCompleters may optionally
 763      * use to help maintain result data.  By default, does nothing.
 764      * Overrides are not recommended. However, if this method is
 765      * overridden to update existing objects or fields, then it must
 766      * in general be defined to be thread-safe.
 767      */

 768     protected void setRawResult(T t) { }
 769 
 770     // VarHandle mechanics
 771     private static final VarHandle PENDING;
 772     static {
 773         try {
 774             MethodHandles.Lookup l = MethodHandles.lookup();
 775             PENDING = l.findVarHandle(CountedCompleter.class, "pending", int.class);
 776 
 777         } catch (ReflectiveOperationException e) {
 778             throw new ExceptionInInitializerError(e);
 779         }
 780     }
 781 }


 340  *                              0, array.length).invoke();
 341  *   }
 342  * }}</pre>
 343  *
 344  * Here, method {@code onCompletion} takes a form common to many
 345  * completion designs that combine results. This callback-style method
 346  * is triggered once per task, in either of the two different contexts
 347  * in which the pending count is, or becomes, zero: (1) by a task
 348  * itself, if its pending count is zero upon invocation of {@code
 349  * tryComplete}, or (2) by any of its subtasks when they complete and
 350  * decrement the pending count to zero. The {@code caller} argument
 351  * distinguishes cases.  Most often, when the caller is {@code this},
 352  * no action is necessary. Otherwise the caller argument can be used
 353  * (usually via a cast) to supply a value (and/or links to other
 354  * values) to be combined.  Assuming proper use of pending counts, the
 355  * actions inside {@code onCompletion} occur (once) upon completion of
 356  * a task and its subtasks. No additional synchronization is required
 357  * within this method to ensure thread safety of accesses to fields of
 358  * this task or other completed tasks.
 359  *
 360  * <p><b>Completion Traversals.</b> If using {@code onCompletion} to
 361  * process completions is inapplicable or inconvenient, you can use
 362  * methods {@link #firstComplete} and {@link #nextComplete} to create
 363  * custom traversals.  For example, to define a MapReducer that only
 364  * splits out right-hand tasks in the form of the third ForEach
 365  * example, the completions must cooperatively reduce along
 366  * unexhausted subtask links, which can be done as follows:
 367  *
 368  * <pre> {@code
 369  * class MapReducer<E> extends CountedCompleter<E> { // version 2
 370  *   final E[] array; final MyMapper<E> mapper;
 371  *   final MyReducer<E> reducer; final int lo, hi;
 372  *   MapReducer<E> forks, next; // record subtask forks in list
 373  *   E result;
 374  *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
 375  *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
 376  *     super(p);
 377  *     this.array = array; this.mapper = mapper;
 378  *     this.reducer = reducer; this.lo = lo; this.hi = hi;
 379  *     this.next = next;
 380  *   }


 536      * Adds (atomically) the given value to the pending count.
 537      *
 538      * @param delta the value to add
 539      */
 540     public final void addToPendingCount(int delta) {
 541         PENDING.getAndAdd(this, delta);
 542     }
 543 
 544     /**
 545      * Sets (atomically) the pending count to the given count only if
 546      * it currently holds the given expected value.
 547      *
 548      * @param expected the expected value
 549      * @param count the new value
 550      * @return {@code true} if successful
 551      */
 552     public final boolean compareAndSetPendingCount(int expected, int count) {
 553         return PENDING.compareAndSet(this, expected, count);
 554     }
 555 
 556     // internal-only weak version
 557     final boolean weakCompareAndSetPendingCount(int expected, int count) {
 558         return PENDING.weakCompareAndSet(this, expected, count);
 559     }
 560 
 561     /**
 562      * If the pending count is nonzero, (atomically) decrements it.
 563      *
 564      * @return the initial (undecremented) pending count holding on entry
 565      * to this method
 566      */
 567     public final int decrementPendingCountUnlessZero() {
 568         int c;
 569         do {} while ((c = pending) != 0 &&
 570                      !weakCompareAndSetPendingCount(c, c - 1));
 571         return c;
 572     }
 573 
 574     /**
 575      * Returns the root of the current computation; i.e., this
 576      * task if it has no completer, else its completer's root.
 577      *
 578      * @return the root of the current computation
 579      */
 580     public final CountedCompleter<?> getRoot() {
 581         CountedCompleter<?> a = this, p;
 582         while ((p = a.completer) != null)
 583             a = p;
 584         return a;
 585     }
 586 
 587     /**
 588      * If the pending count is nonzero, decrements the count;
 589      * otherwise invokes {@link #onCompletion(CountedCompleter)}
 590      * and then similarly tries to complete this task's completer,
 591      * if one exists, else marks this task as complete.
 592      */
 593     public final void tryComplete() {
 594         CountedCompleter<?> a = this, s = a;
 595         for (int c;;) {
 596             if ((c = a.pending) == 0) {
 597                 a.onCompletion(s);
 598                 if ((a = (s = a).completer) == null) {
 599                     s.quietlyComplete();
 600                     return;
 601                 }
 602             }
 603             else if (a.weakCompareAndSetPendingCount(c, c - 1))
 604                 return;
 605         }
 606     }
 607 
 608     /**
 609      * Equivalent to {@link #tryComplete} but does not invoke {@link
 610      * #onCompletion(CountedCompleter)} along the completion path:
 611      * If the pending count is nonzero, decrements the count;
 612      * otherwise, similarly tries to complete this task's completer, if
 613      * one exists, else marks this task as complete. This method may be
 614      * useful in cases where {@code onCompletion} should not, or need
 615      * not, be invoked for each completer in a computation.
 616      */
 617     public final void propagateCompletion() {
 618         CountedCompleter<?> a = this, s;
 619         for (int c;;) {
 620             if ((c = a.pending) == 0) {
 621                 if ((a = (s = a).completer) == null) {
 622                     s.quietlyComplete();
 623                     return;
 624                 }
 625             }
 626             else if (a.weakCompareAndSetPendingCount(c, c - 1))
 627                 return;
 628         }
 629     }
 630 
 631     /**
 632      * Regardless of pending count, invokes
 633      * {@link #onCompletion(CountedCompleter)}, marks this task as
 634      * complete and further triggers {@link #tryComplete} on this
 635      * task's completer, if one exists.  The given rawResult is
 636      * used as an argument to {@link #setRawResult} before invoking
 637      * {@link #onCompletion(CountedCompleter)} or marking this task
 638      * as complete; its value is meaningful only for classes
 639      * overriding {@code setRawResult}.  This method does not modify
 640      * the pending count.
 641      *
 642      * <p>This method may be useful when forcing completion as soon as
 643      * any one (versus all) of several subtask results are obtained.
 644      * However, in the common (and recommended) case in which {@code
 645      * setRawResult} is not overridden, this effect can be obtained
 646      * more simply using {@link #quietlyCompleteRoot()}.


 651         CountedCompleter<?> p;
 652         setRawResult(rawResult);
 653         onCompletion(this);
 654         quietlyComplete();
 655         if ((p = completer) != null)
 656             p.tryComplete();
 657     }
 658 
 659     /**
 660      * If this task's pending count is zero, returns this task;
 661      * otherwise decrements its pending count and returns {@code null}.
 662      * This method is designed to be used with {@link #nextComplete} in
 663      * completion traversal loops.
 664      *
 665      * @return this task, if pending count was zero, else {@code null}
 666      */
 667     public final CountedCompleter<?> firstComplete() {
 668         for (int c;;) {
 669             if ((c = pending) == 0)
 670                 return this;
 671             else if (weakCompareAndSetPendingCount(c, c - 1))
 672                 return null;
 673         }
 674     }
 675 
 676     /**
 677      * If this task does not have a completer, invokes {@link
 678      * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
 679      * the completer's pending count is non-zero, decrements that
 680      * pending count and returns {@code null}.  Otherwise, returns the
 681      * completer.  This method can be used as part of a completion
 682      * traversal loop for homogeneous task hierarchies:
 683      *
 684      * <pre> {@code
 685      * for (CountedCompleter<?> c = firstComplete();
 686      *      c != null;
 687      *      c = c.nextComplete()) {
 688      *   // ... process c ...
 689      * }}</pre>
 690      *
 691      * @return the completer, or {@code null} if none


 706     public final void quietlyCompleteRoot() {
 707         for (CountedCompleter<?> a = this, p;;) {
 708             if ((p = a.completer) == null) {
 709                 a.quietlyComplete();
 710                 return;
 711             }
 712             a = p;
 713         }
 714     }
 715 
 716     /**
 717      * If this task has not completed, attempts to process at most the
 718      * given number of other unprocessed tasks for which this task is
 719      * on the completion path, if any are known to exist.
 720      *
 721      * @param maxTasks the maximum number of tasks to process.  If
 722      *                 less than or equal to zero, then no tasks are
 723      *                 processed.
 724      */
 725     public final void helpComplete(int maxTasks) {
 726         ForkJoinPool.WorkQueue q; Thread t; boolean owned;
 727         if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
 728             q = ((ForkJoinWorkerThread)t).workQueue;


 729         else
 730             q = ForkJoinPool.commonQueue();
 731         if (q != null && maxTasks > 0)
 732             q.helpComplete(this, owned, maxTasks);
 733     }
 734 
 735     // ForkJoinTask overrides
 736 
 737     /**
 738      * Supports ForkJoinTask exception propagation.
 739      */
 740     @Override
 741     final int trySetException(Throwable ex) {
 742         CountedCompleter<?> a = this, p = a;
 743         do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
 744                      a.onExceptionalCompletion(ex, p) &&
 745                      (a = (p = a).completer) != null && a.status >= 0);
 746         return status;
 747     }
 748 
 749     /**
 750      * Implements execution conventions for CountedCompleters.
 751      */
 752     @Override
 753     protected final boolean exec() {
 754         compute();
 755         return false;
 756     }
 757 
 758     /**
 759      * Returns the result of the computation.  By default,
 760      * returns {@code null}, which is appropriate for {@code Void}
 761      * actions, but in other cases should be overridden, almost
 762      * always to return a field or function of a field that
 763      * holds the result upon completion.
 764      *
 765      * @return the result of the computation
 766      */
 767     @Override
 768     public T getRawResult() { return null; }
 769 
 770     /**
 771      * A method that result-bearing CountedCompleters may optionally
 772      * use to help maintain result data.  By default, does nothing.
 773      * Overrides are not recommended. However, if this method is
 774      * overridden to update existing objects or fields, then it must
 775      * in general be defined to be thread-safe.
 776      */
 777     @Override
 778     protected void setRawResult(T t) { }
 779 
 780     // VarHandle mechanics
 781     private static final VarHandle PENDING;
 782     static {
 783         try {
 784             MethodHandles.Lookup l = MethodHandles.lookup();
 785             PENDING = l.findVarHandle(CountedCompleter.class, "pending", int.class);
 786 
 787         } catch (ReflectiveOperationException e) {
 788             throw new ExceptionInInitializerError(e);
 789         }
 790     }
 791 }
< prev index next >