9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.*;
30 import java.nio.channels.spi.*;
31 import java.util.*;
32
33 /**
34 * An implementation of Selector for Linux 2.6+ kernels that uses
35 * the epoll event notification facility.
36 */
37 class EPollSelectorImpl
38 extends SelectorImpl
39 {
40 // File descriptors used for interrupt
41 private final int fd0;
42 private final int fd1;
43
44 // The poll object
45 private final EPollArrayWrapper pollWrapper;
46
47 // Maps from file descriptors to keys
48 private final Map<Integer, SelectionKeyImpl> fdToKey;
49
50 // True if this Selector has been closed
51 private volatile boolean closed;
52
53 // Lock for interrupt triggering and clearing
54 private final Object interruptLock = new Object();
55 private boolean interruptTriggered = false;
56
57 /**
58 * Package private constructor called by factory method in
59 * the abstract superclass Selector.
60 */
61 EPollSelectorImpl(SelectorProvider sp) throws IOException {
62 super(sp);
63 long pipeFds = IOUtil.makePipe(false);
64 fd0 = (int) (pipeFds >>> 32);
65 fd1 = (int) pipeFds;
66 try {
67 pollWrapper = new EPollArrayWrapper(fd0, fd1);
68 fdToKey = new HashMap<>();
69 } catch (Throwable t) {
70 try {
71 FileDispatcherImpl.closeIntFD(fd0);
72 } catch (IOException ioe0) {
73 t.addSuppressed(ioe0);
74 }
75 try {
76 FileDispatcherImpl.closeIntFD(fd1);
77 } catch (IOException ioe1) {
78 t.addSuppressed(ioe1);
79 }
80 throw t;
81 }
82 }
83
84 private void ensureOpen() {
85 if (closed)
86 throw new ClosedSelectorException();
87 }
88
89 @Override
90 protected int doSelect(long timeout) throws IOException {
91 ensureOpen();
92 int numEntries;
93 processDeregisterQueue();
94 try {
95 begin();
96 numEntries = pollWrapper.poll(timeout);
97 } finally {
98 end();
99 }
100 processDeregisterQueue();
101 return updateSelectedKeys(numEntries);
102 }
103
104 /**
105 * Update the keys whose fd's have been selected by the epoll.
106 * Add the ready keys to the ready queue.
107 */
108 private int updateSelectedKeys(int numEntries) throws IOException {
109 boolean interrupted = false;
110 int numKeysUpdated = 0;
111 for (int i=0; i<numEntries; i++) {
112 int nextFD = pollWrapper.getDescriptor(i);
113 if (nextFD == fd0) {
114 interrupted = true;
115 } else {
116 SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
117 if (ski != null) {
118 int rOps = pollWrapper.getEventOps(i);
119 if (selectedKeys.contains(ski)) {
120 if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
121 numKeysUpdated++;
122 }
123 } else {
124 ski.channel.translateAndSetReadyOps(rOps, ski);
125 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
126 selectedKeys.add(ski);
127 numKeysUpdated++;
128 }
129 }
130 }
131 }
132 }
133
134 if (interrupted) {
135 clearInterrupt();
136 }
137
138 return numKeysUpdated;
139 }
140
141 @Override
142 protected void implClose() throws IOException {
143 if (closed)
144 return;
145 closed = true;
146
147 // prevent further wakeup
148 synchronized (interruptLock) {
149 interruptTriggered = true;
150 }
151
152 pollWrapper.close();
153 FileDispatcherImpl.closeIntFD(fd0);
154 FileDispatcherImpl.closeIntFD(fd1);
155
156 // Deregister channels
157 Iterator<SelectionKey> i = keys.iterator();
158 while (i.hasNext()) {
159 SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
160 deregister(ski);
161 SelectableChannel selch = ski.channel();
162 if (!selch.isOpen() && !selch.isRegistered())
163 ((SelChImpl)selch).kill();
164 i.remove();
165 }
166 }
167
168 @Override
169 protected void implRegister(SelectionKeyImpl ski) {
170 ensureOpen();
171 SelChImpl ch = ski.channel;
172 int fd = Integer.valueOf(ch.getFDVal());
173 fdToKey.put(fd, ski);
174 pollWrapper.add(fd);
175 keys.add(ski);
176 }
177
178 @Override
179 protected void implDereg(SelectionKeyImpl ski) throws IOException {
180 assert (ski.getIndex() >= 0);
181 SelChImpl ch = ski.channel;
182 int fd = ch.getFDVal();
183 fdToKey.remove(Integer.valueOf(fd));
184 pollWrapper.remove(fd);
185 ski.setIndex(-1);
186 keys.remove(ski);
187 selectedKeys.remove(ski);
188 deregister(ski);
189 SelectableChannel selch = ski.channel();
190 if (!selch.isOpen() && !selch.isRegistered())
191 ((SelChImpl)selch).kill();
192 }
193
194 @Override
195 public void putEventOps(SelectionKeyImpl ski, int ops) {
196 ensureOpen();
197 SelChImpl ch = ski.channel;
198 pollWrapper.setInterest(ch.getFDVal(), ops);
199 }
200
201 @Override
202 public Selector wakeup() {
203 synchronized (interruptLock) {
204 if (!interruptTriggered) {
205 pollWrapper.interrupt();
206 interruptTriggered = true;
207 }
208 }
209 return this;
210 }
211
212 private void clearInterrupt() throws IOException {
213 synchronized (interruptLock) {
214 IOUtil.drain(fd0);
215 interruptTriggered = false;
216 }
217 }
218 }
|
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.spi.SelectorProvider;
34 import java.util.ArrayDeque;
35 import java.util.BitSet;
36 import java.util.Deque;
37 import java.util.HashMap;
38 import java.util.Iterator;
39 import java.util.Map;
40 import java.util.concurrent.TimeUnit;
41
42 import static sun.nio.ch.EPoll.EPOLLIN;
43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
44 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
45 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
46
47
48 /**
49 * Linux epoll based Selector implementation
50 */
51
52 class EPollSelectorImpl extends SelectorImpl {
53
54 // maximum number of events to poll in one call to epoll_wait
55 private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
56
57 // epoll file descriptor
58 private final int epfd;
59
60 // address of poll array when polling with epoll_wait
61 private final long pollArrayAddress;
62
63 // file descriptors used for interrupt
64 private final int fd0;
65 private final int fd1;
66
67 // maps file descriptor to selection key, synchronize on selector
68 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
69
70 // file descriptors registered with epoll, synchronize on selector
71 private final BitSet registered = new BitSet();
72
73 // pending new registrations/updates, queued by implRegister and putEventOps
74 private final Object updateLock = new Object();
75 private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
76 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
77 private final Deque<Integer> updateOps = new ArrayDeque<>();
78
79 // interrupt triggering and clearing
80 private final Object interruptLock = new Object();
81 private boolean interruptTriggered;
82
83 /**
84 * Package private constructor called by factory method in
85 * the abstract superclass Selector.
86 */
87 EPollSelectorImpl(SelectorProvider sp) throws IOException {
88 super(sp);
89
90 this.epfd = EPoll.create();
91 this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
92
93 try {
94 long fds = IOUtil.makePipe(false);
95 this.fd0 = (int) (fds >>> 32);
96 this.fd1 = (int) fds;
97 } catch (IOException ioe) {
98 EPoll.freePollArray(pollArrayAddress);
99 FileDispatcherImpl.closeIntFD(epfd);
100 throw ioe;
101 }
102
103 // register one end of the socket pair for wakeups
104 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
105 }
106
107 private void ensureOpen() {
108 if (!isOpen())
109 throw new ClosedSelectorException();
110 }
111
112 @Override
113 protected int doSelect(long timeout) throws IOException {
114 assert Thread.holdsLock(this);
115
116 int numEntries;
117 processUpdateQueue();
118 processDeregisterQueue();
119 try {
120 begin();
121
122 // epoll_wait timeout is int
123 int to = (int) Math.min(timeout, Integer.MAX_VALUE);
124 boolean timedPoll = (to > 0);
125 do {
126 long startTime = timedPoll ? System.nanoTime() : 0;
127 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
128 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
129 // timed poll interrupted so need to adjust timeout
130 long adjust = System.nanoTime() - startTime;
131 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
132 if (to <= 0) {
133 // timeout expired so no retry
134 numEntries = 0;
135 }
136 }
137 } while (numEntries == IOStatus.INTERRUPTED);
138 assert IOStatus.check(numEntries);
139
140 } finally {
141 end();
142 }
143 processDeregisterQueue();
144 return updateSelectedKeys(numEntries);
145 }
146
147 /**
148 * Process new registrations and changes to the interest ops.
149 */
150 private void processUpdateQueue() {
151 assert Thread.holdsLock(this);
152
153 synchronized (updateLock) {
154 SelectionKeyImpl ski;
155
156 // new registrations
157 while ((ski = newKeys.pollFirst()) != null) {
158 if (ski.isValid()) {
159 SelChImpl ch = ski.channel;
160 int fd = ch.getFDVal();
161 SelectionKeyImpl previous = fdToKey.put(fd, ski);
162 assert previous == null;
163 assert registered.get(fd) == false;
164 }
165 }
166
167 // changes to interest ops
168 assert updateKeys.size() == updateOps.size();
169 while ((ski = updateKeys.pollFirst()) != null) {
170 int ops = updateOps.pollFirst();
171 int fd = ski.channel.getFDVal();
172 if (ski.isValid() && fdToKey.containsKey(fd)) {
173 if (registered.get(fd)) {
174 if (ops == 0) {
175 // remove from epoll
176 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
177 registered.clear(fd);
178 } else {
179 // modify events
180 EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
181 }
182 } else if (ops != 0) {
183 // add to epoll
184 EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
185 registered.set(fd);
186 }
187 }
188 }
189 }
190 }
191
192 /**
193 * Update the keys whose fd's have been selected by the epoll.
194 * Add the ready keys to the ready queue.
195 */
196 private int updateSelectedKeys(int numEntries) throws IOException {
197 assert Thread.holdsLock(this);
198 assert Thread.holdsLock(nioSelectedKeys());
199
200 boolean interrupted = false;
201 int numKeysUpdated = 0;
202 for (int i=0; i<numEntries; i++) {
203 long event = EPoll.getEvent(pollArrayAddress, i);
204 int fd = EPoll.getDescriptor(event);
205 if (fd == fd0) {
206 interrupted = true;
207 } else {
208 SelectionKeyImpl ski = fdToKey.get(fd);
209 if (ski != null) {
210 int rOps = EPoll.getEvents(event);
211 if (selectedKeys.contains(ski)) {
212 if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
213 numKeysUpdated++;
214 }
215 } else {
216 ski.channel.translateAndSetReadyOps(rOps, ski);
217 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
218 selectedKeys.add(ski);
219 numKeysUpdated++;
220 }
221 }
222 }
223 }
224 }
225
226 if (interrupted) {
227 clearInterrupt();
228 }
229
230 return numKeysUpdated;
231 }
232
233 @Override
234 protected void implClose() throws IOException {
235 assert Thread.holdsLock(this);
236 assert Thread.holdsLock(nioKeys());
237
238 // prevent further wakeup
239 synchronized (interruptLock) {
240 interruptTriggered = true;
241 }
242
243 FileDispatcherImpl.closeIntFD(epfd);
244 EPoll.freePollArray(pollArrayAddress);
245
246 FileDispatcherImpl.closeIntFD(fd0);
247 FileDispatcherImpl.closeIntFD(fd1);
248
249 // Deregister channels
250 Iterator<SelectionKey> i = keys.iterator();
251 while (i.hasNext()) {
252 SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
253 deregister(ski);
254 SelectableChannel selch = ski.channel();
255 if (!selch.isOpen() && !selch.isRegistered())
256 ((SelChImpl)selch).kill();
257 i.remove();
258 }
259 }
260
261 @Override
262 protected void implRegister(SelectionKeyImpl ski) {
263 assert Thread.holdsLock(nioKeys());
264 ensureOpen();
265 synchronized (updateLock) {
266 newKeys.addLast(ski);
267 }
268 keys.add(ski);
269 }
270
271 @Override
272 protected void implDereg(SelectionKeyImpl ski) throws IOException {
273 assert !ski.isValid();
274 assert Thread.holdsLock(this);
275 assert Thread.holdsLock(nioKeys());
276 assert Thread.holdsLock(nioSelectedKeys());
277
278 int fd = ski.channel.getFDVal();
279 fdToKey.remove(fd);
280 if (registered.get(fd)) {
281 EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
282 registered.clear(fd);
283 }
284
285 selectedKeys.remove(ski);
286 keys.remove(ski);
287
288 // remove from channel's key set
289 deregister(ski);
290
291 SelectableChannel selch = ski.channel();
292 if (!selch.isOpen() && !selch.isRegistered())
293 ((SelChImpl) selch).kill();
294 }
295
296 @Override
297 public void putEventOps(SelectionKeyImpl ski, int ops) {
298 ensureOpen();
299 synchronized (updateLock) {
300 updateOps.addLast(ops); // ops first in case adding the key fails
301 updateKeys.addLast(ski);
302 }
303 }
304
305 @Override
306 public Selector wakeup() {
307 synchronized (interruptLock) {
308 if (!interruptTriggered) {
309 try {
310 IOUtil.write1(fd1, (byte)0);
311 } catch (IOException ioe) {
312 throw new InternalError(ioe);
313 }
314 interruptTriggered = true;
315 }
316 }
317 return this;
318 }
319
320 private void clearInterrupt() throws IOException {
321 synchronized (interruptLock) {
322 IOUtil.drain(fd0);
323 interruptTriggered = false;
324 }
325 }
326 }
|