/*
 * Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.FileDescriptor;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.util.*;
import sun.net.NetHooks;
import sun.misc.IoTrace;
import sun.misc.IoTraceContext;

/**
 * An implementation of SocketChannels.
 *
 * <p> Locking: reads hold {@code readLock}, writes hold {@code writeLock},
 * and connect/finishConnect/bind hold both. The state fields (connection
 * state, addresses, input/output open flags, socket adaptor) are guarded by
 * {@code stateLock}, which must never be held across a blocking I/O call.
 */

class SocketChannelImpl
    extends SocketChannel
    implements SelChImpl
{

    // Used to make native read and write calls
    private static NativeDispatcher nd;

    // Our file descriptor object
    private final FileDescriptor fd;

    // fd value needed for dev/poll. This value will remain valid
    // even after the value in the file descriptor object has been set to -1
    private final int fdVal;

    // IDs of native threads doing reads and writes, for signalling
    private volatile long readerThread = 0;
    private volatile long writerThread = 0;

    // Lock held by current reading or connecting thread
    private final Object readLock = new Object();

    // Lock held by current writing or connecting thread
    private final Object writeLock = new Object();

    // Lock held by any thread that modifies the state fields declared below
    // DO NOT invoke a blocking I/O operation while holding this lock!
    private final Object stateLock = new Object();

    // -- The following fields are protected by stateLock

    // State, increases monotonically
    private static final int ST_UNINITIALIZED = -1;
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_PENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_KILLPENDING = 3;
    private static final int ST_KILLED = 4;
    private int state = ST_UNINITIALIZED;

    // Binding
    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Input/Output open
    private boolean isInputOpen = true;
    private boolean isOutputOpen = true;
    private boolean readyToConnect = false;

    // Socket adaptor, created on demand
    private Socket socket;

    // -- End of fields protected by stateLock


    // Constructor for normal connecting sockets
    //
    SocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd = Net.socket(true);
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_UNCONNECTED;
    }

    // Constructor wrapping an existing, unconnected file descriptor; the
    // local address is looked up only when the caller says the fd is bound
    //
    SocketChannelImpl(SelectorProvider sp,
                      FileDescriptor fd,
                      boolean bound)
        throws IOException
    {
        super(sp);
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_UNCONNECTED;
        if (bound)
            this.localAddress = Net.localAddress(fd);
    }

    // Constructor for sockets obtained from server sockets
    //
    SocketChannelImpl(SelectorProvider sp,
                      FileDescriptor fd, InetSocketAddress remote)
        throws IOException
    {
        super(sp);
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_CONNECTED;
        this.localAddress = Net.localAddress(fd);
        this.remoteAddress = remote;
    }

    /**
     * Returns the socket adaptor for this channel, creating it on first use.
     */
    @Override
    public Socket socket() {
        synchronized (stateLock) {
            if (socket == null)
                socket = SocketAdaptor.create(this);
            return socket;
        }
    }

    /**
     * Returns the recorded local address, or null if none has been recorded.
     *
     * @throws ClosedChannelException if this channel is closed
     */
    @Override
    public SocketAddress getLocalAddress() throws IOException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            return localAddress;
        }
    }

    /**
     * Returns the recorded remote address, or null if none has been recorded.
     *
     * @throws ClosedChannelException if this channel is closed
     */
    @Override
    public SocketAddress getRemoteAddress() throws IOException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            return remoteAddress;
        }
    }

    /**
     * Sets a socket option on the underlying file descriptor.
     *
     * @throws NullPointerException if {@code name} is null
     * @throws UnsupportedOperationException if the option is not in
     *         {@link #supportedOptions()}
     * @throws ClosedChannelException if this channel is closed
     */
    @Override
    public <T> SocketChannel setOption(SocketOption<T> name, T value)
        throws IOException
    {
        if (name == null)
            throw new NullPointerException();
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");

        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();

            // special handling for IP_TOS: no-op when IPv6
            if (name == StandardSocketOptions.IP_TOS) {
                if (!Net.isIPv6Available())
                    Net.setSocketOption(fd, StandardProtocolFamily.INET, name, value);
                return this;
            }

            // no options that require special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
            return this;
        }
    }

    /**
     * Returns the value of a socket option read from the underlying file
     * descriptor.
     *
     * @throws NullPointerException if {@code name} is null
     * @throws UnsupportedOperationException if the option is not in
     *         {@link #supportedOptions()}
     * @throws ClosedChannelException if this channel is closed
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getOption(SocketOption<T> name)
        throws IOException
    {
        if (name == null)
            throw new NullPointerException();
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");

        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();

            // special handling for IP_TOS: always return 0 when IPv6
            if (name == StandardSocketOptions.IP_TOS) {
                return (Net.isIPv6Available()) ? (T) Integer.valueOf(0) :
                    (T) Net.getSocketOption(fd, StandardProtocolFamily.INET, name);
            }

            // no options that require special handling
            return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
        }
    }

    // Holder class so that the option set is built only when first requested
    private static class DefaultOptionsHolder {
        static final Set<SocketOption<?>> defaultOptions = defaultOptions();

        private static Set<SocketOption<?>> defaultOptions() {
            HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(8);
            set.add(StandardSocketOptions.SO_SNDBUF);
            set.add(StandardSocketOptions.SO_RCVBUF);
            set.add(StandardSocketOptions.SO_KEEPALIVE);
            set.add(StandardSocketOptions.SO_REUSEADDR);
            set.add(StandardSocketOptions.SO_LINGER);
            set.add(StandardSocketOptions.TCP_NODELAY);
            // additional options required by socket adaptor
            set.add(StandardSocketOptions.IP_TOS);
            set.add(ExtendedSocketOption.SO_OOBINLINE);
            return Collections.unmodifiableSet(set);
        }
    }

    /**
     * Returns the unmodifiable set of options accepted by
     * {@link #setOption} and {@link #getOption}.
     */
    @Override
    public final Set<SocketOption<?>> supportedOptions() {
        return DefaultOptionsHolder.defaultOptions;
    }

    /**
     * Checks that this channel may be read from.
     *
     * @return true if the input side is open, false if it has been shut down
     * @throws ClosedChannelException if this channel is closed
     * @throws NotYetConnectedException if this channel is not connected
     */
    private boolean ensureReadOpen() throws ClosedChannelException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (!isConnected())
                throw new NotYetConnectedException();
            return isInputOpen;
        }
    }

    /**
     * Checks that this channel may be written to.
     *
     * @throws ClosedChannelException if this channel is closed or its output
     *         side has been shut down
     * @throws NotYetConnectedException if this channel is not connected
     */
    private void ensureWriteOpen() throws ClosedChannelException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (!isOutputOpen)
                throw new ClosedChannelException();
            if (!isConnected())
                throw new NotYetConnectedException();
        }
    }

    // Clears the reader thread and completes a kill() that was postponed
    // while that thread was inside an I/O operation
    private void readerCleanup() throws IOException {
        synchronized (stateLock) {
            readerThread = 0;
            if (state == ST_KILLPENDING)
                kill();
        }
    }

    // Clears the writer thread and completes a kill() that was postponed
    // while that thread was inside an I/O operation
    private void writerCleanup() throws IOException {
        synchronized (stateLock) {
            writerThread = 0;
            if (state == ST_KILLPENDING)
                kill();
        }
    }

    /**
     * Reads from this channel into the given buffer.
     *
     * @return the normalized result of the read, -1 if the input side was
     *         already shut down on entry, or IOStatus.EOF if it was shut
     *         down while the read was in progress and no data was read
     * @throws NullPointerException if {@code buf} is null
     */
    @Override
    public int read(ByteBuffer buf) throws IOException {

        if (buf == null)
            throw new NullPointerException();

        synchronized (readLock) {
            if (!ensureReadOpen())
                return -1;
            IoTraceContext traceContext = null;
            if (isBlocking()) {
                traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
                                                       remoteAddress.getPort(), 0);
            }
            int n = 0;
            try {

                // Set up the interruption machinery; see
                // AbstractInterruptibleChannel for details
                //
                begin();

                synchronized (stateLock) {
                    if (!isOpen()) {
                    // Either the current thread is already interrupted, so
                    // begin() closed the channel, or another thread closed the
                    // channel since we checked it a few bytecodes ago.  In
                    // either case the value returned here is irrelevant since
                    // the invocation of end() in the finally block will throw
                    // an appropriate exception.
                    //
                        return 0;

                    }

                    // Save this thread so that it can be signalled on those
                    // platforms that require it
                    //
                    readerThread = NativeThread.current();
                }

                // Between the previous test of isOpen() and the return of the
                // IOUtil.read invocation below, this channel might be closed
                // or this thread might be interrupted.  We rely upon the
                // implicit synchronization point in the kernel read() call to
                // make sure that the right thing happens.  In either case the
                // implCloseSelectableChannel method is ultimately invoked in
                // some other thread, so there are three possibilities:
                //
                //   - implCloseSelectableChannel() invokes nd.preClose()
                //     before this thread invokes read(), in which case the
                //     read returns immediately with either EOF or an error,
                //     the latter of which will cause an IOException to be
                //     thrown.
                //
                //   - implCloseSelectableChannel() invokes nd.preClose() after
                //     this thread is blocked in read().  On some operating
                //     systems (e.g., Solaris and Windows) this causes the read
                //     to return immediately with either EOF or an error
                //     indication.
                //
                //   - implCloseSelectableChannel() invokes nd.preClose() after
                //     this thread is blocked in read() but the operating
                //     system (e.g., Linux) doesn't support preemptive close,
                //     so implCloseSelectableChannel() proceeds to signal this
                //     thread, thereby causing the read to return immediately
                //     with IOStatus.INTERRUPTED.
                //
                // In all three cases the invocation of end() in the finally
                // clause will notice that the channel has been closed and
                // throw an appropriate exception (AsynchronousCloseException
                // or ClosedByInterruptException) if necessary.
                //
                // * There is a fourth possibility. implCloseSelectableChannel()
                // invokes nd.preClose(), signals reader/writer thread and quickly
                // moves on to nd.close() in kill(), which does a real close.
                // Then a third thread accepts a new connection, opens file or
                // whatever that causes the released "fd" to be recycled. All
                // above happens just between our last isOpen() check and the
                // next kernel read reached, with the recycled "fd". The solution
                // is to postpone the real kill() if there is a reader or/and
                // writer thread(s) over there "waiting", leave the cleanup/kill
                // to the reader or writer thread. (the preClose() still happens
                // so the connection gets cut off as usual).
                //
                // For socket channels there is the additional wrinkle that
                // asynchronous shutdown works much like asynchronous close,
                // except that the channel is shutdown rather than completely
                // closed.  This is analogous to the first two cases above,
                // except that the shutdown operation plays the role of
                // nd.preClose().
                for (;;) {
                    n = IOUtil.read(fd, buf, -1, nd, readLock);
                    if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                        // The system call was interrupted but the channel
                        // is still open, so retry
                        continue;
                    }
                    return IOStatus.normalize(n);
                }

            } finally {
                readerCleanup();        // Clear reader thread

                if (isBlocking()) {
                    IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
                }

                // The end method, which is defined in our superclass
                // AbstractInterruptibleChannel, resets the interruption
                // machinery.  If its argument is true then it returns
                // normally; otherwise it checks the interrupt and open state
                // of this channel and throws an appropriate exception if
                // necessary.
                //
                // So, if we actually managed to do any I/O in the above try
                // block then we pass true to the end method.  We also pass
                // true if the channel was in non-blocking mode when the I/O
                // operation was initiated but no data could be transferred;
                // this prevents spurious exceptions from being thrown in the
                // rare event that a channel is closed or a thread is
                // interrupted at the exact moment that a non-blocking I/O
                // request is made.
                //
                end(n > 0 || (n == IOStatus.UNAVAILABLE));

                // Extra case for socket channels: Asynchronous shutdown
                //
                synchronized (stateLock) {
                    if ((n <= 0) && (!isInputOpen))
                        return IOStatus.EOF;
                }

                assert IOStatus.check(n);

            }
        }
    }

    /**
     * Scattering read into {@code length} buffers of {@code dsts} starting at
     * {@code offset}. Follows the same protocol as {@link #read(ByteBuffer)};
     * see the comments there for the close/interrupt handling.
     *
     * @throws IndexOutOfBoundsException if {@code offset} or {@code length}
     *         is negative, or {@code offset > dsts.length - length}
     */
    @Override
    public long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException
    {
        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
            throw new IndexOutOfBoundsException();
        synchronized (readLock) {
            if (!ensureReadOpen())
                return -1;
            long n = 0;
            IoTraceContext traceContext = null;
            if (isBlocking()) {
                traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
                                                       remoteAddress.getPort(), 0);
            }
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    readerThread = NativeThread.current();
                }

                for (;;) {
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                readerCleanup();
                if (isBlocking()) {
                    IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
                }
                end(n > 0 || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isInputOpen))
                        return IOStatus.EOF;
                }
                assert IOStatus.check(n);
            }
        }
    }

    /**
     * Writes the given buffer to this channel.
     *
     * @return the normalized result of the write
     * @throws NullPointerException if {@code buf} is null
     * @throws AsynchronousCloseException if nothing was written and the
     *         output side is no longer open when the write completes
     */
    @Override
    public int write(ByteBuffer buf) throws IOException {
        if (buf == null)
            throw new NullPointerException();
        synchronized (writeLock) {
            ensureWriteOpen();
            int n = 0;
            IoTraceContext traceContext =
                IoTrace.socketWriteBegin(remoteAddress.getAddress(),
                                         remoteAddress.getPort());

            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    writerThread = NativeThread.current();
                }
                for (;;) {
                    n = IOUtil.write(fd, buf, -1, nd, writeLock);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
                IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                end(n > 0 || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isOutputOpen))
                        throw new AsynchronousCloseException();
                }
                assert IOStatus.check(n);
            }
        }
    }

    /**
     * Gathering write of {@code length} buffers of {@code srcs} starting at
     * {@code offset}. Follows the same protocol as {@link #write(ByteBuffer)}.
     *
     * @throws IndexOutOfBoundsException if {@code offset} or {@code length}
     *         is negative, or {@code offset > srcs.length - length}
     */
    @Override
    public long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException
    {
        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
            throw new IndexOutOfBoundsException();
        synchronized (writeLock) {
            ensureWriteOpen();
            long n = 0;
            IoTraceContext traceContext =
                IoTrace.socketWriteBegin(remoteAddress.getAddress(),
                                         remoteAddress.getPort());
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    writerThread = NativeThread.current();
                }
                for (;;) {
                    n = IOUtil.write(fd, srcs, offset, length, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
                IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                end((n > 0) || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isOutputOpen))
                        throw new AsynchronousCloseException();
                }
                assert IOStatus.check(n);
            }
        }
    }

    // package-private
    // Sends one byte through the native sendOutOfBandData call, using the
    // same locking and begin()/end() protocol as write()
    int sendOutOfBandData(byte b) throws IOException {
        synchronized (writeLock) {
            ensureWriteOpen();
            int n = 0;
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    writerThread = NativeThread.current();
                }
                for (;;) {
                    n = sendOutOfBandData(fd, b);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
                end((n > 0) || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isOutputOpen))
                        throw new AsynchronousCloseException();
                }
                assert IOStatus.check(n);
            }
        }
    }

    /**
     * Applies the requested blocking mode to the underlying file descriptor.
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        IOUtil.configureBlocking(fd, block);
    }

    /**
     * Returns the recorded local address without checking whether the
     * channel is open; may be null.
     */
    public SocketAddress localAddress() {
        synchronized (stateLock) {
            return localAddress;
        }
    }

    /**
     * Returns the recorded remote address without checking whether the
     * channel is open; may be null.
     */
    public SocketAddress remoteAddress() {
        synchronized (stateLock) {
            return remoteAddress;
        }
    }

    /**
     * Binds the socket to the given address; a null address binds to
     * {@code new InetSocketAddress(0)}.
     *
     * @throws ClosedChannelException if this channel is closed
     * @throws ConnectionPendingException if a connection is pending
     * @throws AlreadyBoundException if a local address is already recorded
     */
    @Override
    public SocketChannel bind(SocketAddress local) throws IOException {
        synchronized (readLock) {
            synchronized (writeLock) {
                synchronized (stateLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    if (state == ST_PENDING)
                        throw new ConnectionPendingException();
                    if (localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress isa = (local == null) ?
                        new InetSocketAddress(0) : Net.checkAddress(local);
                    NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
                    Net.bind(fd, isa.getAddress(), isa.getPort());
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

    /**
     * Tells whether the state is ST_CONNECTED.
     */
    @Override
    public boolean isConnected() {
        synchronized (stateLock) {
            return (state == ST_CONNECTED);
        }
    }

    /**
     * Tells whether the state is ST_PENDING.
     */
    @Override
    public boolean isConnectionPending() {
        synchronized (stateLock) {
            return (state == ST_PENDING);
        }
    }

    /**
     * Checks that this channel is open and that no connection has been
     * established or started.
     *
     * @throws ClosedChannelException if this channel is closed
     * @throws AlreadyConnectedException if this channel is connected
     * @throws ConnectionPendingException if a connection is pending
     */
    void ensureOpenAndUnconnected() throws IOException { // package-private
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
        }
    }

    /**
     * Connects this channel's socket to the given address.
     *
     * @return true if the connection was established, false if the channel
     *         is non-blocking and the connection is now pending (or if the
     *         channel was found closed after begin())
     * @throws IOException if the connect fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean connect(SocketAddress sa) throws IOException {
        synchronized (readLock) {
            synchronized (writeLock) {
                ensureOpenAndUnconnected();
                InetSocketAddress isa = Net.checkAddress(sa);
                SecurityManager sm = System.getSecurityManager();
                if (sm != null)
                    sm.checkConnect(isa.getAddress().getHostAddress(),
                                    isa.getPort());
                synchronized (blockingLock()) {
                    int n = 0;
                    try {
                        try {
                            begin();
                            synchronized (stateLock) {
                                if (!isOpen()) {
                                    return false;
                                }
                                // notify hook only if unbound
                                if (localAddress == null) {
                                    NetHooks.beforeTcpConnect(fd,
                                                           isa.getAddress(),
                                                           isa.getPort());
                                }
                                readerThread = NativeThread.current();
                            }
                            for (;;) {
                                InetAddress ia = isa.getAddress();
                                if (ia.isAnyLocalAddress())
                                    ia = InetAddress.getLocalHost();
                                n = Net.connect(fd,
                                                ia,
                                                isa.getPort());
                                if (  (n == IOStatus.INTERRUPTED)
                                      && isOpen())
                                    continue;
                                break;
                            }

                        } finally {
                            readerCleanup();
                            end((n > 0) || (n == IOStatus.UNAVAILABLE));
                            assert IOStatus.check(n);
                        }
                    } catch (IOException x) {
                        // If an exception was thrown, close the channel after
                        // invoking end() so as to avoid bogus
                        // AsynchronousCloseExceptions
                        close();
                        throw x;
                    }
                    synchronized (stateLock) {
                        remoteAddress = isa;
                        if (n > 0) {

                            // Connection succeeded; disallow further
                            // invocation
                            state = ST_CONNECTED;
                            if (isOpen())
                                localAddress = Net.localAddress(fd);
                            return true;
                        }
                        // If nonblocking and no exception then connection
                        // pending; disallow another invocation
                        if (!isBlocking())
                            state = ST_PENDING;
                        else
                            assert false;
                    }
                }
                return false;
            }
        }
    }

    /**
     * Completes a connection started by a non-blocking {@link #connect}.
     *
     * @return true if the channel is connected, false if the connection is
     *         still pending
     * @throws ClosedChannelException if this channel is closed
     * @throws NoConnectionPendingException if the channel is neither
     *         connected nor pending
     * @throws IOException if the connect fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean finishConnect() throws IOException {
        synchronized (readLock) {
            synchronized (writeLock) {
                synchronized (stateLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    if (state == ST_CONNECTED)
                        return true;
                    if (state != ST_PENDING)
                        throw new NoConnectionPendingException();
                }
                int n = 0;
                try {
                    try {
                        begin();
                        synchronized (blockingLock()) {
                            synchronized (stateLock) {
                                if (!isOpen()) {
                                    return false;
                                }
                                readerThread = NativeThread.current();
                            }
                            if (!isBlocking()) {
                                for (;;) {
                                    n = checkConnect(fd, false,
                                                     readyToConnect);
                                    if (  (n == IOStatus.INTERRUPTED)
                                          && isOpen())
                                        continue;
                                    break;
                                }
                            } else {
                                for (;;) {
                                    n = checkConnect(fd, true,
                                                     readyToConnect);
                                    if (n == 0) {
                                        // Loop in case of
                                        // spurious notifications
                                        continue;
                                    }
                                    if (  (n == IOStatus.INTERRUPTED)
                                          && isOpen())
                                        continue;
                                    break;
                                }
                            }
                        }
                    } finally {
                        synchronized (stateLock) {
                            readerThread = 0;
                            if (state == ST_KILLPENDING) {
                                kill();
                                // poll()/getsockopt() does not report
                                // error (throws exception, with n = 0)
                                // on Linux platform after dup2 and
                                // signal-wakeup. Force n to 0 so the
                                // end() can throw appropriate exception
                                n = 0;
                            }
                        }
                        end((n > 0) || (n == IOStatus.UNAVAILABLE));
                        assert IOStatus.check(n);
                    }
                } catch (IOException x) {
                    // If an exception was thrown, close the channel after
                    // invoking end() so as to avoid bogus
                    // AsynchronousCloseExceptions
                    close();
                    throw x;
                }
                if (n > 0) {
                    synchronized (stateLock) {
                        state = ST_CONNECTED;
                        if (isOpen())
                            localAddress = Net.localAddress(fd);
                    }
                    return true;
                }
                return false;
            }
        }
    }

    /**
     * Shuts down the input side of the connection and signals any thread
     * recorded as the current reader. Does nothing if input is already shut
     * down.
     *
     * @throws ClosedChannelException if this channel is closed
     * @throws NotYetConnectedException if this channel is not connected
     */
    @Override
    public SocketChannel shutdownInput() throws IOException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (isInputOpen) {
                Net.shutdown(fd, Net.SHUT_RD);
                if (readerThread != 0)
                    NativeThread.signal(readerThread);
                isInputOpen = false;
            }
            return this;
        }
    }

    /**
     * Shuts down the output side of the connection and signals any thread
     * recorded as the current writer. Does nothing if output is already shut
     * down.
     *
     * @throws ClosedChannelException if this channel is closed
     * @throws NotYetConnectedException if this channel is not connected
     */
    @Override
    public SocketChannel shutdownOutput() throws IOException {
        synchronized (stateLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (isOutputOpen) {
                Net.shutdown(fd, Net.SHUT_WR);
                if (writerThread != 0)
                    NativeThread.signal(writerThread);
                isOutputOpen = false;
            }
            return this;
        }
    }

    /**
     * Tells whether the input side has not been shut down or closed.
     */
    public boolean isInputOpen() {
        synchronized (stateLock) {
            return isInputOpen;
        }
    }

    /**
     * Tells whether the output side has not been shut down or closed.
     */
    public boolean isOutputOpen() {
        synchronized (stateLock) {
            return isOutputOpen;
        }
    }

    // AbstractInterruptibleChannel synchronizes invocations of this method
    // using AbstractInterruptibleChannel.closeLock, and also ensures that this
    // method is only ever invoked once.  Before we get to this method, isOpen
    // (which is volatile) will have been set to false.
    //
    @Override
    protected void implCloseSelectableChannel() throws IOException {
        synchronized (stateLock) {
            isInputOpen = false;
            isOutputOpen = false;

            // Close the underlying file descriptor and dup it to a known fd
            // that's already closed.  This prevents other operations on this
            // channel from using the old fd, which might be recycled in the
            // meantime and allocated to an entirely different channel.
            //
            if (state != ST_KILLED)
                nd.preClose(fd);

            // Signal native threads, if needed.  If a target thread is not
            // currently blocked in an I/O operation then no harm is done since
            // the signal handler doesn't actually do anything.
            //
            if (readerThread != 0)
                NativeThread.signal(readerThread);

            if (writerThread != 0)
                NativeThread.signal(writerThread);

            // If this channel is not registered then it's safe to close the fd
            // immediately since we know at this point that no thread is
            // blocked in an I/O operation upon the channel and, since the
            // channel is marked closed, no thread will start another such
            // operation.  If this channel is registered then we don't close
            // the fd since it might be in use by a selector.  In that case
            // closing this channel caused its keys to be cancelled, so the
            // last selector to deregister a key for this channel will invoke
            // kill() to close the fd.
            //
            if (!isRegistered())
                kill();
        }
    }

    /**
     * Closes the underlying file descriptor, or marks the channel
     * ST_KILLPENDING when a reader or writer thread is still recorded so
     * that the cleanup done by that thread performs the close. Has no effect
     * once the state is ST_KILLED.
     */
    public void kill() throws IOException {
        synchronized (stateLock) {
            if (state == ST_KILLED)
                return;
            if (state == ST_UNINITIALIZED) {
                state = ST_KILLED;
                return;
            }
            assert !isOpen() && !isRegistered();

            // Postpone the kill if there is a waiting reader
            // or writer thread. See the comments in read() for
            // more detailed explanation.
            if (readerThread == 0 && writerThread == 0) {
                nd.close(fd);
                state = ST_KILLED;
            } else {
                state = ST_KILLPENDING;
            }
        }
    }

    /**
     * Translates native poll revent ops into a ready operation set, stores
     * it on the key, and reports whether any operation became newly ready.
     *
     * NOTE(review): state is read and readyToConnect is written here without
     * holding stateLock, although both are declared among the fields
     * protected by it -- confirm this is intentional.
     */
    public boolean translateReadyOps(int ops, int initialOps,
                                     SelectionKeyImpl sk) {
        int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
        int oldOps = sk.nioReadyOps();
        int newOps = initialOps;

        if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
            // This should only happen if this channel is pre-closed while a
            // selection operation is in progress
            // ## Throw an error if this channel has not been pre-closed
            return false;
        }

        if ((ops & (PollArrayWrapper.POLLERR
                    | PollArrayWrapper.POLLHUP)) != 0) {
            newOps = intOps;
            sk.nioReadyOps(newOps);
            // No need to poll again in checkConnect,
            // the error will be detected there
            readyToConnect = true;
            return (newOps & ~oldOps) != 0;
        }

        if (((ops & PollArrayWrapper.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_READ) != 0) &&
            (state == ST_CONNECTED))
            newOps |= SelectionKey.OP_READ;

        if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
            ((intOps & SelectionKey.OP_CONNECT) != 0) &&
            ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
            newOps |= SelectionKey.OP_CONNECT;
            readyToConnect = true;
        }

        if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
            ((intOps & SelectionKey.OP_WRITE) != 0) &&
            (state == ST_CONNECTED))
            newOps |= SelectionKey.OP_WRITE;

        sk.nioReadyOps(newOps);
        return (newOps & ~oldOps) != 0;
    }

    // Adds the translated ops to the key's current ready set
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
        return translateReadyOps(ops, sk.nioReadyOps(), sk);
    }

    // Replaces the key's ready set with the translated ops
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
        return translateReadyOps(ops, 0, sk);
    }

    // package-private
    // Polls the fd for the given events; the caller must hold blockingLock()
    // and the channel must be in non-blocking mode (see the assertion)
    int poll(int events, long timeout) throws IOException {
        assert Thread.holdsLock(blockingLock()) && !isBlocking();

        synchronized (readLock) {
            int n = 0;
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    readerThread = NativeThread.current();
                }
                n = Net.poll(fd, events, timeout);
            } finally {
                readerCleanup();
                end(n > 0);
            }
            return n;
        }
    }

    /**
     * Translates an interest operation set into a native poll event set
     */
    public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        int newOps = 0;
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= PollArrayWrapper.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= PollArrayWrapper.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= PollArrayWrapper.POLLCONN;
        sk.selector.putEventOps(sk, newOps);
    }

    // Returns the file descriptor object backing this channel
    public FileDescriptor getFD() {
        return fd;
    }

    // Returns the fd value captured at construction time
    public int getFDVal() {
        return fdVal;
    }

    /**
     * Returns the superclass name followed by a bracketed summary: "closed",
     * or the connection state plus the local and remote addresses when set.
     */
    @Override
    public String toString() {
        // The buffer never leaves this method, so no synchronized
        // StringBuffer is needed
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClass().getSuperclass().getName());
        sb.append('[');
        if (!isOpen())
            sb.append("closed");
        else {
            synchronized (stateLock) {
                switch (state) {
                case ST_UNCONNECTED:
                    sb.append("unconnected");
                    break;
                case ST_PENDING:
                    sb.append("connection-pending");
                    break;
                case ST_CONNECTED:
                    sb.append("connected");
                    if (!isInputOpen)
                        sb.append(" ishut");
                    if (!isOutputOpen)
                        sb.append(" oshut");
                    break;
                }
                SocketAddress local = localAddress();
                if (local != null) {
                    sb.append(" local=");
                    sb.append(local.toString());
                }
                SocketAddress remote = remoteAddress();
                if (remote != null) {
                    sb.append(" remote=");
                    sb.append(remote.toString());
                }
            }
        }
        sb.append(']');
        return sb.toString();
    }


    // -- Native methods --

    private static native int checkConnect(FileDescriptor fd,
                                           boolean block, boolean ready)
        throws IOException;

    private static native int sendOutOfBandData(FileDescriptor fd, byte data)
        throws IOException;

    static {
        Util.load();
        nd = new SocketDispatcher();
    }

}