1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * Copyright 2012 SAP AG. All rights reserved. 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This code is free software; you can redistribute it and/or modify it 7 * under the terms of the GNU General Public License version 2 only, as 8 * published by the Free Software Foundation. Oracle designates this 9 * particular file as subject to the "Classpath" exception as provided 10 * by Oracle in the LICENSE file that accompanied this code. 11 * 12 * This code is distributed in the hope that it will be useful, but WITHOUT 13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 14 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 15 * version 2 for more details (a copy is included in the LICENSE file that 16 * accompanied this code). 17 * 18 * You should have received a copy of the GNU General Public License version 19 * 2 along with this work; if not, write to the Free Software Foundation, 20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 21 * 22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 23 * or visit www.oracle.com if you need additional information or have any 24 * questions. 25 */ 26 27 package sun.nio.ch; 28 29 import java.nio.channels.spi.AsynchronousChannelProvider; 30 import java.io.IOException; 31 import java.util.HashSet; 32 import java.util.Iterator; 33 import java.util.concurrent.ArrayBlockingQueue; 34 import java.util.concurrent.RejectedExecutionException; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.concurrent.locks.ReentrantLock; 37 import jdk.internal.misc.Unsafe; 38 39 /** 40 * AsynchronousChannelGroup implementation based on the AIX pollset framework. 41 */ 42 final class AixPollPort 43 extends Port 44 { 45 private static final Unsafe unsafe = Unsafe.getUnsafe(); 46 47 static { 48 IOUtil.load(); 49 init(); 50 } 51 52 /** 53 * struct pollfd { 54 * int fd; 55 * short events; 56 * short revents; 57 * } 58 */ 59 private static final int SIZEOF_POLLFD = eventSize(); 60 private static final int OFFSETOF_EVENTS = eventsOffset(); 61 private static final int OFFSETOF_REVENTS = reventsOffset(); 62 private static final int OFFSETOF_FD = fdOffset(); 63 64 // opcodes 65 private static final int PS_ADD = 0x0; 66 private static final int PS_MOD = 0x1; 67 private static final int PS_DELETE = 0x2; 68 69 // maximum number of events to poll at a time 70 private static final int MAX_POLL_EVENTS = 512; 71 72 // pollset ID 73 private final int pollset; 74 75 // true if port is closed 76 private boolean closed; 77 78 // socket pair used for wakeup 79 private final int sp[]; 80 81 // socket pair used to indicate pending pollsetCtl calls 82 // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call. 83 private final int ctlSp[]; 84 85 // number of wakeups pending 86 private final AtomicInteger wakeupCount = new AtomicInteger(); 87 88 // address of the poll array passed to pollset_poll 89 private final long address; 90 91 // encapsulates an event for a channel 92 static class Event { 93 final PollableChannel channel; 94 final int events; 95 96 Event(PollableChannel channel, int events) { 97 this.channel = channel; 98 this.events = events; 99 } 100 101 PollableChannel channel() { return channel; } 102 int events() { return events; } 103 } 104 105 // queue of events for cases that a polling thread dequeues more than one 106 // event 107 private final ArrayBlockingQueue<Event> queue; 108 private final Event NEED_TO_POLL = new Event(null, 0); 109 private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0); 110 private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0); 111 112 // encapsulates a pollset control event for a file descriptor 113 static class ControlEvent { 114 final int fd; 115 final int events; 116 final boolean removeOnly; 117 int error = 0; 118 119 ControlEvent(int fd, int events, boolean removeOnly) { 120 this.fd = fd; 121 this.events = events; 122 this.removeOnly = removeOnly; 123 } 124 125 int fd() { return fd; } 126 int events() { return events; } 127 boolean removeOnly() { return removeOnly; } 128 int error() { return error; } 129 void setError(int error) { this.error = error; } 130 } 131 132 // queue of control events that need to be processed 133 // (this object is also used for synchronization) 134 private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>(); 135 136 // lock used to check whether a poll operation is ongoing 137 private final ReentrantLock controlLock = new ReentrantLock(); 138 139 AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool) 140 throws IOException 141 { 142 super(provider, pool); 143 144 // open pollset 145 this.pollset = pollsetCreate(); 146 147 // create socket pair for wakeup mechanism 148 int[] sv = new int[2]; 149 try { 150 socketpair(sv); 151 // register one end with pollset 152 pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN); 153 } catch (IOException x) { 154 pollsetDestroy(pollset); 155 throw x; 156 } 157 this.sp = sv; 158 159 // create socket pair for pollset control mechanism 160 sv = new int[2]; 161 try { 162 socketpair(sv); 163 // register one end with pollset 164 pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN); 165 } catch (IOException x) { 166 pollsetDestroy(pollset); 167 throw x; 168 } 169 this.ctlSp = sv; 170 171 // allocate the poll array 172 this.address = allocatePollArray(MAX_POLL_EVENTS); 173 174 // create the queue and offer the special event to ensure that the first 175 // threads polls 176 this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS); 177 this.queue.offer(NEED_TO_POLL); 178 } 179 180 AixPollPort start() { 181 startThreads(new EventHandlerTask()); 182 return this; 183 } 184 185 /** 186 * Release all resources 187 */ 188 private void implClose() { 189 synchronized (this) { 190 if (closed) 191 return; 192 closed = true; 193 } 194 freePollArray(address); 195 close0(sp[0]); 196 close0(sp[1]); 197 close0(ctlSp[0]); 198 close0(ctlSp[1]); 199 pollsetDestroy(pollset); 200 } 201 202 private void wakeup() { 203 if (wakeupCount.incrementAndGet() == 1) { 204 // write byte to socketpair to force wakeup 205 try { 206 interrupt(sp[1]); 207 } catch (IOException x) { 208 throw new AssertionError(x); 209 } 210 } 211 } 212 213 @Override 214 void executeOnHandlerTask(Runnable task) { 215 synchronized (this) { 216 if (closed) 217 throw new RejectedExecutionException(); 218 offerTask(task); 219 wakeup(); 220 } 221 } 222 223 @Override 224 void shutdownHandlerTasks() { 225 /* 226 * If no tasks are running then just release resources; otherwise 227 * write to the one end of the socketpair to wakeup any polling threads. 228 */ 229 int nThreads = threadCount(); 230 if (nThreads == 0) { 231 implClose(); 232 } else { 233 // send interrupt to each thread 234 while (nThreads-- > 0) { 235 wakeup(); 236 } 237 } 238 } 239 240 // invoke by clients to register a file descriptor 241 @Override 242 void startPoll(int fd, int events) { 243 queueControlEvent(new ControlEvent(fd, events, false)); 244 } 245 246 // Callback method for implementations that need special handling when fd is removed 247 @Override 248 protected void preUnregister(int fd) { 249 queueControlEvent(new ControlEvent(fd, 0, true)); 250 } 251 252 // Add control event into queue and wait for completion. 253 // In case the control lock is free, this method also tries to apply the control change directly. 254 private void queueControlEvent(ControlEvent ev) { 255 // pollsetCtl blocks when a poll call is ongoing. This is very probable. 256 // Therefore we let the polling thread do the pollsetCtl call. 257 synchronized (controlQueue) { 258 controlQueue.add(ev); 259 // write byte to socketpair to force wakeup 260 try { 261 interrupt(ctlSp[1]); 262 } catch (IOException x) { 263 throw new AssertionError(x); 264 } 265 do { 266 // Directly empty queue if no poll call is ongoing. 267 if (controlLock.tryLock()) { 268 try { 269 processControlQueue(); 270 } finally { 271 controlLock.unlock(); 272 } 273 } else { 274 try { 275 // Do not starve in case the polling thread returned before 276 // we could write to ctlSp[1] but the polling thread did not 277 // release the control lock until we checked. Therefore, use 278 // a timed wait for the time being. 279 controlQueue.wait(100); 280 } catch (InterruptedException e) { 281 // ignore exception and try again 282 } 283 } 284 } while (controlQueue.contains(ev)); 285 } 286 if (ev.error() != 0) { 287 throw new AssertionError(); 288 } 289 } 290 291 // Process all events currently stored in the control queue. 292 private void processControlQueue() { 293 synchronized (controlQueue) { 294 // On Aix it is only possible to set the event 295 // bits on the first call of pollsetCtl. Later 296 // calls only add bits, but cannot remove them. 297 // Therefore, we always remove the file 298 // descriptor ignoring the error and then add it. 299 Iterator<ControlEvent> iter = controlQueue.iterator(); 300 while (iter.hasNext()) { 301 ControlEvent ev = iter.next(); 302 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0); 303 if (!ev.removeOnly()) { 304 ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events())); 305 } 306 iter.remove(); 307 } 308 controlQueue.notifyAll(); 309 } 310 } 311 312 /* 313 * Task to process events from pollset and dispatch to the channel's 314 * onEvent handler. 315 * 316 * Events are retreived from pollset in batch and offered to a BlockingQueue 317 * where they are consumed by handler threads. A special "NEED_TO_POLL" 318 * event is used to signal one consumer to re-poll when all events have 319 * been consumed. 320 */ 321 private class EventHandlerTask implements Runnable { 322 private Event poll() throws IOException { 323 try { 324 for (;;) { 325 int n; 326 controlLock.lock(); 327 try { 328 n = pollsetPoll(pollset, address, MAX_POLL_EVENTS); 329 } finally { 330 controlLock.unlock(); 331 } 332 /* 333 * 'n' events have been read. Here we map them to their 334 * corresponding channel in batch and queue n-1 so that 335 * they can be handled by other handler threads. The last 336 * event is handled by this thread (and so is not queued). 337 */ 338 fdToChannelLock.readLock().lock(); 339 try { 340 while (n-- > 0) { 341 long eventAddress = getEvent(address, n); 342 int fd = getDescriptor(eventAddress); 343 344 // To emulate one shot semantic we need to remove 345 // the file descriptor here. 346 if (fd != sp[0] && fd != ctlSp[0]) { 347 synchronized (controlQueue) { 348 pollsetCtl(pollset, PS_DELETE, fd, 0); 349 } 350 } 351 352 // wakeup 353 if (fd == sp[0]) { 354 if (wakeupCount.decrementAndGet() == 0) { 355 // no more wakeups so drain pipe 356 drain1(sp[0]); 357 } 358 359 // queue special event if there are more events 360 // to handle. 361 if (n > 0) { 362 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); 363 continue; 364 } 365 return EXECUTE_TASK_OR_SHUTDOWN; 366 } 367 368 // wakeup to process control event 369 if (fd == ctlSp[0]) { 370 synchronized (controlQueue) { 371 drain1(ctlSp[0]); 372 processControlQueue(); 373 } 374 if (n > 0) { 375 continue; 376 } 377 return CONTINUE_AFTER_CTL_EVENT; 378 } 379 380 PollableChannel channel = fdToChannel.get(fd); 381 if (channel != null) { 382 int events = getRevents(eventAddress); 383 Event ev = new Event(channel, events); 384 385 // n-1 events are queued; This thread handles 386 // the last one except for the wakeup 387 if (n > 0) { 388 queue.offer(ev); 389 } else { 390 return ev; 391 } 392 } 393 } 394 } finally { 395 fdToChannelLock.readLock().unlock(); 396 } 397 } 398 } finally { 399 // to ensure that some thread will poll when all events have 400 // been consumed 401 queue.offer(NEED_TO_POLL); 402 } 403 } 404 405 public void run() { 406 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 407 Invoker.getGroupAndInvokeCount(); 408 final boolean isPooledThread = (myGroupAndInvokeCount != null); 409 boolean replaceMe = false; 410 Event ev; 411 try { 412 for (;;) { 413 // reset invoke count 414 if (isPooledThread) 415 myGroupAndInvokeCount.resetInvokeCount(); 416 417 try { 418 replaceMe = false; 419 ev = queue.take(); 420 421 // no events and this thread has been "selected" to 422 // poll for more. 423 if (ev == NEED_TO_POLL) { 424 try { 425 ev = poll(); 426 } catch (IOException x) { 427 x.printStackTrace(); 428 return; 429 } 430 } 431 } catch (InterruptedException x) { 432 continue; 433 } 434 435 // contine after we processed a control event 436 if (ev == CONTINUE_AFTER_CTL_EVENT) { 437 continue; 438 } 439 440 // handle wakeup to execute task or shutdown 441 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { 442 Runnable task = pollTask(); 443 if (task == null) { 444 // shutdown request 445 return; 446 } 447 // run task (may throw error/exception) 448 replaceMe = true; 449 task.run(); 450 continue; 451 } 452 453 // process event 454 try { 455 ev.channel().onEvent(ev.events(), isPooledThread); 456 } catch (Error x) { 457 replaceMe = true; throw x; 458 } catch (RuntimeException x) { 459 replaceMe = true; throw x; 460 } 461 } 462 } finally { 463 // last handler to exit when shutdown releases resources 464 int remaining = threadExit(this, replaceMe); 465 if (remaining == 0 && isShutdown()) { 466 implClose(); 467 } 468 } 469 } 470 } 471 472 /** 473 * Allocates a poll array to handle up to {@code count} events. 474 */ 475 private static long allocatePollArray(int count) { 476 return unsafe.allocateMemory(count * SIZEOF_POLLFD); 477 } 478 479 /** 480 * Free a poll array 481 */ 482 private static void freePollArray(long address) { 483 unsafe.freeMemory(address); 484 } 485 486 /** 487 * Returns event[i]; 488 */ 489 private static long getEvent(long address, int i) { 490 return address + (SIZEOF_POLLFD*i); 491 } 492 493 /** 494 * Returns event->fd 495 */ 496 private static int getDescriptor(long eventAddress) { 497 return unsafe.getInt(eventAddress + OFFSETOF_FD); 498 } 499 500 /** 501 * Returns event->events 502 */ 503 private static int getEvents(long eventAddress) { 504 return unsafe.getChar(eventAddress + OFFSETOF_EVENTS); 505 } 506 507 /** 508 * Returns event->revents 509 */ 510 private static int getRevents(long eventAddress) { 511 return unsafe.getChar(eventAddress + OFFSETOF_REVENTS); 512 } 513 514 // -- Native methods -- 515 516 private static native void init(); 517 518 private static native int eventSize(); 519 520 private static native int eventsOffset(); 521 522 private static native int reventsOffset(); 523 524 private static native int fdOffset(); 525 526 private static native int pollsetCreate() throws IOException; 527 528 private static native int pollsetCtl(int pollset, int opcode, int fd, int events); 529 530 private static native int pollsetPoll(int pollset, long pollAddress, int numfds) 531 throws IOException; 532 533 private static native void pollsetDestroy(int pollset); 534 535 private static native void socketpair(int[] sv) throws IOException; 536 537 private static native void interrupt(int fd) throws IOException; 538 539 private static native void drain1(int fd) throws IOException; 540 541 private static native void close0(int fd); 542 }