23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.Selector;
30 import java.nio.channels.spi.SelectorProvider;
31 import java.util.ArrayDeque;
32 import java.util.ArrayList;
33 import java.util.Deque;
34 import java.util.List;
35 import java.util.concurrent.TimeUnit;
36
37 import jdk.internal.misc.Unsafe;
38
39 /**
40 * Selector implementation based on poll
41 */
42
43 class PollSelectorImpl extends SelectorImpl {
44
45 // initial capacity of poll array
46 private static final int INITIAL_CAPACITY = 16;
47
48 // poll array, grows as needed
49 private int pollArrayCapacity = INITIAL_CAPACITY;
50 private int pollArraySize;
51 private AllocatedNativeObject pollArray;
52
53 // file descriptors used for interrupt
54 private final int fd0;
55 private final int fd1;
56
57 // keys for file descriptors in poll array, synchronize on selector
58 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
59
60 // pending updates, queued by putEventOps
61 private final Object updateLock = new Object();
62 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
63
64 // interrupt triggering and clearing
65 private final Object interruptLock = new Object();
66 private boolean interruptTriggered;
67
68 PollSelectorImpl(SelectorProvider sp) throws IOException {
69 super(sp);
70
71 int size = pollArrayCapacity * SIZE_POLLFD;
72 this.pollArray = new AllocatedNativeObject(size, false);
73
74 try {
75 long fds = IOUtil.makePipe(false);
76 this.fd0 = (int) (fds >>> 32);
77 this.fd1 = (int) fds;
78 } catch (IOException ioe) {
79 pollArray.free();
80 throw ioe;
81 }
82
83 // wakeup support
84 synchronized (this) {
85 setFirst(fd0, Net.POLLIN);
86 }
87 }
88
111 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
112 // timed poll interrupted so need to adjust timeout
113 long adjust = System.nanoTime() - startTime;
114 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
115 if (to <= 0) {
116 // timeout expired so no retry
117 numPolled = 0;
118 }
119 }
120 } while (numPolled == IOStatus.INTERRUPTED);
121 assert numPolled <= pollArraySize;
122
123 } finally {
124 end(blocking);
125 }
126
127 processDeregisterQueue();
128 return updateSelectedKeys();
129 }
130
131 /**
132 * Process changes to the interest ops.
133 */
134 private void processUpdateQueue() {
135 assert Thread.holdsLock(this);
136
137 synchronized (updateLock) {
138 SelectionKeyImpl ski;
139 while ((ski = updateKeys.pollFirst()) != null) {
140 int newEvents = ski.translateInterestOps();
141 if (ski.isValid()) {
142 int index = ski.getIndex();
143 assert index >= 0 && index < pollArraySize;
144 if (index > 0) {
145 assert pollKeys.get(index) == ski;
146 if (newEvents == 0) {
147 remove(ski);
148 } else {
149 update(ski, newEvents);
150 }
366 private void putEventOps(int i, int event) {
367 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
368 pollArray.putShort(offset, (short)event);
369 }
370
371 private int getEventOps(int i) {
372 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
373 return pollArray.getShort(offset);
374 }
375
376 private void putReventOps(int i, int revent) {
377 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
378 pollArray.putShort(offset, (short)revent);
379 }
380
381 private int getReventOps(int i) {
382 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
383 return pollArray.getShort(offset);
384 }
385
386 private static native int poll(long pollAddress, int numfds, int timeout);
387
388 static {
389 IOUtil.load();
390 }
391 }
|
23 * questions.
24 */
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.Selector;
30 import java.nio.channels.spi.SelectorProvider;
31 import java.util.ArrayDeque;
32 import java.util.ArrayList;
33 import java.util.Deque;
34 import java.util.List;
35 import java.util.concurrent.TimeUnit;
36
37 import jdk.internal.misc.Unsafe;
38
39 /**
40 * Selector implementation based on poll
41 */
42
43 public class PollSelectorImpl extends SelectorImpl {
44
45 // initial capacity of poll array
46 private static final int INITIAL_CAPACITY = 16;
47
48 // poll array, grows as needed
49 private int pollArrayCapacity = INITIAL_CAPACITY;
50 private int pollArraySize;
51 private AllocatedNativeObject pollArray;
52
53 // file descriptors used for interrupt
54 private final int fd0;
55 private final int fd1;
56
57 // keys for file descriptors in poll array, synchronize on selector
58 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
59
60 // pending updates, queued by putEventOps
61 private final Object updateLock = new Object();
62 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
63
64 // interrupt triggering and clearing
65 private final Object interruptLock = new Object();
66 private boolean interruptTriggered;
67
68 protected PollSelectorImpl(SelectorProvider sp) throws IOException {
69 super(sp);
70
71 int size = pollArrayCapacity * SIZE_POLLFD;
72 this.pollArray = new AllocatedNativeObject(size, false);
73
74 try {
75 long fds = IOUtil.makePipe(false);
76 this.fd0 = (int) (fds >>> 32);
77 this.fd1 = (int) fds;
78 } catch (IOException ioe) {
79 pollArray.free();
80 throw ioe;
81 }
82
83 // wakeup support
84 synchronized (this) {
85 setFirst(fd0, Net.POLLIN);
86 }
87 }
88
111 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
112 // timed poll interrupted so need to adjust timeout
113 long adjust = System.nanoTime() - startTime;
114 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
115 if (to <= 0) {
116 // timeout expired so no retry
117 numPolled = 0;
118 }
119 }
120 } while (numPolled == IOStatus.INTERRUPTED);
121 assert numPolled <= pollArraySize;
122
123 } finally {
124 end(blocking);
125 }
126
127 processDeregisterQueue();
128 return updateSelectedKeys();
129 }
130
131 protected int poll(long pollAddress, int numfds, int timeout) {
132 return poll0(pollAddress, numfds, timeout);
133 }
134
135 /**
136 * Process changes to the interest ops.
137 */
138 private void processUpdateQueue() {
139 assert Thread.holdsLock(this);
140
141 synchronized (updateLock) {
142 SelectionKeyImpl ski;
143 while ((ski = updateKeys.pollFirst()) != null) {
144 int newEvents = ski.translateInterestOps();
145 if (ski.isValid()) {
146 int index = ski.getIndex();
147 assert index >= 0 && index < pollArraySize;
148 if (index > 0) {
149 assert pollKeys.get(index) == ski;
150 if (newEvents == 0) {
151 remove(ski);
152 } else {
153 update(ski, newEvents);
154 }
370 private void putEventOps(int i, int event) {
371 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
372 pollArray.putShort(offset, (short)event);
373 }
374
375 private int getEventOps(int i) {
376 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
377 return pollArray.getShort(offset);
378 }
379
380 private void putReventOps(int i, int revent) {
381 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
382 pollArray.putShort(offset, (short)revent);
383 }
384
385 private int getReventOps(int i) {
386 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
387 return pollArray.getShort(offset);
388 }
389
390 private static native int poll0(long pollAddress, int numfds, int timeout);
391
392 static {
393 IOUtil.load();
394 }
395 }
|