< prev index next >

src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
rev 48993 : imported patch nio

*** 46,55 **** --- 46,56 ---- import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Collections; import java.util.HashSet; + import java.util.Objects; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; import sun.net.NetHooks; import sun.net.ext.ExtendedSocketOptions;
*** 60,113 **** class SocketChannelImpl extends SocketChannel implements SelChImpl { - // Used to make native read and write calls private static NativeDispatcher nd; // Our file descriptor object private final FileDescriptor fd; private final int fdVal; - // IDs of native threads doing reads and writes, for signalling - private volatile long readerThread; - private volatile long writerThread; - // Lock held by current reading or connecting thread private final ReentrantLock readLock = new ReentrantLock(); // Lock held by current writing or connecting thread private final ReentrantLock writeLock = new ReentrantLock(); // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! private final Object stateLock = new Object(); // -- The following fields are protected by stateLock // set true when exclusive binding is on and SO_REUSEADDR is emulated private boolean isReuseAddress; // State, increases monotonically - private static final int ST_UNINITIALIZED = -1; private static final int ST_UNCONNECTED = 0; ! private static final int ST_PENDING = 1; private static final int ST_CONNECTED = 2; ! private static final int ST_KILLPENDING = 3; ! private static final int ST_KILLED = 4; ! 
private int state = ST_UNINITIALIZED; // Binding private InetSocketAddress localAddress; private InetSocketAddress remoteAddress; - // Input/Output open - private boolean isInputOpen = true; - private boolean isOutputOpen = true; - // Socket adaptor, created on demand private Socket socket; // -- End of fields protected by stateLock --- 61,113 ---- class SocketChannelImpl extends SocketChannel implements SelChImpl { // Used to make native read and write calls private static NativeDispatcher nd; // Our file descriptor object private final FileDescriptor fd; private final int fdVal; // Lock held by current reading or connecting thread private final ReentrantLock readLock = new ReentrantLock(); // Lock held by current writing or connecting thread private final ReentrantLock writeLock = new ReentrantLock(); // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! private final Object stateLock = new Object(); + // Input/Output closed + private volatile boolean isInputClosed; + private volatile boolean isOutputClosed; + // -- The following fields are protected by stateLock // set true when exclusive binding is on and SO_REUSEADDR is emulated private boolean isReuseAddress; // State, increases monotonically private static final int ST_UNCONNECTED = 0; ! private static final int ST_CONNECTIONPENDING = 1; private static final int ST_CONNECTED = 2; ! private static final int ST_CLOSING = 3; ! private static final int ST_KILLPENDING = 4; ! private static final int ST_KILLED = 5; ! private int state; ! ! // IDs of native threads doing reads and writes, for signalling ! private long readerThread; ! private long writerThread; // Binding private InetSocketAddress localAddress; private InetSocketAddress remoteAddress; // Socket adaptor, created on demand private Socket socket; // -- End of fields protected by stateLock
*** 116,155 **** // SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.socket(true); this.fdVal = IOUtil.fdVal(fd); - this.state = ST_UNCONNECTED; } ! SocketChannelImpl(SelectorProvider sp, ! FileDescriptor fd, ! boolean bound) throws IOException { super(sp); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); ! this.state = ST_UNCONNECTED; ! if (bound) this.localAddress = Net.localAddress(fd); } // Constructor for sockets obtained from server sockets // ! SocketChannelImpl(SelectorProvider sp, ! FileDescriptor fd, InetSocketAddress remote) throws IOException { super(sp); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); ! this.state = ST_CONNECTED; this.localAddress = Net.localAddress(fd); ! this.remoteAddress = remote; } public Socket socket() { synchronized (stateLock) { if (socket == null) socket = SocketAdaptor.create(this); return socket; --- 116,162 ---- // SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.socket(true); this.fdVal = IOUtil.fdVal(fd); } ! SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException { super(sp); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); ! if (bound) { ! synchronized (stateLock) { this.localAddress = Net.localAddress(fd); } + } + } // Constructor for sockets obtained from server sockets // ! SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) throws IOException { super(sp); this.fd = fd; this.fdVal = IOUtil.fdVal(fd); ! synchronized (stateLock) { this.localAddress = Net.localAddress(fd); ! this.remoteAddress = isa; ! this.state = ST_CONNECTED; ! } ! } ! ! // @throws ClosedChannelException if channel is closed ! private void ensureOpen() throws ClosedChannelException { ! if (!isOpen()) ! throw new ClosedChannelException(); } + @Override public Socket socket() { synchronized (stateLock) { if (socket == null) socket = SocketAdaptor.create(this); return socket;
*** 157,193 **** } @Override public SocketAddress getLocalAddress() throws IOException { synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); return Net.getRevealedLocalAddress(localAddress); } } @Override public SocketAddress getRemoteAddress() throws IOException { synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); return remoteAddress; } } @Override public <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException { ! if (name == null) ! throw new NullPointerException(); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); if (name == StandardSocketOptions.IP_TOS) { ProtocolFamily family = Net.isIPv6Available() ? StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; Net.setSocketOption(fd, family, name, value); --- 164,196 ---- } @Override public SocketAddress getLocalAddress() throws IOException { synchronized (stateLock) { ! ensureOpen(); return Net.getRevealedLocalAddress(localAddress); } } @Override public SocketAddress getRemoteAddress() throws IOException { synchronized (stateLock) { ! ensureOpen(); return remoteAddress; } } @Override public <T> SocketChannel setOption(SocketOption<T> name, T value) throws IOException { ! Objects.requireNonNull(name); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ! ensureOpen(); if (name == StandardSocketOptions.IP_TOS) { ProtocolFamily family = Net.isIPv6Available() ? StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; Net.setSocketOption(fd, family, name, value);
*** 209,230 **** @Override @SuppressWarnings("unchecked") public <T> T getOption(SocketOption<T> name) throws IOException { ! if (name == null) ! throw new NullPointerException(); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); ! if (name == StandardSocketOptions.SO_REUSEADDR && ! Net.useExclusiveBind()) ! { // SO_REUSEADDR emulated when using exclusive bind return (T)Boolean.valueOf(isReuseAddress); } // special handling for IP_TOS: always return 0 when IPv6 --- 212,229 ---- @Override @SuppressWarnings("unchecked") public <T> T getOption(SocketOption<T> name) throws IOException { ! Objects.requireNonNull(name); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ! ensureOpen(); ! if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { // SO_REUSEADDR emulated when using exclusive bind return (T)Boolean.valueOf(isReuseAddress); } // special handling for IP_TOS: always return 0 when IPv6
*** 241,251 **** private static class DefaultOptionsHolder { static final Set<SocketOption<?>> defaultOptions = defaultOptions(); private static Set<SocketOption<?>> defaultOptions() { ! HashSet<SocketOption<?>> set = new HashSet<>(8); set.add(StandardSocketOptions.SO_SNDBUF); set.add(StandardSocketOptions.SO_RCVBUF); set.add(StandardSocketOptions.SO_KEEPALIVE); set.add(StandardSocketOptions.SO_REUSEADDR); if (Net.isReusePortAvailable()) { --- 240,250 ---- private static class DefaultOptionsHolder { static final Set<SocketOption<?>> defaultOptions = defaultOptions(); private static Set<SocketOption<?>> defaultOptions() { ! HashSet<SocketOption<?>> set = new HashSet<>(); set.add(StandardSocketOptions.SO_SNDBUF); set.add(StandardSocketOptions.SO_RCVBUF); set.add(StandardSocketOptions.SO_KEEPALIVE); set.add(StandardSocketOptions.SO_REUSEADDR); if (Net.isReusePortAvailable()) {
*** 254,598 **** set.add(StandardSocketOptions.SO_LINGER); set.add(StandardSocketOptions.TCP_NODELAY); // additional options required by socket adaptor set.add(StandardSocketOptions.IP_TOS); set.add(ExtendedSocketOption.SO_OOBINLINE); ! ExtendedSocketOptions extendedOptions = ! ExtendedSocketOptions.getInstance(); ! set.addAll(extendedOptions.options()); return Collections.unmodifiableSet(set); } } @Override public final Set<SocketOption<?>> supportedOptions() { return DefaultOptionsHolder.defaultOptions; } ! private boolean ensureReadOpen() throws ClosedChannelException { ! synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); ! if (!isConnected()) ! throw new NotYetConnectedException(); ! if (!isInputOpen) ! return false; ! else ! return true; ! } } - - private void ensureWriteOpen() throws ClosedChannelException { synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); ! if (!isOutputOpen) ! throw new ClosedChannelException(); ! if (!isConnected()) throw new NotYetConnectedException(); } } ! private void readerCleanup() throws IOException { synchronized (stateLock) { readerThread = 0; ! if (state == ST_KILLPENDING) ! kill(); } } ! ! private void writerCleanup() throws IOException { ! synchronized (stateLock) { ! writerThread = 0; ! if (state == ST_KILLPENDING) ! kill(); } } public int read(ByteBuffer buf) throws IOException { ! ! if (buf == null) ! throw new NullPointerException(); readLock.lock(); try { ! if (!ensureReadOpen()) ! return -1; int n = 0; try { ! // Set up the interruption machinery; see ! // AbstractInterruptibleChannel for details ! // ! begin(); ! ! synchronized (stateLock) { ! if (!isOpen()) { ! // Either the current thread is already interrupted, so ! // begin() closed the channel, or another thread closed the ! // channel since we checked it a few bytecodes ago. In ! // either case the value returned here is irrelevant since ! // the invocation of end() in the finally block will throw ! 
// an appropriate exception. ! // ! return 0; ! ! } ! ! // Save this thread so that it can be signalled on those ! // platforms that require it ! // ! readerThread = NativeThread.current(); ! } ! // Between the previous test of isOpen() and the return of the ! // IOUtil.read invocation below, this channel might be closed ! // or this thread might be interrupted. We rely upon the ! // implicit synchronization point in the kernel read() call to ! // make sure that the right thing happens. In either case the ! // implCloseSelectableChannel method is ultimately invoked in ! // some other thread, so there are three possibilities: ! // ! // - implCloseSelectableChannel() invokes nd.preClose() ! // before this thread invokes read(), in which case the ! // read returns immediately with either EOF or an error, ! // the latter of which will cause an IOException to be ! // thrown. ! // ! // - implCloseSelectableChannel() invokes nd.preClose() after ! // this thread is blocked in read(). On some operating ! // systems (e.g., Solaris and Windows) this causes the read ! // to return immediately with either EOF or an error ! // indication. ! // ! // - implCloseSelectableChannel() invokes nd.preClose() after ! // this thread is blocked in read() but the operating ! // system (e.g., Linux) doesn't support preemptive close, ! // so implCloseSelectableChannel() proceeds to signal this ! // thread, thereby causing the read to return immediately ! // with IOStatus.INTERRUPTED. ! // ! // In all three cases the invocation of end() in the finally ! // clause will notice that the channel has been closed and ! // throw an appropriate exception (AsynchronousCloseException ! // or ClosedByInterruptException) if necessary. ! // ! // *There is A fourth possibility. implCloseSelectableChannel() ! // invokes nd.preClose(), signals reader/writer thred and quickly ! // moves on to nd.close() in kill(), which does a real close. ! // Then a third thread accepts a new connection, opens file or ! 
// whatever that causes the released "fd" to be recycled. All ! // above happens just between our last isOpen() check and the ! // next kernel read reached, with the recycled "fd". The solution ! // is to postpone the real kill() if there is a reader or/and ! // writer thread(s) over there "waiting", leave the cleanup/kill ! // to the reader or writer thread. (the preClose() still happens ! // so the connection gets cut off as usual). ! // ! // For socket channels there is the additional wrinkle that ! // asynchronous shutdown works much like asynchronous close, ! // except that the channel is shutdown rather than completely ! // closed. This is analogous to the first two cases above, ! // except that the shutdown operation plays the role of ! // nd.preClose(). ! for (;;) { n = IOUtil.read(fd, buf, -1, nd); - if ((n == IOStatus.INTERRUPTED) && isOpen()) { - // The system call was interrupted but the channel - // is still open, so retry - continue; - } - return IOStatus.normalize(n); } - } finally { ! readerCleanup(); // Clear reader thread ! // The end method, which is defined in our superclass ! // AbstractInterruptibleChannel, resets the interruption ! // machinery. If its argument is true then it returns ! // normally; otherwise it checks the interrupt and open state ! // of this channel and throws an appropriate exception if ! // necessary. ! // ! // So, if we actually managed to do any I/O in the above try ! // block then we pass true to the end method. We also pass ! // true if the channel was in non-blocking mode when the I/O ! // operation was initiated but no data could be transferred; ! // this prevents spurious exceptions from being thrown in the ! // rare event that a channel is closed or a thread is ! // interrupted at the exact moment that a non-blocking I/O ! // request is made. ! // ! end(n > 0 || (n == IOStatus.UNAVAILABLE)); ! ! // Extra case for socket channels: Asynchronous shutdown ! // ! synchronized (stateLock) { ! 
if ((n <= 0) && (!isInputOpen)) return IOStatus.EOF; } ! ! assert IOStatus.check(n); ! ! } } finally { readLock.unlock(); } } public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { ! if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) ! throw new IndexOutOfBoundsException(); readLock.lock(); try { ! if (!ensureReadOpen()) ! return -1; long n = 0; try { ! begin(); ! synchronized (stateLock) { ! if (!isOpen()) ! return 0; ! readerThread = NativeThread.current(); ! } ! for (;;) { n = IOUtil.read(fd, dsts, offset, length, nd); - if ((n == IOStatus.INTERRUPTED) && isOpen()) - continue; - return IOStatus.normalize(n); } } finally { ! readerCleanup(); ! end(n > 0 || (n == IOStatus.UNAVAILABLE)); ! synchronized (stateLock) { ! if ((n <= 0) && (!isInputOpen)) return IOStatus.EOF; } ! assert IOStatus.check(n); ! } } finally { readLock.unlock(); } } public int write(ByteBuffer buf) throws IOException { ! if (buf == null) ! throw new NullPointerException(); writeLock.lock(); try { ! ensureWriteOpen(); int n = 0; try { ! begin(); ! synchronized (stateLock) { ! if (!isOpen()) ! return 0; ! writerThread = NativeThread.current(); ! } ! for (;;) { n = IOUtil.write(fd, buf, -1, nd); - if ((n == IOStatus.INTERRUPTED) && isOpen()) - continue; - return IOStatus.normalize(n); } } finally { ! writerCleanup(); ! end(n > 0 || (n == IOStatus.UNAVAILABLE)); ! synchronized (stateLock) { ! if ((n <= 0) && (!isOutputOpen)) throw new AsynchronousCloseException(); } ! assert IOStatus.check(n); ! } } finally { writeLock.unlock(); } } public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { ! if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) ! throw new IndexOutOfBoundsException(); writeLock.lock(); try { ! ensureWriteOpen(); long n = 0; try { ! begin(); ! synchronized (stateLock) { ! if (!isOpen()) ! return 0; ! writerThread = NativeThread.current(); ! } ! 
for (;;) { n = IOUtil.write(fd, srcs, offset, length, nd); - if ((n == IOStatus.INTERRUPTED) && isOpen()) - continue; - return IOStatus.normalize(n); } } finally { ! writerCleanup(); ! end((n > 0) || (n == IOStatus.UNAVAILABLE)); ! synchronized (stateLock) { ! if ((n <= 0) && (!isOutputOpen)) throw new AsynchronousCloseException(); } ! assert IOStatus.check(n); ! } } finally { writeLock.unlock(); } } ! // package-private int sendOutOfBandData(byte b) throws IOException { writeLock.lock(); try { ! ensureWriteOpen(); int n = 0; try { ! begin(); ! synchronized (stateLock) { ! if (!isOpen()) ! return 0; ! writerThread = NativeThread.current(); ! } ! for (;;) { n = sendOutOfBandData(fd, b); - if ((n == IOStatus.INTERRUPTED) && isOpen()) - continue; - return IOStatus.normalize(n); } } finally { ! writerCleanup(); ! end((n > 0) || (n == IOStatus.UNAVAILABLE)); ! synchronized (stateLock) { ! if ((n <= 0) && (!isOutputOpen)) throw new AsynchronousCloseException(); } ! assert IOStatus.check(n); ! } } finally { writeLock.unlock(); } } protected void implConfigureBlocking(boolean block) throws IOException { IOUtil.configureBlocking(fd, block); } ! public InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; } } ! public SocketAddress remoteAddress() { synchronized (stateLock) { return remoteAddress; } } --- 253,543 ---- set.add(StandardSocketOptions.SO_LINGER); set.add(StandardSocketOptions.TCP_NODELAY); // additional options required by socket adaptor set.add(StandardSocketOptions.IP_TOS); set.add(ExtendedSocketOption.SO_OOBINLINE); ! set.addAll(ExtendedSocketOptions.getInstance().options()); return Collections.unmodifiableSet(set); } } @Override public final Set<SocketOption<?>> supportedOptions() { return DefaultOptionsHolder.defaultOptions; } ! /** ! * Marks the beginning of a read operation that might block. ! * ! * @throws ClosedChannelException if the channel is closed ! 
* @throws NotYetConnectedException if the channel is not yet connected ! */ ! private void beginRead(boolean blocking) throws ClosedChannelException { ! if (blocking) { ! // set hook for Thread.interrupt ! begin(); } synchronized (stateLock) { ! ensureOpen(); ! if (state != ST_CONNECTED) throw new NotYetConnectedException(); + if (blocking) + readerThread = NativeThread.current(); } } ! /** ! * Marks the end of a read operation that may have blocked. ! * ! * @throws AsynchronousCloseException if the channel was closed due to this ! * thread being interrupted on a blocking read operation. ! */ ! private void endRead(boolean blocking, boolean completed) ! throws AsynchronousCloseException ! { ! if (blocking) { synchronized (stateLock) { readerThread = 0; ! // notify any thread waiting in implCloseSelectableChannel ! if (state == ST_CLOSING) { ! stateLock.notifyAll(); } } ! // remove hook for Thread.interrupt ! end(completed); } } + @Override public int read(ByteBuffer buf) throws IOException { ! Objects.requireNonNull(buf); readLock.lock(); try { ! boolean blocking = isBlocking(); int n = 0; try { + beginRead(blocking); ! // check if input is shutdown ! if (isInputClosed) ! return IOStatus.EOF; ! if (blocking) { ! do { ! n = IOUtil.read(fd, buf, -1, nd); ! } while (n == IOStatus.INTERRUPTED && isOpen()); ! } else { n = IOUtil.read(fd, buf, -1, nd); } } finally { ! endRead(blocking, n > 0); ! if (n <= 0 && isInputClosed) return IOStatus.EOF; } ! return IOStatus.normalize(n); } finally { readLock.unlock(); } } + @Override public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { ! Objects.checkFromIndexSize(offset, length, dsts.length); ! readLock.lock(); try { ! boolean blocking = isBlocking(); long n = 0; try { ! beginRead(blocking); ! ! // check if input is shutdown ! if (isInputClosed) ! return IOStatus.EOF; ! if (blocking) { ! do { ! n = IOUtil.read(fd, dsts, offset, length, nd); ! } while (n == IOStatus.INTERRUPTED && isOpen()); ! 
} else { n = IOUtil.read(fd, dsts, offset, length, nd); } } finally { ! endRead(blocking, n > 0); ! if (n <= 0 && isInputClosed) return IOStatus.EOF; } ! return IOStatus.normalize(n); } finally { readLock.unlock(); } } + /** + * Marks the beginning of a write operation that might block. + * + * @throws ClosedChannelException if the channel is closed or output shutdown + * @throws NotYetConnectedException if the channel is not yet connected + */ + private void beginWrite(boolean blocking) throws ClosedChannelException { + if (blocking) { + // set hook for Thread.interrupt + begin(); + } + synchronized (stateLock) { + ensureOpen(); + if (isOutputClosed) + throw new ClosedChannelException(); + if (state != ST_CONNECTED) + throw new NotYetConnectedException(); + if (blocking) + writerThread = NativeThread.current(); + } + } + + /** + * Marks the end of a write operation that may have blocked. + * + * @throws AsynchronousCloseException if the channel was closed due to this + * thread being interrupted on a blocking write operation. + */ + private void endWrite(boolean blocking, boolean completed) + throws AsynchronousCloseException + { + if (blocking) { + synchronized (stateLock) { + writerThread = 0; + // notify any thread waiting in implCloseSelectableChannel + if (state == ST_CLOSING) { + stateLock.notifyAll(); + } + } + // remove hook for Thread.interrupt + end(completed); + } + } + + @Override public int write(ByteBuffer buf) throws IOException { ! Objects.requireNonNull(buf); ! writeLock.lock(); try { ! boolean blocking = isBlocking(); int n = 0; try { ! beginWrite(blocking); ! if (blocking) { ! do { ! n = IOUtil.write(fd, buf, -1, nd); ! } while (n == IOStatus.INTERRUPTED && isOpen()); ! } else { n = IOUtil.write(fd, buf, -1, nd); } } finally { ! endWrite(blocking, n > 0); ! if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } ! 
return IOStatus.normalize(n); } finally { writeLock.unlock(); } } + @Override public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { ! Objects.checkFromIndexSize(offset, length, srcs.length); ! writeLock.lock(); try { ! boolean blocking = isBlocking(); long n = 0; try { ! beginWrite(blocking); ! if (blocking) { ! do { ! n = IOUtil.write(fd, srcs, offset, length, nd); ! } while (n == IOStatus.INTERRUPTED && isOpen()); ! } else { n = IOUtil.write(fd, srcs, offset, length, nd); } } finally { ! endWrite(blocking, n > 0); ! if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } ! return IOStatus.normalize(n); } finally { writeLock.unlock(); } } ! /** ! * Writes a byte of out of band data. ! */ int sendOutOfBandData(byte b) throws IOException { writeLock.lock(); try { ! boolean blocking = isBlocking(); int n = 0; try { ! beginWrite(blocking); ! if (blocking) { ! do { ! n = sendOutOfBandData(fd, b); ! } while (n == IOStatus.INTERRUPTED && isOpen()); ! } else { n = sendOutOfBandData(fd, b); } } finally { ! endWrite(blocking, n > 0); ! if (n <= 0 && isOutputClosed) throw new AsynchronousCloseException(); } ! return IOStatus.normalize(n); } finally { writeLock.unlock(); } } + @Override protected void implConfigureBlocking(boolean block) throws IOException { + readLock.lock(); + try { + writeLock.lock(); + try { + synchronized (stateLock) { + ensureOpen(); IOUtil.configureBlocking(fd, block); } + } finally { + writeLock.unlock(); + } + } finally { + readLock.unlock(); + } + } ! /** ! * Returns the local address, or null if not bound ! */ ! InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; } } ! /** ! * Returns the remote address, or null if not connected ! */ ! InetSocketAddress remoteAddress() { synchronized (stateLock) { return remoteAddress; } }
*** 601,613 **** readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); ! if (state == ST_PENDING) throw new ConnectionPendingException(); if (localAddress != null) throw new AlreadyBoundException(); InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : Net.checkAddress(local); --- 546,557 ---- readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { ! ensureOpen(); ! if (state == ST_CONNECTIONPENDING) throw new ConnectionPendingException(); if (localAddress != null) throw new AlreadyBoundException(); InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : Net.checkAddress(local);
*** 626,931 **** readLock.unlock(); } return this; } public boolean isConnected() { synchronized (stateLock) { return (state == ST_CONNECTED); } } public boolean isConnectionPending() { synchronized (stateLock) { ! return (state == ST_PENDING); } } ! void ensureOpenAndUnconnected() throws IOException { // package-private synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); if (state == ST_CONNECTED) throw new AlreadyConnectedException(); ! if (state == ST_PENDING) throw new ConnectionPendingException(); } } public boolean connect(SocketAddress sa) throws IOException { - readLock.lock(); - try { - writeLock.lock(); - try { - ensureOpenAndUnconnected(); InetSocketAddress isa = Net.checkAddress(sa); SecurityManager sm = System.getSecurityManager(); if (sm != null) ! sm.checkConnect(isa.getAddress().getHostAddress(), ! isa.getPort()); ! synchronized (blockingLock()) { ! int n = 0; try { try { ! begin(); synchronized (stateLock) { ! if (!isOpen()) { ! return false; ! } ! // notify hook only if unbound ! if (localAddress == null) { ! NetHooks.beforeTcpConnect(fd, ! isa.getAddress(), ! isa.getPort()); } - readerThread = NativeThread.current(); } ! for (;;) { InetAddress ia = isa.getAddress(); if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); - n = Net.connect(fd, - ia, - isa.getPort()); - if ((n == IOStatus.INTERRUPTED) && isOpen()) - continue; - break; - } } finally { ! readerCleanup(); ! end((n > 0) || (n == IOStatus.UNAVAILABLE)); ! assert IOStatus.check(n); } } catch (IOException x) { ! // If an exception was thrown, close the channel after ! // invoking end() so as to avoid bogus ! // AsynchronousCloseExceptions close(); throw x; } synchronized (stateLock) { remoteAddress = isa; if (n > 0) { ! ! // Connection succeeded; disallow further ! // invocation ! state = ST_CONNECTED; ! if (isOpen()) localAddress = Net.localAddress(fd); return true; ! } ! // If nonblocking and no exception then connection ! 
// pending; disallow another invocation ! if (!isBlocking()) ! state = ST_PENDING; ! else ! assert false; } } - return false; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } public boolean finishConnect() throws IOException { readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { - if (!isOpen()) - throw new ClosedChannelException(); if (state == ST_CONNECTED) return true; - if (state != ST_PENDING) - throw new NoConnectionPendingException(); } int n = 0; try { try { ! begin(); ! synchronized (blockingLock()) { ! synchronized (stateLock) { ! if (!isOpen()) { ! return false; ! } ! readerThread = NativeThread.current(); ! } ! if (!isBlocking()) { ! for (;;) { ! n = checkConnect(fd, false); ! if ((n == IOStatus.INTERRUPTED) && isOpen()) ! continue; ! break; ! } ! } else { ! for (;;) { n = checkConnect(fd, true); ! if (n == 0) { ! // Loop in case of ! // spurious notifications ! continue; ! } ! if ((n == IOStatus.INTERRUPTED) && isOpen()) ! continue; ! break; ! } ! } } } finally { ! synchronized (stateLock) { ! readerThread = 0; ! if (state == ST_KILLPENDING) { ! kill(); ! // poll()/getsockopt() does not report ! // error (throws exception, with n = 0) ! // on Linux platform after dup2 and ! // signal-wakeup. Force n to 0 so the ! // end() can throw appropriate exception ! n = 0; ! } ! } ! end((n > 0) || (n == IOStatus.UNAVAILABLE)); ! assert IOStatus.check(n); } } catch (IOException x) { - // If an exception was thrown, close the channel after - // invoking end() so as to avoid bogus - // AsynchronousCloseExceptions close(); throw x; } ! if (n > 0) { synchronized (stateLock) { ! state = ST_CONNECTED; ! if (isOpen()) localAddress = Net.localAddress(fd); ! } return true; ! } return false; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } @Override public SocketChannel shutdownInput() throws IOException { synchronized (stateLock) { ! if (!isOpen()) ! 
throw new ClosedChannelException(); if (!isConnected()) throw new NotYetConnectedException(); ! if (isInputOpen) { Net.shutdown(fd, Net.SHUT_RD); ! if (readerThread != 0) ! NativeThread.signal(readerThread); ! isInputOpen = false; } return this; } } @Override public SocketChannel shutdownOutput() throws IOException { synchronized (stateLock) { ! if (!isOpen()) ! throw new ClosedChannelException(); if (!isConnected()) throw new NotYetConnectedException(); ! if (isOutputOpen) { Net.shutdown(fd, Net.SHUT_WR); ! if (writerThread != 0) ! NativeThread.signal(writerThread); ! isOutputOpen = false; } return this; } } ! public boolean isInputOpen() { ! synchronized (stateLock) { ! return isInputOpen; ! } } ! public boolean isOutputOpen() { ! synchronized (stateLock) { ! return isOutputOpen; ! } } ! // AbstractInterruptibleChannel synchronizes invocations of this method ! // using AbstractInterruptibleChannel.closeLock, and also ensures that this ! // method is only ever invoked once. Before we get to this method, isOpen ! // (which is volatile) will have been set to false. ! // ! protected void implCloseSelectableChannel() throws IOException { ! synchronized (stateLock) { ! isInputOpen = false; ! isOutputOpen = false; ! ! // Close the underlying file descriptor and dup it to a known fd ! // that's already closed. This prevents other operations on this ! // channel from using the old fd, which might be recycled in the ! // meantime and allocated to an entirely different channel. ! // ! if (state != ST_KILLED) ! nd.preClose(fd); ! ! // Signal native threads, if needed. If a target thread is not ! // currently blocked in an I/O operation then no harm is done since ! // the signal handler doesn't actually do anything. ! // ! if (readerThread != 0) ! NativeThread.signal(readerThread); ! ! if (writerThread != 0) ! NativeThread.signal(writerThread); ! // If this channel is not registered then it's safe to close the fd ! 
// immediately since we know at this point that no thread is ! // blocked in an I/O operation upon the channel and, since the ! // channel is marked closed, no thread will start another such ! // operation. If this channel is registered then we don't close ! // the fd since it might be in use by a selector. In that case ! // closing this channel caused its keys to be cancelled, so the ! // last selector to deregister a key for this channel will invoke ! // kill() to close the fd. ! // ! if (!isRegistered()) ! kill(); } } - - public void kill() throws IOException { - synchronized (stateLock) { - if (state == ST_KILLED) - return; - if (state == ST_UNINITIALIZED) { - state = ST_KILLED; - return; } - assert !isOpen() && !isRegistered(); ! // Postpone the kill if there is a waiting reader ! // or writer thread. See the comments in read() for ! // more detailed explanation. ! if (readerThread == 0 && writerThread == 0) { ! nd.close(fd); ! state = ST_KILLED; ! } else { ! state = ST_KILLPENDING; } } } /** * Translates native poll revent ops into a ready operation ops --- 570,976 ---- readLock.unlock(); } return this; } + @Override public boolean isConnected() { synchronized (stateLock) { return (state == ST_CONNECTED); } } + @Override public boolean isConnectionPending() { synchronized (stateLock) { ! return (state == ST_CONNECTIONPENDING); } } ! /** ! * Marks the beginning of a connect operation that might block. ! * ! * @throws ClosedChannelException if the channel is closed ! * @throws AlreadyConnectedException if already connected ! * @throws ConnectionPendingException if a connection is pending ! */ ! private void beginConnect(boolean blocking) throws ClosedChannelException { ! if (blocking) { ! // set hook for Thread.interrupt ! begin(); ! } synchronized (stateLock) { ! ensureOpen(); if (state == ST_CONNECTED) throw new AlreadyConnectedException(); ! 
if (state == ST_CONNECTIONPENDING) throw new ConnectionPendingException(); + if (blocking) + readerThread = NativeThread.current(); + } } + + /** + * Marks the end of a connect operation that may have blocked. + * + * @throws AsynchronousCloseException if the channel was closed due to this + * thread being interrupted on a blocking connect operation. + */ + private void endConnect(boolean blocking, boolean completed) + throws AsynchronousCloseException + { + endRead(blocking, completed); } + @Override public boolean connect(SocketAddress sa) throws IOException { InetSocketAddress isa = Net.checkAddress(sa); SecurityManager sm = System.getSecurityManager(); if (sm != null) ! sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); ! ! readLock.lock(); try { + writeLock.lock(); try { ! // notify before-connect hook synchronized (stateLock) { ! if (state == ST_UNCONNECTED && localAddress == null) { ! NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); } } ! InetAddress ia = isa.getAddress(); if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); + int n = 0; + boolean blocking = isBlocking(); + try { + try { + beginConnect(blocking); + if (blocking) { + do { + n = Net.connect(fd, ia, isa.getPort()); + } while (n == IOStatus.INTERRUPTED && isOpen()); + } else { + n = Net.connect(fd, ia, isa.getPort()); + } } finally { ! endConnect(blocking, n > 0); } } catch (IOException x) { ! // connect failed, close socket close(); throw x; } + + // connection may be established synchronized (stateLock) { + if (!isOpen()) + throw new AsynchronousCloseException(); remoteAddress = isa; if (n > 0) { ! // connection established localAddress = Net.localAddress(fd); + state = ST_CONNECTED; return true; ! } else { ! // connection pending ! assert !blocking; ! state = ST_CONNECTIONPENDING; ! return false; } } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } + /** + * Marks the beginning of a finishConnect operation that might block. 
+ * + * @throws ClosedChannelException if the channel is closed + * @throws NoConnectionPendingException if no connection is pending + */ + private void beginFinishConnect(boolean blocking) throws ClosedChannelException { + if (blocking) { + // set hook for Thread.interrupt + begin(); + } + synchronized (stateLock) { + ensureOpen(); + if (state != ST_CONNECTIONPENDING) + throw new NoConnectionPendingException(); + if (blocking) + readerThread = NativeThread.current(); + } + } + + /** + * Marks the end of a finishConnect operation that may have blocked. + * + * @throws AsynchronousCloseException if the channel was closed due to this + * thread being interrupted on a blocking connect operation. + */ + private void endFinishConnect(boolean blocking, boolean completed) + throws AsynchronousCloseException + { + endRead(blocking, completed); + } + + @Override public boolean finishConnect() throws IOException { readLock.lock(); try { writeLock.lock(); try { + // already connected? synchronized (stateLock) { if (state == ST_CONNECTED) return true; } + int n = 0; + boolean blocking = isBlocking(); try { try { ! beginFinishConnect(blocking); ! if (blocking) { ! do { n = checkConnect(fd, true); ! } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen()); ! } else { ! n = checkConnect(fd, false); } } finally { ! endFinishConnect(blocking, n > 0); } } catch (IOException x) { close(); throw x; } ! ! // post finishConnect, connection may be established synchronized (stateLock) { ! if (!isOpen()) ! throw new AsynchronousCloseException(); ! if (n > 0) { ! // connection established localAddress = Net.localAddress(fd); ! state = ST_CONNECTED; return true; ! } else { ! // connection still pending ! assert !blocking; return false; + } + } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } + /** + * Invoked by implCloseChannel to close the channel. + * + * This method waits for outstanding I/O operations to complete. 
When in + * blocking mode, the socket is pre-closed and the threads in blocking I/O + * operations are signalled to ensure that the outstanding I/O operations + * complete quickly. + * + * If the socket is connected then it is shutdown by this method. The + * shutdown ensures that the peer reads EOF for the case that the socket is + * not pre-closed or closed by this method. + * + * The socket is closed by this method when it is not registered with a + * Selector. Note that a channel configured blocking may be registered with + * a Selector. This arises when a key is canceled and the channel configured + * to blocking mode before the key is flushed from the Selector. + */ + @Override + protected void implCloseSelectableChannel() throws IOException { + assert !isOpen(); + + boolean blocking; + boolean connected; + boolean interrupted = false; + + // set state to ST_CLOSING + synchronized (stateLock) { + assert state < ST_CLOSING; + blocking = isBlocking(); + connected = (state == ST_CONNECTED); + state = ST_CLOSING; + } + + // wait for any outstanding I/O operations to complete + if (blocking) { + synchronized (stateLock) { + assert state == ST_CLOSING; + long reader = readerThread; + long writer = writerThread; + if (reader != 0 || writer != 0) { + nd.preClose(fd); + connected = false; // fd is no longer connected socket + + if (reader != 0) + NativeThread.signal(reader); + if (writer != 0) + NativeThread.signal(writer); + + // wait for blocking I/O operations to end + while (readerThread != 0 || writerThread != 0) { + try { + stateLock.wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } + } + } else { + // non-blocking mode: wait for read/write to complete + readLock.lock(); + try { + writeLock.lock(); + writeLock.unlock(); + } finally { + readLock.unlock(); + } + } + + // set state to ST_KILLPENDING + synchronized (stateLock) { + assert state == ST_CLOSING; + // if connected, and the channel is registered with a Selector, we + // 
shutdown the output so that the peer reads EOF + if (connected && isRegistered()) { + try { + Net.shutdown(fd, Net.SHUT_WR); + } catch (IOException ignore) { } + } + state = ST_KILLPENDING; + } + + // close socket if not registered with Selector + if (!isRegistered()) + kill(); + + // restore interrupt status + if (interrupted) + Thread.currentThread().interrupt(); + } + + @Override + public void kill() throws IOException { + synchronized (stateLock) { + if (state == ST_KILLPENDING) { + state = ST_KILLED; + nd.close(fd); + } + } + } + @Override public SocketChannel shutdownInput() throws IOException { synchronized (stateLock) { ! ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); ! if (!isInputClosed) { Net.shutdown(fd, Net.SHUT_RD); ! long thread = readerThread; ! if (thread != 0) ! NativeThread.signal(thread); ! isInputClosed = true; } return this; } } @Override public SocketChannel shutdownOutput() throws IOException { synchronized (stateLock) { ! ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); ! if (!isOutputClosed) { Net.shutdown(fd, Net.SHUT_WR); ! long thread = writerThread; ! if (thread != 0) ! NativeThread.signal(thread); ! isOutputClosed = true; } return this; } } ! boolean isInputOpen() { ! return !isInputClosed; } ! boolean isOutputOpen() { ! return !isOutputClosed; } ! /** ! * Poll this channel's socket for reading up to the given timeout. ! * @return {@code true} if the socket is polled ! */ ! boolean pollRead(long timeout) throws IOException { ! boolean blocking = isBlocking(); ! assert Thread.holdsLock(blockingLock()) && blocking; ! readLock.lock(); ! try { ! boolean polled = false; ! try { ! beginRead(blocking); ! int n = Net.poll(fd, Net.POLLIN, timeout); ! polled = (n > 0); ! } finally { ! endRead(blocking, polled); } + return polled; + } finally { + readLock.unlock(); } } ! /** ! * Poll this channel's socket for a connection, up to the given timeout. ! * @return {@code true} if the socket is polled ! 
*/ ! boolean pollConnected(long timeout) throws IOException { ! boolean blocking = isBlocking(); ! assert Thread.holdsLock(blockingLock()) && blocking; ! ! readLock.lock(); ! try { ! writeLock.lock(); ! try { ! boolean polled = false; ! try { ! beginFinishConnect(blocking); ! int n = Net.poll(fd, Net.POLLCONN, timeout); ! polled = (n > 0); ! } finally { ! endFinishConnect(blocking, polled); ! } ! return polled; ! } finally { ! writeLock.unlock(); } + } finally { + readLock.unlock(); } } /** * Translates native poll revent ops into a ready operation ops
*** 954,964 **** (state == ST_CONNECTED)) newOps |= SelectionKey.OP_READ; if (((ops & Net.POLLCONN) != 0) && ((intOps & SelectionKey.OP_CONNECT) != 0) && ! ((state == ST_UNCONNECTED) || (state == ST_PENDING))) { newOps |= SelectionKey.OP_CONNECT; } if (((ops & Net.POLLOUT) != 0) && ((intOps & SelectionKey.OP_WRITE) != 0) && --- 999,1009 ---- (state == ST_CONNECTED)) newOps |= SelectionKey.OP_READ; if (((ops & Net.POLLCONN) != 0) && ((intOps & SelectionKey.OP_CONNECT) != 0) && ! ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) { newOps |= SelectionKey.OP_CONNECT; } if (((ops & Net.POLLOUT) != 0) && ((intOps & SelectionKey.OP_WRITE) != 0) &&
*** 975,1009 **** public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { return translateReadyOps(ops, 0, sk); } - // package-private - int poll(int events, long timeout) throws IOException { - assert Thread.holdsLock(blockingLock()) && !isBlocking(); - - readLock.lock(); - try { - int n = 0; - try { - begin(); - synchronized (stateLock) { - if (!isOpen()) - return 0; - readerThread = NativeThread.current(); - } - n = Net.poll(fd, events, timeout); - } finally { - readerCleanup(); - end(n > 0); - } - return n; - } finally { - readLock.unlock(); - } - } - /** * Translates an interest operation set into a native poll event set */ public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { int newOps = 0; --- 1020,1029 ----
*** 1035,1052 **** synchronized (stateLock) { switch (state) { case ST_UNCONNECTED: sb.append("unconnected"); break; ! case ST_PENDING: sb.append("connection-pending"); break; case ST_CONNECTED: sb.append("connected"); ! if (!isInputOpen) sb.append(" ishut"); ! if (!isOutputOpen) sb.append(" oshut"); break; } InetSocketAddress addr = localAddress(); if (addr != null) { --- 1055,1072 ---- synchronized (stateLock) { switch (state) { case ST_UNCONNECTED: sb.append("unconnected"); break; ! case ST_CONNECTIONPENDING: sb.append("connection-pending"); break; case ST_CONNECTED: sb.append("connected"); ! if (isInputClosed) sb.append(" ishut"); ! if (isOutputClosed) sb.append(" oshut"); break; } InetSocketAddress addr = localAddress(); if (addr != null) {
< prev index next >