1 /*
   2  * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.spi.AsynchronousChannelProvider;
  29 import java.io.IOException;
  30 import java.util.concurrent.ArrayBlockingQueue;
  31 import java.util.concurrent.RejectedExecutionException;
  32 import java.util.concurrent.atomic.AtomicInteger;
  33 import static sun.nio.ch.KQueue.*;
  34 
  35 /**
  36  * AsynchronousChannelGroup implementation based on the BSD kqueue facility.
  37  */
  38 
  39 final class KQueuePort
  40     extends Port
  41 {
  42     // maximum number of events to poll at a time
  43     private static final int MAX_KEVENTS_TO_POLL = 512;
  44 
  45     // kqueue file descriptor
  46     private final int kqfd;
  47 
  48     // true if kqueue closed
  49     private boolean closed;
  50 
  51     // socket pair used for wakeup
  52     private final int sp[];
  53 
  54     // number of wakeups pending
  55     private final AtomicInteger wakeupCount = new AtomicInteger();
  56 
  57     // address of the poll array passed to kqueue_wait
  58     private final long address;
  59 
  60     // encapsulates an event for a channel
  61     static class Event {
  62         final PollableChannel channel;
  63         final int events;
  64 
  65         Event(PollableChannel channel, int events) {
  66             this.channel = channel;
  67             this.events = events;
  68         }
  69 
  70         PollableChannel channel()   { return channel; }
  71         int events()                { return events; }
  72     }
  73 
  74     // queue of events for cases that a polling thread dequeues more than one
  75     // event
  76     private final ArrayBlockingQueue<Event> queue;
  77     private final Event NEED_TO_POLL = new Event(null, 0);
  78     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
  79 
  80     KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
  81         throws IOException
  82     {
  83         super(provider, pool);
  84 
  85         // open kqueue
  86         this.kqfd = kqueue();
  87 
  88         // create socket pair for wakeup mechanism
  89         int[] sv = new int[2];
  90         try {
  91             socketpair(sv);
  92 
  93             // register one end with kqueue
  94             keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
  95         } catch (IOException x) {
  96             close0(kqfd);
  97             throw x;
  98         }
  99         this.sp = sv;
 100 
 101         // allocate the poll array
 102         this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
 103 
 104         // create the queue and offer the special event to ensure that the first
 105         // threads polls
 106         this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
 107         this.queue.offer(NEED_TO_POLL);
 108     }
 109 
 110     KQueuePort start() {
 111         startThreads(new EventHandlerTask());
 112         return this;
 113     }
 114 
 115     /**
 116      * Release all resources
 117      */
 118     private void implClose() {
 119         synchronized (this) {
 120             if (closed)
 121                 return;
 122             closed = true;
 123         }
 124         freePollArray(address);
 125         close0(sp[0]);
 126         close0(sp[1]);
 127         close0(kqfd);
 128     }
 129 
 130     private void wakeup() {
 131         if (wakeupCount.incrementAndGet() == 1) {
 132             // write byte to socketpair to force wakeup
 133             try {
 134                 interrupt(sp[1]);
 135             } catch (IOException x) {
 136                 throw new AssertionError(x);
 137             }
 138         }
 139     }
 140 
 141     @Override
 142     void executeOnHandlerTask(Runnable task) {
 143         synchronized (this) {
 144             if (closed)
 145                 throw new RejectedExecutionException();
 146             offerTask(task);
 147             wakeup();
 148         }
 149     }
 150 
 151     @Override
 152     void shutdownHandlerTasks() {
 153         /*
 154          * If no tasks are running then just release resources; otherwise
 155          * write to the one end of the socketpair to wakeup any polling threads.
 156          */
 157         int nThreads = threadCount();
 158         if (nThreads == 0) {
 159             implClose();
 160         } else {
 161             // send interrupt to each thread
 162             while (nThreads-- > 0) {
 163                 wakeup();
 164             }
 165         }
 166     }
 167 
 168     // invoked by clients to register a file descriptor
 169     @Override
 170     void startPoll(int fd, int events) {
 171         // We use a separate filter for read and write events.
 172         // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
 173         int err = 0;
 174         int flags = (EV_ADD|EV_ONESHOT);
 175         if ((events & Net.POLLIN) > 0)
 176             err = keventRegister(kqfd, fd, EVFILT_READ, flags);
 177         if (err == 0 && (events & Net.POLLOUT) > 0)
 178             err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
 179         if (err != 0)
 180             throw new InternalError("kevent failed: " + err);  // should not happen
 181     }
 182 
 183     /*
 184      * Task to process events from kqueue and dispatch to the channel's
 185      * onEvent handler.
 186      *
 187      * Events are retrieved from kqueue in batch and offered to a BlockingQueue
 188      * where they are consumed by handler threads. A special "NEED_TO_POLL"
 189      * event is used to signal one consumer to re-poll when all events have
 190      * been consumed.
 191      */
 192     private class EventHandlerTask implements Runnable {
 193         private Event poll() throws IOException {
 194             try {
 195                 for (;;) {
 196                     int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
 197                     /*
 198                      * 'n' events have been read. Here we map them to their
 199                      * corresponding channel in batch and queue n-1 so that
 200                      * they can be handled by other handler threads. The last
 201                      * event is handled by this thread (and so is not queued).
 202                      */
 203                     fdToChannelLock.readLock().lock();
 204                     try {
 205                         while (n-- > 0) {
 206                             long keventAddress = getEvent(address, n);
 207                             int fd = getDescriptor(keventAddress);
 208 
 209                             // wakeup
 210                             if (fd == sp[0]) {
 211                                 if (wakeupCount.decrementAndGet() == 0) {
 212                                     // no more wakeups so drain pipe
 213                                     drain1(sp[0]);
 214                                 }
 215 
 216                                 // queue special event if there are more events
 217                                 // to handle.
 218                                 if (n > 0) {
 219                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 220                                     continue;
 221                                 }
 222                                 return EXECUTE_TASK_OR_SHUTDOWN;
 223                             }
 224 
 225                             PollableChannel channel = fdToChannel.get(fd);
 226                             if (channel != null) {
 227                                 int filter = getFilter(keventAddress);
 228                                 int events = 0;
 229                                 if (filter == EVFILT_READ)
 230                                     events = Net.POLLIN;
 231                                 else if (filter == EVFILT_WRITE)
 232                                     events = Net.POLLOUT;
 233 
 234                                 Event ev = new Event(channel, events);
 235 
 236                                 // n-1 events are queued; This thread handles
 237                                 // the last one except for the wakeup
 238                                 if (n > 0) {
 239                                     queue.offer(ev);
 240                                 } else {
 241                                     return ev;
 242                                 }
 243                             }
 244                         }
 245                     } finally {
 246                         fdToChannelLock.readLock().unlock();
 247                     }
 248                 }
 249             } finally {
 250                 // to ensure that some thread will poll when all events have
 251                 // been consumed
 252                 queue.offer(NEED_TO_POLL);
 253             }
 254         }
 255 
 256         public void run() {
 257             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 258                 Invoker.getGroupAndInvokeCount();
 259             final boolean isPooledThread = (myGroupAndInvokeCount != null);
 260             boolean replaceMe = false;
 261             Event ev;
 262             try {
 263                 for (;;) {
 264                     // reset invoke count
 265                     if (isPooledThread)
 266                         myGroupAndInvokeCount.resetInvokeCount();
 267 
 268                     try {
 269                         replaceMe = false;
 270                         ev = queue.take();
 271 
 272                         // no events and this thread has been "selected" to
 273                         // poll for more.
 274                         if (ev == NEED_TO_POLL) {
 275                             try {
 276                                 ev = poll();
 277                             } catch (IOException x) {
 278                                 x.printStackTrace();
 279                                 return;
 280                             }
 281                         }
 282                     } catch (InterruptedException x) {
 283                         continue;
 284                     }
 285 
 286                     // handle wakeup to execute task or shutdown
 287                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
 288                         Runnable task = pollTask();
 289                         if (task == null) {
 290                             // shutdown request
 291                             return;
 292                         }
 293                         // run task (may throw error/exception)
 294                         replaceMe = true;
 295                         task.run();
 296                         continue;
 297                     }
 298 
 299                     // process event
 300                     try {
 301                         ev.channel().onEvent(ev.events(), isPooledThread);
 302                     } catch (Error x) {
 303                         replaceMe = true; throw x;
 304                     } catch (RuntimeException x) {
 305                         replaceMe = true; throw x;
 306                     }
 307                 }
 308             } finally {
 309                 // last handler to exit when shutdown releases resources
 310                 int remaining = threadExit(this, replaceMe);
 311                 if (remaining == 0 && isShutdown()) {
 312                     implClose();
 313                 }
 314             }
 315         }
 316     }
 317 
 318     // -- Native methods --
 319 
 320     private static native void socketpair(int[] sv) throws IOException;
 321 
 322     private static native void interrupt(int fd) throws IOException;
 323 
 324     private static native void drain1(int fd) throws IOException;
 325 
 326     private static native void close0(int fd);
 327 
 328     static {
 329         IOUtil.load();
 330     }
 331 }