1 /*
   2  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 /*
  27  */
  28 
  29 
  30 package sun.nio.ch;
  31 
  32 import java.nio.channels.spi.SelectorProvider;
  33 import java.nio.channels.Selector;
  34 import java.nio.channels.ClosedSelectorException;
  35 import java.nio.channels.Pipe;
  36 import java.nio.channels.SelectableChannel;
  37 import java.io.IOException;
  38 import java.util.List;
  39 import java.util.ArrayList;
  40 import java.util.HashMap;
  41 import java.util.Iterator;
  42 
  43 /**
  44  * A multi-threaded implementation of Selector for Windows.
  45  *
  46  * @author Konstantin Kladko
  47  * @author Mark Reinhold
  48  */
  49 
  50 final class WindowsSelectorImpl extends SelectorImpl {
  51     // Initial capacity of the poll array
  52     private final int INIT_CAP = 8;
  53     // Maximum number of sockets for select().
  54     // Should be INIT_CAP times a power of 2
  55     private final static int MAX_SELECTABLE_FDS = 1024;
  56 
  57     // The list of SelectableChannels serviced by this Selector. Every mod
  58     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
  59     // array,  where the corresponding entry is occupied by the wakeupSocket
  60     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  61 
  62     // The global native poll array holds file decriptors and event masks
  63     private PollArrayWrapper pollWrapper;
  64 
  65     // The number of valid entries in  poll array, including entries occupied
  66     // by wakeup socket handle.
  67     private int totalChannels = 1;
  68 
  69     // Number of helper threads needed for select. We need one thread per
  70     // each additional set of MAX_SELECTABLE_FDS - 1 channels.
  71     private int threadsCount = 0;
  72 
  73     // A list of helper threads for select.
  74     private final List<SelectThread> threads = new ArrayList<SelectThread>();
  75 
  76     //Pipe used as a wakeup object.
  77     private final Pipe wakeupPipe;
  78 
  79     // File descriptors corresponding to source and sink
  80     private final int wakeupSourceFd, wakeupSinkFd;
  81 
  82     // Lock for close cleanup
  83     private Object closeLock = new Object();
  84 
  85     // Maps file descriptors to their indices in  pollArray
  86     private final static class FdMap extends HashMap<Integer, MapEntry> {
  87         static final long serialVersionUID = 0L;
  88         private MapEntry get(int desc) {
  89             return get(new Integer(desc));
  90         }
  91         private MapEntry put(SelectionKeyImpl ski) {
  92             return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
  93         }
  94         private MapEntry remove(SelectionKeyImpl ski) {
  95             Integer fd = new Integer(ski.channel.getFDVal());
  96             MapEntry x = get(fd);
  97             if ((x != null) && (x.ski.channel == ski.channel))
  98                 return remove(fd);
  99             return null;
 100         }
 101     }
 102 
 103     // class for fdMap entries
 104     private final static class MapEntry {
 105         SelectionKeyImpl ski;
 106         long updateCount = 0;
 107         long clearedCount = 0;
 108         MapEntry(SelectionKeyImpl ski) {
 109             this.ski = ski;
 110         }
 111     }
 112     private final FdMap fdMap = new FdMap();
 113 
 114     // SubSelector for the main thread
 115     private final SubSelector subSelector = new SubSelector();
 116 
 117     private long timeout; //timeout for poll
 118 
 119     // Lock for interrupt triggering and clearing
 120     private final Object interruptLock = new Object();
 121     private volatile boolean interruptTriggered = false;
 122 
 123     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
 124         super(sp);
 125         pollWrapper = new PollArrayWrapper(INIT_CAP);
 126         wakeupPipe = Pipe.open();
 127         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
 128 
 129         // Disable the Nagle algorithm so that the wakeup is more immediate
 130         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
 131         (sink.sc).socket().setTcpNoDelay(true);
 132         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
 133 
 134         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
 135     }
 136 
 137     protected int doSelect(long timeout) throws IOException {
 138         if (channelArray == null)
 139             throw new ClosedSelectorException();
 140         this.timeout = timeout; // set selector timeout
 141         processDeregisterQueue();
 142         if (interruptTriggered) {
 143             resetWakeupSocket();
 144             return 0;
 145         }
 146         // Calculate number of helper threads needed for poll. If necessary
 147         // threads are created here and start waiting on startLock
 148         adjustThreadsCount();
 149         finishLock.reset(); // reset finishLock
 150         // Wakeup helper threads, waiting on startLock, so they start polling.
 151         // Redundant threads will exit here after wakeup.
 152         startLock.startThreads();
 153         // do polling in the main thread. Main thread is responsible for
 154         // first MAX_SELECTABLE_FDS entries in pollArray.
 155         try {
 156             begin();
 157             try {
 158                 subSelector.poll();
 159             } catch (IOException e) {
 160                 finishLock.setException(e); // Save this exception
 161             }
 162             // Main thread is out of poll(). Wakeup others and wait for them
 163             if (threads.size() > 0)
 164                 finishLock.waitForHelperThreads();
 165           } finally {
 166               end();
 167           }
 168         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 169         finishLock.checkForException();
 170         processDeregisterQueue();
 171         int updated = updateSelectedKeys();
 172         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 173         resetWakeupSocket();
 174         return updated;
 175     }
 176 
 177     // Helper threads wait on this lock for the next poll.
 178     private final StartLock startLock = new StartLock();
 179 
 180     private final class StartLock {
 181         // A variable which distinguishes the current run of doSelect from the
 182         // previous one. Incrementing runsCounter and notifying threads will
 183         // trigger another round of poll.
 184         private long runsCounter;
 185        // Triggers threads, waiting on this lock to start polling.
 186         private synchronized void startThreads() {
 187             runsCounter++; // next run
 188             notifyAll(); // wake up threads.
 189         }
 190         // This function is called by a helper thread to wait for the
 191         // next round of poll(). It also checks, if this thread became
 192         // redundant. If yes, it returns true, notifying the thread
 193         // that it should exit.
 194         private synchronized boolean waitForStart(SelectThread thread) {
 195             while (true) {
 196                 while (runsCounter == thread.lastRun) {
 197                     try {
 198                         startLock.wait();
 199                     } catch (InterruptedException e) {
 200                         Thread.currentThread().interrupt();
 201                     }
 202                 }
 203                 if (thread.isZombie()) { // redundant thread
 204                     return true; // will cause run() to exit.
 205                 } else {
 206                     thread.lastRun = runsCounter; // update lastRun
 207                     return false; //   will cause run() to poll.
 208                 }
 209             }
 210         }
 211     }
 212 
 213     // Main thread waits on this lock, until all helper threads are done
 214     // with poll().
 215     private final FinishLock finishLock = new FinishLock();
 216 
 217     private final class FinishLock  {
 218         // Number of helper threads, that did not finish yet.
 219         private int threadsToFinish;
 220 
 221         // IOException which occured during the last run.
 222         IOException exception = null;
 223 
 224         // Called before polling.
 225         private void reset() {
 226             threadsToFinish = threads.size(); // helper threads
 227         }
 228 
 229         // Each helper thread invokes this function on finishLock, when
 230         // the thread is done with poll().
 231         private synchronized void threadFinished() {
 232             if (threadsToFinish == threads.size()) { // finished poll() first
 233                 // if finished first, wakeup others
 234                 wakeup();
 235             }
 236             threadsToFinish--;
 237             if (threadsToFinish == 0) // all helper threads finished poll().
 238                 notify();             // notify the main thread
 239         }
 240 
 241         // The main thread invokes this function on finishLock to wait
 242         // for helper threads to finish poll().
 243         private synchronized void waitForHelperThreads() {
 244             if (threadsToFinish == threads.size()) {
 245                 // no helper threads finished yet. Wakeup them up.
 246                 wakeup();
 247             }
 248             while (threadsToFinish != 0) {
 249                 try {
 250                     finishLock.wait();
 251                 } catch (InterruptedException e) {
 252                     // Interrupted - set interrupted state.
 253                     Thread.currentThread().interrupt();
 254                 }
 255             }
 256         }
 257 
 258         // sets IOException for this run
 259         private synchronized void setException(IOException e) {
 260             exception = e;
 261         }
 262 
 263         // Checks if there was any exception during the last run.
 264         // If yes, throws it
 265         private void checkForException() throws IOException {
 266             if (exception == null)
 267                 return;
 268             StringBuffer message =  new StringBuffer("An exception occured" +
 269                                        " during the execution of select(): \n");
 270             message.append(exception);
 271             message.append('\n');
 272             exception = null;
 273             throw new IOException(message.toString());
 274         }
 275     }
 276 
 277     private final class SubSelector {
 278         private final int pollArrayIndex; // starting index in pollArray to poll
 279         // These arrays will hold result of native select().
 280         // The first element of each array is the number of selected sockets.
 281         // Other elements are file descriptors of selected sockets.
 282         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
 283         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
 284         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
 285 
 286         private SubSelector() {
 287             this.pollArrayIndex = 0; // main thread
 288         }
 289 
 290         private SubSelector(int threadIndex) { // helper threads
 291             this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
 292         }
 293 
 294         private int poll() throws IOException{ // poll for the main thread
 295             return poll0(pollWrapper.pollArrayAddress,
 296                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
 297                          readFds, writeFds, exceptFds, timeout);
 298         }
 299 
 300         private int poll(int index) throws IOException {
 301             // poll for helper threads
 302             return  poll0(pollWrapper.pollArrayAddress +
 303                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
 304                      Math.min(MAX_SELECTABLE_FDS,
 305                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
 306                      readFds, writeFds, exceptFds, timeout);
 307         }
 308 
 309         private native int poll0(long pollAddress, int numfds,
 310              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
 311 
 312         private int processSelectedKeys(long updateCount) {
 313             int numKeysUpdated = 0;
 314             numKeysUpdated += processFDSet(updateCount, readFds,
 315                                            PollArrayWrapper.POLLIN,
 316                                            false);
 317             numKeysUpdated += processFDSet(updateCount, writeFds,
 318                                            PollArrayWrapper.POLLCONN |
 319                                            PollArrayWrapper.POLLOUT,
 320                                            false);
 321             numKeysUpdated += processFDSet(updateCount, exceptFds,
 322                                            PollArrayWrapper.POLLIN |
 323                                            PollArrayWrapper.POLLCONN |
 324                                            PollArrayWrapper.POLLOUT,
 325                                            true);
 326             return numKeysUpdated;
 327         }
 328 
 329         /**
 330          * Note, clearedCount is used to determine if the readyOps have
 331          * been reset in this select operation. updateCount is used to
 332          * tell if a key has been counted as updated in this select
 333          * operation.
 334          *
 335          * me.updateCount <= me.clearedCount <= updateCount
 336          */
 337         private int processFDSet(long updateCount, int[] fds, int rOps,
 338                                  boolean isExceptFds)
 339         {
 340             int numKeysUpdated = 0;
 341             for (int i = 1; i <= fds[0]; i++) {
 342                 int desc = fds[i];
 343                 if (desc == wakeupSourceFd) {
 344                     synchronized (interruptLock) {
 345                         interruptTriggered = true;
 346                     }
 347                     continue;
 348                 }
 349                 MapEntry me = fdMap.get(desc);
 350                 // If me is null, the key was deregistered in the previous
 351                 // processDeregisterQueue.
 352                 if (me == null)
 353                     continue;
 354                 SelectionKeyImpl sk = me.ski;
 355 
 356                 // The descriptor may be in the exceptfds set because there is
 357                 // OOB data queued to the socket. If there is OOB data then it
 358                 // is discarded and the key is not added to the selected set.
 359                 if (isExceptFds &&
 360                     (sk.channel() instanceof SocketChannelImpl) &&
 361                     discardUrgentData(desc))
 362                 {
 363                     continue;
 364                 }
 365 
 366                 if (selectedKeys.contains(sk)) { // Key in selected set
 367                     if (me.clearedCount != updateCount) {
 368                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
 369                             (me.updateCount != updateCount)) {
 370                             me.updateCount = updateCount;
 371                             numKeysUpdated++;
 372                         }
 373                     } else { // The readyOps have been set; now add
 374                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
 375                             (me.updateCount != updateCount)) {
 376                             me.updateCount = updateCount;
 377                             numKeysUpdated++;
 378                         }
 379                     }
 380                     me.clearedCount = updateCount;
 381                 } else { // Key is not in selected set yet
 382                     if (me.clearedCount != updateCount) {
 383                         sk.channel.translateAndSetReadyOps(rOps, sk);
 384                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
 385                             selectedKeys.add(sk);
 386                             me.updateCount = updateCount;
 387                             numKeysUpdated++;
 388                         }
 389                     } else { // The readyOps have been set; now add
 390                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
 391                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
 392                             selectedKeys.add(sk);
 393                             me.updateCount = updateCount;
 394                             numKeysUpdated++;
 395                         }
 396                     }
 397                     me.clearedCount = updateCount;
 398                 }
 399             }
 400             return numKeysUpdated;
 401         }
 402     }
 403 
 404     // Represents a helper thread used for select.
 405     private final class SelectThread extends Thread {
 406         private final int index; // index of this thread
 407         final SubSelector subSelector;
 408         private long lastRun = 0; // last run number
 409         private volatile boolean zombie;
 410         // Creates a new thread
 411         private SelectThread(int i) {
 412             this.index = i;
 413             this.subSelector = new SubSelector(i);
 414             //make sure we wait for next round of poll
 415             this.lastRun = startLock.runsCounter;
 416         }
 417         void makeZombie() {
 418             zombie = true;
 419         }
 420         boolean isZombie() {
 421             return zombie;
 422         }
 423         public void run() {
 424             while (true) { // poll loop
 425                 // wait for the start of poll. If this thread has become
 426                 // redundant, then exit.
 427                 if (startLock.waitForStart(this))
 428                     return;
 429                 // call poll()
 430                 try {
 431                     subSelector.poll(index);
 432                 } catch (IOException e) {
 433                     // Save this exception and let other threads finish.
 434                     finishLock.setException(e);
 435                 }
 436                 // notify main thread, that this thread has finished, and
 437                 // wakeup others, if this thread is the first to finish.
 438                 finishLock.threadFinished();
 439             }
 440         }
 441     }
 442 
 443     // After some channels registered/deregistered, the number of required
 444     // helper threads may have changed. Adjust this number.
 445     private void adjustThreadsCount() {
 446         if (threadsCount > threads.size()) {
 447             // More threads needed. Start more threads.
 448             for (int i = threads.size(); i < threadsCount; i++) {
 449                 SelectThread newThread = new SelectThread(i);
 450                 threads.add(newThread);
 451                 newThread.setDaemon(true);
 452                 newThread.start();
 453             }
 454         } else if (threadsCount < threads.size()) {
 455             // Some threads become redundant. Remove them from the threads List.
 456             for (int i = threads.size() - 1 ; i >= threadsCount; i--)
 457                 threads.remove(i).makeZombie();
 458         }
 459     }
 460 
 461     // Sets Windows wakeup socket to a signaled state.
 462     private void setWakeupSocket() {
 463         setWakeupSocket0(wakeupSinkFd);
 464     }
 465     private native void setWakeupSocket0(int wakeupSinkFd);
 466 
 467     // Sets Windows wakeup socket to a non-signaled state.
 468     private void resetWakeupSocket() {
 469         synchronized (interruptLock) {
 470             if (interruptTriggered == false)
 471                 return;
 472             resetWakeupSocket0(wakeupSourceFd);
 473             interruptTriggered = false;
 474         }
 475     }
 476 
 477     private native void resetWakeupSocket0(int wakeupSourceFd);
 478 
 479     private native boolean discardUrgentData(int fd);
 480 
 481     // We increment this counter on each call to updateSelectedKeys()
 482     // each entry in  SubSelector.fdsMap has a memorized value of
 483     // updateCount. When we increment numKeysUpdated we set updateCount
 484     // for the corresponding entry to its current value. This is used to
 485     // avoid counting the same key more than once - the same key can
 486     // appear in readfds and writefds.
 487     private long updateCount = 0;
 488 
 489     // Update ops of the corresponding Channels. Add the ready keys to the
 490     // ready queue.
 491     private int updateSelectedKeys() {
 492         updateCount++;
 493         int numKeysUpdated = 0;
 494         numKeysUpdated += subSelector.processSelectedKeys(updateCount);
 495         for (SelectThread t: threads) {
 496             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
 497         }
 498         return numKeysUpdated;
 499     }
 500 
 501     protected void implClose() throws IOException {
 502         synchronized (closeLock) {
 503             if (channelArray != null) {
 504                 if (pollWrapper != null) {
 505                     // prevent further wakeup
 506                     synchronized (interruptLock) {
 507                         interruptTriggered = true;
 508                     }
 509                     wakeupPipe.sink().close();
 510                     wakeupPipe.source().close();
 511                     for(int i = 1; i < totalChannels; i++) { // Deregister channels
 512                         if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
 513                             deregister(channelArray[i]);
 514                             SelectableChannel selch = channelArray[i].channel();
 515                             if (!selch.isOpen() && !selch.isRegistered())
 516                                 ((SelChImpl)selch).kill();
 517                         }
 518                     }
 519                     pollWrapper.free();
 520                     pollWrapper = null;
 521                     selectedKeys = null;
 522                     channelArray = null;
 523                     // Make all remaining helper threads exit
 524                     for (SelectThread t: threads)
 525                          t.makeZombie();
 526                     startLock.startThreads();
 527                 }
 528             }
 529         }
 530     }
 531 
 532     protected void implRegister(SelectionKeyImpl ski) {
 533         synchronized (closeLock) {
 534             if (pollWrapper == null)
 535                 throw new ClosedSelectorException();
 536             growIfNeeded();
 537             channelArray[totalChannels] = ski;
 538             ski.setIndex(totalChannels);
 539             fdMap.put(ski);
 540             keys.add(ski);
 541             pollWrapper.addEntry(totalChannels, ski);
 542             totalChannels++;
 543         }
 544     }
 545 
 546     private void growIfNeeded() {
 547         if (channelArray.length == totalChannels) {
 548             int newSize = totalChannels * 2; // Make a larger array
 549             SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
 550             System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
 551             channelArray = temp;
 552             pollWrapper.grow(newSize);
 553         }
 554         if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
 555             pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
 556             totalChannels++;
 557             threadsCount++;
 558         }
 559     }
 560 
 561     protected void implDereg(SelectionKeyImpl ski) throws IOException{
 562         int i = ski.getIndex();
 563         assert (i >= 0);
 564         if (i != totalChannels - 1) {
 565             // Copy end one over it
 566             SelectionKeyImpl endChannel = channelArray[totalChannels-1];
 567             channelArray[i] = endChannel;
 568             endChannel.setIndex(i);
 569             pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
 570                                                                 pollWrapper, i);
 571         }
 572         channelArray[totalChannels - 1] = null;
 573         totalChannels--;
 574         ski.setIndex(-1);
 575         if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
 576             totalChannels--;
 577             threadsCount--; // The last thread has become redundant.
 578         }
 579         fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
 580         keys.remove(ski);
 581         selectedKeys.remove(ski);
 582         deregister(ski);
 583         SelectableChannel selch = ski.channel();
 584         if (!selch.isOpen() && !selch.isRegistered())
 585             ((SelChImpl)selch).kill();
 586     }
 587 
 588     public void putEventOps(SelectionKeyImpl sk, int ops) {
 589         synchronized (closeLock) {
 590             if (pollWrapper == null)
 591                 throw new ClosedSelectorException();
 592             pollWrapper.putEventOps(sk.getIndex(), ops);
 593         }
 594     }
 595 
 596     public Selector wakeup() {
 597         synchronized (interruptLock) {
 598             if (!interruptTriggered) {
 599                 setWakeupSocket();
 600                 interruptTriggered = true;
 601             }
 602         }
 603         return this;
 604     }
 605 
 606     static {
 607         Util.load();
 608     }
 609 }