1 /* 2 * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.spi.AsynchronousChannelProvider; 29 import java.util.concurrent.RejectedExecutionException; 30 import java.io.IOException; 31 import jdk.internal.misc.Unsafe; 32 33 /** 34 * Provides an AsynchronousChannelGroup implementation based on the Solaris 10 35 * event port framework and also provides direct access to that framework. 36 */ 37 38 class SolarisEventPort 39 extends Port 40 { 41 private static final Unsafe unsafe = Unsafe.getUnsafe(); 42 private static final int addressSize = unsafe.addressSize(); 43 44 private static int dependsArch(int value32, int value64) { 45 return (addressSize == 4) ? value32 : value64; 46 } 47 48 /* 49 * typedef struct port_event { 50 * int portev_events; 51 * ushort_t portev_source; 52 * ushort_t portev_pad; 53 * uintptr_t portev_object; 54 * void *portev_user; 55 * } port_event_t; 56 */ 57 static final int SIZEOF_PORT_EVENT = dependsArch(16, 24); 58 static final int OFFSETOF_EVENTS = 0; 59 static final int OFFSETOF_SOURCE = 4; 60 static final int OFFSETOF_OBJECT = 8; 61 62 // port sources 63 static final short PORT_SOURCE_USER = 3; 64 static final short PORT_SOURCE_FD = 4; 65 66 // file descriptor to event port. 67 private final int port; 68 69 // true when port is closed 70 private boolean closed; 71 72 SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool) 73 throws IOException 74 { 75 super(provider, pool); 76 77 // create event port 78 this.port = port_create(); 79 } 80 81 SolarisEventPort start() { 82 startThreads(new EventHandlerTask()); 83 return this; 84 } 85 86 // releass resources 87 private void implClose() { 88 synchronized (this) { 89 if (closed) 90 return; 91 closed = true; 92 } 93 port_close(port); 94 } 95 96 private void wakeup() { 97 try { 98 port_send(port, 0); 99 } catch (IOException x) { 100 throw new AssertionError(x); 101 } 102 } 103 104 @Override 105 void executeOnHandlerTask(Runnable task) { 106 synchronized (this) { 107 if (closed) 108 throw new RejectedExecutionException(); 109 offerTask(task); 110 wakeup(); 111 } 112 } 113 114 @Override 115 void shutdownHandlerTasks() { 116 /* 117 * If no tasks are running then just release resources; otherwise 118 * write to the one end of the socketpair to wakeup any polling threads.. 119 */ 120 int nThreads = threadCount(); 121 if (nThreads == 0) { 122 implClose(); 123 } else { 124 // send user event to wakeup each thread 125 while (nThreads-- > 0) { 126 try { 127 port_send(port, 0); 128 } catch (IOException x) { 129 throw new AssertionError(x); 130 } 131 } 132 } 133 } 134 135 @Override 136 void startPoll(int fd, int events) { 137 // (re-)associate file descriptor 138 // no need to translate events 139 try { 140 port_associate(port, PORT_SOURCE_FD, fd, events); 141 } catch (IOException x) { 142 throw new AssertionError(); // should not happen 143 } 144 } 145 146 /* 147 * Task to read a single event from the port and dispatch it to the 148 * channel's onEvent handler. 149 */ 150 private class EventHandlerTask implements Runnable { 151 public void run() { 152 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 153 Invoker.getGroupAndInvokeCount(); 154 final boolean isPooledThread = (myGroupAndInvokeCount != null); 155 boolean replaceMe = false; 156 long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT); 157 try { 158 for (;;) { 159 // reset invoke count 160 if (isPooledThread) 161 myGroupAndInvokeCount.resetInvokeCount(); 162 163 // wait for I/O completion event 164 // A error here is fatal (thread will not be replaced) 165 replaceMe = false; 166 try { 167 int n; 168 do { 169 n = port_get(port, address); 170 } while (n == IOStatus.INTERRUPTED); 171 } catch (IOException x) { 172 x.printStackTrace(); 173 return; 174 } 175 176 // event source 177 short source = unsafe.getShort(address + OFFSETOF_SOURCE); 178 if (source != PORT_SOURCE_FD) { 179 // user event is trigger to invoke task or shutdown 180 if (source == PORT_SOURCE_USER) { 181 Runnable task = pollTask(); 182 if (task == null) { 183 // shutdown request 184 return; 185 } 186 // run task (may throw error/exception) 187 replaceMe = true; 188 task.run(); 189 } 190 // ignore 191 continue; 192 } 193 194 // pe->portev_object is file descriptor 195 int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT); 196 // pe->portev_events 197 int events = unsafe.getInt(address + OFFSETOF_EVENTS); 198 199 // lookup channel 200 PollableChannel ch; 201 fdToChannelLock.readLock().lock(); 202 try { 203 ch = fdToChannel.get(fd); 204 } finally { 205 fdToChannelLock.readLock().unlock(); 206 } 207 208 // notify channel 209 if (ch != null) { 210 replaceMe = true; 211 // no need to translate events 212 ch.onEvent(events, isPooledThread); 213 } 214 } 215 } finally { 216 // free per-thread resources 217 unsafe.freeMemory(address); 218 // last task to exit when shutdown release resources 219 int remaining = threadExit(this, replaceMe); 220 if (remaining == 0 && isShutdown()) 221 implClose(); 222 } 223 } 224 } 225 226 /** 227 * Creates an event port 228 */ 229 static native int port_create() throws IOException; 230 231 /** 232 * Associates specific events of a given object with a port 233 */ 234 static native boolean port_associate(int port, int source, long object, int events) 235 throws IOException; 236 237 /** 238 * Removes the association of an object with a port. 239 */ 240 static native boolean port_dissociate(int port, int source, long object) 241 throws IOException; 242 243 /** 244 * Retrieves a single event from a port 245 */ 246 static native int port_get(int port, long address) throws IOException; 247 248 /** 249 * Retrieves at most {@code max} events from a port. 250 */ 251 static native int port_getn(int port, long address, int max, long timeout) 252 throws IOException; 253 254 /** 255 * Sends a user-defined eventto a specified port. 256 */ 257 static native void port_send(int port, int events) throws IOException; 258 259 /** 260 * Closes a port. 261 */ 262 static native void port_close(int port); 263 264 265 static { 266 IOUtil.load(); 267 } 268 }