< prev index next >

src/java.base/linux/classes/sun/nio/ch/EPollSelectorImpl.java

Print this page
rev 49271 : [mq]: selector-cleanup


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.*;
  30 import java.nio.channels.spi.*;
  31 import java.util.*;















  32 
  33 /**
  34  * An implementation of Selector for Linux 2.6+ kernels that uses
  35  * the epoll event notification facility.
  36  */
  37 class EPollSelectorImpl
  38     extends SelectorImpl
  39 {
  40     // File descriptors used for interrupt









  41     private final int fd0;
  42     private final int fd1;
  43 
  44     // The poll object
  45     private final EPollArrayWrapper pollWrapper;
  46 
  47     // Maps from file descriptors to keys
  48     private final Map<Integer, SelectionKeyImpl> fdToKey;
  49 
  50     // True if this Selector has been closed
  51     private volatile boolean closed;



  52 
  53     // Lock for interrupt triggering and clearing
  54     private final Object interruptLock = new Object();
  55     private boolean interruptTriggered = false;
  56 
  57     /**
  58      * Package private constructor called by factory method in
  59      * the abstract superclass Selector.
  60      */
  61     EPollSelectorImpl(SelectorProvider sp) throws IOException {
  62         super(sp);
  63         long pipeFds = IOUtil.makePipe(false);
  64         fd0 = (int) (pipeFds >>> 32);
  65         fd1 = (int) pipeFds;
  66         try {
  67             pollWrapper = new EPollArrayWrapper(fd0, fd1);
  68             fdToKey = new HashMap<>();
  69         } catch (Throwable t) {
  70             try {
  71                 FileDispatcherImpl.closeIntFD(fd0);
  72             } catch (IOException ioe0) {
  73                 t.addSuppressed(ioe0);
  74             }
  75             try {
  76                 FileDispatcherImpl.closeIntFD(fd1);
  77             } catch (IOException ioe1) {
  78                 t.addSuppressed(ioe1);
  79             }
  80             throw t;


  81         }



  82     }
  83 
  84     private void ensureOpen() {
  85         if (closed)
  86             throw new ClosedSelectorException();
  87     }
  88 
  89     @Override
  90     protected int doSelect(long timeout) throws IOException {
  91         ensureOpen();

  92         int numEntries;

  93         processDeregisterQueue();
  94         try {
  95             begin();
  96             numEntries = pollWrapper.poll(timeout);


















  97         } finally {
  98             end();
  99         }
 100         processDeregisterQueue();
 101         return updateSelectedKeys(numEntries);
 102     }
 103 
 104     /**













































 105      * Update the keys whose fd's have been selected by the epoll.
 106      * Add the ready keys to the ready queue.
 107      */
 108     private int updateSelectedKeys(int numEntries) throws IOException {



 109         boolean interrupted = false;
 110         int numKeysUpdated = 0;
 111         for (int i=0; i<numEntries; i++) {
 112             int nextFD = pollWrapper.getDescriptor(i);
 113             if (nextFD == fd0) {

 114                 interrupted = true;
 115             } else {
 116                 SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
 117                 if (ski != null) {
 118                     int rOps = pollWrapper.getEventOps(i);
 119                     if (selectedKeys.contains(ski)) {
 120                         if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
 121                             numKeysUpdated++;
 122                         }
 123                     } else {
 124                         ski.channel.translateAndSetReadyOps(rOps, ski);
 125                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
 126                             selectedKeys.add(ski);
 127                             numKeysUpdated++;
 128                         }
 129                     }
 130                 }
 131             }
 132         }
 133 
 134         if (interrupted) {
 135             clearInterrupt();
 136         }
 137 
 138         return numKeysUpdated;
 139     }
 140 
 141     @Override
 142     protected void implClose() throws IOException {
 143         if (closed)
 144             return;
 145         closed = true;
 146 
 147         // prevent further wakeup
 148         synchronized (interruptLock) {
 149             interruptTriggered = true;
 150         }
 151 
 152         pollWrapper.close();


 153         FileDispatcherImpl.closeIntFD(fd0);
 154         FileDispatcherImpl.closeIntFD(fd1);
 155 
 156         // Deregister channels
 157         Iterator<SelectionKey> i = keys.iterator();
 158         while (i.hasNext()) {
 159             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
 160             deregister(ski);
 161             SelectableChannel selch = ski.channel();
 162             if (!selch.isOpen() && !selch.isRegistered())
 163                 ((SelChImpl)selch).kill();
 164             i.remove();
 165         }
 166     }
 167 
 168     @Override
 169     protected void implRegister(SelectionKeyImpl ski) {

 170         ensureOpen();
 171         SelChImpl ch = ski.channel;
 172         int fd = Integer.valueOf(ch.getFDVal());
 173         fdToKey.put(fd, ski);
 174         pollWrapper.add(fd);
 175         keys.add(ski);
 176     }
 177 
 178     @Override
 179     protected void implDereg(SelectionKeyImpl ski) throws IOException {
 180         assert (ski.getIndex() >= 0);
 181         SelChImpl ch = ski.channel;
 182         int fd = ch.getFDVal();
 183         fdToKey.remove(Integer.valueOf(fd));
 184         pollWrapper.remove(fd);
 185         ski.setIndex(-1);
 186         keys.remove(ski);





 187         selectedKeys.remove(ski);



 188         deregister(ski);

 189         SelectableChannel selch = ski.channel();
 190         if (!selch.isOpen() && !selch.isRegistered())
 191             ((SelChImpl)selch).kill();
 192     }
 193 
 194     @Override
 195     public void putEventOps(SelectionKeyImpl ski, int ops) {
 196         ensureOpen();
 197         SelChImpl ch = ski.channel;
 198         pollWrapper.setInterest(ch.getFDVal(), ops);


 199     }
 200 
 201     @Override
 202     public Selector wakeup() {
 203         synchronized (interruptLock) {
 204             if (!interruptTriggered) {
 205                 pollWrapper.interrupt();




 206                 interruptTriggered = true;
 207             }
 208         }
 209         return this;
 210     }
 211 
 212     private void clearInterrupt() throws IOException {
 213         synchronized (interruptLock) {
 214             IOUtil.drain(fd0);
 215             interruptTriggered = false;
 216         }
 217     }
 218 }


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.ClosedSelectorException;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.Selector;
  33 import java.nio.channels.spi.SelectorProvider;
  34 import java.util.ArrayDeque;
  35 import java.util.BitSet;
  36 import java.util.Deque;
  37 import java.util.HashMap;
  38 import java.util.Iterator;
  39 import java.util.Map;
  40 import java.util.concurrent.TimeUnit;
  41 
  42 import static sun.nio.ch.EPoll.EPOLLIN;
  43 import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
  44 import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
  45 import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
  46 
  47 
  48 /**
  49  * Linux epoll based Selector implementation

  50  */
  51 
  52 class EPollSelectorImpl extends SelectorImpl {
  53 
  54     // maximum number of events to poll in one call to epoll_wait
  55     private static final int NUM_EPOLLEVENTS = Math.min(IOUtil.fdLimit(), 1024);
  56 
  57     // epoll file descriptor
  58     private final int epfd;
  59 
  60     // address of poll array when polling with epoll_wait
  61     private final long pollArrayAddress;
  62 
  63     // file descriptors used for interrupt
  64     private final int fd0;
  65     private final int fd1;
  66 
  67     // maps file descriptor to selection key, synchronize on selector
  68     private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
  69 
  70     // file descriptors registered with epoll, synchronize on selector
  71     private final BitSet registered = new BitSet();
  72 
  73     // pending new registrations/updates, queued by implRegister and putEventOps
  74     private final Object updateLock = new Object();
  75     private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
  76     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
  77     private final Deque<Integer> updateOps = new ArrayDeque<>();
  78 
  79     // interrupt triggering and clearing
  80     private final Object interruptLock = new Object();
  81     private boolean interruptTriggered;
  82 
  83     /**
  84      * Package private constructor called by factory method in
  85      * the abstract superclass Selector.
  86      */
  87     EPollSelectorImpl(SelectorProvider sp) throws IOException {
  88         super(sp);
  89 
  90         this.epfd = EPoll.create();
  91         this.pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS);
  92 








  93         try {
  94             long fds = IOUtil.makePipe(false);
  95             this.fd0 = (int) (fds >>> 32);
  96             this.fd1 = (int) fds;
  97         } catch (IOException ioe) {
  98             EPoll.freePollArray(pollArrayAddress);
  99             FileDispatcherImpl.closeIntFD(epfd);
 100             throw ioe;
 101         }
 102 
 103         // register one end of the socket pair for wakeups
 104         EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
 105     }
 106 
 107     private void ensureOpen() {
 108         if (!isOpen())
 109             throw new ClosedSelectorException();
 110     }
 111 
 112     @Override
 113     protected int doSelect(long timeout) throws IOException {
 114         assert Thread.holdsLock(this);
 115 
 116         int numEntries;
 117         processUpdateQueue();
 118         processDeregisterQueue();
 119         try {
 120             begin();
 121 
 122             // epoll_wait timeout is int
 123             int to = (int) Math.min(timeout, Integer.MAX_VALUE);
 124             boolean timedPoll = (to > 0);
 125             do {
 126                 long startTime = timedPoll ? System.nanoTime() : 0;
 127                 numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
 128                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
 129                     // timed poll interrupted so need to adjust timeout
 130                     long adjust = System.nanoTime() - startTime;
 131                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 132                     if (to <= 0) {
 133                         // timeout expired so no retry
 134                         numEntries = 0;
 135                     }
 136                 }
 137             } while (numEntries == IOStatus.INTERRUPTED);
 138             assert IOStatus.check(numEntries);
 139 
 140         } finally {
 141             end();
 142         }
 143         processDeregisterQueue();
 144         return updateSelectedKeys(numEntries);
 145     }
 146 
 147     /**
 148      * Process new registrations and changes to the interest ops.
 149      */
 150     private void processUpdateQueue() {
 151         assert Thread.holdsLock(this);
 152 
 153         synchronized (updateLock) {
 154             SelectionKeyImpl ski;
 155 
 156             // new registrations
 157             while ((ski = newKeys.pollFirst()) != null) {
 158                 if (ski.isValid()) {
 159                     SelChImpl ch = ski.channel;
 160                     int fd = ch.getFDVal();
 161                     SelectionKeyImpl previous = fdToKey.put(fd, ski);
 162                     assert previous == null;
 163                     assert registered.get(fd) == false;
 164                 }
 165             }
 166 
 167             // changes to interest ops
 168             assert updateKeys.size() == updateOps.size();
 169             while ((ski = updateKeys.pollFirst()) != null) {
 170                 int ops = updateOps.pollFirst();
 171                 int fd = ski.channel.getFDVal();
 172                 if (ski.isValid() && fdToKey.containsKey(fd)) {
 173                     if (registered.get(fd)) {
 174                         if (ops == 0) {
 175                             // remove from epoll
 176                             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
 177                             registered.clear(fd);
 178                         } else {
 179                             // modify events
 180                             EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, ops);
 181                         }
 182                     } else if (ops != 0) {
 183                         // add to epoll
 184                         EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, ops);
 185                         registered.set(fd);
 186                     }
 187                 }
 188             }
 189         }
 190     }
 191 
 192     /**
 193      * Update the keys whose fd's have been selected by the epoll.
 194      * Add the ready keys to the ready queue.
 195      */
 196     private int updateSelectedKeys(int numEntries) throws IOException {
 197         assert Thread.holdsLock(this);
 198         assert Thread.holdsLock(nioSelectedKeys());
 199 
 200         boolean interrupted = false;
 201         int numKeysUpdated = 0;
 202         for (int i=0; i<numEntries; i++) {
 203             long event = EPoll.getEvent(pollArrayAddress, i);
 204             int fd = EPoll.getDescriptor(event);
 205             if (fd == fd0) {
 206                 interrupted = true;
 207             } else {
 208                 SelectionKeyImpl ski = fdToKey.get(fd);
 209                 if (ski != null) {
 210                     int rOps = EPoll.getEvents(event);
 211                     if (selectedKeys.contains(ski)) {
 212                         if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
 213                             numKeysUpdated++;
 214                         }
 215                     } else {
 216                         ski.channel.translateAndSetReadyOps(rOps, ski);
 217                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
 218                             selectedKeys.add(ski);
 219                             numKeysUpdated++;
 220                         }
 221                     }
 222                 }
 223             }
 224         }
 225 
 226         if (interrupted) {
 227             clearInterrupt();
 228         }
 229 
 230         return numKeysUpdated;
 231     }
 232 
 233     @Override
 234     protected void implClose() throws IOException {
 235         assert Thread.holdsLock(this);
 236         assert Thread.holdsLock(nioKeys());

 237 
 238         // prevent further wakeup
 239         synchronized (interruptLock) {
 240             interruptTriggered = true;
 241         }
 242 
 243         FileDispatcherImpl.closeIntFD(epfd);
 244         EPoll.freePollArray(pollArrayAddress);
 245 
 246         FileDispatcherImpl.closeIntFD(fd0);
 247         FileDispatcherImpl.closeIntFD(fd1);
 248 
 249         // Deregister channels
 250         Iterator<SelectionKey> i = keys.iterator();
 251         while (i.hasNext()) {
 252             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
 253             deregister(ski);
 254             SelectableChannel selch = ski.channel();
 255             if (!selch.isOpen() && !selch.isRegistered())
 256                 ((SelChImpl)selch).kill();
 257             i.remove();
 258         }
 259     }
 260 
 261     @Override
 262     protected void implRegister(SelectionKeyImpl ski) {
 263         assert Thread.holdsLock(nioKeys());
 264         ensureOpen();
 265         synchronized (updateLock) {
 266             newKeys.addLast(ski);
 267         }

 268         keys.add(ski);
 269     }
 270 
 271     @Override
 272     protected void implDereg(SelectionKeyImpl ski) throws IOException {
 273         assert !ski.isValid();
 274         assert Thread.holdsLock(this);
 275         assert Thread.holdsLock(nioKeys());
 276         assert Thread.holdsLock(nioSelectedKeys());
 277 
 278         int fd = ski.channel.getFDVal();
 279         fdToKey.remove(fd);
 280         if (registered.get(fd)) {
 281             EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
 282             registered.clear(fd);
 283         }
 284 
 285         selectedKeys.remove(ski);
 286         keys.remove(ski);
 287 
 288         // remove from channel's key set
 289         deregister(ski);
 290 
 291         SelectableChannel selch = ski.channel();
 292         if (!selch.isOpen() && !selch.isRegistered())
 293             ((SelChImpl) selch).kill();
 294     }
 295 
 296     @Override
 297     public void putEventOps(SelectionKeyImpl ski, int ops) {
 298         ensureOpen();
 299         synchronized (updateLock) {
 300             updateOps.addLast(ops);   // ops first in case adding the key fails
 301             updateKeys.addLast(ski);
 302         }
 303     }
 304 
 305     @Override
 306     public Selector wakeup() {
 307         synchronized (interruptLock) {
 308             if (!interruptTriggered) {
 309                 try {
 310                     IOUtil.write1(fd1, (byte)0);
 311                 } catch (IOException ioe) {
 312                     throw new InternalError(ioe);
 313                 }
 314                 interruptTriggered = true;
 315             }
 316         }
 317         return this;
 318     }
 319 
 320     private void clearInterrupt() throws IOException {
 321         synchronized (interruptLock) {
 322             IOUtil.drain(fd0);
 323             interruptTriggered = false;
 324         }
 325     }
 326 }
< prev index next >