42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NoConnectionPendingException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.SocketChannel;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.Collections;
50 import java.util.HashSet;
51 import java.util.Objects;
52 import java.util.Set;
53 import java.util.concurrent.locks.ReentrantLock;
54
55 import sun.net.NetHooks;
56 import sun.net.ext.ExtendedSocketOptions;
57
58 /**
59 * An implementation of SocketChannels
60 */
61
62 class SocketChannelImpl
63 extends SocketChannel
64 implements SelChImpl
65 {
66 // Used to make native read and write calls
67 private static NativeDispatcher nd;
68
69 // Our file descriptor object
70 private final FileDescriptor fd;
71 private final int fdVal;
72
73 // Lock held by current reading or connecting thread
74 private final ReentrantLock readLock = new ReentrantLock();
75
76 // Lock held by current writing or connecting thread
77 private final ReentrantLock writeLock = new ReentrantLock();
78
79 // Lock held by any thread that modifies the state fields declared below
80 // DO NOT invoke a blocking I/O operation while holding this lock!
81 private final Object stateLock = new Object();
82
83 // Input/Output closed
84 private volatile boolean isInputClosed;
85 private volatile boolean isOutputClosed;
86
87 // -- The following fields are protected by stateLock
88
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated
90 private boolean isReuseAddress;
91
92 // State, increases monotonically
93 private static final int ST_UNCONNECTED = 0;
94 private static final int ST_CONNECTIONPENDING = 1;
95 private static final int ST_CONNECTED = 2;
96 private static final int ST_CLOSING = 3;
97 private static final int ST_KILLPENDING = 4;
98 private static final int ST_KILLED = 5;
99 private volatile int state; // need stateLock to change
100
101 // IDs of native threads doing reads and writes, for signalling
102 private long readerThread;
103 private long writerThread;
104
105 // Binding
106 private InetSocketAddress localAddress;
107 private InetSocketAddress remoteAddress;
108
109 // Socket adaptor, created on demand
110 private Socket socket;
111
112 // -- End of fields protected by stateLock
113
114
115 // Constructor for normal connecting sockets
116 //
117 SocketChannelImpl(SelectorProvider sp) throws IOException {
118 super(sp);
119 this.fd = Net.socket(true);
120 this.fdVal = IOUtil.fdVal(fd);
121 }
122
/**
 * Creates a channel around an existing file descriptor.
 *
 * @param sp    the selector provider passed to the superclass
 * @param fd    the file descriptor of the socket
 * @param bound {@code true} to look up and record the socket's local address
 * @throws IOException propagated from {@code Net.localAddress}
 */
SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    if (!bound) {
        return;
    }
    // localAddress is one of the fields guarded by stateLock
    synchronized (stateLock) {
        localAddress = Net.localAddress(fd);
    }
}
135
/**
 * Creates a channel for a socket obtained from a server socket. The channel
 * starts out in the connected state.
 *
 * @param sp  the selector provider passed to the superclass
 * @param fd  the file descriptor of the socket
 * @param isa the remote address to record for the connection
 * @throws IOException propagated from {@code Net.localAddress}
 */
SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    synchronized (stateLock) {
        InetSocketAddress local = Net.localAddress(fd);
        localAddress = local;
        remoteAddress = isa;
        state = ST_CONNECTED;
    }
}
150
151 /**
152 * Checks that the channel is open.
153 *
154 * @throws ClosedChannelException if channel is closed (or closing)
155 */
156 private void ensureOpen() throws ClosedChannelException {
157 if (!isOpen())
158 throw new ClosedChannelException();
159 }
160
161 /**
162 * Checks that the channel is open and connected.
163 *
164 * @apiNote This method uses the "state" field to check if the channel is
165 * open. It should never be used in conjunction with isOpen or ensureOpen
166 * as these methods check AbstractInterruptibleChannel's closed field - that
167 * field is set before implCloseSelectableChannel is called and so before
168 * the state is changed.
169 *
170 * @throws ClosedChannelException if channel is closed (or closing)
171 * @throws NotYetConnectedException if open and not connected
172 */
173 private void ensureOpenAndConnected() throws ClosedChannelException {
174 int state = this.state;
175 if (state < ST_CONNECTED) {
176 throw new NotYetConnectedException();
269 private static Set<SocketOption<?>> defaultOptions() {
270 HashSet<SocketOption<?>> set = new HashSet<>();
271 set.add(StandardSocketOptions.SO_SNDBUF);
272 set.add(StandardSocketOptions.SO_RCVBUF);
273 set.add(StandardSocketOptions.SO_KEEPALIVE);
274 set.add(StandardSocketOptions.SO_REUSEADDR);
275 if (Net.isReusePortAvailable()) {
276 set.add(StandardSocketOptions.SO_REUSEPORT);
277 }
278 set.add(StandardSocketOptions.SO_LINGER);
279 set.add(StandardSocketOptions.TCP_NODELAY);
280 // additional options required by socket adaptor
281 set.add(StandardSocketOptions.IP_TOS);
282 set.add(ExtendedSocketOption.SO_OOBINLINE);
283 set.addAll(ExtendedSocketOptions.getInstance().options());
284 return Collections.unmodifiableSet(set);
285 }
286 }
287
288 @Override
289 public final Set<SocketOption<?>> supportedOptions() {
290 return DefaultOptionsHolder.defaultOptions;
291 }
292
293 /**
294 * Marks the beginning of a read operation that might block.
295 *
296 * @throws ClosedChannelException if the channel is closed
297 * @throws NotYetConnectedException if the channel is not yet connected
298 */
299 private void beginRead(boolean blocking) throws ClosedChannelException {
300 if (blocking) {
301 // set hook for Thread.interrupt
302 begin();
303
304 synchronized (stateLock) {
305 ensureOpenAndConnected();
306 // record thread so it can be signalled if needed
307 readerThread = NativeThread.current();
308 }
309 } else {
310 ensureOpenAndConnected();
311 }
312 }
313
314 /**
315 * Marks the end of a read operation that may have blocked.
316 *
317 * @throws AsynchronousCloseException if the channel was closed due to this
318 * thread being interrupted on a blocking read operation.
319 */
320 private void endRead(boolean blocking, boolean completed)
321 throws AsynchronousCloseException
322 {
323 if (blocking) {
324 synchronized (stateLock) {
325 readerThread = 0;
326 // notify any thread waiting in implCloseSelectableChannel
327 if (state == ST_CLOSING) {
328 stateLock.notifyAll();
329 }
330 }
331 // remove hook for Thread.interrupt
332 end(completed);
333 }
334 }
335
336 @Override
337 public int read(ByteBuffer buf) throws IOException {
338 Objects.requireNonNull(buf);
339
340 readLock.lock();
390 } else {
391 n = IOUtil.read(fd, dsts, offset, length, nd);
392 }
393 } finally {
394 endRead(blocking, n > 0);
395 if (n <= 0 && isInputClosed)
396 return IOStatus.EOF;
397 }
398 return IOStatus.normalize(n);
399 } finally {
400 readLock.unlock();
401 }
402 }
403
404 /**
405 * Marks the beginning of a write operation that might block.
406 *
407 * @throws ClosedChannelException if the channel is closed or output shutdown
408 * @throws NotYetConnectedException if the channel is not yet connected
409 */
410 private void beginWrite(boolean blocking) throws ClosedChannelException {
411 if (blocking) {
412 // set hook for Thread.interrupt
413 begin();
414
415 synchronized (stateLock) {
416 ensureOpenAndConnected();
417 if (isOutputClosed)
418 throw new ClosedChannelException();
419 // record thread so it can be signalled if needed
420 writerThread = NativeThread.current();
421 }
422 } else {
423 ensureOpenAndConnected();
424 }
425 }
426
427 /**
428 * Marks the end of a write operation that may have blocked.
429 *
430 * @throws AsynchronousCloseException if the channel was closed due to this
431 * thread being interrupted on a blocking write operation.
432 */
433 private void endWrite(boolean blocking, boolean completed)
434 throws AsynchronousCloseException
435 {
436 if (blocking) {
437 synchronized (stateLock) {
438 writerThread = 0;
439 // notify any thread waiting in implCloseSelectableChannel
440 if (state == ST_CLOSING) {
441 stateLock.notifyAll();
442 }
443 }
444 // remove hook for Thread.interrupt
445 end(completed);
446 }
447 }
448
449 @Override
450 public int write(ByteBuffer buf) throws IOException {
451 Objects.requireNonNull(buf);
452
453 writeLock.lock();
490 do {
491 n = IOUtil.write(fd, srcs, offset, length, nd);
492 } while (n == IOStatus.INTERRUPTED && isOpen());
493 } else {
494 n = IOUtil.write(fd, srcs, offset, length, nd);
495 }
496 } finally {
497 endWrite(blocking, n > 0);
498 if (n <= 0 && isOutputClosed)
499 throw new AsynchronousCloseException();
500 }
501 return IOStatus.normalize(n);
502 } finally {
503 writeLock.unlock();
504 }
505 }
506
507 /**
508 * Writes a byte of out of band data.
509 */
510 int sendOutOfBandData(byte b) throws IOException {
511 writeLock.lock();
512 try {
513 boolean blocking = isBlocking();
514 int n = 0;
515 try {
516 beginWrite(blocking);
517 if (blocking) {
518 do {
519 n = sendOutOfBandData(fd, b);
520 } while (n == IOStatus.INTERRUPTED && isOpen());
521 } else {
522 n = sendOutOfBandData(fd, b);
523 }
524 } finally {
525 endWrite(blocking, n > 0);
526 if (n <= 0 && isOutputClosed)
527 throw new AsynchronousCloseException();
528 }
529 return IOStatus.normalize(n);
530 } finally {
536 protected void implConfigureBlocking(boolean block) throws IOException {
537 readLock.lock();
538 try {
539 writeLock.lock();
540 try {
541 synchronized (stateLock) {
542 ensureOpen();
543 IOUtil.configureBlocking(fd, block);
544 }
545 } finally {
546 writeLock.unlock();
547 }
548 } finally {
549 readLock.unlock();
550 }
551 }
552
553 /**
554 * Returns the local address, or null if not bound
555 */
556 InetSocketAddress localAddress() {
557 synchronized (stateLock) {
558 return localAddress;
559 }
560 }
561
562 /**
563 * Returns the remote address, or null if not connected
564 */
565 InetSocketAddress remoteAddress() {
566 synchronized (stateLock) {
567 return remoteAddress;
568 }
569 }
570
571 @Override
572 public SocketChannel bind(SocketAddress local) throws IOException {
573 readLock.lock();
574 try {
575 writeLock.lock();
576 try {
577 synchronized (stateLock) {
578 ensureOpen();
579 if (state == ST_CONNECTIONPENDING)
580 throw new ConnectionPendingException();
581 if (localAddress != null)
582 throw new AlreadyBoundException();
583 InetSocketAddress isa = (local == null) ?
584 new InetSocketAddress(0) : Net.checkAddress(local);
585 SecurityManager sm = System.getSecurityManager();
601
602 @Override
603 public boolean isConnected() {
604 return (state == ST_CONNECTED);
605 }
606
607 @Override
608 public boolean isConnectionPending() {
609 return (state == ST_CONNECTIONPENDING);
610 }
611
612 /**
613 * Marks the beginning of a connect operation that might block.
614 * @param blocking true if configured blocking
615 * @param isa the remote address
616 * @throws ClosedChannelException if the channel is closed
617 * @throws AlreadyConnectedException if already connected
618 * @throws ConnectionPendingException is a connection is pending
619 * @throws IOException if the pre-connect hook fails
620 */
621 private void beginConnect(boolean blocking, InetSocketAddress isa)
622 throws IOException
623 {
624 if (blocking) {
625 // set hook for Thread.interrupt
626 begin();
627 }
628 synchronized (stateLock) {
629 ensureOpen();
630 int state = this.state;
631 if (state == ST_CONNECTED)
632 throw new AlreadyConnectedException();
633 if (state == ST_CONNECTIONPENDING)
634 throw new ConnectionPendingException();
635 assert state == ST_UNCONNECTED;
636 this.state = ST_CONNECTIONPENDING;
637
638 if (localAddress == null)
639 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
640 remoteAddress = isa;
641
642 if (blocking) {
643 // record thread so it can be signalled if needed
644 readerThread = NativeThread.current();
645 }
646 }
647 }
648
649 /**
650 * Marks the end of a connect operation that may have blocked.
651 *
652 * @throws AsynchronousCloseException if the channel was closed due to this
653 * thread being interrupted on a blocking connect operation.
654 * @throws IOException if completed and unable to obtain the local address
655 */
656 private void endConnect(boolean blocking, boolean completed)
657 throws IOException
658 {
659 endRead(blocking, completed);
660
661 if (completed) {
662 synchronized (stateLock) {
663 if (state == ST_CONNECTIONPENDING) {
664 localAddress = Net.localAddress(fd);
665 state = ST_CONNECTED;
666 }
667 }
668 }
669 }
670
671 @Override
672 public boolean connect(SocketAddress sa) throws IOException {
673 InetSocketAddress isa = Net.checkAddress(sa);
674 SecurityManager sm = System.getSecurityManager();
675 if (sm != null)
676 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
698 return n > 0;
699 } finally {
700 writeLock.unlock();
701 }
702 } finally {
703 readLock.unlock();
704 }
705 } catch (IOException ioe) {
706 // connect failed, close the channel
707 close();
708 throw ioe;
709 }
710 }
711
712 /**
713 * Marks the beginning of a finishConnect operation that might block.
714 *
715 * @throws ClosedChannelException if the channel is closed
716 * @throws NoConnectionPendingException if no connection is pending
717 */
718 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
719 if (blocking) {
720 // set hook for Thread.interrupt
721 begin();
722 }
723 synchronized (stateLock) {
724 ensureOpen();
725 if (state != ST_CONNECTIONPENDING)
726 throw new NoConnectionPendingException();
727 if (blocking) {
728 // record thread so it can be signalled if needed
729 readerThread = NativeThread.current();
730 }
731 }
732 }
733
734 /**
735 * Marks the end of a finishConnect operation that may have blocked.
736 *
737 * @throws AsynchronousCloseException if the channel was closed due to this
738 * thread being interrupted on a blocking connect operation.
739 * @throws IOException if completed and unable to obtain the local address
740 */
741 private void endFinishConnect(boolean blocking, boolean completed)
742 throws IOException
743 {
744 endRead(blocking, completed);
745
746 if (completed) {
747 synchronized (stateLock) {
748 if (state == ST_CONNECTIONPENDING) {
749 localAddress = Net.localAddress(fd);
750 state = ST_CONNECTED;
751 }
752 }
753 }
754 }
755
756 @Override
757 public boolean finishConnect() throws IOException {
758 try {
759 readLock.lock();
760 try {
761 writeLock.lock();
913 }
914 }
915
916 @Override
917 public SocketChannel shutdownOutput() throws IOException {
918 synchronized (stateLock) {
919 ensureOpen();
920 if (!isConnected())
921 throw new NotYetConnectedException();
922 if (!isOutputClosed) {
923 Net.shutdown(fd, Net.SHUT_WR);
924 long thread = writerThread;
925 if (thread != 0)
926 NativeThread.signal(thread);
927 isOutputClosed = true;
928 }
929 return this;
930 }
931 }
932
933 boolean isInputOpen() {
934 return !isInputClosed;
935 }
936
937 boolean isOutputOpen() {
938 return !isOutputClosed;
939 }
940
941 /**
942 * Poll this channel's socket for reading up to the given timeout.
943 * @return {@code true} if the socket is polled
944 */
945 boolean pollRead(long timeout) throws IOException {
946 boolean blocking = isBlocking();
947 assert Thread.holdsLock(blockingLock()) && blocking;
948
949 readLock.lock();
950 try {
951 boolean polled = false;
952 try {
953 beginRead(blocking);
954 int events = Net.poll(fd, Net.POLLIN, timeout);
955 polled = (events != 0);
956 } finally {
957 endRead(blocking, polled);
|
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NoConnectionPendingException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.SocketChannel;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.Collections;
50 import java.util.HashSet;
51 import java.util.Objects;
52 import java.util.Set;
53 import java.util.concurrent.locks.ReentrantLock;
54
55 import sun.net.NetHooks;
56 import sun.net.ext.ExtendedSocketOptions;
57
58 /**
59 * An implementation of SocketChannels
60 */
61
62 public class SocketChannelImpl
63 extends SocketChannel
64 implements SelChImpl
65 {
66 // Used to make native read and write calls
67 protected static NativeDispatcher nd;
68
69 // Our file descriptor object
70 protected FileDescriptor fd;
71 protected final int fdVal;
72
73 // Lock held by current reading or connecting thread
74 protected final ReentrantLock readLock = new ReentrantLock();
75
76 // Lock held by current writing or connecting thread
77 protected final ReentrantLock writeLock = new ReentrantLock();
78
79 // Lock held by any thread that modifies the state fields declared below
80 // DO NOT invoke a blocking I/O operation while holding this lock!
81 protected final Object stateLock = new Object();
82
83 // Input/Output closed
84 protected volatile boolean isInputClosed;
85 protected volatile boolean isOutputClosed;
86
87 // -- The following fields are protected by stateLock
88
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated
90 protected boolean isReuseAddress;
91
92 // State, increases monotonically
93 protected static final int ST_UNCONNECTED = 0;
94 protected static final int ST_CONNECTIONPENDING = 1;
95 protected static final int ST_CONNECTED = 2;
96 protected static final int ST_CLOSING = 3;
97 protected static final int ST_KILLPENDING = 4;
98 protected static final int ST_KILLED = 5;
99 protected volatile int state; // need stateLock to change
100
101 // IDs of native threads doing reads and writes, for signalling
102 protected long readerThread;
103 protected long writerThread;
104
105 // Binding
106 protected InetSocketAddress localAddress;
107 protected InetSocketAddress remoteAddress;
108
109 // Socket adaptor, created on demand
110 protected Socket socket;
111
112 // -- End of fields protected by stateLock
113
114
115 // Constructor for normal connecting sockets
116 //
117 protected SocketChannelImpl(SelectorProvider sp) throws IOException {
118 super(sp);
119 this.fd = createFD();
120 this.fdVal = IOUtil.fdVal(fd);
121 }
122
/**
 * Creates a channel around an existing file descriptor.
 *
 * @param sp    the selector provider passed to the superclass
 * @param fd    the file descriptor of the socket
 * @param bound {@code true} to look up and record the socket's local address
 * @throws IOException propagated from {@code createLocalAddress}
 */
protected SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    if (!bound) {
        return;
    }
    // localAddress is one of the fields guarded by stateLock
    synchronized (stateLock) {
        localAddress = createLocalAddress(fd);
    }
}
135
/**
 * Creates a channel for a socket obtained from a server socket. The channel
 * starts out in the connected state.
 *
 * @param sp  the selector provider passed to the superclass
 * @param fd  the file descriptor of the socket
 * @param isa the remote address to record for the connection
 * @throws IOException propagated from {@code createLocalAddress}
 */
protected SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    synchronized (stateLock) {
        InetSocketAddress local = createLocalAddress(fd);
        localAddress = local;
        remoteAddress = isa;
        state = ST_CONNECTED;
    }
}
150
151 protected FileDescriptor createFD() throws IOException {
152 return Net.socket(true);
153 }
154
155 protected InetSocketAddress createLocalAddress(FileDescriptor fd)
156 throws IOException {
157 return Net.localAddress(fd);
158 }
159
160 /**
161 * Checks that the channel is open.
162 *
163 * @throws ClosedChannelException if channel is closed (or closing)
164 */
165 protected void ensureOpen() throws ClosedChannelException {
166 if (!isOpen())
167 throw new ClosedChannelException();
168 }
169
170 /**
171 * Checks that the channel is open and connected.
172 *
173 * @apiNote This method uses the "state" field to check if the channel is
174 * open. It should never be used in conjunction with isOpen or ensureOpen
175 * as these methods check AbstractInterruptibleChannel's closed field - that
176 * field is set before implCloseSelectableChannel is called and so before
177 * the state is changed.
178 *
179 * @throws ClosedChannelException if channel is closed (or closing)
180 * @throws NotYetConnectedException if open and not connected
181 */
182 private void ensureOpenAndConnected() throws ClosedChannelException {
183 int state = this.state;
184 if (state < ST_CONNECTED) {
185 throw new NotYetConnectedException();
278 private static Set<SocketOption<?>> defaultOptions() {
279 HashSet<SocketOption<?>> set = new HashSet<>();
280 set.add(StandardSocketOptions.SO_SNDBUF);
281 set.add(StandardSocketOptions.SO_RCVBUF);
282 set.add(StandardSocketOptions.SO_KEEPALIVE);
283 set.add(StandardSocketOptions.SO_REUSEADDR);
284 if (Net.isReusePortAvailable()) {
285 set.add(StandardSocketOptions.SO_REUSEPORT);
286 }
287 set.add(StandardSocketOptions.SO_LINGER);
288 set.add(StandardSocketOptions.TCP_NODELAY);
289 // additional options required by socket adaptor
290 set.add(StandardSocketOptions.IP_TOS);
291 set.add(ExtendedSocketOption.SO_OOBINLINE);
292 set.addAll(ExtendedSocketOptions.getInstance().options());
293 return Collections.unmodifiableSet(set);
294 }
295 }
296
297 @Override
298 public Set<SocketOption<?>> supportedOptions() {
299 return DefaultOptionsHolder.defaultOptions;
300 }
301
302 /**
303 * Marks the beginning of a read operation that might block.
304 *
305 * @throws ClosedChannelException if the channel is closed
306 * @throws NotYetConnectedException if the channel is not yet connected
307 */
308 protected void beginRead(boolean blocking) throws ClosedChannelException {
309 if (blocking) {
310 // set hook for Thread.interrupt
311 begin();
312
313 synchronized (stateLock) {
314 ensureOpenAndConnected();
315 // record thread so it can be signalled if needed
316 readerThread = NativeThread.current();
317 }
318 } else {
319 ensureOpenAndConnected();
320 }
321 }
322
323 /**
324 * Marks the end of a read operation that may have blocked.
325 *
326 * @throws AsynchronousCloseException if the channel was closed due to this
327 * thread being interrupted on a blocking read operation.
328 */
329 protected void endRead(boolean blocking, boolean completed)
330 throws AsynchronousCloseException
331 {
332 if (blocking) {
333 synchronized (stateLock) {
334 readerThread = 0;
335 // notify any thread waiting in implCloseSelectableChannel
336 if (state == ST_CLOSING) {
337 stateLock.notifyAll();
338 }
339 }
340 // remove hook for Thread.interrupt
341 end(completed);
342 }
343 }
344
345 @Override
346 public int read(ByteBuffer buf) throws IOException {
347 Objects.requireNonNull(buf);
348
349 readLock.lock();
399 } else {
400 n = IOUtil.read(fd, dsts, offset, length, nd);
401 }
402 } finally {
403 endRead(blocking, n > 0);
404 if (n <= 0 && isInputClosed)
405 return IOStatus.EOF;
406 }
407 return IOStatus.normalize(n);
408 } finally {
409 readLock.unlock();
410 }
411 }
412
413 /**
414 * Marks the beginning of a write operation that might block.
415 *
416 * @throws ClosedChannelException if the channel is closed or output shutdown
417 * @throws NotYetConnectedException if the channel is not yet connected
418 */
419 protected void beginWrite(boolean blocking) throws ClosedChannelException {
420 if (blocking) {
421 // set hook for Thread.interrupt
422 begin();
423
424 synchronized (stateLock) {
425 ensureOpenAndConnected();
426 if (isOutputClosed)
427 throw new ClosedChannelException();
428 // record thread so it can be signalled if needed
429 writerThread = NativeThread.current();
430 }
431 } else {
432 ensureOpenAndConnected();
433 }
434 }
435
436 /**
437 * Marks the end of a write operation that may have blocked.
438 *
439 * @throws AsynchronousCloseException if the channel was closed due to this
440 * thread being interrupted on a blocking write operation.
441 */
442 protected void endWrite(boolean blocking, boolean completed)
443 throws AsynchronousCloseException
444 {
445 if (blocking) {
446 synchronized (stateLock) {
447 writerThread = 0;
448 // notify any thread waiting in implCloseSelectableChannel
449 if (state == ST_CLOSING) {
450 stateLock.notifyAll();
451 }
452 }
453 // remove hook for Thread.interrupt
454 end(completed);
455 }
456 }
457
458 @Override
459 public int write(ByteBuffer buf) throws IOException {
460 Objects.requireNonNull(buf);
461
462 writeLock.lock();
499 do {
500 n = IOUtil.write(fd, srcs, offset, length, nd);
501 } while (n == IOStatus.INTERRUPTED && isOpen());
502 } else {
503 n = IOUtil.write(fd, srcs, offset, length, nd);
504 }
505 } finally {
506 endWrite(blocking, n > 0);
507 if (n <= 0 && isOutputClosed)
508 throw new AsynchronousCloseException();
509 }
510 return IOStatus.normalize(n);
511 } finally {
512 writeLock.unlock();
513 }
514 }
515
516 /**
517 * Writes a byte of out of band data.
518 */
519 protected int sendOutOfBandData(byte b) throws IOException {
520 writeLock.lock();
521 try {
522 boolean blocking = isBlocking();
523 int n = 0;
524 try {
525 beginWrite(blocking);
526 if (blocking) {
527 do {
528 n = sendOutOfBandData(fd, b);
529 } while (n == IOStatus.INTERRUPTED && isOpen());
530 } else {
531 n = sendOutOfBandData(fd, b);
532 }
533 } finally {
534 endWrite(blocking, n > 0);
535 if (n <= 0 && isOutputClosed)
536 throw new AsynchronousCloseException();
537 }
538 return IOStatus.normalize(n);
539 } finally {
545 protected void implConfigureBlocking(boolean block) throws IOException {
546 readLock.lock();
547 try {
548 writeLock.lock();
549 try {
550 synchronized (stateLock) {
551 ensureOpen();
552 IOUtil.configureBlocking(fd, block);
553 }
554 } finally {
555 writeLock.unlock();
556 }
557 } finally {
558 readLock.unlock();
559 }
560 }
561
562 /**
563 * Returns the local address, or null if not bound
564 */
565 protected InetSocketAddress localAddress() {
566 synchronized (stateLock) {
567 return localAddress;
568 }
569 }
570
571 /**
572 * Returns the remote address, or null if not connected
573 */
574 protected InetSocketAddress remoteAddress() {
575 synchronized (stateLock) {
576 return remoteAddress;
577 }
578 }
579
580 @Override
581 public SocketChannel bind(SocketAddress local) throws IOException {
582 readLock.lock();
583 try {
584 writeLock.lock();
585 try {
586 synchronized (stateLock) {
587 ensureOpen();
588 if (state == ST_CONNECTIONPENDING)
589 throw new ConnectionPendingException();
590 if (localAddress != null)
591 throw new AlreadyBoundException();
592 InetSocketAddress isa = (local == null) ?
593 new InetSocketAddress(0) : Net.checkAddress(local);
594 SecurityManager sm = System.getSecurityManager();
610
611 @Override
612 public boolean isConnected() {
613 return (state == ST_CONNECTED);
614 }
615
616 @Override
617 public boolean isConnectionPending() {
618 return (state == ST_CONNECTIONPENDING);
619 }
620
621 /**
622 * Marks the beginning of a connect operation that might block.
623 * @param blocking true if configured blocking
624 * @param isa the remote address
625 * @throws ClosedChannelException if the channel is closed
626 * @throws AlreadyConnectedException if already connected
627 * @throws ConnectionPendingException is a connection is pending
628 * @throws IOException if the pre-connect hook fails
629 */
630 protected void beginConnect(boolean blocking, InetSocketAddress isa)
631 throws IOException
632 {
633 if (blocking) {
634 // set hook for Thread.interrupt
635 begin();
636 }
637 synchronized (stateLock) {
638 ensureOpen();
639 int state = this.state;
640 if (state == ST_CONNECTED)
641 throw new AlreadyConnectedException();
642 if (state == ST_CONNECTIONPENDING)
643 throw new ConnectionPendingException();
644 assert state == ST_UNCONNECTED;
645 this.state = ST_CONNECTIONPENDING;
646
647 if (localAddress == null)
648 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
649 remoteAddress = isa;
650
651 if (blocking) {
652 // record thread so it can be signalled if needed
653 readerThread = NativeThread.current();
654 }
655 }
656 }
657
658 /**
659 * Marks the end of a connect operation that may have blocked.
660 *
661 * @throws AsynchronousCloseException if the channel was closed due to this
662 * thread being interrupted on a blocking connect operation.
663 * @throws IOException if completed and unable to obtain the local address
664 */
665 protected void endConnect(boolean blocking, boolean completed)
666 throws IOException
667 {
668 endRead(blocking, completed);
669
670 if (completed) {
671 synchronized (stateLock) {
672 if (state == ST_CONNECTIONPENDING) {
673 localAddress = Net.localAddress(fd);
674 state = ST_CONNECTED;
675 }
676 }
677 }
678 }
679
680 @Override
681 public boolean connect(SocketAddress sa) throws IOException {
682 InetSocketAddress isa = Net.checkAddress(sa);
683 SecurityManager sm = System.getSecurityManager();
684 if (sm != null)
685 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
707 return n > 0;
708 } finally {
709 writeLock.unlock();
710 }
711 } finally {
712 readLock.unlock();
713 }
714 } catch (IOException ioe) {
715 // connect failed, close the channel
716 close();
717 throw ioe;
718 }
719 }
720
721 /**
722 * Marks the beginning of a finishConnect operation that might block.
723 *
724 * @throws ClosedChannelException if the channel is closed
725 * @throws NoConnectionPendingException if no connection is pending
726 */
727 protected void beginFinishConnect(boolean blocking) throws ClosedChannelException {
728 if (blocking) {
729 // set hook for Thread.interrupt
730 begin();
731 }
732 synchronized (stateLock) {
733 ensureOpen();
734 if (state != ST_CONNECTIONPENDING)
735 throw new NoConnectionPendingException();
736 if (blocking) {
737 // record thread so it can be signalled if needed
738 readerThread = NativeThread.current();
739 }
740 }
741 }
742
743 /**
744 * Marks the end of a finishConnect operation that may have blocked.
745 *
746 * @throws AsynchronousCloseException if the channel was closed due to this
747 * thread being interrupted on a blocking connect operation.
748 * @throws IOException if completed and unable to obtain the local address
749 */
750 protected void endFinishConnect(boolean blocking, boolean completed)
751 throws IOException
752 {
753 endRead(blocking, completed);
754
755 if (completed) {
756 synchronized (stateLock) {
757 if (state == ST_CONNECTIONPENDING) {
758 localAddress = Net.localAddress(fd);
759 state = ST_CONNECTED;
760 }
761 }
762 }
763 }
764
765 @Override
766 public boolean finishConnect() throws IOException {
767 try {
768 readLock.lock();
769 try {
770 writeLock.lock();
922 }
923 }
924
925 @Override
926 public SocketChannel shutdownOutput() throws IOException {
927 synchronized (stateLock) {
928 ensureOpen();
929 if (!isConnected())
930 throw new NotYetConnectedException();
931 if (!isOutputClosed) {
932 Net.shutdown(fd, Net.SHUT_WR);
933 long thread = writerThread;
934 if (thread != 0)
935 NativeThread.signal(thread);
936 isOutputClosed = true;
937 }
938 return this;
939 }
940 }
941
942 protected boolean isInputOpen() {
943 return !isInputClosed;
944 }
945
946 protected boolean isOutputOpen() {
947 return !isOutputClosed;
948 }
949
950 /**
951 * Poll this channel's socket for reading up to the given timeout.
952 * @return {@code true} if the socket is polled
953 */
954 boolean pollRead(long timeout) throws IOException {
955 boolean blocking = isBlocking();
956 assert Thread.holdsLock(blockingLock()) && blocking;
957
958 readLock.lock();
959 try {
960 boolean polled = false;
961 try {
962 beginRead(blocking);
963 int events = Net.poll(fd, Net.POLLIN, timeout);
964 polled = (events != 0);
965 } finally {
966 endRead(blocking, polled);
|