17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
22 * CA 95054 USA or visit www.sun.com if you need additional information or
23 * have any questions.
24 */
25
26 /*
27 */
28
29
30 package sun.nio.ch;
31
32 import java.nio.channels.spi.SelectorProvider;
33 import java.nio.channels.Selector;
34 import java.nio.channels.ClosedSelectorException;
35 import java.nio.channels.Pipe;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.io.IOException;
39 import java.util.List;
40 import java.util.ArrayList;
41 import java.util.HashMap;
42 import java.util.Iterator;
43
44 /**
45 * A multi-threaded implementation of Selector for Windows.
46 *
47 * @author Konstantin Kladko
48 * @author Mark Reinhold
49 */
50
51 final class WindowsSelectorImpl extends SelectorImpl {
52 // Initial capacity of the poll array
53 private final int INIT_CAP = 8;
54 // Maximum number of sockets for select().
55 // Should be INIT_CAP times a power of 2
56 private final static int MAX_SELECTABLE_FDS = 1024;
57
58 // The list of SelectableChannels serviced by this Selector. Every mod
59 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
60 // array, where the corresponding entry is occupied by the wakeupSocket
61 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
62
63 // The global native poll array holds file descriptors and event masks
64 private PollArrayWrapper pollWrapper;
65
66 // The number of valid entries in poll array, including entries occupied
67 // by wakeup socket handle.
68 private int totalChannels = 1;
69
70 // Number of helper threads needed for select. We need one thread per
71 // each additional set of MAX_SELECTABLE_FDS - 1 channels.
72 private int threadsCount = 0;
73
74 // A list of helper threads for select.
75 private final List<Thread> threads = new ArrayList<Thread>();
76
77 //Pipe used as a wakeup object.
78 private final Pipe wakeupPipe;
79
80 // File descriptors corresponding to source and sink
81 private final int wakeupSourceFd, wakeupSinkFd;
82
83 // Lock for close cleanup
84 private Object closeLock = new Object();
85
86 // Maps file descriptors to their indices in pollArray
87 private final static class FdMap extends HashMap<Integer, MapEntry> {
88 static final long serialVersionUID = 0L;
89 private MapEntry get(int desc) {
90 return get(new Integer(desc));
91 }
92 private MapEntry put(SelectionKeyImpl ski) {
93 return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
94 }
95 private MapEntry remove(SelectionKeyImpl ski) {
184 // trigger another round of poll.
185 private long runsCounter;
186 // Triggers threads, waiting on this lock to start polling.
187 private synchronized void startThreads() {
188 runsCounter++; // next run
189 notifyAll(); // wake up threads.
190 }
191 // This function is called by a helper thread to wait for the
192 // next round of poll(). It also checks, if this thread became
193 // redundant. If yes, it returns true, notifying the thread
194 // that it should exit.
195 private synchronized boolean waitForStart(SelectThread thread) {
196 while (true) {
197 while (runsCounter == thread.lastRun) {
198 try {
199 startLock.wait();
200 } catch (InterruptedException e) {
201 Thread.currentThread().interrupt();
202 }
203 }
204 if (thread.index >= threads.size()) { // redundant thread
205 return true; // will cause run() to exit.
206 } else {
207 thread.lastRun = runsCounter; // update lastRun
208 return false; // will cause run() to poll.
209 }
210 }
211 }
212 }
213
214 // Main thread waits on this lock, until all helper threads are done
215 // with poll().
216 private final FinishLock finishLock = new FinishLock();
217
218 private final class FinishLock {
219 // Number of helper threads, that did not finish yet.
220 private int threadsToFinish;
221
222 // IOException which occurred during the last run.
223 IOException exception = null;
224
371 me.updateCount = updateCount;
372 numKeysUpdated++;
373 }
374 } else { // The readyOps have been set; now add
375 sk.channel.translateAndUpdateReadyOps(rOps, sk);
376 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
377 selectedKeys.add(sk);
378 me.updateCount = updateCount;
379 numKeysUpdated++;
380 }
381 }
382 me.clearedCount = updateCount;
383 }
384 }
385 return numKeysUpdated;
386 }
387 }
388
389 // Represents a helper thread used for select.
390 private final class SelectThread extends Thread {
391 private int index; // index of this thread
392 SubSelector subSelector;
393 private long lastRun = 0; // last run number
394 // Creates a new thread
395 private SelectThread(int i) {
396 this.index = i;
397 this.subSelector = new SubSelector(i);
398 //make sure we wait for next round of poll
399 this.lastRun = startLock.runsCounter;
400 }
401 public void run() {
402 while (true) { // poll loop
403 // wait for the start of poll. If this thread has become
404 // redundant, then exit.
405 if (startLock.waitForStart(this))
406 return;
407 // call poll()
408 try {
409 subSelector.poll(index);
410 } catch (IOException e) {
411 // Save this exception and let other threads finish.
412 finishLock.setException(e);
413 }
414 // notify main thread, that this thread has finished, and
415 // wakeup others, if this thread is the first to finish.
416 finishLock.threadFinished();
417 }
418 }
419 }
420
421 // After some channels registered/deregistered, the number of required
422 // helper threads may have changed. Adjust this number.
423 private void adjustThreadsCount() {
424 if (threadsCount > threads.size()) {
425 // More threads needed. Start more threads.
426 for (int i = threads.size(); i < threadsCount; i++) {
427 SelectThread newThread = new SelectThread(i);
428 threads.add(newThread);
429 newThread.setDaemon(true);
430 newThread.start();
431 }
432 } else if (threadsCount < threads.size()) {
433 // Some threads become redundant. Remove them from the threads List.
434 for (int i = threads.size() - 1 ; i >= threadsCount; i--)
435 threads.remove(i);
436 }
437 }
438
439 // Sets Windows wakeup socket to a signaled state.
440 private void setWakeupSocket() {
441 setWakeupSocket0(wakeupSinkFd);
442 }
443 private native void setWakeupSocket0(int wakeupSinkFd);
444
445 // Sets Windows wakeup socket to a non-signaled state.
446 private void resetWakeupSocket() {
447 synchronized (interruptLock) {
448 if (interruptTriggered == false)
449 return;
450 resetWakeupSocket0(wakeupSourceFd);
451 interruptTriggered = false;
452 }
453 }
454
455 private native void resetWakeupSocket0(int wakeupSourceFd);
456
457 // We increment this counter on each call to updateSelectedKeys()
458 // each entry in SubSelector.fdsMap has a memorized value of
459 // updateCount. When we increment numKeysUpdated we set updateCount
460 // for the corresponding entry to its current value. This is used to
461 // avoid counting the same key more than once - the same key can
462 // appear in readfds and writefds.
463 private long updateCount = 0;
464
465 // Update ops of the corresponding Channels. Add the ready keys to the
466 // ready queue.
467 private int updateSelectedKeys() {
468 updateCount++;
469 int numKeysUpdated = 0;
470 numKeysUpdated += subSelector.processSelectedKeys(updateCount);
471 Iterator it = threads.iterator();
472 while (it.hasNext())
473 numKeysUpdated += ((SelectThread)it.next()).subSelector.
474 processSelectedKeys(updateCount);
475 return numKeysUpdated;
476 }
477
478 protected void implClose() throws IOException {
479 synchronized (closeLock) {
480 if (channelArray != null) {
481 if (pollWrapper != null) {
482 // prevent further wakeup
483 synchronized (interruptLock) {
484 interruptTriggered = true;
485 }
486 wakeupPipe.sink().close();
487 wakeupPipe.source().close();
488 for(int i = 1; i < totalChannels; i++) { // Deregister channels
489 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
490 deregister(channelArray[i]);
491 SelectableChannel selch = channelArray[i].channel();
492 if (!selch.isOpen() && !selch.isRegistered())
493 ((SelChImpl)selch).kill();
494 }
495 }
496 pollWrapper.free();
497 pollWrapper = null;
498 selectedKeys = null;
499 channelArray = null;
500 threads.clear();
501 // Call startThreads. All remaining helper threads now exit,
502 // since threads.size() = 0;
503 startLock.startThreads();
504 }
505 }
506 }
507 }
508
509 protected void implRegister(SelectionKeyImpl ski) {
510 synchronized (closeLock) {
511 if (pollWrapper == null)
512 throw new ClosedSelectorException();
513 growIfNeeded();
514 channelArray[totalChannels] = ski;
515 ski.setIndex(totalChannels);
516 fdMap.put(ski);
517 keys.add(ski);
518 pollWrapper.addEntry(totalChannels, ski);
519 totalChannels++;
520 }
521 }
522
|
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
22 * CA 95054 USA or visit www.sun.com if you need additional information or
23 * have any questions.
24 */
25
26 /*
27 */
28
29
30 package sun.nio.ch;
31
32 import java.nio.channels.spi.SelectorProvider;
33 import java.nio.channels.Selector;
34 import java.nio.channels.ClosedSelectorException;
35 import java.nio.channels.Pipe;
36 import java.nio.channels.SelectableChannel;
37 import java.io.IOException;
38 import java.util.List;
39 import java.util.ArrayList;
40 import java.util.HashMap;
41 import java.util.Iterator;
42
43 /**
44 * A multi-threaded implementation of Selector for Windows.
45 *
46 * @author Konstantin Kladko
47 * @author Mark Reinhold
48 */
49
50 final class WindowsSelectorImpl extends SelectorImpl {
51 // Initial capacity of the poll array
52 private final int INIT_CAP = 8;
53 // Maximum number of sockets for select().
54 // Should be INIT_CAP times a power of 2
55 private final static int MAX_SELECTABLE_FDS = 1024;
56
57 // The list of SelectableChannels serviced by this Selector. Every mod
58 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
59 // array, where the corresponding entry is occupied by the wakeupSocket
60 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
61
62 // The global native poll array holds file descriptors and event masks
63 private PollArrayWrapper pollWrapper;
64
65 // The number of valid entries in poll array, including entries occupied
66 // by wakeup socket handle.
67 private int totalChannels = 1;
68
69 // Number of helper threads needed for select. We need one thread per
70 // each additional set of MAX_SELECTABLE_FDS - 1 channels.
71 private int threadsCount = 0;
72
73 // A list of helper threads for select.
74 private final List<SelectThread> threads = new ArrayList<SelectThread>();
75
76 //Pipe used as a wakeup object.
77 private final Pipe wakeupPipe;
78
79 // File descriptors corresponding to source and sink
80 private final int wakeupSourceFd, wakeupSinkFd;
81
82 // Lock for close cleanup
83 private Object closeLock = new Object();
84
85 // Maps file descriptors to their indices in pollArray
86 private final static class FdMap extends HashMap<Integer, MapEntry> {
87 static final long serialVersionUID = 0L;
88 private MapEntry get(int desc) {
89 return get(new Integer(desc));
90 }
91 private MapEntry put(SelectionKeyImpl ski) {
92 return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
93 }
94 private MapEntry remove(SelectionKeyImpl ski) {
183 // trigger another round of poll.
184 private long runsCounter;
185 // Triggers threads, waiting on this lock to start polling.
186 private synchronized void startThreads() {
187 runsCounter++; // next run
188 notifyAll(); // wake up threads.
189 }
190 // This function is called by a helper thread to wait for the
191 // next round of poll(). It also checks, if this thread became
192 // redundant. If yes, it returns true, notifying the thread
193 // that it should exit.
194 private synchronized boolean waitForStart(SelectThread thread) {
195 while (true) {
196 while (runsCounter == thread.lastRun) {
197 try {
198 startLock.wait();
199 } catch (InterruptedException e) {
200 Thread.currentThread().interrupt();
201 }
202 }
203 if (thread.isZombie()) { // redundant thread
204 return true; // will cause run() to exit.
205 } else {
206 thread.lastRun = runsCounter; // update lastRun
207 return false; // will cause run() to poll.
208 }
209 }
210 }
211 }
212
213 // Main thread waits on this lock, until all helper threads are done
214 // with poll().
215 private final FinishLock finishLock = new FinishLock();
216
217 private final class FinishLock {
218 // Number of helper threads, that did not finish yet.
219 private int threadsToFinish;
220
221 // IOException which occurred during the last run.
222 IOException exception = null;
223
370 me.updateCount = updateCount;
371 numKeysUpdated++;
372 }
373 } else { // The readyOps have been set; now add
374 sk.channel.translateAndUpdateReadyOps(rOps, sk);
375 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
376 selectedKeys.add(sk);
377 me.updateCount = updateCount;
378 numKeysUpdated++;
379 }
380 }
381 me.clearedCount = updateCount;
382 }
383 }
384 return numKeysUpdated;
385 }
386 }
387
388 // Represents a helper thread used for select.
389 private final class SelectThread extends Thread {
390 private final int index; // index of this thread
391 final SubSelector subSelector;
392 private long lastRun = 0; // last run number
393 private volatile boolean zombie;
394 // Creates a new thread
395 private SelectThread(int i) {
396 this.index = i;
397 this.subSelector = new SubSelector(i);
398 //make sure we wait for next round of poll
399 this.lastRun = startLock.runsCounter;
400 }
401 void makeZombie() {
402 zombie = true;
403 }
404 boolean isZombie() {
405 return zombie;
406 }
407 public void run() {
408 while (true) { // poll loop
409 // wait for the start of poll. If this thread has become
410 // redundant, then exit.
411 if (startLock.waitForStart(this))
412 return;
413 // call poll()
414 try {
415 subSelector.poll(index);
416 } catch (IOException e) {
417 // Save this exception and let other threads finish.
418 finishLock.setException(e);
419 }
420 // notify main thread, that this thread has finished, and
421 // wakeup others, if this thread is the first to finish.
422 finishLock.threadFinished();
423 }
424 }
425 }
426
427 // After some channels registered/deregistered, the number of required
428 // helper threads may have changed. Adjust this number.
429 private void adjustThreadsCount() {
430 if (threadsCount > threads.size()) {
431 // More threads needed. Start more threads.
432 for (int i = threads.size(); i < threadsCount; i++) {
433 SelectThread newThread = new SelectThread(i);
434 threads.add(newThread);
435 newThread.setDaemon(true);
436 newThread.start();
437 }
438 } else if (threadsCount < threads.size()) {
439 // Some threads become redundant. Remove them from the threads List.
440 for (int i = threads.size() - 1 ; i >= threadsCount; i--)
441 threads.remove(i).makeZombie();
442 }
443 }
444
445 // Sets Windows wakeup socket to a signaled state.
446 private void setWakeupSocket() {
447 setWakeupSocket0(wakeupSinkFd);
448 }
449 private native void setWakeupSocket0(int wakeupSinkFd);
450
451 // Sets Windows wakeup socket to a non-signaled state.
452 private void resetWakeupSocket() {
453 synchronized (interruptLock) {
454 if (interruptTriggered == false)
455 return;
456 resetWakeupSocket0(wakeupSourceFd);
457 interruptTriggered = false;
458 }
459 }
460
461 private native void resetWakeupSocket0(int wakeupSourceFd);
462
463 // We increment this counter on each call to updateSelectedKeys()
464 // each entry in SubSelector.fdsMap has a memorized value of
465 // updateCount. When we increment numKeysUpdated we set updateCount
466 // for the corresponding entry to its current value. This is used to
467 // avoid counting the same key more than once - the same key can
468 // appear in readfds and writefds.
469 private long updateCount = 0;
470
471 // Update ops of the corresponding Channels. Add the ready keys to the
472 // ready queue.
473 private int updateSelectedKeys() {
474 updateCount++;
475 int numKeysUpdated = 0;
476 numKeysUpdated += subSelector.processSelectedKeys(updateCount);
477 for (SelectThread t: threads) {
478 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
479 }
480 return numKeysUpdated;
481 }
482
483 protected void implClose() throws IOException {
484 synchronized (closeLock) {
485 if (channelArray != null) {
486 if (pollWrapper != null) {
487 // prevent further wakeup
488 synchronized (interruptLock) {
489 interruptTriggered = true;
490 }
491 wakeupPipe.sink().close();
492 wakeupPipe.source().close();
493 for(int i = 1; i < totalChannels; i++) { // Deregister channels
494 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
495 deregister(channelArray[i]);
496 SelectableChannel selch = channelArray[i].channel();
497 if (!selch.isOpen() && !selch.isRegistered())
498 ((SelChImpl)selch).kill();
499 }
500 }
501 pollWrapper.free();
502 pollWrapper = null;
503 selectedKeys = null;
504 channelArray = null;
505 // Make all remaining helper threads exit
506 for (SelectThread t: threads)
507 t.makeZombie();
508 startLock.startThreads();
509 }
510 }
511 }
512 }
513
514 protected void implRegister(SelectionKeyImpl ski) {
515 synchronized (closeLock) {
516 if (pollWrapper == null)
517 throw new ClosedSelectorException();
518 growIfNeeded();
519 channelArray[totalChannels] = ski;
520 ski.setIndex(totalChannels);
521 fdMap.put(ski);
522 keys.add(ski);
523 pollWrapper.addEntry(totalChannels, ski);
524 totalChannels++;
525 }
526 }
527
|