< prev index next >

src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java

Print this page
8234131: Miscellaneous changes imported from jsr166 CVS 2021-01
Reviewed-by: martin


 204  *
 205  * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 206  * <em>rejected</em> when the Executor has been shut down, and also when
 207  * the Executor uses finite bounds for both maximum threads and work queue
 208  * capacity, and is saturated.  In either case, the {@code execute} method
 209  * invokes the {@link
 210  * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 211  * method of its {@link RejectedExecutionHandler}.  Four predefined handler
 212  * policies are provided:
 213  *
 214  * <ol>
 215  *
 216  * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler
 217  * throws a runtime {@link RejectedExecutionException} upon rejection.
 218  *
 219  * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 220  * that invokes {@code execute} itself runs the task. This provides a
 221  * simple feedback control mechanism that will slow down the rate that
 222  * new tasks are submitted.
 223  *
 224  * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 225  * cannot be executed is simply dropped.

 226  *
 227  * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 228  * executor is not shut down, the task at the head of the work queue
 229  * is dropped, and then execution is retried (which can fail again,
 230  * causing this to be repeated.)




 231  *
 232  * </ol>
 233  *
 234  * It is possible to define and use other kinds of {@link
 235  * RejectedExecutionHandler} classes. Doing so requires some care
 236  * especially when policies are designed to work only under particular
 237  * capacity or queuing policies. </dd>
 238  *
 239  * <dt>Hook methods</dt>
 240  *
 241  * <dd>This class provides {@code protected} overridable
 242  * {@link #beforeExecute(Thread, Runnable)} and
 243  * {@link #afterExecute(Runnable, Throwable)} methods that are called
 244  * before and after execution of each task.  These can be used to
 245  * manipulate the execution environment; for example, reinitializing
 246  * ThreadLocals, gathering statistics, or adding log entries.
 247  * Additionally, method {@link #terminated} can be overridden to perform
 248  * any special processing that needs to be done once the Executor has
 249  * fully terminated.
 250  *


 255  * <dt>Queue maintenance</dt>
 256  *
 257  * <dd>Method {@link #getQueue()} allows access to the work queue
 258  * for purposes of monitoring and debugging.  Use of this method for
 259  * any other purpose is strongly discouraged.  Two supplied methods,
 260  * {@link #remove(Runnable)} and {@link #purge} are available to
 261  * assist in storage reclamation when large numbers of queued tasks
 262  * become cancelled.</dd>
 263  *
 264  * <dt>Reclamation</dt>
 265  *
 266  * <dd>A pool that is no longer referenced in a program <em>AND</em>
 267  * has no remaining threads may be reclaimed (garbage collected)
 268  * without being explicitly shutdown. You can configure a pool to
 269  * allow all unused threads to eventually die by setting appropriate
 270  * keep-alive times, using a lower bound of zero core threads and/or
 271  * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
 272  *
 273  * </dl>
 274  *
 275  * <p><b>Extension example</b>. Most extensions of this class
 276  * override one or more of the protected hook methods. For example,
 277  * here is a subclass that adds a simple pause/resume feature:
 278  *
 279  * <pre> {@code
 280  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 281  *   private boolean isPaused;
 282  *   private ReentrantLock pauseLock = new ReentrantLock();
 283  *   private Condition unpaused = pauseLock.newCondition();
 284  *
 285  *   public PausableThreadPoolExecutor(...) { super(...); }
 286  *
 287  *   protected void beforeExecute(Thread t, Runnable r) {
 288  *     super.beforeExecute(t, r);
 289  *     pauseLock.lock();
 290  *     try {
 291  *       while (isPaused) unpaused.await();
 292  *     } catch (InterruptedException ie) {
 293  *       t.interrupt();
 294  *     } finally {
 295  *       pauseLock.unlock();


1132                     } catch (Throwable ex) {
1133                         afterExecute(task, ex);
1134                         throw ex;
1135                     }
1136                 } finally {
1137                     task = null;
1138                     w.completedTasks++;
1139                     w.unlock();
1140                 }
1141             }
1142             completedAbruptly = false;
1143         } finally {
1144             processWorkerExit(w, completedAbruptly);
1145         }
1146     }
1147 
1148     // Public constructors and methods
1149 
1150     /**
1151      * Creates a new {@code ThreadPoolExecutor} with the given initial
1152      * parameters, the default thread factory and the default rejected
1153      * execution handler.


1154      *
1155      * <p>It may be more convenient to use one of the {@link Executors}
1156      * factory methods instead of this general purpose constructor.
1157      *
1158      * @param corePoolSize the number of threads to keep in the pool, even
1159      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1160      * @param maximumPoolSize the maximum number of threads to allow in the
1161      *        pool
1162      * @param keepAliveTime when the number of threads is greater than
1163      *        the core, this is the maximum time that excess idle threads
1164      *        will wait for new tasks before terminating.
1165      * @param unit the time unit for the {@code keepAliveTime} argument
1166      * @param workQueue the queue to use for holding tasks before they are
1167      *        executed.  This queue will hold only the {@code Runnable}
1168      *        tasks submitted by the {@code execute} method.
1169      * @throws IllegalArgumentException if one of the following holds:<br>
1170      *         {@code corePoolSize < 0}<br>
1171      *         {@code keepAliveTime < 0}<br>
1172      *         {@code maximumPoolSize <= 0}<br>
1173      *         {@code maximumPoolSize < corePoolSize}
1174      * @throws NullPointerException if {@code workQueue} is null
1175      */
1176     public ThreadPoolExecutor(int corePoolSize,
1177                               int maximumPoolSize,
1178                               long keepAliveTime,
1179                               TimeUnit unit,
1180                               BlockingQueue<Runnable> workQueue) {
1181         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1182              Executors.defaultThreadFactory(), defaultHandler);
1183     }
1184 
1185     /**
1186      * Creates a new {@code ThreadPoolExecutor} with the given initial
1187      * parameters and {@linkplain ThreadPoolExecutor.AbortPolicy
1188      * default rejected execution handler}.
1189      *
1190      * @param corePoolSize the number of threads to keep in the pool, even
1191      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1192      * @param maximumPoolSize the maximum number of threads to allow in the
1193      *        pool
1194      * @param keepAliveTime when the number of threads is greater than
1195      *        the core, this is the maximum time that excess idle threads
1196      *        will wait for new tasks before terminating.
1197      * @param unit the time unit for the {@code keepAliveTime} argument
1198      * @param workQueue the queue to use for holding tasks before they are
1199      *        executed.  This queue will hold only the {@code Runnable}
1200      *        tasks submitted by the {@code execute} method.
1201      * @param threadFactory the factory to use when the executor
1202      *        creates a new thread
1203      * @throws IllegalArgumentException if one of the following holds:<br>
1204      *         {@code corePoolSize < 0}<br>
1205      *         {@code keepAliveTime < 0}<br>
1206      *         {@code maximumPoolSize <= 0}<br>
1207      *         {@code maximumPoolSize < corePoolSize}
1208      * @throws NullPointerException if {@code workQueue}
1209      *         or {@code threadFactory} is null
1210      */
1211     public ThreadPoolExecutor(int corePoolSize,
1212                               int maximumPoolSize,
1213                               long keepAliveTime,
1214                               TimeUnit unit,
1215                               BlockingQueue<Runnable> workQueue,
1216                               ThreadFactory threadFactory) {
1217         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1218              threadFactory, defaultHandler);
1219     }
1220 
1221     /**
1222      * Creates a new {@code ThreadPoolExecutor} with the given initial
1223      * parameters and
1224      * {@linkplain Executors#defaultThreadFactory default thread factory}.
1225      *
1226      * @param corePoolSize the number of threads to keep in the pool, even
1227      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1228      * @param maximumPoolSize the maximum number of threads to allow in the
1229      *        pool
1230      * @param keepAliveTime when the number of threads is greater than
1231      *        the core, this is the maximum time that excess idle threads
1232      *        will wait for new tasks before terminating.
1233      * @param unit the time unit for the {@code keepAliveTime} argument
1234      * @param workQueue the queue to use for holding tasks before they are
1235      *        executed.  This queue will hold only the {@code Runnable}
1236      *        tasks submitted by the {@code execute} method.
1237      * @param handler the handler to use when execution is blocked
1238      *        because the thread bounds and queue capacities are reached
1239      * @throws IllegalArgumentException if one of the following holds:<br>
1240      *         {@code corePoolSize < 0}<br>
1241      *         {@code keepAliveTime < 0}<br>
1242      *         {@code maximumPoolSize <= 0}<br>
1243      *         {@code maximumPoolSize < corePoolSize}


2064      */
2065     public static class DiscardPolicy implements RejectedExecutionHandler {
2066         /**
2067          * Creates a {@code DiscardPolicy}.
2068          */
2069         public DiscardPolicy() { }
2070 
2071         /**
2072          * Does nothing, which has the effect of discarding task r.
2073          *
2074          * @param r the runnable task requested to be executed
2075          * @param e the executor attempting to execute this task
2076          */
2077         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2078         }
2079     }
2080 
2081     /**
2082      * A handler for rejected tasks that discards the oldest unhandled
2083      * request and then retries {@code execute}, unless the executor
2084      * is shut down, in which case the task is discarded.













2085      */
2086     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2087         /**
2088          * Creates a {@code DiscardOldestPolicy} for the given executor.
2089          */
2090         public DiscardOldestPolicy() { }
2091 
2092         /**
2093          * Obtains and ignores the next task that the executor
2094          * would otherwise execute, if one is immediately available,
2095          * and then retries execution of task r, unless the executor
2096          * is shut down, in which case task r is instead discarded.
2097          *
2098          * @param r the runnable task requested to be executed
2099          * @param e the executor attempting to execute this task
2100          */
2101         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2102             if (!e.isShutdown()) {
2103                 e.getQueue().poll();
2104                 e.execute(r);


 204  *
 205  * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 206  * <em>rejected</em> when the Executor has been shut down, and also when
 207  * the Executor uses finite bounds for both maximum threads and work queue
 208  * capacity, and is saturated.  In either case, the {@code execute} method
 209  * invokes the {@link
 210  * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 211  * method of its {@link RejectedExecutionHandler}.  Four predefined handler
 212  * policies are provided:
 213  *
 214  * <ol>
 215  *
 216  * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler
 217  * throws a runtime {@link RejectedExecutionException} upon rejection.
 218  *
 219  * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 220  * that invokes {@code execute} itself runs the task. This provides a
 221  * simple feedback control mechanism that will slow down the rate that
 222  * new tasks are submitted.
 223  *
 224  * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that cannot
 225  * be executed is simply dropped. This policy is designed only for
 226  * those rare cases in which task completion is never relied upon.
 227  *
 228  * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 229  * executor is not shut down, the task at the head of the work queue
 230  * is dropped, and then execution is retried (which can fail again,
 231  * causing this to be repeated.) This policy is rarely acceptable.  In
 232  * nearly all cases, you should also cancel the task to cause an
 233  * exception in any component waiting for its completion, and/or log
 234  * the failure, as illustrated in {@link
 235  * ThreadPoolExecutor.DiscardOldestPolicy} documentation.
 236  *
 237  * </ol>
 238  *
 239  * It is possible to define and use other kinds of {@link
 240  * RejectedExecutionHandler} classes. Doing so requires some care
 241  * especially when policies are designed to work only under particular
 242  * capacity or queuing policies. </dd>
 243  *
 244  * <dt>Hook methods</dt>
 245  *
 246  * <dd>This class provides {@code protected} overridable
 247  * {@link #beforeExecute(Thread, Runnable)} and
 248  * {@link #afterExecute(Runnable, Throwable)} methods that are called
 249  * before and after execution of each task.  These can be used to
 250  * manipulate the execution environment; for example, reinitializing
 251  * ThreadLocals, gathering statistics, or adding log entries.
 252  * Additionally, method {@link #terminated} can be overridden to perform
 253  * any special processing that needs to be done once the Executor has
 254  * fully terminated.
 255  *


 260  * <dt>Queue maintenance</dt>
 261  *
 262  * <dd>Method {@link #getQueue()} allows access to the work queue
 263  * for purposes of monitoring and debugging.  Use of this method for
 264  * any other purpose is strongly discouraged.  Two supplied methods,
 265  * {@link #remove(Runnable)} and {@link #purge} are available to
 266  * assist in storage reclamation when large numbers of queued tasks
 267  * become cancelled.</dd>
 268  *
 269  * <dt>Reclamation</dt>
 270  *
 271  * <dd>A pool that is no longer referenced in a program <em>AND</em>
 272  * has no remaining threads may be reclaimed (garbage collected)
 273  * without being explicitly shutdown. You can configure a pool to
 274  * allow all unused threads to eventually die by setting appropriate
 275  * keep-alive times, using a lower bound of zero core threads and/or
 276  * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
 277  *
 278  * </dl>
 279  *
 280  * <p><b>Extension example.</b> Most extensions of this class
 281  * override one or more of the protected hook methods. For example,
 282  * here is a subclass that adds a simple pause/resume feature:
 283  *
 284  * <pre> {@code
 285  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 286  *   private boolean isPaused;
 287  *   private ReentrantLock pauseLock = new ReentrantLock();
 288  *   private Condition unpaused = pauseLock.newCondition();
 289  *
 290  *   public PausableThreadPoolExecutor(...) { super(...); }
 291  *
 292  *   protected void beforeExecute(Thread t, Runnable r) {
 293  *     super.beforeExecute(t, r);
 294  *     pauseLock.lock();
 295  *     try {
 296  *       while (isPaused) unpaused.await();
 297  *     } catch (InterruptedException ie) {
 298  *       t.interrupt();
 299  *     } finally {
 300  *       pauseLock.unlock();


1137                     } catch (Throwable ex) {
1138                         afterExecute(task, ex);
1139                         throw ex;
1140                     }
1141                 } finally {
1142                     task = null;
1143                     w.completedTasks++;
1144                     w.unlock();
1145                 }
1146             }
1147             completedAbruptly = false;
1148         } finally {
1149             processWorkerExit(w, completedAbruptly);
1150         }
1151     }
1152 
1153     // Public constructors and methods
1154 
1155     /**
1156      * Creates a new {@code ThreadPoolExecutor} with the given initial
1157      * parameters, the
1158      * {@linkplain Executors#defaultThreadFactory default thread factory}
1159      * and the {@linkplain ThreadPoolExecutor.AbortPolicy
1160      * default rejected execution handler}.
1161      *
1162      * <p>It may be more convenient to use one of the {@link Executors}
1163      * factory methods instead of this general purpose constructor.
1164      *
1165      * @param corePoolSize the number of threads to keep in the pool, even
1166      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1167      * @param maximumPoolSize the maximum number of threads to allow in the
1168      *        pool
1169      * @param keepAliveTime when the number of threads is greater than
1170      *        the core, this is the maximum time that excess idle threads
1171      *        will wait for new tasks before terminating.
1172      * @param unit the time unit for the {@code keepAliveTime} argument
1173      * @param workQueue the queue to use for holding tasks before they are
1174      *        executed.  This queue will hold only the {@code Runnable}
1175      *        tasks submitted by the {@code execute} method.
1176      * @throws IllegalArgumentException if one of the following holds:<br>
1177      *         {@code corePoolSize < 0}<br>
1178      *         {@code keepAliveTime < 0}<br>
1179      *         {@code maximumPoolSize <= 0}<br>
1180      *         {@code maximumPoolSize < corePoolSize}
1181      * @throws NullPointerException if {@code workQueue} is null
1182      */
1183     public ThreadPoolExecutor(int corePoolSize,
1184                               int maximumPoolSize,
1185                               long keepAliveTime,
1186                               TimeUnit unit,
1187                               BlockingQueue<Runnable> workQueue) {
1188         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1189              Executors.defaultThreadFactory(), defaultHandler);
1190     }
1191 
1192     /**
1193      * Creates a new {@code ThreadPoolExecutor} with the given initial
1194      * parameters and the {@linkplain ThreadPoolExecutor.AbortPolicy
1195      * default rejected execution handler}.
1196      *
1197      * @param corePoolSize the number of threads to keep in the pool, even
1198      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1199      * @param maximumPoolSize the maximum number of threads to allow in the
1200      *        pool
1201      * @param keepAliveTime when the number of threads is greater than
1202      *        the core, this is the maximum time that excess idle threads
1203      *        will wait for new tasks before terminating.
1204      * @param unit the time unit for the {@code keepAliveTime} argument
1205      * @param workQueue the queue to use for holding tasks before they are
1206      *        executed.  This queue will hold only the {@code Runnable}
1207      *        tasks submitted by the {@code execute} method.
1208      * @param threadFactory the factory to use when the executor
1209      *        creates a new thread
1210      * @throws IllegalArgumentException if one of the following holds:<br>
1211      *         {@code corePoolSize < 0}<br>
1212      *         {@code keepAliveTime < 0}<br>
1213      *         {@code maximumPoolSize <= 0}<br>
1214      *         {@code maximumPoolSize < corePoolSize}
1215      * @throws NullPointerException if {@code workQueue}
1216      *         or {@code threadFactory} is null
1217      */
1218     public ThreadPoolExecutor(int corePoolSize,
1219                               int maximumPoolSize,
1220                               long keepAliveTime,
1221                               TimeUnit unit,
1222                               BlockingQueue<Runnable> workQueue,
1223                               ThreadFactory threadFactory) {
1224         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1225              threadFactory, defaultHandler);
1226     }
1227 
1228     /**
1229      * Creates a new {@code ThreadPoolExecutor} with the given initial
1230      * parameters and the
1231      * {@linkplain Executors#defaultThreadFactory default thread factory}.
1232      *
1233      * @param corePoolSize the number of threads to keep in the pool, even
1234      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1235      * @param maximumPoolSize the maximum number of threads to allow in the
1236      *        pool
1237      * @param keepAliveTime when the number of threads is greater than
1238      *        the core, this is the maximum time that excess idle threads
1239      *        will wait for new tasks before terminating.
1240      * @param unit the time unit for the {@code keepAliveTime} argument
1241      * @param workQueue the queue to use for holding tasks before they are
1242      *        executed.  This queue will hold only the {@code Runnable}
1243      *        tasks submitted by the {@code execute} method.
1244      * @param handler the handler to use when execution is blocked
1245      *        because the thread bounds and queue capacities are reached
1246      * @throws IllegalArgumentException if one of the following holds:<br>
1247      *         {@code corePoolSize < 0}<br>
1248      *         {@code keepAliveTime < 0}<br>
1249      *         {@code maximumPoolSize <= 0}<br>
1250      *         {@code maximumPoolSize < corePoolSize}


2071      */
2072     public static class DiscardPolicy implements RejectedExecutionHandler {
2073         /**
2074          * Creates a {@code DiscardPolicy}.
2075          */
2076         public DiscardPolicy() { }
2077 
2078         /**
2079          * Does nothing, which has the effect of discarding task r.
2080          *
2081          * @param r the runnable task requested to be executed
2082          * @param e the executor attempting to execute this task
2083          */
2084         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2085         }
2086     }
2087 
2088     /**
2089      * A handler for rejected tasks that discards the oldest unhandled
2090      * request and then retries {@code execute}, unless the executor
2091      * is shut down, in which case the task is discarded. This policy is
2092      * rarely useful in cases where other threads may be waiting for
2093      * tasks to terminate, or failures must be recorded. Instead consider
2094      * using a handler of the form:
2095      * <pre> {@code
2096      * new RejectedExecutionHandler() {
2097      *   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2098      *     Runnable dropped = e.getQueue().poll();
2099      *     if (dropped instanceof Future<?>) {
2100      *       ((Future<?>)dropped).cancel(false);
2101      *       // also consider logging the failure
2102      *     }
2103      *     e.execute(r);  // retry
2104      * }}}</pre>
2105      */
2106     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2107         /**
2108          * Creates a {@code DiscardOldestPolicy} for the given executor.
2109          */
2110         public DiscardOldestPolicy() { }
2111 
2112         /**
2113          * Obtains and ignores the next task that the executor
2114          * would otherwise execute, if one is immediately available,
2115          * and then retries execution of task r, unless the executor
2116          * is shut down, in which case task r is instead discarded.
2117          *
2118          * @param r the runnable task requested to be executed
2119          * @param e the executor attempting to execute this task
2120          */
2121         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2122             if (!e.isShutdown()) {
2123                 e.getQueue().poll();
2124                 e.execute(r);
< prev index next >