1 /* 2 * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.spi.AsynchronousChannelProvider; 29 import java.io.IOException; 30 import java.util.concurrent.ArrayBlockingQueue; 31 import java.util.concurrent.RejectedExecutionException; 32 import java.util.concurrent.atomic.AtomicInteger; 33 import static sun.nio.ch.KQueue.*; 34 35 /** 36 * AsynchronousChannelGroup implementation based on the BSD kqueue facility. 37 */ 38 39 final class KQueuePort 40 extends Port 41 { 42 // maximum number of events to poll at a time 43 private static final int MAX_KEVENTS_TO_POLL = 512; 44 45 // kqueue file descriptor 46 private final int kqfd; 47 48 // true if kqueue closed 49 private boolean closed; 50 51 // socket pair used for wakeup 52 private final int sp[]; 53 54 // number of wakeups pending 55 private final AtomicInteger wakeupCount = new AtomicInteger(); 56 57 // address of the poll array passed to kqueue_wait 58 private final long address; 59 60 // encapsulates an event for a channel 61 static class Event { 62 final PollableChannel channel; 63 final int events; 64 65 Event(PollableChannel channel, int events) { 66 this.channel = channel; 67 this.events = events; 68 } 69 70 PollableChannel channel() { return channel; } 71 int events() { return events; } 72 } 73 74 // queue of events for cases that a polling thread dequeues more than one 75 // event 76 private final ArrayBlockingQueue<Event> queue; 77 private final Event NEED_TO_POLL = new Event(null, 0); 78 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); 79 80 KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool) 81 throws IOException 82 { 83 super(provider, pool); 84 85 // open kqueue 86 this.kqfd = kqueue(); 87 88 // create socket pair for wakeup mechanism 89 int[] sv = new int[2]; 90 try { 91 socketpair(sv); 92 93 // register one end with kqueue 94 keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD); 95 } catch (IOException x) { 96 close0(kqfd); 97 throw x; 98 } 99 this.sp = sv; 100 101 // allocate the poll array 102 this.address = allocatePollArray(MAX_KEVENTS_TO_POLL); 103 104 // create the queue and offer the special event to ensure that the first 105 // threads polls 106 this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL); 107 this.queue.offer(NEED_TO_POLL); 108 } 109 110 KQueuePort start() { 111 startThreads(new EventHandlerTask()); 112 return this; 113 } 114 115 /** 116 * Release all resources 117 */ 118 private void implClose() { 119 synchronized (this) { 120 if (closed) 121 return; 122 closed = true; 123 } 124 freePollArray(address); 125 close0(sp[0]); 126 close0(sp[1]); 127 close0(kqfd); 128 } 129 130 private void wakeup() { 131 if (wakeupCount.incrementAndGet() == 1) { 132 // write byte to socketpair to force wakeup 133 try { 134 interrupt(sp[1]); 135 } catch (IOException x) { 136 throw new AssertionError(x); 137 } 138 } 139 } 140 141 @Override 142 void executeOnHandlerTask(Runnable task) { 143 synchronized (this) { 144 if (closed) 145 throw new RejectedExecutionException(); 146 offerTask(task); 147 wakeup(); 148 } 149 } 150 151 @Override 152 void shutdownHandlerTasks() { 153 /* 154 * If no tasks are running then just release resources; otherwise 155 * write to the one end of the socketpair to wakeup any polling threads. 156 */ 157 int nThreads = threadCount(); 158 if (nThreads == 0) { 159 implClose(); 160 } else { 161 // send interrupt to each thread 162 while (nThreads-- > 0) { 163 wakeup(); 164 } 165 } 166 } 167 168 // invoked by clients to register a file descriptor 169 @Override 170 void startPoll(int fd, int events) { 171 // We use a separate filter for read and write events. 172 // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here. 173 int err = 0; 174 int flags = (EV_ADD|EV_ONESHOT); 175 if ((events & Net.POLLIN) > 0) 176 err = keventRegister(kqfd, fd, EVFILT_READ, flags); 177 if (err == 0 && (events & Net.POLLOUT) > 0) 178 err = keventRegister(kqfd, fd, EVFILT_WRITE, flags); 179 if (err != 0) 180 throw new InternalError("kevent failed: " + err); // should not happen 181 } 182 183 /* 184 * Task to process events from kqueue and dispatch to the channel's 185 * onEvent handler. 186 * 187 * Events are retrieved from kqueue in batch and offered to a BlockingQueue 188 * where they are consumed by handler threads. A special "NEED_TO_POLL" 189 * event is used to signal one consumer to re-poll when all events have 190 * been consumed. 191 */ 192 private class EventHandlerTask implements Runnable { 193 private Event poll() throws IOException { 194 try { 195 for (;;) { 196 int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL); 197 /* 198 * 'n' events have been read. Here we map them to their 199 * corresponding channel in batch and queue n-1 so that 200 * they can be handled by other handler threads. The last 201 * event is handled by this thread (and so is not queued). 202 */ 203 fdToChannelLock.readLock().lock(); 204 try { 205 while (n-- > 0) { 206 long keventAddress = getEvent(address, n); 207 int fd = getDescriptor(keventAddress); 208 209 // wakeup 210 if (fd == sp[0]) { 211 if (wakeupCount.decrementAndGet() == 0) { 212 // no more wakeups so drain pipe 213 drain1(sp[0]); 214 } 215 216 // queue special event if there are more events 217 // to handle. 218 if (n > 0) { 219 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); 220 continue; 221 } 222 return EXECUTE_TASK_OR_SHUTDOWN; 223 } 224 225 PollableChannel channel = fdToChannel.get(fd); 226 if (channel != null) { 227 int filter = getFilter(keventAddress); 228 int events = 0; 229 if (filter == EVFILT_READ) 230 events = Net.POLLIN; 231 else if (filter == EVFILT_WRITE) 232 events = Net.POLLOUT; 233 234 Event ev = new Event(channel, events); 235 236 // n-1 events are queued; This thread handles 237 // the last one except for the wakeup 238 if (n > 0) { 239 queue.offer(ev); 240 } else { 241 return ev; 242 } 243 } 244 } 245 } finally { 246 fdToChannelLock.readLock().unlock(); 247 } 248 } 249 } finally { 250 // to ensure that some thread will poll when all events have 251 // been consumed 252 queue.offer(NEED_TO_POLL); 253 } 254 } 255 256 public void run() { 257 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 258 Invoker.getGroupAndInvokeCount(); 259 final boolean isPooledThread = (myGroupAndInvokeCount != null); 260 boolean replaceMe = false; 261 Event ev; 262 try { 263 for (;;) { 264 // reset invoke count 265 if (isPooledThread) 266 myGroupAndInvokeCount.resetInvokeCount(); 267 268 try { 269 replaceMe = false; 270 ev = queue.take(); 271 272 // no events and this thread has been "selected" to 273 // poll for more. 274 if (ev == NEED_TO_POLL) { 275 try { 276 ev = poll(); 277 } catch (IOException x) { 278 x.printStackTrace(); 279 return; 280 } 281 } 282 } catch (InterruptedException x) { 283 continue; 284 } 285 286 // handle wakeup to execute task or shutdown 287 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { 288 Runnable task = pollTask(); 289 if (task == null) { 290 // shutdown request 291 return; 292 } 293 // run task (may throw error/exception) 294 replaceMe = true; 295 task.run(); 296 continue; 297 } 298 299 // process event 300 try { 301 ev.channel().onEvent(ev.events(), isPooledThread); 302 } catch (Error x) { 303 replaceMe = true; throw x; 304 } catch (RuntimeException x) { 305 replaceMe = true; throw x; 306 } 307 } 308 } finally { 309 // last handler to exit when shutdown releases resources 310 int remaining = threadExit(this, replaceMe); 311 if (remaining == 0 && isShutdown()) { 312 implClose(); 313 } 314 } 315 } 316 } 317 318 // -- Native methods -- 319 320 private static native void socketpair(int[] sv) throws IOException; 321 322 private static native void interrupt(int fd) throws IOException; 323 324 private static native void drain1(int fd) throws IOException; 325 326 private static native void close0(int fd); 327 328 static { 329 IOUtil.load(); 330 } 331 }