< prev index next >

src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java

Print this page
rev 50580 : [mq]: select-with-consumer

@@ -26,18 +26,20 @@
 package sun.nio.ch;
 
 import java.io.IOException;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.Pipe;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * A multi-threaded implementation of Selector for Windows.
  *
  * @author Konstantin Kladko

@@ -137,11 +139,13 @@
         if (!isOpen())
             throw new ClosedSelectorException();
     }
 
     @Override
-    protected int doSelect(long timeout) throws IOException {
+    protected int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
         assert Thread.holdsLock(this);
         this.timeout = timeout; // set selector timeout
         processUpdateQueue();
         processDeregisterQueue();
         if (interruptTriggered) {

@@ -171,11 +175,11 @@
               end();
           }
         // Done with poll(). Check for an exception before updating the keys.
         finishLock.checkForException();
         processDeregisterQueue();
-        int updated = updateSelectedKeys();
+        int updated = updateSelectedKeys(action);
         // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
         resetWakeupSocket();
         return updated;
     }
 

@@ -347,20 +351,20 @@
         }
 
         private native int poll0(long pollAddress, int numfds,
              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
 
-        private int processSelectedKeys(long updateCount) {
+        private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
             int numKeysUpdated = 0;
-            numKeysUpdated += processFDSet(updateCount, readFds,
+            numKeysUpdated += processFDSet(updateCount, action, readFds,
                                            Net.POLLIN,
                                            false);
-            numKeysUpdated += processFDSet(updateCount, writeFds,
+            numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                            Net.POLLCONN |
                                            Net.POLLOUT,
                                            false);
-            numKeysUpdated += processFDSet(updateCount, exceptFds,
+            numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                            Net.POLLIN |
                                            Net.POLLCONN |
                                            Net.POLLOUT,
                                            true);
             return numKeysUpdated;

@@ -370,11 +374,13 @@
          * updateCount is used to tell if a key has been counted as updated
          * in this select operation.
          *
          * me.updateCount <= updateCount
          */
-        private int processFDSet(long updateCount, int[] fds, int rOps,
+        private int processFDSet(long updateCount,
+                                 Consumer<SelectionKey> action,
+                                 int[] fds, int rOps,
                                  boolean isExceptFds)
         {
             int numKeysUpdated = 0;
             for (int i = 1; i <= fds[0]; i++) {
                 int desc = fds[i];

@@ -399,26 +405,16 @@
                     discardUrgentData(desc))
                 {
                     continue;
                 }
 
-                if (selectedKeys.contains(sk)) { // Key in selected set
-                    if (sk.translateAndUpdateReadyOps(rOps)) {
-                        if (me.updateCount != updateCount) {
-                            me.updateCount = updateCount;
-                            numKeysUpdated++;
-                        }
-                    }
-                } else { // Key is not in selected set yet
-                    sk.translateAndSetReadyOps(rOps);
-                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
-                        selectedKeys.add(sk);
+                int updated = processReadyEvents(rOps, sk, action);
+                if (updated > 0 && me.updateCount != updateCount) {
                         me.updateCount = updateCount;
                         numKeysUpdated++;
                     }
                 }
-            }
             return numKeysUpdated;
         }
     }
 
     // Represents a helper thread used for select.

@@ -507,16 +503,16 @@
     // appear in readfds and writefds.
     private long updateCount = 0;
 
     // Update ops of the corresponding Channels. Add the ready keys to the
     // ready queue.
-    private int updateSelectedKeys() {
+    private int updateSelectedKeys(Consumer<SelectionKey> action) {
         updateCount++;
         int numKeysUpdated = 0;
-        numKeysUpdated += subSelector.processSelectedKeys(updateCount);
+        numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
         for (SelectThread t: threads) {
-            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
+            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
         }
         return numKeysUpdated;
     }
 
     @Override
< prev index next >