1 /* 2 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.net.InetAddress; 28 import java.net.SocketAddress; 29 import java.net.SocketException; 30 import java.net.InetSocketAddress; 31 import java.io.FileDescriptor; 32 import java.io.IOException; 33 import java.util.Collections; 34 import java.util.Set; 35 import java.util.HashSet; 36 import java.nio.ByteBuffer; 37 import java.nio.channels.SelectionKey; 38 import java.nio.channels.ClosedChannelException; 39 import java.nio.channels.ConnectionPendingException; 40 import java.nio.channels.NoConnectionPendingException; 41 import java.nio.channels.AlreadyConnectedException; 42 import java.nio.channels.NotYetBoundException; 43 import java.nio.channels.NotYetConnectedException; 44 import java.nio.channels.spi.SelectorProvider; 45 import com.sun.nio.sctp.AbstractNotificationHandler; 46 import com.sun.nio.sctp.Association; 47 import com.sun.nio.sctp.AssociationChangeNotification; 48 import com.sun.nio.sctp.HandlerResult; 49 import com.sun.nio.sctp.IllegalReceiveException; 50 import com.sun.nio.sctp.InvalidStreamException; 51 import com.sun.nio.sctp.IllegalUnbindException; 52 import com.sun.nio.sctp.MessageInfo; 53 import com.sun.nio.sctp.NotificationHandler; 54 import com.sun.nio.sctp.SctpChannel; 55 import com.sun.nio.sctp.SctpSocketOption; 56 import sun.nio.ch.PollArrayWrapper; 57 import sun.nio.ch.SelChImpl; 58 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 59 import static sun.nio.ch.SctpResultContainer.SEND_FAILED; 60 import static sun.nio.ch.SctpResultContainer.ASSOCIATION_CHANGED; 61 import static sun.nio.ch.SctpResultContainer.PEER_ADDRESS_CHANGED; 62 import static sun.nio.ch.SctpResultContainer.SHUTDOWN; 63 64 /** 65 * An implementation of an SctpChannel 66 */ 67 public class SctpChannelImpl extends SctpChannel 68 implements SelChImpl 69 { 70 private final FileDescriptor fd; 71 72 private final int fdVal; 73 74 /* IDs of native threads doing send and receivess, for signalling */ 75 private volatile long receiverThread = 0; 76 private volatile long senderThread = 0; 77 78 /* Lock held by current receiving or connecting thread */ 79 private final Object receiveLock = new Object(); 80 81 /* Lock held by current sending or connecting thread */ 82 private final Object sendLock = new Object(); 83 84 private final ThreadLocal<Boolean> receiveInvoked = 85 new ThreadLocal<Boolean>() { 86 @Override protected Boolean initialValue() { 87 return Boolean.FALSE; 88 } 89 }; 90 91 /* Lock held by any thread that modifies the state fields declared below 92 DO NOT invoke a blocking I/O operation while holding this lock! */ 93 private final Object stateLock = new Object(); 94 95 private enum ChannelState { 96 UNINITIALIZED, 97 UNCONNECTED, 98 PENDING, 99 CONNECTED, 100 KILLPENDING, 101 KILLED, 102 } 103 /* -- The following fields are protected by stateLock -- */ 104 private ChannelState state = ChannelState.UNINITIALIZED; 105 106 /* Binding; Once bound the port will remain constant. */ 107 int port = -1; 108 private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>(); 109 /* Has the channel been bound to the wildcard address */ 110 private boolean wildcard; /* false */ 111 //private InetSocketAddress remoteAddress = null; 112 113 /* Input/Output open */ 114 private boolean readyToConnect; 115 116 /* Shutdown */ 117 private boolean isShutdown; 118 119 private Association association; 120 121 private Set<SocketAddress> remoteAddresses = Collections.EMPTY_SET; 122 123 /* -- End of fields protected by stateLock -- */ 124 125 /** 126 * Constructor for normal connecting sockets 127 */ 128 public SctpChannelImpl(SelectorProvider provider) throws IOException { 129 //TODO: update provider remove public modifier 130 super(provider); 131 this.fd = SctpNet.socket(true); 132 this.fdVal = IOUtil.fdVal(fd); 133 this.state = ChannelState.UNCONNECTED; 134 } 135 136 /** 137 * Constructor for sockets obtained from server sockets 138 */ 139 public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd) 140 throws IOException { 141 this(provider, fd, null); 142 } 143 144 /** 145 * Constructor for sockets obtained from branching 146 */ 147 public SctpChannelImpl(SelectorProvider provider, 148 FileDescriptor fd, 149 Association association) 150 throws IOException { 151 super(provider); 152 this.fd = fd; 153 this.fdVal = IOUtil.fdVal(fd); 154 this.state = ChannelState.CONNECTED; 155 port = (Net.localAddress(fd)).getPort(); 156 157 if (association != null) { /* branched */ 158 this.association = association; 159 } else { /* obtained from server channel */ 160 /* Receive COMM_UP */ 161 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 162 try { 163 receive(buf, null, null, true); 164 } finally { 165 Util.releaseTemporaryDirectBuffer(buf); 166 } 167 } 168 } 169 170 /** 171 * Binds the channel's socket to a local address. 172 */ 173 @Override 174 public SctpChannel bind(SocketAddress local) throws IOException { 175 synchronized (receiveLock) { 176 synchronized (sendLock) { 177 synchronized (stateLock) { 178 ensureOpenAndUnconnected(); 179 if (isBound()) 180 SctpNet.throwAlreadyBoundException(); 181 InetSocketAddress isa = (local == null) ? 182 new InetSocketAddress(0) : Net.checkAddress(local); 183 Net.bind(fd, isa.getAddress(), isa.getPort()); 184 InetSocketAddress boundIsa = Net.localAddress(fd); 185 port = boundIsa.getPort(); 186 localAddresses.add(isa); 187 if (isa.getAddress().isAnyLocalAddress()) 188 wildcard = true; 189 } 190 } 191 } 192 return this; 193 } 194 195 @Override 196 public SctpChannel bindAddress(InetAddress address) 197 throws IOException { 198 bindUnbindAddress(address, true); 199 localAddresses.add(new InetSocketAddress(address, port)); 200 return this; 201 } 202 203 @Override 204 public SctpChannel unbindAddress(InetAddress address) 205 throws IOException { 206 bindUnbindAddress(address, false); 207 localAddresses.remove(new InetSocketAddress(address, port)); 208 return this; 209 } 210 211 private SctpChannel bindUnbindAddress(InetAddress address, boolean add) 212 throws IOException { 213 if (address == null) 214 throw new IllegalArgumentException(); 215 216 synchronized (receiveLock) { 217 synchronized (sendLock) { 218 synchronized (stateLock) { 219 if (!isOpen()) 220 throw new ClosedChannelException(); 221 if (!isBound()) 222 throw new NotYetBoundException(); 223 if (wildcard) 224 throw new IllegalStateException( 225 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 226 if (address.isAnyLocalAddress()) 227 throw new IllegalArgumentException( 228 "Cannot add or remove the wildcard address"); 229 if (add) { 230 for (InetSocketAddress addr : localAddresses) { 231 if (addr.getAddress().equals(address)) { 232 SctpNet.throwAlreadyBoundException(); 233 } 234 } 235 } else { /*removing */ 236 /* Verify that there is more than one address 237 * and that address is already bound */ 238 if (localAddresses.size() <= 1) 239 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 240 boolean foundAddress = false; 241 for (InetSocketAddress addr : localAddresses) { 242 if (addr.getAddress().equals(address)) { 243 foundAddress = true; 244 break; 245 } 246 } 247 if (!foundAddress ) 248 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 249 } 250 251 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 252 253 /* Update our internal Set to reflect the addition/removal */ 254 if (add) 255 localAddresses.add(new InetSocketAddress(address, port)); 256 else { 257 for (InetSocketAddress addr : localAddresses) { 258 if (addr.getAddress().equals(address)) { 259 localAddresses.remove(addr); 260 break; 261 } 262 } 263 } 264 } 265 } 266 } 267 return this; 268 } 269 270 private boolean isBound() { 271 synchronized (stateLock) { 272 return port == -1 ? false : true; 273 } 274 } 275 276 private boolean isConnected() { 277 synchronized (stateLock) { 278 return (state == ChannelState.CONNECTED); 279 } 280 } 281 282 private void ensureOpenAndUnconnected() throws IOException { 283 synchronized (stateLock) { 284 if (!isOpen()) 285 throw new ClosedChannelException(); 286 if (isConnected()) 287 throw new AlreadyConnectedException(); 288 if (state == ChannelState.PENDING) 289 throw new ConnectionPendingException(); 290 } 291 } 292 293 private boolean ensureReceiveOpen() throws ClosedChannelException { 294 synchronized (stateLock) { 295 if (!isOpen()) 296 throw new ClosedChannelException(); 297 if (!isConnected()) 298 throw new NotYetConnectedException(); 299 else 300 return true; 301 } 302 } 303 304 private void ensureSendOpen() throws ClosedChannelException { 305 synchronized (stateLock) { 306 if (!isOpen()) 307 throw new ClosedChannelException(); 308 if (isShutdown) 309 throw new ClosedChannelException(); 310 if (!isConnected()) 311 throw new NotYetConnectedException(); 312 } 313 } 314 315 private void receiverCleanup() throws IOException { 316 synchronized (stateLock) { 317 receiverThread = 0; 318 if (state == ChannelState.KILLPENDING) 319 kill(); 320 } 321 } 322 323 private void senderCleanup() throws IOException { 324 synchronized (stateLock) { 325 senderThread = 0; 326 if (state == ChannelState.KILLPENDING) 327 kill(); 328 } 329 } 330 331 @Override 332 public Association association() throws ClosedChannelException { 333 synchronized (stateLock) { 334 if (!isOpen()) 335 throw new ClosedChannelException(); 336 if (!isConnected()) 337 return null; 338 339 return association; 340 } 341 } 342 343 @Override 344 public boolean connect(SocketAddress endpoint) throws IOException { 345 synchronized (receiveLock) { 346 synchronized (sendLock) { 347 ensureOpenAndUnconnected(); 348 InetSocketAddress isa = Net.checkAddress(endpoint); 349 SecurityManager sm = System.getSecurityManager(); 350 if (sm != null) 351 sm.checkConnect(isa.getAddress().getHostAddress(), 352 isa.getPort()); 353 synchronized (blockingLock()) { 354 int n = 0; 355 try { 356 try { 357 begin(); 358 synchronized (stateLock) { 359 if (!isOpen()) { 360 return false; 361 } 362 receiverThread = NativeThread.current(); 363 } 364 for (;;) { 365 InetAddress ia = isa.getAddress(); 366 if (ia.isAnyLocalAddress()) 367 ia = InetAddress.getLocalHost(); 368 n = SctpNet.connect(fdVal, ia, isa.getPort()); 369 if ( (n == IOStatus.INTERRUPTED) 370 && isOpen()) 371 continue; 372 break; 373 } 374 } finally { 375 receiverCleanup(); 376 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 377 assert IOStatus.check(n); 378 } 379 } catch (IOException x) { 380 /* If an exception was thrown, close the channel after 381 * invoking end() so as to avoid bogus 382 * AsynchronousCloseExceptions */ 383 close(); 384 throw x; 385 } 386 387 if (n > 0) { 388 synchronized (stateLock) { 389 /* Connection succeeded */ 390 state = ChannelState.CONNECTED; 391 if (!isBound()) { 392 InetSocketAddress boundIsa = 393 Net.localAddress(fd); 394 port = boundIsa.getPort(); 395 } 396 397 /* Receive COMM_UP */ 398 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 399 try { 400 receive(buf, null, null, true); 401 } finally { 402 Util.releaseTemporaryDirectBuffer(buf); 403 } 404 405 /* cache remote addresses */ 406 try { 407 remoteAddresses = getRemoteAddresses(); 408 } catch (IOException unused) { /* swallow exception */ } 409 410 return true; 411 } 412 } else { 413 synchronized (stateLock) { 414 /* If nonblocking and no exception then connection 415 * pending; disallow another invocation */ 416 if (!isBlocking()) 417 state = ChannelState.PENDING; 418 else 419 assert false; 420 } 421 } 422 } 423 return false; 424 } 425 } 426 } 427 428 @Override 429 public boolean connect(SocketAddress endpoint, 430 int maxOutStreams, 431 int maxInStreams) 432 throws IOException { 433 ensureOpenAndUnconnected(); 434 return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams. 435 create(maxInStreams, maxOutStreams)).connect(endpoint); 436 437 } 438 439 @Override 440 public boolean isConnectionPending() { 441 synchronized (stateLock) { 442 return (state == ChannelState.PENDING); 443 } 444 } 445 446 @Override 447 public boolean finishConnect() throws IOException { 448 synchronized (receiveLock) { 449 synchronized (sendLock) { 450 synchronized (stateLock) { 451 if (!isOpen()) 452 throw new ClosedChannelException(); 453 if (isConnected()) 454 return true; 455 if (state != ChannelState.PENDING) 456 throw new NoConnectionPendingException(); 457 } 458 int n = 0; 459 try { 460 try { 461 begin(); 462 synchronized (blockingLock()) { 463 synchronized (stateLock) { 464 if (!isOpen()) { 465 return false; 466 } 467 receiverThread = NativeThread.current(); 468 } 469 if (!isBlocking()) { 470 for (;;) { 471 n = checkConnect(fd, false, readyToConnect); 472 if ( (n == IOStatus.INTERRUPTED) 473 && isOpen()) 474 continue; 475 break; 476 } 477 } else { 478 for (;;) { 479 n = checkConnect(fd, true, readyToConnect); 480 if (n == 0) { 481 // Loop in case of 482 // spurious notifications 483 continue; 484 } 485 if ( (n == IOStatus.INTERRUPTED) 486 && isOpen()) 487 continue; 488 break; 489 } 490 } 491 } 492 } finally { 493 synchronized (stateLock) { 494 receiverThread = 0; 495 if (state == ChannelState.KILLPENDING) { 496 kill(); 497 /* poll()/getsockopt() does not report 498 * error (throws exception, with n = 0) 499 * on Linux platform after dup2 and 500 * signal-wakeup. Force n to 0 so the 501 * end() can throw appropriate exception */ 502 n = 0; 503 } 504 } 505 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 506 assert IOStatus.check(n); 507 } 508 } catch (IOException x) { 509 /* If an exception was thrown, close the channel after 510 * invoking end() so as to avoid bogus 511 * AsynchronousCloseExceptions */ 512 close(); 513 throw x; 514 } 515 516 if (n > 0) { 517 synchronized (stateLock) { 518 state = ChannelState.CONNECTED; 519 if (!isBound()) { 520 InetSocketAddress boundIsa = 521 Net.localAddress(fd); 522 port = boundIsa.getPort(); 523 } 524 525 /* Receive COMM_UP */ 526 ByteBuffer buf = Util.getTemporaryDirectBuffer(50); 527 try { 528 receive(buf, null, null, true); 529 } finally { 530 Util.releaseTemporaryDirectBuffer(buf); 531 } 532 533 /* cache remote addresses */ 534 try { 535 remoteAddresses = getRemoteAddresses(); 536 } catch (IOException unused) { /* swallow exception */ } 537 538 return true; 539 } 540 } 541 } 542 } 543 return false; 544 } 545 546 @Override 547 protected void implConfigureBlocking(boolean block) throws IOException { 548 IOUtil.configureBlocking(fd, block); 549 } 550 551 @Override 552 public void implCloseSelectableChannel() throws IOException { 553 synchronized (stateLock) { 554 SctpNet.preClose(fdVal); 555 556 if (receiverThread != 0) 557 NativeThread.signal(receiverThread); 558 559 if (senderThread != 0) 560 NativeThread.signal(senderThread); 561 562 if (!isRegistered()) 563 kill(); 564 } 565 } 566 567 @Override 568 public FileDescriptor getFD() { 569 return fd; 570 } 571 572 @Override 573 public int getFDVal() { 574 return fdVal; 575 } 576 577 /** 578 * Translates native poll revent ops into a ready operation ops 579 */ 580 private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) { 581 int intOps = sk.nioInterestOps(); 582 int oldOps = sk.nioReadyOps(); 583 int newOps = initialOps; 584 585 if ((ops & PollArrayWrapper.POLLNVAL) != 0) { 586 /* This should only happen if this channel is pre-closed while a 587 * selection operation is in progress 588 * ## Throw an error if this channel has not been pre-closed */ 589 return false; 590 } 591 592 if ((ops & (PollArrayWrapper.POLLERR 593 | PollArrayWrapper.POLLHUP)) != 0) { 594 newOps = intOps; 595 sk.nioReadyOps(newOps); 596 /* No need to poll again in checkConnect, 597 * the error will be detected there */ 598 readyToConnect = true; 599 return (newOps & ~oldOps) != 0; 600 } 601 602 if (((ops & PollArrayWrapper.POLLIN) != 0) && 603 ((intOps & SelectionKey.OP_READ) != 0) && 604 isConnected()) 605 newOps |= SelectionKey.OP_READ; 606 607 if (((ops & PollArrayWrapper.POLLCONN) != 0) && 608 ((intOps & SelectionKey.OP_CONNECT) != 0) && 609 ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) { 610 newOps |= SelectionKey.OP_CONNECT; 611 readyToConnect = true; 612 } 613 614 if (((ops & PollArrayWrapper.POLLOUT) != 0) && 615 ((intOps & SelectionKey.OP_WRITE) != 0) && 616 isConnected()) 617 newOps |= SelectionKey.OP_WRITE; 618 619 sk.nioReadyOps(newOps); 620 return (newOps & ~oldOps) != 0; 621 } 622 623 @Override 624 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 625 return translateReadyOps(ops, sk.nioReadyOps(), sk); 626 } 627 628 @Override 629 @SuppressWarnings("all") 630 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 631 return translateReadyOps(ops, 0, sk); 632 } 633 634 @Override 635 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { 636 int newOps = 0; 637 if ((ops & SelectionKey.OP_READ) != 0) 638 newOps |= PollArrayWrapper.POLLIN; 639 if ((ops & SelectionKey.OP_WRITE) != 0) 640 newOps |= PollArrayWrapper.POLLOUT; 641 if ((ops & SelectionKey.OP_CONNECT) != 0) 642 newOps |= PollArrayWrapper.POLLCONN; 643 sk.selector.putEventOps(sk, newOps); 644 } 645 646 @Override 647 public void kill() throws IOException { 648 synchronized (stateLock) { 649 if (state == ChannelState.KILLED) 650 return; 651 if (state == ChannelState.UNINITIALIZED) { 652 state = ChannelState.KILLED; 653 return; 654 } 655 assert !isOpen() && !isRegistered(); 656 657 /* Postpone the kill if there is a waiting reader 658 * or writer thread. */ 659 if (receiverThread == 0 && senderThread == 0) { 660 SctpNet.close(fdVal); 661 state = ChannelState.KILLED; 662 } else { 663 state = ChannelState.KILLPENDING; 664 } 665 } 666 } 667 668 @Override 669 public <T> SctpChannel setOption(SctpSocketOption<T> name, T value) 670 throws IOException { 671 if (name == null) 672 throw new NullPointerException(); 673 if (!supportedOptions().contains(name)) 674 throw new UnsupportedOperationException("'" + name + "' not supported"); 675 676 synchronized (stateLock) { 677 if (!isOpen()) 678 throw new ClosedChannelException(); 679 680 SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/); 681 } 682 return this; 683 } 684 685 @Override 686 @SuppressWarnings("unchecked") 687 public <T> T getOption(SctpSocketOption<T> name) throws IOException { 688 if (name == null) 689 throw new NullPointerException(); 690 if (!supportedOptions().contains(name)) 691 throw new UnsupportedOperationException("'" + name + "' not supported"); 692 693 synchronized (stateLock) { 694 if (!isOpen()) 695 throw new ClosedChannelException(); 696 697 return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/); 698 } 699 } 700 701 private static class DefaultOptionsHolder { 702 static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions(); 703 704 private static Set<SctpSocketOption<?>> defaultOptions() { 705 HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10); 706 set.add(SCTP_DISABLE_FRAGMENTS); 707 set.add(SCTP_EXPLICIT_COMPLETE); 708 set.add(SCTP_FRAGMENT_INTERLEAVE); 709 set.add(SCTP_INIT_MAXSTREAMS); 710 set.add(SCTP_NODELAY); 711 set.add(SCTP_PRIMARY_ADDR); 712 set.add(SCTP_SET_PEER_PRIMARY_ADDR); 713 set.add(SO_SNDBUF); 714 set.add(SO_RCVBUF); 715 set.add(SO_LINGER); 716 return Collections.unmodifiableSet(set); 717 } 718 } 719 720 @Override 721 public final Set<SctpSocketOption<?>> supportedOptions() { 722 return DefaultOptionsHolder.defaultOptions; 723 } 724 725 @Override 726 public <T> MessageInfo receive(ByteBuffer buffer, 727 T attachment, 728 NotificationHandler<T> handler) 729 throws IOException { 730 return receive(buffer, attachment, handler, false); 731 } 732 733 private <T> MessageInfo receive(ByteBuffer buffer, 734 T attachment, 735 NotificationHandler<T> handler, 736 boolean fromConnect) 737 throws IOException { 738 if (buffer == null) 739 throw new IllegalArgumentException("buffer cannot be null"); 740 741 if (buffer.isReadOnly()) 742 throw new IllegalArgumentException("Read-only buffer"); 743 744 if (receiveInvoked.get()) 745 throw new IllegalReceiveException( 746 "cannot invoke receive from handler"); 747 receiveInvoked.set(Boolean.TRUE); 748 749 try { 750 SctpResultContainer resultContainer = new SctpResultContainer(); 751 do { 752 resultContainer.clear(); 753 synchronized (receiveLock) { 754 if (!ensureReceiveOpen()) 755 return null; 756 757 int n = 0; 758 try { 759 begin(); 760 761 synchronized (stateLock) { 762 if(!isOpen()) 763 return null; 764 receiverThread = NativeThread.current(); 765 } 766 767 do { 768 n = receive(fdVal, buffer, resultContainer, fromConnect); 769 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 770 } finally { 771 receiverCleanup(); 772 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 773 assert IOStatus.check(n); 774 } 775 776 if (!resultContainer.isNotification()) { 777 /* message or nothing */ 778 if (resultContainer.hasSomething()) { 779 /* Set the association before returning */ 780 SctpMessageInfoImpl info = 781 resultContainer.getMessageInfo(); 782 synchronized (stateLock) { 783 assert association != null; 784 info.setAssociation(association); 785 } 786 return info; 787 } else 788 /* Non-blocking may return null if nothing available*/ 789 return null; 790 } else { /* notification */ 791 synchronized (stateLock) { 792 handleNotificationInternal( 793 resultContainer); 794 } 795 } 796 797 if (fromConnect) { 798 /* If we reach here, then it was connect that invoked 799 * receive and received the COMM_UP. We have already 800 * handled the COMM_UP with the internal notification 801 * handler. Simply return. */ 802 return null; 803 } 804 } /* receiveLock */ 805 } while (handler == null ? true : 806 (invokeNotificationHandler(resultContainer, handler, attachment) 807 == HandlerResult.CONTINUE)); 808 809 return null; 810 } finally { 811 receiveInvoked.set(Boolean.FALSE); 812 } 813 } 814 815 private int receive(int fd, 816 ByteBuffer dst, 817 SctpResultContainer resultContainer, 818 boolean peek) 819 throws IOException { 820 int pos = dst.position(); 821 int lim = dst.limit(); 822 assert (pos <= lim); 823 int rem = (pos <= lim ? lim - pos : 0); 824 if (dst instanceof DirectBuffer && rem > 0) 825 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek); 826 827 /* Substitute a native buffer */ 828 int newSize = Math.max(rem, 1); 829 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 830 try { 831 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek); 832 bb.flip(); 833 if (n > 0 && rem > 0) 834 dst.put(bb); 835 return n; 836 } finally { 837 Util.releaseTemporaryDirectBuffer(bb); 838 } 839 } 840 841 private int receiveIntoNativeBuffer(int fd, 842 SctpResultContainer resultContainer, 843 ByteBuffer bb, 844 int rem, 845 int pos, 846 boolean peek) 847 throws IOException 848 { 849 int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek); 850 851 if (n > 0) 852 bb.position(pos + n); 853 return n; 854 } 855 856 private InternalNotificationHandler<?> internalNotificationHandler = 857 new InternalNotificationHandler(); 858 859 private void handleNotificationInternal(SctpResultContainer resultContainer) 860 { 861 invokeNotificationHandler(resultContainer, 862 internalNotificationHandler, null); 863 } 864 865 private class InternalNotificationHandler<T> 866 extends AbstractNotificationHandler<T> 867 { 868 @Override 869 public HandlerResult handleNotification( 870 AssociationChangeNotification not, T unused) { 871 if (not.event().equals( 872 AssociationChangeNotification.AssocChangeEvent.COMM_UP) && 873 association == null) { 874 SctpAssocChange sac = (SctpAssocChange) not; 875 association = new SctpAssociationImpl 876 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 877 } 878 return HandlerResult.CONTINUE; 879 } 880 } 881 882 private <T> HandlerResult invokeNotificationHandler 883 (SctpResultContainer resultContainer, 884 NotificationHandler<T> handler, 885 T attachment) { 886 SctpNotification notification = resultContainer.notification(); 887 synchronized (stateLock) { 888 notification.setAssociation(association); 889 } 890 891 if (!(handler instanceof AbstractNotificationHandler)) { 892 return handler.handleNotification(notification, attachment); 893 } 894 895 /* AbstractNotificationHandler */ 896 AbstractNotificationHandler absHandler = 897 (AbstractNotificationHandler)handler; 898 switch(resultContainer.type()) { 899 case ASSOCIATION_CHANGED : 900 return absHandler.handleNotification( 901 resultContainer.getAssociationChanged(), attachment); 902 case PEER_ADDRESS_CHANGED : 903 return absHandler.handleNotification( 904 resultContainer.getPeerAddressChanged(), attachment); 905 case SEND_FAILED : 906 return absHandler.handleNotification( 907 resultContainer.getSendFailed(), attachment); 908 case SHUTDOWN : 909 return absHandler.handleNotification( 910 resultContainer.getShutdown(), attachment); 911 default : 912 /* implementation specific handlers */ 913 return absHandler.handleNotification( 914 resultContainer.notification(), attachment); 915 } 916 } 917 918 private void checkAssociation(Association sendAssociation) { 919 synchronized (stateLock) { 920 if (sendAssociation != null && !sendAssociation.equals(association)) { 921 throw new IllegalArgumentException( 922 "Cannot send to another association"); 923 } 924 } 925 } 926 927 private void checkStreamNumber(int streamNumber) { 928 synchronized (stateLock) { 929 if (association != null) { 930 if (streamNumber < 0 || 931 streamNumber >= association.maxOutboundStreams()) 932 throw new InvalidStreamException(); 933 } 934 } 935 } 936 937 /* TODO: Add support for ttl and isComplete to both 121 12M 938 * SCTP_EOR not yet supported on reference platforms 939 * TTL support limited... 940 */ 941 @Override 942 public int send(ByteBuffer buffer, MessageInfo messageInfo) 943 throws IOException { 944 if (buffer == null) 945 throw new IllegalArgumentException("buffer cannot be null"); 946 947 if (messageInfo == null) 948 throw new IllegalArgumentException("messageInfo cannot be null"); 949 950 checkAssociation(messageInfo.association()); 951 checkStreamNumber(messageInfo.streamNumber()); 952 953 synchronized (sendLock) { 954 ensureSendOpen(); 955 956 int n = 0; 957 try { 958 begin(); 959 960 synchronized (stateLock) { 961 if(!isOpen()) 962 return 0; 963 senderThread = NativeThread.current(); 964 } 965 966 do { 967 n = send(fdVal, buffer, messageInfo); 968 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 969 970 return IOStatus.normalize(n); 971 } finally { 972 senderCleanup(); 973 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 974 assert IOStatus.check(n); 975 } 976 } 977 } 978 979 private int send(int fd, ByteBuffer src, MessageInfo messageInfo) 980 throws IOException { 981 int streamNumber = messageInfo.streamNumber(); 982 SocketAddress target = messageInfo.address(); 983 boolean unordered = messageInfo.isUnordered(); 984 int ppid = messageInfo.payloadProtocolID(); 985 986 if (src instanceof DirectBuffer) 987 return sendFromNativeBuffer(fd, src, target, streamNumber, 988 unordered, ppid); 989 990 /* Substitute a native buffer */ 991 int pos = src.position(); 992 int lim = src.limit(); 993 assert (pos <= lim && streamNumber >= 0); 994 995 int rem = (pos <= lim ? lim - pos : 0); 996 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 997 try { 998 bb.put(src); 999 bb.flip(); 1000 /* Do not update src until we see how many bytes were written */ 1001 src.position(pos); 1002 1003 int n = sendFromNativeBuffer(fd, bb, target, streamNumber, 1004 unordered, ppid); 1005 if (n > 0) { 1006 /* now update src */ 1007 src.position(pos + n); 1008 } 1009 return n; 1010 } finally { 1011 Util.releaseTemporaryDirectBuffer(bb); 1012 } 1013 } 1014 1015 private int sendFromNativeBuffer(int fd, 1016 ByteBuffer bb, 1017 SocketAddress target, 1018 int streamNumber, 1019 boolean unordered, 1020 int ppid) 1021 throws IOException { 1022 InetAddress addr = null; // no preferred address 1023 int port = 0; 1024 if (target != null) { 1025 InetSocketAddress isa = Net.checkAddress(target); 1026 addr = isa.getAddress(); 1027 port = isa.getPort(); 1028 } 1029 1030 int pos = bb.position(); 1031 int lim = bb.limit(); 1032 assert (pos <= lim); 1033 int rem = (pos <= lim ? lim - pos : 0); 1034 1035 int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr, 1036 port, -1 /*121*/, streamNumber, unordered, ppid); 1037 if (written > 0) 1038 bb.position(pos + written); 1039 return written; 1040 } 1041 1042 @Override 1043 public SctpChannel shutdown() throws IOException { 1044 synchronized(stateLock) { 1045 if (isShutdown) 1046 return this; 1047 1048 ensureSendOpen(); 1049 SctpNet.shutdown(fdVal, -1); 1050 if (senderThread != 0) 1051 NativeThread.signal(senderThread); 1052 isShutdown = true; 1053 } 1054 return this; 1055 } 1056 1057 @Override 1058 public Set<SocketAddress> getAllLocalAddresses() 1059 throws IOException { 1060 synchronized (stateLock) { 1061 if (!isOpen()) 1062 throw new ClosedChannelException(); 1063 if (!isBound()) 1064 return Collections.EMPTY_SET; 1065 1066 return SctpNet.getLocalAddresses(fdVal); 1067 } 1068 } 1069 1070 @Override 1071 public Set<SocketAddress> getRemoteAddresses() 1072 throws IOException { 1073 synchronized (stateLock) { 1074 if (!isOpen()) 1075 throw new ClosedChannelException(); 1076 if (!isConnected() || isShutdown) 1077 return Collections.EMPTY_SET; 1078 1079 try { 1080 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/); 1081 } catch (SocketException unused) { 1082 /* an open connected channel should always have remote addresses */ 1083 return remoteAddresses; 1084 } 1085 } 1086 } 1087 1088 /* Native */ 1089 private static native void initIDs(); 1090 1091 static native int receive0(int fd, SctpResultContainer resultContainer, 1092 long address, int length, boolean peek) throws IOException; 1093 1094 static native int send0(int fd, long address, int length, 1095 InetAddress addr, int port, int assocId, int streamNumber, 1096 boolean unordered, int ppid) throws IOException; 1097 1098 private static native int checkConnect(FileDescriptor fd, boolean block, 1099 boolean ready) throws IOException; 1100 1101 static { 1102 Util.load(); /* loads nio & net native libraries */ 1103 java.security.AccessController.doPrivileged( 1104 new sun.security.action.LoadLibraryAction("sctp")); 1105 initIDs(); 1106 } 1107 }