1 /*
   2  * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 /*
  27  */
  28 
  29 
  30 package sun.nio.ch;
  31 
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.SelectorProvider;
  34 import java.io.IOException;
  35 import java.util.*;
  36 import java.util.function.Consumer;
  37 
  38 /**
  39  * A multi-threaded implementation of Selector for Windows.
  40  *
  41  * @author Konstantin Kladko
  42  * @author Mark Reinhold
  43  */
  44 
  45 final class WindowsSelectorImpl extends SelectorImpl {
  46     // Initial capacity of the poll array
  47     private final int INIT_CAP = 8;
  48     // Maximum number of sockets for select().
  49     // Should be INIT_CAP times a power of 2
  50     private final static int MAX_SELECTABLE_FDS = 1024;
  51 
  52     // The list of SelectableChannels serviced by this Selector. Every mod
  53     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
  54     // array,  where the corresponding entry is occupied by the wakeupSocket
  55     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  56 
  57     // The global native poll array holds file decriptors and event masks
  58     private PollArrayWrapper pollWrapper;
  59 
  60     // The number of valid entries in  poll array, including entries occupied
  61     // by wakeup socket handle.
  62     private int totalChannels = 1;
  63 
  64     // Number of helper threads needed for select. We need one thread per
  65     // each additional set of MAX_SELECTABLE_FDS - 1 channels.
  66     private int threadsCount = 0;
  67 
  68     // A list of helper threads for select.
  69     private final List<SelectThread> threads = new ArrayList<SelectThread>();
  70 
  71     //Pipe used as a wakeup object.
  72     private final Pipe wakeupPipe;
  73 
  74     // File descriptors corresponding to source and sink
  75     private final int wakeupSourceFd, wakeupSinkFd;
  76 
  77     // Lock for close cleanup
  78     private Object closeLock = new Object();
  79 
  80     // Maps file descriptors to their indices in  pollArray
  81     private final static class FdMap extends HashMap<Integer, MapEntry> {
  82         static final long serialVersionUID = 0L;
  83         private MapEntry get(int desc) {
  84             return get(new Integer(desc));
  85         }
  86         private MapEntry put(SelectionKeyImpl ski) {
  87             return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
  88         }
  89         private MapEntry remove(SelectionKeyImpl ski) {
  90             Integer fd = new Integer(ski.channel.getFDVal());
  91             MapEntry x = get(fd);
  92             if ((x != null) && (x.ski.channel == ski.channel))
  93                 return remove(fd);
  94             return null;
  95         }
  96     }
  97 
  98     // class for fdMap entries
  99     private final static class MapEntry {
 100         SelectionKeyImpl ski;
 101         long updateCount = 0;
 102         long clearedCount = 0;
 103         MapEntry(SelectionKeyImpl ski) {
 104             this.ski = ski;
 105         }
 106     }
 107     private final FdMap fdMap = new FdMap();
 108 
 109     // SubSelector for the main thread
 110     private final SubSelector subSelector = new SubSelector();
 111 
 112     private long timeout; //timeout for poll
 113 
 114     // Lock for interrupt triggering and clearing
 115     private final Object interruptLock = new Object();
 116     private volatile boolean interruptTriggered = false;
 117 
 118     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
 119         super(sp);
 120         pollWrapper = new PollArrayWrapper(INIT_CAP);
 121         wakeupPipe = Pipe.open();
 122         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
 123 
 124         // Disable the Nagle algorithm so that the wakeup is more immediate
 125         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
 126         (sink.sc).socket().setTcpNoDelay(true);
 127         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
 128 
 129         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
 130     }
 131 
 132     protected int doSelect(long timeout) throws IOException {
 133         if (pollSubSelector(timeout))
 134             return 0;
 135 
 136         int updated = updateSelectedKeys();
 137         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 138         resetWakeupSocket();
 139         return updated;
 140     }
 141 
 142     @Override
 143     protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException {
 144         Objects.requireNonNull(handler);
 145         if (pollSubSelector(timeout))
 146             return 0;
 147 
 148         updateCount++;
 149         int numKeysUpdated = 0;
 150         numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler);
 151         for (SelectThread t: threads) {
 152             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler);
 153         }
 154         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 155         resetWakeupSocket();
 156         return numKeysUpdated;
 157     }
 158 
 159     private boolean pollSubSelector(long timeout) throws IOException {
 160         if (channelArray == null)
 161             throw new ClosedSelectorException();
 162         this.timeout = timeout; // set selector timeout
 163         processDeregisterQueue();
 164         if (interruptTriggered) {
 165             resetWakeupSocket();
 166             return true;
 167         }
 168         // Calculate number of helper threads needed for poll. If necessary
 169         // threads are created here and start waiting on startLock
 170         adjustThreadsCount();
 171         finishLock.reset(); // reset finishLock
 172         // Wakeup helper threads, waiting on startLock, so they start polling.
 173         // Redundant threads will exit here after wakeup.
 174         startLock.startThreads();
 175         // do polling in the main thread. Main thread is responsible for
 176         // first MAX_SELECTABLE_FDS entries in pollArray.
 177         try {
 178             begin();
 179             try {
 180                 subSelector.poll();
 181             } catch (IOException e) {
 182                 finishLock.setException(e); // Save this exception
 183             }
 184             // Main thread is out of poll(). Wakeup others and wait for them
 185             if (threads.size() > 0)
 186                 finishLock.waitForHelperThreads();
 187           } finally {
 188               end();
 189           }
 190         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 191         finishLock.checkForException();
 192         processDeregisterQueue();
 193         return false;
 194     }
 195 
 196     // Helper threads wait on this lock for the next poll.
 197     private final StartLock startLock = new StartLock();
 198 
 199     private final class StartLock {
 200         // A variable which distinguishes the current run of doSelect from the
 201         // previous one. Incrementing runsCounter and notifying threads will
 202         // trigger another round of poll.
 203         private long runsCounter;
 204        // Triggers threads, waiting on this lock to start polling.
 205         private synchronized void startThreads() {
 206             runsCounter++; // next run
 207             notifyAll(); // wake up threads.
 208         }
 209         // This function is called by a helper thread to wait for the
 210         // next round of poll(). It also checks, if this thread became
 211         // redundant. If yes, it returns true, notifying the thread
 212         // that it should exit.
 213         private synchronized boolean waitForStart(SelectThread thread) {
 214             while (true) {
 215                 while (runsCounter == thread.lastRun) {
 216                     try {
 217                         startLock.wait();
 218                     } catch (InterruptedException e) {
 219                         Thread.currentThread().interrupt();
 220                     }
 221                 }
 222                 if (thread.isZombie()) { // redundant thread
 223                     return true; // will cause run() to exit.
 224                 } else {
 225                     thread.lastRun = runsCounter; // update lastRun
 226                     return false; //   will cause run() to poll.
 227                 }
 228             }
 229         }
 230     }
 231 
 232     // Main thread waits on this lock, until all helper threads are done
 233     // with poll().
 234     private final FinishLock finishLock = new FinishLock();
 235 
 236     private final class FinishLock  {
 237         // Number of helper threads, that did not finish yet.
 238         private int threadsToFinish;
 239 
 240         // IOException which occurred during the last run.
 241         IOException exception = null;
 242 
 243         // Called before polling.
 244         private void reset() {
 245             threadsToFinish = threads.size(); // helper threads
 246         }
 247 
 248         // Each helper thread invokes this function on finishLock, when
 249         // the thread is done with poll().
 250         private synchronized void threadFinished() {
 251             if (threadsToFinish == threads.size()) { // finished poll() first
 252                 // if finished first, wakeup others
 253                 wakeup();
 254             }
 255             threadsToFinish--;
 256             if (threadsToFinish == 0) // all helper threads finished poll().
 257                 notify();             // notify the main thread
 258         }
 259 
 260         // The main thread invokes this function on finishLock to wait
 261         // for helper threads to finish poll().
 262         private synchronized void waitForHelperThreads() {
 263             if (threadsToFinish == threads.size()) {
 264                 // no helper threads finished yet. Wakeup them up.
 265                 wakeup();
 266             }
 267             while (threadsToFinish != 0) {
 268                 try {
 269                     finishLock.wait();
 270                 } catch (InterruptedException e) {
 271                     // Interrupted - set interrupted state.
 272                     Thread.currentThread().interrupt();
 273                 }
 274             }
 275         }
 276 
 277         // sets IOException for this run
 278         private synchronized void setException(IOException e) {
 279             exception = e;
 280         }
 281 
 282         // Checks if there was any exception during the last run.
 283         // If yes, throws it
 284         private void checkForException() throws IOException {
 285             if (exception == null)
 286                 return;
 287             StringBuffer message =  new StringBuffer("An exception occurred" +
 288                                        " during the execution of select(): \n");
 289             message.append(exception);
 290             message.append('\n');
 291             exception = null;
 292             throw new IOException(message.toString());
 293         }
 294     }
 295 
 296     private final class SubSelector {
 297         private final int pollArrayIndex; // starting index in pollArray to poll
 298         // These arrays will hold result of native select().
 299         // The first element of each array is the number of selected sockets.
 300         // Other elements are file descriptors of selected sockets.
 301         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
 302         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
 303         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
 304 
 305         private SubSelector() {
 306             this.pollArrayIndex = 0; // main thread
 307         }
 308 
 309         private SubSelector(int threadIndex) { // helper threads
 310             this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
 311         }
 312 
 313         private int poll() throws IOException{ // poll for the main thread
 314             return poll0(pollWrapper.pollArrayAddress,
 315                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
 316                          readFds, writeFds, exceptFds, timeout);
 317         }
 318 
 319         private int poll(int index) throws IOException {
 320             // poll for helper threads
 321             return  poll0(pollWrapper.pollArrayAddress +
 322                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
 323                      Math.min(MAX_SELECTABLE_FDS,
 324                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
 325                      readFds, writeFds, exceptFds, timeout);
 326         }
 327 
 328         private native int poll0(long pollAddress, int numfds,
 329              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
 330 
 331         private int processSelectedKeys(long updateCount) {
 332             int numKeysUpdated = 0;
 333             numKeysUpdated += processFDSet(updateCount, readFds,
 334                                            Net.POLLIN,
 335                                            false);
 336             numKeysUpdated += processFDSet(updateCount, writeFds,
 337                                            Net.POLLCONN |
 338                                            Net.POLLOUT,
 339                                            false);
 340             numKeysUpdated += processFDSet(updateCount, exceptFds,
 341                                            Net.POLLIN |
 342                                            Net.POLLCONN |
 343                                            Net.POLLOUT,
 344                                            true);
 345             return numKeysUpdated;
 346         }
 347 
 348         /**
 349          * Note, clearedCount is used to determine if the readyOps have
 350          * been reset in this select operation. updateCount is used to
 351          * tell if a key has been counted as updated in this select
 352          * operation.
 353          *
 354          * me.updateCount <= me.clearedCount <= updateCount
 355          */
 356         private int processFDSet(long updateCount, int[] fds, int rOps,
 357                                  boolean isExceptFds)
 358         {
 359             int numKeysUpdated = 0;
 360             for (int i = 1; i <= fds[0]; i++) {
 361                 int desc = fds[i];
 362                 if (checkWakeup(desc))
 363                     continue;
 364                 MapEntry me = fdMap.get(desc);
 365                 // If me is null, the key was deregistered in the previous
 366                 // processDeregisterQueue.
 367                 if (me == null)
 368                     continue;
 369                 SelectionKeyImpl sk = me.ski;
 370 
 371                 if (isInterestingFileDescriptor(isExceptFds, desc, sk))
 372                     continue;
 373 
 374                 if (selectedKeys.contains(sk)) { // Key in selected set
 375                     if (me.clearedCount != updateCount) {
 376                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
 377                             (me.updateCount != updateCount)) {
 378                             me.updateCount = updateCount;
 379                             numKeysUpdated++;
 380                         }
 381                     } else { // The readyOps have been set; now add
 382                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
 383                             (me.updateCount != updateCount)) {
 384                             me.updateCount = updateCount;
 385                             numKeysUpdated++;
 386                         }
 387                     }
 388                     me.clearedCount = updateCount;
 389                 } else { // Key is not in selected set yet
 390                     if (me.clearedCount != updateCount) {
 391                         sk.channel.translateAndSetReadyOps(rOps, sk);
 392                     } else { // The readyOps have been set; now add
 393                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
 394                     }
 395                     if (sk.hasOps()) {
 396                         selectedKeys.add(sk);
 397                         me.updateCount = updateCount;
 398                         numKeysUpdated++;
 399                     }
 400                     me.clearedCount = updateCount;
 401                 }
 402             }
 403             return numKeysUpdated;
 404         }
 405 
 406         private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) {
 407             // The descriptor may be in the exceptfds set because there is
 408             // OOB data queued to the socket. If there is OOB data then it
 409             // is discarded and the key is not added to the selected set.
 410             return isExceptFds &&
 411                 (sk.channel() instanceof SocketChannelImpl) &&
 412                 discardUrgentData(desc);
 413         }
 414 
 415         private int processFDSet(long updateCount, int[] fds, int rOps,
 416                                  boolean isExceptFds, Consumer<SelectionKey> handler)
 417         {
 418             int numKeysUpdated = 0;
 419             for (int i = 1; i <= fds[0]; i++) {
 420                 int desc = fds[i];
 421                 if (checkWakeup(desc))
 422                     continue;
 423                 MapEntry me = fdMap.get(desc);
 424                 // If me is null, the key was deregistered in the previous
 425                 // processDeregisterQueue.
 426                 if (me == null)
 427                     continue;
 428                 SelectionKeyImpl sk = me.ski;
 429 
 430                 if (isInterestingFileDescriptor(isExceptFds, desc, sk))
 431                     continue;
 432 
 433                 if (me.clearedCount != updateCount) {
 434                     sk.channel.translateAndSetReadyOps(rOps, sk);
 435                 } else { // The readyOps have been set; now add
 436                     sk.channel.translateAndUpdateReadyOps(rOps, sk);
 437                 }
 438 
 439                 if (sk.hasOps()) {
 440                     handler.accept(sk);
 441                     me.updateCount = updateCount;
 442                     numKeysUpdated++;
 443                 }
 444                 me.clearedCount = updateCount;
 445             }
 446             return numKeysUpdated;
 447         }
 448 
 449         private boolean checkWakeup(int desc) {
 450             if (desc == wakeupSourceFd) {
 451                 synchronized (interruptLock) {
 452                     interruptTriggered = true;
 453                 }
 454                 return true;
 455             }
 456             return false;
 457         }
 458 
 459         public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) {
 460             int numKeysUpdated = 0;
 461             numKeysUpdated += processFDSet(updateCount, readFds,
 462                                            Net.POLLIN,
 463                                            false,
 464                                            handler);
 465             numKeysUpdated += processFDSet(updateCount, writeFds,
 466                                            Net.POLLCONN |
 467                                            Net.POLLOUT,
 468                                            false,
 469                                            handler);
 470             numKeysUpdated += processFDSet(updateCount, exceptFds,
 471                                            Net.POLLIN |
 472                                            Net.POLLCONN |
 473                                            Net.POLLOUT,
 474                                            true,
 475                                            handler);
 476             return numKeysUpdated;
 477         }
 478     }
 479 
 480     // Represents a helper thread used for select.
 481     private final class SelectThread extends Thread {
 482         private final int index; // index of this thread
 483         final SubSelector subSelector;
 484         private long lastRun = 0; // last run number
 485         private volatile boolean zombie;
 486         // Creates a new thread
 487         private SelectThread(int i) {
 488             this.index = i;
 489             this.subSelector = new SubSelector(i);
 490             //make sure we wait for next round of poll
 491             this.lastRun = startLock.runsCounter;
 492         }
 493         void makeZombie() {
 494             zombie = true;
 495         }
 496         boolean isZombie() {
 497             return zombie;
 498         }
 499         public void run() {
 500             while (true) { // poll loop
 501                 // wait for the start of poll. If this thread has become
 502                 // redundant, then exit.
 503                 if (startLock.waitForStart(this))
 504                     return;
 505                 // call poll()
 506                 try {
 507                     subSelector.poll(index);
 508                 } catch (IOException e) {
 509                     // Save this exception and let other threads finish.
 510                     finishLock.setException(e);
 511                 }
 512                 // notify main thread, that this thread has finished, and
 513                 // wakeup others, if this thread is the first to finish.
 514                 finishLock.threadFinished();
 515             }
 516         }
 517     }
 518 
 519     // After some channels registered/deregistered, the number of required
 520     // helper threads may have changed. Adjust this number.
 521     private void adjustThreadsCount() {
 522         if (threadsCount > threads.size()) {
 523             // More threads needed. Start more threads.
 524             for (int i = threads.size(); i < threadsCount; i++) {
 525                 SelectThread newThread = new SelectThread(i);
 526                 threads.add(newThread);
 527                 newThread.setDaemon(true);
 528                 newThread.start();
 529             }
 530         } else if (threadsCount < threads.size()) {
 531             // Some threads become redundant. Remove them from the threads List.
 532             for (int i = threads.size() - 1 ; i >= threadsCount; i--)
 533                 threads.remove(i).makeZombie();
 534         }
 535     }
 536 
 537     // Sets Windows wakeup socket to a signaled state.
 538     private void setWakeupSocket() {
 539         setWakeupSocket0(wakeupSinkFd);
 540     }
 541     private native void setWakeupSocket0(int wakeupSinkFd);
 542 
 543     // Sets Windows wakeup socket to a non-signaled state.
 544     private void resetWakeupSocket() {
 545         synchronized (interruptLock) {
 546             if (interruptTriggered == false)
 547                 return;
 548             resetWakeupSocket0(wakeupSourceFd);
 549             interruptTriggered = false;
 550         }
 551     }
 552 
 553     private native void resetWakeupSocket0(int wakeupSourceFd);
 554 
 555     private native boolean discardUrgentData(int fd);
 556 
 557     // We increment this counter on each call to updateSelectedKeys()
 558     // each entry in  SubSelector.fdsMap has a memorized value of
 559     // updateCount. When we increment numKeysUpdated we set updateCount
 560     // for the corresponding entry to its current value. This is used to
 561     // avoid counting the same key more than once - the same key can
 562     // appear in readfds and writefds.
 563     private long updateCount = 0;
 564 
 565     // Update ops of the corresponding Channels. Add the ready keys to the
 566     // ready queue.
 567     private int updateSelectedKeys() {
 568         updateCount++;
 569         int numKeysUpdated = 0;
 570         numKeysUpdated += subSelector.processSelectedKeys(updateCount);
 571         for (SelectThread t: threads) {
 572             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
 573         }
 574         return numKeysUpdated;
 575     }
 576 
 577     protected void implClose() throws IOException {
 578         synchronized (closeLock) {
 579             if (channelArray != null) {
 580                 if (pollWrapper != null) {
 581                     // prevent further wakeup
 582                     synchronized (interruptLock) {
 583                         interruptTriggered = true;
 584                     }
 585                     wakeupPipe.sink().close();
 586                     wakeupPipe.source().close();
 587                     for(int i = 1; i < totalChannels; i++) { // Deregister channels
 588                         if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
 589                             deregister(channelArray[i]);
 590                             SelectableChannel selch = channelArray[i].channel();
 591                             if (!selch.isOpen() && !selch.isRegistered())
 592                                 ((SelChImpl)selch).kill();
 593                         }
 594                     }
 595                     pollWrapper.free();
 596                     pollWrapper = null;
 597                     selectedKeys = null;
 598                     channelArray = null;
 599                     // Make all remaining helper threads exit
 600                     for (SelectThread t: threads)
 601                          t.makeZombie();
 602                     startLock.startThreads();
 603                 }
 604             }
 605         }
 606     }
 607 
 608     protected void implRegister(SelectionKeyImpl ski) {
 609         synchronized (closeLock) {
 610             if (pollWrapper == null)
 611                 throw new ClosedSelectorException();
 612             growIfNeeded();
 613             channelArray[totalChannels] = ski;
 614             ski.setIndex(totalChannels);
 615             fdMap.put(ski);
 616             keys.add(ski);
 617             pollWrapper.addEntry(totalChannels, ski);
 618             totalChannels++;
 619         }
 620     }
 621 
 622     private void growIfNeeded() {
 623         if (channelArray.length == totalChannels) {
 624             int newSize = totalChannels * 2; // Make a larger array
 625             SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
 626             System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
 627             channelArray = temp;
 628             pollWrapper.grow(newSize);
 629         }
 630         if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
 631             pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
 632             totalChannels++;
 633             threadsCount++;
 634         }
 635     }
 636 
 637     protected void implDereg(SelectionKeyImpl ski) throws IOException{
 638         int i = ski.getIndex();
 639         assert (i >= 0);
 640         synchronized (closeLock) {
 641             if (i != totalChannels - 1) {
 642                 // Copy end one over it
 643                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
 644                 channelArray[i] = endChannel;
 645                 endChannel.setIndex(i);
 646                 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
 647                                                                 pollWrapper, i);
 648             }
 649             ski.setIndex(-1);
 650         }
 651         channelArray[totalChannels - 1] = null;
 652         totalChannels--;
 653         if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
 654             totalChannels--;
 655             threadsCount--; // The last thread has become redundant.
 656         }
 657         fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
 658         keys.remove(ski);
 659         selectedKeys.remove(ski);
 660         deregister(ski);
 661         SelectableChannel selch = ski.channel();
 662         if (!selch.isOpen() && !selch.isRegistered())
 663             ((SelChImpl)selch).kill();
 664     }
 665 
 666     public void putEventOps(SelectionKeyImpl sk, int ops) {
 667         synchronized (closeLock) {
 668             if (pollWrapper == null)
 669                 throw new ClosedSelectorException();
 670             // make sure this sk has not been removed yet
 671             int index = sk.getIndex();
 672             if (index == -1)
 673                 throw new CancelledKeyException();
 674             pollWrapper.putEventOps(index, ops);
 675         }
 676     }
 677 
 678     public Selector wakeup() {
 679         synchronized (interruptLock) {
 680             if (!interruptTriggered) {
 681                 setWakeupSocket();
 682                 interruptTriggered = true;
 683             }
 684         }
 685         return this;
 686     }
 687 
 688     static {
 689         IOUtil.load();
 690     }
 691 }