< prev index next >
src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
Print this page
rev 48993 : imported patch nio
*** 46,55 ****
--- 46,56 ----
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
+ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.NetHooks;
import sun.net.ext.ExtendedSocketOptions;
*** 60,113 ****
class SocketChannelImpl
extends SocketChannel
implements SelChImpl
{
-
// Used to make native read and write calls
private static NativeDispatcher nd;
// Our file descriptor object
private final FileDescriptor fd;
private final int fdVal;
- // IDs of native threads doing reads and writes, for signalling
- private volatile long readerThread;
- private volatile long writerThread;
-
// Lock held by current reading or connecting thread
private final ReentrantLock readLock = new ReentrantLock();
// Lock held by current writing or connecting thread
private final ReentrantLock writeLock = new ReentrantLock();
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();
// -- The following fields are protected by stateLock
// set true when exclusive binding is on and SO_REUSEADDR is emulated
private boolean isReuseAddress;
// State, increases monotonically
- private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
! private static final int ST_PENDING = 1;
private static final int ST_CONNECTED = 2;
! private static final int ST_KILLPENDING = 3;
! private static final int ST_KILLED = 4;
! private int state = ST_UNINITIALIZED;
// Binding
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
- // Input/Output open
- private boolean isInputOpen = true;
- private boolean isOutputOpen = true;
-
// Socket adaptor, created on demand
private Socket socket;
// -- End of fields protected by stateLock
--- 61,113 ----
class SocketChannelImpl
extends SocketChannel
implements SelChImpl
{
// Used to make native read and write calls
private static NativeDispatcher nd;
// Our file descriptor object
private final FileDescriptor fd;
private final int fdVal;
// Lock held by current reading or connecting thread
private final ReentrantLock readLock = new ReentrantLock();
// Lock held by current writing or connecting thread
private final ReentrantLock writeLock = new ReentrantLock();
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();
+ // Input/Output closed
+ private volatile boolean isInputClosed;
+ private volatile boolean isOutputClosed;
+
// -- The following fields are protected by stateLock
// set true when exclusive binding is on and SO_REUSEADDR is emulated
private boolean isReuseAddress;
// State, increases monotonically
private static final int ST_UNCONNECTED = 0;
! private static final int ST_CONNECTIONPENDING = 1;
private static final int ST_CONNECTED = 2;
! private static final int ST_CLOSING = 3;
! private static final int ST_KILLPENDING = 4;
! private static final int ST_KILLED = 5;
! private int state;
!
! // IDs of native threads doing reads and writes, for signalling
! private long readerThread;
! private long writerThread;
// Binding
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
// Socket adaptor, created on demand
private Socket socket;
// -- End of fields protected by stateLock
*** 116,155 ****
//
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_UNCONNECTED;
}
! SocketChannelImpl(SelectorProvider sp,
! FileDescriptor fd,
! boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
! this.state = ST_UNCONNECTED;
! if (bound)
this.localAddress = Net.localAddress(fd);
}
// Constructor for sockets obtained from server sockets
//
! SocketChannelImpl(SelectorProvider sp,
! FileDescriptor fd, InetSocketAddress remote)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
! this.state = ST_CONNECTED;
this.localAddress = Net.localAddress(fd);
! this.remoteAddress = remote;
}
public Socket socket() {
synchronized (stateLock) {
if (socket == null)
socket = SocketAdaptor.create(this);
return socket;
--- 116,162 ----
//
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.socket(true);
this.fdVal = IOUtil.fdVal(fd);
}
! SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
! if (bound) {
! synchronized (stateLock) {
this.localAddress = Net.localAddress(fd);
}
+ }
+ }
// Constructor for sockets obtained from server sockets
//
! SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
! synchronized (stateLock) {
this.localAddress = Net.localAddress(fd);
! this.remoteAddress = isa;
! this.state = ST_CONNECTED;
! }
! }
!
! // @throws ClosedChannelException if channel is closed
! private void ensureOpen() throws ClosedChannelException {
! if (!isOpen())
! throw new ClosedChannelException();
}
+ @Override
public Socket socket() {
synchronized (stateLock) {
if (socket == null)
socket = SocketAdaptor.create(this);
return socket;
*** 157,193 ****
}
@Override
public SocketAddress getLocalAddress() throws IOException {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
return Net.getRevealedLocalAddress(localAddress);
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
return remoteAddress;
}
}
@Override
public <T> SocketChannel setOption(SocketOption<T> name, T value)
throws IOException
{
! if (name == null)
! throw new NullPointerException();
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
Net.setSocketOption(fd, family, name, value);
--- 164,196 ----
}
@Override
public SocketAddress getLocalAddress() throws IOException {
synchronized (stateLock) {
! ensureOpen();
return Net.getRevealedLocalAddress(localAddress);
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
synchronized (stateLock) {
! ensureOpen();
return remoteAddress;
}
}
@Override
public <T> SocketChannel setOption(SocketOption<T> name, T value)
throws IOException
{
! Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
! ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
Net.setSocketOption(fd, family, name, value);
*** 209,230 ****
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name)
throws IOException
{
! if (name == null)
! throw new NullPointerException();
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
! if (name == StandardSocketOptions.SO_REUSEADDR &&
! Net.useExclusiveBind())
! {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
// special handling for IP_TOS: always return 0 when IPv6
--- 212,229 ----
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(SocketOption<T> name)
throws IOException
{
! Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
! ensureOpen();
! if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
// special handling for IP_TOS: always return 0 when IPv6
*** 241,251 ****
private static class DefaultOptionsHolder {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
! HashSet<SocketOption<?>> set = new HashSet<>(8);
set.add(StandardSocketOptions.SO_SNDBUF);
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_KEEPALIVE);
set.add(StandardSocketOptions.SO_REUSEADDR);
if (Net.isReusePortAvailable()) {
--- 240,250 ----
private static class DefaultOptionsHolder {
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
! HashSet<SocketOption<?>> set = new HashSet<>();
set.add(StandardSocketOptions.SO_SNDBUF);
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_KEEPALIVE);
set.add(StandardSocketOptions.SO_REUSEADDR);
if (Net.isReusePortAvailable()) {
*** 254,598 ****
set.add(StandardSocketOptions.SO_LINGER);
set.add(StandardSocketOptions.TCP_NODELAY);
// additional options required by socket adaptor
set.add(StandardSocketOptions.IP_TOS);
set.add(ExtendedSocketOption.SO_OOBINLINE);
! ExtendedSocketOptions extendedOptions =
! ExtendedSocketOptions.getInstance();
! set.addAll(extendedOptions.options());
return Collections.unmodifiableSet(set);
}
}
@Override
public final Set<SocketOption<?>> supportedOptions() {
return DefaultOptionsHolder.defaultOptions;
}
! private boolean ensureReadOpen() throws ClosedChannelException {
! synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
! if (!isConnected())
! throw new NotYetConnectedException();
! if (!isInputOpen)
! return false;
! else
! return true;
! }
}
-
- private void ensureWriteOpen() throws ClosedChannelException {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
! if (!isOutputOpen)
! throw new ClosedChannelException();
! if (!isConnected())
throw new NotYetConnectedException();
}
}
! private void readerCleanup() throws IOException {
synchronized (stateLock) {
readerThread = 0;
! if (state == ST_KILLPENDING)
! kill();
}
}
!
! private void writerCleanup() throws IOException {
! synchronized (stateLock) {
! writerThread = 0;
! if (state == ST_KILLPENDING)
! kill();
}
}
public int read(ByteBuffer buf) throws IOException {
!
! if (buf == null)
! throw new NullPointerException();
readLock.lock();
try {
! if (!ensureReadOpen())
! return -1;
int n = 0;
try {
! // Set up the interruption machinery; see
! // AbstractInterruptibleChannel for details
! //
! begin();
!
! synchronized (stateLock) {
! if (!isOpen()) {
! // Either the current thread is already interrupted, so
! // begin() closed the channel, or another thread closed the
! // channel since we checked it a few bytecodes ago. In
! // either case the value returned here is irrelevant since
! // the invocation of end() in the finally block will throw
! // an appropriate exception.
! //
! return 0;
!
! }
!
! // Save this thread so that it can be signalled on those
! // platforms that require it
! //
! readerThread = NativeThread.current();
! }
! // Between the previous test of isOpen() and the return of the
! // IOUtil.read invocation below, this channel might be closed
! // or this thread might be interrupted. We rely upon the
! // implicit synchronization point in the kernel read() call to
! // make sure that the right thing happens. In either case the
! // implCloseSelectableChannel method is ultimately invoked in
! // some other thread, so there are three possibilities:
! //
! // - implCloseSelectableChannel() invokes nd.preClose()
! // before this thread invokes read(), in which case the
! // read returns immediately with either EOF or an error,
! // the latter of which will cause an IOException to be
! // thrown.
! //
! // - implCloseSelectableChannel() invokes nd.preClose() after
! // this thread is blocked in read(). On some operating
! // systems (e.g., Solaris and Windows) this causes the read
! // to return immediately with either EOF or an error
! // indication.
! //
! // - implCloseSelectableChannel() invokes nd.preClose() after
! // this thread is blocked in read() but the operating
! // system (e.g., Linux) doesn't support preemptive close,
! // so implCloseSelectableChannel() proceeds to signal this
! // thread, thereby causing the read to return immediately
! // with IOStatus.INTERRUPTED.
! //
! // In all three cases the invocation of end() in the finally
! // clause will notice that the channel has been closed and
! // throw an appropriate exception (AsynchronousCloseException
! // or ClosedByInterruptException) if necessary.
! //
! // *There is A fourth possibility. implCloseSelectableChannel()
! // invokes nd.preClose(), signals reader/writer thred and quickly
! // moves on to nd.close() in kill(), which does a real close.
! // Then a third thread accepts a new connection, opens file or
! // whatever that causes the released "fd" to be recycled. All
! // above happens just between our last isOpen() check and the
! // next kernel read reached, with the recycled "fd". The solution
! // is to postpone the real kill() if there is a reader or/and
! // writer thread(s) over there "waiting", leave the cleanup/kill
! // to the reader or writer thread. (the preClose() still happens
! // so the connection gets cut off as usual).
! //
! // For socket channels there is the additional wrinkle that
! // asynchronous shutdown works much like asynchronous close,
! // except that the channel is shutdown rather than completely
! // closed. This is analogous to the first two cases above,
! // except that the shutdown operation plays the role of
! // nd.preClose().
! for (;;) {
n = IOUtil.read(fd, buf, -1, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen()) {
- // The system call was interrupted but the channel
- // is still open, so retry
- continue;
- }
- return IOStatus.normalize(n);
}
-
} finally {
! readerCleanup(); // Clear reader thread
! // The end method, which is defined in our superclass
! // AbstractInterruptibleChannel, resets the interruption
! // machinery. If its argument is true then it returns
! // normally; otherwise it checks the interrupt and open state
! // of this channel and throws an appropriate exception if
! // necessary.
! //
! // So, if we actually managed to do any I/O in the above try
! // block then we pass true to the end method. We also pass
! // true if the channel was in non-blocking mode when the I/O
! // operation was initiated but no data could be transferred;
! // this prevents spurious exceptions from being thrown in the
! // rare event that a channel is closed or a thread is
! // interrupted at the exact moment that a non-blocking I/O
! // request is made.
! //
! end(n > 0 || (n == IOStatus.UNAVAILABLE));
!
! // Extra case for socket channels: Asynchronous shutdown
! //
! synchronized (stateLock) {
! if ((n <= 0) && (!isInputOpen))
return IOStatus.EOF;
}
!
! assert IOStatus.check(n);
!
! }
} finally {
readLock.unlock();
}
}
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
! if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
! throw new IndexOutOfBoundsException();
readLock.lock();
try {
! if (!ensureReadOpen())
! return -1;
long n = 0;
try {
! begin();
! synchronized (stateLock) {
! if (!isOpen())
! return 0;
! readerThread = NativeThread.current();
! }
! for (;;) {
n = IOUtil.read(fd, dsts, offset, length, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
}
} finally {
! readerCleanup();
! end(n > 0 || (n == IOStatus.UNAVAILABLE));
! synchronized (stateLock) {
! if ((n <= 0) && (!isInputOpen))
return IOStatus.EOF;
}
! assert IOStatus.check(n);
! }
} finally {
readLock.unlock();
}
}
public int write(ByteBuffer buf) throws IOException {
! if (buf == null)
! throw new NullPointerException();
writeLock.lock();
try {
! ensureWriteOpen();
int n = 0;
try {
! begin();
! synchronized (stateLock) {
! if (!isOpen())
! return 0;
! writerThread = NativeThread.current();
! }
! for (;;) {
n = IOUtil.write(fd, buf, -1, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
}
} finally {
! writerCleanup();
! end(n > 0 || (n == IOStatus.UNAVAILABLE));
! synchronized (stateLock) {
! if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
! assert IOStatus.check(n);
! }
} finally {
writeLock.unlock();
}
}
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
! if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
! throw new IndexOutOfBoundsException();
writeLock.lock();
try {
! ensureWriteOpen();
long n = 0;
try {
! begin();
! synchronized (stateLock) {
! if (!isOpen())
! return 0;
! writerThread = NativeThread.current();
! }
! for (;;) {
n = IOUtil.write(fd, srcs, offset, length, nd);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
}
} finally {
! writerCleanup();
! end((n > 0) || (n == IOStatus.UNAVAILABLE));
! synchronized (stateLock) {
! if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
! assert IOStatus.check(n);
! }
} finally {
writeLock.unlock();
}
}
! // package-private
int sendOutOfBandData(byte b) throws IOException {
writeLock.lock();
try {
! ensureWriteOpen();
int n = 0;
try {
! begin();
! synchronized (stateLock) {
! if (!isOpen())
! return 0;
! writerThread = NativeThread.current();
! }
! for (;;) {
n = sendOutOfBandData(fd, b);
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- return IOStatus.normalize(n);
}
} finally {
! writerCleanup();
! end((n > 0) || (n == IOStatus.UNAVAILABLE));
! synchronized (stateLock) {
! if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
! assert IOStatus.check(n);
! }
} finally {
writeLock.unlock();
}
}
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
! public InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
! public SocketAddress remoteAddress() {
synchronized (stateLock) {
return remoteAddress;
}
}
--- 253,543 ----
set.add(StandardSocketOptions.SO_LINGER);
set.add(StandardSocketOptions.TCP_NODELAY);
// additional options required by socket adaptor
set.add(StandardSocketOptions.IP_TOS);
set.add(ExtendedSocketOption.SO_OOBINLINE);
! set.addAll(ExtendedSocketOptions.getInstance().options());
return Collections.unmodifiableSet(set);
}
}
@Override
public final Set<SocketOption<?>> supportedOptions() {
return DefaultOptionsHolder.defaultOptions;
}
! /**
! * Marks the beginning of a read operation that might block.
! *
! * @throws ClosedChannelException if the channel is closed
! * @throws NotYetConnectedException if the channel is not yet connected
! */
! private void beginRead(boolean blocking) throws ClosedChannelException {
! if (blocking) {
! // set hook for Thread.interrupt
! begin();
}
synchronized (stateLock) {
! ensureOpen();
! if (state != ST_CONNECTED)
throw new NotYetConnectedException();
+ if (blocking)
+ readerThread = NativeThread.current();
}
}
! /**
! * Marks the end of a read operation that may have blocked.
! *
! * @throws AsynchronousCloseException if the channel was closed due to this
! * thread being interrupted on a blocking read operation.
! */
! private void endRead(boolean blocking, boolean completed)
! throws AsynchronousCloseException
! {
! if (blocking) {
synchronized (stateLock) {
readerThread = 0;
! // notify any thread waiting in implCloseSelectableChannel
! if (state == ST_CLOSING) {
! stateLock.notifyAll();
}
}
! // remove hook for Thread.interrupt
! end(completed);
}
}
+ @Override
public int read(ByteBuffer buf) throws IOException {
! Objects.requireNonNull(buf);
readLock.lock();
try {
! boolean blocking = isBlocking();
int n = 0;
try {
+ beginRead(blocking);
! // check if input is shutdown
! if (isInputClosed)
! return IOStatus.EOF;
! if (blocking) {
! do {
! n = IOUtil.read(fd, buf, -1, nd);
! } while (n == IOStatus.INTERRUPTED && isOpen());
! } else {
n = IOUtil.read(fd, buf, -1, nd);
}
} finally {
! endRead(blocking, n > 0);
! if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
! return IOStatus.normalize(n);
} finally {
readLock.unlock();
}
}
+ @Override
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
! Objects.checkFromIndexSize(offset, length, dsts.length);
!
readLock.lock();
try {
! boolean blocking = isBlocking();
long n = 0;
try {
! beginRead(blocking);
!
! // check if input is shutdown
! if (isInputClosed)
! return IOStatus.EOF;
! if (blocking) {
! do {
! n = IOUtil.read(fd, dsts, offset, length, nd);
! } while (n == IOStatus.INTERRUPTED && isOpen());
! } else {
n = IOUtil.read(fd, dsts, offset, length, nd);
}
} finally {
! endRead(blocking, n > 0);
! if (n <= 0 && isInputClosed)
return IOStatus.EOF;
}
! return IOStatus.normalize(n);
} finally {
readLock.unlock();
}
}
+ /**
+ * Marks the beginning of a write operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed or output shutdown
+ * @throws NotYetConnectedException if the channel is not yet connected
+ */
+ private void beginWrite(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
+ synchronized (stateLock) {
+ ensureOpen();
+ if (isOutputClosed)
+ throw new ClosedChannelException();
+ if (state != ST_CONNECTED)
+ throw new NotYetConnectedException();
+ if (blocking)
+ writerThread = NativeThread.current();
+ }
+ }
+
+ /**
+ * Marks the end of a write operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking write operation.
+ */
+ private void endWrite(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ if (blocking) {
+ synchronized (stateLock) {
+ writerThread = 0;
+ // notify any thread waiting in implCloseSelectableChannel
+ if (state == ST_CLOSING) {
+ stateLock.notifyAll();
+ }
+ }
+ // remove hook for Thread.interrupt
+ end(completed);
+ }
+ }
+
+ @Override
public int write(ByteBuffer buf) throws IOException {
! Objects.requireNonNull(buf);
!
writeLock.lock();
try {
! boolean blocking = isBlocking();
int n = 0;
try {
! beginWrite(blocking);
! if (blocking) {
! do {
! n = IOUtil.write(fd, buf, -1, nd);
! } while (n == IOStatus.INTERRUPTED && isOpen());
! } else {
n = IOUtil.write(fd, buf, -1, nd);
}
} finally {
! endWrite(blocking, n > 0);
! if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
! return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
+ @Override
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
! Objects.checkFromIndexSize(offset, length, srcs.length);
!
writeLock.lock();
try {
! boolean blocking = isBlocking();
long n = 0;
try {
! beginWrite(blocking);
! if (blocking) {
! do {
! n = IOUtil.write(fd, srcs, offset, length, nd);
! } while (n == IOStatus.INTERRUPTED && isOpen());
! } else {
n = IOUtil.write(fd, srcs, offset, length, nd);
}
} finally {
! endWrite(blocking, n > 0);
! if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
! return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
! /**
! * Writes a byte of out of band data.
! */
int sendOutOfBandData(byte b) throws IOException {
writeLock.lock();
try {
! boolean blocking = isBlocking();
int n = 0;
try {
! beginWrite(blocking);
! if (blocking) {
! do {
! n = sendOutOfBandData(fd, b);
! } while (n == IOStatus.INTERRUPTED && isOpen());
! } else {
n = sendOutOfBandData(fd, b);
}
} finally {
! endWrite(blocking, n > 0);
! if (n <= 0 && isOutputClosed)
throw new AsynchronousCloseException();
}
! return IOStatus.normalize(n);
} finally {
writeLock.unlock();
}
}
+ @Override
protected void implConfigureBlocking(boolean block) throws IOException {
+ readLock.lock();
+ try {
+ writeLock.lock();
+ try {
+ synchronized (stateLock) {
+ ensureOpen();
IOUtil.configureBlocking(fd, block);
}
+ } finally {
+ writeLock.unlock();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
! /**
! * Returns the local address, or null if not bound
! */
! InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
! /**
! * Returns the remote address, or null if not connected
! */
! InetSocketAddress remoteAddress() {
synchronized (stateLock) {
return remoteAddress;
}
}
*** 601,613 ****
readLock.lock();
try {
writeLock.lock();
try {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
! if (state == ST_PENDING)
throw new ConnectionPendingException();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null) ?
new InetSocketAddress(0) : Net.checkAddress(local);
--- 546,557 ----
readLock.lock();
try {
writeLock.lock();
try {
synchronized (stateLock) {
! ensureOpen();
! if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null) ?
new InetSocketAddress(0) : Net.checkAddress(local);
*** 626,931 ****
readLock.unlock();
}
return this;
}
public boolean isConnected() {
synchronized (stateLock) {
return (state == ST_CONNECTED);
}
}
public boolean isConnectionPending() {
synchronized (stateLock) {
! return (state == ST_PENDING);
}
}
! void ensureOpenAndUnconnected() throws IOException { // package-private
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
! if (state == ST_PENDING)
throw new ConnectionPendingException();
}
}
public boolean connect(SocketAddress sa) throws IOException {
- readLock.lock();
- try {
- writeLock.lock();
- try {
- ensureOpenAndUnconnected();
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
! sm.checkConnect(isa.getAddress().getHostAddress(),
! isa.getPort());
! synchronized (blockingLock()) {
! int n = 0;
try {
try {
! begin();
synchronized (stateLock) {
! if (!isOpen()) {
! return false;
! }
! // notify hook only if unbound
! if (localAddress == null) {
! NetHooks.beforeTcpConnect(fd,
! isa.getAddress(),
! isa.getPort());
}
- readerThread = NativeThread.current();
}
! for (;;) {
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
- n = Net.connect(fd,
- ia,
- isa.getPort());
- if ((n == IOStatus.INTERRUPTED) && isOpen())
- continue;
- break;
- }
} finally {
! readerCleanup();
! end((n > 0) || (n == IOStatus.UNAVAILABLE));
! assert IOStatus.check(n);
}
} catch (IOException x) {
! // If an exception was thrown, close the channel after
! // invoking end() so as to avoid bogus
! // AsynchronousCloseExceptions
close();
throw x;
}
synchronized (stateLock) {
remoteAddress = isa;
if (n > 0) {
!
! // Connection succeeded; disallow further
! // invocation
! state = ST_CONNECTED;
! if (isOpen())
localAddress = Net.localAddress(fd);
return true;
! }
! // If nonblocking and no exception then connection
! // pending; disallow another invocation
! if (!isBlocking())
! state = ST_PENDING;
! else
! assert false;
}
}
- return false;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
}
public boolean finishConnect() throws IOException {
readLock.lock();
try {
writeLock.lock();
try {
synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
if (state == ST_CONNECTED)
return true;
- if (state != ST_PENDING)
- throw new NoConnectionPendingException();
}
int n = 0;
try {
try {
! begin();
! synchronized (blockingLock()) {
! synchronized (stateLock) {
! if (!isOpen()) {
! return false;
! }
! readerThread = NativeThread.current();
! }
! if (!isBlocking()) {
! for (;;) {
! n = checkConnect(fd, false);
! if ((n == IOStatus.INTERRUPTED) && isOpen())
! continue;
! break;
! }
! } else {
! for (;;) {
n = checkConnect(fd, true);
! if (n == 0) {
! // Loop in case of
! // spurious notifications
! continue;
! }
! if ((n == IOStatus.INTERRUPTED) && isOpen())
! continue;
! break;
! }
! }
}
} finally {
! synchronized (stateLock) {
! readerThread = 0;
! if (state == ST_KILLPENDING) {
! kill();
! // poll()/getsockopt() does not report
! // error (throws exception, with n = 0)
! // on Linux platform after dup2 and
! // signal-wakeup. Force n to 0 so the
! // end() can throw appropriate exception
! n = 0;
! }
! }
! end((n > 0) || (n == IOStatus.UNAVAILABLE));
! assert IOStatus.check(n);
}
} catch (IOException x) {
- // If an exception was thrown, close the channel after
- // invoking end() so as to avoid bogus
- // AsynchronousCloseExceptions
close();
throw x;
}
! if (n > 0) {
synchronized (stateLock) {
! state = ST_CONNECTED;
! if (isOpen())
localAddress = Net.localAddress(fd);
! }
return true;
! }
return false;
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
}
@Override
public SocketChannel shutdownInput() throws IOException {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
if (!isConnected())
throw new NotYetConnectedException();
! if (isInputOpen) {
Net.shutdown(fd, Net.SHUT_RD);
! if (readerThread != 0)
! NativeThread.signal(readerThread);
! isInputOpen = false;
}
return this;
}
}
@Override
public SocketChannel shutdownOutput() throws IOException {
synchronized (stateLock) {
! if (!isOpen())
! throw new ClosedChannelException();
if (!isConnected())
throw new NotYetConnectedException();
! if (isOutputOpen) {
Net.shutdown(fd, Net.SHUT_WR);
! if (writerThread != 0)
! NativeThread.signal(writerThread);
! isOutputOpen = false;
}
return this;
}
}
! public boolean isInputOpen() {
! synchronized (stateLock) {
! return isInputOpen;
! }
}
! public boolean isOutputOpen() {
! synchronized (stateLock) {
! return isOutputOpen;
! }
}
! // AbstractInterruptibleChannel synchronizes invocations of this method
! // using AbstractInterruptibleChannel.closeLock, and also ensures that this
! // method is only ever invoked once. Before we get to this method, isOpen
! // (which is volatile) will have been set to false.
! //
! protected void implCloseSelectableChannel() throws IOException {
! synchronized (stateLock) {
! isInputOpen = false;
! isOutputOpen = false;
!
! // Close the underlying file descriptor and dup it to a known fd
! // that's already closed. This prevents other operations on this
! // channel from using the old fd, which might be recycled in the
! // meantime and allocated to an entirely different channel.
! //
! if (state != ST_KILLED)
! nd.preClose(fd);
!
! // Signal native threads, if needed. If a target thread is not
! // currently blocked in an I/O operation then no harm is done since
! // the signal handler doesn't actually do anything.
! //
! if (readerThread != 0)
! NativeThread.signal(readerThread);
!
! if (writerThread != 0)
! NativeThread.signal(writerThread);
! // If this channel is not registered then it's safe to close the fd
! // immediately since we know at this point that no thread is
! // blocked in an I/O operation upon the channel and, since the
! // channel is marked closed, no thread will start another such
! // operation. If this channel is registered then we don't close
! // the fd since it might be in use by a selector. In that case
! // closing this channel caused its keys to be cancelled, so the
! // last selector to deregister a key for this channel will invoke
! // kill() to close the fd.
! //
! if (!isRegistered())
! kill();
}
}
-
- public void kill() throws IOException {
- synchronized (stateLock) {
- if (state == ST_KILLED)
- return;
- if (state == ST_UNINITIALIZED) {
- state = ST_KILLED;
- return;
}
- assert !isOpen() && !isRegistered();
! // Postpone the kill if there is a waiting reader
! // or writer thread. See the comments in read() for
! // more detailed explanation.
! if (readerThread == 0 && writerThread == 0) {
! nd.close(fd);
! state = ST_KILLED;
! } else {
! state = ST_KILLPENDING;
}
}
}
/**
* Translates native poll revent ops into a ready operation ops
--- 570,976 ----
readLock.unlock();
}
return this;
}
+ @Override
public boolean isConnected() {
synchronized (stateLock) {
return (state == ST_CONNECTED);
}
}
+ @Override
public boolean isConnectionPending() {
synchronized (stateLock) {
! return (state == ST_CONNECTIONPENDING);
}
}
! /**
! * Marks the beginning of a connect operation that might block.
! *
! * @throws ClosedChannelException if the channel is closed
! * @throws AlreadyConnectedException if already connected
! * @throws ConnectionPendingException is a connection is pending
! */
! private void beginConnect(boolean blocking) throws ClosedChannelException {
! if (blocking) {
! // set hook for Thread.interrupt
! begin();
! }
synchronized (stateLock) {
! ensureOpen();
if (state == ST_CONNECTED)
throw new AlreadyConnectedException();
! if (state == ST_CONNECTIONPENDING)
throw new ConnectionPendingException();
+ if (blocking)
+ readerThread = NativeThread.current();
+ }
}
+
+ /**
+ * Marks the end of a connect operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking connect operation.
+ */
+ private void endConnect(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ endRead(blocking, completed);
}
+ @Override
public boolean connect(SocketAddress sa) throws IOException {
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
! sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
!
! readLock.lock();
try {
+ writeLock.lock();
try {
! // notify before-connect hook
synchronized (stateLock) {
! if (state == ST_UNCONNECTED && localAddress == null) {
! NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
}
}
!
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
+ int n = 0;
+ boolean blocking = isBlocking();
+ try {
+ try {
+ beginConnect(blocking);
+ if (blocking) {
+ do {
+ n = Net.connect(fd, ia, isa.getPort());
+ } while (n == IOStatus.INTERRUPTED && isOpen());
+ } else {
+ n = Net.connect(fd, ia, isa.getPort());
+ }
} finally {
! endConnect(blocking, n > 0);
}
} catch (IOException x) {
! // connect failed, close socket
close();
throw x;
}
+
+ // connection may be established
synchronized (stateLock) {
+ if (!isOpen())
+ throw new AsynchronousCloseException();
remoteAddress = isa;
if (n > 0) {
! // connected established
localAddress = Net.localAddress(fd);
+ state = ST_CONNECTED;
return true;
! } else {
! // connection pending
! assert !blocking;
! state = ST_CONNECTIONPENDING;
! return false;
}
}
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
}
+ /**
+ * Marks the beginning of a finishConnect operation that might block.
+ *
+ * @throws ClosedChannelException if the channel is closed
+ * @throws NoConnectionPendingException if no connection is pending
+ */
+ private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
+ if (blocking) {
+ // set hook for Thread.interrupt
+ begin();
+ }
+ synchronized (stateLock) {
+ ensureOpen();
+ if (state != ST_CONNECTIONPENDING)
+ throw new NoConnectionPendingException();
+ if (blocking)
+ readerThread = NativeThread.current();
+ }
+ }
+
+ /**
+ * Marks the end of a finishConnect operation that may have blocked.
+ *
+ * @throws AsynchronousCloseException if the channel was closed due to this
+ * thread being interrupted on a blocking connect operation.
+ */
+ private void endFinishConnect(boolean blocking, boolean completed)
+ throws AsynchronousCloseException
+ {
+ endRead(blocking, completed);
+ }
+
+ @Override
public boolean finishConnect() throws IOException {
readLock.lock();
try {
writeLock.lock();
try {
+ // already connected?
synchronized (stateLock) {
if (state == ST_CONNECTED)
return true;
}
+
int n = 0;
+ boolean blocking = isBlocking();
try {
try {
! beginFinishConnect(blocking);
! if (blocking) {
! do {
n = checkConnect(fd, true);
! } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
! } else {
! n = checkConnect(fd, false);
}
} finally {
! endFinishConnect(blocking, n > 0);
}
} catch (IOException x) {
close();
throw x;
}
!
! // post finishConnect, connection may be established
synchronized (stateLock) {
! if (!isOpen())
! throw new AsynchronousCloseException();
! if (n > 0) {
! // connection established
localAddress = Net.localAddress(fd);
! state = ST_CONNECTED;
return true;
! } else {
! // connection still pending
! assert !blocking;
return false;
+ }
+ }
} finally {
writeLock.unlock();
}
} finally {
readLock.unlock();
}
}
+ /**
+ * Invoked by implCloseChannel to close the channel.
+ *
+ * This method waits for outstanding I/O operations to complete. When in
+ * blocking mode, the socket is pre-closed and the threads in blocking I/O
+ * operations are signalled to ensure that the outstanding I/O operations
+ * complete quickly.
+ *
+ * If the socket is connected then it is shutdown by this method. The
+ * shutdown ensures that the peer reads EOF for the case that the socket is
+ * not pre-closed or closed by this method.
+ *
+ * The socket is closed by this method when it is not registered with a
+ * Selector. Note that a channel configured blocking may be registered with
+ * a Selector. This arises when a key is canceled and the channel configured
+ * to blocking mode before the key is flushed from the Selector.
+ */
+ @Override
+ protected void implCloseSelectableChannel() throws IOException {
+ assert !isOpen();
+
+ boolean blocking;
+ boolean connected;
+ boolean interrupted = false;
+
+ // set state to ST_CLOSING
+ synchronized (stateLock) {
+ assert state < ST_CLOSING;
+ blocking = isBlocking();
+ connected = (state == ST_CONNECTED);
+ state = ST_CLOSING;
+ }
+
+ // wait for any outstanding I/O operations to complete
+ if (blocking) {
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ long reader = readerThread;
+ long writer = writerThread;
+ if (reader != 0 || writer != 0) {
+ nd.preClose(fd);
+ connected = false; // fd is no longer connected socket
+
+ if (reader != 0)
+ NativeThread.signal(reader);
+ if (writer != 0)
+ NativeThread.signal(writer);
+
+ // wait for blocking I/O operations to end
+ while (readerThread != 0 || writerThread != 0) {
+ try {
+ stateLock.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ }
+ } else {
+ // non-blocking mode: wait for read/write to complete
+ readLock.lock();
+ try {
+ writeLock.lock();
+ writeLock.unlock();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // set state to ST_KILLPENDING
+ synchronized (stateLock) {
+ assert state == ST_CLOSING;
+ // if connected, and the channel is registered with a Selector, we
+ // shutdown the output so that the peer reads EOF
+ if (connected && isRegistered()) {
+ try {
+ Net.shutdown(fd, Net.SHUT_WR);
+ } catch (IOException ignore) { }
+ }
+ state = ST_KILLPENDING;
+ }
+
+ // close socket if not registered with Selector
+ if (!isRegistered())
+ kill();
+
+ // restore interrupt status
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public void kill() throws IOException {
+ synchronized (stateLock) {
+ if (state == ST_KILLPENDING) {
+ state = ST_KILLED;
+ nd.close(fd);
+ }
+ }
+ }
+
@Override
public SocketChannel shutdownInput() throws IOException {
synchronized (stateLock) {
! ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
! if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
! long thread = readerThread;
! if (thread != 0)
! NativeThread.signal(thread);
! isInputClosed = true;
}
return this;
}
}
@Override
public SocketChannel shutdownOutput() throws IOException {
synchronized (stateLock) {
! ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
! if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
! long thread = writerThread;
! if (thread != 0)
! NativeThread.signal(thread);
! isOutputClosed = true;
}
return this;
}
}
! boolean isInputOpen() {
! return !isInputClosed;
}
! boolean isOutputOpen() {
! return !isOutputClosed;
}
! /**
! * Poll this channel's socket for reading up to the given timeout.
! * @return {@code true} if the socket is polled
! */
! boolean pollRead(long timeout) throws IOException {
! boolean blocking = isBlocking();
! assert Thread.holdsLock(blockingLock()) && blocking;
! readLock.lock();
! try {
! boolean polled = false;
! try {
! beginRead(blocking);
! int n = Net.poll(fd, Net.POLLIN, timeout);
! polled = (n > 0);
! } finally {
! endRead(blocking, polled);
}
+ return polled;
+ } finally {
+ readLock.unlock();
}
}
! /**
! * Poll this channel's socket for a connection, up to the given timeout.
! * @return {@code true} if the socket is polled
! */
! boolean pollConnected(long timeout) throws IOException {
! boolean blocking = isBlocking();
! assert Thread.holdsLock(blockingLock()) && blocking;
!
! readLock.lock();
! try {
! writeLock.lock();
! try {
! boolean polled = false;
! try {
! beginFinishConnect(blocking);
! int n = Net.poll(fd, Net.POLLCONN, timeout);
! polled = (n > 0);
! } finally {
! endFinishConnect(blocking, polled);
! }
! return polled;
! } finally {
! writeLock.unlock();
}
+ } finally {
+ readLock.unlock();
}
}
/**
* Translates native poll revent ops into a ready operation ops
*** 954,964 ****
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_READ;
if (((ops & Net.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
! ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
newOps |= SelectionKey.OP_CONNECT;
}
if (((ops & Net.POLLOUT) != 0) &&
((intOps & SelectionKey.OP_WRITE) != 0) &&
--- 999,1009 ----
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_READ;
if (((ops & Net.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
! ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) {
newOps |= SelectionKey.OP_CONNECT;
}
if (((ops & Net.POLLOUT) != 0) &&
((intOps & SelectionKey.OP_WRITE) != 0) &&
*** 975,1009 ****
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
}
- // package-private
- int poll(int events, long timeout) throws IOException {
- assert Thread.holdsLock(blockingLock()) && !isBlocking();
-
- readLock.lock();
- try {
- int n = 0;
- try {
- begin();
- synchronized (stateLock) {
- if (!isOpen())
- return 0;
- readerThread = NativeThread.current();
- }
- n = Net.poll(fd, events, timeout);
- } finally {
- readerCleanup();
- end(n > 0);
- }
- return n;
- } finally {
- readLock.unlock();
- }
- }
-
/**
* Translates an interest operation set into a native poll event set
*/
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
--- 1020,1029 ----
*** 1035,1052 ****
synchronized (stateLock) {
switch (state) {
case ST_UNCONNECTED:
sb.append("unconnected");
break;
! case ST_PENDING:
sb.append("connection-pending");
break;
case ST_CONNECTED:
sb.append("connected");
! if (!isInputOpen)
sb.append(" ishut");
! if (!isOutputOpen)
sb.append(" oshut");
break;
}
InetSocketAddress addr = localAddress();
if (addr != null) {
--- 1055,1072 ----
synchronized (stateLock) {
switch (state) {
case ST_UNCONNECTED:
sb.append("unconnected");
break;
! case ST_CONNECTIONPENDING:
sb.append("connection-pending");
break;
case ST_CONNECTED:
sb.append("connected");
! if (isInputClosed)
sb.append(" ishut");
! if (isOutputClosed)
sb.append(" oshut");
break;
}
InetSocketAddress addr = localAddress();
if (addr != null) {
< prev index next >