< prev index next >

src/java.base/macosx/classes/sun/nio/ch/KQueueSelectorImpl.java

Print this page
rev 50580 : [mq]: select-with-consumer


  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.ClosedSelectorException;

  30 import java.nio.channels.Selector;
  31 import java.nio.channels.spi.SelectorProvider;
  32 import java.util.ArrayDeque;
  33 import java.util.Deque;
  34 import java.util.HashMap;
  35 import java.util.Map;
  36 import java.util.concurrent.TimeUnit;

  37 
  38 import static sun.nio.ch.KQueue.EVFILT_READ;
  39 import static sun.nio.ch.KQueue.EVFILT_WRITE;
  40 import static sun.nio.ch.KQueue.EV_ADD;
  41 import static sun.nio.ch.KQueue.EV_DELETE;
  42 
  43 /**
  44  * KQueue based Selector implementation for macOS
  45  */
  46 
  47 class KQueueSelectorImpl extends SelectorImpl {
  48 
  49     // maximum number of events to poll in one call to kqueue
  50     private static final int MAX_KEVENTS = 256;
  51 
  52     // kqueue file descriptor
  53     private final int kqfd;
  54 
  55     // address of poll array (event list) when polling for pending events
  56     private final long pollArrayAddress;


  83         try {
  84             long fds = IOUtil.makePipe(false);
  85             this.fd0 = (int) (fds >>> 32);
  86             this.fd1 = (int) fds;
  87         } catch (IOException ioe) {
  88             KQueue.freePollArray(pollArrayAddress);
  89             FileDispatcherImpl.closeIntFD(kqfd);
  90             throw ioe;
  91         }
  92 
  93         // register one end of the pipe for wakeups
  94         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
  95     }
  96 
  97     private void ensureOpen() {
  98         if (!isOpen())
  99             throw new ClosedSelectorException();
 100     }
 101 
 102     @Override
 103     protected int doSelect(long timeout) throws IOException {


 104         assert Thread.holdsLock(this);
 105 
 106         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
 107         boolean blocking = (to != 0);
 108         boolean timedPoll = (to > 0);
 109 
 110         int numEntries;
 111         processUpdateQueue();
 112         processDeregisterQueue();
 113         try {
 114             begin(blocking);
 115 
 116             do {
 117                 long startTime = timedPoll ? System.nanoTime() : 0;
 118                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
 119                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
 120                     // timed poll interrupted so need to adjust timeout
 121                     long adjust = System.nanoTime() - startTime;
 122                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 123                     if (to <= 0) {
 124                         // timeout expired so no retry
 125                         numEntries = 0;
 126                     }
 127                 }
 128             } while (numEntries == IOStatus.INTERRUPTED);
 129             assert IOStatus.check(numEntries);
 130 
 131         } finally {
 132             end(blocking);
 133         }
 134         processDeregisterQueue();
 135         return updateSelectedKeys(numEntries);
 136     }
 137 
 138     /**
 139      * Process changes to the interest ops.
 140      */
 141     private void processUpdateQueue() {
 142         assert Thread.holdsLock(this);
 143 
 144         synchronized (updateLock) {
 145             SelectionKeyImpl ski;
 146             while ((ski = updateKeys.pollFirst()) != null) {
 147                 if (ski.isValid()) {
 148                     int fd = ski.getFDVal();
 149                     // add to fdToKey if needed
 150                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
 151                     assert (previous == null) || (previous == ski);
 152 
 153                     int newEvents = ski.translateInterestOps();
 154                     int registeredEvents = ski.registeredEvents();
 155                     if (newEvents != registeredEvents) {


 163                             KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
 164                         }
 165 
 166                         // add or delete interest in write events
 167                         if ((registeredEvents & Net.POLLOUT) != 0) {
 168                             if ((newEvents & Net.POLLOUT) == 0) {
 169                                 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
 170                             }
 171                         } else if ((newEvents & Net.POLLOUT) != 0) {
 172                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
 173                         }
 174 
 175                         ski.registeredEvents(newEvents);
 176                     }
 177                 }
 178             }
 179         }
 180     }
 181 
 182     /**
 183      * Update the keys of file descriptors that were polled and add them to
 184      * the selected-key set.
 185      * If the interrupt fd has been selected, drain it and clear the interrupt.
 186      */
 187     private int updateSelectedKeys(int numEntries) throws IOException {


 188         assert Thread.holdsLock(this);
 189         assert Thread.holdsLock(nioSelectedKeys());
 190 
 191         int numKeysUpdated = 0;
 192         boolean interrupted = false;
 193 
 194         // A file descriptor may be registered with kqueue with more than one
 195         // filter and so there may be more than one event for a fd. The poll
 196         // count is incremented here and compared against the SelectionKey's
 197         // "lastPolled" field. This ensures that the ready ops is updated rather
 198         // than replaced when a file descriptor is polled by both the read and
 199         // write filter.
 200         pollCount++;
 201 
 202         for (int i = 0; i < numEntries; i++) {
 203             long kevent = KQueue.getEvent(pollArrayAddress, i);
 204             int fd = KQueue.getDescriptor(kevent);
 205             if (fd == fd0) {
 206                 interrupted = true;
 207             } else {
 208                 SelectionKeyImpl ski = fdToKey.get(fd);
 209                 if (ski != null) {
 210                     int rOps = 0;
 211                     short filter = KQueue.getFilter(kevent);
 212                     if (filter == EVFILT_READ) {
 213                         rOps |= Net.POLLIN;
 214                     } else if (filter == EVFILT_WRITE) {
 215                         rOps |= Net.POLLOUT;
 216                     }
 217 
 218                     if (selectedKeys.contains(ski)) {
 219                         if (ski.translateAndUpdateReadyOps(rOps)) {
 220                             // file descriptor may be polled more than once per poll
 221                             if (ski.lastPolled != pollCount) {
 222                                 numKeysUpdated++;
 223                                 ski.lastPolled = pollCount;
 224                             }
 225                         }
 226                     } else {
 227                         ski.translateAndSetReadyOps(rOps);
 228                         if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
 229                             selectedKeys.add(ski);
 230                             numKeysUpdated++;
 231                             ski.lastPolled = pollCount;
 232                         }
 233                     }
 234                 }
 235             }
 236         }
 237 
 238         if (interrupted) {
 239             clearInterrupt();
 240         }
 241         return numKeysUpdated;
 242     }
 243 
 244     @Override
 245     protected void implClose() throws IOException {
 246         assert !isOpen();
 247         assert Thread.holdsLock(this);
 248 
 249         // prevent further wakeup
 250         synchronized (interruptLock) {
 251             interruptTriggered = true;
 252         }
 253 
 254         FileDispatcherImpl.closeIntFD(kqfd);




  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.ClosedSelectorException;
  30 import java.nio.channels.SelectionKey;
  31 import java.nio.channels.Selector;
  32 import java.nio.channels.spi.SelectorProvider;
  33 import java.util.ArrayDeque;
  34 import java.util.Deque;
  35 import java.util.HashMap;
  36 import java.util.Map;
  37 import java.util.concurrent.TimeUnit;
  38 import java.util.function.Consumer;
  39 
  40 import static sun.nio.ch.KQueue.EVFILT_READ;
  41 import static sun.nio.ch.KQueue.EVFILT_WRITE;
  42 import static sun.nio.ch.KQueue.EV_ADD;
  43 import static sun.nio.ch.KQueue.EV_DELETE;
  44 
  45 /**
  46  * KQueue based Selector implementation for macOS
  47  */
  48 
  49 class KQueueSelectorImpl extends SelectorImpl {
  50 
  51     // maximum number of events to poll in one call to kqueue
  52     private static final int MAX_KEVENTS = 256;
  53 
  54     // kqueue file descriptor
  55     private final int kqfd;
  56 
  57     // address of poll array (event list) when polling for pending events
  58     private final long pollArrayAddress;


  85         try {
  86             long fds = IOUtil.makePipe(false);
  87             this.fd0 = (int) (fds >>> 32);
  88             this.fd1 = (int) fds;
  89         } catch (IOException ioe) {
  90             KQueue.freePollArray(pollArrayAddress);
  91             FileDispatcherImpl.closeIntFD(kqfd);
  92             throw ioe;
  93         }
  94 
  95         // register one end of the pipe for wakeups
  96         KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
  97     }
  98 
  99     private void ensureOpen() {
 100         if (!isOpen())
 101             throw new ClosedSelectorException();
 102     }
 103 
 104     @Override
 105     protected int doSelect(Consumer<SelectionKey> action, long timeout)
 106         throws IOException
 107     {
 108         assert Thread.holdsLock(this);
 109 
 110         long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
 111         boolean blocking = (to != 0);
 112         boolean timedPoll = (to > 0);
 113 
 114         int numEntries;
 115         processUpdateQueue();
 116         processDeregisterQueue();
 117         try {
 118             begin(blocking);
 119 
 120             do {
 121                 long startTime = timedPoll ? System.nanoTime() : 0;
 122                 numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
 123                 if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
 124                     // timed poll interrupted so need to adjust timeout
 125                     long adjust = System.nanoTime() - startTime;
 126                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 127                     if (to <= 0) {
 128                         // timeout expired so no retry
 129                         numEntries = 0;
 130                     }
 131                 }
 132             } while (numEntries == IOStatus.INTERRUPTED);
 133             assert IOStatus.check(numEntries);
 134 
 135         } finally {
 136             end(blocking);
 137         }
 138         processDeregisterQueue();
 139         return processEvents(numEntries, action);
 140     }
 141 
 142     /**
 143      * Process changes to the interest ops.
 144      */
 145     private void processUpdateQueue() {
 146         assert Thread.holdsLock(this);
 147 
 148         synchronized (updateLock) {
 149             SelectionKeyImpl ski;
 150             while ((ski = updateKeys.pollFirst()) != null) {
 151                 if (ski.isValid()) {
 152                     int fd = ski.getFDVal();
 153                     // add to fdToKey if needed
 154                     SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
 155                     assert (previous == null) || (previous == ski);
 156 
 157                     int newEvents = ski.translateInterestOps();
 158                     int registeredEvents = ski.registeredEvents();
 159                     if (newEvents != registeredEvents) {


 167                             KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
 168                         }
 169 
 170                         // add or delete interest in write events
 171                         if ((registeredEvents & Net.POLLOUT) != 0) {
 172                             if ((newEvents & Net.POLLOUT) == 0) {
 173                                 KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
 174                             }
 175                         } else if ((newEvents & Net.POLLOUT) != 0) {
 176                             KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
 177                         }
 178 
 179                         ski.registeredEvents(newEvents);
 180                     }
 181                 }
 182             }
 183         }
 184     }
 185 
 186     /**
 187      * Process the polled events.

 188      * If the interrupt fd has been selected, drain it and clear the interrupt.
 189      */
 190     private int processEvents(int numEntries, Consumer<SelectionKey> action)
 191         throws IOException
 192     {
 193         assert Thread.holdsLock(this);

 194 
 195         int numKeysUpdated = 0;
 196         boolean interrupted = false;
 197 
 198         // A file descriptor may be registered with kqueue with more than one
 199         // filter and so there may be more than one event for a fd. The poll
 200         // count is incremented here and compared against the SelectionKey's
 201         // "lastPolled" field. This ensures that the ready ops is updated rather
 202         // than replaced when a file descriptor is polled by both the read and
 203         // write filter.
 204         pollCount++;
 205 
 206         for (int i = 0; i < numEntries; i++) {
 207             long kevent = KQueue.getEvent(pollArrayAddress, i);
 208             int fd = KQueue.getDescriptor(kevent);
 209             if (fd == fd0) {
 210                 interrupted = true;
 211             } else {
 212                 SelectionKeyImpl ski = fdToKey.get(fd);
 213                 if (ski != null) {
 214                     int rOps = 0;
 215                     short filter = KQueue.getFilter(kevent);
 216                     if (filter == EVFILT_READ) {
 217                         rOps |= Net.POLLIN;
 218                     } else if (filter == EVFILT_WRITE) {
 219                         rOps |= Net.POLLOUT;
 220                     }
 221                     int updated = processReadyEvents(rOps, ski, action);
 222                     if (updated > 0 && ski.lastPolled != pollCount) {



 223                         numKeysUpdated++;
 224                         ski.lastPolled = pollCount;
 225                     }
 226                 }









 227             }
 228         }
 229 
 230         if (interrupted) {
 231             clearInterrupt();
 232         }
 233         return numKeysUpdated;
 234     }
 235 
 236     @Override
 237     protected void implClose() throws IOException {
 238         assert !isOpen();
 239         assert Thread.holdsLock(this);
 240 
 241         // prevent further wakeup
 242         synchronized (interruptLock) {
 243             interruptTriggered = true;
 244         }
 245 
 246         FileDispatcherImpl.closeIntFD(kqfd);


< prev index next >