/*
 * Copyright (c) 2005, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;


/**
 * Selector implementation backed by the Linux epoll facility.
 *
 * <p> Interest-set changes are not applied immediately: {@link #setEventOps}
 * only queues the key, and the queue is drained at the start of each
 * selection operation. Wakeups are delivered by writing a byte to one end of
 * a pipe whose other end is registered with the epoll instance.
 */

class EPollSelectorImpl extends SelectorImpl {

    // upper bound on the number of events fetched by a single epoll_wait call
    private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);

    // file descriptor of the epoll instance
    private final int epfd;

    // address of the poll array handed to epoll_wait
    private final long pollArrayAddress;

    // pipe used for wakeups: fd0 is polled and drained, fd1 is written to
    private final int fd0;
    private final int fd1;

    // file descriptor -> selection key; accessed while holding the selector's monitor
    private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();

    // keys with pending registrations/interest changes, filled by setEventOps
    private final Object updateLock = new Object();
    private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();

    // state for triggering and clearing wakeups, guarded by interruptLock
    private final Object interruptLock = new Object();
    private boolean interruptTriggered;

    /**
     * Creates the epoll instance, the poll array and the wakeup pipe, then
     * registers the drained end of the pipe with the epoll instance.
     *
     * @param  sp  the provider passed through to the superclass constructor
     * @throws IOException  if creating the wakeup pipe fails (the poll array
     *         and epoll descriptor are released first), or if propagated
     *         from the other setup calls
     */
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);

        this.epfd = EPoll.create();
        this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);

        long pipeFds;
        try {
            pipeFds = IOUtil.makePipe(false);
        } catch (IOException ioe) {
            // undo the allocations made above before propagating the failure
            EPoll.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(epfd);
            throw ioe;
        }
        // the two descriptors are packed into one long: high word, then low word
        this.fd0 = (int) (pipeFds >>> 32);
        this.fd1 = (int) pipeFds;

        // poll the end of the pipe that clearInterrupt drains
        EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }

    /**
     * Throws {@link ClosedSelectorException} unless this selector is open.
     */
    private void ensureOpen() {
        if (isOpen()) {
            return;
        }
        throw new ClosedSelectorException();
    }

    /**
     * Performs one selection operation: applies queued interest changes,
     * processes the deregister queue, waits in epoll_wait, then processes
     * the deregister queue again and handles the polled events.
     *
     * <p> The caller must hold this selector's monitor.
     *
     * @param  action   passed through to the handling of each ready key
     * @param  timeout  wait time in milliseconds, clamped to
     *                  {@code Integer.MAX_VALUE}; zero means do not block and
     *                  a negative value is treated as an untimed wait
     * @return the value returned by {@code processEvents}
     */
    @Override
    protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        assert Thread.holdsLock(this);

        // epoll_wait takes an int timeout
        int remainingMillis = (int) Math.min(timeout, Integer.MAX_VALUE);
        boolean blocking = (remainingMillis != 0);
        boolean timedPoll = (remainingMillis > 0);

        int polled;
        processUpdateQueue();
        processDeregisterQueue();
        try {
            begin(blocking);

            do {
                long startNanos = timedPoll ? System.nanoTime() : 0;
                polled = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, remainingMillis);
                if (timedPoll && polled == IOStatus.INTERRUPTED) {
                    // a timed wait was interrupted: charge the elapsed time
                    // against the timeout before deciding whether to retry
                    long elapsedNanos = System.nanoTime() - startNanos;
                    remainingMillis -= TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
                    if (remainingMillis <= 0) {
                        // nothing left to wait for, report no events
                        polled = 0;
                    }
                }
            } while (polled == IOStatus.INTERRUPTED);
            assert IOStatus.check(polled);

        } finally {
            end(blocking);
        }
        processDeregisterQueue();
        return processEvents(polled, action);
    }

    /**
     * Drains the update queue, bringing the epoll registration of each still
     * valid key in line with its translated interest ops. Invalid keys are
     * dropped from the queue without further action.
     */
    private void processUpdateQueue() {
        assert Thread.holdsLock(this);

        synchronized (updateLock) {
            for (SelectionKeyImpl key = updateKeys.pollFirst();
                 key != null;
                 key = updateKeys.pollFirst()) {

                if (!key.isValid()) {
                    continue;
                }

                int fd = key.getFDVal();
                // first time this key is seen: remember it under its descriptor
                SelectionKeyImpl existing = fdToKey.putIfAbsent(fd, key);
                assert (existing == null) || (existing == key);

                int wanted = key.translateInterestOps();
                int current = key.registeredEvents();
                if (wanted == current) {
                    continue;
                }

                if (wanted == 0) {
                    // no interest left: take the descriptor out of epoll
                    EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                } else if (current == 0) {
                    // not registered yet: add it
                    EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, wanted);
                } else {
                    // already registered: change the event set
                    EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, wanted);
                }
                key.registeredEvents(wanted);
            }
        }
    }

    /**
     * Walks the first {@code numEntries} entries of the poll array and hands
     * the ready events of each known key to {@code processReadyEvents}.
     * If the wakeup descriptor is among the entries, it is drained and the
     * interrupt state is cleared once all entries have been handled.
     *
     * @return the sum of the values returned by {@code processReadyEvents}
     */
    private int processEvents(int numEntries, Consumer<SelectionKey> action)
        throws IOException
    {
        assert Thread.holdsLock(this);

        boolean wakeupSeen = false;
        int updated = 0;
        for (int index = 0; index < numEntries; index++) {
            long event = EPoll.getEvent(pollArrayAddress, index);
            int fd = EPoll.getDescriptor(event);
            if (fd == fd0) {
                wakeupSeen = true;
                continue;
            }
            SelectionKeyImpl key = fdToKey.get(fd);
            if (key == null) {
                // no key recorded for this descriptor, nothing to report
                continue;
            }
            int readyOps = EPoll.getEvents(event);
            updated += processReadyEvents(readyOps, key, action);
        }

        if (wakeupSeen) {
            clearInterrupt();
        }

        return updated;
    }

    /**
     * Releases the native resources held by this selector: the epoll
     * descriptor, the poll array and both ends of the wakeup pipe.
     */
    @Override
    protected void implClose() throws IOException {
        assert Thread.holdsLock(this);

        // with the flag set, wakeup() no longer writes to fd1
        synchronized (interruptLock) {
            interruptTriggered = true;
        }

        FileDispatcherImpl.closeIntFD(epfd);
        EPoll.freePollArray(pollArrayAddress);

        FileDispatcherImpl.closeIntFD(fd0);
        FileDispatcherImpl.closeIntFD(fd1);
    }

    /**
     * Forgets an invalid key and, if it still has events registered,
     * removes its descriptor from the epoll instance.
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        assert !ski.isValid();
        assert Thread.holdsLock(this);

        int fd = ski.getFDVal();
        if (fdToKey.remove(fd) == null) {
            // never made it into the map, so nothing can be registered
            assert ski.registeredEvents() == 0;
            return;
        }
        if (ski.registeredEvents() != 0) {
            EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
            ski.registeredEvents(0);
        }
    }

    /**
     * Queues the key so that its interest ops are applied by the next
     * selection operation.
     *
     * @throws ClosedSelectorException  if this selector is not open
     */
    @Override
    public void setEventOps(SelectionKeyImpl ski) {
        ensureOpen();
        synchronized (updateLock) {
            updateKeys.addLast(ski);
        }
    }

    /**
     * Writes a single byte to the wakeup pipe unless a wakeup is already
     * pending (or the selector has been closed).
     *
     * @return this selector
     * @throws InternalError  if the write fails with an {@code IOException}
     */
    @Override
    public Selector wakeup() {
        synchronized (interruptLock) {
            if (interruptTriggered) {
                return this;
            }
            try {
                IOUtil.write1(fd1, (byte)0);
            } catch (IOException ioe) {
                throw new InternalError(ioe);
            }
            interruptTriggered = true;
        }
        return this;
    }

    /**
     * Drains the wakeup pipe and resets the pending-wakeup flag.
     */
    private void clearInterrupt() throws IOException {
        synchronized (interruptLock) {
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
}