25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.ArrayDeque;
33 import java.util.ArrayList;
34 import java.util.Deque;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Consumer;
38
39 import jdk.internal.misc.Unsafe;
40
41 /**
42 * Selector implementation based on poll
43 */
44
45 class PollSelectorImpl extends SelectorImpl {
46
47 // initial capacity of poll array
48 private static final int INITIAL_CAPACITY = 16;
49
50 // poll array, grows as needed
51 private int pollArrayCapacity = INITIAL_CAPACITY;
52 private int pollArraySize;
53 private AllocatedNativeObject pollArray;
54
55 // file descriptors used for interrupt
56 private final int fd0;
57 private final int fd1;
58
59 // keys for file descriptors in poll array, synchronize on selector
60 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
61
62 // pending updates, queued by putEventOps
63 private final Object updateLock = new Object();
64 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
65
66 // interrupt triggering and clearing
67 private final Object interruptLock = new Object();
68 private boolean interruptTriggered;
69
70 PollSelectorImpl(SelectorProvider sp) throws IOException {
71 super(sp);
72
73 int size = pollArrayCapacity * SIZE_POLLFD;
74 this.pollArray = new AllocatedNativeObject(size, false);
75
76 try {
77 long fds = IOUtil.makePipe(false);
78 this.fd0 = (int) (fds >>> 32);
79 this.fd1 = (int) fds;
80 } catch (IOException ioe) {
81 pollArray.free();
82 throw ioe;
83 }
84
85 // wakeup support
86 synchronized (this) {
87 setFirst(fd0, Net.POLLIN);
88 }
89 }
90
116 // timed poll interrupted so need to adjust timeout
117 long adjust = System.nanoTime() - startTime;
118 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
119 if (to <= 0) {
120 // timeout expired so no retry
121 numPolled = 0;
122 }
123 }
124 } while (numPolled == IOStatus.INTERRUPTED);
125 assert numPolled <= pollArraySize;
126
127 } finally {
128 end(blocking);
129 }
130
131 processDeregisterQueue();
132 return processEvents(action);
133 }
134
135 /**
136 * Process changes to the interest ops.
137 */
138 private void processUpdateQueue() {
139 assert Thread.holdsLock(this);
140
141 synchronized (updateLock) {
142 SelectionKeyImpl ski;
143 while ((ski = updateKeys.pollFirst()) != null) {
144 int newEvents = ski.translateInterestOps();
145 if (ski.isValid()) {
146 int index = ski.getIndex();
147 assert index >= 0 && index < pollArraySize;
148 if (index > 0) {
149 assert pollKeys.get(index) == ski;
150 if (newEvents == 0) {
151 remove(ski);
152 } else {
153 update(ski, newEvents);
154 }
155 } else if (newEvents != 0) {
360 private void putEventOps(int i, int event) {
361 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
362 pollArray.putShort(offset, (short)event);
363 }
364
365 private int getEventOps(int i) {
366 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
367 return pollArray.getShort(offset);
368 }
369
370 private void putReventOps(int i, int revent) {
371 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
372 pollArray.putShort(offset, (short)revent);
373 }
374
375 private int getReventOps(int i) {
376 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
377 return pollArray.getShort(offset);
378 }
379
// Native poll entry point; the implementation lives in native code and is
// not visible in this file.
// NOTE(review): presumably pollAddress is the address of the native poll
// array, numfds the number of entries to examine and timeout a value in
// milliseconds, as for poll(2) -- confirm against the native source.
380 private static native int poll(long pollAddress, int numfds, int timeout);
381
// Class initializer: invokes IOUtil.load() once when this class is initialized.
// NOTE(review): presumably this loads the native library that backs the
// native poll method declared in this class -- confirm in IOUtil.
382 static {
383 IOUtil.load();
384 }
385 }
|
25 package sun.nio.ch;
26
27 import java.io.IOException;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.SelectionKey;
30 import java.nio.channels.Selector;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.ArrayDeque;
33 import java.util.ArrayList;
34 import java.util.Deque;
35 import java.util.List;
36 import java.util.concurrent.TimeUnit;
37 import java.util.function.Consumer;
38
39 import jdk.internal.misc.Unsafe;
40
41 /**
42 * Selector implementation based on poll
43 */
44
45 public class PollSelectorImpl extends SelectorImpl {
46
47 // initial capacity of poll array
48 private static final int INITIAL_CAPACITY = 16;
49
50 // poll array, grows as needed
51 private int pollArrayCapacity = INITIAL_CAPACITY;
52 private int pollArraySize;
53 private AllocatedNativeObject pollArray;
54
55 // file descriptors used for interrupt
56 private final int fd0;
57 private final int fd1;
58
59 // keys for file descriptors in poll array, synchronize on selector
60 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
61
62 // pending updates, queued by putEventOps
63 private final Object updateLock = new Object();
64 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
65
66 // interrupt triggering and clearing
67 private final Object interruptLock = new Object();
68 private boolean interruptTriggered;
69
70 protected PollSelectorImpl(SelectorProvider sp) throws IOException {
71 super(sp);
72
73 int size = pollArrayCapacity * SIZE_POLLFD;
74 this.pollArray = new AllocatedNativeObject(size, false);
75
76 try {
77 long fds = IOUtil.makePipe(false);
78 this.fd0 = (int) (fds >>> 32);
79 this.fd1 = (int) fds;
80 } catch (IOException ioe) {
81 pollArray.free();
82 throw ioe;
83 }
84
85 // wakeup support
86 synchronized (this) {
87 setFirst(fd0, Net.POLLIN);
88 }
89 }
90
116 // timed poll interrupted so need to adjust timeout
117 long adjust = System.nanoTime() - startTime;
118 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
119 if (to <= 0) {
120 // timeout expired so no retry
121 numPolled = 0;
122 }
123 }
124 } while (numPolled == IOStatus.INTERRUPTED);
125 assert numPolled <= pollArraySize;
126
127 } finally {
128 end(blocking);
129 }
130
131 processDeregisterQueue();
132 return processEvents(action);
133 }
134
135 /**
136 * Protected poll method allows different platform-specific
137 * native implementations
138 */
139 protected int poll(long pollAddress, int numfds, int timeout) {
140 return poll0(pollAddress, numfds, timeout);
141 }
142
143 /**
144 * Process changes to the interest ops.
145 */
146 private void processUpdateQueue() {
147 assert Thread.holdsLock(this);
148
149 synchronized (updateLock) {
150 SelectionKeyImpl ski;
151 while ((ski = updateKeys.pollFirst()) != null) {
152 int newEvents = ski.translateInterestOps();
153 if (ski.isValid()) {
154 int index = ski.getIndex();
155 assert index >= 0 && index < pollArraySize;
156 if (index > 0) {
157 assert pollKeys.get(index) == ski;
158 if (newEvents == 0) {
159 remove(ski);
160 } else {
161 update(ski, newEvents);
162 }
163 } else if (newEvents != 0) {
368 private void putEventOps(int i, int event) {
369 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
370 pollArray.putShort(offset, (short)event);
371 }
372
373 private int getEventOps(int i) {
374 int offset = SIZE_POLLFD * i + EVENT_OFFSET;
375 return pollArray.getShort(offset);
376 }
377
378 private void putReventOps(int i, int revent) {
379 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
380 pollArray.putShort(offset, (short)revent);
381 }
382
383 private int getReventOps(int i) {
384 int offset = SIZE_POLLFD * i + REVENT_OFFSET;
385 return pollArray.getShort(offset);
386 }
387
// Native poll entry point, called by the protected poll(long, int, int)
// method of this class; the implementation lives in native code and is not
// visible in this file.
// NOTE(review): presumably pollAddress is the address of the native poll
// array, numfds the number of entries to examine and timeout a value in
// milliseconds, as for poll(2) -- confirm against the native source.
388 private static native int poll0(long pollAddress, int numfds, int timeout);
389
// Class initializer: invokes IOUtil.load() once when this class is initialized.
// NOTE(review): presumably this loads the native library that backs the
// native poll0 method declared in this class -- confirm in IOUtil.
390 static {
391 IOUtil.load();
392 }
393 }
|