1 /* 2 * Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 * KQueueSelectorImpl.java 28 * Implementation of Selector using FreeBSD / Mac OS X kqueues 29 * Derived from Sun's DevPollSelectorImpl 30 */ 31 32 package sun.nio.ch; 33 34 import java.io.IOException; 35 import java.io.FileDescriptor; 36 import java.nio.channels.*; 37 import java.nio.channels.spi.*; 38 import java.util.*; 39 import java.util.function.Consumer; 40 41 import sun.misc.*; 42 43 class KQueueSelectorImpl 44 extends SelectorImpl 45 { 46 // File descriptors used for interrupt 47 protected int fd0; 48 protected int fd1; 49 50 // The kqueue manipulator 51 KQueueArrayWrapper kqueueWrapper; 52 53 // Count of registered descriptors (including interrupt) 54 private int totalChannels; 55 56 // Map from a file descriptor to an entry containing the selection key 57 private HashMap<Integer,MapEntry> fdMap; 58 59 // True if this Selector has been closed 60 private boolean closed = false; 61 62 // Lock for interrupt triggering and clearing 63 private Object interruptLock = new Object(); 64 private boolean interruptTriggered = false; 65 66 // used by updateSelectedKeys to handle cases where the same file 67 // descriptor is polled by more than one filter 68 private long updateCount; 69 70 // Used to map file descriptors to a selection key and "update count" 71 // (see updateSelectedKeys for usage). 72 private static class MapEntry { 73 SelectionKeyImpl ski; 74 long updateCount; 75 MapEntry(SelectionKeyImpl ski) { 76 this.ski = ski; 77 } 78 } 79 80 /** 81 * Package private constructor called by factory method in 82 * the abstract superclass Selector. 83 */ 84 KQueueSelectorImpl(SelectorProvider sp) { 85 super(sp); 86 long fds = IOUtil.makePipe(false); 87 fd0 = (int)(fds >>> 32); 88 fd1 = (int)fds; 89 kqueueWrapper = new KQueueArrayWrapper(); 90 kqueueWrapper.initInterrupt(fd0, fd1); 91 fdMap = new HashMap<>(); 92 totalChannels = 1; 93 } 94 95 protected int doSelect(long timeout) 96 throws IOException 97 { 98 int entries = pollKqueueWrapper(timeout); 99 return updateSelectedKeys(entries); 100 } 101 102 @Override 103 protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException { 104 int entries = pollKqueueWrapper(timeout); 105 int numKeysUpdated = 0; 106 boolean interrupted = false; 107 108 updateCount++; 109 110 for (int i = 0; i < entries; i++) { 111 int nextFD = kqueueWrapper.getDescriptor(i); 112 if (nextFD == fd0) { 113 interrupted = true; 114 } else { 115 MapEntry me = fdMap.get(Integer.valueOf(nextFD)); 116 // entry is null in the case of an interrupt 117 if (me != null) { 118 int rOps = kqueueWrapper.getReventOps(i); 119 SelectionKeyImpl ski = me.ski; 120 ski.channel.translateAndSetReadyOps(rOps, ski); 121 if (ski.hasOps()) { 122 handler.accept(ski); 123 numKeysUpdated++; 124 me.updateCount = updateCount; 125 } 126 } 127 } 128 } 129 130 clearWakeupPipe(interrupted); 131 return numKeysUpdated; 132 } 133 134 private int pollKqueueWrapper(long timeout) throws IOException { 135 int entries = 0; 136 if (closed) 137 throw new ClosedSelectorException(); 138 processDeregisterQueue(); 139 try { 140 begin(); 141 entries = kqueueWrapper.poll(timeout); 142 } finally { 143 end(); 144 } 145 processDeregisterQueue(); 146 return entries; 147 } 148 149 /** 150 * Update the keys whose fd's have been selected by kqueue. 151 * Add the ready keys to the selected key set. 152 * If the interrupt fd has been selected, drain it and clear the interrupt. 153 */ 154 private int updateSelectedKeys(int entries) 155 throws IOException 156 { 157 int numKeysUpdated = 0; 158 boolean interrupted = false; 159 160 // A file descriptor may be registered with kqueue with more than one 161 // filter and so there may be more than one event for a fd. The update 162 // count in the MapEntry tracks when the fd was last updated and this 163 // ensures that the ready ops are updated rather than replaced by a 164 // second or subsequent event. 165 updateCount++; 166 167 for (int i = 0; i < entries; i++) { 168 int nextFD = kqueueWrapper.getDescriptor(i); 169 if (nextFD == fd0) { 170 interrupted = true; 171 } else { 172 MapEntry me = fdMap.get(Integer.valueOf(nextFD)); 173 174 // entry is null in the case of an interrupt 175 if (me != null) { 176 int rOps = kqueueWrapper.getReventOps(i); 177 SelectionKeyImpl ski = me.ski; 178 if (selectedKeys.contains(ski)) { 179 // first time this file descriptor has been encountered on this 180 // update? 181 if (me.updateCount != updateCount) { 182 if (ski.channel.translateAndSetReadyOps(rOps, ski)) { 183 numKeysUpdated++; 184 me.updateCount = updateCount; 185 } 186 } else { 187 // ready ops have already been set on this update 188 ski.channel.translateAndUpdateReadyOps(rOps, ski); 189 } 190 } else { 191 ski.channel.translateAndSetReadyOps(rOps, ski); 192 if (ski.hasOps()) { 193 selectedKeys.add(ski); 194 numKeysUpdated++; 195 me.updateCount = updateCount; 196 } 197 } 198 } 199 } 200 } 201 202 clearWakeupPipe(interrupted); 203 return numKeysUpdated; 204 } 205 206 private void clearWakeupPipe(boolean interrupted) throws IOException { 207 if (interrupted) { 208 // Clear the wakeup pipe 209 synchronized (interruptLock) { 210 IOUtil.drain(fd0); 211 interruptTriggered = false; 212 } 213 } 214 } 215 216 217 protected void implClose() throws IOException { 218 if (!closed) { 219 closed = true; 220 221 // prevent further wakeup 222 synchronized (interruptLock) { 223 interruptTriggered = true; 224 } 225 226 FileDispatcherImpl.closeIntFD(fd0); 227 FileDispatcherImpl.closeIntFD(fd1); 228 if (kqueueWrapper != null) { 229 kqueueWrapper.close(); 230 kqueueWrapper = null; 231 selectedKeys = null; 232 233 // Deregister channels 234 Iterator<SelectionKey> i = keys.iterator(); 235 while (i.hasNext()) { 236 SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); 237 deregister(ski); 238 SelectableChannel selch = ski.channel(); 239 if (!selch.isOpen() && !selch.isRegistered()) 240 ((SelChImpl)selch).kill(); 241 i.remove(); 242 } 243 totalChannels = 0; 244 } 245 fd0 = -1; 246 fd1 = -1; 247 } 248 } 249 250 251 protected void implRegister(SelectionKeyImpl ski) { 252 if (closed) 253 throw new ClosedSelectorException(); 254 int fd = IOUtil.fdVal(ski.channel.getFD()); 255 fdMap.put(Integer.valueOf(fd), new MapEntry(ski)); 256 totalChannels++; 257 keys.add(ski); 258 } 259 260 261 protected void implDereg(SelectionKeyImpl ski) throws IOException { 262 int fd = ski.channel.getFDVal(); 263 fdMap.remove(Integer.valueOf(fd)); 264 kqueueWrapper.release(ski.channel); 265 totalChannels--; 266 keys.remove(ski); 267 selectedKeys.remove(ski); 268 deregister((AbstractSelectionKey)ski); 269 SelectableChannel selch = ski.channel(); 270 if (!selch.isOpen() && !selch.isRegistered()) 271 ((SelChImpl)selch).kill(); 272 } 273 274 275 public void putEventOps(SelectionKeyImpl ski, int ops) { 276 if (closed) 277 throw new ClosedSelectorException(); 278 kqueueWrapper.setInterest(ski.channel, ops); 279 } 280 281 282 public Selector wakeup() { 283 synchronized (interruptLock) { 284 if (!interruptTriggered) { 285 kqueueWrapper.interrupt(); 286 interruptTriggered = true; 287 } 288 } 289 return this; 290 } 291 }