src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java

Print this page

        

*** 27,48 **** */ package sun.nio.ch; import java.nio.channels.spi.SelectorProvider; - import java.nio.channels.Selector; - import java.nio.channels.ClosedSelectorException; - import java.nio.channels.Pipe; - import java.nio.channels.SelectableChannel; import java.io.IOException; ! import java.nio.channels.CancelledKeyException; ! import java.util.List; ! import java.util.ArrayList; ! import java.util.HashMap; ! import java.util.Iterator; ! import sun.misc.ManagedLocalsThread; /** * A multi-threaded implementation of Selector for Windows. * * @author Konstantin Kladko --- 27,41 ---- */ package sun.nio.ch; + import java.nio.channels.*; import java.nio.channels.spi.SelectorProvider; import java.io.IOException; ! import java.util.*; ! import java.util.function.Consumer; /** * A multi-threaded implementation of Selector for Windows. * * @author Konstantin Kladko
*** 135,151 **** pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); ! return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock --- 128,171 ---- pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } protected int doSelect(long timeout) throws IOException { + if (pollSubSelector(timeout)) + return 0; + + int updated = updateSelectedKeys(); + // Done with poll(). Set wakeupSocket to nonsignaled for the next run. + resetWakeupSocket(); + return updated; + } + + @Override + protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException { + Objects.requireNonNull(handler); + if (pollSubSelector(timeout)) + return 0; + + updateCount++; + int numKeysUpdated = 0; + numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler); + for (SelectThread t: threads) { + numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler); + } + // Done with poll(). Set wakeupSocket to nonsignaled for the next run. + resetWakeupSocket(); + return numKeysUpdated; + } + + private boolean pollSubSelector(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); ! return true; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock
*** 168,181 **** end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); ! int updated = updateSelectedKeys(); ! // Done with poll(). Set wakeupSocket to nonsignaled for the next run. ! resetWakeupSocket(); ! return updated; } // Helper threads wait on this lock for the next poll. private final StartLock startLock = new StartLock(); --- 188,198 ---- end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); ! return false; } // Helper threads wait on this lock for the next poll. private final StartLock startLock = new StartLock();
*** 340,371 **** boolean isExceptFds) { int numKeysUpdated = 0; for (int i = 1; i <= fds[0]; i++) { int desc = fds[i]; ! if (desc == wakeupSourceFd) { ! synchronized (interruptLock) { ! interruptTriggered = true; ! } continue; - } MapEntry me = fdMap.get(desc); // If me is null, the key was deregistered in the previous // processDeregisterQueue. if (me == null) continue; SelectionKeyImpl sk = me.ski; ! // The descriptor may be in the exceptfds set because there is ! // OOB data queued to the socket. If there is OOB data then it ! // is discarded and the key is not added to the selected set. ! if (isExceptFds && ! (sk.channel() instanceof SocketChannelImpl) && ! discardUrgentData(desc)) ! { continue; - } if (selectedKeys.contains(sk)) { // Key in selected set if (me.clearedCount != updateCount) { if (sk.channel.translateAndSetReadyOps(rOps, sk) && (me.updateCount != updateCount)) { --- 357,377 ---- boolean isExceptFds) { int numKeysUpdated = 0; for (int i = 1; i <= fds[0]; i++) { int desc = fds[i]; ! if (checkWakeup(desc)) continue; MapEntry me = fdMap.get(desc); // If me is null, the key was deregistered in the previous // processDeregisterQueue. if (me == null) continue; SelectionKeyImpl sk = me.ski; ! if (isInterestingFileDescriptor(isExceptFds, desc, sk)) continue; if (selectedKeys.contains(sk)) { // Key in selected set if (me.clearedCount != updateCount) { if (sk.channel.translateAndSetReadyOps(rOps, sk) && (me.updateCount != updateCount)) {
*** 381,412 **** } me.clearedCount = updateCount; } else { // Key is not in selected set yet if (me.clearedCount != updateCount) { sk.channel.translateAndSetReadyOps(rOps, sk); ! if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { selectedKeys.add(sk); me.updateCount = updateCount; numKeysUpdated++; } } else { // The readyOps have been set; now add sk.channel.translateAndUpdateReadyOps(rOps, sk); ! if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { ! selectedKeys.add(sk); me.updateCount = updateCount; numKeysUpdated++; } - } me.clearedCount = updateCount; } } return numKeysUpdated; } } // Represents a helper thread used for select. ! private final class SelectThread extends ManagedLocalsThread { private final int index; // index of this thread final SubSelector subSelector; private long lastRun = 0; // last run number private volatile boolean zombie; // Creates a new thread --- 387,486 ---- } me.clearedCount = updateCount; } else { // Key is not in selected set yet if (me.clearedCount != updateCount) { sk.channel.translateAndSetReadyOps(rOps, sk); ! } else { // The readyOps have been set; now add ! sk.channel.translateAndUpdateReadyOps(rOps, sk); ! } ! if (sk.hasOps()) { selectedKeys.add(sk); me.updateCount = updateCount; numKeysUpdated++; } + me.clearedCount = updateCount; + } + } + return numKeysUpdated; + } + + private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) { + // The descriptor may be in the exceptfds set because there is + // OOB data queued to the socket. If there is OOB data then it + // is discarded and the key is not added to the selected set. + return isExceptFds && + (sk.channel() instanceof SocketChannelImpl) && + discardUrgentData(desc); + } + + private int processFDSet(long updateCount, int[] fds, int rOps, + boolean isExceptFds, Consumer<SelectionKey> handler) + { + int numKeysUpdated = 0; + for (int i = 1; i <= fds[0]; i++) { + int desc = fds[i]; + if (checkWakeup(desc)) + continue; + MapEntry me = fdMap.get(desc); + // If me is null, the key was deregistered in the previous + // processDeregisterQueue. + if (me == null) + continue; + SelectionKeyImpl sk = me.ski; + + if (isInterestingFileDescriptor(isExceptFds, desc, sk)) + continue; + + if (me.clearedCount != updateCount) { + sk.channel.translateAndSetReadyOps(rOps, sk); } else { // The readyOps have been set; now add sk.channel.translateAndUpdateReadyOps(rOps, sk); ! } ! ! if (sk.hasOps()) { ! handler.accept(sk); me.updateCount = updateCount; numKeysUpdated++; } me.clearedCount = updateCount; } + return numKeysUpdated; + } + + private boolean checkWakeup(int desc) { + if (desc == wakeupSourceFd) { + synchronized (interruptLock) { + interruptTriggered = true; + } + return true; + } + return false; } + + public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) { + int numKeysUpdated = 0; + numKeysUpdated += processFDSet(updateCount, readFds, + Net.POLLIN, + false, + handler); + numKeysUpdated += processFDSet(updateCount, writeFds, + Net.POLLCONN | + Net.POLLOUT, + false, + handler); + numKeysUpdated += processFDSet(updateCount, exceptFds, + Net.POLLIN | + Net.POLLCONN | + Net.POLLOUT, + true, + handler); return numKeysUpdated; } } // Represents a helper thread used for select. ! private final class SelectThread extends Thread { private final int index; // index of this thread final SubSelector subSelector; private long lastRun = 0; // last run number private volatile boolean zombie; // Creates a new thread