1 /* 2 * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.nio.channels.ClosedSelectorException; 29 import java.nio.channels.SelectionKey; 30 import java.nio.channels.Selector; 31 import java.nio.channels.spi.SelectorProvider; 32 import java.util.ArrayDeque; 33 import java.util.ArrayList; 34 import java.util.Deque; 35 import java.util.List; 36 import java.util.concurrent.TimeUnit; 37 import java.util.function.Consumer; 38 39 import jdk.internal.misc.Unsafe; 40 41 /** 42 * Selector implementation based on poll 43 */ 44 45 public class PollSelectorImpl extends SelectorImpl { 46 47 // initial capacity of poll array 48 private static final int INITIAL_CAPACITY = 16; 49 50 // poll array, grows as needed 51 private int pollArrayCapacity = INITIAL_CAPACITY; 52 private int pollArraySize; 53 private AllocatedNativeObject pollArray; 54 55 // file descriptors used for interrupt 56 private final int fd0; 57 private final int fd1; 58 59 // keys for file descriptors in poll array, synchronize on selector 60 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); 61 62 // pending updates, queued by putEventOps 63 private final Object updateLock = new Object(); 64 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 65 66 // interrupt triggering and clearing 67 private final Object interruptLock = new Object(); 68 private boolean interruptTriggered; 69 70 protected PollSelectorImpl(SelectorProvider sp) throws IOException { 71 super(sp); 72 73 int size = pollArrayCapacity * SIZE_POLLFD; 74 this.pollArray = new AllocatedNativeObject(size, false); 75 76 try { 77 long fds = IOUtil.makePipe(false); 78 this.fd0 = (int) (fds >>> 32); 79 this.fd1 = (int) fds; 80 } catch (IOException ioe) { 81 pollArray.free(); 82 throw ioe; 83 } 84 85 // wakeup support 86 synchronized (this) { 87 setFirst(fd0, Net.POLLIN); 88 } 89 } 90 91 private void ensureOpen() { 92 if (!isOpen()) 93 throw new ClosedSelectorException(); 94 } 95 96 @Override 97 protected int doSelect(Consumer<SelectionKey> action, long timeout) 98 throws IOException 99 { 100 assert Thread.holdsLock(this); 101 102 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout 103 boolean blocking = (to != 0); 104 boolean timedPoll = (to > 0); 105 106 processUpdateQueue(); 107 processDeregisterQueue(); 108 try { 109 begin(blocking); 110 111 int numPolled; 112 do { 113 long startTime = timedPoll ? System.nanoTime() : 0; 114 numPolled = poll(pollArray.address(), pollArraySize, to); 115 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { 116 // timed poll interrupted so need to adjust timeout 117 long adjust = System.nanoTime() - startTime; 118 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); 119 if (to <= 0) { 120 // timeout expired so no retry 121 numPolled = 0; 122 } 123 } 124 } while (numPolled == IOStatus.INTERRUPTED); 125 assert numPolled <= pollArraySize; 126 127 } finally { 128 end(blocking); 129 } 130 131 processDeregisterQueue(); 132 return processEvents(action); 133 } 134 135 /** 136 * Protected poll method allows different platform-specific 137 * native implementations 138 */ 139 protected int poll(long pollAddress, int numfds, int timeout) { 140 return poll0(pollAddress, numfds, timeout); 141 } 142 143 /** 144 * Process changes to the interest ops. 145 */ 146 private void processUpdateQueue() { 147 assert Thread.holdsLock(this); 148 149 synchronized (updateLock) { 150 SelectionKeyImpl ski; 151 while ((ski = updateKeys.pollFirst()) != null) { 152 int newEvents = ski.translateInterestOps(); 153 if (ski.isValid()) { 154 int index = ski.getIndex(); 155 assert index >= 0 && index < pollArraySize; 156 if (index > 0) { 157 assert pollKeys.get(index) == ski; 158 if (newEvents == 0) { 159 remove(ski); 160 } else { 161 update(ski, newEvents); 162 } 163 } else if (newEvents != 0) { 164 add(ski, newEvents); 165 } 166 } 167 } 168 } 169 } 170 171 /** 172 * Process the polled events. 173 * If the interrupt fd has been selected, drain it and clear the interrupt. 174 */ 175 private int processEvents(Consumer<SelectionKey> action) 176 throws IOException 177 { 178 assert Thread.holdsLock(this); 179 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); 180 181 int numKeysUpdated = 0; 182 for (int i = 1; i < pollArraySize; i++) { 183 int rOps = getReventOps(i); 184 if (rOps != 0) { 185 SelectionKeyImpl ski = pollKeys.get(i); 186 assert ski.getFDVal() == getDescriptor(i); 187 if (ski.isValid()) { 188 numKeysUpdated += processReadyEvents(rOps, ski, action); 189 } 190 } 191 } 192 193 // check for interrupt 194 if (getReventOps(0) != 0) { 195 assert getDescriptor(0) == fd0; 196 clearInterrupt(); 197 } 198 199 return numKeysUpdated; 200 } 201 202 @Override 203 protected void implClose() throws IOException { 204 assert !isOpen(); 205 assert Thread.holdsLock(this); 206 207 // prevent further wakeup 208 synchronized (interruptLock) { 209 interruptTriggered = true; 210 } 211 212 pollArray.free(); 213 FileDispatcherImpl.closeIntFD(fd0); 214 FileDispatcherImpl.closeIntFD(fd1); 215 } 216 217 @Override 218 protected void implRegister(SelectionKeyImpl ski) { 219 assert ski.getIndex() == 0; 220 ensureOpen(); 221 } 222 223 @Override 224 protected void implDereg(SelectionKeyImpl ski) throws IOException { 225 assert !ski.isValid(); 226 assert Thread.holdsLock(this); 227 228 // remove from poll array 229 int index = ski.getIndex(); 230 if (index > 0) { 231 remove(ski); 232 } 233 } 234 235 @Override 236 public void setEventOps(SelectionKeyImpl ski) { 237 ensureOpen(); 238 synchronized (updateLock) { 239 updateKeys.addLast(ski); 240 } 241 } 242 243 @Override 244 public Selector wakeup() { 245 synchronized (interruptLock) { 246 if (!interruptTriggered) { 247 try { 248 IOUtil.write1(fd1, (byte)0); 249 } catch (IOException ioe) { 250 throw new InternalError(ioe); 251 } 252 interruptTriggered = true; 253 } 254 } 255 return this; 256 } 257 258 private void clearInterrupt() throws IOException { 259 synchronized (interruptLock) { 260 IOUtil.drain(fd0); 261 interruptTriggered = false; 262 } 263 } 264 265 /** 266 * Sets the first pollfd enty in the poll array to the given fd 267 */ 268 private void setFirst(int fd, int ops) { 269 assert pollArraySize == 0; 270 assert pollKeys.isEmpty(); 271 272 putDescriptor(0, fd); 273 putEventOps(0, ops); 274 pollArraySize = 1; 275 276 pollKeys.add(null); // dummy element 277 } 278 279 /** 280 * Adds a pollfd entry to the poll array, expanding the poll array if needed. 281 */ 282 private void add(SelectionKeyImpl ski, int ops) { 283 expandIfNeeded(); 284 285 int index = pollArraySize; 286 assert index > 0; 287 putDescriptor(index, ski.getFDVal()); 288 putEventOps(index, ops); 289 putReventOps(index, 0); 290 ski.setIndex(index); 291 pollArraySize++; 292 293 pollKeys.add(ski); 294 assert pollKeys.size() == pollArraySize; 295 } 296 297 /** 298 * Update the events of pollfd entry. 299 */ 300 private void update(SelectionKeyImpl ski, int ops) { 301 int index = ski.getIndex(); 302 assert index > 0 && index < pollArraySize; 303 assert getDescriptor(index) == ski.getFDVal(); 304 putEventOps(index, ops); 305 } 306 307 /** 308 * Removes a pollfd entry from the poll array 309 */ 310 private void remove(SelectionKeyImpl ski) { 311 int index = ski.getIndex(); 312 assert index > 0 && index < pollArraySize; 313 assert getDescriptor(index) == ski.getFDVal(); 314 315 // replace pollfd at index with the last pollfd in array 316 int lastIndex = pollArraySize - 1; 317 if (lastIndex != index) { 318 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); 319 assert lastKey.getIndex() == lastIndex; 320 int lastFd = getDescriptor(lastIndex); 321 int lastOps = getEventOps(lastIndex); 322 int lastRevents = getReventOps(lastIndex); 323 assert lastKey.getFDVal() == lastFd; 324 putDescriptor(index, lastFd); 325 putEventOps(index, lastOps); 326 putReventOps(index, lastRevents); 327 pollKeys.set(index, lastKey); 328 lastKey.setIndex(index); 329 } 330 pollKeys.remove(lastIndex); 331 pollArraySize--; 332 assert pollKeys.size() == pollArraySize; 333 334 ski.setIndex(0); 335 } 336 337 /** 338 * Expand poll array if at capacity 339 */ 340 private void expandIfNeeded() { 341 if (pollArraySize == pollArrayCapacity) { 342 int oldSize = pollArrayCapacity * SIZE_POLLFD; 343 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; 344 int newSize = newCapacity * SIZE_POLLFD; 345 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); 346 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); 347 pollArray.free(); 348 pollArray = newPollArray; 349 pollArrayCapacity = newCapacity; 350 } 351 } 352 353 private static final short SIZE_POLLFD = 8; 354 private static final short FD_OFFSET = 0; 355 private static final short EVENT_OFFSET = 4; 356 private static final short REVENT_OFFSET = 6; 357 358 private void putDescriptor(int i, int fd) { 359 int offset = SIZE_POLLFD * i + FD_OFFSET; 360 pollArray.putInt(offset, fd); 361 } 362 363 private int getDescriptor(int i) { 364 int offset = SIZE_POLLFD * i + FD_OFFSET; 365 return pollArray.getInt(offset); 366 } 367 368 private void putEventOps(int i, int event) { 369 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 370 pollArray.putShort(offset, (short)event); 371 } 372 373 private int getEventOps(int i) { 374 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 375 return pollArray.getShort(offset); 376 } 377 378 private void putReventOps(int i, int revent) { 379 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 380 pollArray.putShort(offset, (short)revent); 381 } 382 383 private int getReventOps(int i) { 384 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 385 return pollArray.getShort(offset); 386 } 387 388 private static native int poll0(long pollAddress, int numfds, int timeout); 389 390 static { 391 IOUtil.load(); 392 } 393 }