1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.spi.AsynchronousChannelProvider; 29 import java.util.concurrent.RejectedExecutionException; 30 import java.io.IOException; 31 import jdk.internal.misc.Unsafe; 32 33 /** 34 * Provides an AsynchronousChannelGroup implementation based on the Solaris 10 35 * event port framework and also provides direct access to that framework. 36 */ 37 38 class SolarisEventPort 39 extends Port 40 { 41 private static final Unsafe unsafe = Unsafe.getUnsafe(); 42 private static final int addressSize = unsafe.addressSize(); 43 44 private static int dependsArch(int value32, int value64) { 45 return (addressSize == 4) ? value32 : value64; 46 } 47 48 /* 49 * typedef struct port_event { 50 * int portev_events; 51 * ushort_t portev_source; 52 * ushort_t portev_pad; 53 * uintptr_t portev_object; 54 * void *portev_user; 55 * } port_event_t; 56 */ 57 static final int SIZEOF_PORT_EVENT = dependsArch(16, 24); 58 static final int OFFSETOF_EVENTS = 0; 59 static final int OFFSETOF_SOURCE = 4; 60 static final int OFFSETOF_OBJECT = 8; 61 62 // port sources 63 static final short PORT_SOURCE_USER = 3; 64 static final short PORT_SOURCE_FD = 4; 65 66 // file descriptor to event port. 67 private final int port; 68 69 // true when port is closed 70 private boolean closed; 71 72 SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool) 73 throws IOException 74 { 75 super(provider, pool); 76 77 // create event port 78 this.port = port_create(); 79 } 80 81 SolarisEventPort start() { 82 startThreads(new EventHandlerTask()); 83 return this; 84 } 85 86 // releass resources 87 private void implClose() { 88 synchronized (this) { 89 if (closed) 90 return; 91 closed = true; 92 } 93 port_close(port); 94 } 95 96 private void wakeup() { 97 try { 98 port_send(port, 0); 99 } catch (IOException x) { 100 throw new AssertionError(x); 101 } 102 } 103 104 @Override 105 void executeOnHandlerTask(Runnable task) { 106 synchronized (this) { 107 if (closed) 108 throw new RejectedExecutionException(); 109 offerTask(task); 110 wakeup(); 111 } 112 } 113 114 @Override 115 void shutdownHandlerTasks() { 116 /* 117 * If no tasks are running then just release resources; otherwise 118 * write to the one end of the socketpair to wakeup any polling threads.. 119 */ 120 int nThreads = threadCount(); 121 if (nThreads == 0) { 122 implClose(); 123 } else { 124 // send user event to wakeup each thread 125 while (nThreads-- > 0) { 126 try { 127 port_send(port, 0); 128 } catch (IOException x) { 129 throw new AssertionError(x); 130 } 131 } 132 } 133 } 134 135 @Override 136 void startPoll(int fd, int events) { 137 // (re-)associate file descriptor 138 // no need to translate events 139 try { 140 port_associate(port, PORT_SOURCE_FD, fd, events); 141 } catch (IOException x) { 142 throw new AssertionError(); // should not happen 143 } 144 } 145 146 /* 147 * Task to read a single event from the port and dispatch it to the 148 * channel's onEvent handler. 149 */ 150 private class EventHandlerTask implements Runnable { 151 public void run() { 152 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 153 Invoker.getGroupAndInvokeCount(); 154 final boolean isPooledThread = (myGroupAndInvokeCount != null); 155 boolean replaceMe = false; 156 long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT); 157 try { 158 for (;;) { 159 // reset invoke count 160 if (isPooledThread) 161 myGroupAndInvokeCount.resetInvokeCount(); 162 163 // wait for I/O completion event 164 // A error here is fatal (thread will not be replaced) 165 replaceMe = false; 166 try { 167 port_get(port, address); 168 } catch (IOException x) { 169 x.printStackTrace(); 170 return; 171 } 172 173 // event source 174 short source = unsafe.getShort(address + OFFSETOF_SOURCE); 175 if (source != PORT_SOURCE_FD) { 176 // user event is trigger to invoke task or shutdown 177 if (source == PORT_SOURCE_USER) { 178 Runnable task = pollTask(); 179 if (task == null) { 180 // shutdown request 181 return; 182 } 183 // run task (may throw error/exception) 184 replaceMe = true; 185 task.run(); 186 } 187 // ignore 188 continue; 189 } 190 191 // pe->portev_object is file descriptor 192 int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT); 193 // pe->portev_events 194 int events = unsafe.getInt(address + OFFSETOF_EVENTS); 195 196 // lookup channel 197 PollableChannel ch; 198 fdToChannelLock.readLock().lock(); 199 try { 200 ch = fdToChannel.get(fd); 201 } finally { 202 fdToChannelLock.readLock().unlock(); 203 } 204 205 // notify channel 206 if (ch != null) { 207 replaceMe = true; 208 // no need to translate events 209 ch.onEvent(events, isPooledThread); 210 } 211 } 212 } finally { 213 // free per-thread resources 214 unsafe.freeMemory(address); 215 // last task to exit when shutdown release resources 216 int remaining = threadExit(this, replaceMe); 217 if (remaining == 0 && isShutdown()) 218 implClose(); 219 } 220 } 221 } 222 223 /** 224 * Creates an event port 225 */ 226 static native int port_create() throws IOException; 227 228 /** 229 * Associates specific events of a given object with a port 230 */ 231 static native boolean port_associate(int port, int source, long object, int events) 232 throws IOException; 233 234 /** 235 * Removes the association of an object with a port. 236 */ 237 static native boolean port_dissociate(int port, int source, long object) 238 throws IOException; 239 240 /** 241 * Retrieves a single event from a port 242 */ 243 static native void port_get(int port, long pe) throws IOException; 244 245 /** 246 * Retrieves at most {@code max} events from a port. 247 */ 248 static native int port_getn(int port, long address, int max, long timeout) 249 throws IOException; 250 251 /** 252 * Sends a user-defined eventto a specified port. 253 */ 254 static native void port_send(int port, int events) throws IOException; 255 256 /** 257 * Closes a port. 258 */ 259 static native void port_close(int port); 260 261 262 static { 263 IOUtil.load(); 264 } 265 }