/*
 * Copyright (c) 2005, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;


/**
 * Linux epoll based Selector implementation.
 *
 * <p> Registrations and interest-op changes are not applied to epoll
 * immediately; they are queued under {@code updateLock} and applied by
 * {@link #processUpdateQueue()} at the start of each selection operation.
 * Wakeups are implemented with a pipe: {@code wakeup} writes one byte to
 * {@code fd1} and the selecting thread polls and drains {@code fd0}.
 */

class EPollSelectorImpl extends SelectorImpl {

    // maximum number of events to poll in one call to epoll_wait
    private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);

    // epoll file descriptor
    private final int epfd;

    // address of poll array when polling with epoll_wait
    private final long pollArrayAddress;

    // file descriptors used for interrupt: fd0 is registered with epoll and
    // drained by clearInterrupt, fd1 is written to by wakeup
    private final int fd0;
    private final int fd1;

    // maps file descriptor to selection key, synchronize on selector
    private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();

    // file descriptors registered with epoll, synchronize on selector
    private final BitSet registered = new BitSet();

    // pending new registrations/updates, queued by implRegister and putEventOps
    private final Object updateLock = new Object();
    private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
    private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    private final Deque<Integer> updateOps = new ArrayDeque<>();

    // interrupt triggering and clearing
    private final Object interruptLock = new Object();
    private boolean interruptTriggered;

    /**
     * Package private constructor called by factory method in
     * the abstract superclass Selector.
     *
     * <p> Creates the epoll instance, allocates the poll array, creates the
     * wakeup pipe and registers one end of the pipe with epoll.
     *
     * @param  sp  the provider that created this selector
     * @throws IOException if creating the pipe fails (the poll array and
     *         epoll descriptor are released first), or if thrown by
     *         {@code EPoll.create}
     */
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);

        this.epfd = EPoll.create();
        this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

        try {
            // makePipe packs both descriptors into a single long:
            // high 32 bits -> fd0, low 32 bits -> fd1
            long fds = IOUtil.makePipe(false);
            this.fd0 = (int) (fds >>> 32);
            this.fd1 = (int) fds;
        } catch (IOException ioe) {
            // undo the allocations made above before propagating
            EPoll.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(epfd);
            throw ioe;
        }

        // register one end of the pipe for wakeups
        EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }

    /**
     * Throws {@link ClosedSelectorException} if this selector is closed.
     */
    private void ensureOpen() {
        if (!isOpen())
            throw new ClosedSelectorException();
    }

    /**
     * Performs one selection operation: applies queued updates, polls epoll
     * and folds the polled events into the selected-key set.
     *
     * <p> The caller must hold the selector lock. A poll that returns
     * {@code IOStatus.INTERRUPTED} is retried; for a timed poll the remaining
     * timeout is reduced by the time already spent waiting, and the poll
     * completes with zero events once the timeout has elapsed.
     *
     * @param  timeout  timeout in milliseconds; values above
     *         {@code Integer.MAX_VALUE} are clamped because the epoll_wait
     *         timeout is an int. Only a value greater than zero is treated
     *         as a timed poll here
     * @return the number of keys whose ready set was updated
     * @throws IOException if polling or clearing the interrupt fails
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        assert Thread.holdsLock(this);

        int numEntries;
        processUpdateQueue();
        processDeregisterQueue();
        try {
            begin();

            // epoll_wait timeout is int
            int to = (int) Math.min(timeout, Integer.MAX_VALUE);
            boolean timedPoll = (to > 0);
            do {
                long startTime = timedPoll ? System.nanoTime() : 0;
                numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
                if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                    // timed poll interrupted so need to adjust timeout
                    long adjust = System.nanoTime() - startTime;
                    to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                    if (to <= 0) {
                        // timeout expired so no retry
                        numEntries = 0;
                    }
                }
            } while (numEntries == IOStatus.INTERRUPTED);
            assert IOStatus.check(numEntries);

        } finally {
            end();
        }
        // keys may have been cancelled while polling; process them before
        // the selected-key set is updated
        processDeregisterQueue();
        return updateSelectedKeys(numEntries);
    }

    /**
     * Process new registrations and changes to the interest ops.
     *
     * <p> New keys that are still valid are recorded in {@code fdToKey}.
     * Each queued interest-op change is then translated into the matching
     * epoll operation: DEL when a registered descriptor's ops become zero,
     * MOD when a registered descriptor's ops change, ADD when an
     * unregistered descriptor gets non-zero ops. Changes for keys that are
     * invalid or unknown to {@code fdToKey} are dropped.
     */
    private void processUpdateQueue() {
        assert Thread.holdsLock(this);

        synchronized (updateLock) {
            SelectionKeyImpl ski;

            // new registrations
            while ((ski = newKeys.pollFirst()) != null) {
                if (ski.isValid()) {
                    SelChImpl ch = ski.channel;
                    int fd = ch.getFDVal();
                    SelectionKeyImpl previous = fdToKey.put(fd, ski);
                    assert previous == null;
                    assert !registered.get(fd);
                }
            }

            // changes to interest ops; updateKeys and updateOps are parallel
            // queues filled together by putEventOps
            assert updateKeys.size() == updateOps.size();
            while ((ski = updateKeys.pollFirst()) != null) {
                int ops = updateOps.pollFirst();
                int fd = ski.channel.getFDVal();
                if (ski.isValid() && fdToKey.containsKey(fd)) {
                    if (registered.get(fd)) {
                        if (ops == 0) {
                            // remove from epoll
                            EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                            registered.clear(fd);
                        } else {
                            // modify events
                            EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
                        }
                    } else if (ops != 0) {
                        // add to epoll
                        EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
                        registered.set(fd);
                    }
                }
            }
        }
    }

    /**
     * Update the keys whose fd's have been selected by the epoll.
     * Add the ready keys to the ready queue.
     *
     * <p> An event on {@code fd0} is a wakeup rather than channel readiness;
     * it is noted and the pipe is drained once all events are processed.
     *
     * <p> As required by the {@link Selector} specification, a key that is
     * already in the selected-key set keeps its previously recorded
     * readiness and has the newly polled readiness merged in, whereas a key
     * that is not yet selected has its ready set replaced.
     *
     * @param  numEntries  number of events available in the poll array
     * @return the number of keys added to the selected-key set or whose
     *         ready set was reported as changed by the channel
     * @throws IOException if draining the wakeup pipe fails
     */
    private int updateSelectedKeys(int numEntries) throws IOException {
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(nioSelectedKeys());

        boolean interrupted = false;
        int numKeysUpdated = 0;
        for (int i=0; i<numEntries; i++) {
            long event = EPoll.getEvent(pollArrayAddress, i);
            int fd = EPoll.getDescriptor(event);
            if (fd == fd0) {
                interrupted = true;
            } else {
                SelectionKeyImpl ski = fdToKey.get(fd);
                if (ski != null) {
                    int rOps = EPoll.getEvents(event);
                    if (selectedKeys.contains(ski)) {
                        // already selected: preserve the existing ready set
                        // and merge in the new readiness instead of
                        // overwriting it
                        if (ski.channel.translateAndUpdateReadyOps(rOps, ski)) {
                            numKeysUpdated++;
                        }
                    } else {
                        // not yet selected: ready set is replaced outright
                        ski.channel.translateAndSetReadyOps(rOps, ski);
                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                            selectedKeys.add(ski);
                            numKeysUpdated++;
                        }
                    }
                }
            }
        }

        if (interrupted) {
            clearInterrupt();
        }

        return numKeysUpdated;
    }

    /**
     * Closes this selector: disables further wakeups, releases the epoll
     * descriptor, poll array and wakeup pipe, then deregisters every key.
     *
     * <p> The caller must hold the selector lock and the key-set lock.
     *
     * @throws IOException if closing a descriptor or killing a channel fails
     */
    @Override
    protected void implClose() throws IOException {
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(nioKeys());

        // prevent further wakeup; wakeup() writes to fd1 only while this
        // flag is false, and fd1 is closed below
        synchronized (interruptLock) {
            interruptTriggered = true;
        }

        FileDispatcherImpl.closeIntFD(epfd);
        EPoll.freePollArray(pollArrayAddress);

        FileDispatcherImpl.closeIntFD(fd0);
        FileDispatcherImpl.closeIntFD(fd1);

        // Deregister channels
        Iterator<SelectionKey> i = keys.iterator();
        while (i.hasNext()) {
            SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
            deregister(ski);
            SelectableChannel selch = ski.channel();
            // a closed channel with no remaining registrations is killed
            if (!selch.isOpen() && !selch.isRegistered())
                ((SelChImpl)selch).kill();
            i.remove();
        }
    }

    /**
     * Queues a new key for registration and adds it to the key set. The key
     * is recorded in {@code fdToKey} by the next call to
     * {@link #processUpdateQueue()}.
     *
     * @param  ski  the key being registered
     * @throws ClosedSelectorException if this selector is closed
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        assert Thread.holdsLock(nioKeys());
        ensureOpen();
        synchronized (updateLock) {
            newKeys.addLast(ski);
        }
        keys.add(ski);
    }

    /**
     * Removes a cancelled key from this selector: drops its descriptor from
     * {@code fdToKey} and from epoll (if registered there), removes the key
     * from the selected-key and key sets and from its channel's key set, and
     * kills the channel if it is closed and no longer registered anywhere.
     *
     * <p> The caller must hold the selector, key-set and selected-key-set
     * locks, and the key must already be invalid.
     *
     * @param  ski  the cancelled key
     * @throws IOException if killing the channel fails
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert !ski.isValid();
        assert Thread.holdsLock(this);
        assert Thread.holdsLock(nioKeys());
        assert Thread.holdsLock(nioSelectedKeys());

        int fd = ski.channel.getFDVal();
        fdToKey.remove(fd);
        if (registered.get(fd)) {
            EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
            registered.clear(fd);
        }

        selectedKeys.remove(ski);
        keys.remove(ski);

        // remove from channel's key set
        deregister(ski);

        SelectableChannel selch = ski.channel();
        if (!selch.isOpen() && !selch.isRegistered())
            ((SelChImpl) selch).kill();
    }

    /**
     * Queues a change to a key's interest ops. The change is applied to
     * epoll by the next call to {@link #processUpdateQueue()}.
     *
     * @param  ski  the key whose interest ops are changing
     * @param  ops  the new event mask to register with epoll; zero removes
     *         the descriptor from epoll
     * @throws ClosedSelectorException if this selector is closed
     */
    @Override
    public void putEventOps(SelectionKeyImpl ski, int ops) {
        ensureOpen();
        synchronized (updateLock) {
            updateOps.addLast(ops);   // ops first in case adding the key fails
            updateKeys.addLast(ski);
        }
    }

    /**
     * Wakes up a selection operation by writing a single byte to the wakeup
     * pipe. Does nothing if a wakeup is already pending or the selector has
     * been closed (both are tracked by {@code interruptTriggered}).
     *
     * @return this selector
     * @throws InternalError if writing to the pipe fails
     */
    @Override
    public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                try {
                    IOUtil.write1(fd1, (byte)0);
                } catch (IOException ioe) {
                    throw new InternalError(ioe);
                }
                interruptTriggered = true;
            }
        }
        return this;
    }

    /**
     * Drains the wakeup pipe and re-arms {@link #wakeup()}.
     *
     * @throws IOException if draining the pipe fails
     */
    private void clearInterrupt() throws IOException {
        synchronized (interruptLock) {
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
}