1 /* 2 * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.IOException; 29 import java.nio.channels.ClosedSelectorException; 30 import java.nio.channels.SelectionKey; 31 import java.nio.channels.Selector; 32 import java.nio.channels.spi.SelectorProvider; 33 import java.util.ArrayDeque; 34 import java.util.Deque; 35 import java.util.HashMap; 36 import java.util.Map; 37 import java.util.concurrent.TimeUnit; 38 import java.util.function.Consumer; 39 40 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_FD; 41 import static sun.nio.ch.SolarisEventPort.PORT_SOURCE_USER; 42 import static sun.nio.ch.SolarisEventPort.SIZEOF_PORT_EVENT; 43 import static sun.nio.ch.SolarisEventPort.OFFSETOF_EVENTS; 44 import static sun.nio.ch.SolarisEventPort.OFFSETOF_SOURCE; 45 import static sun.nio.ch.SolarisEventPort.OFFSETOF_OBJECT; 46 import static sun.nio.ch.SolarisEventPort.port_create; 47 import static sun.nio.ch.SolarisEventPort.port_close; 48 import static sun.nio.ch.SolarisEventPort.port_associate; 49 import static sun.nio.ch.SolarisEventPort.port_dissociate; 50 import static sun.nio.ch.SolarisEventPort.port_getn; 51 import static sun.nio.ch.SolarisEventPort.port_send; 52 53 /** 54 * Selector implementation based on the Solaris event port mechanism. 55 */ 56 57 class EventPortSelectorImpl 58 extends SelectorImpl 59 { 60 // maximum number of events to retrive in one call to port_getn 61 static final int MAX_EVENTS = Math.min(IOUtil.fdLimit()-1, 1024); 62 63 // port file descriptor 64 private final int pfd; 65 66 // the poll array (populated by port_getn) 67 private final long pollArrayAddress; 68 private final AllocatedNativeObject pollArray; 69 70 // maps file descriptor to selection key, synchronize on selector 71 private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>(); 72 73 // the last update operation, incremented by processUpdateQueue 74 private int lastUpdate; 75 76 // pending new registrations/updates, queued by setEventOps and 77 // updateSelectedKeys 78 private final Object updateLock = new Object(); 79 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>(); 80 81 // interrupt triggering and clearing 82 private final Object interruptLock = new Object(); 83 private boolean interruptTriggered; 84 85 EventPortSelectorImpl(SelectorProvider sp) throws IOException { 86 super(sp); 87 88 this.pfd = port_create(); 89 90 int allocationSize = MAX_EVENTS * SIZEOF_PORT_EVENT; 91 this.pollArray = new AllocatedNativeObject(allocationSize, false); 92 this.pollArrayAddress = pollArray.address(); 93 } 94 95 private void ensureOpen() { 96 if (!isOpen()) 97 throw new ClosedSelectorException(); 98 } 99 100 @Override 101 protected int doSelect(Consumer<SelectionKey> action, long timeout) 102 throws IOException 103 { 104 assert Thread.holdsLock(this); 105 106 long to = timeout; 107 boolean blocking = (to != 0); 108 boolean timedPoll = (to > 0); 109 110 int numEvents; 111 processUpdateQueue(); 112 processDeregisterQueue(); 113 try { 114 begin(blocking); 115 116 do { 117 long startTime = timedPoll ? System.nanoTime() : 0; 118 numEvents = port_getn(pfd, pollArrayAddress, MAX_EVENTS, to); 119 if (numEvents == IOStatus.INTERRUPTED && timedPoll) { 120 // timed poll interrupted so need to adjust timeout 121 long adjust = System.nanoTime() - startTime; 122 to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); 123 if (to <= 0) { 124 // timeout also expired so no retry 125 numEvents = 0; 126 } 127 } 128 } while (numEvents == IOStatus.INTERRUPTED); 129 assert IOStatus.check(numEvents); 130 131 } finally { 132 end(blocking); 133 } 134 processDeregisterQueue(); 135 return processPortEvents(numEvents, action); 136 } 137 138 /** 139 * Process new registrations and changes to the interest ops. 140 */ 141 private void processUpdateQueue() throws IOException { 142 assert Thread.holdsLock(this); 143 144 // bump lastUpdate to ensure that the interest ops are changed at most 145 // once per bulk update 146 lastUpdate++; 147 148 synchronized (updateLock) { 149 SelectionKeyImpl ski; 150 while ((ski = updateKeys.pollFirst()) != null) { 151 if (ski.isValid()) { 152 int fd = ski.getFDVal(); 153 // add to fdToKey if needed 154 SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski); 155 assert (previous == null) || (previous == ski); 156 157 int newEvents = ski.translateInterestOps(); 158 if (newEvents != ski.registeredEvents()) { 159 if (newEvents == 0) { 160 port_dissociate(pfd, PORT_SOURCE_FD, fd); 161 } else { 162 port_associate(pfd, PORT_SOURCE_FD, fd, newEvents); 163 } 164 ski.registeredEvents(newEvents); 165 } 166 } 167 } 168 } 169 } 170 171 /** 172 * Process the polled events and re-queue the selected keys so the file 173 * descriptors are re-associated at the next select operation. 174 */ 175 private int processPortEvents(int numEvents, Consumer<SelectionKey> action) 176 throws IOException 177 { 178 assert Thread.holdsLock(this); 179 180 int numKeysUpdated = 0; 181 boolean interrupted = false; 182 183 // Process the polled events while holding the update lock. This allows 184 // keys to be queued for ready file descriptors so they can be 185 // re-associated at the next select. The selected-key can be updated 186 // in this pass. 187 synchronized (updateLock) { 188 for (int i = 0; i < numEvents; i++) { 189 short source = getSource(i); 190 if (source == PORT_SOURCE_FD) { 191 int fd = getDescriptor(i); 192 SelectionKeyImpl ski = fdToKey.get(fd); 193 if (ski != null) { 194 ski.registeredEvents(0); 195 updateKeys.addLast(ski); 196 197 // update selected-key set if no action specified 198 if (action == null) { 199 int rOps = getEventOps(i); 200 numKeysUpdated += processReadyEvents(rOps, ski, null); 201 } 202 203 } 204 } else if (source == PORT_SOURCE_USER) { 205 interrupted = true; 206 } else { 207 assert false; 208 } 209 } 210 } 211 212 // if an action specified then iterate over the polled events again so 213 // that the action is performed without holding the update lock. 214 if (action != null) { 215 for (int i = 0; i < numEvents; i++) { 216 short source = getSource(i); 217 if (source == PORT_SOURCE_FD) { 218 int fd = getDescriptor(i); 219 SelectionKeyImpl ski = fdToKey.get(fd); 220 if (ski != null) { 221 int rOps = getEventOps(i); 222 numKeysUpdated += processReadyEvents(rOps, ski, action); 223 } 224 } 225 } 226 } 227 228 if (interrupted) { 229 clearInterrupt(); 230 } 231 return numKeysUpdated; 232 } 233 234 @Override 235 protected void implClose() throws IOException { 236 assert !isOpen(); 237 assert Thread.holdsLock(this); 238 239 // prevent further wakeup 240 synchronized (interruptLock) { 241 interruptTriggered = true; 242 } 243 244 port_close(pfd); 245 pollArray.free(); 246 } 247 248 @Override 249 protected void implDereg(SelectionKeyImpl ski) throws IOException { 250 assert !ski.isValid(); 251 assert Thread.holdsLock(this); 252 253 int fd = ski.getFDVal(); 254 if (fdToKey.remove(fd) != null) { 255 if (ski.registeredEvents() != 0) { 256 port_dissociate(pfd, PORT_SOURCE_FD, fd); 257 ski.registeredEvents(0); 258 } 259 } else { 260 assert ski.registeredEvents() == 0; 261 } 262 } 263 264 @Override 265 public void setEventOps(SelectionKeyImpl ski) { 266 ensureOpen(); 267 synchronized (updateLock) { 268 updateKeys.addLast(ski); 269 } 270 } 271 272 @Override 273 public Selector wakeup() { 274 synchronized (interruptLock) { 275 if (!interruptTriggered) { 276 try { 277 port_send(pfd, 0); 278 } catch (IOException ioe) { 279 throw new InternalError(ioe); 280 } 281 interruptTriggered = true; 282 } 283 } 284 return this; 285 } 286 287 private void clearInterrupt() throws IOException { 288 synchronized (interruptLock) { 289 interruptTriggered = false; 290 } 291 } 292 293 private short getSource(int i) { 294 int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_SOURCE; 295 return pollArray.getShort(offset); 296 } 297 298 private int getEventOps(int i) { 299 int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_EVENTS; 300 return pollArray.getInt(offset); 301 } 302 303 private int getDescriptor(int i) { 304 //assert Unsafe.getUnsafe().addressSize() == 8; 305 int offset = SIZEOF_PORT_EVENT * i + OFFSETOF_OBJECT; 306 return (int) pollArray.getLong(offset); 307 } 308 }