13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.nio.channels.spi.AsynchronousChannelProvider;
29 import java.io.IOException;
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import static sun.nio.ch.EPoll.*;
34
35 /**
36 * AsynchronousChannelGroup implementation based on the Linux epoll facility.
37 */
38
39 final class EPollPort
40 extends Port
41 {
    // maximum number of events to poll at a time; also sized the event
    // queue and the native poll array
    private static final int MAX_EPOLL_EVENTS = 512;

    // errno value returned by epollCtl when the fd is not yet registered
    private static final int ENOENT = 2;

    // epoll file descriptor
    private final int epfd;

    // true if epoll closed; guarded by the monitor on "this"
    private boolean closed;

    // socket pair used for wakeup: sp[0] is registered with epoll and
    // drained by the pollers, sp[1] is written to by wakeup()
    private final int sp[];

    // number of wakeups pending; only the increment from 0 to 1 writes a
    // byte to the socket pair (see wakeup())
    private final AtomicInteger wakeupCount = new AtomicInteger();

    // address of the native poll array passed to epoll_wait; freed in
    // implClose()
    private final long address;
62
63 // encapsulates an event for a channel
64 static class Event {
65 final PollableChannel channel;
66 final int events;
67
68 Event(PollableChannel channel, int events) {
69 this.channel = channel;
70 this.events = events;
71 }
72
73 PollableChannel channel() { return channel; }
74 int events() { return events; }
75 }
76
    // queue of events for cases that a polling thread dequeues more than one
    // event
    private final ArrayBlockingQueue<Event> queue;
    // sentinel telling the consumer that it must poll again (offered in
    // poll()'s finally block so some thread always re-polls)
    private final Event NEED_TO_POLL = new Event(null, 0);
    // sentinel produced when the wakeup descriptor fires; presumably makes
    // the consumer run a queued task or shut down — handled in run()
    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
82
83 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
84 throws IOException
85 {
86 super(provider, pool);
87
88 // open epoll
89 this.epfd = epollCreate();
90
91 // create socket pair for wakeup mechanism
92 int[] sv = new int[2];
93 try {
94 socketpair(sv);
95 // register one end with epoll
96 epollCtl(epfd, EPOLL_CTL_ADD, sv[0], EPOLLIN);
97 } catch (IOException x) {
98 close0(epfd);
99 throw x;
100 }
101 this.sp = sv;
102
103 // allocate the poll array
104 this.address = allocatePollArray(MAX_EPOLL_EVENTS);
105
106 // create the queue and offer the special event to ensure that the first
107 // threads polls
108 this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
109 this.queue.offer(NEED_TO_POLL);
110 }
111
112 EPollPort start() {
113 startThreads(new EventHandlerTask());
114 return this;
115 }
116
117 /**
118 * Release all resources
119 */
120 private void implClose() {
121 synchronized (this) {
122 if (closed)
123 return;
124 closed = true;
125 }
126 freePollArray(address);
127 close0(sp[0]);
128 close0(sp[1]);
129 close0(epfd);
130 }
131
132 private void wakeup() {
133 if (wakeupCount.incrementAndGet() == 1) {
134 // write byte to socketpair to force wakeup
135 try {
136 interrupt(sp[1]);
137 } catch (IOException x) {
138 throw new AssertionError(x);
139 }
140 }
141 }
142
143 @Override
144 void executeOnHandlerTask(Runnable task) {
145 synchronized (this) {
146 if (closed)
147 throw new RejectedExecutionException();
148 offerTask(task);
149 wakeup();
150 }
151 }
152
153 @Override
154 void shutdownHandlerTasks() {
155 /*
156 * If no tasks are running then just release resources; otherwise
157 * write to the one end of the socketpair to wakeup any polling threads.
158 */
159 int nThreads = threadCount();
160 if (nThreads == 0) {
161 implClose();
162 } else {
163 // send interrupt to each thread
164 while (nThreads-- > 0) {
165 wakeup();
166 }
167 }
168 }
169
170 // invoke by clients to register a file descriptor
171 @Override
172 void startPoll(int fd, int events) {
173 // update events (or add to epoll on first usage)
174 int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
175 if (err == ENOENT)
176 err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
177 if (err != 0)
178 throw new AssertionError(); // should not happen
179 }
180
181 /*
182 * Task to process events from epoll and dispatch to the channel's
183 * onEvent handler.
184 *
     * Events are retrieved from epoll in batch and offered to a BlockingQueue
186 * where they are consumed by handler threads. A special "NEED_TO_POLL"
187 * event is used to signal one consumer to re-poll when all events have
188 * been consumed.
189 */
190 private class EventHandlerTask implements Runnable {
191 private Event poll() throws IOException {
192 try {
193 for (;;) {
194 int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
195 /*
196 * 'n' events have been read. Here we map them to their
197 * corresponding channel in batch and queue n-1 so that
198 * they can be handled by other handler threads. The last
199 * event is handled by this thread (and so is not queued).
200 */
201 fdToChannelLock.readLock().lock();
202 try {
203 while (n-- > 0) {
204 long eventAddress = getEvent(address, n);
205 int fd = getDescriptor(eventAddress);
206
207 // wakeup
208 if (fd == sp[0]) {
209 if (wakeupCount.decrementAndGet() == 0) {
210 // no more wakeups so drain pipe
211 drain1(sp[0]);
212 }
213
214 // queue special event if there are more events
215 // to handle.
216 if (n > 0) {
217 queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
218 continue;
219 }
220 return EXECUTE_TASK_OR_SHUTDOWN;
221 }
222
223 PollableChannel channel = fdToChannel.get(fd);
224 if (channel != null) {
225 int events = getEvents(eventAddress);
226 Event ev = new Event(channel, events);
227
228 // n-1 events are queued; This thread handles
229 // the last one except for the wakeup
230 if (n > 0) {
231 queue.offer(ev);
232 } else {
233 return ev;
234 }
235 }
236 }
237 } finally {
238 fdToChannelLock.readLock().unlock();
239 }
240 }
241 } finally {
242 // to ensure that some thread will poll when all events have
243 // been consumed
244 queue.offer(NEED_TO_POLL);
245 }
289 }
290
291 // process event
292 try {
293 ev.channel().onEvent(ev.events(), isPooledThread);
294 } catch (Error x) {
295 replaceMe = true; throw x;
296 } catch (RuntimeException x) {
297 replaceMe = true; throw x;
298 }
299 }
300 } finally {
301 // last handler to exit when shutdown releases resources
302 int remaining = threadExit(this, replaceMe);
303 if (remaining == 0 && isShutdown()) {
304 implClose();
305 }
306 }
307 }
308 }
309
    // -- Native methods --

    // creates a connected socket pair; the two descriptors are stored in sv
    private static native void socketpair(int[] sv) throws IOException;

    // writes a byte to fd to wake a thread blocked in epoll_wait
    private static native void interrupt(int fd) throws IOException;

    // drains the wakeup byte(s) previously written to fd by interrupt
    private static native void drain1(int fd) throws IOException;

    // closes the file descriptor
    private static native void close0(int fd);

    // load the native library implementing the methods above
    static {
        IOUtil.load();
    }
323 }
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.nio.channels.spi.AsynchronousChannelProvider;
29 import java.io.IOException;
30 import java.util.concurrent.ArrayBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import static sun.nio.ch.EPoll.EPOLLIN;
35 import static sun.nio.ch.EPoll.EPOLLONESHOT;
36 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
37 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
38 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
39
40
41 /**
42 * AsynchronousChannelGroup implementation based on the Linux epoll facility.
43 */
44
45 final class EPollPort
46 extends Port
47 {
    // maximum number of events to poll at a time; also sizes the event
    // queue and the native poll array
    private static final int MAX_EPOLL_EVENTS = 512;

    // errno value returned by EPoll.ctl when the fd is not yet registered
    private static final int ENOENT = 2;

    // epoll file descriptor
    private final int epfd;

    // address of the native poll array passed to epoll_wait; freed in
    // implClose()
    private final long address;

    // true if epoll closed; guarded by the monitor on "this"
    private boolean closed;

    // pipe used for wakeup: sp[0] is the read end registered with epoll,
    // sp[1] is the write end written to by wakeup()
    private final int sp[];

    // number of wakeups pending; only the increment from 0 to 1 writes a
    // byte to the pipe (see wakeup())
    private final AtomicInteger wakeupCount = new AtomicInteger();
69 // encapsulates an event for a channel
70 static class Event {
71 final PollableChannel channel;
72 final int events;
73
74 Event(PollableChannel channel, int events) {
75 this.channel = channel;
76 this.events = events;
77 }
78
79 PollableChannel channel() { return channel; }
80 int events() { return events; }
81 }
82
    // queue of events for cases that a polling thread dequeues more than one
    // event
    private final ArrayBlockingQueue<Event> queue;
    // sentinel telling the consumer that it must poll again (offered in
    // poll()'s finally block so some thread always re-polls)
    private final Event NEED_TO_POLL = new Event(null, 0);
    // sentinel produced when the wakeup descriptor fires; presumably makes
    // the consumer run a queued task or shut down — handled in run()
    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
88
89 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
90 throws IOException
91 {
92 super(provider, pool);
93
94 this.epfd = EPoll.create();
95 this.address = EPoll.allocatePollArray(MAX_EPOLL_EVENTS);
96
97 // create socket pair for wakeup mechanism
98 try {
99 long fds = IOUtil.makePipe(true);
100 this.sp = new int[]{(int) (fds >>> 32), (int) fds};
101 } catch (IOException ioe) {
102 EPoll.freePollArray(address);
103 FileDispatcherImpl.closeIntFD(epfd);
104 throw ioe;
105 }
106
107 // register one end with epoll
108 EPoll.ctl(epfd, EPOLL_CTL_ADD, sp[0], EPOLLIN);
109
110 // create the queue and offer the special event to ensure that the first
111 // threads polls
112 this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
113 this.queue.offer(NEED_TO_POLL);
114 }
115
116 EPollPort start() {
117 startThreads(new EventHandlerTask());
118 return this;
119 }
120
121 /**
122 * Release all resources
123 */
124 private void implClose() {
125 synchronized (this) {
126 if (closed)
127 return;
128 closed = true;
129 }
130 try { FileDispatcherImpl.closeIntFD(epfd); } catch (IOException ioe) { }
131 try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
132 try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
133 EPoll.freePollArray(address);
134 }
135
136 private void wakeup() {
137 if (wakeupCount.incrementAndGet() == 1) {
138 // write byte to socketpair to force wakeup
139 try {
140 IOUtil.write1(sp[1], (byte)0);
141 } catch (IOException x) {
142 throw new AssertionError(x);
143 }
144 }
145 }
146
147 @Override
148 void executeOnHandlerTask(Runnable task) {
149 synchronized (this) {
150 if (closed)
151 throw new RejectedExecutionException();
152 offerTask(task);
153 wakeup();
154 }
155 }
156
157 @Override
158 void shutdownHandlerTasks() {
159 /*
160 * If no tasks are running then just release resources; otherwise
161 * write to the one end of the socketpair to wakeup any polling threads.
162 */
163 int nThreads = threadCount();
164 if (nThreads == 0) {
165 implClose();
166 } else {
167 // send interrupt to each thread
168 while (nThreads-- > 0) {
169 wakeup();
170 }
171 }
172 }
173
174 // invoke by clients to register a file descriptor
175 @Override
176 void startPoll(int fd, int events) {
177 // update events (or add to epoll on first usage)
178 int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
179 if (err == ENOENT)
180 err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
181 if (err != 0)
182 throw new AssertionError(); // should not happen
183 }
184
185 /*
186 * Task to process events from epoll and dispatch to the channel's
187 * onEvent handler.
188 *
     * Events are retrieved from epoll in batch and offered to a BlockingQueue
190 * where they are consumed by handler threads. A special "NEED_TO_POLL"
191 * event is used to signal one consumer to re-poll when all events have
192 * been consumed.
193 */
194 private class EventHandlerTask implements Runnable {
195 private Event poll() throws IOException {
196 try {
197 for (;;) {
198 int n;
199 do {
200 n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);
201 } while (n == IOStatus.INTERRUPTED);
202
203 /*
204 * 'n' events have been read. Here we map them to their
205 * corresponding channel in batch and queue n-1 so that
206 * they can be handled by other handler threads. The last
207 * event is handled by this thread (and so is not queued).
208 */
209 fdToChannelLock.readLock().lock();
210 try {
211 while (n-- > 0) {
212 long eventAddress = EPoll.getEvent(address, n);
213 int fd = EPoll.getDescriptor(eventAddress);
214
215 // wakeup
216 if (fd == sp[0]) {
217 if (wakeupCount.decrementAndGet() == 0) {
218 // no more wakeups so drain pipe
219 IOUtil.drain(sp[0]);
220 }
221
222 // queue special event if there are more events
223 // to handle.
224 if (n > 0) {
225 queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
226 continue;
227 }
228 return EXECUTE_TASK_OR_SHUTDOWN;
229 }
230
231 PollableChannel channel = fdToChannel.get(fd);
232 if (channel != null) {
233 int events = EPoll.getEvents(eventAddress);
234 Event ev = new Event(channel, events);
235
236 // n-1 events are queued; This thread handles
237 // the last one except for the wakeup
238 if (n > 0) {
239 queue.offer(ev);
240 } else {
241 return ev;
242 }
243 }
244 }
245 } finally {
246 fdToChannelLock.readLock().unlock();
247 }
248 }
249 } finally {
250 // to ensure that some thread will poll when all events have
251 // been consumed
252 queue.offer(NEED_TO_POLL);
253 }
297 }
298
299 // process event
300 try {
301 ev.channel().onEvent(ev.events(), isPooledThread);
302 } catch (Error x) {
303 replaceMe = true; throw x;
304 } catch (RuntimeException x) {
305 replaceMe = true; throw x;
306 }
307 }
308 } finally {
309 // last handler to exit when shutdown releases resources
310 int remaining = threadExit(this, replaceMe);
311 if (remaining == 0 && isShutdown()) {
312 implClose();
313 }
314 }
315 }
316 }
317 }
|