1 /* 2 * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.io.IOException; 28 import java.nio.channels.ClosedSelectorException; 29 import java.nio.channels.Selector; 30 import java.nio.channels.spi.SelectorProvider; 31 import java.util.ArrayDeque; 32 import java.util.ArrayList; 33 import java.util.Deque; 34 import java.util.List; 35 import java.util.concurrent.TimeUnit; 36 37 import jdk.internal.misc.Unsafe; 38 39 /** 40 * Selector implementation based on poll 41 */ 42 43 public class PollSelectorImpl extends SelectorImpl { 44 45 // initial capacity of poll array 46 private static final int INITIAL_CAPACITY = 16; 47 48 // poll array, grows as needed 49 private int pollArrayCapacity = INITIAL_CAPACITY; 50 private int pollArraySize; 51 private AllocatedNativeObject pollArray; 52 53 // file descriptors used for interrupt 54 private final int fd0; 55 private final int fd1; 56 57 // keys for file descriptors in poll array, synchronize on selector 58 private final List<SelectionKeyImpl> pollKeys = new ArrayList<>(); 59 60 // pending updates, queued by putEventOps 61 private final Object updateLock = new Object(); 62 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 63 64 // interrupt triggering and clearing 65 private final Object interruptLock = new Object(); 66 private boolean interruptTriggered; 67 68 protected PollSelectorImpl(SelectorProvider sp) throws IOException { 69 super(sp); 70 71 int size = pollArrayCapacity * SIZE_POLLFD; 72 this.pollArray = new AllocatedNativeObject(size, false); 73 74 try { 75 long fds = IOUtil.makePipe(false); 76 this.fd0 = (int) (fds >>> 32); 77 this.fd1 = (int) fds; 78 } catch (IOException ioe) { 79 pollArray.free(); 80 throw ioe; 81 } 82 83 // wakeup support 84 synchronized (this) { 85 setFirst(fd0, Net.POLLIN); 86 } 87 } 88 89 private void ensureOpen() { 90 if (!isOpen()) 91 throw new ClosedSelectorException(); 92 } 93 94 @Override 95 protected int doSelect(long timeout) throws IOException { 96 assert Thread.holdsLock(this); 97 98 int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout 99 boolean blocking = (to != 0); 100 boolean timedPoll = (to > 0); 101 102 processUpdateQueue(); 103 processDeregisterQueue(); 104 try { 105 begin(blocking); 106 107 int numPolled; 108 do { 109 long startTime = timedPoll ? System.nanoTime() : 0; 110 numPolled = poll(pollArray.address(), pollArraySize, to); 111 if (numPolled == IOStatus.INTERRUPTED && timedPoll) { 112 // timed poll interrupted so need to adjust timeout 113 long adjust = System.nanoTime() - startTime; 114 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); 115 if (to <= 0) { 116 // timeout expired so no retry 117 numPolled = 0; 118 } 119 } 120 } while (numPolled == IOStatus.INTERRUPTED); 121 assert numPolled <= pollArraySize; 122 123 } finally { 124 end(blocking); 125 } 126 127 processDeregisterQueue(); 128 return updateSelectedKeys(); 129 } 130 131 protected int poll(long pollAddress, int numfds, int timeout) { 132 return poll0(pollAddress, numfds, timeout); 133 } 134 135 /** 136 * Process changes to the interest ops. 137 */ 138 private void processUpdateQueue() { 139 assert Thread.holdsLock(this); 140 141 synchronized (updateLock) { 142 SelectionKeyImpl ski; 143 while ((ski = updateKeys.pollFirst()) != null) { 144 int newEvents = ski.translateInterestOps(); 145 if (ski.isValid()) { 146 int index = ski.getIndex(); 147 assert index >= 0 && index < pollArraySize; 148 if (index > 0) { 149 assert pollKeys.get(index) == ski; 150 if (newEvents == 0) { 151 remove(ski); 152 } else { 153 update(ski, newEvents); 154 } 155 } else if (newEvents != 0) { 156 add(ski, newEvents); 157 } 158 } 159 } 160 } 161 } 162 163 /** 164 * Update the keys of file descriptors that were polled and add them to 165 * the selected-key set. 166 * If the interrupt fd has been selected, drain it and clear the interrupt. 167 */ 168 private int updateSelectedKeys() throws IOException { 169 assert Thread.holdsLock(this); 170 assert Thread.holdsLock(nioSelectedKeys()); 171 assert pollArraySize > 0 && pollArraySize == pollKeys.size(); 172 173 int numKeysUpdated = 0; 174 for (int i = 1; i < pollArraySize; i++) { 175 int rOps = getReventOps(i); 176 if (rOps != 0) { 177 SelectionKeyImpl ski = pollKeys.get(i); 178 assert ski.getFDVal() == getDescriptor(i); 179 if (ski.isValid()) { 180 if (selectedKeys.contains(ski)) { 181 if (ski.translateAndUpdateReadyOps(rOps)) { 182 numKeysUpdated++; 183 } 184 } else { 185 ski.translateAndSetReadyOps(rOps); 186 if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { 187 selectedKeys.add(ski); 188 numKeysUpdated++; 189 } 190 } 191 } 192 } 193 } 194 195 // check for interrupt 196 if (getReventOps(0) != 0) { 197 assert getDescriptor(0) == fd0; 198 clearInterrupt(); 199 } 200 201 return numKeysUpdated; 202 } 203 204 @Override 205 protected void implClose() throws IOException { 206 assert !isOpen(); 207 assert Thread.holdsLock(this); 208 209 // prevent further wakeup 210 synchronized (interruptLock) { 211 interruptTriggered = true; 212 } 213 214 pollArray.free(); 215 FileDispatcherImpl.closeIntFD(fd0); 216 FileDispatcherImpl.closeIntFD(fd1); 217 } 218 219 @Override 220 protected void implRegister(SelectionKeyImpl ski) { 221 assert ski.getIndex() == 0; 222 ensureOpen(); 223 } 224 225 @Override 226 protected void implDereg(SelectionKeyImpl ski) throws IOException { 227 assert !ski.isValid(); 228 assert Thread.holdsLock(this); 229 230 // remove from poll array 231 int index = ski.getIndex(); 232 if (index > 0) { 233 remove(ski); 234 } 235 } 236 237 @Override 238 public void setEventOps(SelectionKeyImpl ski) { 239 ensureOpen(); 240 synchronized (updateLock) { 241 updateKeys.addLast(ski); 242 } 243 } 244 245 @Override 246 public Selector wakeup() { 247 synchronized (interruptLock) { 248 if (!interruptTriggered) { 249 try { 250 IOUtil.write1(fd1, (byte)0); 251 } catch (IOException ioe) { 252 throw new InternalError(ioe); 253 } 254 interruptTriggered = true; 255 } 256 } 257 return this; 258 } 259 260 private void clearInterrupt() throws IOException { 261 synchronized (interruptLock) { 262 IOUtil.drain(fd0); 263 interruptTriggered = false; 264 } 265 } 266 267 /** 268 * Sets the first pollfd enty in the poll array to the given fd 269 */ 270 private void setFirst(int fd, int ops) { 271 assert pollArraySize == 0; 272 assert pollKeys.isEmpty(); 273 274 putDescriptor(0, fd); 275 putEventOps(0, ops); 276 pollArraySize = 1; 277 278 pollKeys.add(null); // dummy element 279 } 280 281 /** 282 * Adds a pollfd entry to the poll array, expanding the poll array if needed. 283 */ 284 private void add(SelectionKeyImpl ski, int ops) { 285 expandIfNeeded(); 286 287 int index = pollArraySize; 288 assert index > 0; 289 putDescriptor(index, ski.getFDVal()); 290 putEventOps(index, ops); 291 putReventOps(index, 0); 292 ski.setIndex(index); 293 pollArraySize++; 294 295 pollKeys.add(ski); 296 assert pollKeys.size() == pollArraySize; 297 } 298 299 /** 300 * Update the events of pollfd entry. 301 */ 302 private void update(SelectionKeyImpl ski, int ops) { 303 int index = ski.getIndex(); 304 assert index > 0 && index < pollArraySize; 305 assert getDescriptor(index) == ski.getFDVal(); 306 putEventOps(index, ops); 307 } 308 309 /** 310 * Removes a pollfd entry from the poll array 311 */ 312 private void remove(SelectionKeyImpl ski) { 313 int index = ski.getIndex(); 314 assert index > 0 && index < pollArraySize; 315 assert getDescriptor(index) == ski.getFDVal(); 316 317 // replace pollfd at index with the last pollfd in array 318 int lastIndex = pollArraySize - 1; 319 if (lastIndex != index) { 320 SelectionKeyImpl lastKey = pollKeys.get(lastIndex); 321 assert lastKey.getIndex() == lastIndex; 322 int lastFd = getDescriptor(lastIndex); 323 int lastOps = getEventOps(lastIndex); 324 int lastRevents = getReventOps(lastIndex); 325 assert lastKey.getFDVal() == lastFd; 326 putDescriptor(index, lastFd); 327 putEventOps(index, lastOps); 328 putReventOps(index, lastRevents); 329 pollKeys.set(index, lastKey); 330 lastKey.setIndex(index); 331 } 332 pollKeys.remove(lastIndex); 333 pollArraySize--; 334 assert pollKeys.size() == pollArraySize; 335 336 ski.setIndex(0); 337 } 338 339 /** 340 * Expand poll array if at capacity 341 */ 342 private void expandIfNeeded() { 343 if (pollArraySize == pollArrayCapacity) { 344 int oldSize = pollArrayCapacity * SIZE_POLLFD; 345 int newCapacity = pollArrayCapacity + INITIAL_CAPACITY; 346 int newSize = newCapacity * SIZE_POLLFD; 347 AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false); 348 Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize); 349 pollArray.free(); 350 pollArray = newPollArray; 351 pollArrayCapacity = newCapacity; 352 } 353 } 354 355 private static final short SIZE_POLLFD = 8; 356 private static final short FD_OFFSET = 0; 357 private static final short EVENT_OFFSET = 4; 358 private static final short REVENT_OFFSET = 6; 359 360 private void putDescriptor(int i, int fd) { 361 int offset = SIZE_POLLFD * i + FD_OFFSET; 362 pollArray.putInt(offset, fd); 363 } 364 365 private int getDescriptor(int i) { 366 int offset = SIZE_POLLFD * i + FD_OFFSET; 367 return pollArray.getInt(offset); 368 } 369 370 private void putEventOps(int i, int event) { 371 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 372 pollArray.putShort(offset, (short)event); 373 } 374 375 private int getEventOps(int i) { 376 int offset = SIZE_POLLFD * i + EVENT_OFFSET; 377 return pollArray.getShort(offset); 378 } 379 380 private void putReventOps(int i, int revent) { 381 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 382 pollArray.putShort(offset, (short)revent); 383 } 384 385 private int getReventOps(int i) { 386 int offset = SIZE_POLLFD * i + REVENT_OFFSET; 387 return pollArray.getShort(offset); 388 } 389 390 private static native int poll0(long pollAddress, int numfds, int timeout); 391 392 static { 393 IOUtil.load(); 394 } 395 }