< prev index next >

src/java.base/share/classes/sun/nio/ch/ServerSocketChannelImpl.java

Print this page
rev 48993 : imported patch nio

@@ -33,18 +33,20 @@
 import java.net.SocketAddress;
 import java.net.SocketOption;
 import java.net.StandardProtocolFamily;
 import java.net.StandardSocketOptions;
 import java.nio.channels.AlreadyBoundException;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.NotYetBoundException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
 import sun.net.NetHooks;
 

@@ -54,35 +56,35 @@
 
 class ServerSocketChannelImpl
     extends ServerSocketChannel
     implements SelChImpl
 {
-
     // Used to make native close and configure calls
     private static NativeDispatcher nd;
 
     // Our file descriptor
     private final FileDescriptor fd;
     private final int fdVal;
 
-    // ID of native thread currently blocked in this channel, for signalling
-    private volatile long thread;
-
-    // Lock held by thread currently blocked in this channel
+    // Lock held by thread currently blocked on this channel
     private final ReentrantLock acceptLock = new ReentrantLock();
 
     // Lock held by any thread that modifies the state fields declared below
     // DO NOT invoke a blocking I/O operation while holding this lock!
     private final Object stateLock = new Object();
 
     // -- The following fields are protected by stateLock
 
     // Channel state, increases monotonically
-    private static final int ST_UNINITIALIZED = -1;
     private static final int ST_INUSE = 0;
-    private static final int ST_KILLED = 1;
-    private int state = ST_UNINITIALIZED;
+    private static final int ST_CLOSING = 1;
+    private static final int ST_KILLPENDING = 2;
+    private static final int ST_KILLED = 3;
+    private int state;
+
+    // ID of native thread currently blocked in this channel, for signalling
+    private long thread;
 
     // Binding
     private InetSocketAddress localAddress; // null => unbound
 
     // set true when exclusive binding is on and SO_REUSEADDR is emulated

@@ -96,26 +98,32 @@
 
     ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
         super(sp);
         this.fd =  Net.serverSocket(true);
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_INUSE;
     }
 
-    ServerSocketChannelImpl(SelectorProvider sp,
-                            FileDescriptor fd,
-                            boolean bound)
+    ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
         throws IOException
     {
         super(sp);
         this.fd =  fd;
         this.fdVal = IOUtil.fdVal(fd);
-        this.state = ST_INUSE;
-        if (bound)
+        if (bound) {
+            synchronized (stateLock) {
             localAddress = Net.localAddress(fd);
     }
+        }
+    }
 
+    // @throws ClosedChannelException if channel is closed
+    private void ensureOpen() throws ClosedChannelException {
+        if (!isOpen())
+            throw new ClosedChannelException();
+    }
+
+    @Override
     public ServerSocket socket() {
         synchronized (stateLock) {
             if (socket == null)
                 socket = ServerSocketAdaptor.create(this);
             return socket;

@@ -123,40 +131,35 @@
     }
 
     @Override
     public SocketAddress getLocalAddress() throws IOException {
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            return localAddress == null ? localAddress
-                    : Net.getRevealedLocalAddress(
-                          Net.asInetSocketAddress(localAddress));
+            ensureOpen();
+            return (localAddress == null)
+                    ? null
+                    : Net.getRevealedLocalAddress(localAddress);
         }
     }
 
     @Override
     public <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
+            ensureOpen();
 
             if (name == StandardSocketOptions.IP_TOS) {
                 ProtocolFamily family = Net.isIPv6Available() ?
                     StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
                 Net.setSocketOption(fd, family, name, value);
                 return this;
             }
 
-            if (name == StandardSocketOptions.SO_REUSEADDR &&
-                    Net.useExclusiveBind())
-            {
+            if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
                 // SO_REUSEADDR emulated when using exclusive bind
                 isReuseAddress = (Boolean)value;
             } else {
                 // no options that require special handling
                 Net.setSocketOption(fd, Net.UNSPEC, name, value);

@@ -168,21 +171,17 @@
     @Override
     @SuppressWarnings("unchecked")
     public <T> T getOption(SocketOption<T> name)
         throws IOException
     {
-        if (name == null)
-            throw new NullPointerException();
+        Objects.requireNonNull(name);
         if (!supportedOptions().contains(name))
             throw new UnsupportedOperationException("'" + name + "' not supported");
 
         synchronized (stateLock) {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (name == StandardSocketOptions.SO_REUSEADDR &&
-                    Net.useExclusiveBind())
-            {
+            ensureOpen();
+            if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
                 // SO_REUSEADDR emulated when using exclusive bind
                 return (T)Boolean.valueOf(isReuseAddress);
             }
             // no options that require special handling
             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);

@@ -191,11 +190,11 @@
 
     private static class DefaultOptionsHolder {
         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 
         private static Set<SocketOption<?>> defaultOptions() {
-            HashSet<SocketOption<?>> set = new HashSet<>(2);
+            HashSet<SocketOption<?>> set = new HashSet<>();
             set.add(StandardSocketOptions.SO_RCVBUF);
             set.add(StandardSocketOptions.SO_REUSEADDR);
             if (Net.isReusePortAvailable()) {
                 set.add(StandardSocketOptions.SO_REUSEPORT);
             }

@@ -207,88 +206,107 @@
     @Override
     public final Set<SocketOption<?>> supportedOptions() {
         return DefaultOptionsHolder.defaultOptions;
     }
 
-    public boolean isBound() {
-        synchronized (stateLock) {
-            return localAddress != null;
-        }
-    }
-
-    public InetSocketAddress localAddress() {
-        synchronized (stateLock) {
-            return localAddress;
-        }
-    }
-
     @Override
     public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
         acceptLock.lock();
         try {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (isBound())
+            synchronized (stateLock) {
+                ensureOpen();
+                if (localAddress != null)
                 throw new AlreadyBoundException();
-            InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
-                Net.checkAddress(local);
+                InetSocketAddress isa = (local == null)
+                                        ? new InetSocketAddress(0)
+                                        : Net.checkAddress(local);
             SecurityManager sm = System.getSecurityManager();
             if (sm != null)
                 sm.checkListen(isa.getPort());
             NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
             Net.bind(fd, isa.getAddress(), isa.getPort());
             Net.listen(fd, backlog < 1 ? 50 : backlog);
-            synchronized (stateLock) {
                 localAddress = Net.localAddress(fd);
             }
         } finally {
             acceptLock.unlock();
         }
         return this;
     }
 
+    /**
+     * Marks the beginning of an I/O operation that might block.
+     *
+     * @throws ClosedChannelException if the channel is closed
+     * @throws NotYetBoundException if the channel's socket has not been bound yet
+     */
+    private void begin(boolean blocking) throws ClosedChannelException {
+        if (blocking)
+            begin();  // set blocker to close channel if interrupted
+        synchronized (stateLock) {
+            ensureOpen();
+            if (localAddress == null)
+                throw new NotYetBoundException();
+            if (blocking)
+                thread = NativeThread.current();
+        }
+    }
+
+    /**
+     * Marks the end of an I/O operation that may have blocked.
+     *
+     * @throws AsynchronousCloseException if the channel was closed due to this
+     * thread being interrupted on a blocking I/O operation.
+     */
+    private void end(boolean blocking, boolean completed)
+        throws AsynchronousCloseException
+    {
+        if (blocking) {
+            synchronized (stateLock) {
+                thread = 0;
+                // notify any thread waiting in implCloseSelectableChannel
+                if (state == ST_CLOSING) {
+                    stateLock.notifyAll();
+                }
+            }
+            end(completed);
+        }
+    }
+
+    @Override
     public SocketChannel accept() throws IOException {
         acceptLock.lock();
         try {
-            if (!isOpen())
-                throw new ClosedChannelException();
-            if (!isBound())
-                throw new NotYetBoundException();
-            SocketChannel sc = null;
-
             int n = 0;
             FileDescriptor newfd = new FileDescriptor();
             InetSocketAddress[] isaa = new InetSocketAddress[1];
 
+            boolean blocking = isBlocking();
             try {
-                begin();
-                if (!isOpen())
-                    return null;
-                thread = NativeThread.current();
-                for (;;) {
+                begin(blocking);
+                do {
                     n = accept(this.fd, newfd, isaa);
-                    if ((n == IOStatus.INTERRUPTED) && isOpen())
-                        continue;
-                    break;
-                }
+                } while (n == IOStatus.INTERRUPTED && isOpen());
             } finally {
-                thread = 0;
-                end(n > 0);
+                end(blocking, n > 0);
                 assert IOStatus.check(n);
             }
 
             if (n < 1)
                 return null;
 
+            // newly accepted socket is initially in blocking mode
             IOUtil.configureBlocking(newfd, true);
+
             InetSocketAddress isa = isaa[0];
-            sc = new SocketChannelImpl(provider(), newfd, isa);
+            SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
+
+            // check permitted to accept connections from the remote address
             SecurityManager sm = System.getSecurityManager();
             if (sm != null) {
                 try {
-                    sm.checkAccept(isa.getAddress().getHostAddress(),
-                                   isa.getPort());
+                    sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
                 } catch (SecurityException x) {
                     sc.close();
                     throw x;
                 }
             }

@@ -297,37 +315,137 @@
         } finally {
             acceptLock.unlock();
         }
     }
 
+    @Override
     protected void implConfigureBlocking(boolean block) throws IOException {
+        acceptLock.lock();
+        try {
+            synchronized (stateLock) {
+                ensureOpen();
         IOUtil.configureBlocking(fd, block);
     }
+        } finally {
+            acceptLock.unlock();
+        }
+    }
 
+    /**
+     * Invoked by implCloseChannel to close the channel.
+     *
+     * This method waits for outstanding I/O operations to complete. When in
+     * blocking mode, the socket is pre-closed and the threads in blocking I/O
+     * operations are signalled to ensure that the outstanding I/O operations
+     * complete quickly.
+     *
+     * The socket is closed by this method when it is not registered with a
+     * Selector. Note that a channel configured blocking may be registered with
+     * a Selector. This arises when a key is canceled and the channel configured
+     * to blocking mode before the key is flushed from the Selector.
+     */
+    @Override
     protected void implCloseSelectableChannel() throws IOException {
+        assert !isOpen();
+
+        boolean interrupted = false;
+        boolean blocking;
+
+        // set state to ST_CLOSING
         synchronized (stateLock) {
-            if (state != ST_KILLED)
-                nd.preClose(fd);
+            assert state < ST_CLOSING;
+            state = ST_CLOSING;
+            blocking = isBlocking();
+        }
+
+        // wait for any outstanding accept to complete
+        if (blocking) {
+            synchronized (stateLock) {
+                assert state == ST_CLOSING;
             long th = thread;
-            if (th != 0)
+                if (th != 0) {
+                    nd.preClose(fd);
                 NativeThread.signal(th);
+
+                    // wait for accept operation to end
+                    while (thread != 0) {
+                        try {
+                            stateLock.wait();
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                }
+            }
+        } else {
+            // non-blocking mode: wait for accept to complete
+            acceptLock.lock();
+            acceptLock.unlock();
+        }
+
+        // set state to ST_KILLPENDING
+        synchronized (stateLock) {
+            assert state == ST_CLOSING;
+            state = ST_KILLPENDING;
+        }
+
+        // close socket if not registered with Selector
             if (!isRegistered())
                 kill();
-        }
+
+        // restore interrupt status
+        if (interrupted)
+            Thread.currentThread().interrupt();
     }
 
+    @Override
     public void kill() throws IOException {
         synchronized (stateLock) {
-            if (state == ST_KILLED)
-                return;
-            if (state == ST_UNINITIALIZED) {
+            if (state == ST_KILLPENDING) {
                 state = ST_KILLED;
-                return;
-            }
-            assert !isOpen() && !isRegistered();
             nd.close(fd);
-            state = ST_KILLED;
+            }
+        }
+    }
+
+    /**
+     * Returns true if channel's socket is bound
+     */
+    boolean isBound() {
+        synchronized (stateLock) {
+            return localAddress != null;
+        }
+    }
+
+    /**
+     * Returns the local address, or null if not bound
+     */
+    InetSocketAddress localAddress() {
+        synchronized (stateLock) {
+            return localAddress;
+        }
+    }
+
+    /**
+     * Poll this channel's socket for a new connection up to the given timeout.
+     * @return {@code true} if there is a connection to accept
+     */
+    boolean pollAccept(long timeout) throws IOException {
+        assert Thread.holdsLock(blockingLock()) && isBlocking();
+        acceptLock.lock();
+        try {
+            boolean polled = false;
+            try {
+                begin(true);
+                int n = Net.poll(fd, Net.POLLIN, timeout);
+                polled = (n > 0);
+            } finally {
+                end(true, polled);
+            }
+            return polled;
+        } finally {
+            acceptLock.unlock();
         }
     }
 
     /**
      * Translates native poll revent set into a ready operation set

@@ -365,35 +483,10 @@
 
     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
         return translateReadyOps(ops, 0, sk);
     }
 
-    // package-private
-    int poll(int events, long timeout) throws IOException {
-        assert Thread.holdsLock(blockingLock()) && !isBlocking();
-
-        acceptLock.lock();
-        try {
-            int n = 0;
-            try {
-                begin();
-                synchronized (stateLock) {
-                    if (!isOpen())
-                        return 0;
-                    thread = NativeThread.current();
-                }
-                n = Net.poll(fd, events, timeout);
-            } finally {
-                thread = 0;
-                end(n > 0);
-            }
-            return n;
-        } finally {
-            acceptLock.unlock();
-        }
-    }
-
     /**
      * Translates an interest operation set into a native poll event set
      */
     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
         int newOps = 0;

@@ -419,11 +512,11 @@
         sb.append('[');
         if (!isOpen()) {
             sb.append("closed");
         } else {
             synchronized (stateLock) {
-                InetSocketAddress addr = localAddress();
+                InetSocketAddress addr = localAddress;
                 if (addr == null) {
                     sb.append("unbound");
                 } else {
                     sb.append(Net.getRevealedLocalAddressAsString(addr));
                 }

@@ -436,11 +529,12 @@
     /**
      * Accept a connection on a socket.
      *
      * @implNote Wrap native call to allow instrumentation.
      */
-    private int accept(FileDescriptor ssfd, FileDescriptor newfd,
+    private int accept(FileDescriptor ssfd,
+                       FileDescriptor newfd,
                        InetSocketAddress[] isaa)
         throws IOException
     {
         return accept0(ssfd, newfd, isaa);
     }

@@ -450,11 +544,12 @@
     // Accepts a new connection, setting the given file descriptor to refer to
     // the new socket and setting isaa[0] to the socket's remote address.
     // Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
     // connections are pending) or IOStatus.INTERRUPTED.
     //
-    private native int accept0(FileDescriptor ssfd, FileDescriptor newfd,
+    private native int accept0(FileDescriptor ssfd,
+                               FileDescriptor newfd,
                                InetSocketAddress[] isaa)
         throws IOException;
 
     private static native void initIDs();
 
< prev index next >