1 /*
   2  * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 /*
  27  */
  28 
  29 
  30 package sun.nio.ch;
  31 
  32 import java.nio.channels.spi.SelectorProvider;
  33 import java.nio.channels.Selector;
  34 import java.nio.channels.ClosedSelectorException;
  35 import java.nio.channels.Pipe;
  36 import java.nio.channels.SelectableChannel;
  37 import java.io.IOException;
  38 import java.nio.channels.CancelledKeyException;
  39 import java.util.List;
  40 import java.util.ArrayList;
  41 import java.util.HashMap;
  42 import java.util.Iterator;
  43 import sun.misc.ManagedLocalsThread;
  44 
  45 /**
  46  * A multi-threaded implementation of Selector for Windows.
  47  *
  48  * @author Konstantin Kladko
  49  * @author Mark Reinhold
  50  */
  51 
  52 final class WindowsSelectorImpl extends SelectorImpl {
  53     // Initial capacity of the poll array
  54     private final int INIT_CAP = 8;
  55     // Maximum number of sockets for select().
  56     // Should be INIT_CAP times a power of 2
  57     private static final int MAX_SELECTABLE_FDS = 1024;
  58 
  59     // The list of SelectableChannels serviced by this Selector. Every mod
  60     // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
  61     // array,  where the corresponding entry is occupied by the wakeupSocket
  62     private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  63 
  64     // The global native poll array holds file decriptors and event masks
  65     private PollArrayWrapper pollWrapper;
  66 
  67     // The number of valid entries in  poll array, including entries occupied
  68     // by wakeup socket handle.
  69     private int totalChannels = 1;
  70 
  71     // Number of helper threads needed for select. We need one thread per
  72     // each additional set of MAX_SELECTABLE_FDS - 1 channels.
  73     private int threadsCount = 0;
  74 
  75     // A list of helper threads for select.
  76     private final List<SelectThread> threads = new ArrayList<SelectThread>();
  77 
  78     //Pipe used as a wakeup object.
  79     private final Pipe wakeupPipe;
  80 
  81     // File descriptors corresponding to source and sink
  82     private final int wakeupSourceFd, wakeupSinkFd;
  83 
  84     // Lock for close cleanup
  85     private Object closeLock = new Object();
  86 
  87     // Maps file descriptors to their indices in  pollArray
  88     private static final class FdMap extends HashMap<Integer, MapEntry> {
  89         static final long serialVersionUID = 0L;
  90         private MapEntry get(int desc) {
  91             return get(new Integer(desc));
  92         }
  93         private MapEntry put(SelectionKeyImpl ski) {
  94             return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
  95         }
  96         private MapEntry remove(SelectionKeyImpl ski) {
  97             Integer fd = new Integer(ski.channel.getFDVal());
  98             MapEntry x = get(fd);
  99             if ((x != null) && (x.ski.channel == ski.channel))
 100                 return remove(fd);
 101             return null;
 102         }
 103     }
 104 
 105     // class for fdMap entries
 106     private static final class MapEntry {
 107         SelectionKeyImpl ski;
 108         long updateCount = 0;
 109         long clearedCount = 0;
 110         MapEntry(SelectionKeyImpl ski) {
 111             this.ski = ski;
 112         }
 113     }
 114     private final FdMap fdMap = new FdMap();
 115 
    // SubSelector for the main thread
    private final SubSelector subSelector = new SubSelector();

    // Timeout for poll: stored by doSelect() and handed to the native
    // poll0() call by every SubSelector.
    private long timeout;

    // Lock for interrupt triggering and clearing
    private final Object interruptLock = new Object();
    // Set to true by wakeup(), by implClose() and when the wakeup socket
    // shows up in a poll result; cleared again by resetWakeupSocket().
    private volatile boolean interruptTriggered = false;
 124 
 125     WindowsSelectorImpl(SelectorProvider sp) throws IOException {
 126         super(sp);
 127         pollWrapper = new PollArrayWrapper(INIT_CAP);
 128         wakeupPipe = Pipe.open();
 129         wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
 130 
 131         // Disable the Nagle algorithm so that the wakeup is more immediate
 132         SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
 133         (sink.sc).socket().setTcpNoDelay(true);
 134         wakeupSinkFd = ((SelChImpl)sink).getFDVal();
 135 
 136         pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
 137     }
 138 
 139     protected int doSelect(long timeout) throws IOException {
 140         if (channelArray == null)
 141             throw new ClosedSelectorException();
 142         this.timeout = timeout; // set selector timeout
 143         processDeregisterQueue();
 144         if (interruptTriggered) {
 145             resetWakeupSocket();
 146             return 0;
 147         }
 148         // Calculate number of helper threads needed for poll. If necessary
 149         // threads are created here and start waiting on startLock
 150         adjustThreadsCount();
 151         finishLock.reset(); // reset finishLock
 152         // Wakeup helper threads, waiting on startLock, so they start polling.
 153         // Redundant threads will exit here after wakeup.
 154         startLock.startThreads();
 155         // do polling in the main thread. Main thread is responsible for
 156         // first MAX_SELECTABLE_FDS entries in pollArray.
 157         try {
 158             begin();
 159             try {
 160                 subSelector.poll();
 161             } catch (IOException e) {
 162                 finishLock.setException(e); // Save this exception
 163             }
 164             // Main thread is out of poll(). Wakeup others and wait for them
 165             if (threads.size() > 0)
 166                 finishLock.waitForHelperThreads();
 167           } finally {
 168               end();
 169           }
 170         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 171         finishLock.checkForException();
 172         processDeregisterQueue();
 173         int updated = updateSelectedKeys();
 174         // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
 175         resetWakeupSocket();
 176         return updated;
 177     }
 178 
 179     // Helper threads wait on this lock for the next poll.
 180     private final StartLock startLock = new StartLock();
 181 
 182     private final class StartLock {
 183         // A variable which distinguishes the current run of doSelect from the
 184         // previous one. Incrementing runsCounter and notifying threads will
 185         // trigger another round of poll.
 186         private long runsCounter;
 187        // Triggers threads, waiting on this lock to start polling.
 188         private synchronized void startThreads() {
 189             runsCounter++; // next run
 190             notifyAll(); // wake up threads.
 191         }
 192         // This function is called by a helper thread to wait for the
 193         // next round of poll(). It also checks, if this thread became
 194         // redundant. If yes, it returns true, notifying the thread
 195         // that it should exit.
 196         private synchronized boolean waitForStart(SelectThread thread) {
 197             while (true) {
 198                 while (runsCounter == thread.lastRun) {
 199                     try {
 200                         startLock.wait();
 201                     } catch (InterruptedException e) {
 202                         Thread.currentThread().interrupt();
 203                     }
 204                 }
 205                 if (thread.isZombie()) { // redundant thread
 206                     return true; // will cause run() to exit.
 207                 } else {
 208                     thread.lastRun = runsCounter; // update lastRun
 209                     return false; //   will cause run() to poll.
 210                 }
 211             }
 212         }
 213     }
 214 
 215     // Main thread waits on this lock, until all helper threads are done
 216     // with poll().
 217     private final FinishLock finishLock = new FinishLock();
 218 
 219     private final class FinishLock  {
 220         // Number of helper threads, that did not finish yet.
 221         private int threadsToFinish;
 222 
 223         // IOException which occurred during the last run.
 224         IOException exception = null;
 225 
 226         // Called before polling.
 227         private void reset() {
 228             threadsToFinish = threads.size(); // helper threads
 229         }
 230 
 231         // Each helper thread invokes this function on finishLock, when
 232         // the thread is done with poll().
 233         private synchronized void threadFinished() {
 234             if (threadsToFinish == threads.size()) { // finished poll() first
 235                 // if finished first, wakeup others
 236                 wakeup();
 237             }
 238             threadsToFinish--;
 239             if (threadsToFinish == 0) // all helper threads finished poll().
 240                 notify();             // notify the main thread
 241         }
 242 
 243         // The main thread invokes this function on finishLock to wait
 244         // for helper threads to finish poll().
 245         private synchronized void waitForHelperThreads() {
 246             if (threadsToFinish == threads.size()) {
 247                 // no helper threads finished yet. Wakeup them up.
 248                 wakeup();
 249             }
 250             while (threadsToFinish != 0) {
 251                 try {
 252                     finishLock.wait();
 253                 } catch (InterruptedException e) {
 254                     // Interrupted - set interrupted state.
 255                     Thread.currentThread().interrupt();
 256                 }
 257             }
 258         }
 259 
 260         // sets IOException for this run
 261         private synchronized void setException(IOException e) {
 262             exception = e;
 263         }
 264 
 265         // Checks if there was any exception during the last run.
 266         // If yes, throws it
 267         private void checkForException() throws IOException {
 268             if (exception == null)
 269                 return;
 270             StringBuffer message =  new StringBuffer("An exception occurred" +
 271                                        " during the execution of select(): \n");
 272             message.append(exception);
 273             message.append('\n');
 274             exception = null;
 275             throw new IOException(message.toString());
 276         }
 277     }
 278 
 279     private final class SubSelector {
 280         private final int pollArrayIndex; // starting index in pollArray to poll
 281         // These arrays will hold result of native select().
 282         // The first element of each array is the number of selected sockets.
 283         // Other elements are file descriptors of selected sockets.
 284         private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
 285         private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
 286         private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
 287 
 288         private SubSelector() {
 289             this.pollArrayIndex = 0; // main thread
 290         }
 291 
 292         private SubSelector(int threadIndex) { // helper threads
 293             this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
 294         }
 295 
 296         private int poll() throws IOException{ // poll for the main thread
 297             return poll0(pollWrapper.pollArrayAddress,
 298                          Math.min(totalChannels, MAX_SELECTABLE_FDS),
 299                          readFds, writeFds, exceptFds, timeout);
 300         }
 301 
 302         private int poll(int index) throws IOException {
 303             // poll for helper threads
 304             return  poll0(pollWrapper.pollArrayAddress +
 305                      (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
 306                      Math.min(MAX_SELECTABLE_FDS,
 307                              totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
 308                      readFds, writeFds, exceptFds, timeout);
 309         }
 310 
 311         private native int poll0(long pollAddress, int numfds,
 312              int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
 313 
 314         private int processSelectedKeys(long updateCount) {
 315             int numKeysUpdated = 0;
 316             numKeysUpdated += processFDSet(updateCount, readFds,
 317                                            Net.POLLIN,
 318                                            false);
 319             numKeysUpdated += processFDSet(updateCount, writeFds,
 320                                            Net.POLLCONN |
 321                                            Net.POLLOUT,
 322                                            false);
 323             numKeysUpdated += processFDSet(updateCount, exceptFds,
 324                                            Net.POLLIN |
 325                                            Net.POLLCONN |
 326                                            Net.POLLOUT,
 327                                            true);
 328             return numKeysUpdated;
 329         }
 330 
 331         /**
 332          * Note, clearedCount is used to determine if the readyOps have
 333          * been reset in this select operation. updateCount is used to
 334          * tell if a key has been counted as updated in this select
 335          * operation.
 336          *
 337          * me.updateCount <= me.clearedCount <= updateCount
 338          */
 339         private int processFDSet(long updateCount, int[] fds, int rOps,
 340                                  boolean isExceptFds)
 341         {
 342             int numKeysUpdated = 0;
 343             for (int i = 1; i <= fds[0]; i++) {
 344                 int desc = fds[i];
 345                 if (desc == wakeupSourceFd) {
 346                     synchronized (interruptLock) {
 347                         interruptTriggered = true;
 348                     }
 349                     continue;
 350                 }
 351                 MapEntry me = fdMap.get(desc);
 352                 // If me is null, the key was deregistered in the previous
 353                 // processDeregisterQueue.
 354                 if (me == null)
 355                     continue;
 356                 SelectionKeyImpl sk = me.ski;
 357 
 358                 // The descriptor may be in the exceptfds set because there is
 359                 // OOB data queued to the socket. If there is OOB data then it
 360                 // is discarded and the key is not added to the selected set.
 361                 if (isExceptFds &&
 362                     (sk.channel() instanceof SocketChannelImpl) &&
 363                     discardUrgentData(desc))
 364                 {
 365                     continue;
 366                 }
 367 
 368                 if (selectedKeys.contains(sk)) { // Key in selected set
 369                     if (me.clearedCount != updateCount) {
 370                         if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
 371                             (me.updateCount != updateCount)) {
 372                             me.updateCount = updateCount;
 373                             numKeysUpdated++;
 374                         }
 375                     } else { // The readyOps have been set; now add
 376                         if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
 377                             (me.updateCount != updateCount)) {
 378                             me.updateCount = updateCount;
 379                             numKeysUpdated++;
 380                         }
 381                     }
 382                     me.clearedCount = updateCount;
 383                 } else { // Key is not in selected set yet
 384                     if (me.clearedCount != updateCount) {
 385                         sk.channel.translateAndSetReadyOps(rOps, sk);
 386                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
 387                             selectedKeys.add(sk);
 388                             me.updateCount = updateCount;
 389                             numKeysUpdated++;
 390                         }
 391                     } else { // The readyOps have been set; now add
 392                         sk.channel.translateAndUpdateReadyOps(rOps, sk);
 393                         if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
 394                             selectedKeys.add(sk);
 395                             me.updateCount = updateCount;
 396                             numKeysUpdated++;
 397                         }
 398                     }
 399                     me.clearedCount = updateCount;
 400                 }
 401             }
 402             return numKeysUpdated;
 403         }
 404     }
 405 
 406     // Represents a helper thread used for select.
 407     private final class SelectThread extends ManagedLocalsThread {
 408         private final int index; // index of this thread
 409         final SubSelector subSelector;
 410         private long lastRun = 0; // last run number
 411         private volatile boolean zombie;
 412         // Creates a new thread
 413         private SelectThread(int i) {

 414             this.index = i;
 415             this.subSelector = new SubSelector(i);
 416             //make sure we wait for next round of poll
 417             this.lastRun = startLock.runsCounter;
 418         }
 419         void makeZombie() {
 420             zombie = true;
 421         }
 422         boolean isZombie() {
 423             return zombie;
 424         }
 425         public void run() {
 426             while (true) { // poll loop
 427                 // wait for the start of poll. If this thread has become
 428                 // redundant, then exit.
 429                 if (startLock.waitForStart(this))
 430                     return;
 431                 // call poll()
 432                 try {
 433                     subSelector.poll(index);
 434                 } catch (IOException e) {
 435                     // Save this exception and let other threads finish.
 436                     finishLock.setException(e);
 437                 }
 438                 // notify main thread, that this thread has finished, and
 439                 // wakeup others, if this thread is the first to finish.
 440                 finishLock.threadFinished();
 441             }
 442         }
 443     }
 444 
 445     // After some channels registered/deregistered, the number of required
 446     // helper threads may have changed. Adjust this number.
 447     private void adjustThreadsCount() {
 448         if (threadsCount > threads.size()) {
 449             // More threads needed. Start more threads.
 450             for (int i = threads.size(); i < threadsCount; i++) {
 451                 SelectThread newThread = new SelectThread(i);
 452                 threads.add(newThread);
 453                 newThread.setDaemon(true);
 454                 newThread.start();
 455             }
 456         } else if (threadsCount < threads.size()) {
 457             // Some threads become redundant. Remove them from the threads List.
 458             for (int i = threads.size() - 1 ; i >= threadsCount; i--)
 459                 threads.remove(i).makeZombie();
 460         }
 461     }
 462 
 463     // Sets Windows wakeup socket to a signaled state.
 464     private void setWakeupSocket() {
 465         setWakeupSocket0(wakeupSinkFd);
 466     }
 467     private native void setWakeupSocket0(int wakeupSinkFd);
 468 
 469     // Sets Windows wakeup socket to a non-signaled state.
 470     private void resetWakeupSocket() {
 471         synchronized (interruptLock) {
 472             if (interruptTriggered == false)
 473                 return;
 474             resetWakeupSocket0(wakeupSourceFd);
 475             interruptTriggered = false;
 476         }
 477     }
 478 
 479     private native void resetWakeupSocket0(int wakeupSourceFd);
 480 
 481     private native boolean discardUrgentData(int fd);
 482 
 483     // We increment this counter on each call to updateSelectedKeys()
 484     // each entry in  SubSelector.fdsMap has a memorized value of
 485     // updateCount. When we increment numKeysUpdated we set updateCount
 486     // for the corresponding entry to its current value. This is used to
 487     // avoid counting the same key more than once - the same key can
 488     // appear in readfds and writefds.
 489     private long updateCount = 0;
 490 
 491     // Update ops of the corresponding Channels. Add the ready keys to the
 492     // ready queue.
 493     private int updateSelectedKeys() {
 494         updateCount++;
 495         int numKeysUpdated = 0;
 496         numKeysUpdated += subSelector.processSelectedKeys(updateCount);
 497         for (SelectThread t: threads) {
 498             numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
 499         }
 500         return numKeysUpdated;
 501     }
 502 
 503     protected void implClose() throws IOException {
 504         synchronized (closeLock) {
 505             if (channelArray != null) {
 506                 if (pollWrapper != null) {
 507                     // prevent further wakeup
 508                     synchronized (interruptLock) {
 509                         interruptTriggered = true;
 510                     }
 511                     wakeupPipe.sink().close();
 512                     wakeupPipe.source().close();
 513                     for(int i = 1; i < totalChannels; i++) { // Deregister channels
 514                         if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
 515                             deregister(channelArray[i]);
 516                             SelectableChannel selch = channelArray[i].channel();
 517                             if (!selch.isOpen() && !selch.isRegistered())
 518                                 ((SelChImpl)selch).kill();
 519                         }
 520                     }
 521                     pollWrapper.free();
 522                     pollWrapper = null;
 523                     selectedKeys = null;
 524                     channelArray = null;
 525                     // Make all remaining helper threads exit
 526                     for (SelectThread t: threads)
 527                          t.makeZombie();
 528                     startLock.startThreads();
 529                 }
 530             }
 531         }
 532     }
 533 
 534     protected void implRegister(SelectionKeyImpl ski) {
 535         synchronized (closeLock) {
 536             if (pollWrapper == null)
 537                 throw new ClosedSelectorException();
 538             growIfNeeded();
 539             channelArray[totalChannels] = ski;
 540             ski.setIndex(totalChannels);
 541             fdMap.put(ski);
 542             keys.add(ski);
 543             pollWrapper.addEntry(totalChannels, ski);
 544             totalChannels++;
 545         }
 546     }
 547 
 548     private void growIfNeeded() {
 549         if (channelArray.length == totalChannels) {
 550             int newSize = totalChannels * 2; // Make a larger array
 551             SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
 552             System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
 553             channelArray = temp;
 554             pollWrapper.grow(newSize);
 555         }
 556         if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
 557             pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
 558             totalChannels++;
 559             threadsCount++;
 560         }
 561     }
 562 
 563     protected void implDereg(SelectionKeyImpl ski) throws IOException{
 564         int i = ski.getIndex();
 565         assert (i >= 0);
 566         synchronized (closeLock) {
 567             if (i != totalChannels - 1) {
 568                 // Copy end one over it
 569                 SelectionKeyImpl endChannel = channelArray[totalChannels-1];
 570                 channelArray[i] = endChannel;
 571                 endChannel.setIndex(i);
 572                 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
 573                                                                 pollWrapper, i);
 574             }
 575             ski.setIndex(-1);
 576         }
 577         channelArray[totalChannels - 1] = null;
 578         totalChannels--;
 579         if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
 580             totalChannels--;
 581             threadsCount--; // The last thread has become redundant.
 582         }
 583         fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
 584         keys.remove(ski);
 585         selectedKeys.remove(ski);
 586         deregister(ski);
 587         SelectableChannel selch = ski.channel();
 588         if (!selch.isOpen() && !selch.isRegistered())
 589             ((SelChImpl)selch).kill();
 590     }
 591 
 592     public void putEventOps(SelectionKeyImpl sk, int ops) {
 593         synchronized (closeLock) {
 594             if (pollWrapper == null)
 595                 throw new ClosedSelectorException();
 596             // make sure this sk has not been removed yet
 597             int index = sk.getIndex();
 598             if (index == -1)
 599                 throw new CancelledKeyException();
 600             pollWrapper.putEventOps(index, ops);
 601         }
 602     }
 603 
 604     public Selector wakeup() {
 605         synchronized (interruptLock) {
 606             if (!interruptTriggered) {
 607                 setWakeupSocket();
 608                 interruptTriggered = true;
 609             }
 610         }
 611         return this;
 612     }
 613 
    // Runs once, when this class is initialized: calls IOUtil.load().
    // NOTE(review): presumably this loads the native library that backs the
    // native methods declared in this class (poll0, setWakeupSocket0,
    // resetWakeupSocket0, discardUrgentData) -- confirm against IOUtil.
    static {
        IOUtil.load();
    }
 617 }
--- EOF ---