< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java

Print this page
8246585: ForkJoin updates
Reviewed-by: martin


  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.security.AccessControlContext;
  39 import java.security.AccessController;
  40 import java.security.PrivilegedAction;
  41 import java.security.ProtectionDomain;
  42 
  43 /**
  44  * A thread managed by a {@link ForkJoinPool}, which executes
  45  * {@link ForkJoinTask}s.
  46  * This class is subclassable solely for the sake of adding
  47  * functionality -- there are no overridable methods dealing with
  48  * scheduling or execution.  However, you can override initialization
  49  * and termination methods surrounding the main task processing loop.
  50  * If you do create such a subclass, you will also need to supply a
  51  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
  52  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
  53  * UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)
  54  * use it} in a {@code ForkJoinPool}.
  55  *
  56  * @since 1.7
  57  * @author Doug Lea
  58  */
  59 public class ForkJoinWorkerThread extends Thread {
  60     /*
  61      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  62      * ForkJoinTasks. For explanation, see the internal documentation
  63      * of class ForkJoinPool.
  64      *
  65      * This class just maintains links to its pool and WorkQueue.  The
  66      * pool field is set immediately upon construction, but the
  67      * workQueue field is not set until a call to registerWorker
  68      * completes. This leads to a visibility race, that is tolerated
  69      * by requiring that the workQueue field is only accessed by the
  70      * owning thread.
  71      *
  72      * Support for (non-public) subclass InnocuousForkJoinWorkerThread
  73      * requires that we break quite a lot of encapsulation (via helper
  74      * methods in ThreadLocalRandom) both here and in the subclass to
  75      * access and set Thread fields.
  76      */
  77 
  78     final ForkJoinPool pool;                // the pool this thread works in
  79     final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
  80 
  81     /** An AccessControlContext supporting no privileges */
  82     private static final AccessControlContext INNOCUOUS_ACC =
  83         new AccessControlContext(
  84             new ProtectionDomain[] { new ProtectionDomain(null, null) });










  85 
  86     /**
  87      * Creates a ForkJoinWorkerThread operating in the given pool.

  88      *

  89      * @param pool the pool this thread works in
  90      * @throws NullPointerException if pool is null
  91      */
  92     protected ForkJoinWorkerThread(ForkJoinPool pool) {
  93         // Use a placeholder until a useful name can be set in registerWorker
  94         super("aForkJoinWorkerThread");
  95         this.pool = pool;
  96         this.workQueue = pool.registerWorker(this);
  97     }
  98 
  99     /**
 100      * Version for use by the default pool.  Supports setting the
 101      * context class loader.  This is a separate constructor to avoid
 102      * affecting the protected constructor.
 103      */
 104     ForkJoinWorkerThread(ForkJoinPool pool, ClassLoader ccl) {
 105         super("aForkJoinWorkerThread");
 106         super.setContextClassLoader(ccl);
 107         ThreadLocalRandom.setInheritedAccessControlContext(this, INNOCUOUS_ACC);
 108         this.pool = pool;
 109         this.workQueue = pool.registerWorker(this);
 110     }
 111 
 112     /**
 113      * Version for InnocuousForkJoinWorkerThread.
 114      */
 115     ForkJoinWorkerThread(ForkJoinPool pool,
 116                          ClassLoader ccl,
 117                          ThreadGroup threadGroup,
 118                          AccessControlContext acc) {
 119         super(threadGroup, null, "aForkJoinWorkerThread");
 120         super.setContextClassLoader(ccl);
 121         ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
 122         ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
 123         this.pool = pool;
 124         this.workQueue = pool.registerWorker(this);
 125     }
 126 
 127     /**
 128      * Returns the pool hosting this thread.
 129      *
 130      * @return the pool
 131      */
 132     public ForkJoinPool getPool() {
 133         return pool;
 134     }
 135 
 136     /**
 137      * Returns the unique index number of this thread in its pool.
 138      * The returned value ranges from zero to the maximum number of
 139      * threads (minus one) that may exist in the pool, and does not
 140      * change during the lifetime of the thread.  This method may be
 141      * useful for applications that track status or collect results
 142      * per-worker-thread rather than per-task.
 143      *
 144      * @return the index number


 159     protected void onStart() {
 160     }
 161 
 162     /**
 163      * Performs cleanup associated with termination of this worker
 164      * thread.  If you override this method, you must invoke
 165      * {@code super.onTermination} at the end of the overridden method.
 166      *
 167      * @param exception the exception causing this thread to abort due
 168      * to an unrecoverable error, or {@code null} if completed normally
 169      */
 170     protected void onTermination(Throwable exception) {
 171     }
 172 
 173     /**
 174      * This method is required to be public, but should never be
 175      * called explicitly. It performs the main run loop to execute
 176      * {@link ForkJoinTask}s.
 177      */
 178     public void run() {
 179         if (workQueue.array == null) { // only run once
 180             Throwable exception = null;



 181             try {

 182                 onStart();
 183                 pool.runWorker(workQueue);
 184             } catch (Throwable ex) {
 185                 exception = ex;
 186             } finally {
 187                 try {
 188                     onTermination(exception);
 189                 } catch (Throwable ex) {
 190                     if (exception == null)
 191                         exception = ex;
 192                 } finally {
 193                     pool.deregisterWorker(this, exception);
 194                 }
 195             }
 196         }
 197     }
 198 
 199     /**
 200      * Non-public hook method for InnocuousForkJoinWorkerThread.
 201      */
 202     void afterTopLevelExec() {
 203     }
 204 
 205     /**
 206      * A worker thread that has no permissions, is not a member of any
 207      * user-defined ThreadGroup, uses the system class loader as
 208      * thread context class loader, and erases all ThreadLocals after
 209      * running each top-level task.
 210      */
 211     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
 212         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
 213         private static final ThreadGroup innocuousThreadGroup =
 214             AccessController.doPrivileged(new PrivilegedAction<>() {
 215                 public ThreadGroup run() {
 216                     ThreadGroup group = Thread.currentThread().getThreadGroup();
 217                     for (ThreadGroup p; (p = group.getParent()) != null; )
 218                         group = p;
 219                     return new ThreadGroup(
 220                         group, "InnocuousForkJoinWorkerThreadGroup");
 221                 }});
 222 
 223         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
 224             super(pool,
 225                   ClassLoader.getSystemClassLoader(),
 226                   innocuousThreadGroup,
 227                   INNOCUOUS_ACC);
 228         }
 229 
 230         @Override // to erase ThreadLocals
 231         void afterTopLevelExec() {
 232             ThreadLocalRandom.eraseThreadLocals(this);
 233         }
 234 
 235         @Override // to silently fail
 236         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
 237 
 238         @Override // paranoically
 239         public void setContextClassLoader(ClassLoader cl) {
 240             if (cl != null && ClassLoader.getSystemClassLoader() != cl)
 241                 throw new SecurityException("setContextClassLoader");
 242         }
 243     }
 244 }


  18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  19  *
  20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  21  * or visit www.oracle.com if you need additional information or have any
  22  * questions.
  23  */
  24 
  25 /*
  26  * This file is available under and governed by the GNU General Public
  27  * License version 2 only, as published by the Free Software Foundation.
  28  * However, the following notice accompanied the original version of this
  29  * file:
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 

  38 import java.security.AccessController;
  39 import java.security.PrivilegedAction;

  40 
  41 /**
  42  * A thread managed by a {@link ForkJoinPool}, which executes
  43  * {@link ForkJoinTask}s.
  44  * This class is subclassable solely for the sake of adding
  45  * functionality -- there are no overridable methods dealing with
  46  * scheduling or execution.  However, you can override initialization
  47  * and termination methods surrounding the main task processing loop.
  48  * If you do create such a subclass, you will also need to supply a
  49  * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
  50  * {@linkplain ForkJoinPool#ForkJoinPool(int, ForkJoinWorkerThreadFactory,
  51  * UncaughtExceptionHandler, boolean, int, int, int, Predicate, long, TimeUnit)
  52  * use it} in a {@code ForkJoinPool}.
  53  *
  54  * @since 1.7
  55  * @author Doug Lea
  56  */
  57 public class ForkJoinWorkerThread extends Thread {
  58     /*
  59      * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
  60      * ForkJoinTasks. For explanation, see the internal documentation
  61      * of class ForkJoinPool.
  62      *
  63      * This class just maintains links to its pool and WorkQueue.










  64      */
  65 
  66     final ForkJoinPool pool;                // the pool this thread works in
  67     final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
  68 
  69     /**
  70      * Full nonpublic constructor.
  71      */
  72     ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,
  73                          boolean useSystemClassLoader, boolean isInnocuous) {
  74         super(group, null, pool.nextWorkerThreadName(), 0L);
  75         UncaughtExceptionHandler handler = (this.pool = pool).ueh;
  76         this.workQueue = new ForkJoinPool.WorkQueue(this, isInnocuous);
  77         super.setDaemon(true);
  78         if (handler != null)
  79             super.setUncaughtExceptionHandler(handler);
  80         if (useSystemClassLoader)
  81             super.setContextClassLoader(ClassLoader.getSystemClassLoader());
  82     }
  83 
  84     /**
  85      * Creates a ForkJoinWorkerThread operating in the given thread group and
  86      * pool.
  87      *
  88      * @param group if non-null, the thread group for this thread
  89      * @param pool the pool this thread works in
  90      * @throws NullPointerException if pool is null
  91      */
  92     /* TODO: protected */ ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool) {
  93         this(group, pool, false, false);



  94     }
  95 
  96     /**
  97      * Creates a ForkJoinWorkerThread operating in the given pool.
  98      *
  99      * @param pool the pool this thread works in
 100      * @throws NullPointerException if pool is null
 101      */
 102     protected ForkJoinWorkerThread(ForkJoinPool pool) {
 103         this(null, pool, false, false);


















 104     }
 105 
 106     /**
 107      * Returns the pool hosting this thread.
 108      *
 109      * @return the pool
 110      */
 111     public ForkJoinPool getPool() {
 112         return pool;
 113     }
 114 
 115     /**
 116      * Returns the unique index number of this thread in its pool.
 117      * The returned value ranges from zero to the maximum number of
 118      * threads (minus one) that may exist in the pool, and does not
 119      * change during the lifetime of the thread.  This method may be
 120      * useful for applications that track status or collect results
 121      * per-worker-thread rather than per-task.
 122      *
 123      * @return the index number


 138     protected void onStart() {
 139     }
 140 
 141     /**
 142      * Performs cleanup associated with termination of this worker
 143      * thread.  If you override this method, you must invoke
 144      * {@code super.onTermination} at the end of the overridden method.
 145      *
 146      * @param exception the exception causing this thread to abort due
 147      * to an unrecoverable error, or {@code null} if completed normally
 148      */
 149     protected void onTermination(Throwable exception) {
 150     }
 151 
 152     /**
 153      * This method is required to be public, but should never be
 154      * called explicitly. It performs the main run loop to execute
 155      * {@link ForkJoinTask}s.
 156      */
 157     public void run() {

 158         Throwable exception = null;
 159         ForkJoinPool p = pool;
 160         ForkJoinPool.WorkQueue w = workQueue;
 161         if (p != null && w != null) {   // skip on failed initialization
 162             try {
 163                 p.registerWorker(w);
 164                 onStart();
 165                 p.runWorker(w);
 166             } catch (Throwable ex) {
 167                 exception = ex;
 168             } finally {
 169                 try {
 170                     onTermination(exception);
 171                 } catch (Throwable ex) {
 172                     if (exception == null)
 173                         exception = ex;
 174                 } finally {
 175                     p.deregisterWorker(this, exception);

 176                 }
 177             }
 178         }





 179     }
 180 
 181     /**
 182      * A worker thread that has no permissions, is not a member of any
 183      * user-defined ThreadGroup, uses the system class loader as
 184      * thread context class loader, and erases all ThreadLocals after
 185      * running each top-level task.
 186      */
 187     static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
 188         /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
 189         private static final ThreadGroup innocuousThreadGroup =
 190             AccessController.doPrivileged(new PrivilegedAction<>() {
 191                 public ThreadGroup run() {
 192                     ThreadGroup group = Thread.currentThread().getThreadGroup();
 193                     for (ThreadGroup p; (p = group.getParent()) != null; )
 194                         group = p;
 195                     return new ThreadGroup(
 196                         group, "InnocuousForkJoinWorkerThreadGroup");
 197                 }});
 198 
 199         InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
 200             super(innocuousThreadGroup, pool, true, true);








 201         }
 202 
 203         @Override // to silently fail
 204         public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
 205 
 206         @Override // paranoically
 207         public void setContextClassLoader(ClassLoader cl) {
 208             if (cl != null && ClassLoader.getSystemClassLoader() != cl)
 209                 throw new SecurityException("setContextClassLoader");
 210         }
 211     }
 212 }
< prev index next >