< prev index next >

src/java.base/linux/classes/sun/nio/ch/EPollPort.java

Print this page
rev 49271 : [mq]: selector-cleanup


  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.spi.AsynchronousChannelProvider;
  29 import java.io.IOException;
  30 import java.util.concurrent.ArrayBlockingQueue;
  31 import java.util.concurrent.RejectedExecutionException;
  32 import java.util.concurrent.atomic.AtomicInteger;
  33 import static sun.nio.ch.EPoll.*;






  34 
  35 /**
  36  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
  37  */
  38 
  39 final class EPollPort
  40     extends Port
  41 {
  42     // maximum number of events to poll at a time
  43     private static final int MAX_EPOLL_EVENTS = 512;
  44 
  45     // errors
  46     private static final int ENOENT     = 2;
  47 
  48     // epoll file descriptor
  49     private final int epfd;
  50 



  51     // true if epoll closed
  52     private boolean closed;
  53 
  54     // socket pair used for wakeup
  55     private final int sp[];
  56 
  57     // number of wakeups pending
  58     private final AtomicInteger wakeupCount = new AtomicInteger();
  59 
  60     // address of the poll array passed to epoll_wait
  61     private final long address;
  62 
  63     // encapsulates an event for a channel
  64     static class Event {
  65         final PollableChannel channel;
  66         final int events;
  67 
  68         Event(PollableChannel channel, int events) {
  69             this.channel = channel;
  70             this.events = events;
  71         }
  72 
  73         PollableChannel channel()   { return channel; }
  74         int events()                { return events; }
  75     }
  76 
  77     // queue of events for cases where a polling thread dequeues more than one
  78     // event
  79     private final ArrayBlockingQueue<Event> queue;
  80     private final Event NEED_TO_POLL = new Event(null, 0);
  81     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
  82 
  83     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
  84         throws IOException
  85     {
  86         super(provider, pool);
  87 
  88         // open epoll
  89         this.epfd = epollCreate();
  90 
  91         // create socket pair for wakeup mechanism
  92         int[] sv = new int[2];
  93         try {
  94             socketpair(sv);
  95             // register one end with epoll
  96             epollCtl(epfd, EPOLL_CTL_ADD, sv[0], EPOLLIN);
  97         } catch (IOException x) {
  98             close0(epfd);
  99             throw x;
 100         }
 101         this.sp = sv;
 102 
 103         // allocate the poll array
 104         this.address = allocatePollArray(MAX_EPOLL_EVENTS);
 105 
 106         // create the queue and offer the special event to ensure that the first
 107         // thread polls
 108         this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
 109         this.queue.offer(NEED_TO_POLL);
 110     }
 111 
 112     EPollPort start() {
 113         startThreads(new EventHandlerTask());
 114         return this;
 115     }
 116 
 117     /**
 118      * Release all resources
 119      */
 120     private void implClose() {
 121         synchronized (this) {
 122             if (closed)
 123                 return;
 124             closed = true;
 125         }
 126         freePollArray(address);
 127         close0(sp[0]);
 128         close0(sp[1]);
 129         close0(epfd);
 130     }
 131 
 132     private void wakeup() {
 133         if (wakeupCount.incrementAndGet() == 1) {
 134             // write byte to socketpair to force wakeup
 135             try {
 136                 interrupt(sp[1]);
 137             } catch (IOException x) {
 138                 throw new AssertionError(x);
 139             }
 140         }
 141     }
 142 
 143     @Override
 144     void executeOnHandlerTask(Runnable task) {
 145         synchronized (this) {
 146             if (closed)
 147                 throw new RejectedExecutionException();
 148             offerTask(task);
 149             wakeup();
 150         }
 151     }
 152 
 153     @Override
 154     void shutdownHandlerTasks() {
 155         /*
 156          * If no tasks are running then just release resources; otherwise
 157          * write to the one end of the socketpair to wakeup any polling threads.
 158          */
 159         int nThreads = threadCount();
 160         if (nThreads == 0) {
 161             implClose();
 162         } else {
 163             // send interrupt to each thread
 164             while (nThreads-- > 0) {
 165                 wakeup();
 166             }
 167         }
 168     }
 169 
 170     // invoked by clients to register a file descriptor
 171     @Override
 172     void startPoll(int fd, int events) {
 173         // update events (or add to epoll on first usage)
 174         int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
 175         if (err == ENOENT)
 176             err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
 177         if (err != 0)
 178             throw new AssertionError();     // should not happen
 179     }
 180 
 181     /*
 182      * Task to process events from epoll and dispatch to the channel's
 183      * onEvent handler.
 184      *
 185      * Events are retrieved from epoll in batches and offered to a BlockingQueue
 186      * where they are consumed by handler threads. A special "NEED_TO_POLL"
 187      * event is used to signal one consumer to re-poll when all events have
 188      * been consumed.
 189      */
 190     private class EventHandlerTask implements Runnable {
 191         private Event poll() throws IOException {
 192             try {
 193                 for (;;) {
 194                     int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);




 195                     /*
 196                      * 'n' events have been read. Here we map them to their
 197                      * corresponding channel in batch and queue n-1 so that
 198                      * they can be handled by other handler threads. The last
 199                      * event is handled by this thread (and so is not queued).
 200                      */
 201                     fdToChannelLock.readLock().lock();
 202                     try {
 203                         while (n-- > 0) {
 204                             long eventAddress = getEvent(address, n);
 205                             int fd = getDescriptor(eventAddress);
 206 
 207                             // wakeup
 208                             if (fd == sp[0]) {
 209                                 if (wakeupCount.decrementAndGet() == 0) {
 210                                     // no more wakeups so drain pipe
 211                                     drain1(sp[0]);
 212                                 }
 213 
 214                                 // queue special event if there are more events
 215                                 // to handle.
 216                                 if (n > 0) {
 217                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 218                                     continue;
 219                                 }
 220                                 return EXECUTE_TASK_OR_SHUTDOWN;
 221                             }
 222 
 223                             PollableChannel channel = fdToChannel.get(fd);
 224                             if (channel != null) {
 225                                 int events = getEvents(eventAddress);
 226                                 Event ev = new Event(channel, events);
 227 
 228                                 // n-1 events are queued; This thread handles
 229                                 // the last one except for the wakeup
 230                                 if (n > 0) {
 231                                     queue.offer(ev);
 232                                 } else {
 233                                     return ev;
 234                                 }
 235                             }
 236                         }
 237                     } finally {
 238                         fdToChannelLock.readLock().unlock();
 239                     }
 240                 }
 241             } finally {
 242                 // to ensure that some thread will poll when all events have
 243                 // been consumed
 244                 queue.offer(NEED_TO_POLL);
 245             }


 289                     }
 290 
 291                     // process event
 292                     try {
 293                         ev.channel().onEvent(ev.events(), isPooledThread);
 294                     } catch (Error x) {
 295                         replaceMe = true; throw x;
 296                     } catch (RuntimeException x) {
 297                         replaceMe = true; throw x;
 298                     }
 299                 }
 300             } finally {
 301                 // last handler to exit when shutdown releases resources
 302                 int remaining = threadExit(this, replaceMe);
 303                 if (remaining == 0 && isShutdown()) {
 304                     implClose();
 305                 }
 306             }
 307         }
 308     }
 309 
 310     // -- Native methods --
 311 
 312     private static native void socketpair(int[] sv) throws IOException;
 313 
 314     private static native void interrupt(int fd) throws IOException;
 315 
 316     private static native void drain1(int fd) throws IOException;
 317 
 318     private static native void close0(int fd);
 319 
 320     static {
 321         IOUtil.load();
 322     }
 323 }


  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.spi.AsynchronousChannelProvider;
  29 import java.io.IOException;
  30 import java.util.concurrent.ArrayBlockingQueue;
  31 import java.util.concurrent.RejectedExecutionException;
  32 import java.util.concurrent.atomic.AtomicInteger;
  33 
  34 import static sun.nio.ch.EPoll.EPOLLIN;
  35 import static sun.nio.ch.EPoll.EPOLLONESHOT;
  36 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
  37 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
  38 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
  39 
  40 
  41 /**
  42  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
  43  */
  44 
  45 final class EPollPort
  46     extends Port
  47 {
  48     // maximum number of events to poll at a time
  49     private static final int MAX_EPOLL_EVENTS = 512;
  50 
  51     // errors
  52     private static final int ENOENT     = 2;
  53 
  54     // epoll file descriptor
  55     private final int epfd;
  56 
  57     // address of the poll array passed to epoll_wait
  58     private final long address;
  59 
  60     // true if epoll closed
  61     private boolean closed;
  62 
  63     // socket pair used for wakeup
  64     private final int sp[];
  65 
  66     // number of wakeups pending
  67     private final AtomicInteger wakeupCount = new AtomicInteger();
  68 



  69     // encapsulates an event for a channel
  70     static class Event {
  71         final PollableChannel channel;
  72         final int events;
  73 
  74         Event(PollableChannel channel, int events) {
  75             this.channel = channel;
  76             this.events = events;
  77         }
  78 
  79         PollableChannel channel()   { return channel; }
  80         int events()                { return events; }
  81     }
  82 
  83     // queue of events for cases where a polling thread dequeues more than one
  84     // event
  85     private final ArrayBlockingQueue<Event> queue;
  86     private final Event NEED_TO_POLL = new Event(null, 0);
  87     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
  88 
  89     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
  90         throws IOException
  91     {
  92         super(provider, pool);
  93 
  94         this.epfd = EPoll.create();
  95         this.address = EPoll.allocatePollArray(MAX_EPOLL_EVENTS);
  96 
  97         // create the wakeup descriptor pair (IOUtil.makePipe packs both fds into one long)

  98         try {
  99             long fds = IOUtil.makePipe(true);
 100             this.sp = new int[]{(int) (fds >>> 32), (int) fds};
 101         } catch (IOException ioe) {
 102             EPoll.freePollArray(address);
 103             FileDispatcherImpl.closeIntFD(epfd);
 104             throw ioe;
 105         }

 106 
 107         // register one end with epoll
 108         EPoll.ctl(epfd, EPOLL_CTL_ADD, sp[0], EPOLLIN);
 109 
 110         // create the queue and offer the special event to ensure that the first
 111         // thread polls
 112         this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
 113         this.queue.offer(NEED_TO_POLL);
 114     }
 115 
 116     EPollPort start() {
 117         startThreads(new EventHandlerTask());
 118         return this;
 119     }
 120 
 121     /**
 122      * Release all resources
 123      */
 124     private void implClose() {
 125         synchronized (this) {
 126             if (closed)
 127                 return;
 128             closed = true;
 129         }
 130         try { FileDispatcherImpl.closeIntFD(epfd); } catch (IOException ioe) { }
 131         try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
 132         try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
 133         EPoll.freePollArray(address);
 134     }
 135 
 136     private void wakeup() {
 137         if (wakeupCount.incrementAndGet() == 1) {
 138             // write a single zero byte to sp[1] to force wakeup
 139             try {
 140                 IOUtil.write1(sp[1], (byte)0);
 141             } catch (IOException x) {
 142                 throw new AssertionError(x);
 143             }
 144         }
 145     }
 146 
 147     @Override
 148     void executeOnHandlerTask(Runnable task) {
 149         synchronized (this) {
 150             if (closed)
 151                 throw new RejectedExecutionException();
 152             offerTask(task);
 153             wakeup();
 154         }
 155     }
 156 
 157     @Override
 158     void shutdownHandlerTasks() {
 159         /*
 160          * If no tasks are running then just release resources; otherwise
 161          * write to the one end of the socketpair to wakeup any polling threads.
 162          */
 163         int nThreads = threadCount();
 164         if (nThreads == 0) {
 165             implClose();
 166         } else {
 167             // send interrupt to each thread
 168             while (nThreads-- > 0) {
 169                 wakeup();
 170             }
 171         }
 172     }
 173 
 174     // invoked by clients to register a file descriptor
 175     @Override
 176     void startPoll(int fd, int events) {
 177         // update events (or add to epoll on first usage)
 178         int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
 179         if (err == ENOENT)
 180             err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
 181         if (err != 0)
 182             throw new AssertionError();     // should not happen
 183     }
 184 
 185     /*
 186      * Task to process events from epoll and dispatch to the channel's
 187      * onEvent handler.
 188      *
 189      * Events are retrieved from epoll in batches and offered to a BlockingQueue
 190      * where they are consumed by handler threads. A special "NEED_TO_POLL"
 191      * event is used to signal one consumer to re-poll when all events have
 192      * been consumed.
 193      */
 194     private class EventHandlerTask implements Runnable {
 195         private Event poll() throws IOException {
 196             try {
 197                 for (;;) {
 198                     int n;
 199                     do {
 200                         n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);
 201                     } while (n == IOStatus.INTERRUPTED);
 202 
 203                     /*
 204                      * 'n' events have been read. Here we map them to their
 205                      * corresponding channel in batch and queue n-1 so that
 206                      * they can be handled by other handler threads. The last
 207                      * event is handled by this thread (and so is not queued).
 208                      */
 209                     fdToChannelLock.readLock().lock();
 210                     try {
 211                         while (n-- > 0) {
 212                             long eventAddress = EPoll.getEvent(address, n);
 213                             int fd = EPoll.getDescriptor(eventAddress);
 214 
 215                             // wakeup
 216                             if (fd == sp[0]) {
 217                                 if (wakeupCount.decrementAndGet() == 0) {
 218                                     // no more wakeups so drain pipe
 219                                     IOUtil.drain(sp[0]);
 220                                 }
 221 
 222                                 // queue special event if there are more events
 223                                 // to handle.
 224                                 if (n > 0) {
 225                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
 226                                     continue;
 227                                 }
 228                                 return EXECUTE_TASK_OR_SHUTDOWN;
 229                             }
 230 
 231                             PollableChannel channel = fdToChannel.get(fd);
 232                             if (channel != null) {
 233                                 int events = EPoll.getEvents(eventAddress);
 234                                 Event ev = new Event(channel, events);
 235 
 236                                 // n-1 events are queued; This thread handles
 237                                 // the last one except for the wakeup
 238                                 if (n > 0) {
 239                                     queue.offer(ev);
 240                                 } else {
 241                                     return ev;
 242                                 }
 243                             }
 244                         }
 245                     } finally {
 246                         fdToChannelLock.readLock().unlock();
 247                     }
 248                 }
 249             } finally {
 250                 // to ensure that some thread will poll when all events have
 251                 // been consumed
 252                 queue.offer(NEED_TO_POLL);
 253             }


 297                     }
 298 
 299                     // process event
 300                     try {
 301                         ev.channel().onEvent(ev.events(), isPooledThread);
 302                     } catch (Error x) {
 303                         replaceMe = true; throw x;
 304                     } catch (RuntimeException x) {
 305                         replaceMe = true; throw x;
 306                     }
 307                 }
 308             } finally {
 309                 // last handler to exit when shutdown releases resources
 310                 int remaining = threadExit(this, replaceMe);
 311                 if (remaining == 0 && isShutdown()) {
 312                     implClose();
 313                 }
 314             }
 315         }
 316     }














 317 }
< prev index next >