1 /* 2 * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package sun.nio.ch.sctp; 26 27 import java.net.InetAddress; 28 import java.net.SocketAddress; 29 import java.net.SocketException; 30 import java.net.InetSocketAddress; 31 import java.io.FileDescriptor; 32 import java.io.IOException; 33 import java.util.Collections; 34 import java.util.Map.Entry; 35 import java.util.Iterator; 36 import java.util.Set; 37 import java.util.HashSet; 38 import java.util.HashMap; 39 import java.nio.ByteBuffer; 40 import java.nio.channels.SelectionKey; 41 import java.nio.channels.ClosedChannelException; 42 import java.nio.channels.NotYetBoundException; 43 import java.nio.channels.spi.SelectorProvider; 44 import com.sun.nio.sctp.AbstractNotificationHandler; 45 import com.sun.nio.sctp.Association; 46 import com.sun.nio.sctp.AssociationChangeNotification; 47 import com.sun.nio.sctp.HandlerResult; 48 import com.sun.nio.sctp.IllegalReceiveException; 49 import com.sun.nio.sctp.InvalidStreamException; 50 import com.sun.nio.sctp.IllegalUnbindException; 51 import com.sun.nio.sctp.NotificationHandler; 52 import com.sun.nio.sctp.MessageInfo; 53 import com.sun.nio.sctp.SctpChannel; 54 import com.sun.nio.sctp.SctpMultiChannel; 55 import com.sun.nio.sctp.SctpSocketOption; 56 import sun.nio.ch.DirectBuffer; 57 import sun.nio.ch.NativeThread; 58 import sun.nio.ch.IOStatus; 59 import sun.nio.ch.IOUtil; 60 import sun.nio.ch.Net; 61 import sun.nio.ch.PollArrayWrapper; 62 import sun.nio.ch.SelChImpl; 63 import sun.nio.ch.SelectionKeyImpl; 64 import sun.nio.ch.Util; 65 import static com.sun.nio.sctp.SctpStandardSocketOptions.*; 66 import static sun.nio.ch.sctp.ResultContainer.*; 67 68 /** 69 * An implementation of SctpMultiChannel 70 */ 71 public class SctpMultiChannelImpl extends SctpMultiChannel 72 implements SelChImpl 73 { 74 private final FileDescriptor fd; 75 76 private final int fdVal; 77 78 /* IDs of native threads doing send and receives, for signalling */ 79 private volatile long receiverThread = 0; 80 private volatile long senderThread = 0; 81 82 /* Lock held by current receiving thread */ 83 private final Object receiveLock = new Object(); 84 85 /* Lock held by current sending thread */ 86 private final Object sendLock = new Object(); 87 88 /* Lock held by any thread that modifies the state fields declared below 89 * DO NOT invoke a blocking I/O operation while holding this lock! */ 90 private final Object stateLock = new Object(); 91 92 private enum ChannelState { 93 UNINITIALIZED, 94 KILLPENDING, 95 KILLED, 96 } 97 98 /* -- The following fields are protected by stateLock -- */ 99 private ChannelState state = ChannelState.UNINITIALIZED; 100 101 /* Binding: Once bound the port will remain constant. */ 102 int port = -1; 103 private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>(); 104 /* Has the channel been bound to the wildcard address */ 105 private boolean wildcard; /* false */ 106 107 /* Keeps a map of addresses to association, and visa versa */ 108 private HashMap<SocketAddress, Association> addressMap = 109 new HashMap<SocketAddress, Association>(); 110 private HashMap<Association, Set<SocketAddress>> associationMap = 111 new HashMap<Association, Set<SocketAddress>>(); 112 113 /* -- End of fields protected by stateLock -- */ 114 115 /* If an association has been shutdown mark it for removal after 116 * the user handler has been invoked */ 117 private final ThreadLocal<Association> associationToRemove = 118 new ThreadLocal<Association>() { 119 @Override protected Association initialValue() { 120 return null; 121 } 122 }; 123 124 /* A notification handler cannot invoke receive */ 125 private final ThreadLocal<Boolean> receiveInvoked = 126 new ThreadLocal<Boolean>() { 127 @Override protected Boolean initialValue() { 128 return Boolean.FALSE; 129 } 130 }; 131 132 public SctpMultiChannelImpl(SelectorProvider provider) 133 throws IOException { 134 //TODO: update provider, remove public modifier 135 super(provider); 136 this.fd = SctpNet.socket(false /*one-to-many*/); 137 this.fdVal = IOUtil.fdVal(fd); 138 } 139 140 @Override 141 public SctpMultiChannel bind(SocketAddress local, int backlog) 142 throws IOException { 143 synchronized (receiveLock) { 144 synchronized (sendLock) { 145 synchronized (stateLock) { 146 ensureOpen(); 147 if (isBound()) 148 SctpNet.throwAlreadyBoundException(); 149 InetSocketAddress isa = (local == null) ? 150 new InetSocketAddress(0) : Net.checkAddress(local); 151 152 SecurityManager sm = System.getSecurityManager(); 153 if (sm != null) 154 sm.checkListen(isa.getPort()); 155 Net.bind(fd, isa.getAddress(), isa.getPort()); 156 157 InetSocketAddress boundIsa = Net.localAddress(fd); 158 port = boundIsa.getPort(); 159 localAddresses.add(isa); 160 if (isa.getAddress().isAnyLocalAddress()) 161 wildcard = true; 162 163 SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog); 164 } 165 } 166 } 167 return this; 168 } 169 170 @Override 171 public SctpMultiChannel bindAddress(InetAddress address) 172 throws IOException { 173 return bindUnbindAddress(address, true); 174 } 175 176 @Override 177 public SctpMultiChannel unbindAddress(InetAddress address) 178 throws IOException { 179 return bindUnbindAddress(address, false); 180 } 181 182 private SctpMultiChannel bindUnbindAddress(InetAddress address, 183 boolean add) 184 throws IOException { 185 if (address == null) 186 throw new IllegalArgumentException(); 187 188 synchronized (receiveLock) { 189 synchronized (sendLock) { 190 synchronized (stateLock) { 191 if (!isOpen()) 192 throw new ClosedChannelException(); 193 if (!isBound()) 194 throw new NotYetBoundException(); 195 if (wildcard) 196 throw new IllegalStateException( 197 "Cannot add or remove addresses from a channel that is bound to the wildcard address"); 198 if (address.isAnyLocalAddress()) 199 throw new IllegalArgumentException( 200 "Cannot add or remove the wildcard address"); 201 if (add) { 202 for (InetSocketAddress addr : localAddresses) { 203 if (addr.getAddress().equals(address)) { 204 SctpNet.throwAlreadyBoundException(); 205 } 206 } 207 } else { /*removing */ 208 /* Verify that there is more than one address 209 * and that address is already bound */ 210 if (localAddresses.size() <= 1) 211 throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound"); 212 boolean foundAddress = false; 213 for (InetSocketAddress addr : localAddresses) { 214 if (addr.getAddress().equals(address)) { 215 foundAddress = true; 216 break; 217 } 218 } 219 if (!foundAddress ) 220 throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address"); 221 } 222 223 SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add); 224 225 /* Update our internal Set to reflect the addition/removal */ 226 if (add) 227 localAddresses.add(new InetSocketAddress(address, port)); 228 else { 229 for (InetSocketAddress addr : localAddresses) { 230 if (addr.getAddress().equals(address)) { 231 localAddresses.remove(addr); 232 break; 233 } 234 } 235 } 236 } 237 } 238 } 239 return this; 240 } 241 242 @Override 243 public Set<Association> associations() 244 throws ClosedChannelException, NotYetBoundException { 245 synchronized (stateLock) { 246 if (!isOpen()) 247 throw new ClosedChannelException(); 248 if (!isBound()) 249 throw new NotYetBoundException(); 250 251 return Collections.unmodifiableSet(associationMap.keySet()); 252 } 253 } 254 255 private boolean isBound() { 256 synchronized (stateLock) { 257 return port == -1 ? false : true; 258 } 259 } 260 261 private void ensureOpen() throws IOException { 262 synchronized (stateLock) { 263 if (!isOpen()) 264 throw new ClosedChannelException(); 265 } 266 } 267 268 private void receiverCleanup() throws IOException { 269 synchronized (stateLock) { 270 receiverThread = 0; 271 if (state == ChannelState.KILLPENDING) 272 kill(); 273 } 274 } 275 276 private void senderCleanup() throws IOException { 277 synchronized (stateLock) { 278 senderThread = 0; 279 if (state == ChannelState.KILLPENDING) 280 kill(); 281 } 282 } 283 284 @Override 285 protected void implConfigureBlocking(boolean block) throws IOException { 286 IOUtil.configureBlocking(fd, block); 287 } 288 289 @Override 290 public void implCloseSelectableChannel() throws IOException { 291 synchronized (stateLock) { 292 SctpNet.preClose(fdVal); 293 294 if (receiverThread != 0) 295 NativeThread.signal(receiverThread); 296 297 if (senderThread != 0) 298 NativeThread.signal(senderThread); 299 300 if (!isRegistered()) 301 kill(); 302 } 303 } 304 305 @Override 306 public FileDescriptor getFD() { 307 return fd; 308 } 309 310 @Override 311 public int getFDVal() { 312 return fdVal; 313 } 314 315 /** 316 * Translates native poll revent ops into a ready operation ops 317 */ 318 private boolean translateReadyOps(int ops, int initialOps, 319 SelectionKeyImpl sk) { 320 int intOps = sk.nioInterestOps(); 321 int oldOps = sk.nioReadyOps(); 322 int newOps = initialOps; 323 324 if ((ops & PollArrayWrapper.POLLNVAL) != 0) { 325 /* This should only happen if this channel is pre-closed while a 326 * selection operation is in progress 327 * ## Throw an error if this channel has not been pre-closed */ 328 return false; 329 } 330 331 if ((ops & (PollArrayWrapper.POLLERR 332 | PollArrayWrapper.POLLHUP)) != 0) { 333 newOps = intOps; 334 sk.nioReadyOps(newOps); 335 return (newOps & ~oldOps) != 0; 336 } 337 338 if (((ops & PollArrayWrapper.POLLIN) != 0) && 339 ((intOps & SelectionKey.OP_READ) != 0)) 340 newOps |= SelectionKey.OP_READ; 341 342 if (((ops & PollArrayWrapper.POLLOUT) != 0) && 343 ((intOps & SelectionKey.OP_WRITE) != 0)) 344 newOps |= SelectionKey.OP_WRITE; 345 346 sk.nioReadyOps(newOps); 347 return (newOps & ~oldOps) != 0; 348 } 349 350 @Override 351 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) { 352 return translateReadyOps(ops, sk.nioReadyOps(), sk); 353 } 354 355 @Override 356 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) { 357 return translateReadyOps(ops, 0, sk); 358 } 359 360 @Override 361 public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) { 362 int newOps = 0; 363 if ((ops & SelectionKey.OP_READ) != 0) 364 newOps |= PollArrayWrapper.POLLIN; 365 if ((ops & SelectionKey.OP_WRITE) != 0) 366 newOps |= PollArrayWrapper.POLLOUT; 367 sk.selector.putEventOps(sk, newOps); 368 } 369 370 @Override 371 public void kill() throws IOException { 372 synchronized (stateLock) { 373 if (state == ChannelState.KILLED) 374 return; 375 if (state == ChannelState.UNINITIALIZED) { 376 state = ChannelState.KILLED; 377 return; 378 } 379 assert !isOpen() && !isRegistered(); 380 381 /* Postpone the kill if there is a thread sending or receiving. */ 382 if (receiverThread == 0 && senderThread == 0) { 383 SctpNet.close(fdVal); 384 state = ChannelState.KILLED; 385 } else { 386 state = ChannelState.KILLPENDING; 387 } 388 } 389 } 390 391 @Override 392 public <T> SctpMultiChannel setOption(SctpSocketOption<T> name, 393 T value, 394 Association association) 395 throws IOException { 396 if (name == null) 397 throw new NullPointerException(); 398 if (!(supportedOptions().contains(name))) 399 throw new UnsupportedOperationException("'" + name + "' not supported"); 400 401 synchronized (stateLock) { 402 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 403 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 404 checkAssociation(association); 405 } 406 if (!isOpen()) 407 throw new ClosedChannelException(); 408 409 int assocId = association == null ? 0 : association.associationID(); 410 SctpNet.setSocketOption(fdVal, name, value, assocId); 411 } 412 return this; 413 } 414 415 @Override 416 @SuppressWarnings("unchecked") 417 public <T> T getOption(SctpSocketOption<T> name, Association association) 418 throws IOException { 419 if (name == null) 420 throw new NullPointerException(); 421 if (!supportedOptions().contains(name)) 422 throw new UnsupportedOperationException("'" + name + "' not supported"); 423 424 synchronized (stateLock) { 425 if (association != null && (name.equals(SCTP_PRIMARY_ADDR) || 426 name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) { 427 checkAssociation(association); 428 } 429 if (!isOpen()) 430 throw new ClosedChannelException(); 431 432 int assocId = association == null ? 0 : association.associationID(); 433 return (T)SctpNet.getSocketOption(fdVal, name, assocId); 434 } 435 } 436 437 private static class DefaultOptionsHolder { 438 static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions(); 439 440 private static Set<SctpSocketOption<?>> defaultOptions() { 441 HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10); 442 set.add(SCTP_DISABLE_FRAGMENTS); 443 set.add(SCTP_EXPLICIT_COMPLETE); 444 set.add(SCTP_FRAGMENT_INTERLEAVE); 445 set.add(SCTP_INIT_MAXSTREAMS); 446 set.add(SCTP_NODELAY); 447 set.add(SCTP_PRIMARY_ADDR); 448 set.add(SCTP_SET_PEER_PRIMARY_ADDR); 449 set.add(SO_SNDBUF); 450 set.add(SO_RCVBUF); 451 set.add(SO_LINGER); 452 return Collections.unmodifiableSet(set); 453 } 454 } 455 456 @Override 457 public final Set<SctpSocketOption<?>> supportedOptions() { 458 return DefaultOptionsHolder.defaultOptions; 459 } 460 461 @Override 462 public <T> MessageInfo receive(ByteBuffer buffer, 463 T attachment, 464 NotificationHandler<T> handler) 465 throws IOException { 466 if (buffer == null) 467 throw new IllegalArgumentException("buffer cannot be null"); 468 469 if (buffer.isReadOnly()) 470 throw new IllegalArgumentException("Read-only buffer"); 471 472 if (receiveInvoked.get()) 473 throw new IllegalReceiveException( 474 "cannot invoke receive from handler"); 475 receiveInvoked.set(Boolean.TRUE); 476 477 try { 478 ResultContainer resultContainer = new ResultContainer(); 479 do { 480 resultContainer.clear(); 481 synchronized (receiveLock) { 482 ensureOpen(); 483 if (!isBound()) 484 throw new NotYetBoundException(); 485 486 int n = 0; 487 try { 488 begin(); 489 490 synchronized (stateLock) { 491 if(!isOpen()) 492 return null; 493 receiverThread = NativeThread.current(); 494 } 495 496 do { 497 n = receive(fdVal, buffer, resultContainer); 498 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 499 500 } finally { 501 receiverCleanup(); 502 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 503 assert IOStatus.check(n); 504 } 505 506 if (!resultContainer.isNotification()) { 507 /* message or nothing */ 508 if (resultContainer.hasSomething()) { 509 /* Set the association before returning */ 510 MessageInfoImpl info = 511 resultContainer.getMessageInfo(); 512 info.setAssociation(lookupAssociation(info. 513 associationID())); 514 SecurityManager sm = System.getSecurityManager(); 515 if (sm != null) { 516 InetSocketAddress isa = (InetSocketAddress)info.address(); 517 if (!addressMap.containsKey(isa)) { 518 /* must be a new association */ 519 try { 520 sm.checkAccept(isa.getAddress().getHostAddress(), 521 isa.getPort()); 522 } catch (SecurityException se) { 523 buffer.clear(); 524 throw se; 525 } 526 } 527 } 528 529 assert info.association() != null; 530 return info; 531 } else { 532 /* Non-blocking may return null if nothing available*/ 533 return null; 534 } 535 } else { /* notification */ 536 synchronized (stateLock) { 537 handleNotificationInternal( 538 resultContainer); 539 } 540 } 541 } /* receiveLock */ 542 } while (handler == null ? true : 543 (invokeNotificationHandler(resultContainer, handler, attachment) 544 == HandlerResult.CONTINUE)); 545 } finally { 546 receiveInvoked.set(Boolean.FALSE); 547 } 548 549 return null; 550 } 551 552 private int receive(int fd, 553 ByteBuffer dst, 554 ResultContainer resultContainer) 555 throws IOException { 556 int pos = dst.position(); 557 int lim = dst.limit(); 558 assert (pos <= lim); 559 int rem = (pos <= lim ? lim - pos : 0); 560 if (dst instanceof DirectBuffer && rem > 0) 561 return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos); 562 563 /* Substitute a native buffer. */ 564 int newSize = Math.max(rem, 1); 565 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 566 try { 567 int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0); 568 bb.flip(); 569 if (n > 0 && rem > 0) 570 dst.put(bb); 571 return n; 572 } finally { 573 Util.releaseTemporaryDirectBuffer(bb); 574 } 575 } 576 577 private int receiveIntoNativeBuffer(int fd, 578 ResultContainer resultContainer, 579 ByteBuffer bb, 580 int rem, 581 int pos) 582 throws IOException { 583 int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem); 584 if (n > 0) 585 bb.position(pos + n); 586 return n; 587 } 588 589 private InternalNotificationHandler internalNotificationHandler = 590 new InternalNotificationHandler(); 591 592 private void handleNotificationInternal(ResultContainer resultContainer) 593 { 594 invokeNotificationHandler(resultContainer, 595 internalNotificationHandler, null); 596 } 597 598 private class InternalNotificationHandler 599 extends AbstractNotificationHandler<Object> 600 { 601 @Override 602 public HandlerResult handleNotification( 603 AssociationChangeNotification not, Object unused) { 604 AssociationChange sac = (AssociationChange) not; 605 606 /* Update map to reflect change in association */ 607 switch (not.event()) { 608 case COMM_UP : 609 Association newAssociation = new AssociationImpl 610 (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams()); 611 addAssociation(newAssociation); 612 break; 613 case SHUTDOWN : 614 case COMM_LOST : 615 //case RESTART: ??? 616 /* mark association for removal after user handler invoked*/ 617 associationToRemove.set(lookupAssociation(sac.assocId())); 618 } 619 return HandlerResult.CONTINUE; 620 } 621 } 622 623 private <T> HandlerResult invokeNotificationHandler( 624 ResultContainer resultContainer, 625 NotificationHandler<T> handler, 626 T attachment) { 627 HandlerResult result; 628 SctpNotification notification = resultContainer.notification(); 629 notification.setAssociation(lookupAssociation(notification.assocId())); 630 631 if (!(handler instanceof AbstractNotificationHandler)) { 632 result = handler.handleNotification(notification, attachment); 633 } else { /* AbstractNotificationHandler */ 634 AbstractNotificationHandler<T> absHandler = 635 (AbstractNotificationHandler<T>)handler; 636 switch(resultContainer.type()) { 637 case ASSOCIATION_CHANGED : 638 result = absHandler.handleNotification( 639 resultContainer.getAssociationChanged(), attachment); 640 break; 641 case PEER_ADDRESS_CHANGED : 642 result = absHandler.handleNotification( 643 resultContainer.getPeerAddressChanged(), attachment); 644 break; 645 case SEND_FAILED : 646 result = absHandler.handleNotification( 647 resultContainer.getSendFailed(), attachment); 648 break; 649 case SHUTDOWN : 650 result = absHandler.handleNotification( 651 resultContainer.getShutdown(), attachment); 652 break; 653 default : 654 /* implementation specific handlers */ 655 result = absHandler.handleNotification( 656 resultContainer.notification(), attachment); 657 } 658 } 659 660 if (!(handler instanceof InternalNotificationHandler)) { 661 /* Only remove associations after user handler 662 * has finished with them */ 663 Association assoc = associationToRemove.get(); 664 if (assoc != null) { 665 removeAssociation(assoc); 666 associationToRemove.set(null); 667 } 668 669 } 670 671 return result; 672 } 673 674 private Association lookupAssociation(int assocId) { 675 /* Lookup the association in our internal map */ 676 synchronized (stateLock) { 677 Set<Association> assocs = associationMap.keySet(); 678 for (Association a : assocs) { 679 if (a.associationID() == assocId) { 680 return a; 681 } 682 } 683 } 684 return null; 685 } 686 687 private void addAssociation(Association association) { 688 synchronized (stateLock) { 689 int assocId = association.associationID(); 690 Set<SocketAddress> addresses = null; 691 692 try { 693 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 694 } catch (IOException unused) { 695 /* OK, determining connected addresses may not be possible 696 * shutdown, connection lost, etc */ 697 } 698 699 associationMap.put(association, addresses); 700 if (addresses != null) { 701 for (SocketAddress addr : addresses) 702 addressMap.put(addr, association); 703 } 704 } 705 } 706 707 private void removeAssociation(Association association) { 708 synchronized (stateLock) { 709 int assocId = association.associationID(); 710 Set<SocketAddress> addresses = null; 711 712 try { 713 addresses = SctpNet.getRemoteAddresses(fdVal, assocId); 714 } catch (IOException unused) { 715 /* OK, determining connected addresses may not be possible 716 * shutdown, connection lost, etc */ 717 } 718 719 Set<Association> assocs = associationMap.keySet(); 720 for (Association a : assocs) { 721 if (a.associationID() == assocId) { 722 associationMap.remove(a); 723 break; 724 } 725 } 726 if (addresses != null) { 727 for (SocketAddress addr : addresses) 728 addressMap.remove(addr); 729 } else { 730 /* We cannot determine the connected addresses */ 731 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs = 732 addressMap.entrySet(); 733 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator(); 734 while (iterator.hasNext()) { 735 Entry<SocketAddress, Association> entry = iterator.next(); 736 if (entry.getValue().equals(association)) { 737 iterator.remove(); 738 } 739 } 740 } 741 } 742 } 743 744 /** 745 * @throws IllegalArgumentException 746 * If the given association is not controlled by this channel 747 * 748 * @return {@code true} if, and only if, the given association is one 749 * of the current associations controlled by this channel 750 */ 751 private boolean checkAssociation(Association messageAssoc) { 752 synchronized (stateLock) { 753 for (Association association : associationMap.keySet()) { 754 if (messageAssoc.equals(association)) { 755 return true; 756 } 757 } 758 } 759 throw new IllegalArgumentException( 760 "Given Association is not controlled by this channel"); 761 } 762 763 private void checkStreamNumber(Association assoc, int streamNumber) { 764 synchronized (stateLock) { 765 if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams()) 766 throw new InvalidStreamException(); 767 } 768 } 769 770 /* TODO: Add support for ttl and isComplete to both 121 12M 771 * SCTP_EOR not yet supported on reference platforms 772 * TTL support limited... 773 */ 774 @Override 775 public int send(ByteBuffer buffer, MessageInfo messageInfo) 776 throws IOException { 777 if (buffer == null) 778 throw new IllegalArgumentException("buffer cannot be null"); 779 780 if (messageInfo == null) 781 throw new IllegalArgumentException("messageInfo cannot be null"); 782 783 synchronized (sendLock) { 784 ensureOpen(); 785 786 if (!isBound()) 787 bind(null, 0); 788 789 int n = 0; 790 try { 791 int assocId = -1; 792 SocketAddress address = null; 793 begin(); 794 795 synchronized (stateLock) { 796 if(!isOpen()) 797 return 0; 798 senderThread = NativeThread.current(); 799 800 /* Determine what address or association to send to */ 801 Association assoc = messageInfo.association(); 802 InetSocketAddress addr = (InetSocketAddress)messageInfo.address(); 803 if (assoc != null) { 804 checkAssociation(assoc); 805 checkStreamNumber(assoc, messageInfo.streamNumber()); 806 assocId = assoc.associationID(); 807 /* have we also got a preferred address */ 808 if (addr != null) { 809 if (!assoc.equals(addressMap.get(addr))) 810 throw new IllegalArgumentException("given preferred address is not part of this association"); 811 address = addr; 812 } 813 } else if (addr != null) { 814 address = addr; 815 Association association = addressMap.get(addr); 816 if (association != null) { 817 checkStreamNumber(association, messageInfo.streamNumber()); 818 assocId = association.associationID(); 819 820 } else { /* must be new association */ 821 SecurityManager sm = System.getSecurityManager(); 822 if (sm != null) 823 sm.checkConnect(addr.getAddress().getHostAddress(), 824 addr.getPort()); 825 } 826 } else { 827 throw new AssertionError( 828 "Both association and address cannot be null"); 829 } 830 } 831 832 do { 833 n = send(fdVal, buffer, assocId, address, messageInfo); 834 } while ((n == IOStatus.INTERRUPTED) && isOpen()); 835 836 return IOStatus.normalize(n); 837 } finally { 838 senderCleanup(); 839 end((n > 0) || (n == IOStatus.UNAVAILABLE)); 840 assert IOStatus.check(n); 841 } 842 } 843 } 844 845 private int send(int fd, 846 ByteBuffer src, 847 int assocId, 848 SocketAddress target, 849 MessageInfo messageInfo) 850 throws IOException { 851 int streamNumber = messageInfo.streamNumber(); 852 boolean unordered = messageInfo.isUnordered(); 853 int ppid = messageInfo.payloadProtocolID(); 854 855 if (src instanceof DirectBuffer) 856 return sendFromNativeBuffer(fd, src, target, assocId, 857 streamNumber, unordered, ppid); 858 859 /* Substitute a native buffer */ 860 int pos = src.position(); 861 int lim = src.limit(); 862 assert (pos <= lim && streamNumber >= 0); 863 864 int rem = (pos <= lim ? lim - pos : 0); 865 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 866 try { 867 bb.put(src); 868 bb.flip(); 869 /* Do not update src until we see how many bytes were written */ 870 src.position(pos); 871 872 int n = sendFromNativeBuffer(fd, bb, target, assocId, 873 streamNumber, unordered, ppid); 874 if (n > 0) { 875 /* now update src */ 876 src.position(pos + n); 877 } 878 return n; 879 } finally { 880 Util.releaseTemporaryDirectBuffer(bb); 881 } 882 } 883 884 private int sendFromNativeBuffer(int fd, 885 ByteBuffer bb, 886 SocketAddress target, 887 int assocId, 888 int streamNumber, 889 boolean unordered, 890 int ppid) 891 throws IOException { 892 int pos = bb.position(); 893 int lim = bb.limit(); 894 assert (pos <= lim); 895 int rem = (pos <= lim ? lim - pos : 0); 896 897 int written = send0(fd, ((DirectBuffer)bb).address() + pos, 898 rem, target, assocId, streamNumber, unordered, ppid); 899 if (written > 0) 900 bb.position(pos + written); 901 return written; 902 } 903 904 @Override 905 public SctpMultiChannel shutdown(Association association) 906 throws IOException { 907 synchronized (stateLock) { 908 checkAssociation(association); 909 if (!isOpen()) 910 throw new ClosedChannelException(); 911 912 SctpNet.shutdown(fdVal, association.associationID()); 913 } 914 return this; 915 } 916 917 @Override 918 public Set<SocketAddress> getAllLocalAddresses() 919 throws IOException { 920 synchronized (stateLock) { 921 if (!isOpen()) 922 throw new ClosedChannelException(); 923 if (!isBound()) 924 return Collections.emptySet(); 925 926 return SctpNet.getLocalAddresses(fdVal); 927 } 928 } 929 930 @Override 931 public Set<SocketAddress> getRemoteAddresses(Association association) 932 throws IOException { 933 synchronized (stateLock) { 934 checkAssociation(association); 935 if (!isOpen()) 936 throw new ClosedChannelException(); 937 938 try { 939 return SctpNet.getRemoteAddresses(fdVal, association.associationID()); 940 } catch (SocketException se) { 941 /* a valid association should always have remote addresses */ 942 Set<SocketAddress> addrs = associationMap.get(association); 943 return addrs != null ? addrs : Collections.<SocketAddress>emptySet(); 944 } 945 } 946 } 947 948 @Override 949 public SctpChannel branch(Association association) 950 throws IOException { 951 synchronized (stateLock) { 952 checkAssociation(association); 953 if (!isOpen()) 954 throw new ClosedChannelException(); 955 956 FileDescriptor bFd = SctpNet.branch(fdVal, 957 association.associationID()); 958 /* successfully branched, we can now remove it from assoc list */ 959 removeAssociation(association); 960 961 return new SctpChannelImpl(provider(), bFd, association); 962 } 963 } 964 965 /* Use common native implementation shared between 966 * one-to-one and one-to-many */ 967 private static int receive0(int fd, 968 ResultContainer resultContainer, 969 long address, 970 int length) 971 throws IOException{ 972 return SctpChannelImpl.receive0(fd, resultContainer, address, 973 length, false /*peek */); 974 } 975 976 private static int send0(int fd, 977 long address, 978 int length, 979 SocketAddress target, 980 int assocId, 981 int streamNumber, 982 boolean unordered, 983 int ppid) 984 throws IOException { 985 return SctpChannelImpl.send0(fd, address, length, target, assocId, 986 streamNumber, unordered, ppid); 987 } 988 989 static { 990 Util.load(); /* loads nio & net native libraries */ 991 java.security.AccessController.doPrivileged( 992 new sun.security.action.LoadLibraryAction("sctp")); 993 } 994 }