1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * Copyright 2012 SAP AG. All rights reserved.
   4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5  *
   6  * This code is free software; you can redistribute it and/or modify it
   7  * under the terms of the GNU General Public License version 2 only, as
   8  * published by the Free Software Foundation.  Oracle designates this
   9  * particular file as subject to the "Classpath" exception as provided
  10  * by Oracle in the LICENSE file that accompanied this code.
  11  *
  12  * This code is distributed in the hope that it will be useful, but WITHOUT
  13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  14  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  15  * version 2 for more details (a copy is included in the LICENSE file that
  16  * accompanied this code).
  17  *
  18  * You should have received a copy of the GNU General Public License version
  19  * 2 along with this work; if not, write to the Free Software Foundation,
  20  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  21  *
  22  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  23  * or visit www.oracle.com if you need additional information or have any
  24  * questions.
  25  */
  26 
  27 package sun.nio.ch;
  28 
  29 import java.nio.channels.spi.AsynchronousChannelProvider;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Iterator;
  33 import java.util.concurrent.ArrayBlockingQueue;
  34 import java.util.concurrent.RejectedExecutionException;
  35 import java.util.concurrent.atomic.AtomicInteger;
  36 import java.util.concurrent.locks.ReentrantLock;
  37 import sun.misc.Unsafe;
  38 
  39 /**
  40  * AsynchronousChannelGroup implementation based on the AIX pollset framework.
  41  */
  42 final class AixPollPort
  43     extends Port
  44 {
  45     private static final Unsafe unsafe = Unsafe.getUnsafe();
  46 
  47     static {
  48         IOUtil.load();
  49         init();
  50     }
  51 
  52     /**
  53      * struct pollfd {
  54      *     int fd;
  55      *     short events;
  56      *     short revents;
  57      * }
  58      */
  59     private static final int SIZEOF_POLLFD    = eventSize();
  60     private static final int OFFSETOF_EVENTS  = eventsOffset();
  61     private static final int OFFSETOF_REVENTS = reventsOffset();
  62     private static final int OFFSETOF_FD      = fdOffset();
  63 
  64     // opcodes
  65     private static final int PS_ADD     = 0x0;
  66     private static final int PS_MOD     = 0x1;
  67     private static final int PS_DELETE  = 0x2;
  68 
  69     // maximum number of events to poll at a time
  70     private static final int MAX_POLL_EVENTS = 512;
  71 
  72     // pollset ID
  73     private final int pollset;
  74 
  75     // true if port is closed
  76     private boolean closed;
  77 
  78     // socket pair used for wakeup
  79     private final int sp[];
  80 
  81     // socket pair used to indicate pending pollsetCtl calls
  82     // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.
  83     private final int ctlSp[];
  84 
  85     // number of wakeups pending
  86     private final AtomicInteger wakeupCount = new AtomicInteger();
  87 
  88     // address of the poll array passed to pollset_poll
  89     private final long address;
  90 
  91     // encapsulates an event for a channel
  92     static class Event {
  93         final PollableChannel channel;
  94         final int events;
  95 
  96         Event(PollableChannel channel, int events) {
  97             this.channel = channel;
  98             this.events = events;
  99         }
 100 
 101         PollableChannel channel()   { return channel; }
 102         int events()                { return events; }
 103     }
 104 
 105     // queue of events for cases that a polling thread dequeues more than one
 106     // event
 107     private final ArrayBlockingQueue<Event> queue;
 108     private final Event NEED_TO_POLL = new Event(null, 0);
 109     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
 110 
 111     // encapsulates a pollset control event for a file descriptor
 112     static class ControlEvent {
 113         final int fd;
 114         final int events;
 115         final boolean removeOnly;
 116         int error = 0;
 117 
 118         ControlEvent(int fd, int events, boolean removeOnly) {
 119             this.fd = fd;
 120             this.events = events;
 121             this.removeOnly = removeOnly;
 122         }
 123 
 124         int fd()                 { return fd; }
 125         int events()             { return events; }
 126         boolean removeOnly()     { return removeOnly; }
 127         int error()              { return error; }
 128         void setError(int error) { this.error = error; }
 129     }
 130 
 131     // queue of control events that need to be processed
 132     // (this object is also used for synchronization)
 133     private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();
 134 
 135     // lock used to check whether a poll operation is ongoing
 136     private final ReentrantLock controlLock = new ReentrantLock();
 137 
 138     AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
 139         throws IOException
 140     {
 141         super(provider, pool);
 142 
 143         // open pollset
 144         this.pollset = pollsetCreate();
 145 
 146         // create socket pair for wakeup mechanism
 147         int[] sv = new int[2];
 148         try {
 149             socketpair(sv);
 150             // register one end with pollset
 151             pollsetCtl(pollset, PS_ADD, sv[0], POLLIN);
 152         } catch (IOException x) {
 153             pollsetDestroy(pollset);
 154             throw x;
 155         }
 156         this.sp = sv;
 157 
 158         // create socket pair for pollset control mechanism
 159         sv = new int[2];
 160         try {
 161             socketpair(sv);
 162             // register one end with pollset
 163             pollsetCtl(pollset, PS_ADD, sv[0], POLLIN);
 164         } catch (IOException x) {
 165             pollsetDestroy(pollset);
 166             throw x;
 167         }
 168         this.ctlSp = sv;
 169 
 170         // allocate the poll array
 171         this.address = allocatePollArray(MAX_POLL_EVENTS);
 172 
 173         // create the queue and offer the special event to ensure that the first
 174         // threads polls
 175         this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
 176         this.queue.offer(NEED_TO_POLL);
 177     }
 178 
 179     AixPollPort start() {
 180         startThreads(new EventHandlerTask());
 181         return this;
 182     }
 183 
 184     /**
 185      * Release all resources
 186      */
 187     private void implClose() {
 188         synchronized (this) {
 189             if (closed)
 190                 return;
 191             closed = true;
 192         }
 193         freePollArray(address);
 194         close0(sp[0]);
 195         close0(sp[1]);
 196         close0(ctlSp[0]);
 197         close0(ctlSp[1]);
 198         pollsetDestroy(pollset);
 199     }
 200 
 201     private void wakeup() {
 202         if (wakeupCount.incrementAndGet() == 1) {
 203             // write byte to socketpair to force wakeup
 204             try {
 205                 interrupt(sp[1]);
 206             } catch (IOException x) {
 207                 throw new AssertionError(x);
 208             }
 209         }
 210     }
 211 
 212     @Override
 213     void executeOnHandlerTask(Runnable task) {
 214         synchronized (this) {
 215             if (closed)
 216                 throw new RejectedExecutionException();
 217             offerTask(task);
 218             wakeup();
 219         }
 220     }
 221 
 222     @Override
 223     void shutdownHandlerTasks() {
 224         /*
 225          * If no tasks are running then just release resources; otherwise
 226          * write to the one end of the socketpair to wakeup any polling threads.
 227          */
 228         int nThreads = threadCount();
 229         if (nThreads == 0) {
 230             implClose();
 231         } else {
 232             // send interrupt to each thread
 233             while (nThreads-- > 0) {
 234                 wakeup();
 235             }
 236         }
 237     }
 238 
 239     // invoke by clients to register a file descriptor
 240     @Override
 241     void startPoll(int fd, int events) {
 242         queueControlEvent(new ControlEvent(fd, events, false));
 243     }
 244 
 245     // Callback method for implementations that need special handling when fd is removed
 246     @Override
 247     protected void preUnregister(int fd) {
 248         queueControlEvent(new ControlEvent(fd, 0, true));
 249     }
 250 
 251     // Add control event into queue and wait for completion.
 252     // In case the control lock is free, this method also tries to apply the control change directly.
 253     private void queueControlEvent(ControlEvent ev) {
 254         // pollsetCtl blocks when a poll call is ongoing. This is very probable.
 255         // Therefore we let the polling thread do the pollsetCtl call.
 256         synchronized (controlQueue) {
 257             controlQueue.add(ev);
 258             // write byte to socketpair to force wakeup
 259             try {
 260                 interrupt(ctlSp[1]);
 261             } catch (IOException x) {
 262                 throw new AssertionError(x);
 263             }
 264             do {
 265                 // Directly empty queue if no poll call is ongoing.
 266                 if (controlLock.tryLock()) {
 267                     try {
 268                         processControlQueue();
 269                     } finally {
 270                         controlLock.unlock();
 271                     }
 272                 } else {
 273                     try {
 274                         // Do not starve in case the polling thread returned before
 275                         // we could write to ctlSp[1] but the polling thread did not
 276                         // release the control lock until we checked. Therefore, use
 277                         // a timed wait for the time being.
 278                         controlQueue.wait(100);
 279                     } catch (InterruptedException e) {
 280                         // ignore exception and try again
 281                     }
 282                 }
 283             } while (controlQueue.contains(ev));
 284         }
 285         if (ev.error() != 0) {
 286             throw new AssertionError();
 287         }
 288     }
 289 
 290     // Process all events currently stored in the control queue.
 291     private void processControlQueue() {
 292         synchronized (controlQueue) {
 293             // On Aix it is only possible to set the event
 294             // bits on the first call of pollsetCtl. Later
 295             // calls only add bits, but cannot remove them.
 296             // Therefore, we always remove the file
 297             // descriptor ignoring the error and then add it.
 298             Iterator<ControlEvent> iter = controlQueue.iterator();
 299             while (iter.hasNext()) {
 300                 ControlEvent ev = iter.next();
 301                 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
 302                 if (!ev.removeOnly()) {
 303                     ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
 304                 }
 305                 iter.remove();
 306             }
 307             controlQueue.notifyAll();
 308         }
 309     }
 310 
 311     /*
 312      * Task to process events from pollset and dispatch to the channel's
 313      * onEvent handler.
 314      *
 315      * Events are retreived from pollset in batch and offered to a BlockingQueue
 316      * where they are consumed by handler threads. A special "NEED_TO_POLL"
 317      * event is used to signal one consumer to re-poll when all events have
 318      * been consumed.
 319      */
 320     private class EventHandlerTask implements Runnable {
 321         private Event poll() throws IOException {
 322             try {
 323                 for (;;) {
 324                     int n;
 325                     controlLock.lock();
 326                     try {
 327                         n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
 328                     } finally {
 329                         controlLock.unlock();
 330                     }
 331                     /*
 332                      * 'n' events have been read. Here we map them to their
 333                      * corresponding channel in batch and queue n-1 so that
 334                      * they can be handled by other handler threads. The last
 335                      * event is handled by this thread (and so is not queued).
 336                      */
 337                     fdToChannelLock.readLock().lock();
 338                     try {
 339                         while (n-- > 0) {
 340                             long eventAddress = getEvent(address, n);
 341                             int fd = getDescriptor(eventAddress);
 342 
 343                             // To emulate one shot semantic we need to remove
 344                             // the file descriptor here.
 345                             pollsetCtl(pollset, PS_DELETE, fd, 0);
 346 
 347                             // wakeup
 348                             if (fd == sp[0]) {
 349                                 if (wakeupCount.decrementAndGet() == 0) {
 350                                     // no more wakeups so drain pipe
 351                                     drain1(sp[0]);
 352                                 }
 353 
 354                                 // This is the only file descriptor without
 355                                 // one shot semantic => register it again.
 356                                 pollsetCtl(pollset, PS_ADD, sp[0], POLLIN);
 357 
 358                                 // queue special event if there are more events
 359                                 // to handle.
 360                                 if (n > 0) {
 361                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 362                                     continue;
 363                                 }
 364                                 return EXECUTE_TASK_OR_SHUTDOWN;
 365                             }
 366 
 367                             // wakeup to process control event
 368                             if (fd == ctlSp[0]) {
 369                                 synchronized (controlQueue) {
 370                                     drain1(ctlSp[0]);
 371                                     // This file descriptor does not have
 372                                     // one shot semantic => register it again.
 373                                     pollsetCtl(pollset, PS_ADD, ctlSp[0], POLLIN);
 374                                     processControlQueue();
 375                                 }
 376                                 continue;
 377                             }
 378 
 379                             PollableChannel channel = fdToChannel.get(fd);
 380                             if (channel != null) {
 381                                 int events = getRevents(eventAddress);
 382                                 Event ev = new Event(channel, events);
 383 
 384                                 // n-1 events are queued; This thread handles
 385                                 // the last one except for the wakeup
 386                                 if (n > 0) {
 387                                     queue.offer(ev);
 388                                 } else {
 389                                     return ev;
 390                                 }
 391                             }
 392                         }
 393                     } finally {
 394                         fdToChannelLock.readLock().unlock();
 395                     }
 396                 }
 397             } finally {
 398                 // to ensure that some thread will poll when all events have
 399                 // been consumed
 400                 queue.offer(NEED_TO_POLL);
 401             }
 402         }
 403 
 404         public void run() {
 405             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 406                 Invoker.getGroupAndInvokeCount();
 407             final boolean isPooledThread = (myGroupAndInvokeCount != null);
 408             boolean replaceMe = false;
 409             Event ev;
 410             try {
 411                 for (;;) {
 412                     // reset invoke count
 413                     if (isPooledThread)
 414                         myGroupAndInvokeCount.resetInvokeCount();
 415 
 416                     try {
 417                         replaceMe = false;
 418                         ev = queue.take();
 419 
 420                         // no events and this thread has been "selected" to
 421                         // poll for more.
 422                         if (ev == NEED_TO_POLL) {
 423                             try {
 424                                 ev = poll();
 425                             } catch (IOException x) {
 426                                 x.printStackTrace();
 427                                 return;
 428                             }
 429                         }
 430                     } catch (InterruptedException x) {
 431                         continue;
 432                     }
 433 
 434                     // handle wakeup to execute task or shutdown
 435                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
 436                         Runnable task = pollTask();
 437                         if (task == null) {
 438                             // shutdown request
 439                             return;
 440                         }
 441                         // run task (may throw error/exception)
 442                         replaceMe = true;
 443                         task.run();
 444                         continue;
 445                     }
 446 
 447                     // process event
 448                     try {
 449                         ev.channel().onEvent(ev.events(), isPooledThread);
 450                     } catch (Error x) {
 451                         replaceMe = true; throw x;
 452                     } catch (RuntimeException x) {
 453                         replaceMe = true; throw x;
 454                     }
 455                 }
 456             } finally {
 457                 // last handler to exit when shutdown releases resources
 458                 int remaining = threadExit(this, replaceMe);
 459                 if (remaining == 0 && isShutdown()) {
 460                     implClose();
 461                 }
 462             }
 463         }
 464     }
 465 
 466     /**
 467      * Allocates a poll array to handle up to {@code count} events.
 468      */
 469     private static long allocatePollArray(int count) {
 470         return unsafe.allocateMemory(count * SIZEOF_POLLFD);
 471     }
 472 
 473     /**
 474      * Free a poll array
 475      */
 476     private static void freePollArray(long address) {
 477         unsafe.freeMemory(address);
 478     }
 479 
 480     /**
 481      * Returns event[i];
 482      */
 483     private static long getEvent(long address, int i) {
 484         return address + (SIZEOF_POLLFD*i);
 485     }
 486 
 487     /**
 488      * Returns event->fd
 489      */
 490     private static int getDescriptor(long eventAddress) {
 491         return unsafe.getInt(eventAddress + OFFSETOF_FD);
 492     }
 493 
 494     /**
 495      * Returns event->events
 496      */
 497     private static int getEvents(long eventAddress) {
 498         return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
 499     }
 500 
 501     /**
 502      * Returns event->revents
 503      */
 504     private static int getRevents(long eventAddress) {
 505         return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
 506     }
 507 
 508     // -- Native methods --
 509 
 510     private static native void init();
 511 
 512     private static native int eventSize();
 513 
 514     private static native int eventsOffset();
 515 
 516     private static native int reventsOffset();
 517 
 518     private static native int fdOffset();
 519 
 520     private static native int pollsetCreate() throws IOException;
 521 
 522     private static native int pollsetCtl(int pollset, int opcode, int fd, int events);
 523 
 524     private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
 525         throws IOException;
 526 
 527     private static native void pollsetDestroy(int pollset);
 528 
 529     private static native void socketpair(int[] sv) throws IOException;
 530 
 531     private static native void interrupt(int fd) throws IOException;
 532 
 533     private static native void drain1(int fd) throws IOException;
 534 
 535     private static native void close0(int fd);
 536 }