< prev index next >
src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java
Print this page
rev 49271 : [mq]: selector-cleanup
@@ -24,100 +24,192 @@
*/
package sun.nio.ch;
import java.io.IOException;
-import java.nio.channels.*;
-import java.nio.channels.spi.*;
-import java.util.*;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayDeque;
+import java.util.BitSet;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static sun.nio.ch.EPoll.EPOLLIN;
+import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
+import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
+import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
+
/**
- * An implementation of Selector for Linux 2.6+ kernels that uses
- * the epoll event notification facility.
+ * Linux epoll based Selector implementation
*/
-class EPollSelectorImpl
- extends SelectorImpl
-{
- // File descriptors used for interrupt
+
+class EPollSelectorImpl extends SelectorImpl {
+
+ // maximum number of events to poll in one call to epoll_wait
+ private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
+
+ // epoll file descriptor
+ private final int epfd;
+
+ // address of poll array when polling with epoll_wait
+ private final long pollArrayAddress;
+
+ // file descriptors used for interrupt
private final int fd0;
private final int fd1;
- // The poll object
- private final EPollArrayWrapper pollWrapper;
+ // maps file descriptor to selection key, synchronize on selector
+ private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
- // Maps from file descriptors to keys
- private final Map<Integer, SelectionKeyImpl> fdToKey;
+ // file descriptors registered with epoll, synchronize on selector
+ private final BitSet registered = new BitSet();
- // True if this Selector has been closed
- private volatile boolean closed;
+ // pending new registrations/updates, queued by implRegister and putEventOps
+ private final Object updateLock = new Object();
+ private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
+ private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
+ private final Deque<Integer> updateOps = new ArrayDeque<>();
- // Lock for interrupt triggering and clearing
+ // interrupt triggering and clearing
private final Object interruptLock = new Object();
- private boolean interruptTriggered = false;
+ private boolean interruptTriggered;
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
- long pipeFds = IOUtil.makePipe(false);
- fd0 = (int) (pipeFds >>> 32);
- fd1 = (int) pipeFds;
- try {
- pollWrapper = new EPollArrayWrapper(fd0, fd1);
- fdToKey = new HashMap<>();
- } catch (Throwable t) {
- try {
- FileDispatcherImpl.closeIntFD(fd0);
- } catch (IOException ioe0) {
- t.addSuppressed(ioe0);
- }
+
+ this.epfd = EPoll.create();
+ this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
+
try {
- FileDispatcherImpl.closeIntFD(fd1);
- } catch (IOException ioe1) {
- t.addSuppressed(ioe1);
- }
- throw t;
+ long fds = IOUtil.makePipe(false);
+ this.fd0 = (int) (fds >>> 32);
+ this.fd1 = (int) fds;
+ } catch (IOException ioe) {
+ EPoll.freePollArray(pollArrayAddress);
+ FileDispatcherImpl.closeIntFD(epfd);
+ throw ioe;
}
+
+ // register one end of the socket pair for wakeups
+ EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
private void ensureOpen() {
- if (closed)
+ if (!isOpen())
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) throws IOException {
- ensureOpen();
+ assert Thread.holdsLock(this);
+
int numEntries;
+ processUpdateQueue();
processDeregisterQueue();
try {
begin();
- numEntries = pollWrapper.poll(timeout);
+
+ // epoll_wait timeout is int
+ int to = (int) Math.min(timeout, Integer.MAX_VALUE);
+ boolean timedPoll = (to > 0);
+ do {
+ long startTime = timedPoll ? System.nanoTime() : 0;
+ numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
+ if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
+ // timed poll interrupted so need to adjust timeout
+ long adjust = System.nanoTime() - startTime;
+ to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
+ if (to <= 0) {
+ // timeout expired so no retry
+ numEntries = 0;
+ }
+ }
+ } while (numEntries == IOStatus.INTERRUPTED);
+ assert IOStatus.check(numEntries);
+
} finally {
end();
}
processDeregisterQueue();
return updateSelectedKeys(numEntries);
}
/**
+ * Process new registrations and changes to the interest ops.
+ */
+ private void processUpdateQueue() {
+ assert Thread.holdsLock(this);
+
+ synchronized (updateLock) {
+ SelectionKeyImpl ski;
+
+ // new registrations
+ while ((ski = newKeys.pollFirst()) != null) {
+ if (ski.isValid()) {
+ SelChImpl ch = ski.channel;
+ int fd = ch.getFDVal();
+ SelectionKeyImpl previous = fdToKey.put(fd, ski);
+ assert previous == null;
+ assert registered.get(fd) == false;
+ }
+ }
+
+ // changes to interest ops
+ assert updateKeys.size() == updateOps.size();
+ while ((ski = updateKeys.pollFirst()) != null) {
+ int ops = updateOps.pollFirst();
+ int fd = ski.channel.getFDVal();
+ if (ski.isValid() && fdToKey.containsKey(fd)) {
+ if (registered.get(fd)) {
+ if (ops == 0) {
+ // remove from epoll
+ EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
+ registered.clear(fd);
+ } else {
+ // modify events
+ EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
+ }
+ } else if (ops != 0) {
+ // add to epoll
+ EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
+ registered.set(fd);
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Update the keys whose fd's have been selected by the epoll.
* Add the ready keys to the ready queue.
*/
private int updateSelectedKeys(int numEntries) throws IOException {
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioSelectedKeys());
+
boolean interrupted = false;
int numKeysUpdated = 0;
for (int i=0; i<numEntries; i++) {
- int nextFD = pollWrapper.getDescriptor(i);
- if (nextFD == fd0) {
+ long event = EPoll.getEvent(pollArrayAddress, i);
+ int fd = EPoll.getDescriptor(event);
+ if (fd == fd0) {
interrupted = true;
} else {
- SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
+ SelectionKeyImpl ski = fdToKey.get(fd);
if (ski != null) {
- int rOps = pollWrapper.getEventOps(i);
+ int rOps = EPoll.getEvents(event);
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
@@ -138,20 +230,21 @@
return numKeysUpdated;
}
@Override
protected void implClose() throws IOException {
- if (closed)
- return;
- closed = true;
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
- pollWrapper.close();
+ FileDispatcherImpl.closeIntFD(epfd);
+ EPoll.freePollArray(pollArrayAddress);
+
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
@@ -165,46 +258,61 @@
}
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
+ assert Thread.holdsLock(nioKeys());
ensureOpen();
- SelChImpl ch = ski.channel;
- int fd = Integer.valueOf(ch.getFDVal());
- fdToKey.put(fd, ski);
- pollWrapper.add(fd);
+ synchronized (updateLock) {
+ newKeys.addLast(ski);
+ }
keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
- assert (ski.getIndex() >= 0);
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
- fdToKey.remove(Integer.valueOf(fd));
- pollWrapper.remove(fd);
- ski.setIndex(-1);
- keys.remove(ski);
+ assert !ski.isValid();
+ assert Thread.holdsLock(this);
+ assert Thread.holdsLock(nioKeys());
+ assert Thread.holdsLock(nioSelectedKeys());
+
+ int fd = ski.channel.getFDVal();
+ fdToKey.remove(fd);
+ if (registered.get(fd)) {
+ EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
+ registered.clear(fd);
+ }
+
selectedKeys.remove(ski);
+ keys.remove(ski);
+
+ // remove from channel's key set
deregister(ski);
+
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
+ ((SelChImpl) selch).kill();
}
@Override
public void putEventOps(SelectionKeyImpl ski, int ops) {
ensureOpen();
- SelChImpl ch = ski.channel;
- pollWrapper.setInterest(ch.getFDVal(), ops);
+ synchronized (updateLock) {
+ updateOps.addLast(ops); // ops first in case adding the key fails
+ updateKeys.addLast(ski);
+ }
}
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
- pollWrapper.interrupt();
+ try {
+ IOUtil.write1(fd1, (byte)0);
+ } catch (IOException ioe) {
+ throw new InternalError(ioe);
+ }
interruptTriggered = true;
}
}
return this;
}
< prev index next >