1 /* 2 * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.nio.channels.ClosedSelectorException; 29 import java.nio.channels.Selector; 30 import java.nio.channels.spi.SelectorProvider; 31 import java.util.ArrayDeque; 32 import java.util.ArrayList; 33 import java.util.Deque; 34 import java.util.List; 35 import java.util.concurrent.TimeUnit; 36 37 import jdk.internal.misc.Unsafe; 38 39 /** 40 * Selector implementation based on poll 41 */ 42 43 class PollSelectorImpl extends SelectorImpl { 44 45 // initial capacity of poll array 46 private static final int INITIAL_CAPACITY = 16; 47 48 // poll array, grows as needed 49 private int pollArrayCapacity = INITIAL_CAPACITY; 50 private int pollArraySize; 51 private AllocatedNativeObject pollArray; 52 53 // file descriptors used for interrupt 54 private final int fd0; 55 private final int fd1; 56 57 // keys for file descriptors in poll array, synchronize on selector 58 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); 59 60 // pending updates, queued by putEventOps 61 private final Object updateLock = new Object(); 62 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 63 64 // interrupt triggering and clearing 65 private final Object interruptLock = new Object(); 66 private boolean interruptTriggered; 67 68 PollSelectorImpl(SelectorProvider sp) throws IOException { 69 super(sp); 70 71 int size = pollArrayCapacity * SIZE_POLLFD; 72 this.pollArray = new AllocatedNativeObject(size, false); 73 74 try { 75 long fds = IOUtil.makePipe(false); 76 this.fd0 = (int) (fds >>> 32); 77 this.fd1 = (int) fds; 78 } catch (IOException ioe) { 79 pollArray.free(); 80 throw ioe; 81 } 82 83 // wakeup support 84 synchronized (this) { 85 setFirst(fd0, Net.POLLIN); 86 } 87 } 88 89 private void ensureOpen() { 90 if (!isOpen()) 91 throw new ClosedSelectorException(); 92 } 93 94 @Override 95 protected int doSelect(long timeout) throws IOException { 96 assert Thread.holdsLock(this); 97 98 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout 99 boolean blocking = (to != 0); 100 boolean timedPoll = (to > 0); 101 102 processUpdateQueue(); 103 processDeregisterQueue(); 104 try { 105 begin(blocking); 106 107 int numPolled; 108 do { 109 long startTime = timedPoll ? System.nanoTime() : 0; 110 numPolled = poll(pollArray.address(), pollArraySize, to); 111 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { 112 // timed poll interrupted so need to adjust timeout 113 long adjust = System.nanoTime() - startTime; 114 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); 115 if (to <= 0) { 116 // timeout expired so no retry 117 numPolled = 0; 118 } 119 } 120 } while (numPolled == IOStatus.INTERRUPTED); 121 assert numPolled <= pollArraySize; 122 123 } finally { 124 end(blocking); 125 } 126 127 processDeregisterQueue(); 128 return updateSelectedKeys(); 129 } 130 131 /** 132 * Process changes to the interest ops. 133 */ 134 private void processUpdateQueue() { 135 assert Thread.holdsLock(this); 136 137 synchronized (updateLock) { 138 SelectionKeyImpl ski; 139 while ((ski = updateKeys.pollFirst()) != null) { 140 int newEvents = ski.translateInterestOps(); 141 if (ski.isValid()) { 142 int index = ski.getIndex(); 143 assert index >= 0 && index < pollArraySize; 144 if (index > 0) { 145 assert pollKeys.get(index) == ski; 146 if (newEvents == 0) { 147 remove(ski); 148 } else { 149 update(ski, newEvents); 150 } 151 } else if (newEvents != 0) { 152 add(ski, newEvents); 153 } 154 } 155 } 156 } 157 } 158 159 /** 160 * Update the keys of file descriptors that were polled and add them to 161 * the selected-key set. 162 * If the interrupt fd has been selected, drain it and clear the interrupt. 163 */ 164 private int updateSelectedKeys() throws IOException { 165 assert Thread.holdsLock(this); 166 assert Thread.holdsLock(nioSelectedKeys()); 167 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); 168 169 int numKeysUpdated = 0; 170 for (int i = 1; i < pollArraySize; i++) { 171 int rOps = getReventOps(i); 172 if (rOps != 0) { 173 SelectionKeyImpl ski = pollKeys.get(i); 174 assert ski.getFDVal() == getDescriptor(i); 175 if (ski.isValid()) { 176 if (selectedKeys.contains(ski)) { 177 if (ski.translateAndUpdateReadyOps(rOps)) { 178 numKeysUpdated++; 179 } 180 } else { 181 ski.translateAndSetReadyOps(rOps); 182 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { 183 selectedKeys.add(ski); 184 numKeysUpdated++; 185 } 186 } 187 } 188 } 189 } 190 191 // check for interrupt 192 if (getReventOps(0) != 0) { 193 assert getDescriptor(0) == fd0; 194 clearInterrupt(); 195 } 196 197 return numKeysUpdated; 198 } 199 200 @Override 201 protected void implClose() throws IOException { 202 assert !isOpen(); 203 assert Thread.holdsLock(this); 204 205 // prevent further wakeup 206 synchronized (interruptLock) { 207 interruptTriggered = true; 208 } 209 210 pollArray.free(); 211 FileDispatcherImpl.closeIntFD(fd0); 212 FileDispatcherImpl.closeIntFD(fd1); 213 } 214 215 @Override 216 protected void implRegister(SelectionKeyImpl ski) { 217 assert ski.getIndex() == 0; 218 ensureOpen(); 219 } 220 221 @Override 222 protected void implDereg(SelectionKeyImpl ski) throws IOException { 223 assert !ski.isValid(); 224 assert Thread.holdsLock(this); 225 226 // remove from poll array 227 int index = ski.getIndex(); 228 if (index > 0) { 229 remove(ski); 230 } 231 } 232 233 @Override 234 public void setEventOps(SelectionKeyImpl ski) { 235 ensureOpen(); 236 synchronized (updateLock) { 237 updateKeys.addLast(ski); 238 } 239 } 240 241 @Override 242 public Selector wakeup() { 243 synchronized (interruptLock) { 244 if (!interruptTriggered) { 245 try { 246 IOUtil.write1(fd1, (byte)0); 247 } catch (IOException ioe) { 248 throw new InternalError(ioe); 249 } 250 interruptTriggered = true; 251 } 252 } 253 return this; 254 } 255 256 private void clearInterrupt() throws IOException { 257 synchronized (interruptLock) { 258 IOUtil.drain(fd0); 259 interruptTriggered = false; 260 } 261 } 262 263 /** 264 * Sets the first pollfd enty in the poll array to the given fd 265 */ 266 private void setFirst(int fd, int ops) { 267 assert pollArraySize == 0; 268 assert pollKeys.isEmpty(); 269 270 putDescriptor(0, fd); 271 putEventOps(0, ops); 272 pollArraySize = 1; 273 274 pollKeys.add(null); // dummy element 275 } 276 277 /** 278 * Adds a pollfd entry to the poll array, expanding the poll array if needed. 279 */ 280 private void add(SelectionKeyImpl ski, int ops) { 281 expandIfNeeded(); 282 283 int index = pollArraySize; 284 assert index > 0; 285 putDescriptor(index, ski.getFDVal()); 286 putEventOps(index, ops); 287 putReventOps(index, 0); 288 ski.setIndex(index); 289 pollArraySize++; 290 291 pollKeys.add(ski); 292 assert pollKeys.size() == pollArraySize; 293 } 294 295 /** 296 * Update the events of pollfd entry. 297 */ 298 private void update(SelectionKeyImpl ski, int ops) { 299 int index = ski.getIndex(); 300 assert index > 0 && index < pollArraySize; 301 assert getDescriptor(index) == ski.getFDVal(); 302 putEventOps(index, ops); 303 } 304 305 /** 306 * Removes a pollfd entry from the poll array 307 */ 308 private void remove(SelectionKeyImpl ski) { 309 int index = ski.getIndex(); 310 assert index > 0 && index < pollArraySize; 311 assert getDescriptor(index) == ski.getFDVal(); 312 313 // replace pollfd at index with the last pollfd in array 314 int lastIndex = pollArraySize - 1; 315 if (lastIndex != index) { 316 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); 317 assert lastKey.getIndex() == lastIndex; 318 int lastFd = getDescriptor(lastIndex); 319 int lastOps = getEventOps(lastIndex); 320 int lastRevents = getReventOps(lastIndex); 321 assert lastKey.getFDVal() == lastFd; 322 putDescriptor(index, lastFd); 323 putEventOps(index, lastOps); 324 putReventOps(index, lastRevents); 325 pollKeys.set(index, lastKey); 326 lastKey.setIndex(index); 327 } 328 pollKeys.remove(lastIndex); 329 pollArraySize--; 330 assert pollKeys.size() == pollArraySize; 331 332 ski.setIndex(0); 333 } 334 335 /** 336 * Expand poll array if at capacity 337 */ 338 private void expandIfNeeded() { 339 if (pollArraySize == pollArrayCapacity) { 340 int oldSize = pollArrayCapacity * SIZE_POLLFD; 341 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; 342 int newSize = newCapacity * SIZE_POLLFD; 343 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); 344 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); 345 pollArray.free(); 346 pollArray = newPollArray; 347 pollArrayCapacity = newCapacity; 348 } 349 } 350 351 private static final short SIZE_POLLFD = 8; 352 private static final short FD_OFFSET = 0; 353 private static final short EVENT_OFFSET = 4; 354 private static final short REVENT_OFFSET = 6; 355 356 private void putDescriptor(int i, int fd) { 357 int offset = SIZE_POLLFD * i + FD_OFFSET; 358 pollArray.putInt(offset, fd); 359 } 360 361 private int getDescriptor(int i) { 362 int offset = SIZE_POLLFD * i + FD_OFFSET; 363 return pollArray.getInt(offset); 364 } 365 366 private void putEventOps(int i, int event) { 367 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 368 pollArray.putShort(offset, (short)event); 369 } 370 371 private int getEventOps(int i) { 372 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 373 return pollArray.getShort(offset); 374 } 375 376 private void putReventOps(int i, int revent) { 377 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 378 pollArray.putShort(offset, (short)revent); 379 } 380 381 private int getReventOps(int i) { 382 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 383 return pollArray.getShort(offset); 384 } 385 386 private static native int poll(long pollAddress, int numfds, int timeout); 387 388 static { 389 IOUtil.load(); 390 } 391 }