1 /* 2 * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 */ 28 29 30 package sun.nio.ch; 31 32 import java.nio.channels.*; 33 import java.nio.channels.spi.SelectorProvider; 34 import java.io.IOException; 35 import java.util.*; 36 import java.util.function.Consumer; 37 38 /** 39 * A multi-threaded implementation of Selector for Windows. 40 * 41 * @author Konstantin Kladko 42 * @author Mark Reinhold 43 */ 44 45 final class WindowsSelectorImpl extends SelectorImpl { 46 // Initial capacity of the poll array 47 private final int INIT_CAP = 8; 48 // Maximum number of sockets for select(). 49 // Should be INIT_CAP times a power of 2 50 private final static int MAX_SELECTABLE_FDS = 1024; 51 52 // The list of SelectableChannels serviced by this Selector. Every mod 53 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 54 // array, where the corresponding entry is occupied by the wakeupSocket 55 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 56 57 // The global native poll array holds file decriptors and event masks 58 private PollArrayWrapper pollWrapper; 59 60 // The number of valid entries in poll array, including entries occupied 61 // by wakeup socket handle. 62 private int totalChannels = 1; 63 64 // Number of helper threads needed for select. We need one thread per 65 // each additional set of MAX_SELECTABLE_FDS - 1 channels. 66 private int threadsCount = 0; 67 68 // A list of helper threads for select. 69 private final List<SelectThread> threads = new ArrayList<SelectThread>(); 70 71 //Pipe used as a wakeup object. 72 private final Pipe wakeupPipe; 73 74 // File descriptors corresponding to source and sink 75 private final int wakeupSourceFd, wakeupSinkFd; 76 77 // Lock for close cleanup 78 private Object closeLock = new Object(); 79 80 // Maps file descriptors to their indices in pollArray 81 private final static class FdMap extends HashMap<Integer, MapEntry> { 82 static final long serialVersionUID = 0L; 83 private MapEntry get(int desc) { 84 return get(new Integer(desc)); 85 } 86 private MapEntry put(SelectionKeyImpl ski) { 87 return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski)); 88 } 89 private MapEntry remove(SelectionKeyImpl ski) { 90 Integer fd = new Integer(ski.channel.getFDVal()); 91 MapEntry x = get(fd); 92 if ((x != null) && (x.ski.channel == ski.channel)) 93 return remove(fd); 94 return null; 95 } 96 } 97 98 // class for fdMap entries 99 private final static class MapEntry { 100 SelectionKeyImpl ski; 101 long updateCount = 0; 102 long clearedCount = 0; 103 MapEntry(SelectionKeyImpl ski) { 104 this.ski = ski; 105 } 106 } 107 private final FdMap fdMap = new FdMap(); 108 109 // SubSelector for the main thread 110 private final SubSelector subSelector = new SubSelector(); 111 112 private long timeout; //timeout for poll 113 114 // Lock for interrupt triggering and clearing 115 private final Object interruptLock = new Object(); 116 private volatile boolean interruptTriggered = false; 117 118 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 119 super(sp); 120 pollWrapper = new PollArrayWrapper(INIT_CAP); 121 wakeupPipe = Pipe.open(); 122 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 123 124 // Disable the Nagle algorithm so that the wakeup is more immediate 125 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 126 (sink.sc).socket().setTcpNoDelay(true); 127 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 128 129 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 130 } 131 132 protected int doSelect(long timeout) throws IOException { 133 if (pollSubSelector(timeout)) 134 return 0; 135 136 int updated = updateSelectedKeys(); 137 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 138 resetWakeupSocket(); 139 return updated; 140 } 141 142 @Override 143 protected int doSelect(Consumer<SelectionKey> handler, long timeout) throws IOException { 144 Objects.requireNonNull(handler); 145 if (pollSubSelector(timeout)) 146 return 0; 147 148 updateCount++; 149 int numKeysUpdated = 0; 150 numKeysUpdated += subSelector.processSelectedKeys(updateCount, handler); 151 for (SelectThread t: threads) { 152 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, handler); 153 } 154 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 155 resetWakeupSocket(); 156 return numKeysUpdated; 157 } 158 159 private boolean pollSubSelector(long timeout) throws IOException { 160 if (channelArray == null) 161 throw new ClosedSelectorException(); 162 this.timeout = timeout; // set selector timeout 163 processDeregisterQueue(); 164 if (interruptTriggered) { 165 resetWakeupSocket(); 166 return true; 167 } 168 // Calculate number of helper threads needed for poll. If necessary 169 // threads are created here and start waiting on startLock 170 adjustThreadsCount(); 171 finishLock.reset(); // reset finishLock 172 // Wakeup helper threads, waiting on startLock, so they start polling. 173 // Redundant threads will exit here after wakeup. 174 startLock.startThreads(); 175 // do polling in the main thread. Main thread is responsible for 176 // first MAX_SELECTABLE_FDS entries in pollArray. 177 try { 178 begin(); 179 try { 180 subSelector.poll(); 181 } catch (IOException e) { 182 finishLock.setException(e); // Save this exception 183 } 184 // Main thread is out of poll(). Wakeup others and wait for them 185 if (threads.size() > 0) 186 finishLock.waitForHelperThreads(); 187 } finally { 188 end(); 189 } 190 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 191 finishLock.checkForException(); 192 processDeregisterQueue(); 193 return false; 194 } 195 196 // Helper threads wait on this lock for the next poll. 197 private final StartLock startLock = new StartLock(); 198 199 private final class StartLock { 200 // A variable which distinguishes the current run of doSelect from the 201 // previous one. Incrementing runsCounter and notifying threads will 202 // trigger another round of poll. 203 private long runsCounter; 204 // Triggers threads, waiting on this lock to start polling. 205 private synchronized void startThreads() { 206 runsCounter++; // next run 207 notifyAll(); // wake up threads. 208 } 209 // This function is called by a helper thread to wait for the 210 // next round of poll(). It also checks, if this thread became 211 // redundant. If yes, it returns true, notifying the thread 212 // that it should exit. 213 private synchronized boolean waitForStart(SelectThread thread) { 214 while (true) { 215 while (runsCounter == thread.lastRun) { 216 try { 217 startLock.wait(); 218 } catch (InterruptedException e) { 219 Thread.currentThread().interrupt(); 220 } 221 } 222 if (thread.isZombie()) { // redundant thread 223 return true; // will cause run() to exit. 224 } else { 225 thread.lastRun = runsCounter; // update lastRun 226 return false; // will cause run() to poll. 227 } 228 } 229 } 230 } 231 232 // Main thread waits on this lock, until all helper threads are done 233 // with poll(). 234 private final FinishLock finishLock = new FinishLock(); 235 236 private final class FinishLock { 237 // Number of helper threads, that did not finish yet. 238 private int threadsToFinish; 239 240 // IOException which occurred during the last run. 241 IOException exception = null; 242 243 // Called before polling. 244 private void reset() { 245 threadsToFinish = threads.size(); // helper threads 246 } 247 248 // Each helper thread invokes this function on finishLock, when 249 // the thread is done with poll(). 250 private synchronized void threadFinished() { 251 if (threadsToFinish == threads.size()) { // finished poll() first 252 // if finished first, wakeup others 253 wakeup(); 254 } 255 threadsToFinish--; 256 if (threadsToFinish == 0) // all helper threads finished poll(). 257 notify(); // notify the main thread 258 } 259 260 // The main thread invokes this function on finishLock to wait 261 // for helper threads to finish poll(). 262 private synchronized void waitForHelperThreads() { 263 if (threadsToFinish == threads.size()) { 264 // no helper threads finished yet. Wakeup them up. 265 wakeup(); 266 } 267 while (threadsToFinish != 0) { 268 try { 269 finishLock.wait(); 270 } catch (InterruptedException e) { 271 // Interrupted - set interrupted state. 272 Thread.currentThread().interrupt(); 273 } 274 } 275 } 276 277 // sets IOException for this run 278 private synchronized void setException(IOException e) { 279 exception = e; 280 } 281 282 // Checks if there was any exception during the last run. 283 // If yes, throws it 284 private void checkForException() throws IOException { 285 if (exception == null) 286 return; 287 StringBuffer message = new StringBuffer("An exception occurred" + 288 " during the execution of select(): \n"); 289 message.append(exception); 290 message.append('\n'); 291 exception = null; 292 throw new IOException(message.toString()); 293 } 294 } 295 296 private final class SubSelector { 297 private final int pollArrayIndex; // starting index in pollArray to poll 298 // These arrays will hold result of native select(). 299 // The first element of each array is the number of selected sockets. 300 // Other elements are file descriptors of selected sockets. 301 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; 302 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; 303 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; 304 305 private SubSelector() { 306 this.pollArrayIndex = 0; // main thread 307 } 308 309 private SubSelector(int threadIndex) { // helper threads 310 this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; 311 } 312 313 private int poll() throws IOException{ // poll for the main thread 314 return poll0(pollWrapper.pollArrayAddress, 315 Math.min(totalChannels, MAX_SELECTABLE_FDS), 316 readFds, writeFds, exceptFds, timeout); 317 } 318 319 private int poll(int index) throws IOException { 320 // poll for helper threads 321 return poll0(pollWrapper.pollArrayAddress + 322 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), 323 Math.min(MAX_SELECTABLE_FDS, 324 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), 325 readFds, writeFds, exceptFds, timeout); 326 } 327 328 private native int poll0(long pollAddress, int numfds, 329 int[] readFds, int[] writeFds, int[] exceptFds, long timeout); 330 331 private int processSelectedKeys(long updateCount) { 332 int numKeysUpdated = 0; 333 numKeysUpdated += processFDSet(updateCount, readFds, 334 Net.POLLIN, 335 false); 336 numKeysUpdated += processFDSet(updateCount, writeFds, 337 Net.POLLCONN | 338 Net.POLLOUT, 339 false); 340 numKeysUpdated += processFDSet(updateCount, exceptFds, 341 Net.POLLIN | 342 Net.POLLCONN | 343 Net.POLLOUT, 344 true); 345 return numKeysUpdated; 346 } 347 348 /** 349 * Note, clearedCount is used to determine if the readyOps have 350 * been reset in this select operation. updateCount is used to 351 * tell if a key has been counted as updated in this select 352 * operation. 353 * 354 * me.updateCount <= me.clearedCount <= updateCount 355 */ 356 private int processFDSet(long updateCount, int[] fds, int rOps, 357 boolean isExceptFds) 358 { 359 int numKeysUpdated = 0; 360 for (int i = 1; i <= fds[0]; i++) { 361 int desc = fds[i]; 362 if (checkWakeup(desc)) 363 continue; 364 MapEntry me = fdMap.get(desc); 365 // If me is null, the key was deregistered in the previous 366 // processDeregisterQueue. 367 if (me == null) 368 continue; 369 SelectionKeyImpl sk = me.ski; 370 371 if (isInterestingFileDescriptor(isExceptFds, desc, sk)) 372 continue; 373 374 if (selectedKeys.contains(sk)) { // Key in selected set 375 if (me.clearedCount != updateCount) { 376 if (sk.channel.translateAndSetReadyOps(rOps, sk) && 377 (me.updateCount != updateCount)) { 378 me.updateCount = updateCount; 379 numKeysUpdated++; 380 } 381 } else { // The readyOps have been set; now add 382 if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && 383 (me.updateCount != updateCount)) { 384 me.updateCount = updateCount; 385 numKeysUpdated++; 386 } 387 } 388 me.clearedCount = updateCount; 389 } else { // Key is not in selected set yet 390 if (me.clearedCount != updateCount) { 391 sk.channel.translateAndSetReadyOps(rOps, sk); 392 } else { // The readyOps have been set; now add 393 sk.channel.translateAndUpdateReadyOps(rOps, sk); 394 } 395 if (sk.hasOps()) { 396 selectedKeys.add(sk); 397 me.updateCount = updateCount; 398 numKeysUpdated++; 399 } 400 me.clearedCount = updateCount; 401 } 402 } 403 return numKeysUpdated; 404 } 405 406 private boolean isInterestingFileDescriptor(boolean isExceptFds, int desc, SelectionKeyImpl sk) { 407 // The descriptor may be in the exceptfds set because there is 408 // OOB data queued to the socket. If there is OOB data then it 409 // is discarded and the key is not added to the selected set. 410 return isExceptFds && 411 (sk.channel() instanceof SocketChannelImpl) && 412 discardUrgentData(desc); 413 } 414 415 private int processFDSet(long updateCount, int[] fds, int rOps, 416 boolean isExceptFds, Consumer<SelectionKey> handler) 417 { 418 int numKeysUpdated = 0; 419 for (int i = 1; i <= fds[0]; i++) { 420 int desc = fds[i]; 421 if (checkWakeup(desc)) 422 continue; 423 MapEntry me = fdMap.get(desc); 424 // If me is null, the key was deregistered in the previous 425 // processDeregisterQueue. 426 if (me == null) 427 continue; 428 SelectionKeyImpl sk = me.ski; 429 430 if (isInterestingFileDescriptor(isExceptFds, desc, sk)) 431 continue; 432 433 if (me.clearedCount != updateCount) { 434 sk.channel.translateAndSetReadyOps(rOps, sk); 435 } else { // The readyOps have been set; now add 436 sk.channel.translateAndUpdateReadyOps(rOps, sk); 437 } 438 439 if (sk.hasOps()) { 440 handler.accept(sk); 441 me.updateCount = updateCount; 442 numKeysUpdated++; 443 } 444 me.clearedCount = updateCount; 445 } 446 return numKeysUpdated; 447 } 448 449 private boolean checkWakeup(int desc) { 450 if (desc == wakeupSourceFd) { 451 synchronized (interruptLock) { 452 interruptTriggered = true; 453 } 454 return true; 455 } 456 return false; 457 } 458 459 public int processSelectedKeys(long updateCount, Consumer<SelectionKey> handler) { 460 int numKeysUpdated = 0; 461 numKeysUpdated += processFDSet(updateCount, readFds, 462 Net.POLLIN, 463 false, 464 handler); 465 numKeysUpdated += processFDSet(updateCount, writeFds, 466 Net.POLLCONN | 467 Net.POLLOUT, 468 false, 469 handler); 470 numKeysUpdated += processFDSet(updateCount, exceptFds, 471 Net.POLLIN | 472 Net.POLLCONN | 473 Net.POLLOUT, 474 true, 475 handler); 476 return numKeysUpdated; 477 } 478 } 479 480 // Represents a helper thread used for select. 481 private final class SelectThread extends Thread { 482 private final int index; // index of this thread 483 final SubSelector subSelector; 484 private long lastRun = 0; // last run number 485 private volatile boolean zombie; 486 // Creates a new thread 487 private SelectThread(int i) { 488 this.index = i; 489 this.subSelector = new SubSelector(i); 490 //make sure we wait for next round of poll 491 this.lastRun = startLock.runsCounter; 492 } 493 void makeZombie() { 494 zombie = true; 495 } 496 boolean isZombie() { 497 return zombie; 498 } 499 public void run() { 500 while (true) { // poll loop 501 // wait for the start of poll. If this thread has become 502 // redundant, then exit. 503 if (startLock.waitForStart(this)) 504 return; 505 // call poll() 506 try { 507 subSelector.poll(index); 508 } catch (IOException e) { 509 // Save this exception and let other threads finish. 510 finishLock.setException(e); 511 } 512 // notify main thread, that this thread has finished, and 513 // wakeup others, if this thread is the first to finish. 514 finishLock.threadFinished(); 515 } 516 } 517 } 518 519 // After some channels registered/deregistered, the number of required 520 // helper threads may have changed. Adjust this number. 521 private void adjustThreadsCount() { 522 if (threadsCount > threads.size()) { 523 // More threads needed. Start more threads. 524 for (int i = threads.size(); i < threadsCount; i++) { 525 SelectThread newThread = new SelectThread(i); 526 threads.add(newThread); 527 newThread.setDaemon(true); 528 newThread.start(); 529 } 530 } else if (threadsCount < threads.size()) { 531 // Some threads become redundant. Remove them from the threads List. 532 for (int i = threads.size() - 1 ; i >= threadsCount; i--) 533 threads.remove(i).makeZombie(); 534 } 535 } 536 537 // Sets Windows wakeup socket to a signaled state. 538 private void setWakeupSocket() { 539 setWakeupSocket0(wakeupSinkFd); 540 } 541 private native void setWakeupSocket0(int wakeupSinkFd); 542 543 // Sets Windows wakeup socket to a non-signaled state. 544 private void resetWakeupSocket() { 545 synchronized (interruptLock) { 546 if (interruptTriggered == false) 547 return; 548 resetWakeupSocket0(wakeupSourceFd); 549 interruptTriggered = false; 550 } 551 } 552 553 private native void resetWakeupSocket0(int wakeupSourceFd); 554 555 private native boolean discardUrgentData(int fd); 556 557 // We increment this counter on each call to updateSelectedKeys() 558 // each entry in SubSelector.fdsMap has a memorized value of 559 // updateCount. When we increment numKeysUpdated we set updateCount 560 // for the corresponding entry to its current value. This is used to 561 // avoid counting the same key more than once - the same key can 562 // appear in readfds and writefds. 563 private long updateCount = 0; 564 565 // Update ops of the corresponding Channels. Add the ready keys to the 566 // ready queue. 567 private int updateSelectedKeys() { 568 updateCount++; 569 int numKeysUpdated = 0; 570 numKeysUpdated += subSelector.processSelectedKeys(updateCount); 571 for (SelectThread t: threads) { 572 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount); 573 } 574 return numKeysUpdated; 575 } 576 577 protected void implClose() throws IOException { 578 synchronized (closeLock) { 579 if (channelArray != null) { 580 if (pollWrapper != null) { 581 // prevent further wakeup 582 synchronized (interruptLock) { 583 interruptTriggered = true; 584 } 585 wakeupPipe.sink().close(); 586 wakeupPipe.source().close(); 587 for(int i = 1; i < totalChannels; i++) { // Deregister channels 588 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent 589 deregister(channelArray[i]); 590 SelectableChannel selch = channelArray[i].channel(); 591 if (!selch.isOpen() && !selch.isRegistered()) 592 ((SelChImpl)selch).kill(); 593 } 594 } 595 pollWrapper.free(); 596 pollWrapper = null; 597 selectedKeys = null; 598 channelArray = null; 599 // Make all remaining helper threads exit 600 for (SelectThread t: threads) 601 t.makeZombie(); 602 startLock.startThreads(); 603 } 604 } 605 } 606 } 607 608 protected void implRegister(SelectionKeyImpl ski) { 609 synchronized (closeLock) { 610 if (pollWrapper == null) 611 throw new ClosedSelectorException(); 612 growIfNeeded(); 613 channelArray[totalChannels] = ski; 614 ski.setIndex(totalChannels); 615 fdMap.put(ski); 616 keys.add(ski); 617 pollWrapper.addEntry(totalChannels, ski); 618 totalChannels++; 619 } 620 } 621 622 private void growIfNeeded() { 623 if (channelArray.length == totalChannels) { 624 int newSize = totalChannels * 2; // Make a larger array 625 SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; 626 System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); 627 channelArray = temp; 628 pollWrapper.grow(newSize); 629 } 630 if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed 631 pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); 632 totalChannels++; 633 threadsCount++; 634 } 635 } 636 637 protected void implDereg(SelectionKeyImpl ski) throws IOException{ 638 int i = ski.getIndex(); 639 assert (i >= 0); 640 synchronized (closeLock) { 641 if (i != totalChannels - 1) { 642 // Copy end one over it 643 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; 644 channelArray[i] = endChannel; 645 endChannel.setIndex(i); 646 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, 647 pollWrapper, i); 648 } 649 ski.setIndex(-1); 650 } 651 channelArray[totalChannels - 1] = null; 652 totalChannels--; 653 if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { 654 totalChannels--; 655 threadsCount--; // The last thread has become redundant. 656 } 657 fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys 658 keys.remove(ski); 659 selectedKeys.remove(ski); 660 deregister(ski); 661 SelectableChannel selch = ski.channel(); 662 if (!selch.isOpen() && !selch.isRegistered()) 663 ((SelChImpl)selch).kill(); 664 } 665 666 public void putEventOps(SelectionKeyImpl sk, int ops) { 667 synchronized (closeLock) { 668 if (pollWrapper == null) 669 throw new ClosedSelectorException(); 670 // make sure this sk has not been removed yet 671 int index = sk.getIndex(); 672 if (index == -1) 673 throw new CancelledKeyException(); 674 pollWrapper.putEventOps(index, ops); 675 } 676 } 677 678 public Selector wakeup() { 679 synchronized (interruptLock) { 680 if (!interruptTriggered) { 681 setWakeupSocket(); 682 interruptTriggered = true; 683 } 684 } 685 return this; 686 } 687 688 static { 689 IOUtil.load(); 690 } 691 }