src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java

Print this page

        

@@ -27,22 +27,15 @@
  */
 
 
 package sun.nio.ch;
 
+import java.nio.channels.*;
 import java.nio.channels.spi.SelectorProvider;
-import java.nio.channels.Selector;
-import java.nio.channels.ClosedSelectorException;
-import java.nio.channels.Pipe;
-import java.nio.channels.SelectableChannel;
 import java.io.IOException;
-import java.nio.channels.CancelledKeyException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import sun.misc.ManagedLocalsThread;
+import java.util.*;
+import java.util.function.Consumer;
 
 /**
  * A multi-threaded implementation of Selector for Windows.
  *
  * @author Konstantin Kladko

@@ -135,17 +128,44 @@
 
         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
     }
 
     protected int doSelect(long timeout) throws IOException {
+        if (pollSubSelector(timeout)) // true: interruptTriggered was already set, so nothing was selected
+            return 0;
+
+        int updated = updateSelectedKeys();
+        // Done with poll(). Reset wakeupSocket to nonsignaled for the next run.
+        resetWakeupSocket();
+        return updated;
+    }
+
+    @Override
+    protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException {
+        Objects.requireNonNull(handler); // reject a null handler before any polling is attempted
+        if (pollSubSelector(timeout)) // true: interruptTriggered was already set, so nothing was selected
+            return 0;
+
+        updateCount++; // new pass id; processFDSet compares it with each entry's clearedCount
+        int numKeysUpdated = 0;
+        numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler);
+        for (SelectThread t: threads) {
+            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler);
+        }
+        // Done with poll(). Reset wakeupSocket to nonsignaled for the next run.
+        resetWakeupSocket();
+        return numKeysUpdated; // NOTE(review): a key present in several fd sets is handled and counted once per set - confirm intended
+    }
+
+    private boolean pollSubSelector(long timeout) throws IOException {
         if (channelArray == null)
             throw new ClosedSelectorException();
         this.timeout = timeout; // set selector timeout
         processDeregisterQueue();
         if (interruptTriggered) {
             resetWakeupSocket();
-            return 0;
+            return true;
         }
         // Calculate number of helper threads needed for poll. If necessary
         // threads are created here and start waiting on startLock
         adjustThreadsCount();
         finishLock.reset(); // reset finishLock

@@ -168,14 +188,11 @@
               end();
           }
         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
         finishLock.checkForException();
         processDeregisterQueue();
-        int updated = updateSelectedKeys();
-        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
-        resetWakeupSocket();
-        return updated;
+        return false;
     }
 
     // Helper threads wait on this lock for the next poll.
     private final StartLock startLock = new StartLock();
 

@@ -340,32 +357,21 @@
                                  boolean isExceptFds)
         {
             int numKeysUpdated = 0;
             for (int i = 1; i <= fds[0]; i++) {
                 int desc = fds[i];
-                if (desc == wakeupSourceFd) {
-                    synchronized (interruptLock) {
-                        interruptTriggered = true;
-                    }
+                if (checkWakeup(desc))
                     continue;
-                }
                 MapEntry me = fdMap.get(desc);
                 // If me is null, the key was deregistered in the previous
                 // processDeregisterQueue.
                 if (me == null)
                     continue;
                 SelectionKeyImpl sk = me.ski;
 
-                // The descriptor may be in the exceptfds set because there is
-                // OOB data queued to the socket. If there is OOB data then it
-                // is discarded and the key is not added to the selected set.
-                if (isExceptFds &&
-                    (sk.channel() instanceof SocketChannelImpl) &&
-                    discardUrgentData(desc))
-                {
+                if (isInterestingFileDescriptor(isExceptFds, desc, sk))
                     continue;
-                }
 
                 if (selectedKeys.contains(sk)) { // Key in selected set
                     if (me.clearedCount != updateCount) {
                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
                             (me.updateCount != updateCount)) {

@@ -381,32 +387,100 @@
                     }
                     me.clearedCount = updateCount;
                 } else { // Key is not in selected set yet
                     if (me.clearedCount != updateCount) {
                         sk.channel.translateAndSetReadyOps(rOps, sk);
-                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
+                    } else { // The readyOps have been set; now add
+                        sk.channel.translateAndUpdateReadyOps(rOps, sk);
+                    }
+                    if (sk.hasOps()) {
                             selectedKeys.add(sk);
                             me.updateCount = updateCount;
                             numKeysUpdated++;
                         }
+                    me.clearedCount = updateCount;
+                }
+            }
+            return numKeysUpdated;
+        }
+
+        private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) {
+            // NOTE(review): despite the name, the visible callers skip desc on true.
+            // The descriptor may be in the exceptfds set because there is OOB data
+            // queued to the socket; that data is discarded and the key is not selected.
+            return isExceptFds &&
+                (sk.channel() instanceof SocketChannelImpl) &&
+                discardUrgentData(desc);
+        }
+
+        private int processFDSet(long updateCount, int[] fds, int rOps,
+                                 boolean isExceptFds, Consumer<SelectionKey> handler)
+        {
+            int numKeysUpdated = 0;
+            for (int i = 1; i <= fds[0]; i++) { // fds[0] is the entry count; descriptors start at index 1
+                int desc = fds[i];
+                if (checkWakeup(desc))
+                    continue;
+                MapEntry me = fdMap.get(desc);
                 // If me is null, the key was deregistered in the previous
                 // processDeregisterQueue.
                 if (me == null)
                     continue;
                 SelectionKeyImpl sk = me.ski;
+
+                if (isInterestingFileDescriptor(isExceptFds, desc, sk)) // true means "skip", see the helper's body
+                    continue;
+
+                if (me.clearedCount != updateCount) { // entry not yet touched in this pass
+                    sk.channel.translateAndSetReadyOps(rOps, sk);
                     } else { // The readyOps have been set; now add
                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
-                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
-                            selectedKeys.add(sk);
+                }
+
+                if (sk.hasOps()) {
+                    handler.accept(sk); // caller-supplied callback runs inline, inside the fd loop
                             me.updateCount = updateCount;
                             numKeysUpdated++;
                         }
-                    }
                     me.clearedCount = updateCount;
                 }
+            return numKeysUpdated; // number of handler invocations made for this fd set
+        }
+
+        private boolean checkWakeup(int desc) {
+            if (desc == wakeupSourceFd) { // desc is the selector's wakeup descriptor
+                synchronized (interruptLock) {
+                    interruptTriggered = true; // checked by pollSubSelector before it polls again
+                }
+                return true; // callers skip this descriptor
+            }
+            return false;
             }
+
+        public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) {
+            int numKeysUpdated = 0; // running sum of the three per-set results below
+            numKeysUpdated += processFDSet(updateCount, readFds,
+                                           Net.POLLIN,
+                                           false, // isExceptFds
+                                           handler);
+            numKeysUpdated += processFDSet(updateCount, writeFds,
+                                           Net.POLLCONN |
+                                           Net.POLLOUT,
+                                           false, // isExceptFds
+                                           handler);
+            numKeysUpdated += processFDSet(updateCount, exceptFds,
+                                           Net.POLLIN |
+                                           Net.POLLCONN |
+                                           Net.POLLOUT,
+                                           true, // isExceptFds: enables the OOB-data check in processFDSet
+                                           handler);
             return numKeysUpdated;
         }
     }
 
     // Represents a helper thread used for select.
-    private final class SelectThread extends ManagedLocalsThread {
+    private final class SelectThread extends Thread {
         private final int index; // index of this thread
         final SubSelector subSelector;
         private long lastRun = 0; // last run number
         private volatile boolean zombie;
         // Creates a new thread