10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.Selector;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.ArrayDeque;
33 import java.util.Deque;
34 import java.util.HashMap;
35 import java.util.Map;
36 import java.util.concurrent.TimeUnit;
37
38 import static sun.nio.ch.KQueue.EVFILT_READ;
39 import static sun.nio.ch.KQueue.EVFILT_WRITE;
40 import static sun.nio.ch.KQueue.EV_ADD;
41 import static sun.nio.ch.KQueue.EV_DELETE;
42
43 /**
44 * KQueue based Selector implementation for macOS
45 */
46
47 class KQueueSelectorImpl extends SelectorImpl {
48
49 // maximum number of events to poll in one call to kqueue
50 private static final int MAX_KEVENTS = 256;
51
52 // kqueue file descriptor
53 private final int kqfd;
54
55 // address of poll array (event list) when polling for pending events
56 private final long pollArrayAddress;
83 try {
84 long fds = IOUtil.makePipe(false);
85 this.fd0 = (int) (fds >>> 32);
86 this.fd1 = (int) fds;
87 } catch (IOException ioe) {
88 KQueue.freePollArray(pollArrayAddress);
89 FileDispatcherImpl.closeIntFD(kqfd);
90 throw ioe;
91 }
92
93 // register one end of the socket pair for wakeups
94 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
95 }
96
97 private void ensureOpen() {
98 if (!isOpen())
99 throw new ClosedSelectorException();
100 }
101
102 @Override
103 protected int doSelect(long timeout) throws IOException {
104 assert Thread.holdsLock(this);
105
106 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
107 boolean blocking = (to != 0);
108 boolean timedPoll = (to > 0);
109
110 int numEntries;
111 processUpdateQueue();
112 processDeregisterQueue();
113 try {
114 begin(blocking);
115
116 do {
117 long startTime = timedPoll ? System.nanoTime() : 0;
118 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
119 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
120 // timed poll interrupted so need to adjust timeout
121 long adjust = System.nanoTime() - startTime;
122 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
123 if (to <= 0) {
124 // timeout expired so no retry
125 numEntries = 0;
126 }
127 }
128 } while (numEntries == IOStatus.INTERRUPTED);
129 assert IOStatus.check(numEntries);
130
131 } finally {
132 end(blocking);
133 }
134 processDeregisterQueue();
135 return updateSelectedKeys(numEntries);
136 }
137
138 /**
139 * Process changes to the interest ops.
140 */
141 private void processUpdateQueue() {
142 assert Thread.holdsLock(this);
143
144 synchronized (updateLock) {
145 SelectionKeyImpl ski;
146 while ((ski = updateKeys.pollFirst()) != null) {
147 if (ski.isValid()) {
148 int fd = ski.getFDVal();
149 // add to fdToKey if needed
150 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
151 assert (previous == null) || (previous == ski);
152
153 int newEvents = ski.translateInterestOps();
154 int registeredEvents = ski.registeredEvents();
155 if (newEvents != registeredEvents) {
163 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
164 }
165
166 // add or delete interest in write events
167 if ((registeredEvents & Net.POLLOUT) != 0) {
168 if ((newEvents & Net.POLLOUT) == 0) {
169 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
170 }
171 } else if ((newEvents & Net.POLLOUT) != 0) {
172 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
173 }
174
175 ski.registeredEvents(newEvents);
176 }
177 }
178 }
179 }
180 }
181
182 /**
183 * Update the keys of file descriptors that were polled and add them to
184 * the selected-key set.
185 * If the interrupt fd has been selected, drain it and clear the interrupt.
186 */
187 private int updateSelectedKeys(int numEntries) throws IOException {
188 assert Thread.holdsLock(this);
189 assert Thread.holdsLock(nioSelectedKeys());
190
191 int numKeysUpdated = 0;
192 boolean interrupted = false;
193
194 // A file descriptor may be registered with kqueue with more than one
195 // filter and so there may be more than one event for a fd. The poll
196 // count is incremented here and compared against the SelectionKey's
197 // "lastPolled" field. This ensures that the ready ops is updated rather
198 // than replaced when a file descriptor is polled by both the read and
199 // write filter.
200 pollCount++;
201
202 for (int i = 0; i < numEntries; i++) {
203 long kevent = KQueue.getEvent(pollArrayAddress, i);
204 int fd = KQueue.getDescriptor(kevent);
205 if (fd == fd0) {
206 interrupted = true;
207 } else {
208 SelectionKeyImpl ski = fdToKey.get(fd);
209 if (ski != null) {
210 int rOps = 0;
211 short filter = KQueue.getFilter(kevent);
212 if (filter == EVFILT_READ) {
213 rOps |= Net.POLLIN;
214 } else if (filter == EVFILT_WRITE) {
215 rOps |= Net.POLLOUT;
216 }
217
218 if (selectedKeys.contains(ski)) {
219 if (ski.translateAndUpdateReadyOps(rOps)) {
220 // file descriptor may be polled more than once per poll
221 if (ski.lastPolled != pollCount) {
222 numKeysUpdated++;
223 ski.lastPolled = pollCount;
224 }
225 }
226 } else {
227 ski.translateAndSetReadyOps(rOps);
228 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
229 selectedKeys.add(ski);
230 numKeysUpdated++;
231 ski.lastPolled = pollCount;
232 }
233 }
234 }
235 }
236 }
237
238 if (interrupted) {
239 clearInterrupt();
240 }
241 return numKeysUpdated;
242 }
243
244 @Override
245 protected void implClose() throws IOException {
246 assert !isOpen();
247 assert Thread.holdsLock(this);
248
249 // prevent further wakeup
250 synchronized (interruptLock) {
251 interruptTriggered = true;
252 }
253
254 FileDispatcherImpl.closeIntFD(kqfd);
|
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.IOException;
29 import java.nio.channels.ClosedSelectorException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.nio.channels.spi.SelectorProvider;
33 import java.util.ArrayDeque;
34 import java.util.Deque;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Consumer;
39
40 import static sun.nio.ch.KQueue.EVFILT_READ;
41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
42 import static sun.nio.ch.KQueue.EV_ADD;
43 import static sun.nio.ch.KQueue.EV_DELETE;
44
45 /**
46 * KQueue based Selector implementation for macOS
47 */
48
49 class KQueueSelectorImpl extends SelectorImpl {
50
51 // maximum number of events to poll in one call to kqueue
52 private static final int MAX_KEVENTS = 256;
53
54 // kqueue file descriptor
55 private final int kqfd;
56
57 // address of poll array (event list) when polling for pending events
58 private final long pollArrayAddress;
85 try {
86 long fds = IOUtil.makePipe(false);
87 this.fd0 = (int) (fds >>> 32);
88 this.fd1 = (int) fds;
89 } catch (IOException ioe) {
90 KQueue.freePollArray(pollArrayAddress);
91 FileDispatcherImpl.closeIntFD(kqfd);
92 throw ioe;
93 }
94
95 // register one end of the socket pair for wakeups
96 KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
97 }
98
99 private void ensureOpen() {
100 if (!isOpen())
101 throw new ClosedSelectorException();
102 }
103
104 @Override
105 protected int doSelect(Consumer<SelectionKey> action, long timeout)
106 throws IOException
107 {
108 assert Thread.holdsLock(this);
109
110 long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
111 boolean blocking = (to != 0);
112 boolean timedPoll = (to > 0);
113
114 int numEntries;
115 processUpdateQueue();
116 processDeregisterQueue();
117 try {
118 begin(blocking);
119
120 do {
121 long startTime = timedPoll ? System.nanoTime() : 0;
122 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
123 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
124 // timed poll interrupted so need to adjust timeout
125 long adjust = System.nanoTime() - startTime;
126 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
127 if (to <= 0) {
128 // timeout expired so no retry
129 numEntries = 0;
130 }
131 }
132 } while (numEntries == IOStatus.INTERRUPTED);
133 assert IOStatus.check(numEntries);
134
135 } finally {
136 end(blocking);
137 }
138 processDeregisterQueue();
139 return processEvents(numEntries, action);
140 }
141
142 /**
143 * Process changes to the interest ops.
144 */
145 private void processUpdateQueue() {
146 assert Thread.holdsLock(this);
147
148 synchronized (updateLock) {
149 SelectionKeyImpl ski;
150 while ((ski = updateKeys.pollFirst()) != null) {
151 if (ski.isValid()) {
152 int fd = ski.getFDVal();
153 // add to fdToKey if needed
154 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
155 assert (previous == null) || (previous == ski);
156
157 int newEvents = ski.translateInterestOps();
158 int registeredEvents = ski.registeredEvents();
159 if (newEvents != registeredEvents) {
167 KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
168 }
169
170 // add or delete interest in write events
171 if ((registeredEvents & Net.POLLOUT) != 0) {
172 if ((newEvents & Net.POLLOUT) == 0) {
173 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
174 }
175 } else if ((newEvents & Net.POLLOUT) != 0) {
176 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
177 }
178
179 ski.registeredEvents(newEvents);
180 }
181 }
182 }
183 }
184 }
185
186 /**
187 * Process the polled events.
188 * If the interrupt fd has been selected, drain it and clear the interrupt.
189 */
190 private int processEvents(int numEntries, Consumer<SelectionKey> action)
191 throws IOException
192 {
193 assert Thread.holdsLock(this);
194
195 int numKeysUpdated = 0;
196 boolean interrupted = false;
197
198 // A file descriptor may be registered with kqueue with more than one
199 // filter and so there may be more than one event for a fd. The poll
200 // count is incremented here and compared against the SelectionKey's
201 // "lastPolled" field. This ensures that the ready ops is updated rather
202 // than replaced when a file descriptor is polled by both the read and
203 // write filter.
204 pollCount++;
205
206 for (int i = 0; i < numEntries; i++) {
207 long kevent = KQueue.getEvent(pollArrayAddress, i);
208 int fd = KQueue.getDescriptor(kevent);
209 if (fd == fd0) {
210 interrupted = true;
211 } else {
212 SelectionKeyImpl ski = fdToKey.get(fd);
213 if (ski != null) {
214 int rOps = 0;
215 short filter = KQueue.getFilter(kevent);
216 if (filter == EVFILT_READ) {
217 rOps |= Net.POLLIN;
218 } else if (filter == EVFILT_WRITE) {
219 rOps |= Net.POLLOUT;
220 }
221 int updated = processReadyEvents(rOps, ski, action);
222 if (updated > 0 && ski.lastPolled != pollCount) {
223 numKeysUpdated++;
224 ski.lastPolled = pollCount;
225 }
226 }
227 }
228 }
229
230 if (interrupted) {
231 clearInterrupt();
232 }
233 return numKeysUpdated;
234 }
235
236 @Override
237 protected void implClose() throws IOException {
238 assert !isOpen();
239 assert Thread.holdsLock(this);
240
241 // prevent further wakeup
242 synchronized (interruptLock) {
243 interruptTriggered = true;
244 }
245
246 FileDispatcherImpl.closeIntFD(kqfd);
|