59
60 // associated Executor for timeouts
61 private ScheduledThreadPoolExecutor timeoutExecutor;
62
63 // task queue for when using a fixed thread pool. In that case, thread
64 // waiting on I/O events must be awokon to poll tasks from this queue.
65 private final Queue<Runnable> taskQueue;
66
67 // group shutdown
68 private final AtomicBoolean shutdown = new AtomicBoolean();
69 private final Object shutdownNowLock = new Object();
70 private volatile boolean terminateInitiated;
71
72 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
73 ThreadPool pool)
74 {
75 super(provider);
76 this.pool = pool;
77
78 if (pool.isFixedThreadPool()) {
79 taskQueue = new ConcurrentLinkedQueue<Runnable>();
80 } else {
81 taskQueue = null; // not used
82 }
83
84 // use default thread factory as thread should not be visible to
85 // application (it doesn't execute completion handlers).
86 this.timeoutExecutor = (ScheduledThreadPoolExecutor)
87 Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
88 this.timeoutExecutor.setRemoveOnCancelPolicy(true);
89 }
90
91 final ExecutorService executor() {
92 return pool.executor();
93 }
94
95 final boolean isFixedThreadPool() {
96 return pool.isFixedThreadPool();
97 }
98
99 final int fixedThreadCount() {
100 if (isFixedThreadPool()) {
101 return pool.poolSize();
102 } else {
103 return pool.poolSize() + internalThreadCount;
104 }
105 }
106
107 private Runnable bindToGroup(final Runnable task) {
108 final AsynchronousChannelGroupImpl thisGroup = this;
109 return new Runnable() {
110 public void run() {
111 Invoker.bindToGroup(thisGroup);
112 task.run();
113 }
114 };
115 }
116
117 private void startInternalThread(final Runnable task) {
118 AccessController.doPrivileged(new PrivilegedAction<Void>() {
119 @Override
120 public Void run() {
121 // internal threads should not be visible to application so
122 // cannot use user-supplied thread factory
123 ThreadPool.defaultThreadFactory().newThread(task).start();
124 return null;
125 }
126 });
127 }
128
129 protected final void startThreads(Runnable task) {
130 if (!isFixedThreadPool()) {
131 for (int i=0; i<internalThreadCount; i++) {
132 startInternalThread(task);
133 threadCount.incrementAndGet();
134 }
135 }
136 if (pool.poolSize() > 0) {
137 task = bindToGroup(task);
138 try {
229 */
230 abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
231 throws IOException;
232
233 /**
234 * Detaches a foreign channel from this group.
235 */
236 abstract void detachForeignChannel(Object key);
237
238 /**
239 * Closes all channels in the group
240 */
241 abstract void closeAllChannels() throws IOException;
242
243 /**
244 * Shutdown all tasks waiting for I/O events.
245 */
246 abstract void shutdownHandlerTasks();
247
248 private void shutdownExecutors() {
249 AccessController.doPrivileged(new PrivilegedAction<Void>() {
250 public Void run() {
251 pool.executor().shutdown();
252 timeoutExecutor.shutdown();
253 return null;
254 }
255 });
256 }
257
258 @Override
259 public final void shutdown() {
260 if (shutdown.getAndSet(true)) {
261 // already shutdown
262 return;
263 }
264 // if there are channels in the group then shutdown will continue
265 // when the last channel is closed
266 if (!isEmpty()) {
267 return;
268 }
269 // initiate termination (acquire shutdownNowLock to ensure that other
306 public final boolean awaitTermination(long timeout, TimeUnit unit)
307 throws InterruptedException
308 {
309 return pool.executor().awaitTermination(timeout, unit);
310 }
311
312 /**
313 * Executes the given command on one of the channel group's pooled threads.
314 */
315 @Override
316 public final void execute(Runnable task) {
317 SecurityManager sm = System.getSecurityManager();
318 if (sm != null) {
319 // when a security manager is installed then the user's task
320 // must be run with the current calling context
321 final AccessControlContext acc = AccessController.getContext();
322 final Runnable delegate = task;
323 task = new Runnable() {
324 @Override
325 public void run() {
326 AccessController.doPrivileged(new PrivilegedAction<Void>() {
327 @Override
328 public Void run() {
329 delegate.run();
330 return null;
331 }
332 }, acc);
333 }
334 };
335 }
336 executeOnPooledThread(task);
337 }
338 }
|
59
60 // associated Executor for timeouts
61 private ScheduledThreadPoolExecutor timeoutExecutor;
62
63 // task queue for when using a fixed thread pool. In that case, thread
64 // waiting on I/O events must be awokon to poll tasks from this queue.
65 private final Queue<Runnable> taskQueue;
66
67 // group shutdown
68 private final AtomicBoolean shutdown = new AtomicBoolean();
69 private final Object shutdownNowLock = new Object();
70 private volatile boolean terminateInitiated;
71
72 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
73 ThreadPool pool)
74 {
75 super(provider);
76 this.pool = pool;
77
78 if (pool.isFixedThreadPool()) {
79 taskQueue = new ConcurrentLinkedQueue<>();
80 } else {
81 taskQueue = null; // not used
82 }
83
84 // use default thread factory as thread should not be visible to
85 // application (it doesn't execute completion handlers).
86 this.timeoutExecutor = (ScheduledThreadPoolExecutor)
87 Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
88 this.timeoutExecutor.setRemoveOnCancelPolicy(true);
89 }
90
91 final ExecutorService executor() {
92 return pool.executor();
93 }
94
95 final boolean isFixedThreadPool() {
96 return pool.isFixedThreadPool();
97 }
98
99 final int fixedThreadCount() {
100 if (isFixedThreadPool()) {
101 return pool.poolSize();
102 } else {
103 return pool.poolSize() + internalThreadCount;
104 }
105 }
106
107 private Runnable bindToGroup(final Runnable task) {
108 final AsynchronousChannelGroupImpl thisGroup = this;
109 return new Runnable() {
110 public void run() {
111 Invoker.bindToGroup(thisGroup);
112 task.run();
113 }
114 };
115 }
116
117 private void startInternalThread(final Runnable task) {
118 AccessController.doPrivileged(new PrivilegedAction<>() {
119 @Override
120 public Void run() {
121 // internal threads should not be visible to application so
122 // cannot use user-supplied thread factory
123 ThreadPool.defaultThreadFactory().newThread(task).start();
124 return null;
125 }
126 });
127 }
128
129 protected final void startThreads(Runnable task) {
130 if (!isFixedThreadPool()) {
131 for (int i=0; i<internalThreadCount; i++) {
132 startInternalThread(task);
133 threadCount.incrementAndGet();
134 }
135 }
136 if (pool.poolSize() > 0) {
137 task = bindToGroup(task);
138 try {
229 */
230 abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
231 throws IOException;
232
233 /**
234 * Detaches a foreign channel from this group.
235 */
236 abstract void detachForeignChannel(Object key);
237
238 /**
239 * Closes all channels in the group
240 */
241 abstract void closeAllChannels() throws IOException;
242
243 /**
244 * Shutdown all tasks waiting for I/O events.
245 */
246 abstract void shutdownHandlerTasks();
247
248 private void shutdownExecutors() {
249 AccessController.doPrivileged(new PrivilegedAction<>() {
250 public Void run() {
251 pool.executor().shutdown();
252 timeoutExecutor.shutdown();
253 return null;
254 }
255 });
256 }
257
258 @Override
259 public final void shutdown() {
260 if (shutdown.getAndSet(true)) {
261 // already shutdown
262 return;
263 }
264 // if there are channels in the group then shutdown will continue
265 // when the last channel is closed
266 if (!isEmpty()) {
267 return;
268 }
269 // initiate termination (acquire shutdownNowLock to ensure that other
306 public final boolean awaitTermination(long timeout, TimeUnit unit)
307 throws InterruptedException
308 {
309 return pool.executor().awaitTermination(timeout, unit);
310 }
311
312 /**
313 * Executes the given command on one of the channel group's pooled threads.
314 */
315 @Override
316 public final void execute(Runnable task) {
317 SecurityManager sm = System.getSecurityManager();
318 if (sm != null) {
319 // when a security manager is installed then the user's task
320 // must be run with the current calling context
321 final AccessControlContext acc = AccessController.getContext();
322 final Runnable delegate = task;
323 task = new Runnable() {
324 @Override
325 public void run() {
326 AccessController.doPrivileged(new PrivilegedAction<>() {
327 @Override
328 public Void run() {
329 delegate.run();
330 return null;
331 }
332 }, acc);
333 }
334 };
335 }
336 executeOnPooledThread(task);
337 }
338 }
|