src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java

Print this page




  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 /*
  27  */
  28 
  29 
  30 package sun.nio.ch;
  31 

  32 import java.nio.channels.spi.SelectorProvider;
  33 import java.nio.channels.Selector;
  34 import java.nio.channels.ClosedSelectorException;
  35 import java.nio.channels.Pipe;
  36 import java.nio.channels.SelectableChannel;
  37 import java.io.IOException;
  38 import java.nio.channels.CancelledKeyException;
  39 import java.util.List;
  40 import java.util.ArrayList;
  41 import java.util.HashMap;
  42 import java.util.Iterator;
  43 import sun.misc.ManagedLocalsThread;
  44 
  45 /**
  46  * A multi-threaded implementation of Selector for Windows.
  47  *
  48  * @author Konstantin Kladko
  49  * @author Mark Reinhold
  50  */
  51 
  52 final class WindowsSelectorImpl extends SelectorImpl {
  53     // Initial capacity of the poll array
  54     private final int INIT_CAP = 8;
  55     // Maximum number of sockets for select().
  56     // Should be INIT_CAP times a power of 2
  57     private final static int MAX_SELECTABLE_FDS = 1024;
  58 
  59     // The list of SelectableChannels serviced by this Selector. Every mod
  60     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
  61     // array,  where the corresponding entry is occupied by the wakeupSocket
  62     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  63 


 120 
 121     // Lock for interrupt triggering and clearing
 122     private final Object interruptLock = new Object();
 123     private volatile boolean interruptTriggered = false;
 124 
 125     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
 126         super(sp);
 127         pollWrapper = new PollArrayWrapper(INIT_CAP);
 128         wakeupPipe = Pipe.open();
 129         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
 130 
 131         // Disable the Nagle algorithm so that the wakeup is more immediate
 132         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
 133         (sink.sc).socket().setTcpNoDelay(true);
 134         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
 135 
 136         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
 137     }
 138 
 139     protected int doSelect(long timeout) throws IOException {



























 140         if (channelArray == null)
 141             throw new ClosedSelectorException();
 142         this.timeout = timeout; // set selector timeout
 143         processDeregisterQueue();
 144         if (interruptTriggered) {
 145             resetWakeupSocket();
 146             return 0;
 147         }
 148         // Calculate number of helper threads needed for poll. If necessary
 149         // threads are created here and start waiting on startLock
 150         adjustThreadsCount();
 151         finishLock.reset(); // reset finishLock
 152         // Wakeup helper threads, waiting on startLock, so they start polling.
 153         // Redundant threads will exit here after wakeup.
 154         startLock.startThreads();
 155         // do polling in the main thread. Main thread is responsible for
 156         // first MAX_SELECTABLE_FDS entries in pollArray.
 157         try {
 158             begin();
 159             try {
 160                 subSelector.poll();
 161             } catch (IOException e) {
 162                 finishLock.setException(e); // Save this exception
 163             }
 164             // Main thread is out of poll(). Wakeup others and wait for them
 165             if (threads.size() > 0)
 166                 finishLock.waitForHelperThreads();
 167           } finally {
 168               end();
 169           }
 170         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 171         finishLock.checkForException();
 172         processDeregisterQueue();
 173         int updated = updateSelectedKeys();
 174         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 175         resetWakeupSocket();
 176         return updated;
 177     }
 178 
 179     // Helper threads wait on this lock for the next poll.
 180     private final StartLock startLock = new StartLock();
 181 
 182     private final class StartLock {
 183         // A variable which distinguishes the current run of doSelect from the
 184         // previous one. Incrementing runsCounter and notifying threads will
 185         // trigger another round of poll.
 186         private long runsCounter;
 187        // Triggers threads, waiting on this lock to start polling.
 188         private synchronized void startThreads() {
 189             runsCounter++; // next run
 190             notifyAll(); // wake up threads.
 191         }
 192         // This function is called by a helper thread to wait for the
 193         // next round of poll(). It also checks, if this thread became
 194         // redundant. If yes, it returns true, notifying the thread
 195         // that it should exit.
 196         private synchronized boolean waitForStart(SelectThread thread) {


 325                                            Net.POLLCONN |
 326                                            Net.POLLOUT,
 327                                            true);
 328             return numKeysUpdated;
 329         }
 330 
 331         /**
 332          * Note, clearedCount is used to determine if the readyOps have
 333          * been reset in this select operation. updateCount is used to
 334          * tell if a key has been counted as updated in this select
 335          * operation.
 336          *
 337          * me.updateCount <= me.clearedCount <= updateCount
 338          */
 339         private int processFDSet(long updateCount, int[] fds, int rOps,
 340                                  boolean isExceptFds)
 341         {
 342             int numKeysUpdated = 0;
 343             for (int i = 1; i <= fds[0]; i++) {
 344                 int desc = fds[i];
 345                 if (desc == wakeupSourceFd) {
 346                     synchronized (interruptLock) {
 347                         interruptTriggered = true;
 348                     }
 349                     continue;
 350                 }
 351                 MapEntry me = fdMap.get(desc);
 352                 // If me is null, the key was deregistered in the previous
 353                 // processDeregisterQueue.
 354                 if (me == null)
 355                     continue;
 356                 SelectionKeyImpl sk = me.ski;
 357 
 358                 // The descriptor may be in the exceptfds set because there is
 359                 // OOB data queued to the socket. If there is OOB data then it
 360                 // is discarded and the key is not added to the selected set.
 361                 if (isExceptFds &&
 362                     (sk.channel() instanceof SocketChannelImpl) &&
 363                     discardUrgentData(desc))
 364                 {
 365                     continue;
 366                 }
 367 
 368                 if (selectedKeys.contains(sk)) { // Key in selected set
 369                     if (me.clearedCount != updateCount) {
 370                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
 371                             (me.updateCount != updateCount)) {
 372                             me.updateCount = updateCount;
 373                             numKeysUpdated++;
 374                         }
 375                     } else { // The readyOps have been set; now add
 376                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
 377                             (me.updateCount != updateCount)) {
 378                             me.updateCount = updateCount;
 379                             numKeysUpdated++;
 380                         }
 381                     }
 382                     me.clearedCount = updateCount;
 383                 } else { // Key is not in selected set yet
 384                     if (me.clearedCount != updateCount) {
 385                         sk.channel.translateAndSetReadyOps(rOps, sk);
 386                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {



 387                             selectedKeys.add(sk);
 388                             me.updateCount = updateCount;
 389                             numKeysUpdated++;
 390                         }



































 391                     } else { // The readyOps have been set; now add
 392                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
 393                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
 394                             selectedKeys.add(sk);


 395                             me.updateCount = updateCount;
 396                             numKeysUpdated++;
 397                         }
 398                     }
 399                     me.clearedCount = updateCount;
 400                 }











 401             }


















 402             return numKeysUpdated;
 403         }
 404     }
 405 
 406     // Represents a helper thread used for select.
 407     private final class SelectThread extends ManagedLocalsThread {
 408         private final int index; // index of this thread
 409         final SubSelector subSelector;
 410         private long lastRun = 0; // last run number
 411         private volatile boolean zombie;
 412         // Creates a new thread
 413         private SelectThread(int i) {
 414             this.index = i;
 415             this.subSelector = new SubSelector(i);
 416             //make sure we wait for next round of poll
 417             this.lastRun = startLock.runsCounter;
 418         }
 419         void makeZombie() {
 420             zombie = true;
 421         }
 422         boolean isZombie() {
 423             return zombie;
 424         }
 425         public void run() {
 426             while (true) { // poll loop
 427                 // wait for the start of poll. If this thread has become




  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 /*
  27  */
  28 
  29 
  30 package sun.nio.ch;
  31 
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.SelectorProvider;




  34 import java.io.IOException;
  35 import java.util.*;
  36 import java.util.function.Consumer;




  37 
  38 /**
  39  * A multi-threaded implementation of Selector for Windows.
  40  *
  41  * @author Konstantin Kladko
  42  * @author Mark Reinhold
  43  */
  44 
  45 final class WindowsSelectorImpl extends SelectorImpl {
  46     // Initial capacity of the poll array
  47     private final int INIT_CAP = 8;
  48     // Maximum number of sockets for select().
  49     // Should be INIT_CAP times a power of 2
  50     private final static int MAX_SELECTABLE_FDS = 1024;
  51 
  52     // The list of SelectableChannels serviced by this Selector. Every mod
  53     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
  54     // array,  where the corresponding entry is occupied by the wakeupSocket
  55     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  56 


 113 
 114     // Lock for interrupt triggering and clearing
 115     private final Object interruptLock = new Object();
 116     private volatile boolean interruptTriggered = false;
 117 
 118     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
 119         super(sp);
 120         pollWrapper = new PollArrayWrapper(INIT_CAP);
 121         wakeupPipe = Pipe.open();
 122         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
 123 
 124         // Disable the Nagle algorithm so that the wakeup is more immediate
 125         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
 126         (sink.sc).socket().setTcpNoDelay(true);
 127         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
 128 
 129         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
 130     }
 131 
 132     protected int doSelect(long timeout) throws IOException {
 133         if (pollSubSelector(timeout))
 134             return 0;
 135 
 136         int updated = updateSelectedKeys();
 137         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 138         resetWakeupSocket();
 139         return updated;
 140     }
 141 
 142     @Override
 143     protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException {
 144         Objects.requireNonNull(handler);
 145         if (pollSubSelector(timeout))
 146             return 0;
 147 
 148         updateCount++;
 149         int numKeysUpdated = 0;
 150         numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler);
 151         for (SelectThread t: threads) {
 152             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler);
 153         }
 154         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 155         resetWakeupSocket();
 156         return numKeysUpdated;
 157     }
 158 
 159     private boolean pollSubSelector(long timeout) throws IOException {
 160         if (channelArray == null)
 161             throw new ClosedSelectorException();
 162         this.timeout = timeout; // set selector timeout
 163         processDeregisterQueue();
 164         if (interruptTriggered) {
 165             resetWakeupSocket();
 166             return true;
 167         }
 168         // Calculate number of helper threads needed for poll. If necessary
 169         // threads are created here and start waiting on startLock
 170         adjustThreadsCount();
 171         finishLock.reset(); // reset finishLock
 172         // Wakeup helper threads, waiting on startLock, so they start polling.
 173         // Redundant threads will exit here after wakeup.
 174         startLock.startThreads();
 175         // do polling in the main thread. Main thread is responsible for
 176         // first MAX_SELECTABLE_FDS entries in pollArray.
 177         try {
 178             begin();
 179             try {
 180                 subSelector.poll();
 181             } catch (IOException e) {
 182                 finishLock.setException(e); // Save this exception
 183             }
 184             // Main thread is out of poll(). Wakeup others and wait for them
 185             if (threads.size() > 0)
 186                 finishLock.waitForHelperThreads();
 187           } finally {
 188               end();
 189           }
 190         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 191         finishLock.checkForException();
 192         processDeregisterQueue();
 193         return false;



 194     }
 195 
 196     // Helper threads wait on this lock for the next poll.
 197     private final StartLock startLock = new StartLock();
 198 
 199     private final class StartLock {
 200         // A variable which distinguishes the current run of doSelect from the
 201         // previous one. Incrementing runsCounter and notifying threads will
 202         // trigger another round of poll.
 203         private long runsCounter;
 204        // Triggers threads, waiting on this lock to start polling.
 205         private synchronized void startThreads() {
 206             runsCounter++; // next run
 207             notifyAll(); // wake up threads.
 208         }
 209         // This function is called by a helper thread to wait for the
 210         // next round of poll(). It also checks, if this thread became
 211         // redundant. If yes, it returns true, notifying the thread
 212         // that it should exit.
 213         private synchronized boolean waitForStart(SelectThread thread) {


 342                                            Net.POLLCONN |
 343                                            Net.POLLOUT,
 344                                            true);
 345             return numKeysUpdated;
 346         }
 347 
 348         /**
 349          * Note, clearedCount is used to determine if the readyOps have
 350          * been reset in this select operation. updateCount is used to
 351          * tell if a key has been counted as updated in this select
 352          * operation.
 353          *
 354          * me.updateCount <= me.clearedCount <= updateCount
 355          */
 356         private int processFDSet(long updateCount, int[] fds, int rOps,
 357                                  boolean isExceptFds)
 358         {
 359             int numKeysUpdated = 0;
 360             for (int i = 1; i <= fds[0]; i++) {
 361                 int desc = fds[i];
 362                 if (checkWakeup(desc))



 363                     continue;

 364                 MapEntry me = fdMap.get(desc);
 365                 // If me is null, the key was deregistered in the previous
 366                 // processDeregisterQueue.
 367                 if (me == null)
 368                     continue;
 369                 SelectionKeyImpl sk = me.ski;
 370 
 371                 if (isInterestingFileDescriptor(isExceptFds, desc, sk))






 372                     continue;

 373 
 374                 if (selectedKeys.contains(sk)) { // Key in selected set
 375                     if (me.clearedCount != updateCount) {
 376                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
 377                             (me.updateCount != updateCount)) {
 378                             me.updateCount = updateCount;
 379                             numKeysUpdated++;
 380                         }
 381                     } else { // The readyOps have been set; now add
 382                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
 383                             (me.updateCount != updateCount)) {
 384                             me.updateCount = updateCount;
 385                             numKeysUpdated++;
 386                         }
 387                     }
 388                     me.clearedCount = updateCount;
 389                 } else { // Key is not in selected set yet
 390                     if (me.clearedCount != updateCount) {
 391                         sk.channel.translateAndSetReadyOps(rOps, sk);
 392                     } else { // The readyOps have been set; now add
 393                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
 394                     }
 395                     if (sk.hasOps()) {
 396                         selectedKeys.add(sk);
 397                         me.updateCount = updateCount;
 398                         numKeysUpdated++;
 399                     }
 400                     me.clearedCount = updateCount;
 401                 }
 402             }
 403             return numKeysUpdated;
 404         }
 405 
 406         private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) {
 407             // The descriptor may be in the exceptfds set because there is
 408             // OOB data queued to the socket. If there is OOB data then it
 409             // is discarded and the key is not added to the selected set.
 410             return isExceptFds &&
 411                 (sk.channel() instanceof SocketChannelImpl) &&
 412                 discardUrgentData(desc);
 413         }
 414 
 415         private int processFDSet(long updateCount, int[] fds, int rOps,
 416                                  boolean isExceptFds, Consumer<SelectionKey> handler)
 417         {
 418             int numKeysUpdated = 0;
 419             for (int i = 1; i <= fds[0]; i++) {
 420                 int desc = fds[i];
 421                 if (checkWakeup(desc))
 422                     continue;
 423                 MapEntry me = fdMap.get(desc);
 424                 // If me is null, the key was deregistered in the previous
 425                 // processDeregisterQueue.
 426                 if (me == null)
 427                     continue;
 428                 SelectionKeyImpl sk = me.ski;
 429 
 430                 if (isInterestingFileDescriptor(isExceptFds, desc, sk))
 431                     continue;
 432 
 433                 if (me.clearedCount != updateCount) {
 434                     sk.channel.translateAndSetReadyOps(rOps, sk);
 435                 } else { // The readyOps have been set; now add
 436                     sk.channel.translateAndUpdateReadyOps(rOps, sk);
 437                 }
 438 
 439                 if (sk.hasOps()) {
 440                     handler.accept(sk);
 441                     me.updateCount = updateCount;
 442                     numKeysUpdated++;
 443                 }

 444                 me.clearedCount = updateCount;
 445             }
 446             return numKeysUpdated;
 447         }
 448 
 449         private boolean checkWakeup(int desc) {
 450             if (desc == wakeupSourceFd) {
 451                 synchronized (interruptLock) {
 452                     interruptTriggered = true;
 453                 }
 454                 return true;
 455             }
 456             return false;
 457         }
 458 
 459         public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) {
 460             int numKeysUpdated = 0;
 461             numKeysUpdated += processFDSet(updateCount, readFds,
 462                                            Net.POLLIN,
 463                                            false,
 464                                            handler);
 465             numKeysUpdated += processFDSet(updateCount, writeFds,
 466                                            Net.POLLCONN |
 467                                            Net.POLLOUT,
 468                                            false,
 469                                            handler);
 470             numKeysUpdated += processFDSet(updateCount, exceptFds,
 471                                            Net.POLLIN |
 472                                            Net.POLLCONN |
 473                                            Net.POLLOUT,
 474                                            true,
 475                                            handler);
 476             return numKeysUpdated;
 477         }
 478     }
 479 
 480     // Represents a helper thread used for select.
 481     private final class SelectThread extends Thread {
 482         private final int index; // index of this thread
 483         final SubSelector subSelector;
 484         private long lastRun = 0; // last run number
 485         private volatile boolean zombie;
 486         // Creates a new thread
 487         private SelectThread(int i) {
 488             this.index = i;
 489             this.subSelector = new SubSelector(i);
 490             //make sure we wait for next round of poll
 491             this.lastRun = startLock.runsCounter;
 492         }
 493         void makeZombie() {
 494             zombie = true;
 495         }
 496         boolean isZombie() {
 497             return zombie;
 498         }
 499         public void run() {
 500             while (true) { // poll loop
 501                 // wait for the start of poll. If this thread has become