1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * Copyright 2012 SAP AG. All rights reserved.
   4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5  *
   6  * This code is free software; you can redistribute it and/or modify it
   7  * under the terms of the GNU General Public License version 2 only, as
   8  * published by the Free Software Foundation.  Oracle designates this
   9  * particular file as subject to the "Classpath" exception as provided
  10  * by Oracle in the LICENSE file that accompanied this code.
  11  *
  12  * This code is distributed in the hope that it will be useful, but WITHOUT
  13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  14  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  15  * version 2 for more details (a copy is included in the LICENSE file that
  16  * accompanied this code).
  17  *
  18  * You should have received a copy of the GNU General Public License version
  19  * 2 along with this work; if not, write to the Free Software Foundation,
  20  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  21  *
  22  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  23  * or visit www.oracle.com if you need additional information or have any
  24  * questions.
  25  */
  26 
  27 package sun.nio.ch;
  28 
  29 import java.nio.channels.spi.AsynchronousChannelProvider;
  30 import java.io.IOException;
  31 import java.util.HashSet;
  32 import java.util.Iterator;
  33 import java.util.concurrent.ArrayBlockingQueue;
  34 import java.util.concurrent.RejectedExecutionException;
  35 import java.util.concurrent.atomic.AtomicInteger;
  36 import java.util.concurrent.locks.ReentrantLock;
  37 import jdk.internal.misc.Unsafe;
  38 
  39 /**
  40  * AsynchronousChannelGroup implementation based on the AIX pollset framework.
  41  */
  42 final class AixPollPort
  43     extends Port
  44 {
    // Used for raw access to the native poll array (allocation, release and
    // reading of pollfd members).
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // This initializer textually precedes the static constants of this class
    // whose initializers call native methods, so it runs before them.
    static {
        // presumably loads the native library that backs the native methods
        // of this class -- confirm in IOUtil
        IOUtil.load();
        init();
    }
  51 
    /**
     * Layout of the native poll structure. Its size and member offsets are
     * obtained from native code rather than being hard-coded here.
     *
     * struct pollfd {
     *     int fd;
     *     short events;
     *     short revents;
     * }
     */
    private static final int SIZEOF_POLLFD    = eventSize();
    private static final int OFFSETOF_EVENTS  = eventsOffset();
    private static final int OFFSETOF_REVENTS = reventsOffset();
    private static final int OFFSETOF_FD      = fdOffset();

    // opcodes (passed as the 'opcode' argument of pollsetCtl)
    private static final int PS_ADD     = 0x0;
    private static final int PS_MOD     = 0x1;
    private static final int PS_DELETE  = 0x2;

    // maximum number of events to poll at a time; also the capacity of the
    // event queue
    private static final int MAX_POLL_EVENTS = 512;

    // pollset ID
    private final int pollset;

    // true if port is closed (read and written while synchronized on this)
    private boolean closed;

    // socket pair used for wakeup: sp[0] is registered with the pollset,
    // sp[1] is the end that wakeup() signals
    private final int sp[];

    // socket pair used to indicate pending pollsetCtl calls: ctlSp[0] is
    // registered with the pollset, ctlSp[1] is signalled when a control event
    // has been queued.
    // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.
    private final int ctlSp[];

    // number of wakeups pending (incremented by wakeup(), decremented by the
    // polling thread when it sees the wakeup socket in the poll result)
    private final AtomicInteger wakeupCount = new AtomicInteger();

    // address of the poll array passed to pollset_poll
    private final long address;
  90 
  91     // encapsulates an event for a channel
  92     static class Event {
  93         final PollableChannel channel;
  94         final int events;
  95 
  96         Event(PollableChannel channel, int events) {
  97             this.channel = channel;
  98             this.events = events;
  99         }
 100 
 101         PollableChannel channel()   { return channel; }
 102         int events()                { return events; }
 103     }
 104 
    // queue of events for cases that a polling thread dequeues more than one
    // event
    private final ArrayBlockingQueue<Event> queue;

    // Sentinel events. They are compared by identity and carry no channel.
    // NEED_TO_POLL: the thread that takes it from the queue polls the pollset.
    private final Event NEED_TO_POLL = new Event(null, 0);
    // EXECUTE_TASK_OR_SHUTDOWN: a wakeup was received; the handler thread runs
    // a pending task or, if there is none, exits.
    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
    // CONTINUE_AFTER_CTL_EVENT: the poll ended with a control event only; the
    // handler thread simply goes back to the queue.
    private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0);
 111 
 112     // encapsulates a pollset control event for a file descriptor
 113     static class ControlEvent {
 114         final int fd;
 115         final int events;
 116         final boolean removeOnly;
 117         int error = 0;
 118 
 119         ControlEvent(int fd, int events, boolean removeOnly) {
 120             this.fd = fd;
 121             this.events = events;
 122             this.removeOnly = removeOnly;
 123         }
 124 
 125         int fd()                 { return fd; }
 126         int events()             { return events; }
 127         boolean removeOnly()     { return removeOnly; }
 128         int error()              { return error; }
 129         void setError(int error) { this.error = error; }
 130     }
 131 
    // set of control events that need to be processed; an event stays in the
    // set until it has been applied to the pollset
    // (this object is also used for synchronization: it is the monitor that
    // guards the set and that waiting requesters are notified on)
    private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();

    // lock used to check whether a poll operation is ongoing: the polling
    // thread holds it around pollsetPoll, requesters probe it with tryLock
    private final ReentrantLock controlLock = new ReentrantLock();
 138 
 139     AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
 140         throws IOException
 141     {
 142         super(provider, pool);
 143 
 144         // open pollset
 145         this.pollset = pollsetCreate();
 146 
 147         // create socket pair for wakeup mechanism
 148         int[] sv = new int[2];
 149         try {
 150             socketpair(sv);
 151             // register one end with pollset
 152             pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
 153         } catch (IOException x) {
 154             pollsetDestroy(pollset);
 155             throw x;
 156         }
 157         this.sp = sv;
 158 
 159         // create socket pair for pollset control mechanism
 160         sv = new int[2];
 161         try {
 162             socketpair(sv);
 163             // register one end with pollset
 164             pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
 165         } catch (IOException x) {
 166             pollsetDestroy(pollset);
 167             throw x;
 168         }
 169         this.ctlSp = sv;
 170 
 171         // allocate the poll array
 172         this.address = allocatePollArray(MAX_POLL_EVENTS);
 173 
 174         // create the queue and offer the special event to ensure that the first
 175         // threads polls
 176         this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
 177         this.queue.offer(NEED_TO_POLL);
 178     }
 179 
 180     AixPollPort start() {
 181         startThreads(new EventHandlerTask());
 182         return this;
 183     }
 184 
 185     /**
 186      * Release all resources
 187      */
 188     private void implClose() {
 189         synchronized (this) {
 190             if (closed)
 191                 return;
 192             closed = true;
 193         }
 194         freePollArray(address);
 195         close0(sp[0]);
 196         close0(sp[1]);
 197         close0(ctlSp[0]);
 198         close0(ctlSp[1]);
 199         pollsetDestroy(pollset);
 200     }
 201 
 202     private void wakeup() {
 203         if (wakeupCount.incrementAndGet() == 1) {
 204             // write byte to socketpair to force wakeup
 205             try {
 206                 interrupt(sp[1]);
 207             } catch (IOException x) {
 208                 throw new AssertionError(x);
 209             }
 210         }
 211     }
 212 
 213     @Override
 214     void executeOnHandlerTask(Runnable task) {
 215         synchronized (this) {
 216             if (closed)
 217                 throw new RejectedExecutionException();
 218             offerTask(task);
 219             wakeup();
 220         }
 221     }
 222 
 223     @Override
 224     void shutdownHandlerTasks() {
 225         /*
 226          * If no tasks are running then just release resources; otherwise
 227          * write to the one end of the socketpair to wakeup any polling threads.
 228          */
 229         int nThreads = threadCount();
 230         if (nThreads == 0) {
 231             implClose();
 232         } else {
 233             // send interrupt to each thread
 234             while (nThreads-- > 0) {
 235                 wakeup();
 236             }
 237         }
 238     }
 239 
 240     // invoke by clients to register a file descriptor
 241     @Override
 242     void startPoll(int fd, int events) {
 243         queueControlEvent(new ControlEvent(fd, events, false));
 244     }
 245 
 246     // Callback method for implementations that need special handling when fd is removed
 247     @Override
 248     protected void preUnregister(int fd) {
 249         queueControlEvent(new ControlEvent(fd, 0, true));
 250     }
 251 
 252     // Add control event into queue and wait for completion.
 253     // In case the control lock is free, this method also tries to apply the control change directly.
 254     private void queueControlEvent(ControlEvent ev) {
 255         // pollsetCtl blocks when a poll call is ongoing. This is very probable.
 256         // Therefore we let the polling thread do the pollsetCtl call.
 257         synchronized (controlQueue) {
 258             controlQueue.add(ev);
 259             // write byte to socketpair to force wakeup
 260             try {
 261                 interrupt(ctlSp[1]);
 262             } catch (IOException x) {
 263                 throw new AssertionError(x);
 264             }
 265             do {
 266                 // Directly empty queue if no poll call is ongoing.
 267                 if (controlLock.tryLock()) {
 268                     try {
 269                         processControlQueue();
 270                     } finally {
 271                         controlLock.unlock();
 272                     }
 273                 } else {
 274                     try {
 275                         // Do not starve in case the polling thread returned before
 276                         // we could write to ctlSp[1] but the polling thread did not
 277                         // release the control lock until we checked. Therefore, use
 278                         // a timed wait for the time being.
 279                         controlQueue.wait(100);
 280                     } catch (InterruptedException e) {
 281                         // ignore exception and try again
 282                     }
 283                 }
 284             } while (controlQueue.contains(ev));
 285         }
 286         if (ev.error() != 0) {
 287             throw new AssertionError();
 288         }
 289     }
 290 
 291     // Process all events currently stored in the control queue.
 292     private void processControlQueue() {
 293         synchronized (controlQueue) {
 294             // On Aix it is only possible to set the event
 295             // bits on the first call of pollsetCtl. Later
 296             // calls only add bits, but cannot remove them.
 297             // Therefore, we always remove the file
 298             // descriptor ignoring the error and then add it.
 299             Iterator<ControlEvent> iter = controlQueue.iterator();
 300             while (iter.hasNext()) {
 301                 ControlEvent ev = iter.next();
 302                 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
 303                 if (!ev.removeOnly()) {
 304                     ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
 305                 }
 306                 iter.remove();
 307             }
 308             controlQueue.notifyAll();
 309         }
 310     }
 311 
    /*
     * Task to process events from pollset and dispatch to the channel's
     * onEvent handler.
     *
     * Events are retrieved from pollset in batch and offered to a BlockingQueue
     * where they are consumed by handler threads. A special "NEED_TO_POLL"
     * event is used to signal one consumer to re-poll when all events have
     * been consumed.
     */
    private class EventHandlerTask implements Runnable {
        /*
         * Polls the pollset until there is an event for the calling thread to
         * handle and returns that event. The result is either a channel event
         * or one of the sentinels EXECUTE_TASK_OR_SHUTDOWN and
         * CONTINUE_AFTER_CTL_EVENT. Further events from the same poll result
         * are offered to the queue. On every exit, normal or exceptional,
         * NEED_TO_POLL is offered to the queue.
         */
        private Event poll() throws IOException {
            try {
                for (;;) {
                    int n;
                    // The control lock is held for the duration of the poll
                    // call so that queueControlEvent can detect, via tryLock,
                    // whether a poll call is ongoing.
                    controlLock.lock();
                    try {
                        n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
                    } finally {
                        controlLock.unlock();
                    }
                    /*
                     * 'n' events have been read. Here we map them to their
                     * corresponding channel in batch and queue n-1 so that
                     * they can be handled by other handler threads. The last
                     * event is handled by this thread (and so is not queued).
                     * The poll array is walked from index n-1 down to 0, so
                     * "last" means the entry at index 0.
                     */
                    fdToChannelLock.readLock().lock();
                    try {
                        while (n-- > 0) {
                            long eventAddress = getEvent(address, n);
                            int fd = getDescriptor(eventAddress);

                            // To emulate one shot semantic we need to remove
                            // the file descriptor here. The two internal
                            // socket pair descriptors stay registered.
                            if (fd != sp[0] && fd != ctlSp[0]) {
                                synchronized (controlQueue) {
                                    pollsetCtl(pollset, PS_DELETE, fd, 0);
                                }
                            }

                            // wakeup
                            if (fd == sp[0]) {
                                if (wakeupCount.decrementAndGet() == 0) {
                                    // no more wakeups so drain pipe
                                    drain1(sp[0]);
                                }

                                // queue special event if there are more events
                                // to handle.
                                if (n > 0) {
                                    queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
                                    continue;
                                }
                                return EXECUTE_TASK_OR_SHUTDOWN;
                            }

                            // wakeup to process control event
                            if (fd == ctlSp[0]) {
                                synchronized (controlQueue) {
                                    drain1(ctlSp[0]);
                                    processControlQueue();
                                }
                                if (n > 0) {
                                    continue;
                                }
                                return CONTINUE_AFTER_CTL_EVENT;
                            }

                            // A descriptor without a channel mapping is
                            // skipped. If it was the last entry, the outer
                            // loop polls again.
                            PollableChannel channel = fdToChannel.get(fd);
                            if (channel != null) {
                                int events = getRevents(eventAddress);
                                Event ev = new Event(channel, events);

                                // n-1 events are queued; This thread handles
                                // the last one except for the wakeup
                                if (n > 0) {
                                    queue.offer(ev);
                                } else {
                                    return ev;
                                }
                            }
                        }
                    } finally {
                        fdToChannelLock.readLock().unlock();
                    }
                }
            } finally {
                // to ensure that some thread will poll when all events have
                // been consumed
                queue.offer(NEED_TO_POLL);
            }
        }

        /*
         * Handler thread main loop: takes events from the queue (polling the
         * pollset when NEED_TO_POLL is taken) and either runs a queued task,
         * exits on a shutdown request, or dispatches the event to the
         * channel's onEvent handler.
         */
        public void run() {
            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
                Invoker.getGroupAndInvokeCount();
            final boolean isPooledThread = (myGroupAndInvokeCount != null);
            // Passed to threadExit when this thread terminates; set to true
            // when the thread is about to run, or has failed in, code that may
            // throw (a task or a channel's onEvent handler).
            // NOTE(review): presumably tells the pool whether to start a
            // replacement thread -- confirm in threadExit.
            boolean replaceMe = false;
            Event ev;
            try {
                for (;;) {
                    // reset invoke count
                    if (isPooledThread)
                        myGroupAndInvokeCount.resetInvokeCount();

                    try {
                        replaceMe = false;
                        ev = queue.take();

                        // no events and this thread has been "selected" to
                        // poll for more.
                        if (ev == NEED_TO_POLL) {
                            try {
                                ev = poll();
                            } catch (IOException x) {
                                // polling failed: report it and terminate
                                // this handler thread
                                x.printStackTrace();
                                return;
                            }
                        }
                    } catch (InterruptedException x) {
                        // interrupted while waiting on the queue: retry
                        continue;
                    }

                    // continue after we processed a control event
                    if (ev == CONTINUE_AFTER_CTL_EVENT) {
                        continue;
                    }

                    // handle wakeup to execute task or shutdown
                    if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
                        Runnable task = pollTask();
                        if (task == null) {
                            // shutdown request
                            return;
                        }
                        // run task (may throw error/exception)
                        replaceMe = true;
                        task.run();
                        continue;
                    }

                    // process event
                    try {
                        ev.channel().onEvent(ev.events(), isPooledThread);
                    } catch (Error x) {
                        replaceMe = true; throw x;
                    } catch (RuntimeException x) {
                        replaceMe = true; throw x;
                    }
                }
            } finally {
                // last handler to exit when shutdown releases resources
                int remaining = threadExit(this, replaceMe);
                if (remaining == 0 && isShutdown()) {
                    implClose();
                }
            }
        }
    }
 471 
 472     /**
 473      * Allocates a poll array to handle up to {@code count} events.
 474      */
 475     private static long allocatePollArray(int count) {
 476         return unsafe.allocateMemory(count * SIZEOF_POLLFD);
 477     }
 478 
 479     /**
 480      * Free a poll array
 481      */
 482     private static void freePollArray(long address) {
 483         unsafe.freeMemory(address);
 484     }
 485 
 486     /**
 487      * Returns event[i];
 488      */
 489     private static long getEvent(long address, int i) {
 490         return address + (SIZEOF_POLLFD*i);
 491     }
 492 
 493     /**
 494      * Returns event->fd
 495      */
 496     private static int getDescriptor(long eventAddress) {
 497         return unsafe.getInt(eventAddress + OFFSETOF_FD);
 498     }
 499 
 500     /**
 501      * Returns event->events
 502      */
 503     private static int getEvents(long eventAddress) {
 504         return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
 505     }
 506 
 507     /**
 508      * Returns event->revents
 509      */
 510     private static int getRevents(long eventAddress) {
 511         return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
 512     }
 513 
    // -- Native methods --

    // one-time native initialization, called from the static initializer
    private static native void init();

    // size of the native poll structure in bytes
    private static native int eventSize();

    // offset of the 'events' member within the native poll structure
    private static native int eventsOffset();

    // offset of the 'revents' member within the native poll structure
    private static native int reventsOffset();

    // offset of the 'fd' member within the native poll structure
    private static native int fdOffset();

    // creates a pollset and returns its ID
    private static native int pollsetCreate() throws IOException;

    // applies 'opcode' (PS_ADD, PS_MOD or PS_DELETE) for 'fd' to the pollset;
    // callers in this class treat a non-zero result as an error code
    private static native int pollsetCtl(int pollset, int opcode, int fd, int events);

    // polls the pollset, filling the poll array at 'pollAddress' with up to
    // 'numfds' entries; the result is used by the caller as the number of
    // entries to process
    private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
        throws IOException;

    // destroys the pollset
    private static native void pollsetDestroy(int pollset);

    // creates a socket pair and stores the two descriptors in sv[0] and sv[1]
    private static native void socketpair(int[] sv) throws IOException;

    // signals the given socket so that a poll on the other end of the pair
    // returns (call sites describe this as writing a byte)
    private static native void interrupt(int fd) throws IOException;

    // drains the given socket after a signal
    // NOTE(review): presumably reads a single byte, per the name -- confirm
    // in the native implementation
    private static native void drain1(int fd) throws IOException;

    // closes the given file descriptor
    private static native void close0(int fd);
 542 }