1 /*
   2  * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.nio.ch.sctp;
  26 
  27 import java.net.InetAddress;
  28 import java.net.SocketAddress;
  29 import java.net.SocketException;
  30 import java.net.InetSocketAddress;
  31 import java.io.FileDescriptor;
  32 import java.io.IOException;
  33 import java.util.Collections;
  34 import java.util.Map.Entry;
  35 import java.util.Iterator;
  36 import java.util.Set;
  37 import java.util.HashSet;
  38 import java.util.HashMap;
  39 import java.nio.ByteBuffer;
  40 import java.nio.channels.SelectionKey;
  41 import java.nio.channels.ClosedChannelException;
  42 import java.nio.channels.NotYetBoundException;
  43 import java.nio.channels.spi.SelectorProvider;
  44 import com.sun.nio.sctp.AbstractNotificationHandler;
  45 import com.sun.nio.sctp.Association;
  46 import com.sun.nio.sctp.AssociationChangeNotification;
  47 import com.sun.nio.sctp.HandlerResult;
  48 import com.sun.nio.sctp.IllegalReceiveException;
  49 import com.sun.nio.sctp.InvalidStreamException;
  50 import com.sun.nio.sctp.IllegalUnbindException;
  51 import com.sun.nio.sctp.NotificationHandler;
  52 import com.sun.nio.sctp.MessageInfo;
  53 import com.sun.nio.sctp.SctpChannel;
  54 import com.sun.nio.sctp.SctpMultiChannel;
  55 import com.sun.nio.sctp.SctpSocketOption;
  56 import sun.nio.ch.DirectBuffer;
  57 import sun.nio.ch.NativeThread;
  58 import sun.nio.ch.IOStatus;
  59 import sun.nio.ch.IOUtil;
  60 import sun.nio.ch.Net;
  61 import sun.nio.ch.PollArrayWrapper;
  62 import sun.nio.ch.SelChImpl;
  63 import sun.nio.ch.SelectionKeyImpl;
  64 import sun.nio.ch.Util;
  65 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
  66 import static sun.nio.ch.sctp.ResultContainer.*;
  67 
  68 /**
  69  * An implementation of SctpMultiChannel
  70  */
  71 public class SctpMultiChannelImpl extends SctpMultiChannel
  72     implements SelChImpl
  73 {
    /* Descriptor of the underlying socket; created in the constructor
     * via SctpNet.socket and never reassigned. */
    private final FileDescriptor fd;

    /* Integer value of fd, as returned by IOUtil.fdVal(fd). */
    private final int fdVal;

    /* IDs of native threads doing send and receives, for signalling.
     * A value of 0 means that no thread is currently registered. */
    private volatile long receiverThread = 0;
    private volatile long senderThread = 0;

    /* Lock held by current receiving thread */
    private final Object receiveLock = new Object();

    /* Lock held by current sending thread */
    private final Object sendLock = new Object();

    /* Lock held by any thread that modifies the state fields declared below
     * DO NOT invoke a blocking I/O operation while holding this lock!
     * Where several of these locks are taken together in this class the
     * order is receiveLock, then sendLock, then stateLock. */
    private final Object stateLock = new Object();
  91 
    /* Tracks how far the tear down performed by kill() has progressed. */
    private enum ChannelState {
        /* initial value of the state field */
        UNINITIALIZED,
        /* kill() found a sender or receiver thread still registered */
        KILLPENDING,
        /* kill() has completed; later kill() calls return immediately */
        KILLED,
    }
  97 
    /* -- The following fields are protected by stateLock -- */
    private ChannelState state = ChannelState.UNINITIALIZED;

    /* Binding: Once bound the port will remain constant.
     * A value of -1 means that the channel is not yet bound. */
    int port = -1;
    /* Local addresses recorded by bind and bindAddress/unbindAddress */
    private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
    /* Has the channel been bound to the wildcard address */
    private boolean wildcard; /* false */

    /* Keeps a map of addresses to association, and vice versa.
     * The address set stored in associationMap is null when the remote
     * addresses could not be determined at the time the association
     * was added. */
    private HashMap<SocketAddress, Association> addressMap =
                         new HashMap<SocketAddress, Association>();
    private HashMap<Association, Set<SocketAddress>> associationMap =
                         new HashMap<Association, Set<SocketAddress>>();

    /* -- End of fields protected by stateLock -- */
 114 
    /* If an association has been shutdown mark it for removal after
     * the user handler has been invoked.
     * The value is set by InternalNotificationHandler on SHUTDOWN and
     * COMM_LOST events and is cleared by invokeNotificationHandler once
     * a handler other than the internal one has returned. */
    private final ThreadLocal<Association> associationToRemove =
        new ThreadLocal<Association>() {
             @Override protected Association initialValue() {
                 return null;
            }
    };
 123 
    /* A notification handler cannot invoke receive.
     * receive() sets this flag to TRUE on entry and resets it to FALSE
     * in a finally block; a call that finds it already TRUE on the
     * calling thread is rejected with IllegalReceiveException. */
    private final ThreadLocal<Boolean> receiveInvoked =
        new ThreadLocal<Boolean>() {
             @Override protected Boolean initialValue() {
                 return Boolean.FALSE;
            }
    };
 131 
 132     public SctpMultiChannelImpl(SelectorProvider provider)
 133             throws IOException {
 134         //TODO: update provider, remove public modifier
 135         super(provider);
 136         this.fd = SctpNet.socket(false /*one-to-many*/);
 137         this.fdVal = IOUtil.fdVal(fd);
 138     }
 139 
 140     @Override
 141     public SctpMultiChannel bind(SocketAddress local, int backlog)
 142             throws IOException {
 143         synchronized (receiveLock) {
 144             synchronized (sendLock) {
 145                 synchronized (stateLock) {
 146                     ensureOpen();
 147                     if (isBound())
 148                         SctpNet.throwAlreadyBoundException();
 149                     InetSocketAddress isa = (local == null) ?
 150                         new InetSocketAddress(0) : Net.checkAddress(local);
 151 
 152                     SecurityManager sm = System.getSecurityManager();
 153                     if (sm != null)
 154                         sm.checkListen(isa.getPort());
 155                     Net.bind(fd, isa.getAddress(), isa.getPort());
 156 
 157                     InetSocketAddress boundIsa = Net.localAddress(fd);
 158                     port = boundIsa.getPort();
 159                     localAddresses.add(isa);
 160                     if (isa.getAddress().isAnyLocalAddress())
 161                         wildcard = true;
 162 
 163                     SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog);
 164                 }
 165             }
 166         }
 167         return this;
 168     }
 169 
 170     @Override
 171     public SctpMultiChannel bindAddress(InetAddress address)
 172             throws IOException {
 173         return bindUnbindAddress(address, true);
 174     }
 175 
 176     @Override
 177     public SctpMultiChannel unbindAddress(InetAddress address)
 178             throws IOException {
 179         return bindUnbindAddress(address, false);
 180     }
 181 
 182     private SctpMultiChannel bindUnbindAddress(InetAddress address,
 183                                                boolean add)
 184             throws IOException {
 185         if (address == null)
 186             throw new IllegalArgumentException();
 187 
 188         synchronized (receiveLock) {
 189             synchronized (sendLock) {
 190                 synchronized (stateLock) {
 191                     if (!isOpen())
 192                         throw new ClosedChannelException();
 193                     if (!isBound())
 194                         throw new NotYetBoundException();
 195                     if (wildcard)
 196                         throw new IllegalStateException(
 197                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
 198                     if (address.isAnyLocalAddress())
 199                         throw new IllegalArgumentException(
 200                                 "Cannot add or remove the wildcard address");
 201                     if (add) {
 202                         for (InetSocketAddress addr : localAddresses) {
 203                             if (addr.getAddress().equals(address)) {
 204                                 SctpNet.throwAlreadyBoundException();
 205                             }
 206                         }
 207                     } else { /*removing */
 208                         /* Verify that there is more than one address
 209                          * and that address is already bound */
 210                         if (localAddresses.size() <= 1)
 211                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
 212                         boolean foundAddress = false;
 213                         for (InetSocketAddress addr : localAddresses) {
 214                             if (addr.getAddress().equals(address)) {
 215                                 foundAddress = true;
 216                                 break;
 217                             }
 218                         }
 219                         if (!foundAddress )
 220                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
 221                     }
 222 
 223                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
 224 
 225                     /* Update our internal Set to reflect the addition/removal */
 226                     if (add)
 227                         localAddresses.add(new InetSocketAddress(address, port));
 228                     else {
 229                         for (InetSocketAddress addr : localAddresses) {
 230                             if (addr.getAddress().equals(address)) {
 231                                 localAddresses.remove(addr);
 232                                 break;
 233                             }
 234                         }
 235                     }
 236                 }
 237             }
 238         }
 239         return this;
 240     }
 241 
 242     @Override
 243     public Set<Association> associations()
 244             throws ClosedChannelException, NotYetBoundException {
 245         synchronized (stateLock) {
 246             if (!isOpen())
 247                 throw new ClosedChannelException();
 248             if (!isBound())
 249                 throw new NotYetBoundException();
 250 
 251             return Collections.unmodifiableSet(associationMap.keySet());
 252         }
 253     }
 254 
 255     private boolean isBound() {
 256         synchronized (stateLock) {
 257             return port == -1 ? false : true;
 258         }
 259     }
 260 
 261     private void ensureOpen() throws IOException {
 262         synchronized (stateLock) {
 263             if (!isOpen())
 264                 throw new ClosedChannelException();
 265         }
 266     }
 267 
 268     private void receiverCleanup() throws IOException {
 269         synchronized (stateLock) {
 270             receiverThread = 0;
 271             if (state == ChannelState.KILLPENDING)
 272                 kill();
 273         }
 274     }
 275 
 276     private void senderCleanup() throws IOException {
 277         synchronized (stateLock) {
 278             senderThread = 0;
 279             if (state == ChannelState.KILLPENDING)
 280                 kill();
 281         }
 282     }
 283 
 284     @Override
 285     protected void implConfigureBlocking(boolean block) throws IOException {
 286         IOUtil.configureBlocking(fd, block);
 287     }
 288 
 289     @Override
 290     public void implCloseSelectableChannel() throws IOException {
 291         synchronized (stateLock) {
 292             SctpNet.preClose(fdVal);
 293 
 294             if (receiverThread != 0)
 295                 NativeThread.signal(receiverThread);
 296 
 297             if (senderThread != 0)
 298                 NativeThread.signal(senderThread);
 299 
 300             if (!isRegistered())
 301                 kill();
 302         }
 303     }
 304 
 305     @Override
 306     public FileDescriptor getFD() {
 307         return fd;
 308     }
 309 
 310     @Override
 311     public int getFDVal() {
 312         return fdVal;
 313     }
 314 
 315     /**
 316      * Translates native poll revent ops into a ready operation ops
 317      */
 318     private boolean translateReadyOps(int ops, int initialOps,
 319                                       SelectionKeyImpl sk) {
 320         int intOps = sk.nioInterestOps();
 321         int oldOps = sk.nioReadyOps();
 322         int newOps = initialOps;
 323 
 324         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
 325             /* This should only happen if this channel is pre-closed while a
 326              * selection operation is in progress
 327              * ## Throw an error if this channel has not been pre-closed */
 328             return false;
 329         }
 330 
 331         if ((ops & (PollArrayWrapper.POLLERR
 332                     | PollArrayWrapper.POLLHUP)) != 0) {
 333             newOps = intOps;
 334             sk.nioReadyOps(newOps);
 335             return (newOps & ~oldOps) != 0;
 336         }
 337 
 338         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
 339             ((intOps & SelectionKey.OP_READ) != 0))
 340             newOps |= SelectionKey.OP_READ;
 341 
 342         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
 343             ((intOps & SelectionKey.OP_WRITE) != 0))
 344             newOps |= SelectionKey.OP_WRITE;
 345 
 346         sk.nioReadyOps(newOps);
 347         return (newOps & ~oldOps) != 0;
 348     }
 349 
 350     @Override
 351     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
 352         return translateReadyOps(ops, sk.nioReadyOps(), sk);
 353     }
 354 
 355     @Override
 356     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
 357         return translateReadyOps(ops, 0, sk);
 358     }
 359 
 360     @Override
 361     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
 362         int newOps = 0;
 363         if ((ops & SelectionKey.OP_READ) != 0)
 364             newOps |= PollArrayWrapper.POLLIN;
 365         if ((ops & SelectionKey.OP_WRITE) != 0)
 366             newOps |= PollArrayWrapper.POLLOUT;
 367         sk.selector.putEventOps(sk, newOps);
 368     }
 369 
 370     @Override
 371     public void kill() throws IOException {
 372         synchronized (stateLock) {
 373             if (state == ChannelState.KILLED)
 374                 return;
 375             if (state == ChannelState.UNINITIALIZED) {
 376                 state = ChannelState.KILLED;
 377                 return;
 378             }
 379             assert !isOpen() && !isRegistered();
 380 
 381             /* Postpone the kill if there is a thread sending or receiving. */
 382             if (receiverThread == 0 && senderThread == 0) {
 383                 SctpNet.close(fdVal);
 384                 state = ChannelState.KILLED;
 385             } else {
 386                 state = ChannelState.KILLPENDING;
 387             }
 388         }
 389     }
 390 
 391     @Override
 392     public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
 393                                           T value,
 394                                           Association association)
 395             throws IOException {
 396         if (name == null)
 397             throw new NullPointerException();
 398         if (!(supportedOptions().contains(name)))
 399             throw new UnsupportedOperationException("'" + name + "' not supported");
 400 
 401         synchronized (stateLock) {
 402             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
 403                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
 404                 checkAssociation(association);
 405             }
 406             if (!isOpen())
 407                 throw new ClosedChannelException();
 408 
 409             int assocId = association == null ? 0 : association.associationID();
 410             SctpNet.setSocketOption(fdVal, name, value, assocId);
 411         }
 412         return this;
 413     }
 414 
 415     @Override
 416     @SuppressWarnings("unchecked")
 417     public <T> T getOption(SctpSocketOption<T> name, Association association)
 418             throws IOException {
 419         if (name == null)
 420             throw new NullPointerException();
 421         if (!supportedOptions().contains(name))
 422             throw new UnsupportedOperationException("'" + name + "' not supported");
 423 
 424         synchronized (stateLock) {
 425             if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
 426                     name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
 427                 checkAssociation(association);
 428             }
 429             if (!isOpen())
 430                 throw new ClosedChannelException();
 431 
 432             int assocId = association == null ? 0 : association.associationID();
 433             return (T)SctpNet.getSocketOption(fdVal, name, assocId);
 434         }
 435     }
 436 
 437     private static class DefaultOptionsHolder {
 438         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
 439 
 440         private static Set<SctpSocketOption<?>> defaultOptions() {
 441             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
 442             set.add(SCTP_DISABLE_FRAGMENTS);
 443             set.add(SCTP_EXPLICIT_COMPLETE);
 444             set.add(SCTP_FRAGMENT_INTERLEAVE);
 445             set.add(SCTP_INIT_MAXSTREAMS);
 446             set.add(SCTP_NODELAY);
 447             set.add(SCTP_PRIMARY_ADDR);
 448             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
 449             set.add(SO_SNDBUF);
 450             set.add(SO_RCVBUF);
 451             set.add(SO_LINGER);
 452             return Collections.unmodifiableSet(set);
 453         }
 454     }
 455 
 456     @Override
 457     public final Set<SctpSocketOption<?>> supportedOptions() {
 458         return DefaultOptionsHolder.defaultOptions;
 459     }
 460 
 461     @Override
 462     public <T> MessageInfo receive(ByteBuffer buffer,
 463                                    T attachment,
 464                                    NotificationHandler<T> handler)
 465             throws IOException {
 466         if (buffer == null)
 467             throw new IllegalArgumentException("buffer cannot be null");
 468 
 469         if (buffer.isReadOnly())
 470             throw new IllegalArgumentException("Read-only buffer");
 471 
 472         if (receiveInvoked.get())
 473             throw new IllegalReceiveException(
 474                     "cannot invoke receive from handler");
 475         receiveInvoked.set(Boolean.TRUE);
 476 
 477         try {
 478             ResultContainer resultContainer = new ResultContainer();
 479             do {
 480                 resultContainer.clear();
 481                 synchronized (receiveLock) {
 482                     ensureOpen();
 483                     if (!isBound())
 484                         throw new NotYetBoundException();
 485 
 486                     int n = 0;
 487                     try {
 488                         begin();
 489 
 490                         synchronized (stateLock) {
 491                             if(!isOpen())
 492                                 return null;
 493                             receiverThread = NativeThread.current();
 494                         }
 495 
 496                         do {
 497                             n = receive(fdVal, buffer, resultContainer);
 498                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
 499 
 500                     } finally {
 501                         receiverCleanup();
 502                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
 503                         assert IOStatus.check(n);
 504                     }
 505 
 506                     if (!resultContainer.isNotification()) {
 507                         /* message or nothing */
 508                         if (resultContainer.hasSomething()) {
 509                             /* Set the association before returning */
 510                             MessageInfoImpl info =
 511                                     resultContainer.getMessageInfo();
 512                             info.setAssociation(lookupAssociation(info.
 513                                     associationID()));
 514                             SecurityManager sm = System.getSecurityManager();
 515                             if (sm != null) {
 516                                 InetSocketAddress isa  = (InetSocketAddress)info.address();
 517                                 if (!addressMap.containsKey(isa)) {
 518                                     /* must be a new association */
 519                                     try {
 520                                         sm.checkAccept(isa.getAddress().getHostAddress(),
 521                                                        isa.getPort());
 522                                     } catch (SecurityException se) {
 523                                         buffer.clear();
 524                                         throw se;
 525                                     }
 526                                 }
 527                             }
 528 
 529                             assert info.association() != null;
 530                             return info;
 531                         } else  {
 532                           /* Non-blocking may return null if nothing available*/
 533                             return null;
 534                         }
 535                     } else { /* notification */
 536                         synchronized (stateLock) {
 537                             handleNotificationInternal(
 538                                     resultContainer);
 539                         }
 540                     }
 541                 } /* receiveLock */
 542             } while (handler == null ? true :
 543                 (invokeNotificationHandler(resultContainer, handler, attachment)
 544                  == HandlerResult.CONTINUE));
 545         } finally {
 546             receiveInvoked.set(Boolean.FALSE);
 547         }
 548 
 549         return null;
 550     }
 551 
 552     private int receive(int fd,
 553                         ByteBuffer dst,
 554                         ResultContainer resultContainer)
 555             throws IOException {
 556         int pos = dst.position();
 557         int lim = dst.limit();
 558         assert (pos <= lim);
 559         int rem = (pos <= lim ? lim - pos : 0);
 560         if (dst instanceof DirectBuffer && rem > 0)
 561             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
 562 
 563         /* Substitute a native buffer. */
 564         int newSize = Math.max(rem, 1);
 565         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
 566         try {
 567             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
 568             bb.flip();
 569             if (n > 0 && rem > 0)
 570                 dst.put(bb);
 571             return n;
 572         } finally {
 573             Util.releaseTemporaryDirectBuffer(bb);
 574         }
 575     }
 576 
 577     private int receiveIntoNativeBuffer(int fd,
 578                                         ResultContainer resultContainer,
 579                                         ByteBuffer bb,
 580                                         int rem,
 581                                         int pos)
 582             throws IOException {
 583         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
 584         if (n > 0)
 585             bb.position(pos + n);
 586         return n;
 587     }
 588 
 589     private InternalNotificationHandler internalNotificationHandler =
 590             new InternalNotificationHandler();
 591 
 592     private void handleNotificationInternal(ResultContainer resultContainer)
 593     {
 594         invokeNotificationHandler(resultContainer,
 595                 internalNotificationHandler, null);
 596     }
 597 
 598     private class InternalNotificationHandler
 599             extends AbstractNotificationHandler<Object>
 600     {
 601         @Override
 602         public HandlerResult handleNotification(
 603                 AssociationChangeNotification not, Object unused) {
 604             AssociationChange sac = (AssociationChange) not;
 605 
 606             /* Update map to reflect change in association */
 607             switch (not.event()) {
 608                 case COMM_UP :
 609                     Association newAssociation = new AssociationImpl
 610                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
 611                     addAssociation(newAssociation);
 612                     break;
 613                 case SHUTDOWN :
 614                 case COMM_LOST :
 615                 //case RESTART: ???
 616                     /* mark association for removal after user handler invoked*/
 617                     associationToRemove.set(lookupAssociation(sac.assocId()));
 618             }
 619             return HandlerResult.CONTINUE;
 620         }
 621     }
 622 
 623     private <T> HandlerResult invokeNotificationHandler(
 624                                    ResultContainer resultContainer,
 625                                    NotificationHandler<T> handler,
 626                                    T attachment) {
 627         HandlerResult result;
 628         SctpNotification notification = resultContainer.notification();
 629         notification.setAssociation(lookupAssociation(notification.assocId()));
 630 
 631         if (!(handler instanceof AbstractNotificationHandler)) {
 632             result = handler.handleNotification(notification, attachment);
 633         } else { /* AbstractNotificationHandler */
 634             AbstractNotificationHandler<T> absHandler =
 635                     (AbstractNotificationHandler<T>)handler;
 636             switch(resultContainer.type()) {
 637                 case ASSOCIATION_CHANGED :
 638                     result = absHandler.handleNotification(
 639                             resultContainer.getAssociationChanged(), attachment);
 640                     break;
 641                 case PEER_ADDRESS_CHANGED :
 642                     result = absHandler.handleNotification(
 643                             resultContainer.getPeerAddressChanged(), attachment);
 644                     break;
 645                 case SEND_FAILED :
 646                     result = absHandler.handleNotification(
 647                             resultContainer.getSendFailed(), attachment);
 648                     break;
 649                 case SHUTDOWN :
 650                     result =  absHandler.handleNotification(
 651                             resultContainer.getShutdown(), attachment);
 652                     break;
 653                 default :
 654                     /* implementation specific handlers */
 655                     result =  absHandler.handleNotification(
 656                             resultContainer.notification(), attachment);
 657             }
 658         }
 659 
 660         if (!(handler instanceof InternalNotificationHandler)) {
 661             /* Only remove associations after user handler
 662              * has finished with them */
 663             Association assoc = associationToRemove.get();
 664             if (assoc != null) {
 665                 removeAssociation(assoc);
 666                 associationToRemove.set(null);
 667             }
 668 
 669         }
 670 
 671         return result;
 672     }
 673 
 674     private Association lookupAssociation(int assocId) {
 675         /* Lookup the association in our internal map */
 676         synchronized (stateLock) {
 677             Set<Association> assocs = associationMap.keySet();
 678             for (Association a : assocs) {
 679                 if (a.associationID() == assocId) {
 680                     return a;
 681                 }
 682             }
 683         }
 684         return null;
 685     }
 686 
 687     private void addAssociation(Association association) {
 688         synchronized (stateLock) {
 689             int assocId = association.associationID();
 690             Set<SocketAddress> addresses = null;
 691 
 692             try {
 693                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
 694             } catch (IOException unused) {
 695                 /* OK, determining connected addresses may not be possible
 696                  * shutdown, connection lost, etc */
 697             }
 698 
 699             associationMap.put(association, addresses);
 700             if (addresses != null) {
 701                 for (SocketAddress addr : addresses)
 702                     addressMap.put(addr, association);
 703             }
 704         }
 705     }
 706 
 707     private void removeAssociation(Association association) {
 708         synchronized (stateLock) {
 709             int assocId = association.associationID();
 710             Set<SocketAddress> addresses = null;
 711 
 712              try {
 713                 addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
 714             } catch (IOException unused) {
 715                 /* OK, determining connected addresses may not be possible
 716                  * shutdown, connection lost, etc */
 717             }
 718 
 719             Set<Association> assocs = associationMap.keySet();
 720             for (Association a : assocs) {
 721                 if (a.associationID() == assocId) {
 722                     associationMap.remove(a);
 723                     break;
 724                 }
 725             }
 726             if (addresses != null) {
 727                 for (SocketAddress addr : addresses)
 728                     addressMap.remove(addr);
 729             } else {
 730                 /* We cannot determine the connected addresses */
 731                 Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
 732                         addressMap.entrySet();
 733                 Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
 734                 while (iterator.hasNext()) {
 735                     Entry<SocketAddress, Association> entry = iterator.next();
 736                     if (entry.getValue().equals(association)) {
 737                         iterator.remove();
 738                     }
 739                 }
 740             }
 741         }
 742     }
 743 
 744     /**
 745      * @throws  IllegalArgumentException
 746      *          If the given association is not controlled by this channel
 747      *
 748      * @return  {@code true} if, and only if, the given association is one
 749      *          of the current associations controlled by this channel
 750      */
 751     private boolean checkAssociation(Association messageAssoc) {
 752         synchronized (stateLock) {
 753             for (Association association : associationMap.keySet()) {
 754                 if (messageAssoc.equals(association)) {
 755                     return true;
 756                 }
 757             }
 758         }
 759         throw new IllegalArgumentException(
 760               "Given Association is not controlled by this channel");
 761     }
 762 
 763     private void checkStreamNumber(Association assoc, int streamNumber) {
 764         synchronized (stateLock) {
 765             if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams())
 766                 throw new InvalidStreamException();
 767         }
 768     }
 769 
 770     /* TODO: Add support for ttl and isComplete to both 121 12M
 771      *       SCTP_EOR not yet supported on reference platforms
 772      *       TTL support limited...
 773      */
 774     @Override
 775     public int send(ByteBuffer buffer, MessageInfo messageInfo)
 776             throws IOException {
 777         if (buffer == null)
 778             throw new IllegalArgumentException("buffer cannot be null");
 779 
 780         if (messageInfo == null)
 781             throw new IllegalArgumentException("messageInfo cannot be null");
 782 
 783         synchronized (sendLock) {
 784             ensureOpen();
 785 
 786             if (!isBound())
 787                 bind(null, 0);
 788 
 789             int n = 0;
 790             try {
 791                 int assocId = -1;
 792                 SocketAddress address = null;
 793                 begin();
 794 
 795                 synchronized (stateLock) {
 796                     if(!isOpen())
 797                         return 0;
 798                     senderThread = NativeThread.current();
 799 
 800                     /* Determine what address or association to send to */
 801                     Association assoc = messageInfo.association();
 802                     InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
 803                     if (assoc != null) {
 804                         checkAssociation(assoc);
 805                         checkStreamNumber(assoc, messageInfo.streamNumber());
 806                         assocId = assoc.associationID();
 807                         /* have we also got a preferred address */
 808                         if (addr != null) {
 809                             if (!assoc.equals(addressMap.get(addr)))
 810                                 throw new IllegalArgumentException("given preferred address is not part of this association");
 811                             address = addr;
 812                         }
 813                     } else if (addr != null) {
 814                         address = addr;
 815                         Association association = addressMap.get(addr);
 816                         if (association != null) {
 817                             checkStreamNumber(association, messageInfo.streamNumber());
 818                             assocId = association.associationID();
 819 
 820                         } else { /* must be new association */
 821                             SecurityManager sm = System.getSecurityManager();
 822                             if (sm != null)
 823                                 sm.checkConnect(addr.getAddress().getHostAddress(),
 824                                                 addr.getPort());
 825                         }
 826                     } else {
 827                         throw new AssertionError(
 828                             "Both association and address cannot be null");
 829                     }
 830                 }
 831 
 832                 do {
 833                     n = send(fdVal, buffer, assocId, address, messageInfo);
 834                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 835 
 836                 return IOStatus.normalize(n);
 837             } finally {
 838                 senderCleanup();
 839                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
 840                 assert IOStatus.check(n);
 841             }
 842         }
 843     }
 844 
 845     private int send(int fd,
 846                      ByteBuffer src,
 847                      int assocId,
 848                      SocketAddress target,
 849                      MessageInfo messageInfo)
 850             throws IOException {
 851         int streamNumber = messageInfo.streamNumber();
 852         boolean unordered = messageInfo.isUnordered();
 853         int ppid = messageInfo.payloadProtocolID();
 854 
 855         if (src instanceof DirectBuffer)
 856             return sendFromNativeBuffer(fd, src, target, assocId,
 857                     streamNumber, unordered, ppid);
 858 
 859         /* Substitute a native buffer */
 860         int pos = src.position();
 861         int lim = src.limit();
 862         assert (pos <= lim && streamNumber >= 0);
 863 
 864         int rem = (pos <= lim ? lim - pos : 0);
 865         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
 866         try {
 867             bb.put(src);
 868             bb.flip();
 869             /* Do not update src until we see how many bytes were written */
 870             src.position(pos);
 871 
 872             int n = sendFromNativeBuffer(fd, bb, target, assocId,
 873                     streamNumber, unordered, ppid);
 874             if (n > 0) {
 875                 /* now update src */
 876                 src.position(pos + n);
 877             }
 878             return n;
 879         } finally {
 880             Util.releaseTemporaryDirectBuffer(bb);
 881         }
 882     }
 883 
 884     private int sendFromNativeBuffer(int fd,
 885                                      ByteBuffer bb,
 886                                      SocketAddress target,
 887                                      int assocId,
 888                                      int streamNumber,
 889                                      boolean unordered,
 890                                      int ppid)
 891             throws IOException {
 892         int pos = bb.position();
 893         int lim = bb.limit();
 894         assert (pos <= lim);
 895         int rem = (pos <= lim ? lim - pos : 0);
 896 
 897         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
 898                             rem, target, assocId, streamNumber, unordered, ppid);
 899         if (written > 0)
 900             bb.position(pos + written);
 901         return written;
 902     }
 903 
 904     @Override
 905     public SctpMultiChannel shutdown(Association association)
 906             throws IOException {
 907         synchronized (stateLock) {
 908             checkAssociation(association);
 909             if (!isOpen())
 910                 throw new ClosedChannelException();
 911 
 912             SctpNet.shutdown(fdVal, association.associationID());
 913         }
 914         return this;
 915     }
 916 
 917     @Override
 918     public Set<SocketAddress> getAllLocalAddresses()
 919             throws IOException {
 920         synchronized (stateLock) {
 921             if (!isOpen())
 922                 throw new ClosedChannelException();
 923             if (!isBound())
 924                 return Collections.emptySet();
 925 
 926             return SctpNet.getLocalAddresses(fdVal);
 927         }
 928     }
 929 
 930     @Override
 931     public Set<SocketAddress> getRemoteAddresses(Association association)
 932             throws IOException {
 933         synchronized (stateLock) {
 934             checkAssociation(association);
 935             if (!isOpen())
 936                 throw new ClosedChannelException();
 937 
 938             try {
 939                 return SctpNet.getRemoteAddresses(fdVal, association.associationID());
 940             } catch (SocketException se) {
 941                 /* a valid association should always have remote addresses */
 942                 Set<SocketAddress> addrs = associationMap.get(association);
 943                 return addrs != null ? addrs : Collections.<SocketAddress>emptySet();
 944             }
 945         }
 946     }
 947 
 948     @Override
 949     public SctpChannel branch(Association association)
 950             throws IOException {
 951         synchronized (stateLock) {
 952             checkAssociation(association);
 953             if (!isOpen())
 954                 throw new ClosedChannelException();
 955 
 956             FileDescriptor bFd = SctpNet.branch(fdVal,
 957                                                 association.associationID());
 958             /* successfully branched, we can now remove it from assoc list */
 959             removeAssociation(association);
 960 
 961             return new SctpChannelImpl(provider(), bFd, association);
 962         }
 963     }
 964 
 965     /* Use common native implementation shared between
 966      * one-to-one and one-to-many */
 967     private static int receive0(int fd,
 968                                 ResultContainer resultContainer,
 969                                 long address,
 970                                 int length)
 971             throws IOException{
 972         return SctpChannelImpl.receive0(fd, resultContainer, address,
 973                 length, false /*peek */);
 974     }
 975 
 976     private static int send0(int fd,
 977                              long address,
 978                              int length,
 979                              SocketAddress target,
 980                              int assocId,
 981                              int streamNumber,
 982                              boolean unordered,
 983                              int ppid)
 984             throws IOException {
 985         return SctpChannelImpl.send0(fd, address, length, target, assocId,
 986                 streamNumber, unordered, ppid);
 987     }
 988 
 989     static {
 990         Util.load();   /* loads nio & net native libraries */
 991         java.security.AccessController.doPrivileged(
 992                 new sun.security.action.LoadLibraryAction("sctp"));
 993     }
 994 }