1 /* 2 * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.nio.channels.ClosedSelectorException; 29 import java.nio.channels.SelectionKey; 30 import java.nio.channels.Selector; 31 import java.nio.channels.spi.SelectorProvider; 32 import java.util.ArrayDeque; 33 import java.util.ArrayList; 34 import java.util.Deque; 35 import java.util.List; 36 import java.util.concurrent.TimeUnit; 37 import java.util.function.Consumer; 38 39 import jdk.internal.misc.Unsafe; 40 41 /** 42 * Selector implementation based on poll 43 */ 44 45 class PollSelectorImpl extends SelectorImpl { 46 47 // initial capacity of poll array 48 private static final int INITIAL_CAPACITY = 16; 49 50 // poll array, grows as needed 51 private int pollArrayCapacity = INITIAL_CAPACITY; 52 private int pollArraySize; 53 private AllocatedNativeObject pollArray; 54 55 // file descriptors used for interrupt 56 private final int fd0; 57 private final int fd1; 58 59 // keys for file descriptors in poll array, synchronize on selector 60 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); 61 62 // pending updates, queued by putEventOps 63 private final Object updateLock = new Object(); 64 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 65 66 // interrupt triggering and clearing 67 private final Object interruptLock = new Object(); 68 private boolean interruptTriggered; 69 70 PollSelectorImpl(SelectorProvider sp) throws IOException { 71 super(sp); 72 73 int size = pollArrayCapacity * SIZE_POLLFD; 74 this.pollArray = new AllocatedNativeObject(size, false); 75 76 try { 77 long fds = IOUtil.makePipe(false); 78 this.fd0 = (int) (fds >>> 32); 79 this.fd1 = (int) fds; 80 } catch (IOException ioe) { 81 pollArray.free(); 82 throw ioe; 83 } 84 85 // wakeup support 86 synchronized (this) { 87 setFirst(fd0, Net.POLLIN); 88 } 89 } 90 91 private void ensureOpen() { 92 if (!isOpen()) 93 throw new ClosedSelectorException(); 94 } 95 96 @Override 97 protected int doSelect(Consumer<SelectionKey> action, long timeout) 98 throws IOException 99 { 100 assert Thread.holdsLock(this); 101 102 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout 103 boolean blocking = (to != 0); 104 boolean timedPoll = (to > 0); 105 106 processUpdateQueue(); 107 processDeregisterQueue(); 108 try { 109 begin(blocking); 110 111 int numPolled; 112 do { 113 long startTime = timedPoll ? System.nanoTime() : 0; 114 numPolled = poll(pollArray.address(), pollArraySize, to); 115 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { 116 // timed poll interrupted so need to adjust timeout 117 long adjust = System.nanoTime() - startTime; 118 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); 119 if (to <= 0) { 120 // timeout expired so no retry 121 numPolled = 0; 122 } 123 } 124 } while (numPolled == IOStatus.INTERRUPTED); 125 assert numPolled <= pollArraySize; 126 127 } finally { 128 end(blocking); 129 } 130 131 processDeregisterQueue(); 132 return processEvents(action); 133 } 134 135 /** 136 * Process changes to the interest ops. 137 */ 138 private void processUpdateQueue() { 139 assert Thread.holdsLock(this); 140 141 synchronized (updateLock) { 142 SelectionKeyImpl ski; 143 while ((ski = updateKeys.pollFirst()) != null) { 144 int newEvents = ski.translateInterestOps(); 145 if (ski.isValid()) { 146 int index = ski.getIndex(); 147 assert index >= 0 && index < pollArraySize; 148 if (index > 0) { 149 assert pollKeys.get(index) == ski; 150 if (newEvents == 0) { 151 remove(ski); 152 } else { 153 update(ski, newEvents); 154 } 155 } else if (newEvents != 0) { 156 add(ski, newEvents); 157 } 158 } 159 } 160 } 161 } 162 163 /** 164 * Process the polled events. 165 * If the interrupt fd has been selected, drain it and clear the interrupt. 166 */ 167 private int processEvents(Consumer<SelectionKey> action) 168 throws IOException 169 { 170 assert Thread.holdsLock(this); 171 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); 172 173 int numKeysUpdated = 0; 174 for (int i = 1; i < pollArraySize; i++) { 175 int rOps = getReventOps(i); 176 if (rOps != 0) { 177 SelectionKeyImpl ski = pollKeys.get(i); 178 assert ski.getFDVal() == getDescriptor(i); 179 if (ski.isValid()) { 180 numKeysUpdated += processReadyEvents(rOps, ski, action); 181 } 182 } 183 } 184 185 // check for interrupt 186 if (getReventOps(0) != 0) { 187 assert getDescriptor(0) == fd0; 188 clearInterrupt(); 189 } 190 191 return numKeysUpdated; 192 } 193 194 @Override 195 protected void implClose() throws IOException { 196 assert !isOpen(); 197 assert Thread.holdsLock(this); 198 199 // prevent further wakeup 200 synchronized (interruptLock) { 201 interruptTriggered = true; 202 } 203 204 pollArray.free(); 205 FileDispatcherImpl.closeIntFD(fd0); 206 FileDispatcherImpl.closeIntFD(fd1); 207 } 208 209 @Override 210 protected void implRegister(SelectionKeyImpl ski) { 211 assert ski.getIndex() == 0; 212 ensureOpen(); 213 } 214 215 @Override 216 protected void implDereg(SelectionKeyImpl ski) throws IOException { 217 assert !ski.isValid(); 218 assert Thread.holdsLock(this); 219 220 // remove from poll array 221 int index = ski.getIndex(); 222 if (index > 0) { 223 remove(ski); 224 } 225 } 226 227 @Override 228 public void setEventOps(SelectionKeyImpl ski) { 229 ensureOpen(); 230 synchronized (updateLock) { 231 updateKeys.addLast(ski); 232 } 233 } 234 235 @Override 236 public Selector wakeup() { 237 synchronized (interruptLock) { 238 if (!interruptTriggered) { 239 try { 240 IOUtil.write1(fd1, (byte)0); 241 } catch (IOException ioe) { 242 throw new InternalError(ioe); 243 } 244 interruptTriggered = true; 245 } 246 } 247 return this; 248 } 249 250 private void clearInterrupt() throws IOException { 251 synchronized (interruptLock) { 252 IOUtil.drain(fd0); 253 interruptTriggered = false; 254 } 255 } 256 257 /** 258 * Sets the first pollfd enty in the poll array to the given fd 259 */ 260 private void setFirst(int fd, int ops) { 261 assert pollArraySize == 0; 262 assert pollKeys.isEmpty(); 263 264 putDescriptor(0, fd); 265 putEventOps(0, ops); 266 pollArraySize = 1; 267 268 pollKeys.add(null); // dummy element 269 } 270 271 /** 272 * Adds a pollfd entry to the poll array, expanding the poll array if needed. 273 */ 274 private void add(SelectionKeyImpl ski, int ops) { 275 expandIfNeeded(); 276 277 int index = pollArraySize; 278 assert index > 0; 279 putDescriptor(index, ski.getFDVal()); 280 putEventOps(index, ops); 281 putReventOps(index, 0); 282 ski.setIndex(index); 283 pollArraySize++; 284 285 pollKeys.add(ski); 286 assert pollKeys.size() == pollArraySize; 287 } 288 289 /** 290 * Update the events of pollfd entry. 291 */ 292 private void update(SelectionKeyImpl ski, int ops) { 293 int index = ski.getIndex(); 294 assert index > 0 && index < pollArraySize; 295 assert getDescriptor(index) == ski.getFDVal(); 296 putEventOps(index, ops); 297 } 298 299 /** 300 * Removes a pollfd entry from the poll array 301 */ 302 private void remove(SelectionKeyImpl ski) { 303 int index = ski.getIndex(); 304 assert index > 0 && index < pollArraySize; 305 assert getDescriptor(index) == ski.getFDVal(); 306 307 // replace pollfd at index with the last pollfd in array 308 int lastIndex = pollArraySize - 1; 309 if (lastIndex != index) { 310 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); 311 assert lastKey.getIndex() == lastIndex; 312 int lastFd = getDescriptor(lastIndex); 313 int lastOps = getEventOps(lastIndex); 314 int lastRevents = getReventOps(lastIndex); 315 assert lastKey.getFDVal() == lastFd; 316 putDescriptor(index, lastFd); 317 putEventOps(index, lastOps); 318 putReventOps(index, lastRevents); 319 pollKeys.set(index, lastKey); 320 lastKey.setIndex(index); 321 } 322 pollKeys.remove(lastIndex); 323 pollArraySize--; 324 assert pollKeys.size() == pollArraySize; 325 326 ski.setIndex(0); 327 } 328 329 /** 330 * Expand poll array if at capacity 331 */ 332 private void expandIfNeeded() { 333 if (pollArraySize == pollArrayCapacity) { 334 int oldSize = pollArrayCapacity * SIZE_POLLFD; 335 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; 336 int newSize = newCapacity * SIZE_POLLFD; 337 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); 338 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); 339 pollArray.free(); 340 pollArray = newPollArray; 341 pollArrayCapacity = newCapacity; 342 } 343 } 344 345 private static final short SIZE_POLLFD = 8; 346 private static final short FD_OFFSET = 0; 347 private static final short EVENT_OFFSET = 4; 348 private static final short REVENT_OFFSET = 6; 349 350 private void putDescriptor(int i, int fd) { 351 int offset = SIZE_POLLFD * i + FD_OFFSET; 352 pollArray.putInt(offset, fd); 353 } 354 355 private int getDescriptor(int i) { 356 int offset = SIZE_POLLFD * i + FD_OFFSET; 357 return pollArray.getInt(offset); 358 } 359 360 private void putEventOps(int i, int event) { 361 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 362 pollArray.putShort(offset, (short)event); 363 } 364 365 private int getEventOps(int i) { 366 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 367 return pollArray.getShort(offset); 368 } 369 370 private void putReventOps(int i, int revent) { 371 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 372 pollArray.putShort(offset, (short)revent); 373 } 374 375 private int getReventOps(int i) { 376 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 377 return pollArray.getShort(offset); 378 } 379 380 private static native int poll(long pollAddress, int numfds, int timeout); 381 382 static { 383 IOUtil.load(); 384 } 385 }