< prev index next >

src/java.base/share/classes/java/nio/channels/Selector.java

Print this page
rev 50580 : [mq]: select-with-consumer

@@ -26,11 +26,13 @@
 package java.nio.channels;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.spi.SelectorProvider;
+import java.util.Objects;
 import java.util.Set;
+import java.util.function.Consumer;
 
 
 /**
  * A multiplexor of {@link SelectableChannel} objects.
  *

@@ -54,11 +56,12 @@
  *   channel registrations of this selector.  This set is returned by the
  *   {@link #keys() keys} method. </p></li>
  *
  *   <li><p> The <i>selected-key set</i> is the set of keys such that each
  *   key's channel was detected to be ready for at least one of the operations
- *   identified in the key's interest set during a prior selection operation.
+ *   identified in the key's interest set during a prior selection operation
+ *   that adds keys or updates keys in the set.
  *   This set is returned by the {@link #selectedKeys() selectedKeys} method.
  *   The selected-key set is always a subset of the key set. </p></li>
  *
  *   <li><p> The <i>cancelled-key</i> set is the set of keys that have been
  *   cancelled but whose channels have not yet been deregistered.  This set is

@@ -90,10 +93,31 @@
  * to the selected-key set. </p>
  *
  * <a id="selop"></a>
  * <h2>Selection</h2>
  *
+ * <p> A selection operation queries the underlying operating system for an
+ * update as to the readiness of each registered channel to perform any of the
+ * operations identified by its key's interest set.  There are two forms of
+ * selection operation:
+ *
+ * <ol>
+ *
+ *   <li><p> The {@link #select()}, {@link #select(long)}, and {@link #selectNow()}
+ *   methods add the keys of channels ready to perform an operation to the
+ *   selected-key set, or update the ready-operation set of keys already in the
+ *   selected-key set. </p></li>
+ *
+ *   <li><p> The {@link #select(Consumer)}, {@link #select(Consumer, long)}, and
+ *   {@link #selectNow(Consumer)} methods perform an <i>action</i> on the key
+ *   of each channel that is ready to perform an operation.  These methods do
+ *   not add to the selected-key set. </p></li>
+ *
+ * </ol>
+ *
+ * <h3>Selection operations that add to the selected-key set</h3>
+ *
  * <p> During each selection operation, keys may be added to and removed from a
  * selector's selected-key set and may be removed from its key and
  * cancelled-key sets.  Selection is performed by the {@link #select()}, {@link
  * #select(long)}, and {@link #selectNow()} methods, and involves three steps:
  * </p>

@@ -139,10 +163,49 @@
  * <p> Whether or not a selection operation blocks to wait for one or more
  * channels to become ready, and if so for how long, is the only essential
  * difference between the three selection methods. </p>
  *
  *
+ * <h3>Selection operations that perform an action on selected keys</h3>
+ *
+ * <p> During each selection operation, keys may be removed from the selector's
+ * key, selected-key, and cancelled-key sets.  Selection is performed by the
+ * {@link #select(Consumer)}, {@link #select(Consumer,long)}, and {@link
+ * #selectNow(Consumer)} methods, and involves three steps:  </p>
+ *
+ * <ol>
+ *
+ *   <li><p> Each key in the cancelled-key set is removed from each key set of
+ *   which it is a member, and its channel is deregistered.  This step leaves
+ *   the cancelled-key set empty. </p></li>
+ *
+ *   <li><p> The underlying operating system is queried for an update as to the
+ *   readiness of each remaining channel to perform any of the operations
+ *   identified by its key's interest set as of the moment that the selection
+ *   operation began.
+ *
+ *   <p> For a channel that is ready for at least one such operation, the
+ *   ready-operation set of the channel's key is set to identify exactly the
+ *   operations for which the channel is ready and the <i>action</i> specified
+ *   to the {@code select} method is invoked to consume the channel's key.  Any
+ *   readiness information previously recorded in the ready set is discarded
+ *   prior to invoking the <i>action</i>.
+ *
+ *   <p> Alternatively, where a channel is ready for more than one operation,
+ *   the <i>action</i> may be invoked more than once with the channel's key and
+ *   ready-operation set modified to a subset of the operations for which the
+ *   channel is ready.  Where the <i>action</i> is invoked more than once for
+ *   the same key then its ready-operation set never contains operation bits
+ *   that were contained in the set at previous calls to the <i>action</i>
+ *   in the same selection operation.  </p></li>
+ *
+ *   <li><p> If any keys were added to the cancelled-key set while step (2) was
+ *   in progress then they are processed as in step (1). </p></li>
+ *
+ * </ol>
+ *
+ *
  * <h2>Concurrency</h2>
  *
  * <p> A Selector and its key set are safe for use by multiple concurrent
  * threads.  Its selected-key set and cancelled-key set, however, are not.
  *

@@ -158,13 +221,12 @@
  * presence of a key in one or more of a selector's key sets does not imply
  * that the key is valid or that its channel is open.  Application code should
  * be careful to synchronize and check these conditions as necessary if there
  * is any possibility that another thread will cancel a key or close a channel.
  *
- * <p> A thread blocked in one of the {@link #select()} or {@link
- * #select(long)} methods may be interrupted by some other thread in one of
- * three ways:
+ * <p> A thread blocked in a selection operation may be interrupted by some
+ * other thread in one of three ways:
  *
  * <ul>
  *
  *   <li><p> By invoking the selector's {@link #wakeup wakeup} method,
  *   </p></li>

@@ -354,22 +416,192 @@
      *          If this selector is closed
      */
     public abstract int select() throws IOException;
 
     /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a blocking <a href="#selop">selection
+     * operation</a>.  It wakes up from querying the operating system only when
+     * at least one channel is selected, this selector's {@link #wakeup wakeup}
+     * method is invoked, the current thread is interrupted, or the given
+     * timeout period expires, whichever comes first.
+     *
+     * <p> The specified <i>action</i>'s {@link Consumer#accept(Object) accept}
+     * method is invoked with the key for each channel that is ready to perform
+     * an operation identified by its key's interest set.  The {@code accept}
+     * method may be invoked more than once for the same key but with the
+     * ready-operation set containing a subset of the operations for which the
+     * channel is ready (as described above).  The {@code accept} method is
+     * invoked while synchronized on the selector and its selected-key set.
+     * Great care must be taken to avoid deadlocking with other threads that
+     * also synchronize on these objects.  Selection operations are not reentrant
+     * in general and consequently the <i>action</i> should take great care not
+     * to attempt a selection operation on the same selector.  The behavior when
+     * attempting a reentrant selection operation is implementation specific and
+     * therefore not specified.  If the <i>action</i> closes the selector then
+     * {@code ClosedSelectorException} is thrown when the action completes.
+     * The <i>action</i> is not prohibited from closing channels registered with
+     * the selector, nor prohibited from cancelling keys or changing a key's
+     * interest set.  If a channel is selected but its key is cancelled or its
+     * interest set changed before the <i>action</i> is performed on the key
+     * then it is implementation specific as to whether the <i>action</i> is
+     * invoked (it may be invoked with an {@link SelectionKey#isValid() invalid}
+     * key).  Exceptions thrown by the action are relayed to the caller.
+     *
+     * <p> This method does not offer real-time guarantees: It schedules the
+     * timeout as if by invoking the {@link Object#wait(long)} method.
+     *
+     * @implSpec The default implementation removes all keys from the
+     * selected-key set, invokes {@link #select(long) select(long)} with the
+     * given timeout and then performs the action for each key added to the
+     * selected-key set.  The default implementation does not detect the action
+     * performing a reentrant selection operation.  The selected-key set may
+     * or may not be empty on completion of the default implementation.
+     *
+     * @param  action   The action to perform
+     *
+     * @param  timeout  If positive, block for up to {@code timeout}
+     *                  milliseconds, more or less, while waiting for a
+     *                  channel to become ready; if zero, block indefinitely;
+     *                  must not be negative
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @throws  IllegalArgumentException
+     *          If the value of the timeout argument is negative
+     *
+     * @since 11
+     */
+    public int select(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
+        if (timeout < 0)
+            throw new IllegalArgumentException("Negative timeout");
+        return doSelect(Objects.requireNonNull(action), timeout);
+    }
+
+    /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a blocking <a href="#selop">selection
+     * operation</a>.  It wakes up from querying the operating system only when
+     * at least one channel is selected, this selector's {@link #wakeup wakeup}
+     * method is invoked, or the current thread is interrupted, whichever comes
+     * first.
+     *
+     * <p> This method is equivalent to invoking the 2-arg
+     * {@link #select(Consumer, long) select} method with a timeout of {@code 0}
+     * to block indefinitely.  </p>
+     *
+     * @implSpec The default implementation invokes the 2-arg {@code select}
+     * method with a timeout of {@code 0}.
+     *
+     * @param  action   The action to perform
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @since 11
+     */
+    public int select(Consumer<SelectionKey> action) throws IOException {
+        return select(action, 0);
+    }
+
+    /**
+     * Selects and performs an action on the keys whose corresponding channels
+     * are ready for I/O operations.
+     *
+     * <p> This method performs a non-blocking <a href="#selop">selection
+     * operation</a>.
+     *
+     * <p> Invoking this method clears the effect of any previous invocations
+     * of the {@link #wakeup wakeup} method.  </p>
+     *
+     * @implSpec The default implementation removes all keys from the
+     * selected-key set, invokes {@link #selectNow() selectNow()} and then
+     * performs the action for each key added to the selected-key set.  The
+     * default implementation does not detect the action performing a reentrant
+     * selection operation.  The selected-key set may or may not be empty on
+     * completion of the default implementation.
+     *
+     * @param  action   The action to perform
+     *
+     * @return  The number of unique keys consumed, possibly zero
+     *
+     * @throws  IOException
+     *          If an I/O error occurs
+     *
+     * @throws  ClosedSelectorException
+     *          If this selector is closed or is closed by the action
+     *
+     * @since 11
+     */
+    public int selectNow(Consumer<SelectionKey> action) throws IOException {
+        return doSelect(Objects.requireNonNull(action), -1);
+    }
+
+    /**
+     * Default implementation of select(Consumer) and selectNow(Consumer).
+     */
+    private int doSelect(Consumer<SelectionKey> action, long timeout)
+        throws IOException
+    {
+        synchronized (this) {
+            Set<SelectionKey> selectedKeys = selectedKeys();
+            synchronized (selectedKeys) {
+                selectedKeys.clear();
+                int numKeySelected;
+                if (timeout < 0) {
+                    numKeySelected = selectNow();
+                } else {
+                    numKeySelected = select(timeout);
+                }
+
+                // copy selected-key set as action may remove keys
+                Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
+                assert keysToConsume.size() == numKeySelected;
+                selectedKeys.clear();
+
+                // invoke action for each selected key
+                keysToConsume.forEach(k -> {
+                    action.accept(k);
+                    if (!isOpen())
+                        throw new ClosedSelectorException();
+                });
+
+                return numKeySelected;
+            }
+        }
+    }
+
+
+    /**
      * Causes the first selection operation that has not yet returned to return
      * immediately.
      *
-     * <p> If another thread is currently blocked in an invocation of the
-     * {@link #select()} or {@link #select(long)} methods then that invocation
-     * will return immediately.  If no selection operation is currently in
-     * progress then the next invocation of one of these methods will return
-     * immediately unless the {@link #selectNow()} method is invoked in the
-     * meantime.  In any case the value returned by that invocation may be
-     * non-zero.  Subsequent invocations of the {@link #select()} or {@link
-     * #select(long)} methods will block as usual unless this method is invoked
-     * again in the meantime.
+     * <p> If another thread is currently blocked in a selection operation then
+     * that invocation will return immediately.  If no selection operation is
+     * currently in progress then the next invocation of a selection operation
+     * will return immediately unless {@link #selectNow()} or {@link
+     * #selectNow(Consumer)} is invoked in the meantime.  In any case the value
+     * returned by that invocation may be non-zero.  Subsequent selection
+     * operations will block as usual unless this method is invoked again in the
+     * meantime.
      *
      * <p> Invoking this method more than once between two successive selection
      * operations has the same effect as invoking it just once.  </p>
      *
      * @return  This selector

@@ -396,7 +628,6 @@
      *
      * @throws  IOException
      *          If an I/O error occurs
      */
     public abstract void close() throws IOException;
-
 }
< prev index next >