12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 */ 28 29 30 package sun.nio.ch; 31 32 import java.nio.channels.spi.SelectorProvider; 33 import java.nio.channels.Selector; 34 import java.nio.channels.ClosedSelectorException; 35 import java.nio.channels.Pipe; 36 import java.nio.channels.SelectableChannel; 37 import java.io.IOException; 38 import java.nio.channels.CancelledKeyException; 39 import java.util.List; 40 import java.util.ArrayList; 41 import java.util.HashMap; 42 import java.util.Iterator; 43 import sun.misc.ManagedLocalsThread; 44 45 /** 46 * A multi-threaded implementation of Selector for Windows. 47 * 48 * @author Konstantin Kladko 49 * @author Mark Reinhold 50 */ 51 52 final class WindowsSelectorImpl extends SelectorImpl { 53 // Initial capacity of the poll array 54 private final int INIT_CAP = 8; 55 // Maximum number of sockets for select(). 56 // Should be INIT_CAP times a power of 2 57 private final static int MAX_SELECTABLE_FDS = 1024; 58 59 // The list of SelectableChannels serviced by this Selector. Every mod 60 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 61 // array, where the corresponding entry is occupied by the wakeupSocket 62 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 63 120 121 // Lock for interrupt triggering and clearing 122 private final Object interruptLock = new Object(); 123 private volatile boolean interruptTriggered = false; 124 125 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 126 super(sp); 127 pollWrapper = new PollArrayWrapper(INIT_CAP); 128 wakeupPipe = Pipe.open(); 129 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 130 131 // Disable the Nagle algorithm so that the wakeup is more immediate 132 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 133 (sink.sc).socket().setTcpNoDelay(true); 134 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 135 136 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 137 } 138 139 protected int doSelect(long timeout) throws IOException { 140 if (channelArray == null) 141 throw new ClosedSelectorException(); 142 this.timeout = timeout; // set selector timeout 143 processDeregisterQueue(); 144 if (interruptTriggered) { 145 resetWakeupSocket(); 146 return 0; 147 } 148 // Calculate number of helper threads needed for poll. If necessary 149 // threads are created here and start waiting on startLock 150 adjustThreadsCount(); 151 finishLock.reset(); // reset finishLock 152 // Wakeup helper threads, waiting on startLock, so they start polling. 153 // Redundant threads will exit here after wakeup. 154 startLock.startThreads(); 155 // do polling in the main thread. Main thread is responsible for 156 // first MAX_SELECTABLE_FDS entries in pollArray. 157 try { 158 begin(); 159 try { 160 subSelector.poll(); 161 } catch (IOException e) { 162 finishLock.setException(e); // Save this exception 163 } 164 // Main thread is out of poll(). Wakeup others and wait for them 165 if (threads.size() > 0) 166 finishLock.waitForHelperThreads(); 167 } finally { 168 end(); 169 } 170 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 171 finishLock.checkForException(); 172 processDeregisterQueue(); 173 int updated = updateSelectedKeys(); 174 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 175 resetWakeupSocket(); 176 return updated; 177 } 178 179 // Helper threads wait on this lock for the next poll. 180 private final StartLock startLock = new StartLock(); 181 182 private final class StartLock { 183 // A variable which distinguishes the current run of doSelect from the 184 // previous one. Incrementing runsCounter and notifying threads will 185 // trigger another round of poll. 186 private long runsCounter; 187 // Triggers threads, waiting on this lock to start polling. 188 private synchronized void startThreads() { 189 runsCounter++; // next run 190 notifyAll(); // wake up threads. 191 } 192 // This function is called by a helper thread to wait for the 193 // next round of poll(). It also checks, if this thread became 194 // redundant. If yes, it returns true, notifying the thread 195 // that it should exit. 196 private synchronized boolean waitForStart(SelectThread thread) { 325 Net.POLLCONN | 326 Net.POLLOUT, 327 true); 328 return numKeysUpdated; 329 } 330 331 /** 332 * Note, clearedCount is used to determine if the readyOps have 333 * been reset in this select operation. updateCount is used to 334 * tell if a key has been counted as updated in this select 335 * operation. 336 * 337 * me.updateCount <= me.clearedCount <= updateCount 338 */ 339 private int processFDSet(long updateCount, int[] fds, int rOps, 340 boolean isExceptFds) 341 { 342 int numKeysUpdated = 0; 343 for (int i = 1; i <= fds[0]; i++) { 344 int desc = fds[i]; 345 if (desc == wakeupSourceFd) { 346 synchronized (interruptLock) { 347 interruptTriggered = true; 348 } 349 continue; 350 } 351 MapEntry me = fdMap.get(desc); 352 // If me is null, the key was deregistered in the previous 353 // processDeregisterQueue. 354 if (me == null) 355 continue; 356 SelectionKeyImpl sk = me.ski; 357 358 // The descriptor may be in the exceptfds set because there is 359 // OOB data queued to the socket. If there is OOB data then it 360 // is discarded and the key is not added to the selected set. 361 if (isExceptFds && 362 (sk.channel() instanceof SocketChannelImpl) && 363 discardUrgentData(desc)) 364 { 365 continue; 366 } 367 368 if (selectedKeys.contains(sk)) { // Key in selected set 369 if (me.clearedCount != updateCount) { 370 if (sk.channel.translateAndSetReadyOps(rOps, sk) && 371 (me.updateCount != updateCount)) { 372 me.updateCount = updateCount; 373 numKeysUpdated++; 374 } 375 } else { // The readyOps have been set; now add 376 if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && 377 (me.updateCount != updateCount)) { 378 me.updateCount = updateCount; 379 numKeysUpdated++; 380 } 381 } 382 me.clearedCount = updateCount; 383 } else { // Key is not in selected set yet 384 if (me.clearedCount != updateCount) { 385 sk.channel.translateAndSetReadyOps(rOps, sk); 386 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 387 selectedKeys.add(sk); 388 me.updateCount = updateCount; 389 numKeysUpdated++; 390 } 391 } else { // The readyOps have been set; now add 392 sk.channel.translateAndUpdateReadyOps(rOps, sk); 393 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 394 selectedKeys.add(sk); 395 me.updateCount = updateCount; 396 numKeysUpdated++; 397 } 398 } 399 me.clearedCount = updateCount; 400 } 401 } 402 return numKeysUpdated; 403 } 404 } 405 406 // Represents a helper thread used for select. 407 private final class SelectThread extends ManagedLocalsThread { 408 private final int index; // index of this thread 409 final SubSelector subSelector; 410 private long lastRun = 0; // last run number 411 private volatile boolean zombie; 412 // Creates a new thread 413 private SelectThread(int i) { 414 this.index = i; 415 this.subSelector = new SubSelector(i); 416 //make sure we wait for next round of poll 417 this.lastRun = startLock.runsCounter; 418 } 419 void makeZombie() { 420 zombie = true; 421 } 422 boolean isZombie() { 423 return zombie; 424 } 425 public void run() { 426 while (true) { // poll loop 427 // wait for the start of poll. If this thread has become | 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 */ 28 29 30 package sun.nio.ch; 31 32 import java.nio.channels.*; 33 import java.nio.channels.spi.SelectorProvider; 34 import java.io.IOException; 35 import java.util.*; 36 import java.util.function.Consumer; 37 38 /** 39 * A multi-threaded implementation of Selector for Windows. 40 * 41 * @author Konstantin Kladko 42 * @author Mark Reinhold 43 */ 44 45 final class WindowsSelectorImpl extends SelectorImpl { 46 // Initial capacity of the poll array 47 private final int INIT_CAP = 8; 48 // Maximum number of sockets for select(). 49 // Should be INIT_CAP times a power of 2 50 private final static int MAX_SELECTABLE_FDS = 1024; 51 52 // The list of SelectableChannels serviced by this Selector. Every mod 53 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 54 // array, where the corresponding entry is occupied by the wakeupSocket 55 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 56 113 114 // Lock for interrupt triggering and clearing 115 private final Object interruptLock = new Object(); 116 private volatile boolean interruptTriggered = false; 117 118 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 119 super(sp); 120 pollWrapper = new PollArrayWrapper(INIT_CAP); 121 wakeupPipe = Pipe.open(); 122 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 123 124 // Disable the Nagle algorithm so that the wakeup is more immediate 125 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 126 (sink.sc).socket().setTcpNoDelay(true); 127 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 128 129 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 130 } 131 132 protected int doSelect(long timeout) throws IOException { 133 if (pollSubSelector(timeout)) 134 return 0; 135 136 int updated = updateSelectedKeys(); 137 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 138 resetWakeupSocket(); 139 return updated; 140 } 141 142 @Override 143 protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException { 144 Objects.requireNonNull(handler); 145 if (pollSubSelector(timeout)) 146 return 0; 147 148 updateCount++; 149 int numKeysUpdated = 0; 150 numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler); 151 for (SelectThread t: threads) { 152 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler); 153 } 154 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 155 resetWakeupSocket(); 156 return numKeysUpdated; 157 } 158 159 private boolean pollSubSelector(long timeout) throws IOException { 160 if (channelArray == null) 161 throw new ClosedSelectorException(); 162 this.timeout = timeout; // set selector timeout 163 processDeregisterQueue(); 164 if (interruptTriggered) { 165 resetWakeupSocket(); 166 return true; 167 } 168 // Calculate number of helper threads needed for poll. If necessary 169 // threads are created here and start waiting on startLock 170 adjustThreadsCount(); 171 finishLock.reset(); // reset finishLock 172 // Wakeup helper threads, waiting on startLock, so they start polling. 173 // Redundant threads will exit here after wakeup. 174 startLock.startThreads(); 175 // do polling in the main thread. Main thread is responsible for 176 // first MAX_SELECTABLE_FDS entries in pollArray. 177 try { 178 begin(); 179 try { 180 subSelector.poll(); 181 } catch (IOException e) { 182 finishLock.setException(e); // Save this exception 183 } 184 // Main thread is out of poll(). Wakeup others and wait for them 185 if (threads.size() > 0) 186 finishLock.waitForHelperThreads(); 187 } finally { 188 end(); 189 } 190 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 191 finishLock.checkForException(); 192 processDeregisterQueue(); 193 return false; 194 } 195 196 // Helper threads wait on this lock for the next poll. 197 private final StartLock startLock = new StartLock(); 198 199 private final class StartLock { 200 // A variable which distinguishes the current run of doSelect from the 201 // previous one. Incrementing runsCounter and notifying threads will 202 // trigger another round of poll. 203 private long runsCounter; 204 // Triggers threads, waiting on this lock to start polling. 205 private synchronized void startThreads() { 206 runsCounter++; // next run 207 notifyAll(); // wake up threads. 208 } 209 // This function is called by a helper thread to wait for the 210 // next round of poll(). It also checks, if this thread became 211 // redundant. If yes, it returns true, notifying the thread 212 // that it should exit. 213 private synchronized boolean waitForStart(SelectThread thread) { 342 Net.POLLCONN | 343 Net.POLLOUT, 344 true); 345 return numKeysUpdated; 346 } 347 348 /** 349 * Note, clearedCount is used to determine if the readyOps have 350 * been reset in this select operation. updateCount is used to 351 * tell if a key has been counted as updated in this select 352 * operation. 353 * 354 * me.updateCount <= me.clearedCount <= updateCount 355 */ 356 private int processFDSet(long updateCount, int[] fds, int rOps, 357 boolean isExceptFds) 358 { 359 int numKeysUpdated = 0; 360 for (int i = 1; i <= fds[0]; i++) { 361 int desc = fds[i]; 362 if (checkWakeup(desc)) 363 continue; 364 MapEntry me = fdMap.get(desc); 365 // If me is null, the key was deregistered in the previous 366 // processDeregisterQueue. 367 if (me == null) 368 continue; 369 SelectionKeyImpl sk = me.ski; 370 371 if (isInterestingFileDescriptor(isExceptFds, desc, sk)) 372 continue; 373 374 if (selectedKeys.contains(sk)) { // Key in selected set 375 if (me.clearedCount != updateCount) { 376 if (sk.channel.translateAndSetReadyOps(rOps, sk) && 377 (me.updateCount != updateCount)) { 378 me.updateCount = updateCount; 379 numKeysUpdated++; 380 } 381 } else { // The readyOps have been set; now add 382 if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && 383 (me.updateCount != updateCount)) { 384 me.updateCount = updateCount; 385 numKeysUpdated++; 386 } 387 } 388 me.clearedCount = updateCount; 389 } else { // Key is not in selected set yet 390 if (me.clearedCount != updateCount) { 391 sk.channel.translateAndSetReadyOps(rOps, sk); 392 } else { // The readyOps have been set; now add 393 sk.channel.translateAndUpdateReadyOps(rOps, sk); 394 } 395 if (sk.hasOps()) { 396 selectedKeys.add(sk); 397 me.updateCount = updateCount; 398 numKeysUpdated++; 399 } 400 me.clearedCount = updateCount; 401 } 402 } 403 return numKeysUpdated; 404 } 405 406 private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) { 407 // The descriptor may be in the exceptfds set because there is 408 // OOB data queued to the socket. If there is OOB data then it 409 // is discarded and the key is not added to the selected set. 410 return isExceptFds && 411 (sk.channel() instanceof SocketChannelImpl) && 412 discardUrgentData(desc); 413 } 414 415 private int processFDSet(long updateCount, int[] fds, int rOps, 416 boolean isExceptFds, Consumer<SelectionKey> handler) 417 { 418 int numKeysUpdated = 0; 419 for (int i = 1; i <= fds[0]; i++) { 420 int desc = fds[i]; 421 if (checkWakeup(desc)) 422 continue; 423 MapEntry me = fdMap.get(desc); 424 // If me is null, the key was deregistered in the previous 425 // processDeregisterQueue. 426 if (me == null) 427 continue; 428 SelectionKeyImpl sk = me.ski; 429 430 if (isInterestingFileDescriptor(isExceptFds, desc, sk)) 431 continue; 432 433 if (me.clearedCount != updateCount) { 434 sk.channel.translateAndSetReadyOps(rOps, sk); 435 } else { // The readyOps have been set; now add 436 sk.channel.translateAndUpdateReadyOps(rOps, sk); 437 } 438 439 if (sk.hasOps()) { 440 handler.accept(sk); 441 me.updateCount = updateCount; 442 numKeysUpdated++; 443 } 444 me.clearedCount = updateCount; 445 } 446 return numKeysUpdated; 447 } 448 449 private boolean checkWakeup(int desc) { 450 if (desc == wakeupSourceFd) { 451 synchronized (interruptLock) { 452 interruptTriggered = true; 453 } 454 return true; 455 } 456 return false; 457 } 458 459 public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) { 460 int numKeysUpdated = 0; 461 numKeysUpdated += processFDSet(updateCount, readFds, 462 Net.POLLIN, 463 false, 464 handler); 465 numKeysUpdated += processFDSet(updateCount, writeFds, 466 Net.POLLCONN | 467 Net.POLLOUT, 468 false, 469 handler); 470 numKeysUpdated += processFDSet(updateCount, exceptFds, 471 Net.POLLIN | 472 Net.POLLCONN | 473 Net.POLLOUT, 474 true, 475 handler); 476 return numKeysUpdated; 477 } 478 } 479 480 // Represents a helper thread used for select. 481 private final class SelectThread extends Thread { 482 private final int index; // index of this thread 483 final SubSelector subSelector; 484 private long lastRun = 0; // last run number 485 private volatile boolean zombie; 486 // Creates a new thread 487 private SelectThread(int i) { 488 this.index = i; 489 this.subSelector = new SubSelector(i); 490 //make sure we wait for next round of poll 491 this.lastRun = startLock.runsCounter; 492 } 493 void makeZombie() { 494 zombie = true; 495 } 496 boolean isZombie() { 497 return zombie; 498 } 499 public void run() { 500 while (true) { // poll loop 501 // wait for the start of poll. If this thread has become |