--- old/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java 2018-03-22 13:26:23.000000000 +0000 +++ new/src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java 2018-03-22 13:26:22.000000000 +0000 @@ -23,11 +23,6 @@ * questions. */ -/* - * KQueueSelectorImpl.java - * Implementation of Selector using FreeBSD / Mac OS X kqueues - */ - package sun.nio.ch; import java.io.IOException; @@ -36,85 +31,111 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayDeque; +import java.util.BitSet; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static sun.nio.ch.KQueue.EVFILT_READ; +import static sun.nio.ch.KQueue.EVFILT_WRITE; +import static sun.nio.ch.KQueue.EV_ADD; +import static sun.nio.ch.KQueue.EV_DELETE; + +/** + * KQueue based Selector implementation for macOS + */ + +class KQueueSelectorImpl extends SelectorImpl { + + // maximum number of events to poll in one call to kqueue + private static final int MAX_KEVENTS = 256; + + // kqueue file descriptor + private final int kqfd; -class KQueueSelectorImpl - extends SelectorImpl -{ - // File descriptors used for interrupt + // address of poll array (event list) when polling for pending events + private final long pollArrayAddress; + + // file descriptors used for interrupt private final int fd0; private final int fd1; - // The kqueue manipulator - private final KQueueArrayWrapper kqueueWrapper; - - // Map from a file descriptor to an entry containing the selection key - private final HashMap fdMap; + // maps file descriptor to selection key, synchronize on selector + private final Map fdToKey = new HashMap<>(); - // True if this Selector has been closed - private boolean closed; + // file descriptors registered with kqueue, synchronize on selector + private final BitSet registeredReadFilter = new BitSet(); + private final BitSet registeredWriteFilter = new BitSet(); + + // pending new registrations/updates, queued by implRegister and putEventOps + private final Object updateLock = new Object(); + private final Deque newKeys = new ArrayDeque<>(); + private final Deque updateKeys = new ArrayDeque<>(); + private final Deque updateOps = new ArrayDeque<>(); - // Lock for interrupt triggering and clearing + // interrupt triggering and clearing private final Object interruptLock = new Object(); private boolean interruptTriggered; // used by updateSelectedKeys to handle cases where the same file // descriptor is polled by more than one filter - private long updateCount; - - // Used to map file descriptors to a selection key and "update count" - // (see updateSelectedKeys for usage). - private static class MapEntry { - SelectionKeyImpl ski; - long updateCount; - MapEntry(SelectionKeyImpl ski) { - this.ski = ski; - } - } + private int pollCount; - /** - * Package private constructor called by factory method in - * the abstract superclass Selector. - */ KQueueSelectorImpl(SelectorProvider sp) throws IOException { super(sp); - long fds = IOUtil.makePipe(false); - fd0 = (int)(fds >>> 32); - fd1 = (int)fds; + + this.kqfd = KQueue.create(); + this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS); + try { - kqueueWrapper = new KQueueArrayWrapper(fd0, fd1); - fdMap = new HashMap<>(); - } catch (Throwable t) { - try { - FileDispatcherImpl.closeIntFD(fd0); - } catch (IOException ioe0) { - t.addSuppressed(ioe0); - } - try { - FileDispatcherImpl.closeIntFD(fd1); - } catch (IOException ioe1) { - t.addSuppressed(ioe1); - } - throw t; + long fds = IOUtil.makePipe(false); + this.fd0 = (int) (fds >>> 32); + this.fd1 = (int) fds; + } catch (IOException ioe) { + KQueue.freePollArray(pollArrayAddress); + FileDispatcherImpl.closeIntFD(kqfd); + throw ioe; } + + // register one end of the socket pair for wakeups + KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); } private void ensureOpen() { - if (closed) + if (!isOpen()) throw new ClosedSelectorException(); } @Override - protected int doSelect(long timeout) - throws IOException - { - ensureOpen(); + protected int doSelect(long timeout) throws IOException { + assert Thread.holdsLock(this); + int numEntries; + processUpdateQueue(); processDeregisterQueue(); try { begin(); - numEntries = kqueueWrapper.poll(timeout); + + long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout + boolean timedPoll = (to > 0); + do { + long startTime = timedPoll ? System.nanoTime() : 0; + numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); + if (numEntries == IOStatus.INTERRUPTED && timedPoll) { + // timed poll interrupted so need to adjust timeout + long adjust = System.nanoTime() - startTime; + to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); + if (to <= 0) { + // timeout expired so no retry + numEntries = 0; + } + } + } while (numEntries == IOStatus.INTERRUPTED); + assert IOStatus.check(numEntries); + } finally { end(); } @@ -123,39 +144,100 @@ } /** + * Process new registrations and changes to the interest ops. + */ + private void processUpdateQueue() { + assert Thread.holdsLock(this); + + synchronized (updateLock) { + SelectionKeyImpl ski; + + // new registrations + while ((ski = newKeys.pollFirst()) != null) { + if (ski.isValid()) { + SelChImpl ch = ski.channel; + int fd = ch.getFDVal(); + SelectionKeyImpl previous = fdToKey.put(fd, ski); + assert previous == null; + assert registeredReadFilter.get(fd) == false; + assert registeredWriteFilter.get(fd) == false; + } + } + + // changes to interest ops + assert updateKeys.size() == updateOps.size(); + while ((ski = updateKeys.pollFirst()) != null) { + int ops = updateOps.pollFirst(); + int fd = ski.channel.getFDVal(); + if (ski.isValid() && fdToKey.containsKey(fd)) { + // add or delete interest in read events + if (registeredReadFilter.get(fd)) { + if ((ops & Net.POLLIN) == 0) { + KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); + registeredReadFilter.clear(fd); + } + } else if ((ops & Net.POLLIN) != 0) { + KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD); + registeredReadFilter.set(fd); + } + + // add or delete interest in write events + if (registeredWriteFilter.get(fd)) { + if ((ops & Net.POLLOUT) == 0) { + KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); + registeredWriteFilter.clear(fd); + } + } else if ((ops & Net.POLLOUT) != 0) { + KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD); + registeredWriteFilter.set(fd); + } + } + } + } + } + + /** * Update the keys whose fd's have been selected by kqueue. * Add the ready keys to the selected key set. * If the interrupt fd has been selected, drain it and clear the interrupt. */ - private int updateSelectedKeys(int numEntries) - throws IOException - { + private int updateSelectedKeys(int numEntries) throws IOException { + assert Thread.holdsLock(this); + assert Thread.holdsLock(nioSelectedKeys()); + int numKeysUpdated = 0; boolean interrupted = false; // A file descriptor may be registered with kqueue with more than one - // filter and so there may be more than one event for a fd. The update - // count in the MapEntry tracks when the fd was last updated and this - // ensures that the ready ops are updated rather than replaced by a - // second or subsequent event. - updateCount++; + // filter and so there may be more than one event for a fd. The poll + // count is incremented here and compared against the SelectionKey's + // "lastPolled" field. This ensures that the ready ops is updated rather + // than replaced when a file descriptor is polled by both the read and + // write filter. + pollCount++; for (int i = 0; i < numEntries; i++) { - int nextFD = kqueueWrapper.getDescriptor(i); - if (nextFD == fd0) { + long kevent = KQueue.getEvent(pollArrayAddress, i); + int fd = KQueue.getDescriptor(kevent); + if (fd == fd0) { interrupted = true; } else { - MapEntry me = fdMap.get(Integer.valueOf(nextFD)); - if (me != null) { - int rOps = kqueueWrapper.getReventOps(i); - SelectionKeyImpl ski = me.ski; + SelectionKeyImpl ski = fdToKey.get(fd); + if (ski != null) { + int rOps = 0; + short filter = KQueue.getFilter(kevent); + if (filter == EVFILT_READ) { + rOps |= Net.POLLIN; + } else if (filter == EVFILT_WRITE) { + rOps |= Net.POLLOUT; + } + if (selectedKeys.contains(ski)) { - // first time this file descriptor has been encountered on this - // update? - if (me.updateCount != updateCount) { + // file descriptor may be polled more than once per poll + if (ski.lastPolled != pollCount) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; - me.updateCount = updateCount; + ski.lastPolled = pollCount; } } else { // ready ops have already been set on this update @@ -166,7 +248,7 @@ if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; - me.updateCount = updateCount; + ski.lastPolled = pollCount; } } } @@ -181,63 +263,90 @@ @Override protected void implClose() throws IOException { - if (!closed) { - closed = true; + assert !isOpen(); + assert Thread.holdsLock(this); + assert Thread.holdsLock(nioKeys()); - // prevent further wakeup - synchronized (interruptLock) { - interruptTriggered = true; - } + // prevent further wakeup + synchronized (interruptLock) { + interruptTriggered = true; + } - kqueueWrapper.close(); - FileDispatcherImpl.closeIntFD(fd0); - FileDispatcherImpl.closeIntFD(fd1); - - // Deregister channels - Iterator i = keys.iterator(); - while (i.hasNext()) { - SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); - deregister(ski); - SelectableChannel selch = ski.channel(); - if (!selch.isOpen() && !selch.isRegistered()) - ((SelChImpl)selch).kill(); - i.remove(); - } + FileDispatcherImpl.closeIntFD(kqfd); + KQueue.freePollArray(pollArrayAddress); + + FileDispatcherImpl.closeIntFD(fd0); + FileDispatcherImpl.closeIntFD(fd1); + + // Deregister channels + Iterator i = keys.iterator(); + while (i.hasNext()) { + SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); + deregister(ski); + SelectableChannel selch = ski.channel(); + if (!selch.isOpen() && !selch.isRegistered()) + ((SelChImpl)selch).kill(); + i.remove(); } } @Override protected void implRegister(SelectionKeyImpl ski) { + assert Thread.holdsLock(nioKeys()); ensureOpen(); - int fd = IOUtil.fdVal(ski.channel.getFD()); - fdMap.put(Integer.valueOf(fd), new MapEntry(ski)); + synchronized (updateLock) { + newKeys.addLast(ski); + } keys.add(ski); } @Override protected void implDereg(SelectionKeyImpl ski) throws IOException { + assert !ski.isValid(); + assert Thread.holdsLock(this); + assert Thread.holdsLock(nioKeys()); + assert Thread.holdsLock(nioSelectedKeys()); + int fd = ski.channel.getFDVal(); - fdMap.remove(Integer.valueOf(fd)); - kqueueWrapper.release(ski.channel); - keys.remove(ski); + fdToKey.remove(fd); + if (registeredReadFilter.get(fd)) { + KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE); + registeredReadFilter.clear(fd); + } + if (registeredWriteFilter.get(fd)) { + KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE); + registeredWriteFilter.clear(fd); + } + selectedKeys.remove(ski); + keys.remove(ski); + + // remove from channel's key set deregister(ski); + SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) - ((SelChImpl)selch).kill(); + ((SelChImpl) selch).kill(); } @Override public void putEventOps(SelectionKeyImpl ski, int ops) { ensureOpen(); - kqueueWrapper.setInterest(ski.channel, ops); + synchronized (updateLock) { + updateOps.addLast(ops); // ops first in case adding the key fails + updateKeys.addLast(ski); + } } @Override public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { - kqueueWrapper.interrupt(); + try { + IOUtil.write1(fd1, (byte)0); + } catch (IOException ioe) { + throw new InternalError(ioe); + } interruptTriggered = true; } }