1 /* 2 * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch; 26 27 import java.net.InetAddress; 28 import java.net.SocketAddress; 29 import java.net.SocketException; 30 import java.net.InetSocketAddress; 31 import java.io.FileDescriptor; 32 import java.io.IOException; 33 import java.util.Collections; 34 import java.util.Map.Entry; 35 import java.util.Iterator; 36 import java.util.Set; 37 import java.util.HashSet; 38 import java.util.HashMap; 39 import java.nio.ByteBuffer; 40 import java.nio.channels.SelectionKey; 41 import java.nio.channels.ClosedChannelException; 42 import java.nio.channels.NotYetBoundException; 43 import java.nio.channels.spi.SelectorProvider; 44 import com.sun.nio.sctp.AbstractNotificationHandler; 45 import com.sun.nio.sctp.Association; 46 import com.sun.nio.sctp.AssociationChangeNotification; 47 import com.sun.nio.sctp.HandlerResult; 48 import com.sun.nio.sctp.IllegalReceiveException; 49 import com.sun.nio.sctp.InvalidStreamException; 50 import com.sun.nio.sctp.IllegalUnbindException; 51 import com.sun.nio.sctp.NotificationHandler; 52 import com.sun.nio.sctp.MessageInfo; 53 import com.sun.nio.sctp.SctpChannel; 54 import com.sun.nio.sctp.SctpMultiChannel; 55 import com.sun.nio.sctp.SctpSocketOption; 56 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 57 import static sun.nio.ch.SctpResultContainer.*; 58 59 /** 60 * An implementation of SctpMultiChannel 61 */ 62 public class SctpMultiChannelImpl extends SctpMultiChannel 63 implements SelChImpl 64 { 65 private final FileDescriptor fd; 66 67 private final int fdVal; 68 69 /* IDs of native threads doing send and receives, for signalling */ 70 private volatile long receiverThread = 0; 71 private volatile long senderThread = 0; 72 73 /* Lock held by current receiving thread */ 74 private final Object receiveLock = new Object(); 75 76 /* Lock held by current sending thread */ 77 private final Object sendLock = new Object(); 78 79 /* Lock held by any thread that modifies the state fields declared below 80 * DO NOT invoke a blocking I/O operation while holding this lock! */ 81 private final Object stateLock = new Object(); 82 83 private enum ChannelState { 84 UNINITIALIZED, 85 KILLPENDING, 86 KILLED, 87 } 88 89 /* -- The following fields are protected by stateLock -- */ 90 private ChannelState state = ChannelState.UNINITIALIZED; 91 92 /* Binding: Once bound the port will remain constant. */ 93 int port = -1; 94 private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>(); 95 /* Has the channel been bound to the wildcard address */ 96 private boolean wildcard; /* false */ 97 98 /* Keeps a map of addresses to association, and visa versa */ 99 private HashMap<SocketAddress, Association> addressMap = 100 new HashMap<SocketAddress, Association>(); 101 private HashMap<Association, Set<SocketAddress>> associationMap = 102 new HashMap<Association, Set<SocketAddress>>(); 103 104 /* -- End of fields protected by stateLock -- */ 105 106 /* If an association has been shutdown mark it for removal after 107 * the user handler has been invoked */ 108 private final ThreadLocal<Association> associationToRemove = 109 new ThreadLocal<Association>() { 110 @Override protected Association initialValue() { 111 return null; 112 } 113 }; 114 115 /* A notification handler cannot invoke receive */ 116 private final ThreadLocal<Boolean> receiveInvoked = 117 new ThreadLocal<Boolean>() { 118 @Override protected Boolean initialValue() { 119 return Boolean.FALSE; 120 } 121 }; 122 123 public SctpMultiChannelImpl(SelectorProvider provider) 124 throws IOException { 125 //TODO: update provider, remove public modifier 126 super(provider); 127 this.fd = SctpNet.socket(false /*one-to-many*/); 128 this.fdVal = IOUtil.fdVal(fd); 129 } 130 131 @Override 132 public SctpMultiChannel bind(SocketAddress local, int backlog) 133 throws IOException { 134 synchronized (receiveLock) { 135 synchronized (sendLock) { 136 synchronized (stateLock) { 137 ensureOpen(); 138 if (isBound()) 139 SctpNet.throwAlreadyBoundException(); 140 InetSocketAddress isa = (local == null) ? 141 new InetSocketAddress(0) : Net.checkAddress(local); 142 143 SecurityManager sm = System.getSecurityManager(); 144 if (sm != null) 145 sm.checkListen(isa.getPort()); 146 Net.bind(fd, isa.getAddress(), isa.getPort()); 147 148 InetSocketAddress boundIsa = Net.localAddress(fd); 149 port = boundIsa.getPort(); 150 localAddresses.add(isa); 151 if (isa.getAddress().isAnyLocalAddress()) 152 wildcard = true; 153 154 SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog); 155 } 156 } 157 } 158 return this; 159 } 160 161 @Override 162 public SctpMultiChannel bindAddress(InetAddress address) 163 throws IOException { 164 return bindUnbindAddress(address, true); 165 } 166 167 @Override 168 public SctpMultiChannel unbindAddress(InetAddress address) 169 throws IOException { 170 return bindUnbindAddress(address, false); 171 } 172 173 private SctpMultiChannel bindUnbindAddress(InetAddress address, 174 boolean add) 175 throws IOException { 176 if (address == null) 177 throw new IllegalArgumentException(); 178 179 synchronized (receiveLock) { 180 synchronized (sendLock) { 181 synchronized (stateLock) { 182 if (!isOpen()) 183 throw new ClosedChannelException(); 184 if (!isBound()) 185 throw new NotYetBoundException(); 186 if (wildcard) 187 throw new IllegalStateException( 188 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 189 if (address.isAnyLocalAddress()) 190 throw new IllegalArgumentException( 191 "Cannot add or remove the wildcard address"); 192 if (add) { 193 for (InetSocketAddress addr : localAddresses) { 194 if (addr.getAddress().equals(address)) { 195 SctpNet.throwAlreadyBoundException(); 196 } 197 } 198 } else { /*removing */ 199 /* Verify that there is more than one address 200 * and that address is already bound */ 201 if (localAddresses.size() <= 1) 202 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 203 boolean foundAddress = false; 204 for (InetSocketAddress addr : localAddresses) { 205 if (addr.getAddress().equals(address)) { 206 foundAddress = true; 207 break; 208 } 209 } 210 if (!foundAddress ) 211 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 212 } 213 214 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 215 216 /* Update our internal Set to reflect the addition/removal */ 217 if (add) 218 localAddresses.add(new InetSocketAddress(address, port)); 219 else { 220 for (InetSocketAddress addr : localAddresses) { 221 if (addr.getAddress().equals(address)) { 222 localAddresses.remove(addr); 223 break; 224 } 225 } 226 } 227 } 228 } 229 } 230 return this; 231 } 232 233 @Override 234 public Set<Association> associations() 235 throws ClosedChannelException, NotYetBoundException { 236 synchronized (stateLock) { 237 if (!isOpen()) 238 throw new ClosedChannelException(); 239 if (!isBound()) 240 throw new NotYetBoundException(); 241 242 return Collections.unmodifiableSet(associationMap.keySet()); 243 } 244 } 245 246 private boolean isBound() { 247 synchronized (stateLock) { 248 return port == -1 ? false : true; 249 } 250 } 251 252 private void ensureOpen() throws IOException { 253 synchronized (stateLock) { 254 if (!isOpen()) 255 throw new ClosedChannelException(); 256 } 257 } 258 259 private void receiverCleanup() throws IOException { 260 synchronized (stateLock) { 261 receiverThread = 0; 262 if (state == ChannelState.KILLPENDING) 263 kill(); 264 } 265 } 266 267 private void senderCleanup() throws IOException { 268 synchronized (stateLock) { 269 senderThread = 0; 270 if (state == ChannelState.KILLPENDING) 271 kill(); 272 } 273 } 274 275 @Override 276 protected void implConfigureBlocking(boolean block) throws IOException { 277 IOUtil.configureBlocking(fd, block); 278 } 279 280 @Override 281 public void implCloseSelectableChannel() throws IOException { 282 synchronized (stateLock) { 283 SctpNet.preClose(fdVal); 284 285 if (receiverThread != 0) 286 NativeThread.signal(receiverThread); 287 288 if (senderThread != 0) 289 NativeThread.signal(senderThread); 290 291 if (!isRegistered()) 292 kill(); 293 } 294 } 295 296 @Override 297 public FileDescriptor getFD() { 298 return fd; 299 } 300 301 @Override 302 public int getFDVal() { 303 return fdVal; 304 } 305 306 /** 307 * Translates native poll revent ops into a ready operation ops 308 */ 309 private boolean translateReadyOps(int ops, int initialOps, 310 SelectionKeyImpl sk) { 311 int intOps = sk.nioInterestOps(); 312 int oldOps = sk.nioReadyOps(); 313 int newOps = initialOps; 314 315 if ((ops & PollArrayWrapper.POLLNVAL) != 0) { 316 /* This should only happen if this channel is pre-closed while a 317 * selection operation is in progress 318 * ## Throw an error if this channel has not been pre-closed */ 319 return false; 320 } 321 322 if ((ops & (PollArrayWrapper.POLLERR 323 | PollArrayWrapper.POLLHUP)) != 0) { 324 newOps = intOps; 325 sk.nioReadyOps(newOps); 326 return (newOps & ~oldOps) != 0; 327 } 328 329 if (((ops & PollArrayWrapper.POLLIN) != 0) && 330 ((intOps & SelectionKey.OP_READ) != 0)) 331 newOps |= SelectionKey.OP_READ; 332 333 if (((ops & PollArrayWrapper.POLLOUT) != 0) && 334 ((intOps & SelectionKey.OP_WRITE) != 0)) 335 newOps |= SelectionKey.OP_WRITE; 336 337 sk.nioReadyOps(newOps); 338 return (newOps & ~oldOps) != 0; 339 } 340 341 @Override 342 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 343 return translateReadyOps(ops, sk.nioReadyOps(), sk); 344 } 345 346 @Override 347 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 348 return translateReadyOps(ops, 0, sk); 349 } 350 351 @Override 352 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { 353 int newOps = 0; 354 if ((ops & SelectionKey.OP_READ) != 0) 355 newOps |= PollArrayWrapper.POLLIN; 356 if ((ops & SelectionKey.OP_WRITE) != 0) 357 newOps |= PollArrayWrapper.POLLOUT; 358 sk.selector.putEventOps(sk, newOps); 359 } 360 361 @Override 362 public void kill() throws IOException { 363 synchronized (stateLock) { 364 if (state == ChannelState.KILLED) 365 return; 366 if (state == ChannelState.UNINITIALIZED) { 367 state = ChannelState.KILLED; 368 return; 369 } 370 assert !isOpen() && !isRegistered(); 371 372 /* Postpone the kill if there is a thread sending or receiving. */ 373 if (receiverThread == 0 && senderThread == 0) { 374 SctpNet.close(fdVal); 375 state = ChannelState.KILLED; 376 } else { 377 state = ChannelState.KILLPENDING; 378 } 379 } 380 } 381 382 @Override 383 public <T> SctpMultiChannel setOption(SctpSocketOption<T> name, 384 T value, 385 Association association) 386 throws IOException { 387 if (name == null) 388 throw new NullPointerException(); 389 if (!(supportedOptions().contains(name))) 390 throw new UnsupportedOperationException("'" + name + "' not supported"); 391 392 synchronized (stateLock) { 393 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 394 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 395 checkAssociation(association); 396 } 397 if (!isOpen()) 398 throw new ClosedChannelException(); 399 400 int assocId = association == null ? 0 : association.associationID(); 401 SctpNet.setSocketOption(fdVal, name, value, assocId); 402 } 403 return this; 404 } 405 406 @Override 407 @SuppressWarnings("unchecked") 408 public <T> T getOption(SctpSocketOption<T> name, Association association) 409 throws IOException { 410 if (name == null) 411 throw new NullPointerException(); 412 if (!supportedOptions().contains(name)) 413 throw new UnsupportedOperationException("'" + name + "' not supported"); 414 415 synchronized (stateLock) { 416 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 417 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 418 checkAssociation(association); 419 } 420 if (!isOpen()) 421 throw new ClosedChannelException(); 422 423 int assocId = association == null ? 0 : association.associationID(); 424 return (T)SctpNet.getSocketOption(fdVal, name, assocId); 425 } 426 } 427 428 private static class DefaultOptionsHolder { 429 static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions(); 430 431 private static Set<SctpSocketOption<?>> defaultOptions() { 432 HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10); 433 set.add(SCTP_DISABLE_FRAGMENTS); 434 set.add(SCTP_EXPLICIT_COMPLETE); 435 set.add(SCTP_FRAGMENT_INTERLEAVE); 436 set.add(SCTP_INIT_MAXSTREAMS); 437 set.add(SCTP_NODELAY); 438 set.add(SCTP_PRIMARY_ADDR); 439 set.add(SCTP_SET_PEER_PRIMARY_ADDR); 440 set.add(SO_SNDBUF); 441 set.add(SO_RCVBUF); 442 set.add(SO_LINGER); 443 return Collections.unmodifiableSet(set); 444 } 445 } 446 447 @Override 448 public final Set<SctpSocketOption<?>> supportedOptions() { 449 return DefaultOptionsHolder.defaultOptions; 450 } 451 452 @Override 453 public <T> MessageInfo receive(ByteBuffer buffer, 454 T attachment, 455 NotificationHandler<T> handler) 456 throws IOException { 457 if (buffer == null) 458 throw new IllegalArgumentException("buffer cannot be null"); 459 460 if (buffer.isReadOnly()) 461 throw new IllegalArgumentException("Read-only buffer"); 462 463 if (receiveInvoked.get()) 464 throw new IllegalReceiveException( 465 "cannot invoke receive from handler"); 466 receiveInvoked.set(Boolean.TRUE); 467 468 try { 469 SctpResultContainer resultContainer = new SctpResultContainer(); 470 do { 471 resultContainer.clear(); 472 synchronized (receiveLock) { 473 ensureOpen(); 474 if (!isBound()) 475 throw new NotYetBoundException(); 476 477 int n = 0; 478 try { 479 begin(); 480 481 synchronized (stateLock) { 482 if(!isOpen()) 483 return null; 484 receiverThread = NativeThread.current(); 485 } 486 487 do { 488 n = receive(fdVal, buffer, resultContainer); 489 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 490 491 } finally { 492 receiverCleanup(); 493 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 494 assert IOStatus.check(n); 495 } 496 497 if (!resultContainer.isNotification()) { 498 /* message or nothing */ 499 if (resultContainer.hasSomething()) { 500 /* Set the association before returning */ 501 SctpMessageInfoImpl info = 502 resultContainer.getMessageInfo(); 503 info.setAssociation(lookupAssociation(info. 504 associationID())); 505 SecurityManager sm = System.getSecurityManager(); 506 if (sm != null) { 507 InetSocketAddress isa = (InetSocketAddress)info.address(); 508 if (!addressMap.containsKey(isa)) { 509 /* must be a new association */ 510 try { 511 sm.checkAccept(isa.getAddress().getHostAddress(), 512 isa.getPort()); 513 } catch (SecurityException se) { 514 buffer.clear(); 515 throw se; 516 } 517 } 518 } 519 520 assert info.association() != null; 521 return info; 522 } else { 523 /* Non-blocking may return null if nothing available*/ 524 return null; 525 } 526 } else { /* notification */ 527 synchronized (stateLock) { 528 handleNotificationInternal( 529 resultContainer); 530 } 531 } 532 } /* receiveLock */ 533 } while (handler == null ? true : 534 (invokeNotificationHandler(resultContainer, handler, attachment) 535 == HandlerResult.CONTINUE)); 536 } finally { 537 receiveInvoked.set(Boolean.FALSE); 538 } 539 540 return null; 541 } 542 543 private int receive(int fd, 544 ByteBuffer dst, 545 SctpResultContainer resultContainer) 546 throws IOException { 547 int pos = dst.position(); 548 int lim = dst.limit(); 549 assert (pos <= lim); 550 int rem = (pos <= lim ? lim - pos : 0); 551 if (dst instanceof DirectBuffer && rem > 0) 552 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos); 553 554 /* Substitute a native buffer. */ 555 int newSize = Math.max(rem, 1); 556 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 557 try { 558 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0); 559 bb.flip(); 560 if (n > 0 && rem > 0) 561 dst.put(bb); 562 return n; 563 } finally { 564 Util.releaseTemporaryDirectBuffer(bb); 565 } 566 } 567 568 private int receiveIntoNativeBuffer(int fd, 569 SctpResultContainer resultContainer, 570 ByteBuffer bb, 571 int rem, 572 int pos) 573 throws IOException { 574 int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem); 575 if (n > 0) 576 bb.position(pos + n); 577 return n; 578 } 579 580 private InternalNotificationHandler internalNotificationHandler = 581 new InternalNotificationHandler(); 582 583 private void handleNotificationInternal(SctpResultContainer resultContainer) 584 { 585 invokeNotificationHandler(resultContainer, 586 internalNotificationHandler, null); 587 } 588 589 private class InternalNotificationHandler 590 extends AbstractNotificationHandler<Object> 591 { 592 @Override 593 public HandlerResult handleNotification( 594 AssociationChangeNotification not, Object unused) { 595 SctpAssocChange sac = (SctpAssocChange) not; 596 597 /* Update map to reflect change in association */ 598 switch (not.event()) { 599 case COMM_UP : 600 Association newAssociation = new SctpAssociationImpl 601 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 602 addAssociation(newAssociation); 603 break; 604 case SHUTDOWN : 605 case COMM_LOST : 606 //case RESTART: ??? 607 /* mark association for removal after user handler invoked*/ 608 associationToRemove.set(lookupAssociation(sac.assocId())); 609 } 610 return HandlerResult.CONTINUE; 611 } 612 } 613 614 private <T> HandlerResult invokeNotificationHandler( 615 SctpResultContainer resultContainer, 616 NotificationHandler<T> handler, 617 T attachment) { 618 HandlerResult result; 619 SctpNotification notification = resultContainer.notification(); 620 notification.setAssociation(lookupAssociation(notification.assocId())); 621 622 if (!(handler instanceof AbstractNotificationHandler)) { 623 result = handler.handleNotification(notification, attachment); 624 } else { /* AbstractNotificationHandler */ 625 AbstractNotificationHandler<T> absHandler = 626 (AbstractNotificationHandler<T>)handler; 627 switch(resultContainer.type()) { 628 case ASSOCIATION_CHANGED : 629 result = absHandler.handleNotification( 630 resultContainer.getAssociationChanged(), attachment); 631 break; 632 case PEER_ADDRESS_CHANGED : 633 result = absHandler.handleNotification( 634 resultContainer.getPeerAddressChanged(), attachment); 635 break; 636 case SEND_FAILED : 637 result = absHandler.handleNotification( 638 resultContainer.getSendFailed(), attachment); 639 break; 640 case SHUTDOWN : 641 result = absHandler.handleNotification( 642 resultContainer.getShutdown(), attachment); 643 break; 644 default : 645 /* implementation specific handlers */ 646 result = absHandler.handleNotification( 647 resultContainer.notification(), attachment); 648 } 649 } 650 651 if (!(handler instanceof InternalNotificationHandler)) { 652 /* Only remove associations after user handler 653 * has finished with them */ 654 Association assoc = associationToRemove.get(); 655 if (assoc != null) { 656 removeAssociation(assoc); 657 associationToRemove.set(null); 658 } 659 660 } 661 662 return result; 663 } 664 665 private Association lookupAssociation(int assocId) { 666 /* Lookup the association in our internal map */ 667 synchronized (stateLock) { 668 Set<Association> assocs = associationMap.keySet(); 669 for (Association a : assocs) { 670 if (a.associationID() == assocId) { 671 return a; 672 } 673 } 674 } 675 return null; 676 } 677 678 private void addAssociation(Association association) { 679 synchronized (stateLock) { 680 int assocId = association.associationID(); 681 Set<SocketAddress> addresses = null; 682 683 try { 684 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 685 } catch (IOException unused) { 686 /* OK, determining connected addresses may not be possible 687 * shutdown, connection lost, etc */ 688 } 689 690 associationMap.put(association, addresses); 691 if (addresses != null) { 692 for (SocketAddress addr : addresses) 693 addressMap.put(addr, association); 694 } 695 } 696 } 697 698 private void removeAssociation(Association association) { 699 synchronized (stateLock) { 700 int assocId = association.associationID(); 701 Set<SocketAddress> addresses = null; 702 703 try { 704 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 705 } catch (IOException unused) { 706 /* OK, determining connected addresses may not be possible 707 * shutdown, connection lost, etc */ 708 } 709 710 Set<Association> assocs = associationMap.keySet(); 711 for (Association a : assocs) { 712 if (a.associationID() == assocId) { 713 associationMap.remove(a); 714 break; 715 } 716 } 717 if (addresses != null) { 718 for (SocketAddress addr : addresses) 719 addressMap.remove(addr); 720 } else { 721 /* We cannot determine the connected addresses */ 722 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs = 723 addressMap.entrySet(); 724 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator(); 725 while (iterator.hasNext()) { 726 Entry<SocketAddress, Association> entry = iterator.next(); 727 if (entry.getValue().equals(association)) { 728 iterator.remove(); 729 } 730 } 731 } 732 } 733 } 734 735 /** 736 * @throws IllegalArgumentException 737 * If the given association is not controlled by this channel 738 * 739 * @return {@code true} if, and only if, the given association is one 740 * of the current associations controlled by this channel 741 */ 742 private boolean checkAssociation(Association messageAssoc) { 743 synchronized (stateLock) { 744 for (Association association : associationMap.keySet()) { 745 if (messageAssoc.equals(association)) { 746 return true; 747 } 748 } 749 } 750 throw new IllegalArgumentException( 751 "Given Association is not controlled by this channel"); 752 } 753 754 private void checkStreamNumber(Association assoc, int streamNumber) { 755 synchronized (stateLock) { 756 if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams()) 757 throw new InvalidStreamException(); 758 } 759 } 760 761 /* TODO: Add support for ttl and isComplete to both 121 12M 762 * SCTP_EOR not yet supported on reference platforms 763 * TTL support limited... 764 */ 765 @Override 766 public int send(ByteBuffer buffer, MessageInfo messageInfo) 767 throws IOException { 768 if (buffer == null) 769 throw new IllegalArgumentException("buffer cannot be null"); 770 771 if (messageInfo == null) 772 throw new IllegalArgumentException("messageInfo cannot be null"); 773 774 synchronized (sendLock) { 775 ensureOpen(); 776 777 if (!isBound()) 778 bind(null, 0); 779 780 int n = 0; 781 try { 782 int assocId = -1; 783 SocketAddress address = null; 784 begin(); 785 786 synchronized (stateLock) { 787 if(!isOpen()) 788 return 0; 789 senderThread = NativeThread.current(); 790 791 /* Determine what address or association to send to */ 792 Association assoc = messageInfo.association(); 793 InetSocketAddress addr = (InetSocketAddress)messageInfo.address(); 794 if (assoc != null) { 795 checkAssociation(assoc); 796 checkStreamNumber(assoc, messageInfo.streamNumber()); 797 assocId = assoc.associationID(); 798 /* have we also got a preferred address */ 799 if (addr != null) { 800 if (!assoc.equals(addressMap.get(addr))) 801 throw new IllegalArgumentException("given preferred address is not part of this association"); 802 address = addr; 803 } 804 } else if (addr != null) { 805 address = addr; 806 Association association = addressMap.get(addr); 807 if (association != null) { 808 checkStreamNumber(association, messageInfo.streamNumber()); 809 assocId = association.associationID(); 810 811 } else { /* must be new association */ 812 SecurityManager sm = System.getSecurityManager(); 813 if (sm != null) 814 sm.checkConnect(addr.getAddress().getHostAddress(), 815 addr.getPort()); 816 } 817 } else { 818 throw new AssertionError( 819 "Both association and address cannot be null"); 820 } 821 } 822 823 do { 824 n = send(fdVal, buffer, assocId, address, messageInfo); 825 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 826 827 return IOStatus.normalize(n); 828 } finally { 829 senderCleanup(); 830 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 831 assert IOStatus.check(n); 832 } 833 } 834 } 835 836 private int send(int fd, 837 ByteBuffer src, 838 int assocId, 839 SocketAddress target, 840 MessageInfo messageInfo) 841 throws IOException { 842 int streamNumber = messageInfo.streamNumber(); 843 boolean unordered = messageInfo.isUnordered(); 844 int ppid = messageInfo.payloadProtocolID(); 845 846 if (src instanceof DirectBuffer) 847 return sendFromNativeBuffer(fd, src, target, assocId, 848 streamNumber, unordered, ppid); 849 850 /* Substitute a native buffer */ 851 int pos = src.position(); 852 int lim = src.limit(); 853 assert (pos <= lim && streamNumber >= 0); 854 855 int rem = (pos <= lim ? lim - pos : 0); 856 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 857 try { 858 bb.put(src); 859 bb.flip(); 860 /* Do not update src until we see how many bytes were written */ 861 src.position(pos); 862 863 int n = sendFromNativeBuffer(fd, bb, target, assocId, 864 streamNumber, unordered, ppid); 865 if (n > 0) { 866 /* now update src */ 867 src.position(pos + n); 868 } 869 return n; 870 } finally { 871 Util.releaseTemporaryDirectBuffer(bb); 872 } 873 } 874 875 private int sendFromNativeBuffer(int fd, 876 ByteBuffer bb, 877 SocketAddress target, 878 int assocId, 879 int streamNumber, 880 boolean unordered, 881 int ppid) 882 throws IOException { 883 int pos = bb.position(); 884 int lim = bb.limit(); 885 assert (pos <= lim); 886 int rem = (pos <= lim ? lim - pos : 0); 887 888 int written = send0(fd, ((DirectBuffer)bb).address() + pos, 889 rem, target, assocId, streamNumber, unordered, ppid); 890 if (written > 0) 891 bb.position(pos + written); 892 return written; 893 } 894 895 @Override 896 public SctpMultiChannel shutdown(Association association) 897 throws IOException { 898 synchronized (stateLock) { 899 checkAssociation(association); 900 if (!isOpen()) 901 throw new ClosedChannelException(); 902 903 SctpNet.shutdown(fdVal, association.associationID()); 904 } 905 return this; 906 } 907 908 @Override 909 public Set<SocketAddress> getAllLocalAddresses() 910 throws IOException { 911 synchronized (stateLock) { 912 if (!isOpen()) 913 throw new ClosedChannelException(); 914 if (!isBound()) 915 return Collections.emptySet(); 916 917 return SctpNet.getLocalAddresses(fdVal); 918 } 919 } 920 921 @Override 922 public Set<SocketAddress> getRemoteAddresses(Association association) 923 throws IOException { 924 synchronized (stateLock) { 925 checkAssociation(association); 926 if (!isOpen()) 927 throw new ClosedChannelException(); 928 929 try { 930 return SctpNet.getRemoteAddresses(fdVal, association.associationID()); 931 } catch (SocketException se) { 932 /* a valid association should always have remote addresses */ 933 Set<SocketAddress> addrs = associationMap.get(association); 934 return addrs != null ? addrs : Collections.<SocketAddress>emptySet(); 935 } 936 } 937 } 938 939 @Override 940 public SctpChannel branch(Association association) 941 throws IOException { 942 synchronized (stateLock) { 943 checkAssociation(association); 944 if (!isOpen()) 945 throw new ClosedChannelException(); 946 947 FileDescriptor bFd = SctpNet.branch(fdVal, 948 association.associationID()); 949 /* successfully branched, we can now remove it from assoc list */ 950 removeAssociation(association); 951 952 return new SctpChannelImpl(provider(), bFd, association); 953 } 954 } 955 956 /* Use common native implementation shared between 957 * one-to-one and one-to-many */ 958 private static int receive0(int fd, 959 SctpResultContainer resultContainer, 960 long address, 961 int length) 962 throws IOException{ 963 return SctpChannelImpl.receive0(fd, resultContainer, address, 964 length, false /*peek */); 965 } 966 967 private static int send0(int fd, 968 long address, 969 int length, 970 SocketAddress target, 971 int assocId, 972 int streamNumber, 973 boolean unordered, 974 int ppid) 975 throws IOException { 976 return SctpChannelImpl.send0(fd, address, length, target, assocId, 977 streamNumber, unordered, ppid); 978 } 979 980 static { 981 Util.load(); /* loads nio & net native libraries */ 982 java.security.AccessController.doPrivileged( 983 new sun.security.action.LoadLibraryAction("sctp")); 984 } 985 }