< prev index next >

src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java

Print this page




  25 package sun.nio.ch;
  26 
  27 import java.io.IOException;
  28 import java.nio.channels.ClosedSelectorException;
  29 import java.nio.channels.SelectionKey;
  30 import java.nio.channels.Selector;
  31 import java.nio.channels.spi.SelectorProvider;
  32 import java.util.ArrayDeque;
  33 import java.util.ArrayList;
  34 import java.util.Deque;
  35 import java.util.List;
  36 import java.util.concurrent.TimeUnit;
  37 import java.util.function.Consumer;
  38 
  39 import jdk.internal.misc.Unsafe;
  40 
  41 /**
  42  * Selector implementation based on poll
  43  */
  44 
  45 class PollSelectorImpl extends SelectorImpl {
  46 
  47     // initial capacity of poll array
  48     private static final int INITIAL_CAPACITY = 16;
  49 
  50     // poll array, grows as needed
  51     private int pollArrayCapacity = INITIAL_CAPACITY;
  52     private int pollArraySize;
  53     private AllocatedNativeObject pollArray;
  54 
  55     // file descriptors used for interrupt
  56     private final int fd0;
  57     private final int fd1;
  58 
  59     // keys for file descriptors in poll array, synchronize on selector
  60     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
  61 
  62     // pending updates, queued by putEventOps
  63     private final Object updateLock = new Object();
  64     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
  65 
  66     // interrupt triggering and clearing
  67     private final Object interruptLock = new Object();
  68     private boolean interruptTriggered;
  69 
  70     PollSelectorImpl(SelectorProvider sp) throws IOException { // sets up the poll array and the interrupt pipe
  71         super(sp);
  72 
  73         int size = pollArrayCapacity * SIZE_POLLFD; // pollArrayCapacity starts at INITIAL_CAPACITY, so room for that many SIZE_POLLFD-sized entries
  74         this.pollArray = new AllocatedNativeObject(size, false);
  75 
  76         try {
  77             long fds = IOUtil.makePipe(false); // both descriptors come back packed into a single long
  78             this.fd0 = (int) (fds >>> 32); // upper 32 bits
  79             this.fd1 = (int) fds; // lower 32 bits
  80         } catch (IOException ioe) {
  81             pollArray.free(); // release the allocation made above before propagating the failure
  82             throw ioe;
  83         }
  84 
  85         // wakeup support
  86         synchronized (this) {
  87             setFirst(fd0, Net.POLLIN); // NOTE(review): presumably installs fd0 as entry 0 of the poll array - confirm against setFirst
  88         }
  89     }
  90 


 116                     // timed poll interrupted so need to adjust timeout
 117                     long adjust = System.nanoTime() - startTime;
 118                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 119                     if (to <= 0) {
 120                         // timeout expired so no retry
 121                         numPolled = 0;
 122                     }
 123                 }
 124             } while (numPolled == IOStatus.INTERRUPTED);
 125             assert numPolled <= pollArraySize;
 126 
 127         } finally {
 128             end(blocking);
 129         }
 130 
 131         processDeregisterQueue();
 132         return processEvents(action);
 133     }
 134 
 135     /**








 136      * Process changes to the interest ops.
 137      */
 138     private void processUpdateQueue() {
 139         assert Thread.holdsLock(this);
 140 
 141         synchronized (updateLock) {
 142             SelectionKeyImpl ski;
 143             while ((ski = updateKeys.pollFirst()) != null) {
 144                 int newEvents = ski.translateInterestOps();
 145                 if (ski.isValid()) {
 146                     int index = ski.getIndex();
 147                     assert index >= 0 && index < pollArraySize;
 148                     if (index > 0) {
 149                         assert pollKeys.get(index) == ski;
 150                         if (newEvents == 0) {
 151                             remove(ski);
 152                         } else {
 153                             update(ski, newEvents);
 154                         }
 155                     } else if (newEvents != 0) {


 360     private void putEventOps(int i, int event) { // writes event into poll-array entry i
 361         int offset = SIZE_POLLFD * i + EVENT_OFFSET; // entries are SIZE_POLLFD apart; EVENT_OFFSET selects the slot within entry i
 362         pollArray.putShort(offset, (short)event); // stored as a short: the cast keeps only the low 16 bits of event
 363     }
 364 
 365     private int getEventOps(int i) { // reads back the short stored at EVENT_OFFSET within poll-array entry i
 366         int offset = SIZE_POLLFD * i + EVENT_OFFSET; // same offset computation as putEventOps
 367         return pollArray.getShort(offset); // result of getShort is returned as an int
 368     }
 369 
 370     private void putReventOps(int i, int revent) { // writes revent into poll-array entry i
 371         int offset = SIZE_POLLFD * i + REVENT_OFFSET; // REVENT_OFFSET selects the slot within entry i; NOTE(review): presumably the returned-events field - confirm
 372         pollArray.putShort(offset, (short)revent); // stored as a short: the cast keeps only the low 16 bits of revent
 373     }
 374 
 375     private int getReventOps(int i) { // reads back the short stored at REVENT_OFFSET within poll-array entry i
 376         int offset = SIZE_POLLFD * i + REVENT_OFFSET; // same offset computation as putReventOps
 377         return pollArray.getShort(offset); // result of getShort is returned as an int
 378     }
 379 
 380     private static native int poll(long pollAddress, int numfds, int timeout); // native method; NOTE(review): presumably bound via the IOUtil.load() call in the static initializer - confirm
 381 
 382     static { // runs once when the class is initialized
 383         IOUtil.load(); // NOTE(review): presumably loads the native library that backs the native poll method - confirm
 384     }
 385 }


  25 package sun.nio.ch;
  26 
  27 import java.io.IOException;
  28 import java.nio.channels.ClosedSelectorException;
  29 import java.nio.channels.SelectionKey;
  30 import java.nio.channels.Selector;
  31 import java.nio.channels.spi.SelectorProvider;
  32 import java.util.ArrayDeque;
  33 import java.util.ArrayList;
  34 import java.util.Deque;
  35 import java.util.List;
  36 import java.util.concurrent.TimeUnit;
  37 import java.util.function.Consumer;
  38 
  39 import jdk.internal.misc.Unsafe;
  40 
  41 /**
  42  * Selector implementation based on poll
  43  */
  44 
  45 public class PollSelectorImpl extends SelectorImpl {
  46 
  47     // initial capacity of poll array
  48     private static final int INITIAL_CAPACITY = 16;
  49 
  50     // poll array, grows as needed
  51     private int pollArrayCapacity = INITIAL_CAPACITY;
  52     private int pollArraySize;
  53     private AllocatedNativeObject pollArray;
  54 
  55     // file descriptors used for interrupt
  56     private final int fd0;
  57     private final int fd1;
  58 
  59     // keys for file descriptors in poll array, synchronize on selector
  60     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
  61 
  62     // pending updates, queued by putEventOps
  63     private final Object updateLock = new Object();
  64     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
  65 
  66     // interrupt triggering and clearing
  67     private final Object interruptLock = new Object();
  68     private boolean interruptTriggered;
  69 
  70     protected PollSelectorImpl(SelectorProvider sp) throws IOException { // sets up the poll array and the interrupt pipe
  71         super(sp);
  72 
  73         int size = pollArrayCapacity * SIZE_POLLFD; // pollArrayCapacity starts at INITIAL_CAPACITY, so room for that many SIZE_POLLFD-sized entries
  74         this.pollArray = new AllocatedNativeObject(size, false);
  75 
  76         try {
  77             long fds = IOUtil.makePipe(false); // both descriptors come back packed into a single long
  78             this.fd0 = (int) (fds >>> 32); // upper 32 bits
  79             this.fd1 = (int) fds; // lower 32 bits
  80         } catch (IOException ioe) {
  81             pollArray.free(); // release the allocation made above before propagating the failure
  82             throw ioe;
  83         }
  84 
  85         // wakeup support
  86         synchronized (this) {
  87             setFirst(fd0, Net.POLLIN); // NOTE(review): presumably installs fd0 as entry 0 of the poll array - confirm against setFirst
  88         }
  89     }
  90 


 116                     // timed poll interrupted so need to adjust timeout
 117                     long adjust = System.nanoTime() - startTime;
 118                     to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
 119                     if (to <= 0) {
 120                         // timeout expired so no retry
 121                         numPolled = 0;
 122                     }
 123                 }
 124             } while (numPolled == IOStatus.INTERRUPTED);
 125             assert numPolled <= pollArraySize;
 126 
 127         } finally {
 128             end(blocking);
 129         }
 130 
 131         processDeregisterQueue();
 132         return processEvents(action);
 133     }
 134 
 135     /**
 136      * Polls by delegating to the native poll0. Declared protected so that a
 137      * platform-specific subclass can supply a different native implementation.
 138      */
 139     protected int poll(long pollAddress, int numfds, int timeout) {
 140         return poll0(pollAddress, numfds, timeout); // arguments and result are passed through unchanged
 141     }
 142 
 143     /**
 144      * Process changes to the interest ops.
 145      */
 146     private void processUpdateQueue() {
 147         assert Thread.holdsLock(this);
 148 
 149         synchronized (updateLock) {
 150             SelectionKeyImpl ski;
 151             while ((ski = updateKeys.pollFirst()) != null) {
 152                 int newEvents = ski.translateInterestOps();
 153                 if (ski.isValid()) {
 154                     int index = ski.getIndex();
 155                     assert index >= 0 && index < pollArraySize;
 156                     if (index > 0) {
 157                         assert pollKeys.get(index) == ski;
 158                         if (newEvents == 0) {
 159                             remove(ski);
 160                         } else {
 161                             update(ski, newEvents);
 162                         }
 163                     } else if (newEvents != 0) {


 368     private void putEventOps(int i, int event) { // writes event into poll-array entry i
 369         int offset = SIZE_POLLFD * i + EVENT_OFFSET; // entries are SIZE_POLLFD apart; EVENT_OFFSET selects the slot within entry i
 370         pollArray.putShort(offset, (short)event); // stored as a short: the cast keeps only the low 16 bits of event
 371     }
 372 
 373     private int getEventOps(int i) { // reads back the short stored at EVENT_OFFSET within poll-array entry i
 374         int offset = SIZE_POLLFD * i + EVENT_OFFSET; // same offset computation as putEventOps
 375         return pollArray.getShort(offset); // result of getShort is returned as an int
 376     }
 377 
 378     private void putReventOps(int i, int revent) { // writes revent into poll-array entry i
 379         int offset = SIZE_POLLFD * i + REVENT_OFFSET; // REVENT_OFFSET selects the slot within entry i; NOTE(review): presumably the returned-events field - confirm
 380         pollArray.putShort(offset, (short)revent); // stored as a short: the cast keeps only the low 16 bits of revent
 381     }
 382 
 383     private int getReventOps(int i) { // reads back the short stored at REVENT_OFFSET within poll-array entry i
 384         int offset = SIZE_POLLFD * i + REVENT_OFFSET; // same offset computation as putReventOps
 385         return pollArray.getShort(offset); // result of getShort is returned as an int
 386     }
 387 
 388     private static native int poll0(long pollAddress, int numfds, int timeout); // native method invoked by poll(); NOTE(review): presumably bound via the IOUtil.load() call in the static initializer - confirm
 389 
 390     static { // runs once when the class is initialized
 391         IOUtil.load(); // NOTE(review): presumably loads the native library that backs the native poll0 method - confirm
 392     }
 393 }
< prev index next >