1 /*
   2  * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.spi.AsynchronousChannelProvider;
  29 import java.io.IOException;
  30 import java.util.concurrent.ArrayBlockingQueue;
  31 import java.util.concurrent.RejectedExecutionException;
  32 import java.util.concurrent.atomic.AtomicInteger;
  33 import static sun.nio.ch.EPoll.*;
  34 
  35 /**
  36  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
  37  */
  38 
  39 final class EPollPort
  40     extends Port
  41 {
  42     // maximum number of events to poll at a time
  43     private static final int MAX_EPOLL_EVENTS = 512;
  44 
  45     // errors
  46     private static final int ENOENT     = 2;
  47 
  48     // epoll file descriptor
  49     private final int epfd;
  50 
  51     // true if epoll closed
  52     private boolean closed;
  53 
  54     // socket pair used for wakeup
  55     private final int sp[];
  56 
  57     // number of wakeups pending
  58     private final AtomicInteger wakeupCount = new AtomicInteger();
  59 
  60     // address of the poll array passed to epoll_wait
  61     private final long address;
  62 
  63     // encapsulates an event for a channel
  64     static class Event {
  65         final PollableChannel channel;
  66         final int events;
  67 
  68         Event(PollableChannel channel, int events) {
  69             this.channel = channel;
  70             this.events = events;
  71         }
  72 
  73         PollableChannel channel()   { return channel; }
  74         int events()                { return events; }
  75     }
  76 
  77     // queue of events for cases that a polling thread dequeues more than one
  78     // event
  79     private final ArrayBlockingQueue<Event> queue;
  80     private final Event NEED_TO_POLL = new Event(null, 0);
  81     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
  82 
  83     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
  84         throws IOException
  85     {
  86         super(provider, pool);
  87 
  88         // open epoll
  89         this.epfd = epollCreate();
  90 
  91         // create socket pair for wakeup mechanism
  92         int[] sv = new int[2];
  93         try {
  94             socketpair(sv);
  95             // register one end with epoll
  96             epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
  97         } catch (IOException x) {
  98             close0(epfd);
  99             throw x;
 100         }
 101         this.sp = sv;
 102 
 103         // allocate the poll array
 104         this.address = allocatePollArray(MAX_EPOLL_EVENTS);
 105 
 106         // create the queue and offer the special event to ensure that the first
 107         // threads polls
 108         this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
 109         this.queue.offer(NEED_TO_POLL);
 110     }
 111 
 112     EPollPort start() {
 113         startThreads(new EventHandlerTask());
 114         return this;
 115     }
 116 
 117     /**
 118      * Release all resources
 119      */
 120     private void implClose() {
 121         synchronized (this) {
 122             if (closed)
 123                 return;
 124             closed = true;
 125         }
 126         freePollArray(address);
 127         close0(sp[0]);
 128         close0(sp[1]);
 129         close0(epfd);
 130     }
 131 
 132     private void wakeup() {
 133         if (wakeupCount.incrementAndGet() == 1) {
 134             // write byte to socketpair to force wakeup
 135             try {
 136                 interrupt(sp[1]);
 137             } catch (IOException x) {
 138                 throw new AssertionError(x);
 139             }
 140         }
 141     }
 142 
 143     @Override
 144     void executeOnHandlerTask(Runnable task) {
 145         synchronized (this) {
 146             if (closed)
 147                 throw new RejectedExecutionException();
 148             offerTask(task);
 149             wakeup();
 150         }
 151     }
 152 
 153     @Override
 154     void shutdownHandlerTasks() {
 155         /*
 156          * If no tasks are running then just release resources; otherwise
 157          * write to the one end of the socketpair to wakeup any polling threads.
 158          */
 159         int nThreads = threadCount();
 160         if (nThreads == 0) {
 161             implClose();
 162         } else {
 163             // send interrupt to each thread
 164             while (nThreads-- > 0) {
 165                 wakeup();
 166             }
 167         }
 168     }
 169 
 170     // invoke by clients to register a file descriptor
 171     @Override
 172     void startPoll(int fd, int events) {
 173         // update events (or add to epoll on first usage)
 174         int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
 175         if (err == ENOENT)
 176             err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
 177         if (err != 0)
 178             throw new AssertionError();     // should not happen
 179     }
 180 
 181     /*
 182      * Task to process events from epoll and dispatch to the channel's
 183      * onEvent handler.
 184      *
 185      * Events are retrieved from epoll in batch and offered to a BlockingQueue
 186      * where they are consumed by handler threads. A special "NEED_TO_POLL"
 187      * event is used to signal one consumer to re-poll when all events have
 188      * been consumed.
 189      */
 190     private class EventHandlerTask implements Runnable {
 191         private Event poll() throws IOException {
 192             try {
 193                 for (;;) {
 194                     int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
 195                     /*
 196                      * 'n' events have been read. Here we map them to their
 197                      * corresponding channel in batch and queue n-1 so that
 198                      * they can be handled by other handler threads. The last
 199                      * event is handled by this thread (and so is not queued).
 200                      */
 201                     fdToChannelLock.readLock().lock();
 202                     try {
 203                         while (n-- > 0) {
 204                             long eventAddress = getEvent(address, n);
 205                             int fd = getDescriptor(eventAddress);
 206 
 207                             // wakeup
 208                             if (fd == sp[0]) {
 209                                 if (wakeupCount.decrementAndGet() == 0) {
 210                                     // no more wakeups so drain pipe
 211                                     drain1(sp[0]);
 212                                 }
 213 
 214                                 // queue special event if there are more events
 215                                 // to handle.
 216                                 if (n > 0) {
 217                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 218                                     continue;
 219                                 }
 220                                 return EXECUTE_TASK_OR_SHUTDOWN;
 221                             }
 222 
 223                             PollableChannel channel = fdToChannel.get(fd);
 224                             if (channel != null) {
 225                                 int events = getEvents(eventAddress);
 226                                 Event ev = new Event(channel, events);
 227 
 228                                 // n-1 events are queued; This thread handles
 229                                 // the last one except for the wakeup
 230                                 if (n > 0) {
 231                                     queue.offer(ev);
 232                                 } else {
 233                                     return ev;
 234                                 }
 235                             }
 236                         }
 237                     } finally {
 238                         fdToChannelLock.readLock().unlock();
 239                     }
 240                 }
 241             } finally {
 242                 // to ensure that some thread will poll when all events have
 243                 // been consumed
 244                 queue.offer(NEED_TO_POLL);
 245             }
 246         }
 247 
 248         public void run() {
 249             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 250                 Invoker.getGroupAndInvokeCount();
 251             final boolean isPooledThread = (myGroupAndInvokeCount != null);
 252             boolean replaceMe = false;
 253             Event ev;
 254             try {
 255                 for (;;) {
 256                     // reset invoke count
 257                     if (isPooledThread)
 258                         myGroupAndInvokeCount.resetInvokeCount();
 259 
 260                     try {
 261                         replaceMe = false;
 262                         ev = queue.take();
 263 
 264                         // no events and this thread has been "selected" to
 265                         // poll for more.
 266                         if (ev == NEED_TO_POLL) {
 267                             try {
 268                                 ev = poll();
 269                             } catch (IOException x) {
 270                                 x.printStackTrace();
 271                                 return;
 272                             }
 273                         }
 274                     } catch (InterruptedException x) {
 275                         continue;
 276                     }
 277 
 278                     // handle wakeup to execute task or shutdown
 279                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
 280                         Runnable task = pollTask();
 281                         if (task == null) {
 282                             // shutdown request
 283                             return;
 284                         }
 285                         // run task (may throw error/exception)
 286                         replaceMe = true;
 287                         task.run();
 288                         continue;
 289                     }
 290 
 291                     // process event
 292                     try {
 293                         ev.channel().onEvent(ev.events(), isPooledThread);
 294                     } catch (Error x) {
 295                         replaceMe = true; throw x;
 296                     } catch (RuntimeException x) {
 297                         replaceMe = true; throw x;
 298                     }
 299                 }
 300             } finally {
 301                 // last handler to exit when shutdown releases resources
 302                 int remaining = threadExit(this, replaceMe);
 303                 if (remaining == 0 && isShutdown()) {
 304                     implClose();
 305                 }
 306             }
 307         }
 308     }
 309 
 310     // -- Native methods --
 311 
 312     private static native void socketpair(int[] sv) throws IOException;
 313 
 314     private static native void interrupt(int fd) throws IOException;
 315 
 316     private static native void drain1(int fd) throws IOException;
 317 
 318     private static native void close0(int fd);
 319 
 320     static {
 321         IOUtil.load();
 322     }
 323 }