1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * Copyright 2012 SAP AG. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27 package sun.nio.ch; 28 29 import java.nio.channels.spi.AsynchronousChannelProvider; 30 import java.io.IOException; 31 import java.util.HashSet; 32 import java.util.Iterator; 33 import java.util.concurrent.ArrayBlockingQueue; 34 import java.util.concurrent.RejectedExecutionException; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.concurrent.locks.ReentrantLock; 37 import sun.misc.Unsafe; 38 39 /** 40 * AsynchronousChannelGroup implementation based on the AIX pollset framework. 41 */ 42 final class AixPollPort 43 extends Port 44 { 45 private static final Unsafe unsafe = Unsafe.getUnsafe(); 46 47 static { 48 IOUtil.load(); 49 init(); 50 } 51 52 /** 53 * struct pollfd { 54 * int fd; 55 * short events; 56 * short revents; 57 * } 58 */ 59 private static final int SIZEOF_POLLFD = eventSize(); 60 private static final int OFFSETOF_EVENTS = eventsOffset(); 61 private static final int OFFSETOF_REVENTS = reventsOffset(); 62 private static final int OFFSETOF_FD = fdOffset(); 63 64 // opcodes 65 private static final int PS_ADD = 0x0; 66 private static final int PS_MOD = 0x1; 67 private static final int PS_DELETE = 0x2; 68 69 // maximum number of events to poll at a time 70 private static final int MAX_POLL_EVENTS = 512; 71 72 // pollset ID 73 private final int pollset; 74 75 // true if port is closed 76 private boolean closed; 77 78 // socket pair used for wakeup 79 private final int sp[]; 80 81 // socket pair used to indicate pending pollsetCtl calls 82 // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call. 83 private final int ctlSp[]; 84 85 // number of wakeups pending 86 private final AtomicInteger wakeupCount = new AtomicInteger(); 87 88 // address of the poll array passed to pollset_poll 89 private final long address; 90 91 // encapsulates an event for a channel 92 static class Event { 93 final PollableChannel channel; 94 final int events; 95 96 Event(PollableChannel channel, int events) { 97 this.channel = channel; 98 this.events = events; 99 } 100 101 PollableChannel channel() { return channel; } 102 int events() { return events; } 103 } 104 105 // queue of events for cases that a polling thread dequeues more than one 106 // event 107 private final ArrayBlockingQueue<Event> queue; 108 private final Event NEED_TO_POLL = new Event(null, 0); 109 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); 110 111 // encapsulates a pollset control event for a file descriptor 112 static class ControlEvent { 113 final int fd; 114 final int events; 115 final boolean removeOnly; 116 int error = 0; 117 118 ControlEvent(int fd, int events, boolean removeOnly) { 119 this.fd = fd; 120 this.events = events; 121 this.removeOnly = removeOnly; 122 } 123 124 int fd() { return fd; } 125 int events() { return events; } 126 boolean removeOnly() { return removeOnly; } 127 int error() { return error; } 128 void setError(int error) { this.error = error; } 129 } 130 131 // queue of control events that need to be processed 132 // (this object is also used for synchronization) 133 private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>(); 134 135 // lock used to check whether a poll operation is ongoing 136 private final ReentrantLock controlLock = new ReentrantLock(); 137 138 AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool) 139 throws IOException 140 { 141 super(provider, pool); 142 143 // open pollset 144 this.pollset = pollsetCreate(); 145 146 // create socket pair for wakeup mechanism 147 int[] sv = new int[2]; 148 try { 149 socketpair(sv); 150 // register one end with pollset 151 pollsetCtl(pollset, PS_ADD, sv[0], POLLIN); 152 } catch (IOException x) { 153 pollsetDestroy(pollset); 154 throw x; 155 } 156 this.sp = sv; 157 158 // create socket pair for pollset control mechanism 159 sv = new int[2]; 160 try { 161 socketpair(sv); 162 // register one end with pollset 163 pollsetCtl(pollset, PS_ADD, sv[0], POLLIN); 164 } catch (IOException x) { 165 pollsetDestroy(pollset); 166 throw x; 167 } 168 this.ctlSp = sv; 169 170 // allocate the poll array 171 this.address = allocatePollArray(MAX_POLL_EVENTS); 172 173 // create the queue and offer the special event to ensure that the first 174 // threads polls 175 this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS); 176 this.queue.offer(NEED_TO_POLL); 177 } 178 179 AixPollPort start() { 180 startThreads(new EventHandlerTask()); 181 return this; 182 } 183 184 /** 185 * Release all resources 186 */ 187 private void implClose() { 188 synchronized (this) { 189 if (closed) 190 return; 191 closed = true; 192 } 193 freePollArray(address); 194 close0(sp[0]); 195 close0(sp[1]); 196 close0(ctlSp[0]); 197 close0(ctlSp[1]); 198 pollsetDestroy(pollset); 199 } 200 201 private void wakeup() { 202 if (wakeupCount.incrementAndGet() == 1) { 203 // write byte to socketpair to force wakeup 204 try { 205 interrupt(sp[1]); 206 } catch (IOException x) { 207 throw new AssertionError(x); 208 } 209 } 210 } 211 212 @Override 213 void executeOnHandlerTask(Runnable task) { 214 synchronized (this) { 215 if (closed) 216 throw new RejectedExecutionException(); 217 offerTask(task); 218 wakeup(); 219 } 220 } 221 222 @Override 223 void shutdownHandlerTasks() { 224 /* 225 * If no tasks are running then just release resources; otherwise 226 * write to the one end of the socketpair to wakeup any polling threads. 227 */ 228 int nThreads = threadCount(); 229 if (nThreads == 0) { 230 implClose(); 231 } else { 232 // send interrupt to each thread 233 while (nThreads-- > 0) { 234 wakeup(); 235 } 236 } 237 } 238 239 // invoke by clients to register a file descriptor 240 @Override 241 void startPoll(int fd, int events) { 242 queueControlEvent(new ControlEvent(fd, events, false)); 243 } 244 245 // Callback method for implementations that need special handling when fd is removed 246 @Override 247 protected void preUnregister(int fd) { 248 queueControlEvent(new ControlEvent(fd, 0, true)); 249 } 250 251 // Add control event into queue and wait for completion. 252 // In case the control lock is free, this method also tries to apply the control change directly. 253 private void queueControlEvent(ControlEvent ev) { 254 // pollsetCtl blocks when a poll call is ongoing. This is very probable. 255 // Therefore we let the polling thread do the pollsetCtl call. 256 synchronized (controlQueue) { 257 controlQueue.add(ev); 258 // write byte to socketpair to force wakeup 259 try { 260 interrupt(ctlSp[1]); 261 } catch (IOException x) { 262 throw new AssertionError(x); 263 } 264 do { 265 // Directly empty queue if no poll call is ongoing. 266 if (controlLock.tryLock()) { 267 try { 268 processControlQueue(); 269 } finally { 270 controlLock.unlock(); 271 } 272 } else { 273 try { 274 // Do not starve in case the polling thread returned before 275 // we could write to ctlSp[1] but the polling thread did not 276 // release the control lock until we checked. Therefore, use 277 // a timed wait for the time being. 278 controlQueue.wait(100); 279 } catch (InterruptedException e) { 280 // ignore exception and try again 281 } 282 } 283 } while (controlQueue.contains(ev)); 284 } 285 if (ev.error() != 0) { 286 throw new AssertionError(); 287 } 288 } 289 290 // Process all events currently stored in the control queue. 291 private void processControlQueue() { 292 synchronized (controlQueue) { 293 // On Aix it is only possible to set the event 294 // bits on the first call of pollsetCtl. Later 295 // calls only add bits, but cannot remove them. 296 // Therefore, we always remove the file 297 // descriptor ignoring the error and then add it. 298 Iterator<ControlEvent> iter = controlQueue.iterator(); 299 while (iter.hasNext()) { 300 ControlEvent ev = iter.next(); 301 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0); 302 if (!ev.removeOnly()) { 303 ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events())); 304 } 305 iter.remove(); 306 } 307 controlQueue.notifyAll(); 308 } 309 } 310 311 /* 312 * Task to process events from pollset and dispatch to the channel's 313 * onEvent handler. 314 * 315 * Events are retreived from pollset in batch and offered to a BlockingQueue 316 * where they are consumed by handler threads. A special "NEED_TO_POLL" 317 * event is used to signal one consumer to re-poll when all events have 318 * been consumed. 319 */ 320 private class EventHandlerTask implements Runnable { 321 private Event poll() throws IOException { 322 try { 323 for (;;) { 324 int n; 325 controlLock.lock(); 326 try { 327 n = pollsetPoll(pollset, address, MAX_POLL_EVENTS); 328 } finally { 329 controlLock.unlock(); 330 } 331 /* 332 * 'n' events have been read. Here we map them to their 333 * corresponding channel in batch and queue n-1 so that 334 * they can be handled by other handler threads. The last 335 * event is handled by this thread (and so is not queued). 336 */ 337 fdToChannelLock.readLock().lock(); 338 try { 339 while (n-- > 0) { 340 long eventAddress = getEvent(address, n); 341 int fd = getDescriptor(eventAddress); 342 343 // To emulate one shot semantic we need to remove 344 // the file descriptor here. 345 pollsetCtl(pollset, PS_DELETE, fd, 0); 346 347 // wakeup 348 if (fd == sp[0]) { 349 if (wakeupCount.decrementAndGet() == 0) { 350 // no more wakeups so drain pipe 351 drain1(sp[0]); 352 } 353 354 // This is the only file descriptor without 355 // one shot semantic => register it again. 356 pollsetCtl(pollset, PS_ADD, sp[0], POLLIN); 357 358 // queue special event if there are more events 359 // to handle. 360 if (n > 0) { 361 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); 362 continue; 363 } 364 return EXECUTE_TASK_OR_SHUTDOWN; 365 } 366 367 // wakeup to process control event 368 if (fd == ctlSp[0]) { 369 synchronized (controlQueue) { 370 drain1(ctlSp[0]); 371 // This file descriptor does not have 372 // one shot semantic => register it again. 373 pollsetCtl(pollset, PS_ADD, ctlSp[0], POLLIN); 374 processControlQueue(); 375 } 376 continue; 377 } 378 379 PollableChannel channel = fdToChannel.get(fd); 380 if (channel != null) { 381 int events = getRevents(eventAddress); 382 Event ev = new Event(channel, events); 383 384 // n-1 events are queued; This thread handles 385 // the last one except for the wakeup 386 if (n > 0) { 387 queue.offer(ev); 388 } else { 389 return ev; 390 } 391 } 392 } 393 } finally { 394 fdToChannelLock.readLock().unlock(); 395 } 396 } 397 } finally { 398 // to ensure that some thread will poll when all events have 399 // been consumed 400 queue.offer(NEED_TO_POLL); 401 } 402 } 403 404 public void run() { 405 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 406 Invoker.getGroupAndInvokeCount(); 407 final boolean isPooledThread = (myGroupAndInvokeCount != null); 408 boolean replaceMe = false; 409 Event ev; 410 try { 411 for (;;) { 412 // reset invoke count 413 if (isPooledThread) 414 myGroupAndInvokeCount.resetInvokeCount(); 415 416 try { 417 replaceMe = false; 418 ev = queue.take(); 419 420 // no events and this thread has been "selected" to 421 // poll for more. 422 if (ev == NEED_TO_POLL) { 423 try { 424 ev = poll(); 425 } catch (IOException x) { 426 x.printStackTrace(); 427 return; 428 } 429 } 430 } catch (InterruptedException x) { 431 continue; 432 } 433 434 // handle wakeup to execute task or shutdown 435 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { 436 Runnable task = pollTask(); 437 if (task == null) { 438 // shutdown request 439 return; 440 } 441 // run task (may throw error/exception) 442 replaceMe = true; 443 task.run(); 444 continue; 445 } 446 447 // process event 448 try { 449 ev.channel().onEvent(ev.events(), isPooledThread); 450 } catch (Error x) { 451 replaceMe = true; throw x; 452 } catch (RuntimeException x) { 453 replaceMe = true; throw x; 454 } 455 } 456 } finally { 457 // last handler to exit when shutdown releases resources 458 int remaining = threadExit(this, replaceMe); 459 if (remaining == 0 && isShutdown()) { 460 implClose(); 461 } 462 } 463 } 464 } 465 466 /** 467 * Allocates a poll array to handle up to {@code count} events. 468 */ 469 private static long allocatePollArray(int count) { 470 return unsafe.allocateMemory(count * SIZEOF_POLLFD); 471 } 472 473 /** 474 * Free a poll array 475 */ 476 private static void freePollArray(long address) { 477 unsafe.freeMemory(address); 478 } 479 480 /** 481 * Returns event[i]; 482 */ 483 private static long getEvent(long address, int i) { 484 return address + (SIZEOF_POLLFD*i); 485 } 486 487 /** 488 * Returns event->fd 489 */ 490 private static int getDescriptor(long eventAddress) { 491 return unsafe.getInt(eventAddress + OFFSETOF_FD); 492 } 493 494 /** 495 * Returns event->events 496 */ 497 private static int getEvents(long eventAddress) { 498 return unsafe.getChar(eventAddress + OFFSETOF_EVENTS); 499 } 500 501 /** 502 * Returns event->revents 503 */ 504 private static int getRevents(long eventAddress) { 505 return unsafe.getChar(eventAddress + OFFSETOF_REVENTS); 506 } 507 508 // -- Native methods -- 509 510 private static native void init(); 511 512 private static native int eventSize(); 513 514 private static native int eventsOffset(); 515 516 private static native int reventsOffset(); 517 518 private static native int fdOffset(); 519 520 private static native int pollsetCreate() throws IOException; 521 522 private static native int pollsetCtl(int pollset, int opcode, int fd, int events); 523 524 private static native int pollsetPoll(int pollset, long pollAddress, int numfds) 525 throws IOException; 526 527 private static native void pollsetDestroy(int pollset); 528 529 private static native void socketpair(int[] sv) throws IOException; 530 531 private static native void interrupt(int fd) throws IOException; 532 533 private static native void drain1(int fd) throws IOException; 534 535 private static native void close0(int fd); 536 }