31 import java.net.InetSocketAddress;
32 import java.net.ProtocolFamily;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.net.SocketOption;
36 import java.net.StandardProtocolFamily;
37 import java.net.StandardSocketOptions;
38 import java.nio.ByteBuffer;
39 import java.nio.channels.AlreadyBoundException;
40 import java.nio.channels.AlreadyConnectedException;
41 import java.nio.channels.AsynchronousCloseException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NoConnectionPendingException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.SocketChannel;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.Collections;
50 import java.util.HashSet;
51 import java.util.Set;
52 import java.util.concurrent.locks.ReentrantLock;
53
54 import sun.net.NetHooks;
55 import sun.net.ext.ExtendedSocketOptions;
56
57 /**
58 * An implementation of SocketChannels
59 */
60
61 class SocketChannelImpl
62 extends SocketChannel
63 implements SelChImpl
64 {
65
66 // Used to make native read and write calls
67 private static NativeDispatcher nd;
68
69 // Our file descriptor object
70 private final FileDescriptor fd;
71 private final int fdVal;
72
73 // IDs of native threads doing reads and writes, for signalling
74 private volatile long readerThread;
75 private volatile long writerThread;
76
77 // Lock held by current reading or connecting thread
78 private final ReentrantLock readLock = new ReentrantLock();
79
80 // Lock held by current writing or connecting thread
81 private final ReentrantLock writeLock = new ReentrantLock();
82
83 // Lock held by any thread that modifies the state fields declared below
84 // DO NOT invoke a blocking I/O operation while holding this lock!
85 private final Object stateLock = new Object();
86
87 // -- The following fields are protected by stateLock
88
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated
90 private boolean isReuseAddress;
91
92 // State, increases monotonically
93 private static final int ST_UNINITIALIZED = -1;
94 private static final int ST_UNCONNECTED = 0;
95 private static final int ST_PENDING = 1;
96 private static final int ST_CONNECTED = 2;
97 private static final int ST_KILLPENDING = 3;
98 private static final int ST_KILLED = 4;
99 private int state = ST_UNINITIALIZED;
100
101 // Binding
102 private InetSocketAddress localAddress;
103 private InetSocketAddress remoteAddress;
104
105 // Input/Output open
106 private boolean isInputOpen = true;
107 private boolean isOutputOpen = true;
108
109 // Socket adaptor, created on demand
110 private Socket socket;
111
112 // -- End of fields protected by stateLock
113
114
115 // Constructor for normal connecting sockets
116 //
117 SocketChannelImpl(SelectorProvider sp) throws IOException {
118 super(sp);
119 this.fd = Net.socket(true);
120 this.fdVal = IOUtil.fdVal(fd);
121 this.state = ST_UNCONNECTED;
122 }
123
124 SocketChannelImpl(SelectorProvider sp,
125 FileDescriptor fd,
126 boolean bound)
127 throws IOException
128 {
129 super(sp);
130 this.fd = fd;
131 this.fdVal = IOUtil.fdVal(fd);
132 this.state = ST_UNCONNECTED;
133 if (bound)
134 this.localAddress = Net.localAddress(fd);
135 }
136
137 // Constructor for sockets obtained from server sockets
138 //
139 SocketChannelImpl(SelectorProvider sp,
140 FileDescriptor fd, InetSocketAddress remote)
141 throws IOException
142 {
143 super(sp);
144 this.fd = fd;
145 this.fdVal = IOUtil.fdVal(fd);
146 this.state = ST_CONNECTED;
147 this.localAddress = Net.localAddress(fd);
148 this.remoteAddress = remote;
149 }
150
151 public Socket socket() {
152 synchronized (stateLock) {
153 if (socket == null)
154 socket = SocketAdaptor.create(this);
155 return socket;
156 }
157 }
158
159 @Override
160 public SocketAddress getLocalAddress() throws IOException {
161 synchronized (stateLock) {
162 if (!isOpen())
163 throw new ClosedChannelException();
164 return Net.getRevealedLocalAddress(localAddress);
165 }
166 }
167
168 @Override
169 public SocketAddress getRemoteAddress() throws IOException {
170 synchronized (stateLock) {
171 if (!isOpen())
172 throw new ClosedChannelException();
173 return remoteAddress;
174 }
175 }
176
177 @Override
178 public <T> SocketChannel setOption(SocketOption<T> name, T value)
179 throws IOException
180 {
181 if (name == null)
182 throw new NullPointerException();
183 if (!supportedOptions().contains(name))
184 throw new UnsupportedOperationException("'" + name + "' not supported");
185
186 synchronized (stateLock) {
187 if (!isOpen())
188 throw new ClosedChannelException();
189
190 if (name == StandardSocketOptions.IP_TOS) {
191 ProtocolFamily family = Net.isIPv6Available() ?
192 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
193 Net.setSocketOption(fd, family, name, value);
194 return this;
195 }
196
197 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
198 // SO_REUSEADDR emulated when using exclusive bind
199 isReuseAddress = (Boolean)value;
200 return this;
201 }
202
203 // no options that require special handling
204 Net.setSocketOption(fd, Net.UNSPEC, name, value);
205 return this;
206 }
207 }
208
209 @Override
210 @SuppressWarnings("unchecked")
211 public <T> T getOption(SocketOption<T> name)
212 throws IOException
213 {
214 if (name == null)
215 throw new NullPointerException();
216 if (!supportedOptions().contains(name))
217 throw new UnsupportedOperationException("'" + name + "' not supported");
218
219 synchronized (stateLock) {
220 if (!isOpen())
221 throw new ClosedChannelException();
222
223 if (name == StandardSocketOptions.SO_REUSEADDR &&
224 Net.useExclusiveBind())
225 {
226 // SO_REUSEADDR emulated when using exclusive bind
227 return (T)Boolean.valueOf(isReuseAddress);
228 }
229
230 // special handling for IP_TOS: always return 0 when IPv6
231 if (name == StandardSocketOptions.IP_TOS) {
232 ProtocolFamily family = Net.isIPv6Available() ?
233 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
234 return (T) Net.getSocketOption(fd, family, name);
235 }
236
237 // no options that require special handling
238 return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
239 }
240 }
241
242 private static class DefaultOptionsHolder {
243 static final Set<SocketOption<?>> defaultOptions = defaultOptions();
244
245 private static Set<SocketOption<?>> defaultOptions() {
246 HashSet<SocketOption<?>> set = new HashSet<>(8);
247 set.add(StandardSocketOptions.SO_SNDBUF);
248 set.add(StandardSocketOptions.SO_RCVBUF);
249 set.add(StandardSocketOptions.SO_KEEPALIVE);
250 set.add(StandardSocketOptions.SO_REUSEADDR);
251 if (Net.isReusePortAvailable()) {
252 set.add(StandardSocketOptions.SO_REUSEPORT);
253 }
254 set.add(StandardSocketOptions.SO_LINGER);
255 set.add(StandardSocketOptions.TCP_NODELAY);
256 // additional options required by socket adaptor
257 set.add(StandardSocketOptions.IP_TOS);
258 set.add(ExtendedSocketOption.SO_OOBINLINE);
259 ExtendedSocketOptions extendedOptions =
260 ExtendedSocketOptions.getInstance();
261 set.addAll(extendedOptions.options());
262 return Collections.unmodifiableSet(set);
263 }
264 }
265
266 @Override
267 public final Set<SocketOption<?>> supportedOptions() {
268 return DefaultOptionsHolder.defaultOptions;
269 }
270
271 private boolean ensureReadOpen() throws ClosedChannelException {
272 synchronized (stateLock) {
273 if (!isOpen())
274 throw new ClosedChannelException();
275 if (!isConnected())
276 throw new NotYetConnectedException();
277 if (!isInputOpen)
278 return false;
279 else
280 return true;
281 }
282 }
283
284 private void ensureWriteOpen() throws ClosedChannelException {
285 synchronized (stateLock) {
286 if (!isOpen())
287 throw new ClosedChannelException();
288 if (!isOutputOpen)
289 throw new ClosedChannelException();
290 if (!isConnected())
291 throw new NotYetConnectedException();
292 }
293 }
294
295 private void readerCleanup() throws IOException {
296 synchronized (stateLock) {
297 readerThread = 0;
298 if (state == ST_KILLPENDING)
299 kill();
300 }
301 }
302
303 private void writerCleanup() throws IOException {
304 synchronized (stateLock) {
305 writerThread = 0;
306 if (state == ST_KILLPENDING)
307 kill();
308 }
309 }
310
311 public int read(ByteBuffer buf) throws IOException {
312
313 if (buf == null)
314 throw new NullPointerException();
315
316 readLock.lock();
317 try {
318 if (!ensureReadOpen())
319 return -1;
320 int n = 0;
321 try {
322
323 // Set up the interruption machinery; see
324 // AbstractInterruptibleChannel for details
325 //
326 begin();
327
328 synchronized (stateLock) {
329 if (!isOpen()) {
330 // Either the current thread is already interrupted, so
331 // begin() closed the channel, or another thread closed the
332 // channel since we checked it a few bytecodes ago. In
333 // either case the value returned here is irrelevant since
334 // the invocation of end() in the finally block will throw
335 // an appropriate exception.
336 //
337 return 0;
338
339 }
340
341 // Save this thread so that it can be signalled on those
342 // platforms that require it
343 //
344 readerThread = NativeThread.current();
345 }
346
347 // Between the previous test of isOpen() and the return of the
348 // IOUtil.read invocation below, this channel might be closed
349 // or this thread might be interrupted. We rely upon the
350 // implicit synchronization point in the kernel read() call to
351 // make sure that the right thing happens. In either case the
352 // implCloseSelectableChannel method is ultimately invoked in
353 // some other thread, so there are three possibilities:
354 //
355 // - implCloseSelectableChannel() invokes nd.preClose()
356 // before this thread invokes read(), in which case the
357 // read returns immediately with either EOF or an error,
358 // the latter of which will cause an IOException to be
359 // thrown.
360 //
361 // - implCloseSelectableChannel() invokes nd.preClose() after
362 // this thread is blocked in read(). On some operating
363 // systems (e.g., Solaris and Windows) this causes the read
364 // to return immediately with either EOF or an error
365 // indication.
366 //
367 // - implCloseSelectableChannel() invokes nd.preClose() after
368 // this thread is blocked in read() but the operating
369 // system (e.g., Linux) doesn't support preemptive close,
370 // so implCloseSelectableChannel() proceeds to signal this
371 // thread, thereby causing the read to return immediately
372 // with IOStatus.INTERRUPTED.
373 //
374 // In all three cases the invocation of end() in the finally
375 // clause will notice that the channel has been closed and
376 // throw an appropriate exception (AsynchronousCloseException
377 // or ClosedByInterruptException) if necessary.
378 //
379 // *There is A fourth possibility. implCloseSelectableChannel()
380 // invokes nd.preClose(), signals reader/writer thred and quickly
381 // moves on to nd.close() in kill(), which does a real close.
382 // Then a third thread accepts a new connection, opens file or
383 // whatever that causes the released "fd" to be recycled. All
384 // above happens just between our last isOpen() check and the
385 // next kernel read reached, with the recycled "fd". The solution
386 // is to postpone the real kill() if there is a reader or/and
387 // writer thread(s) over there "waiting", leave the cleanup/kill
388 // to the reader or writer thread. (the preClose() still happens
389 // so the connection gets cut off as usual).
390 //
391 // For socket channels there is the additional wrinkle that
392 // asynchronous shutdown works much like asynchronous close,
393 // except that the channel is shutdown rather than completely
394 // closed. This is analogous to the first two cases above,
395 // except that the shutdown operation plays the role of
396 // nd.preClose().
397 for (;;) {
398 n = IOUtil.read(fd, buf, -1, nd);
399 if ((n == IOStatus.INTERRUPTED) && isOpen()) {
400 // The system call was interrupted but the channel
401 // is still open, so retry
402 continue;
403 }
404 return IOStatus.normalize(n);
405 }
406
407 } finally {
408 readerCleanup(); // Clear reader thread
409 // The end method, which is defined in our superclass
410 // AbstractInterruptibleChannel, resets the interruption
411 // machinery. If its argument is true then it returns
412 // normally; otherwise it checks the interrupt and open state
413 // of this channel and throws an appropriate exception if
414 // necessary.
415 //
416 // So, if we actually managed to do any I/O in the above try
417 // block then we pass true to the end method. We also pass
418 // true if the channel was in non-blocking mode when the I/O
419 // operation was initiated but no data could be transferred;
420 // this prevents spurious exceptions from being thrown in the
421 // rare event that a channel is closed or a thread is
422 // interrupted at the exact moment that a non-blocking I/O
423 // request is made.
424 //
425 end(n > 0 || (n == IOStatus.UNAVAILABLE));
426
427 // Extra case for socket channels: Asynchronous shutdown
428 //
429 synchronized (stateLock) {
430 if ((n <= 0) && (!isInputOpen))
431 return IOStatus.EOF;
432 }
433
434 assert IOStatus.check(n);
435
436 }
437 } finally {
438 readLock.unlock();
439 }
440 }
441
442 public long read(ByteBuffer[] dsts, int offset, int length)
443 throws IOException
444 {
445 if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
446 throw new IndexOutOfBoundsException();
447 readLock.lock();
448 try {
449 if (!ensureReadOpen())
450 return -1;
451 long n = 0;
452 try {
453 begin();
454 synchronized (stateLock) {
455 if (!isOpen())
456 return 0;
457 readerThread = NativeThread.current();
458 }
459
460 for (;;) {
461 n = IOUtil.read(fd, dsts, offset, length, nd);
462 if ((n == IOStatus.INTERRUPTED) && isOpen())
463 continue;
464 return IOStatus.normalize(n);
465 }
466 } finally {
467 readerCleanup();
468 end(n > 0 || (n == IOStatus.UNAVAILABLE));
469 synchronized (stateLock) {
470 if ((n <= 0) && (!isInputOpen))
471 return IOStatus.EOF;
472 }
473 assert IOStatus.check(n);
474 }
475 } finally {
476 readLock.unlock();
477 }
478 }
479
480 public int write(ByteBuffer buf) throws IOException {
481 if (buf == null)
482 throw new NullPointerException();
483 writeLock.lock();
484 try {
485 ensureWriteOpen();
486 int n = 0;
487 try {
488 begin();
489 synchronized (stateLock) {
490 if (!isOpen())
491 return 0;
492 writerThread = NativeThread.current();
493 }
494 for (;;) {
495 n = IOUtil.write(fd, buf, -1, nd);
496 if ((n == IOStatus.INTERRUPTED) && isOpen())
497 continue;
498 return IOStatus.normalize(n);
499 }
500 } finally {
501 writerCleanup();
502 end(n > 0 || (n == IOStatus.UNAVAILABLE));
503 synchronized (stateLock) {
504 if ((n <= 0) && (!isOutputOpen))
505 throw new AsynchronousCloseException();
506 }
507 assert IOStatus.check(n);
508 }
509 } finally {
510 writeLock.unlock();
511 }
512 }
513
514 public long write(ByteBuffer[] srcs, int offset, int length)
515 throws IOException
516 {
517 if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
518 throw new IndexOutOfBoundsException();
519 writeLock.lock();
520 try {
521 ensureWriteOpen();
522 long n = 0;
523 try {
524 begin();
525 synchronized (stateLock) {
526 if (!isOpen())
527 return 0;
528 writerThread = NativeThread.current();
529 }
530 for (;;) {
531 n = IOUtil.write(fd, srcs, offset, length, nd);
532 if ((n == IOStatus.INTERRUPTED) && isOpen())
533 continue;
534 return IOStatus.normalize(n);
535 }
536 } finally {
537 writerCleanup();
538 end((n > 0) || (n == IOStatus.UNAVAILABLE));
539 synchronized (stateLock) {
540 if ((n <= 0) && (!isOutputOpen))
541 throw new AsynchronousCloseException();
542 }
543 assert IOStatus.check(n);
544 }
545 } finally {
546 writeLock.unlock();
547 }
548 }
549
550 // package-private
551 int sendOutOfBandData(byte b) throws IOException {
552 writeLock.lock();
553 try {
554 ensureWriteOpen();
555 int n = 0;
556 try {
557 begin();
558 synchronized (stateLock) {
559 if (!isOpen())
560 return 0;
561 writerThread = NativeThread.current();
562 }
563 for (;;) {
564 n = sendOutOfBandData(fd, b);
565 if ((n == IOStatus.INTERRUPTED) && isOpen())
566 continue;
567 return IOStatus.normalize(n);
568 }
569 } finally {
570 writerCleanup();
571 end((n > 0) || (n == IOStatus.UNAVAILABLE));
572 synchronized (stateLock) {
573 if ((n <= 0) && (!isOutputOpen))
574 throw new AsynchronousCloseException();
575 }
576 assert IOStatus.check(n);
577 }
578 } finally {
579 writeLock.unlock();
580 }
581 }
582
583 protected void implConfigureBlocking(boolean block) throws IOException {
584 IOUtil.configureBlocking(fd, block);
585 }
586
587 public InetSocketAddress localAddress() {
588 synchronized (stateLock) {
589 return localAddress;
590 }
591 }
592
593 public SocketAddress remoteAddress() {
594 synchronized (stateLock) {
595 return remoteAddress;
596 }
597 }
598
599 @Override
600 public SocketChannel bind(SocketAddress local) throws IOException {
601 readLock.lock();
602 try {
603 writeLock.lock();
604 try {
605 synchronized (stateLock) {
606 if (!isOpen())
607 throw new ClosedChannelException();
608 if (state == ST_PENDING)
609 throw new ConnectionPendingException();
610 if (localAddress != null)
611 throw new AlreadyBoundException();
612 InetSocketAddress isa = (local == null) ?
613 new InetSocketAddress(0) : Net.checkAddress(local);
614 SecurityManager sm = System.getSecurityManager();
615 if (sm != null) {
616 sm.checkListen(isa.getPort());
617 }
618 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
619 Net.bind(fd, isa.getAddress(), isa.getPort());
620 localAddress = Net.localAddress(fd);
621 }
622 } finally {
623 writeLock.unlock();
624 }
625 } finally {
626 readLock.unlock();
627 }
628 return this;
629 }
630
631 public boolean isConnected() {
632 synchronized (stateLock) {
633 return (state == ST_CONNECTED);
634 }
635 }
636
637 public boolean isConnectionPending() {
638 synchronized (stateLock) {
639 return (state == ST_PENDING);
640 }
641 }
642
643 void ensureOpenAndUnconnected() throws IOException { // package-private
644 synchronized (stateLock) {
645 if (!isOpen())
646 throw new ClosedChannelException();
647 if (state == ST_CONNECTED)
648 throw new AlreadyConnectedException();
649 if (state == ST_PENDING)
650 throw new ConnectionPendingException();
651 }
652 }
653
654 public boolean connect(SocketAddress sa) throws IOException {
655 readLock.lock();
656 try {
657 writeLock.lock();
658 try {
659 ensureOpenAndUnconnected();
660 InetSocketAddress isa = Net.checkAddress(sa);
661 SecurityManager sm = System.getSecurityManager();
662 if (sm != null)
663 sm.checkConnect(isa.getAddress().getHostAddress(),
664 isa.getPort());
665 synchronized (blockingLock()) {
666 int n = 0;
667 try {
668 try {
669 begin();
670 synchronized (stateLock) {
671 if (!isOpen()) {
672 return false;
673 }
674 // notify hook only if unbound
675 if (localAddress == null) {
676 NetHooks.beforeTcpConnect(fd,
677 isa.getAddress(),
678 isa.getPort());
679 }
680 readerThread = NativeThread.current();
681 }
682 for (;;) {
683 InetAddress ia = isa.getAddress();
684 if (ia.isAnyLocalAddress())
685 ia = InetAddress.getLocalHost();
686 n = Net.connect(fd,
687 ia,
688 isa.getPort());
689 if ((n == IOStatus.INTERRUPTED) && isOpen())
690 continue;
691 break;
692 }
693
694 } finally {
695 readerCleanup();
696 end((n > 0) || (n == IOStatus.UNAVAILABLE));
697 assert IOStatus.check(n);
698 }
699 } catch (IOException x) {
700 // If an exception was thrown, close the channel after
701 // invoking end() so as to avoid bogus
702 // AsynchronousCloseExceptions
703 close();
704 throw x;
705 }
706 synchronized (stateLock) {
707 remoteAddress = isa;
708 if (n > 0) {
709
710 // Connection succeeded; disallow further
711 // invocation
712 state = ST_CONNECTED;
713 if (isOpen())
714 localAddress = Net.localAddress(fd);
715 return true;
716 }
717 // If nonblocking and no exception then connection
718 // pending; disallow another invocation
719 if (!isBlocking())
720 state = ST_PENDING;
721 else
722 assert false;
723 }
724 }
725 return false;
726 } finally {
727 writeLock.unlock();
728 }
729 } finally {
730 readLock.unlock();
731 }
732 }
733
734 public boolean finishConnect() throws IOException {
735 readLock.lock();
736 try {
737 writeLock.lock();
738 try {
739 synchronized (stateLock) {
740 if (!isOpen())
741 throw new ClosedChannelException();
742 if (state == ST_CONNECTED)
743 return true;
744 if (state != ST_PENDING)
745 throw new NoConnectionPendingException();
746 }
747 int n = 0;
748 try {
749 try {
750 begin();
751 synchronized (blockingLock()) {
752 synchronized (stateLock) {
753 if (!isOpen()) {
754 return false;
755 }
756 readerThread = NativeThread.current();
757 }
758 if (!isBlocking()) {
759 for (;;) {
760 n = checkConnect(fd, false);
761 if ((n == IOStatus.INTERRUPTED) && isOpen())
762 continue;
763 break;
764 }
765 } else {
766 for (;;) {
767 n = checkConnect(fd, true);
768 if (n == 0) {
769 // Loop in case of
770 // spurious notifications
771 continue;
772 }
773 if ((n == IOStatus.INTERRUPTED) && isOpen())
774 continue;
775 break;
776 }
777 }
778 }
779 } finally {
780 synchronized (stateLock) {
781 readerThread = 0;
782 if (state == ST_KILLPENDING) {
783 kill();
784 // poll()/getsockopt() does not report
785 // error (throws exception, with n = 0)
786 // on Linux platform after dup2 and
787 // signal-wakeup. Force n to 0 so the
788 // end() can throw appropriate exception
789 n = 0;
790 }
791 }
792 end((n > 0) || (n == IOStatus.UNAVAILABLE));
793 assert IOStatus.check(n);
794 }
795 } catch (IOException x) {
796 // If an exception was thrown, close the channel after
797 // invoking end() so as to avoid bogus
798 // AsynchronousCloseExceptions
799 close();
800 throw x;
801 }
802 if (n > 0) {
803 synchronized (stateLock) {
804 state = ST_CONNECTED;
805 if (isOpen())
806 localAddress = Net.localAddress(fd);
807 }
808 return true;
809 }
810 return false;
811 } finally {
812 writeLock.unlock();
813 }
814 } finally {
815 readLock.unlock();
816 }
817 }
818
819 @Override
820 public SocketChannel shutdownInput() throws IOException {
821 synchronized (stateLock) {
822 if (!isOpen())
823 throw new ClosedChannelException();
824 if (!isConnected())
825 throw new NotYetConnectedException();
826 if (isInputOpen) {
827 Net.shutdown(fd, Net.SHUT_RD);
828 if (readerThread != 0)
829 NativeThread.signal(readerThread);
830 isInputOpen = false;
831 }
832 return this;
833 }
834 }
835
836 @Override
837 public SocketChannel shutdownOutput() throws IOException {
838 synchronized (stateLock) {
839 if (!isOpen())
840 throw new ClosedChannelException();
841 if (!isConnected())
842 throw new NotYetConnectedException();
843 if (isOutputOpen) {
844 Net.shutdown(fd, Net.SHUT_WR);
845 if (writerThread != 0)
846 NativeThread.signal(writerThread);
847 isOutputOpen = false;
848 }
849 return this;
850 }
851 }
852
853 public boolean isInputOpen() {
854 synchronized (stateLock) {
855 return isInputOpen;
856 }
857 }
858
859 public boolean isOutputOpen() {
860 synchronized (stateLock) {
861 return isOutputOpen;
862 }
863 }
864
865 // AbstractInterruptibleChannel synchronizes invocations of this method
866 // using AbstractInterruptibleChannel.closeLock, and also ensures that this
867 // method is only ever invoked once. Before we get to this method, isOpen
868 // (which is volatile) will have been set to false.
869 //
870 protected void implCloseSelectableChannel() throws IOException {
871 synchronized (stateLock) {
872 isInputOpen = false;
873 isOutputOpen = false;
874
875 // Close the underlying file descriptor and dup it to a known fd
876 // that's already closed. This prevents other operations on this
877 // channel from using the old fd, which might be recycled in the
878 // meantime and allocated to an entirely different channel.
879 //
880 if (state != ST_KILLED)
881 nd.preClose(fd);
882
883 // Signal native threads, if needed. If a target thread is not
884 // currently blocked in an I/O operation then no harm is done since
885 // the signal handler doesn't actually do anything.
886 //
887 if (readerThread != 0)
888 NativeThread.signal(readerThread);
889
890 if (writerThread != 0)
891 NativeThread.signal(writerThread);
892
893 // If this channel is not registered then it's safe to close the fd
894 // immediately since we know at this point that no thread is
895 // blocked in an I/O operation upon the channel and, since the
896 // channel is marked closed, no thread will start another such
897 // operation. If this channel is registered then we don't close
898 // the fd since it might be in use by a selector. In that case
899 // closing this channel caused its keys to be cancelled, so the
900 // last selector to deregister a key for this channel will invoke
901 // kill() to close the fd.
902 //
903 if (!isRegistered())
904 kill();
905 }
906 }
907
908 public void kill() throws IOException {
909 synchronized (stateLock) {
910 if (state == ST_KILLED)
911 return;
912 if (state == ST_UNINITIALIZED) {
913 state = ST_KILLED;
914 return;
915 }
916 assert !isOpen() && !isRegistered();
917
918 // Postpone the kill if there is a waiting reader
919 // or writer thread. See the comments in read() for
920 // more detailed explanation.
921 if (readerThread == 0 && writerThread == 0) {
922 nd.close(fd);
923 state = ST_KILLED;
924 } else {
925 state = ST_KILLPENDING;
926 }
927 }
928 }
929
930 /**
931 * Translates native poll revent ops into a ready operation ops
932 */
933 public boolean translateReadyOps(int ops, int initialOps,
934 SelectionKeyImpl sk) {
935 int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
936 int oldOps = sk.nioReadyOps();
937 int newOps = initialOps;
938
939 if ((ops & Net.POLLNVAL) != 0) {
940 // This should only happen if this channel is pre-closed while a
941 // selection operation is in progress
942 // ## Throw an error if this channel has not been pre-closed
943 return false;
944 }
945
946 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
947 newOps = intOps;
948 sk.nioReadyOps(newOps);
949 return (newOps & ~oldOps) != 0;
950 }
951
952 if (((ops & Net.POLLIN) != 0) &&
953 ((intOps & SelectionKey.OP_READ) != 0) &&
954 (state == ST_CONNECTED))
955 newOps |= SelectionKey.OP_READ;
956
957 if (((ops & Net.POLLCONN) != 0) &&
958 ((intOps & SelectionKey.OP_CONNECT) != 0) &&
959 ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
960 newOps |= SelectionKey.OP_CONNECT;
961 }
962
963 if (((ops & Net.POLLOUT) != 0) &&
964 ((intOps & SelectionKey.OP_WRITE) != 0) &&
965 (state == ST_CONNECTED))
966 newOps |= SelectionKey.OP_WRITE;
967
968 sk.nioReadyOps(newOps);
969 return (newOps & ~oldOps) != 0;
970 }
971
972 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
973 return translateReadyOps(ops, sk.nioReadyOps(), sk);
974 }
975
976 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
977 return translateReadyOps(ops, 0, sk);
978 }
979
980 // package-private
981 int poll(int events, long timeout) throws IOException {
982 assert Thread.holdsLock(blockingLock()) && !isBlocking();
983
984 readLock.lock();
985 try {
986 int n = 0;
987 try {
988 begin();
989 synchronized (stateLock) {
990 if (!isOpen())
991 return 0;
992 readerThread = NativeThread.current();
993 }
994 n = Net.poll(fd, events, timeout);
995 } finally {
996 readerCleanup();
997 end(n > 0);
998 }
999 return n;
1000 } finally {
1001 readLock.unlock();
1002 }
1003 }
1004
1005 /**
1006 * Translates an interest operation set into a native poll event set
1007 */
1008 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
1009 int newOps = 0;
1010 if ((ops & SelectionKey.OP_READ) != 0)
1011 newOps |= Net.POLLIN;
1012 if ((ops & SelectionKey.OP_WRITE) != 0)
1013 newOps |= Net.POLLOUT;
1014 if ((ops & SelectionKey.OP_CONNECT) != 0)
1015 newOps |= Net.POLLCONN;
1016 sk.selector.putEventOps(sk, newOps);
1017 }
1018
1019 public FileDescriptor getFD() {
1020 return fd;
1021 }
1022
1023 public int getFDVal() {
1024 return fdVal;
1025 }
1026
1027 @Override
1028 public String toString() {
1029 StringBuilder sb = new StringBuilder();
1030 sb.append(this.getClass().getSuperclass().getName());
1031 sb.append('[');
1032 if (!isOpen())
1033 sb.append("closed");
1034 else {
1035 synchronized (stateLock) {
1036 switch (state) {
1037 case ST_UNCONNECTED:
1038 sb.append("unconnected");
1039 break;
1040 case ST_PENDING:
1041 sb.append("connection-pending");
1042 break;
1043 case ST_CONNECTED:
1044 sb.append("connected");
1045 if (!isInputOpen)
1046 sb.append(" ishut");
1047 if (!isOutputOpen)
1048 sb.append(" oshut");
1049 break;
1050 }
1051 InetSocketAddress addr = localAddress();
1052 if (addr != null) {
1053 sb.append(" local=");
1054 sb.append(Net.getRevealedLocalAddressAsString(addr));
1055 }
1056 if (remoteAddress() != null) {
1057 sb.append(" remote=");
1058 sb.append(remoteAddress().toString());
1059 }
1060 }
1061 }
1062 sb.append(']');
1063 return sb.toString();
1064 }
1065
1066
1067 // -- Native methods --
|
31 import java.net.InetSocketAddress;
32 import java.net.ProtocolFamily;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.net.SocketOption;
36 import java.net.StandardProtocolFamily;
37 import java.net.StandardSocketOptions;
38 import java.nio.ByteBuffer;
39 import java.nio.channels.AlreadyBoundException;
40 import java.nio.channels.AlreadyConnectedException;
41 import java.nio.channels.AsynchronousCloseException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NoConnectionPendingException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.SocketChannel;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.Collections;
50 import java.util.HashSet;
51 import java.util.Objects;
52 import java.util.Set;
53 import java.util.concurrent.locks.ReentrantLock;
54
55 import sun.net.NetHooks;
56 import sun.net.ext.ExtendedSocketOptions;
57
58 /**
59 * An implementation of SocketChannels
60 */
61
62 class SocketChannelImpl
63 extends SocketChannel
64 implements SelChImpl
65 {
66 // Used to make native read and write calls
67 private static NativeDispatcher nd;
68
69 // Our file descriptor object
70 private final FileDescriptor fd;
71 private final int fdVal;
72
73 // Lock held by current reading or connecting thread
74 private final ReentrantLock readLock = new ReentrantLock();
75
76 // Lock held by current writing or connecting thread
77 private final ReentrantLock writeLock = new ReentrantLock();
78
79 // Lock held by any thread that modifies the state fields declared below
80 // DO NOT invoke a blocking I/O operation while holding this lock!
81 private final Object stateLock = new Object();
82
83 // Input/Output closed
84 private volatile boolean isInputClosed;
85 private volatile boolean isOutputClosed;
86
87 // -- The following fields are protected by stateLock
88
89 // set true when exclusive binding is on and SO_REUSEADDR is emulated
90 private boolean isReuseAddress;
91
92 // State, increases monotonically
93 private static final int ST_UNCONNECTED = 0;
94 private static final int ST_CONNECTIONPENDING = 1;
95 private static final int ST_CONNECTED = 2;
96 private static final int ST_CLOSING = 3;
97 private static final int ST_KILLPENDING = 4;
98 private static final int ST_KILLED = 5;
99 private int state;
100
101 // IDs of native threads doing reads and writes, for signalling
102 private long readerThread;
103 private long writerThread;
104
105 // Binding
106 private InetSocketAddress localAddress;
107 private InetSocketAddress remoteAddress;
108
109 // Socket adaptor, created on demand
110 private Socket socket;
111
112 // -- End of fields protected by stateLock
113
114
115 // Constructor for normal connecting sockets
116 //
117 SocketChannelImpl(SelectorProvider sp) throws IOException {
118 super(sp);
119 this.fd = Net.socket(true);
120 this.fdVal = IOUtil.fdVal(fd);
121 }
122
123 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
124 throws IOException
125 {
126 super(sp);
127 this.fd = fd;
128 this.fdVal = IOUtil.fdVal(fd);
129 if (bound) {
130 synchronized (stateLock) {
131 this.localAddress = Net.localAddress(fd);
132 }
133 }
134 }
135
136 // Constructor for sockets obtained from server sockets
137 //
138 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
139 throws IOException
140 {
141 super(sp);
142 this.fd = fd;
143 this.fdVal = IOUtil.fdVal(fd);
144 synchronized (stateLock) {
145 this.localAddress = Net.localAddress(fd);
146 this.remoteAddress = isa;
147 this.state = ST_CONNECTED;
148 }
149 }
150
151 // @throws ClosedChannelException if channel is closed
152 private void ensureOpen() throws ClosedChannelException {
153 if (!isOpen())
154 throw new ClosedChannelException();
155 }
156
157 @Override
158 public Socket socket() {
159 synchronized (stateLock) {
160 if (socket == null)
161 socket = SocketAdaptor.create(this);
162 return socket;
163 }
164 }
165
166 @Override
167 public SocketAddress getLocalAddress() throws IOException {
168 synchronized (stateLock) {
169 ensureOpen();
170 return Net.getRevealedLocalAddress(localAddress);
171 }
172 }
173
174 @Override
175 public SocketAddress getRemoteAddress() throws IOException {
176 synchronized (stateLock) {
177 ensureOpen();
178 return remoteAddress;
179 }
180 }
181
182 @Override
183 public <T> SocketChannel setOption(SocketOption<T> name, T value)
184 throws IOException
185 {
186 Objects.requireNonNull(name);
187 if (!supportedOptions().contains(name))
188 throw new UnsupportedOperationException("'" + name + "' not supported");
189
190 synchronized (stateLock) {
191 ensureOpen();
192
193 if (name == StandardSocketOptions.IP_TOS) {
194 ProtocolFamily family = Net.isIPv6Available() ?
195 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
196 Net.setSocketOption(fd, family, name, value);
197 return this;
198 }
199
200 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
201 // SO_REUSEADDR emulated when using exclusive bind
202 isReuseAddress = (Boolean)value;
203 return this;
204 }
205
206 // no options that require special handling
207 Net.setSocketOption(fd, Net.UNSPEC, name, value);
208 return this;
209 }
210 }
211
212 @Override
213 @SuppressWarnings("unchecked")
214 public <T> T getOption(SocketOption<T> name)
215 throws IOException
216 {
217 Objects.requireNonNull(name);
218 if (!supportedOptions().contains(name))
219 throw new UnsupportedOperationException("'" + name + "' not supported");
220
221 synchronized (stateLock) {
222 ensureOpen();
223
224 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
225 // SO_REUSEADDR emulated when using exclusive bind
226 return (T)Boolean.valueOf(isReuseAddress);
227 }
228
229 // special handling for IP_TOS: always return 0 when IPv6
230 if (name == StandardSocketOptions.IP_TOS) {
231 ProtocolFamily family = Net.isIPv6Available() ?
232 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
233 return (T) Net.getSocketOption(fd, family, name);
234 }
235
236 // no options that require special handling
237 return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
238 }
239 }
240
241 private static class DefaultOptionsHolder {
242 static final Set<SocketOption<?>> defaultOptions = defaultOptions();
243
244 private static Set<SocketOption<?>> defaultOptions() {
245 HashSet<SocketOption<?>> set = new HashSet<>();
246 set.add(StandardSocketOptions.SO_SNDBUF);
247 set.add(StandardSocketOptions.SO_RCVBUF);
248 set.add(StandardSocketOptions.SO_KEEPALIVE);
249 set.add(StandardSocketOptions.SO_REUSEADDR);
250 if (Net.isReusePortAvailable()) {
251 set.add(StandardSocketOptions.SO_REUSEPORT);
252 }
253 set.add(StandardSocketOptions.SO_LINGER);
254 set.add(StandardSocketOptions.TCP_NODELAY);
255 // additional options required by socket adaptor
256 set.add(StandardSocketOptions.IP_TOS);
257 set.add(ExtendedSocketOption.SO_OOBINLINE);
258 set.addAll(ExtendedSocketOptions.getInstance().options());
259 return Collections.unmodifiableSet(set);
260 }
261 }
262
263 @Override
264 public final Set<SocketOption<?>> supportedOptions() {
265 return DefaultOptionsHolder.defaultOptions;
266 }
267
268 /**
269 * Marks the beginning of a read operation that might block.
270 *
271 * @throws ClosedChannelException if the channel is closed
272 * @throws NotYetConnectedException if the channel is not yet connected
273 */
274 private void beginRead(boolean blocking) throws ClosedChannelException {
275 if (blocking) {
276 // set hook for Thread.interrupt
277 begin();
278 }
279 synchronized (stateLock) {
280 ensureOpen();
281 if (state != ST_CONNECTED)
282 throw new NotYetConnectedException();
283 if (blocking)
284 readerThread = NativeThread.current();
285 }
286 }
287
288 /**
289 * Marks the end of a read operation that may have blocked.
290 *
291 * @throws AsynchronousCloseException if the channel was closed due to this
292 * thread being interrupted on a blocking read operation.
293 */
294 private void endRead(boolean blocking, boolean completed)
295 throws AsynchronousCloseException
296 {
297 if (blocking) {
298 synchronized (stateLock) {
299 readerThread = 0;
300 // notify any thread waiting in implCloseSelectableChannel
301 if (state == ST_CLOSING) {
302 stateLock.notifyAll();
303 }
304 }
305 // remove hook for Thread.interrupt
306 end(completed);
307 }
308 }
309
310 @Override
311 public int read(ByteBuffer buf) throws IOException {
312 Objects.requireNonNull(buf);
313
314 readLock.lock();
315 try {
316 boolean blocking = isBlocking();
317 int n = 0;
318 try {
319 beginRead(blocking);
320
321 // check if input is shutdown
322 if (isInputClosed)
323 return IOStatus.EOF;
324
325 if (blocking) {
326 do {
327 n = IOUtil.read(fd, buf, -1, nd);
328 } while (n == IOStatus.INTERRUPTED && isOpen());
329 } else {
330 n = IOUtil.read(fd, buf, -1, nd);
331 }
332 } finally {
333 endRead(blocking, n > 0);
334 if (n <= 0 && isInputClosed)
335 return IOStatus.EOF;
336 }
337 return IOStatus.normalize(n);
338 } finally {
339 readLock.unlock();
340 }
341 }
342
343 @Override
344 public long read(ByteBuffer[] dsts, int offset, int length)
345 throws IOException
346 {
347 Objects.checkFromIndexSize(offset, length, dsts.length);
348
349 readLock.lock();
350 try {
351 boolean blocking = isBlocking();
352 long n = 0;
353 try {
354 beginRead(blocking);
355
356 // check if input is shutdown
357 if (isInputClosed)
358 return IOStatus.EOF;
359
360 if (blocking) {
361 do {
362 n = IOUtil.read(fd, dsts, offset, length, nd);
363 } while (n == IOStatus.INTERRUPTED && isOpen());
364 } else {
365 n = IOUtil.read(fd, dsts, offset, length, nd);
366 }
367 } finally {
368 endRead(blocking, n > 0);
369 if (n <= 0 && isInputClosed)
370 return IOStatus.EOF;
371 }
372 return IOStatus.normalize(n);
373 } finally {
374 readLock.unlock();
375 }
376 }
377
378 /**
379 * Marks the beginning of a write operation that might block.
380 *
381 * @throws ClosedChannelException if the channel is closed or output shutdown
382 * @throws NotYetConnectedException if the channel is not yet connected
383 */
384 private void beginWrite(boolean blocking) throws ClosedChannelException {
385 if (blocking) {
386 // set hook for Thread.interrupt
387 begin();
388 }
389 synchronized (stateLock) {
390 ensureOpen();
391 if (isOutputClosed)
392 throw new ClosedChannelException();
393 if (state != ST_CONNECTED)
394 throw new NotYetConnectedException();
395 if (blocking)
396 writerThread = NativeThread.current();
397 }
398 }
399
400 /**
401 * Marks the end of a write operation that may have blocked.
402 *
403 * @throws AsynchronousCloseException if the channel was closed due to this
404 * thread being interrupted on a blocking write operation.
405 */
406 private void endWrite(boolean blocking, boolean completed)
407 throws AsynchronousCloseException
408 {
409 if (blocking) {
410 synchronized (stateLock) {
411 writerThread = 0;
412 // notify any thread waiting in implCloseSelectableChannel
413 if (state == ST_CLOSING) {
414 stateLock.notifyAll();
415 }
416 }
417 // remove hook for Thread.interrupt
418 end(completed);
419 }
420 }
421
422 @Override
423 public int write(ByteBuffer buf) throws IOException {
424 Objects.requireNonNull(buf);
425
426 writeLock.lock();
427 try {
428 boolean blocking = isBlocking();
429 int n = 0;
430 try {
431 beginWrite(blocking);
432 if (blocking) {
433 do {
434 n = IOUtil.write(fd, buf, -1, nd);
435 } while (n == IOStatus.INTERRUPTED && isOpen());
436 } else {
437 n = IOUtil.write(fd, buf, -1, nd);
438 }
439 } finally {
440 endWrite(blocking, n > 0);
441 if (n <= 0 && isOutputClosed)
442 throw new AsynchronousCloseException();
443 }
444 return IOStatus.normalize(n);
445 } finally {
446 writeLock.unlock();
447 }
448 }
449
450 @Override
451 public long write(ByteBuffer[] srcs, int offset, int length)
452 throws IOException
453 {
454 Objects.checkFromIndexSize(offset, length, srcs.length);
455
456 writeLock.lock();
457 try {
458 boolean blocking = isBlocking();
459 long n = 0;
460 try {
461 beginWrite(blocking);
462 if (blocking) {
463 do {
464 n = IOUtil.write(fd, srcs, offset, length, nd);
465 } while (n == IOStatus.INTERRUPTED && isOpen());
466 } else {
467 n = IOUtil.write(fd, srcs, offset, length, nd);
468 }
469 } finally {
470 endWrite(blocking, n > 0);
471 if (n <= 0 && isOutputClosed)
472 throw new AsynchronousCloseException();
473 }
474 return IOStatus.normalize(n);
475 } finally {
476 writeLock.unlock();
477 }
478 }
479
480 /**
481 * Writes a byte of out of band data.
482 */
483 int sendOutOfBandData(byte b) throws IOException {
484 writeLock.lock();
485 try {
486 boolean blocking = isBlocking();
487 int n = 0;
488 try {
489 beginWrite(blocking);
490 if (blocking) {
491 do {
492 n = sendOutOfBandData(fd, b);
493 } while (n == IOStatus.INTERRUPTED && isOpen());
494 } else {
495 n = sendOutOfBandData(fd, b);
496 }
497 } finally {
498 endWrite(blocking, n > 0);
499 if (n <= 0 && isOutputClosed)
500 throw new AsynchronousCloseException();
501 }
502 return IOStatus.normalize(n);
503 } finally {
504 writeLock.unlock();
505 }
506 }
507
508 @Override
509 protected void implConfigureBlocking(boolean block) throws IOException {
510 readLock.lock();
511 try {
512 writeLock.lock();
513 try {
514 synchronized (stateLock) {
515 ensureOpen();
516 IOUtil.configureBlocking(fd, block);
517 }
518 } finally {
519 writeLock.unlock();
520 }
521 } finally {
522 readLock.unlock();
523 }
524 }
525
526 /**
527 * Returns the local address, or null if not bound
528 */
529 InetSocketAddress localAddress() {
530 synchronized (stateLock) {
531 return localAddress;
532 }
533 }
534
535 /**
536 * Returns the remote address, or null if not connected
537 */
538 InetSocketAddress remoteAddress() {
539 synchronized (stateLock) {
540 return remoteAddress;
541 }
542 }
543
544 @Override
545 public SocketChannel bind(SocketAddress local) throws IOException {
546 readLock.lock();
547 try {
548 writeLock.lock();
549 try {
550 synchronized (stateLock) {
551 ensureOpen();
552 if (state == ST_CONNECTIONPENDING)
553 throw new ConnectionPendingException();
554 if (localAddress != null)
555 throw new AlreadyBoundException();
556 InetSocketAddress isa = (local == null) ?
557 new InetSocketAddress(0) : Net.checkAddress(local);
558 SecurityManager sm = System.getSecurityManager();
559 if (sm != null) {
560 sm.checkListen(isa.getPort());
561 }
562 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
563 Net.bind(fd, isa.getAddress(), isa.getPort());
564 localAddress = Net.localAddress(fd);
565 }
566 } finally {
567 writeLock.unlock();
568 }
569 } finally {
570 readLock.unlock();
571 }
572 return this;
573 }
574
575 @Override
576 public boolean isConnected() {
577 synchronized (stateLock) {
578 return (state == ST_CONNECTED);
579 }
580 }
581
582 @Override
583 public boolean isConnectionPending() {
584 synchronized (stateLock) {
585 return (state == ST_CONNECTIONPENDING);
586 }
587 }
588
589 /**
590 * Marks the beginning of a connect operation that might block.
591 *
592 * @throws ClosedChannelException if the channel is closed
593 * @throws AlreadyConnectedException if already connected
594 * @throws ConnectionPendingException is a connection is pending
595 */
596 private void beginConnect(boolean blocking) throws ClosedChannelException {
597 if (blocking) {
598 // set hook for Thread.interrupt
599 begin();
600 }
601 synchronized (stateLock) {
602 ensureOpen();
603 if (state == ST_CONNECTED)
604 throw new AlreadyConnectedException();
605 if (state == ST_CONNECTIONPENDING)
606 throw new ConnectionPendingException();
607 if (blocking)
608 readerThread = NativeThread.current();
609 }
610 }
611
612 /**
613 * Marks the end of a connect operation that may have blocked.
614 *
615 * @throws AsynchronousCloseException if the channel was closed due to this
616 * thread being interrupted on a blocking connect operation.
617 */
618 private void endConnect(boolean blocking, boolean completed)
619 throws AsynchronousCloseException
620 {
621 endRead(blocking, completed);
622 }
623
624 @Override
625 public boolean connect(SocketAddress sa) throws IOException {
626 InetSocketAddress isa = Net.checkAddress(sa);
627 SecurityManager sm = System.getSecurityManager();
628 if (sm != null)
629 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
630
631 readLock.lock();
632 try {
633 writeLock.lock();
634 try {
635 // notify before-connect hook
636 synchronized (stateLock) {
637 if (state == ST_UNCONNECTED && localAddress == null) {
638 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
639 }
640 }
641
642 InetAddress ia = isa.getAddress();
643 if (ia.isAnyLocalAddress())
644 ia = InetAddress.getLocalHost();
645
646 int n = 0;
647 boolean blocking = isBlocking();
648 try {
649 try {
650 beginConnect(blocking);
651 if (blocking) {
652 do {
653 n = Net.connect(fd, ia, isa.getPort());
654 } while (n == IOStatus.INTERRUPTED && isOpen());
655 } else {
656 n = Net.connect(fd, ia, isa.getPort());
657 }
658 } finally {
659 endConnect(blocking, n > 0);
660 }
661 } catch (IOException x) {
662 // connect failed, close socket
663 close();
664 throw x;
665 }
666
667 // connection may be established
668 synchronized (stateLock) {
669 if (!isOpen())
670 throw new AsynchronousCloseException();
671 remoteAddress = isa;
672 if (n > 0) {
673 // connected established
674 localAddress = Net.localAddress(fd);
675 state = ST_CONNECTED;
676 return true;
677 } else {
678 // connection pending
679 assert !blocking;
680 state = ST_CONNECTIONPENDING;
681 return false;
682 }
683 }
684 } finally {
685 writeLock.unlock();
686 }
687 } finally {
688 readLock.unlock();
689 }
690 }
691
692 /**
693 * Marks the beginning of a finishConnect operation that might block.
694 *
695 * @throws ClosedChannelException if the channel is closed
696 * @throws NoConnectionPendingException if no connection is pending
697 */
698 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
699 if (blocking) {
700 // set hook for Thread.interrupt
701 begin();
702 }
703 synchronized (stateLock) {
704 ensureOpen();
705 if (state != ST_CONNECTIONPENDING)
706 throw new NoConnectionPendingException();
707 if (blocking)
708 readerThread = NativeThread.current();
709 }
710 }
711
712 /**
713 * Marks the end of a finishConnect operation that may have blocked.
714 *
715 * @throws AsynchronousCloseException if the channel was closed due to this
716 * thread being interrupted on a blocking connect operation.
717 */
718 private void endFinishConnect(boolean blocking, boolean completed)
719 throws AsynchronousCloseException
720 {
721 endRead(blocking, completed);
722 }
723
724 @Override
725 public boolean finishConnect() throws IOException {
726 readLock.lock();
727 try {
728 writeLock.lock();
729 try {
730 // already connected?
731 synchronized (stateLock) {
732 if (state == ST_CONNECTED)
733 return true;
734 }
735
736 int n = 0;
737 boolean blocking = isBlocking();
738 try {
739 try {
740 beginFinishConnect(blocking);
741 if (blocking) {
742 do {
743 n = checkConnect(fd, true);
744 } while (n == 0 || (n == IOStatus.INTERRUPTED) && isOpen());
745 } else {
746 n = checkConnect(fd, false);
747 }
748 } finally {
749 endFinishConnect(blocking, n > 0);
750 }
751 } catch (IOException x) {
752 close();
753 throw x;
754 }
755
756 // post finishConnect, connection may be established
757 synchronized (stateLock) {
758 if (!isOpen())
759 throw new AsynchronousCloseException();
760 if (n > 0) {
761 // connection established
762 localAddress = Net.localAddress(fd);
763 state = ST_CONNECTED;
764 return true;
765 } else {
766 // connection still pending
767 assert !blocking;
768 return false;
769 }
770 }
771 } finally {
772 writeLock.unlock();
773 }
774 } finally {
775 readLock.unlock();
776 }
777 }
778
779 /**
780 * Invoked by implCloseChannel to close the channel.
781 *
782 * This method waits for outstanding I/O operations to complete. When in
783 * blocking mode, the socket is pre-closed and the threads in blocking I/O
784 * operations are signalled to ensure that the outstanding I/O operations
785 * complete quickly.
786 *
787 * If the socket is connected then it is shutdown by this method. The
788 * shutdown ensures that the peer reads EOF for the case that the socket is
789 * not pre-closed or closed by this method.
790 *
791 * The socket is closed by this method when it is not registered with a
792 * Selector. Note that a channel configured blocking may be registered with
793 * a Selector. This arises when a key is canceled and the channel configured
794 * to blocking mode before the key is flushed from the Selector.
795 */
796 @Override
797 protected void implCloseSelectableChannel() throws IOException {
798 assert !isOpen();
799
800 boolean blocking;
801 boolean connected;
802 boolean interrupted = false;
803
804 // set state to ST_CLOSING
805 synchronized (stateLock) {
806 assert state < ST_CLOSING;
807 blocking = isBlocking();
808 connected = (state == ST_CONNECTED);
809 state = ST_CLOSING;
810 }
811
812 // wait for any outstanding I/O operations to complete
813 if (blocking) {
814 synchronized (stateLock) {
815 assert state == ST_CLOSING;
816 long reader = readerThread;
817 long writer = writerThread;
818 if (reader != 0 || writer != 0) {
819 nd.preClose(fd);
820 connected = false; // fd is no longer connected socket
821
822 if (reader != 0)
823 NativeThread.signal(reader);
824 if (writer != 0)
825 NativeThread.signal(writer);
826
827 // wait for blocking I/O operations to end
828 while (readerThread != 0 || writerThread != 0) {
829 try {
830 stateLock.wait();
831 } catch (InterruptedException e) {
832 interrupted = true;
833 }
834 }
835 }
836 }
837 } else {
838 // non-blocking mode: wait for read/write to complete
839 readLock.lock();
840 try {
841 writeLock.lock();
842 writeLock.unlock();
843 } finally {
844 readLock.unlock();
845 }
846 }
847
848 // set state to ST_KILLPENDING
849 synchronized (stateLock) {
850 assert state == ST_CLOSING;
851 // if connected, and the channel is registered with a Selector, we
852 // shutdown the output so that the peer reads EOF
853 if (connected && isRegistered()) {
854 try {
855 Net.shutdown(fd, Net.SHUT_WR);
856 } catch (IOException ignore) { }
857 }
858 state = ST_KILLPENDING;
859 }
860
861 // close socket if not registered with Selector
862 if (!isRegistered())
863 kill();
864
865 // restore interrupt status
866 if (interrupted)
867 Thread.currentThread().interrupt();
868 }
869
870 @Override
871 public void kill() throws IOException {
872 synchronized (stateLock) {
873 if (state == ST_KILLPENDING) {
874 state = ST_KILLED;
875 nd.close(fd);
876 }
877 }
878 }
879
880 @Override
881 public SocketChannel shutdownInput() throws IOException {
882 synchronized (stateLock) {
883 ensureOpen();
884 if (!isConnected())
885 throw new NotYetConnectedException();
886 if (!isInputClosed) {
887 Net.shutdown(fd, Net.SHUT_RD);
888 long thread = readerThread;
889 if (thread != 0)
890 NativeThread.signal(thread);
891 isInputClosed = true;
892 }
893 return this;
894 }
895 }
896
897 @Override
898 public SocketChannel shutdownOutput() throws IOException {
899 synchronized (stateLock) {
900 ensureOpen();
901 if (!isConnected())
902 throw new NotYetConnectedException();
903 if (!isOutputClosed) {
904 Net.shutdown(fd, Net.SHUT_WR);
905 long thread = writerThread;
906 if (thread != 0)
907 NativeThread.signal(thread);
908 isOutputClosed = true;
909 }
910 return this;
911 }
912 }
913
914 boolean isInputOpen() {
915 return !isInputClosed;
916 }
917
918 boolean isOutputOpen() {
919 return !isOutputClosed;
920 }
921
922 /**
923 * Poll this channel's socket for reading up to the given timeout.
924 * @return {@code true} if the socket is polled
925 */
926 boolean pollRead(long timeout) throws IOException {
927 boolean blocking = isBlocking();
928 assert Thread.holdsLock(blockingLock()) && blocking;
929
930 readLock.lock();
931 try {
932 boolean polled = false;
933 try {
934 beginRead(blocking);
935 int n = Net.poll(fd, Net.POLLIN, timeout);
936 polled = (n > 0);
937 } finally {
938 endRead(blocking, polled);
939 }
940 return polled;
941 } finally {
942 readLock.unlock();
943 }
944 }
945
946 /**
947 * Poll this channel's socket for a connection, up to the given timeout.
948 * @return {@code true} if the socket is polled
949 */
950 boolean pollConnected(long timeout) throws IOException {
951 boolean blocking = isBlocking();
952 assert Thread.holdsLock(blockingLock()) && blocking;
953
954 readLock.lock();
955 try {
956 writeLock.lock();
957 try {
958 boolean polled = false;
959 try {
960 beginFinishConnect(blocking);
961 int n = Net.poll(fd, Net.POLLCONN, timeout);
962 polled = (n > 0);
963 } finally {
964 endFinishConnect(blocking, polled);
965 }
966 return polled;
967 } finally {
968 writeLock.unlock();
969 }
970 } finally {
971 readLock.unlock();
972 }
973 }
974
975 /**
976 * Translates native poll revent ops into a ready operation ops
977 */
978 public boolean translateReadyOps(int ops, int initialOps,
979 SelectionKeyImpl sk) {
980 int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
981 int oldOps = sk.nioReadyOps();
982 int newOps = initialOps;
983
984 if ((ops & Net.POLLNVAL) != 0) {
985 // This should only happen if this channel is pre-closed while a
986 // selection operation is in progress
987 // ## Throw an error if this channel has not been pre-closed
988 return false;
989 }
990
991 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
992 newOps = intOps;
993 sk.nioReadyOps(newOps);
994 return (newOps & ~oldOps) != 0;
995 }
996
997 if (((ops & Net.POLLIN) != 0) &&
998 ((intOps & SelectionKey.OP_READ) != 0) &&
999 (state == ST_CONNECTED))
1000 newOps |= SelectionKey.OP_READ;
1001
1002 if (((ops & Net.POLLCONN) != 0) &&
1003 ((intOps & SelectionKey.OP_CONNECT) != 0) &&
1004 ((state == ST_UNCONNECTED) || (state == ST_CONNECTIONPENDING))) {
1005 newOps |= SelectionKey.OP_CONNECT;
1006 }
1007
1008 if (((ops & Net.POLLOUT) != 0) &&
1009 ((intOps & SelectionKey.OP_WRITE) != 0) &&
1010 (state == ST_CONNECTED))
1011 newOps |= SelectionKey.OP_WRITE;
1012
1013 sk.nioReadyOps(newOps);
1014 return (newOps & ~oldOps) != 0;
1015 }
1016
1017 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
1018 return translateReadyOps(ops, sk.nioReadyOps(), sk);
1019 }
1020
1021 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
1022 return translateReadyOps(ops, 0, sk);
1023 }
1024
1025 /**
1026 * Translates an interest operation set into a native poll event set
1027 */
1028 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
1029 int newOps = 0;
1030 if ((ops & SelectionKey.OP_READ) != 0)
1031 newOps |= Net.POLLIN;
1032 if ((ops & SelectionKey.OP_WRITE) != 0)
1033 newOps |= Net.POLLOUT;
1034 if ((ops & SelectionKey.OP_CONNECT) != 0)
1035 newOps |= Net.POLLCONN;
1036 sk.selector.putEventOps(sk, newOps);
1037 }
1038
1039 public FileDescriptor getFD() {
1040 return fd;
1041 }
1042
1043 public int getFDVal() {
1044 return fdVal;
1045 }
1046
1047 @Override
1048 public String toString() {
1049 StringBuilder sb = new StringBuilder();
1050 sb.append(this.getClass().getSuperclass().getName());
1051 sb.append('[');
1052 if (!isOpen())
1053 sb.append("closed");
1054 else {
1055 synchronized (stateLock) {
1056 switch (state) {
1057 case ST_UNCONNECTED:
1058 sb.append("unconnected");
1059 break;
1060 case ST_CONNECTIONPENDING:
1061 sb.append("connection-pending");
1062 break;
1063 case ST_CONNECTED:
1064 sb.append("connected");
1065 if (isInputClosed)
1066 sb.append(" ishut");
1067 if (isOutputClosed)
1068 sb.append(" oshut");
1069 break;
1070 }
1071 InetSocketAddress addr = localAddress();
1072 if (addr != null) {
1073 sb.append(" local=");
1074 sb.append(Net.getRevealedLocalAddressAsString(addr));
1075 }
1076 if (remoteAddress() != null) {
1077 sb.append(" remote=");
1078 sb.append(remoteAddress().toString());
1079 }
1080 }
1081 }
1082 sb.append(']');
1083 return sb.toString();
1084 }
1085
1086
1087 // -- Native methods --
|