< prev index next >
src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
Print this page
rev 50580 : [mq]: select-with-consumer
@@ -26,18 +26,20 @@
package sun.nio.ch;
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
+import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* A multi-threaded implementation of Selector for Windows.
*
* @author Konstantin Kladko
@@ -137,11 +139,13 @@
if (!isOpen())
throw new ClosedSelectorException();
}
@Override
- protected int doSelect(long timeout) throws IOException {
+ protected int doSelect(Consumer<SelectionKey> action, long timeout)
+ throws IOException
+ {
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue();
processDeregisterQueue();
if (interruptTriggered) {
@@ -171,11 +175,11 @@
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
- int updated = updateSelectedKeys();
+ int updated = updateSelectedKeys(action);
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
@@ -347,20 +351,20 @@
}
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
- private int processSelectedKeys(long updateCount) {
+ private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
- numKeysUpdated += processFDSet(updateCount, readFds,
+ numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
- numKeysUpdated += processFDSet(updateCount, writeFds,
+ numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
- numKeysUpdated += processFDSet(updateCount, exceptFds,
+ numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
@@ -370,11 +374,13 @@
* updateCount is used to tell if a key has been counted as updated
* in this select operation.
*
* me.updateCount <= updateCount
*/
- private int processFDSet(long updateCount, int[] fds, int rOps,
+ private int processFDSet(long updateCount,
+ Consumer<SelectionKey> action,
+ int[] fds, int rOps,
boolean isExceptFds)
{
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {
int desc = fds[i];
@@ -399,26 +405,16 @@
discardUrgentData(desc))
{
continue;
}
- if (selectedKeys.contains(sk)) { // Key in selected set
- if (sk.translateAndUpdateReadyOps(rOps)) {
- if (me.updateCount != updateCount) {
- me.updateCount = updateCount;
- numKeysUpdated++;
- }
- }
- } else { // Key is not in selected set yet
- sk.translateAndSetReadyOps(rOps);
- if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
- selectedKeys.add(sk);
+ int updated = processReadyEvents(rOps, sk, action);
+ if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
- }
return numKeysUpdated;
}
}
// Represents a helper thread used for select.
@@ -507,16 +503,16 @@
// appear in readfds and writefds.
private long updateCount = 0;
// Update ops of the corresponding Channels. Add the ready keys to the
// ready queue.
- private int updateSelectedKeys() {
+ private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
- numKeysUpdated += subSelector.processSelectedKeys(updateCount);
+ numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {
- numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
+ numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
@Override
< prev index next >