1 /* 2 * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 /* 27 */ 28 29 30 package sun.nio.ch; 31 32 import java.nio.channels.spi.SelectorProvider; 33 import java.nio.channels.Selector; 34 import java.nio.channels.ClosedSelectorException; 35 import java.nio.channels.Pipe; 36 import java.nio.channels.SelectableChannel; 37 import java.io.IOException; 38 import java.nio.channels.CancelledKeyException; 39 import java.util.List; 40 import java.util.ArrayList; 41 import java.util.HashMap; 42 43 /** 44 * A multi-threaded implementation of Selector for Windows. 45 * 46 * @author Konstantin Kladko 47 * @author Mark Reinhold 48 */ 49 50 final class WindowsSelectorImpl extends SelectorImpl { 51 // Initial capacity of the poll array 52 private final int INIT_CAP = 8; 53 // Maximum number of sockets for select(). 54 // Should be INIT_CAP times a power of 2 55 private static final int MAX_SELECTABLE_FDS = 1024; 56 57 // The list of SelectableChannels serviced by this Selector. Every mod 58 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 59 // array, where the corresponding entry is occupied by the wakeupSocket 60 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 61 62 // The global native poll array holds file descriptors and event masks 63 private PollArrayWrapper pollWrapper; 64 65 // The number of valid entries in poll array, including entries occupied 66 // by wakeup socket handle. 67 private int totalChannels = 1; 68 69 // Number of helper threads needed for select. We need one thread per 70 // each additional set of MAX_SELECTABLE_FDS - 1 channels. 71 private int threadsCount = 0; 72 73 // A list of helper threads for select. 74 private final List<SelectThread> threads = new ArrayList<SelectThread>(); 75 76 //Pipe used as a wakeup object. 77 private final Pipe wakeupPipe; 78 79 // File descriptors corresponding to source and sink 80 private final int wakeupSourceFd, wakeupSinkFd; 81 82 // Lock for close cleanup 83 private final Object closeLock = new Object(); 84 85 // Maps file descriptors to their indices in pollArray 86 private static final class FdMap extends HashMap<Integer, MapEntry> { 87 static final long serialVersionUID = 0L; 88 private MapEntry get(int desc) { 89 return get(Integer.valueOf(desc)); 90 } 91 private MapEntry put(SelectionKeyImpl ski) { 92 return put(Integer.valueOf(ski.channel.getFDVal()), new MapEntry(ski)); 93 } 94 private MapEntry remove(SelectionKeyImpl ski) { 95 Integer fd = Integer.valueOf(ski.channel.getFDVal()); 96 MapEntry x = get(fd); 97 if ((x != null) && (x.ski.channel == ski.channel)) 98 return remove(fd); 99 return null; 100 } 101 } 102 103 // class for fdMap entries 104 private static final class MapEntry { 105 SelectionKeyImpl ski; 106 long updateCount = 0; 107 long clearedCount = 0; 108 MapEntry(SelectionKeyImpl ski) { 109 this.ski = ski; 110 } 111 } 112 private final FdMap fdMap = new FdMap(); 113 114 // SubSelector for the main thread 115 private final SubSelector subSelector = new SubSelector(); 116 117 private long timeout; //timeout for poll 118 119 // Lock for interrupt triggering and clearing 120 private final Object interruptLock = new Object(); 121 private volatile boolean interruptTriggered; 122 123 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 124 super(sp); 125 pollWrapper = new PollArrayWrapper(INIT_CAP); 126 wakeupPipe = Pipe.open(); 127 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 128 129 // Disable the Nagle algorithm so that the wakeup is more immediate 130 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 131 (sink.sc).socket().setTcpNoDelay(true); 132 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 133 134 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 135 } 136 137 protected int doSelect(long timeout) throws IOException { 138 if (channelArray == null) 139 throw new ClosedSelectorException(); 140 this.timeout = timeout; // set selector timeout 141 processDeregisterQueue(); 142 if (interruptTriggered) { 143 resetWakeupSocket(); 144 return 0; 145 } 146 // Calculate number of helper threads needed for poll. If necessary 147 // threads are created here and start waiting on startLock 148 adjustThreadsCount(); 149 finishLock.reset(); // reset finishLock 150 // Wakeup helper threads, waiting on startLock, so they start polling. 151 // Redundant threads will exit here after wakeup. 152 startLock.startThreads(); 153 // do polling in the main thread. Main thread is responsible for 154 // first MAX_SELECTABLE_FDS entries in pollArray. 155 try { 156 begin(); 157 try { 158 subSelector.poll(); 159 } catch (IOException e) { 160 finishLock.setException(e); // Save this exception 161 } 162 // Main thread is out of poll(). Wakeup others and wait for them 163 if (threads.size() > 0) 164 finishLock.waitForHelperThreads(); 165 } finally { 166 end(); 167 } 168 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 169 finishLock.checkForException(); 170 processDeregisterQueue(); 171 int updated = updateSelectedKeys(); 172 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 173 resetWakeupSocket(); 174 return updated; 175 } 176 177 // Helper threads wait on this lock for the next poll. 178 private final StartLock startLock = new StartLock(); 179 180 private final class StartLock { 181 // A variable which distinguishes the current run of doSelect from the 182 // previous one. Incrementing runsCounter and notifying threads will 183 // trigger another round of poll. 184 private long runsCounter; 185 // Triggers threads, waiting on this lock to start polling. 186 private synchronized void startThreads() { 187 runsCounter++; // next run 188 notifyAll(); // wake up threads. 189 } 190 // This function is called by a helper thread to wait for the 191 // next round of poll(). It also checks, if this thread became 192 // redundant. If yes, it returns true, notifying the thread 193 // that it should exit. 194 private synchronized boolean waitForStart(SelectThread thread) { 195 while (true) { 196 while (runsCounter == thread.lastRun) { 197 try { 198 startLock.wait(); 199 } catch (InterruptedException e) { 200 Thread.currentThread().interrupt(); 201 } 202 } 203 if (thread.isZombie()) { // redundant thread 204 return true; // will cause run() to exit. 205 } else { 206 thread.lastRun = runsCounter; // update lastRun 207 return false; // will cause run() to poll. 208 } 209 } 210 } 211 } 212 213 // Main thread waits on this lock, until all helper threads are done 214 // with poll(). 215 private final FinishLock finishLock = new FinishLock(); 216 217 private final class FinishLock { 218 // Number of helper threads, that did not finish yet. 219 private int threadsToFinish; 220 221 // IOException which occurred during the last run. 222 IOException exception = null; 223 224 // Called before polling. 225 private void reset() { 226 threadsToFinish = threads.size(); // helper threads 227 } 228 229 // Each helper thread invokes this function on finishLock, when 230 // the thread is done with poll(). 231 private synchronized void threadFinished() { 232 if (threadsToFinish == threads.size()) { // finished poll() first 233 // if finished first, wakeup others 234 wakeup(); 235 } 236 threadsToFinish--; 237 if (threadsToFinish == 0) // all helper threads finished poll(). 238 notify(); // notify the main thread 239 } 240 241 // The main thread invokes this function on finishLock to wait 242 // for helper threads to finish poll(). 243 private synchronized void waitForHelperThreads() { 244 if (threadsToFinish == threads.size()) { 245 // no helper threads finished yet. Wakeup them up. 246 wakeup(); 247 } 248 while (threadsToFinish != 0) { 249 try { 250 finishLock.wait(); 251 } catch (InterruptedException e) { 252 // Interrupted - set interrupted state. 253 Thread.currentThread().interrupt(); 254 } 255 } 256 } 257 258 // sets IOException for this run 259 private synchronized void setException(IOException e) { 260 exception = e; 261 } 262 263 // Checks if there was any exception during the last run. 264 // If yes, throws it 265 private void checkForException() throws IOException { 266 if (exception == null) 267 return; 268 StringBuffer message = new StringBuffer("An exception occurred" + 269 " during the execution of select(): \n"); 270 message.append(exception); 271 message.append('\n'); 272 exception = null; 273 throw new IOException(message.toString()); 274 } 275 } 276 277 private final class SubSelector { 278 private final int pollArrayIndex; // starting index in pollArray to poll 279 // These arrays will hold result of native select(). 280 // The first element of each array is the number of selected sockets. 281 // Other elements are file descriptors of selected sockets. 282 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; 283 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; 284 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; 285 286 private SubSelector() { 287 this.pollArrayIndex = 0; // main thread 288 } 289 290 private SubSelector(int threadIndex) { // helper threads 291 this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; 292 } 293 294 private int poll() throws IOException{ // poll for the main thread 295 return poll0(pollWrapper.pollArrayAddress, 296 Math.min(totalChannels, MAX_SELECTABLE_FDS), 297 readFds, writeFds, exceptFds, timeout); 298 } 299 300 private int poll(int index) throws IOException { 301 // poll for helper threads 302 return poll0(pollWrapper.pollArrayAddress + 303 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), 304 Math.min(MAX_SELECTABLE_FDS, 305 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), 306 readFds, writeFds, exceptFds, timeout); 307 } 308 309 private native int poll0(long pollAddress, int numfds, 310 int[] readFds, int[] writeFds, int[] exceptFds, long timeout); 311 312 private int processSelectedKeys(long updateCount) { 313 int numKeysUpdated = 0; 314 numKeysUpdated += processFDSet(updateCount, readFds, 315 Net.POLLIN, 316 false); 317 numKeysUpdated += processFDSet(updateCount, writeFds, 318 Net.POLLCONN | 319 Net.POLLOUT, 320 false); 321 numKeysUpdated += processFDSet(updateCount, exceptFds, 322 Net.POLLIN | 323 Net.POLLCONN | 324 Net.POLLOUT, 325 true); 326 return numKeysUpdated; 327 } 328 329 /** 330 * Note, clearedCount is used to determine if the readyOps have 331 * been reset in this select operation. updateCount is used to 332 * tell if a key has been counted as updated in this select 333 * operation. 334 * 335 * me.updateCount <= me.clearedCount <= updateCount 336 */ 337 private int processFDSet(long updateCount, int[] fds, int rOps, 338 boolean isExceptFds) 339 { 340 int numKeysUpdated = 0; 341 for (int i = 1; i <= fds[0]; i++) { 342 int desc = fds[i]; 343 if (desc == wakeupSourceFd) { 344 synchronized (interruptLock) { 345 interruptTriggered = true; 346 } 347 continue; 348 } 349 MapEntry me = fdMap.get(desc); 350 // If me is null, the key was deregistered in the previous 351 // processDeregisterQueue. 352 if (me == null) 353 continue; 354 SelectionKeyImpl sk = me.ski; 355 356 // The descriptor may be in the exceptfds set because there is 357 // OOB data queued to the socket. If there is OOB data then it 358 // is discarded and the key is not added to the selected set. 359 if (isExceptFds && 360 (sk.channel() instanceof SocketChannelImpl) && 361 discardUrgentData(desc)) 362 { 363 continue; 364 } 365 366 if (selectedKeys.contains(sk)) { // Key in selected set 367 if (me.clearedCount != updateCount) { 368 if (sk.channel.translateAndSetReadyOps(rOps, sk) && 369 (me.updateCount != updateCount)) { 370 me.updateCount = updateCount; 371 numKeysUpdated++; 372 } 373 } else { // The readyOps have been set; now add 374 if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && 375 (me.updateCount != updateCount)) { 376 me.updateCount = updateCount; 377 numKeysUpdated++; 378 } 379 } 380 me.clearedCount = updateCount; 381 } else { // Key is not in selected set yet 382 if (me.clearedCount != updateCount) { 383 sk.channel.translateAndSetReadyOps(rOps, sk); 384 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 385 selectedKeys.add(sk); 386 me.updateCount = updateCount; 387 numKeysUpdated++; 388 } 389 } else { // The readyOps have been set; now add 390 sk.channel.translateAndUpdateReadyOps(rOps, sk); 391 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 392 selectedKeys.add(sk); 393 me.updateCount = updateCount; 394 numKeysUpdated++; 395 } 396 } 397 me.clearedCount = updateCount; 398 } 399 } 400 return numKeysUpdated; 401 } 402 } 403 404 // Represents a helper thread used for select. 405 private final class SelectThread extends Thread { 406 private final int index; // index of this thread 407 final SubSelector subSelector; 408 private long lastRun = 0; // last run number 409 private volatile boolean zombie; 410 // Creates a new thread 411 private SelectThread(int i) { 412 super(null, null, "SelectorHelper", 0, false); 413 this.index = i; 414 this.subSelector = new SubSelector(i); 415 //make sure we wait for next round of poll 416 this.lastRun = startLock.runsCounter; 417 } 418 void makeZombie() { 419 zombie = true; 420 } 421 boolean isZombie() { 422 return zombie; 423 } 424 public void run() { 425 while (true) { // poll loop 426 // wait for the start of poll. If this thread has become 427 // redundant, then exit. 428 if (startLock.waitForStart(this)) 429 return; 430 // call poll() 431 try { 432 subSelector.poll(index); 433 } catch (IOException e) { 434 // Save this exception and let other threads finish. 435 finishLock.setException(e); 436 } 437 // notify main thread, that this thread has finished, and 438 // wakeup others, if this thread is the first to finish. 439 finishLock.threadFinished(); 440 } 441 } 442 } 443 444 // After some channels registered/deregistered, the number of required 445 // helper threads may have changed. Adjust this number. 446 private void adjustThreadsCount() { 447 if (threadsCount > threads.size()) { 448 // More threads needed. Start more threads. 449 for (int i = threads.size(); i < threadsCount; i++) { 450 SelectThread newThread = new SelectThread(i); 451 threads.add(newThread); 452 newThread.setDaemon(true); 453 newThread.start(); 454 } 455 } else if (threadsCount < threads.size()) { 456 // Some threads become redundant. Remove them from the threads List. 457 for (int i = threads.size() - 1 ; i >= threadsCount; i--) 458 threads.remove(i).makeZombie(); 459 } 460 } 461 462 // Sets Windows wakeup socket to a signaled state. 463 private void setWakeupSocket() { 464 setWakeupSocket0(wakeupSinkFd); 465 } 466 private native void setWakeupSocket0(int wakeupSinkFd); 467 468 // Sets Windows wakeup socket to a non-signaled state. 469 private void resetWakeupSocket() { 470 synchronized (interruptLock) { 471 if (interruptTriggered == false) 472 return; 473 resetWakeupSocket0(wakeupSourceFd); 474 interruptTriggered = false; 475 } 476 } 477 478 private native void resetWakeupSocket0(int wakeupSourceFd); 479 480 private native boolean discardUrgentData(int fd); 481 482 // We increment this counter on each call to updateSelectedKeys() 483 // each entry in SubSelector.fdsMap has a memorized value of 484 // updateCount. When we increment numKeysUpdated we set updateCount 485 // for the corresponding entry to its current value. This is used to 486 // avoid counting the same key more than once - the same key can 487 // appear in readfds and writefds. 488 private long updateCount = 0; 489 490 // Update ops of the corresponding Channels. Add the ready keys to the 491 // ready queue. 492 private int updateSelectedKeys() { 493 updateCount++; 494 int numKeysUpdated = 0; 495 numKeysUpdated += subSelector.processSelectedKeys(updateCount); 496 for (SelectThread t: threads) { 497 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount); 498 } 499 return numKeysUpdated; 500 } 501 502 protected void implClose() throws IOException { 503 synchronized (closeLock) { 504 if (channelArray != null) { 505 if (pollWrapper != null) { 506 // prevent further wakeup 507 synchronized (interruptLock) { 508 interruptTriggered = true; 509 } 510 wakeupPipe.sink().close(); 511 wakeupPipe.source().close(); 512 for(int i = 1; i < totalChannels; i++) { // Deregister channels 513 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent 514 deregister(channelArray[i]); 515 SelectableChannel selch = channelArray[i].channel(); 516 if (!selch.isOpen() && !selch.isRegistered()) 517 ((SelChImpl)selch).kill(); 518 } 519 } 520 pollWrapper.free(); 521 pollWrapper = null; 522 selectedKeys.clear(); 523 channelArray = null; 524 // Make all remaining helper threads exit 525 for (SelectThread t: threads) 526 t.makeZombie(); 527 startLock.startThreads(); 528 } 529 } 530 } 531 } 532 533 protected void implRegister(SelectionKeyImpl ski) { 534 synchronized (closeLock) { 535 if (pollWrapper == null) 536 throw new ClosedSelectorException(); 537 growIfNeeded(); 538 channelArray[totalChannels] = ski; 539 ski.setIndex(totalChannels); 540 fdMap.put(ski); 541 keys.add(ski); 542 pollWrapper.addEntry(totalChannels, ski); 543 totalChannels++; 544 } 545 } 546 547 private void growIfNeeded() { 548 if (channelArray.length == totalChannels) { 549 int newSize = totalChannels * 2; // Make a larger array 550 SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; 551 System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); 552 channelArray = temp; 553 pollWrapper.grow(newSize); 554 } 555 if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed 556 pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); 557 totalChannels++; 558 threadsCount++; 559 } 560 } 561 562 protected void implDereg(SelectionKeyImpl ski) throws IOException{ 563 int i = ski.getIndex(); 564 assert (i >= 0); 565 synchronized (closeLock) { 566 if (i != totalChannels - 1) { 567 // Copy end one over it 568 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; 569 channelArray[i] = endChannel; 570 endChannel.setIndex(i); 571 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, 572 pollWrapper, i); 573 } 574 ski.setIndex(-1); 575 } 576 channelArray[totalChannels - 1] = null; 577 totalChannels--; 578 if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { 579 totalChannels--; 580 threadsCount--; // The last thread has become redundant. 581 } 582 fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys 583 keys.remove(ski); 584 selectedKeys.remove(ski); 585 deregister(ski); 586 SelectableChannel selch = ski.channel(); 587 if (!selch.isOpen() && !selch.isRegistered()) 588 ((SelChImpl)selch).kill(); 589 } 590 591 public void putEventOps(SelectionKeyImpl sk, int ops) { 592 synchronized (closeLock) { 593 if (pollWrapper == null) 594 throw new ClosedSelectorException(); 595 // make sure this sk has not been removed yet 596 int index = sk.getIndex(); 597 if (index == -1) 598 throw new CancelledKeyException(); 599 pollWrapper.putEventOps(index, ops); 600 } 601 } 602 603 public Selector wakeup() { 604 synchronized (interruptLock) { 605 if (!interruptTriggered) { 606 setWakeupSocket(); 607 interruptTriggered = true; 608 } 609 } 610 return this; 611 } 612 613 static { 614 IOUtil.load(); 615 } 616 }