src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
Print this page
@@ -27,22 +27,15 @@
*/
package sun.nio.ch;
+import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
-import java.nio.channels.Selector;
-import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.Pipe;
-import java.nio.channels.SelectableChannel;
import java.io.IOException;
-import java.nio.channels.CancelledKeyException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import sun.misc.ManagedLocalsThread;
+import java.util.*;
+import java.util.function.Consumer;
/**
* A multi-threaded implementation of Selector for Windows.
*
* @author Konstantin Kladko
@@ -135,17 +128,44 @@
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
protected int doSelect(long timeout) throws IOException { // polls via pollSubSelector, then updates the selected keys and returns that count
+ if (pollSubSelector(timeout)) // true: interruptTriggered was set, so the wakeup socket was reset and no poll ran
+ return 0;
+
+ int updated = updateSelectedKeys();
+ // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
+ resetWakeupSocket();
+ return updated;
+ }
+
+ @Override
+ protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException { // variant that passes ready keys to handler; returns the summed processSelectedKeys counts
+ Objects.requireNonNull(handler); // reject a null handler before any polling happens
+ if (pollSubSelector(timeout)) // true: interruptTriggered was set, so the wakeup socket was reset and no poll ran
+ return 0;
+
+ updateCount++; // new stamp for this pass; processFDSet compares it with MapEntry.clearedCount
+ int numKeysUpdated = 0;
+ numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler); // this selector's own sub-selector first
+ for (SelectThread t: threads) { // then the sub-selector owned by each SelectThread
+ numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler);
+ }
+ // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
+ resetWakeupSocket();
+ return numKeysUpdated;
+ }
+
+ private boolean pollSubSelector(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
- return 0;
+ return true;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
@@ -168,14 +188,11 @@
end();
}
// Done with poll(). Check finishLock for an exception; the caller resets the wakeup socket.
finishLock.checkForException();
processDeregisterQueue();
- int updated = updateSelectedKeys();
- // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
- resetWakeupSocket();
- return updated;
+ return false;
}
// Helper threads wait on this lock for the next poll.
private final StartLock startLock = new StartLock();
@@ -340,32 +357,21 @@
boolean isExceptFds)
{
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
- if (desc == wakeupSourceFd) {
- synchronized (interruptLock) {
- interruptTriggered = true;
- }
+ if (checkWakeup(desc))
continue;
- }
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
- // The descriptor may be in the exceptfds set because there is
- // OOB data queued to the socket. If there is OOB data then it
- // is discarded and the key is not added to the selected set.
- if (isExceptFds &&
- (sk.channel() instanceof SocketChannelImpl) &&
- discardUrgentData(desc))
- {
+ if (isInterestingFileDescriptor(isExceptFds, desc, sk))
continue;
- }
if (selectedKeys.contains(sk)) { // Key in selected set
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
@@ -381,32 +387,100 @@
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
- if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
+ } else { // The readyOps have been set; now add
+ sk.channel.translateAndUpdateReadyOps(rOps, sk);
+ }
+ if (sk.hasOps()) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
+ me.clearedCount = updateCount;
+ }
+ }
+ return numKeysUpdated;
+ }
+
+ private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) { // NOTE(review): despite the name, callers `continue` (skip desc) when this returns true
+ // The descriptor may be in the exceptfds set because there is
+ // OOB data queued to the socket. If there is OOB data then it
+ // is discarded and the key is not added to the selected set.
+ return isExceptFds &&
+ (sk.channel() instanceof SocketChannelImpl) &&
+ discardUrgentData(desc); // short-circuit &&: only called for exceptfds entries whose channel is a SocketChannelImpl
+ }
+
+ private int processFDSet(long updateCount, int[] fds, int rOps,
+ boolean isExceptFds, Consumer<SelectionKey> handler) // walks one fd set, passes each key with ops to handler, returns how many times handler was called
+ {
+ int numKeysUpdated = 0;
+ for (int i = 1; i <= fds[0]; i++) { // fds[0] is the entry count; descriptors occupy fds[1..fds[0]]
+ int desc = fds[i];
+ if (checkWakeup(desc)) // wakeup descriptor: sets interruptTriggered and is never passed to handler
+ continue;
MapEntry me = fdMap.get(desc);
// If me is null, the key was deregistered in the previous
// processDeregisterQueue.
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
+
+ if (isInterestingFileDescriptor(isExceptFds, desc, sk)) // true means "skip": see the OOB-data check in that method
+ continue;
+
+ if (me.clearedCount != updateCount) { // entry not yet processed with this updateCount (clearedCount is stamped below)
+ sk.channel.translateAndSetReadyOps(rOps, sk);
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
- if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
- selectedKeys.add(sk);
+ }
+
+ if (sk.hasOps()) { // presumably ready ops intersect interest ops, as in the removed condition - TODO confirm
+ handler.accept(sk); // NOTE(review): a descriptor present in several fd sets reaches handler, and is counted, once per set - confirm intended
me.updateCount = updateCount;
numKeysUpdated++;
}
- }
me.clearedCount = updateCount;
}
+ return numKeysUpdated;
+ }
+
+ private boolean checkWakeup(int desc) { // returns true only when desc is the wakeup source descriptor
+ if (desc == wakeupSourceFd) {
+ synchronized (interruptLock) {
+ interruptTriggered = true; // written while holding interruptLock; pollSubSelector reads this flag on the next select
+ }
+ return true;
+ }
+ return false;
}
+
+ public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) { // runs processFDSet over the read, write and except sets; returns the summed counts
+ int numKeysUpdated = 0;
+ numKeysUpdated += processFDSet(updateCount, readFds,
+ Net.POLLIN,
+ false,
+ handler);
+ numKeysUpdated += processFDSet(updateCount, writeFds,
+ Net.POLLCONN |
+ Net.POLLOUT,
+ false,
+ handler);
+ numKeysUpdated += processFDSet(updateCount, exceptFds,
+ Net.POLLIN |
+ Net.POLLCONN |
+ Net.POLLOUT,
+ true, // isExceptFds: only this pass triggers the OOB-data check in processFDSet
+ handler);
return numKeysUpdated;
}
}
// Represents a helper thread used for select.
- private final class SelectThread extends ManagedLocalsThread {
+ private final class SelectThread extends Thread {
private final int index; // index of this thread
final SubSelector subSelector;
private long lastRun = 0; // last run number
private volatile boolean zombie;
// Creates a new thread