1 /*
   2  * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package sun.nio.ch.sctp;
  26 
  27 import java.net.InetAddress;
  28 import java.net.SocketAddress;
  29 import java.net.SocketException;
  30 import java.net.InetSocketAddress;
  31 import java.io.FileDescriptor;
  32 import java.io.IOException;
  33 import java.util.Collections;
  34 import java.util.Set;
  35 import java.util.HashSet;
  36 import java.nio.ByteBuffer;
  37 import java.nio.channels.SelectionKey;
  38 import java.nio.channels.ClosedChannelException;
  39 import java.nio.channels.ConnectionPendingException;
  40 import java.nio.channels.NoConnectionPendingException;
  41 import java.nio.channels.AlreadyConnectedException;
  42 import java.nio.channels.NotYetBoundException;
  43 import java.nio.channels.NotYetConnectedException;
  44 import java.nio.channels.spi.SelectorProvider;
  45 import com.sun.nio.sctp.AbstractNotificationHandler;
  46 import com.sun.nio.sctp.Association;
  47 import com.sun.nio.sctp.AssociationChangeNotification;
  48 import com.sun.nio.sctp.HandlerResult;
  49 import com.sun.nio.sctp.IllegalReceiveException;
  50 import com.sun.nio.sctp.InvalidStreamException;
  51 import com.sun.nio.sctp.IllegalUnbindException;
  52 import com.sun.nio.sctp.MessageInfo;
  53 import com.sun.nio.sctp.NotificationHandler;
  54 import com.sun.nio.sctp.SctpChannel;
  55 import com.sun.nio.sctp.SctpSocketOption;
  56 import sun.nio.ch.DirectBuffer;
  57 import sun.nio.ch.IOStatus;
  58 import sun.nio.ch.IOUtil;
  59 import sun.nio.ch.NativeThread;
  60 import sun.nio.ch.Net;
  61 import sun.nio.ch.PollArrayWrapper;
  62 import sun.nio.ch.SelChImpl;
  63 import sun.nio.ch.SelectionKeyImpl;
  64 import sun.nio.ch.Util;
  65 import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
  66 import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
  67 import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
  68 import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
  69 import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
  70 
  71 /**
  72  * An implementation of an SctpChannel
  73  */
  74 public class SctpChannelImpl extends SctpChannel
  75     implements SelChImpl
  76 {
  77     private final FileDescriptor fd;
  78 
  79     private final int fdVal;
  80 
  81     /* IDs of native threads doing send and receivess, for signalling */
  82     private volatile long receiverThread = 0;
  83     private volatile long senderThread = 0;
  84 
  85     /* Lock held by current receiving or connecting thread */
  86     private final Object receiveLock = new Object();
  87 
  88     /* Lock held by current sending or connecting thread */
  89     private final Object sendLock = new Object();
  90 
  91     private final ThreadLocal<Boolean> receiveInvoked =
  92         new ThreadLocal<Boolean>() {
  93              @Override protected Boolean initialValue() {
  94                  return Boolean.FALSE;
  95             }
  96     };
  97 
  98     /* Lock held by any thread that modifies the state fields declared below
  99        DO NOT invoke a blocking I/O operation while holding this lock! */
 100     private final Object stateLock = new Object();
 101 
 102     private enum ChannelState {
 103         UNINITIALIZED,
 104         UNCONNECTED,
 105         PENDING,
 106         CONNECTED,
 107         KILLPENDING,
 108         KILLED,
 109     }
 110     /* -- The following fields are protected by stateLock -- */
 111     private ChannelState state = ChannelState.UNINITIALIZED;
 112 
 113     /* Binding; Once bound the port will remain constant. */
 114     int port = -1;
 115     private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
 116     /* Has the channel been bound to the wildcard address */
 117     private boolean wildcard; /* false */
 118     //private InetSocketAddress remoteAddress = null;
 119 
 120     /* Input/Output open */
 121     private boolean readyToConnect;
 122 
 123     /* Shutdown */
 124     private boolean isShutdown;
 125 
 126     private Association association;
 127 
 128     private Set<SocketAddress> remoteAddresses = Collections.emptySet();
 129 
 130     /* -- End of fields protected by stateLock -- */
 131 
 132     /**
 133      * Constructor for normal connecting sockets
 134      */
 135     public SctpChannelImpl(SelectorProvider provider) throws IOException {
 136         //TODO: update provider remove public modifier
 137         super(provider);
 138         this.fd = SctpNet.socket(true);
 139         this.fdVal = IOUtil.fdVal(fd);
 140         this.state = ChannelState.UNCONNECTED;
 141     }
 142 
 143     /**
 144      * Constructor for sockets obtained from server sockets
 145      */
 146     public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
 147          throws IOException {
 148         this(provider, fd, null);
 149     }
 150 
 151     /**
 152      * Constructor for sockets obtained from branching
 153      */
 154     public SctpChannelImpl(SelectorProvider provider,
 155                            FileDescriptor fd,
 156                            Association association)
 157             throws IOException {
 158         super(provider);
 159         this.fd = fd;
 160         this.fdVal = IOUtil.fdVal(fd);
 161         this.state = ChannelState.CONNECTED;
 162         port = (Net.localAddress(fd)).getPort();
 163 
 164         if (association != null) { /* branched */
 165             this.association = association;
 166         } else { /* obtained from server channel */
 167             /* Receive COMM_UP */
 168             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
 169             try {
 170                 receive(buf, null, null, true);
 171             } finally {
 172                 Util.releaseTemporaryDirectBuffer(buf);
 173             }
 174         }
 175     }
 176 
 177     /**
 178      * Binds the channel's socket to a local address.
 179      */
 180     @Override
 181     public SctpChannel bind(SocketAddress local) throws IOException {
 182         synchronized (receiveLock) {
 183             synchronized (sendLock) {
 184                 synchronized (stateLock) {
 185                     ensureOpenAndUnconnected();
 186                     if (isBound())
 187                         SctpNet.throwAlreadyBoundException();
 188                     InetSocketAddress isa = (local == null) ?
 189                         new InetSocketAddress(0) : Net.checkAddress(local);
 190                     Net.bind(fd, isa.getAddress(), isa.getPort());
 191                     InetSocketAddress boundIsa = Net.localAddress(fd);
 192                     port = boundIsa.getPort();
 193                     localAddresses.add(isa);
 194                     if (isa.getAddress().isAnyLocalAddress())
 195                         wildcard = true;
 196                 }
 197             }
 198         }
 199         return this;
 200     }
 201 
 202     @Override
 203     public SctpChannel bindAddress(InetAddress address)
 204             throws IOException {
 205         bindUnbindAddress(address, true);
 206         localAddresses.add(new InetSocketAddress(address, port));
 207         return this;
 208     }
 209 
 210     @Override
 211     public SctpChannel unbindAddress(InetAddress address)
 212             throws IOException {
 213         bindUnbindAddress(address, false);
 214         localAddresses.remove(new InetSocketAddress(address, port));
 215         return this;
 216     }
 217 
 218     private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
 219             throws IOException {
 220         if (address == null)
 221             throw new IllegalArgumentException();
 222 
 223         synchronized (receiveLock) {
 224             synchronized (sendLock) {
 225                 synchronized (stateLock) {
 226                     if (!isOpen())
 227                         throw new ClosedChannelException();
 228                     if (!isBound())
 229                         throw new NotYetBoundException();
 230                     if (wildcard)
 231                         throw new IllegalStateException(
 232                                 "Cannot add or remove addresses from a channel that is bound to the wildcard address");
 233                     if (address.isAnyLocalAddress())
 234                         throw new IllegalArgumentException(
 235                                 "Cannot add or remove the wildcard address");
 236                     if (add) {
 237                         for (InetSocketAddress addr : localAddresses) {
 238                             if (addr.getAddress().equals(address)) {
 239                                 SctpNet.throwAlreadyBoundException();
 240                             }
 241                         }
 242                     } else { /*removing */
 243                         /* Verify that there is more than one address
 244                          * and that address is already bound */
 245                         if (localAddresses.size() <= 1)
 246                             throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
 247                         boolean foundAddress = false;
 248                         for (InetSocketAddress addr : localAddresses) {
 249                             if (addr.getAddress().equals(address)) {
 250                                 foundAddress = true;
 251                                 break;
 252                             }
 253                         }
 254                         if (!foundAddress )
 255                             throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
 256                     }
 257 
 258                     SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
 259 
 260                     /* Update our internal Set to reflect the addition/removal */
 261                     if (add)
 262                         localAddresses.add(new InetSocketAddress(address, port));
 263                     else {
 264                         for (InetSocketAddress addr : localAddresses) {
 265                             if (addr.getAddress().equals(address)) {
 266                                 localAddresses.remove(addr);
 267                                 break;
 268                             }
 269                         }
 270                     }
 271                 }
 272             }
 273         }
 274         return this;
 275     }
 276 
 277     private boolean isBound() {
 278         synchronized (stateLock) {
 279             return port == -1 ? false : true;
 280         }
 281     }
 282 
 283     private boolean isConnected() {
 284         synchronized (stateLock) {
 285             return (state == ChannelState.CONNECTED);
 286         }
 287     }
 288 
 289     private void ensureOpenAndUnconnected() throws IOException {
 290         synchronized (stateLock) {
 291             if (!isOpen())
 292                 throw new ClosedChannelException();
 293             if (isConnected())
 294                 throw new AlreadyConnectedException();
 295             if (state == ChannelState.PENDING)
 296                 throw new ConnectionPendingException();
 297         }
 298     }
 299 
 300     private boolean ensureReceiveOpen() throws ClosedChannelException {
 301         synchronized (stateLock) {
 302             if (!isOpen())
 303                 throw new ClosedChannelException();
 304             if (!isConnected())
 305                 throw new NotYetConnectedException();
 306             else
 307                 return true;
 308         }
 309     }
 310 
 311     private void ensureSendOpen() throws ClosedChannelException {
 312         synchronized (stateLock) {
 313             if (!isOpen())
 314                 throw new ClosedChannelException();
 315             if (isShutdown)
 316                 throw new ClosedChannelException();
 317             if (!isConnected())
 318                 throw new NotYetConnectedException();
 319         }
 320     }
 321 
 322     private void receiverCleanup() throws IOException {
 323         synchronized (stateLock) {
 324             receiverThread = 0;
 325             if (state == ChannelState.KILLPENDING)
 326                 kill();
 327         }
 328     }
 329 
 330     private void senderCleanup() throws IOException {
 331         synchronized (stateLock) {
 332             senderThread = 0;
 333             if (state == ChannelState.KILLPENDING)
 334                 kill();
 335         }
 336     }
 337 
 338     @Override
 339     public Association association() throws ClosedChannelException {
 340         synchronized (stateLock) {
 341             if (!isOpen())
 342                 throw new ClosedChannelException();
 343             if (!isConnected())
 344                 return null;
 345 
 346             return association;
 347         }
 348     }
 349 
 350     @Override
 351     public boolean connect(SocketAddress endpoint) throws IOException {
 352         synchronized (receiveLock) {
 353             synchronized (sendLock) {
 354                 ensureOpenAndUnconnected();
 355                 InetSocketAddress isa = Net.checkAddress(endpoint);
 356                 SecurityManager sm = System.getSecurityManager();
 357                 if (sm != null)
 358                     sm.checkConnect(isa.getAddress().getHostAddress(),
 359                                     isa.getPort());
 360                 synchronized (blockingLock()) {
 361                     int n = 0;
 362                     try {
 363                         try {
 364                             begin();
 365                             synchronized (stateLock) {
 366                                 if (!isOpen()) {
 367                                     return false;
 368                                 }
 369                                 receiverThread = NativeThread.current();
 370                             }
 371                             for (;;) {
 372                                 InetAddress ia = isa.getAddress();
 373                                 if (ia.isAnyLocalAddress())
 374                                     ia = InetAddress.getLocalHost();
 375                                 n = SctpNet.connect(fdVal, ia, isa.getPort());
 376                                 if (  (n == IOStatus.INTERRUPTED)
 377                                       && isOpen())
 378                                     continue;
 379                                 break;
 380                             }
 381                         } finally {
 382                             receiverCleanup();
 383                             end((n > 0) || (n == IOStatus.UNAVAILABLE));
 384                             assert IOStatus.check(n);
 385                         }
 386                     } catch (IOException x) {
 387                         /* If an exception was thrown, close the channel after
 388                          * invoking end() so as to avoid bogus
 389                          * AsynchronousCloseExceptions */
 390                         close();
 391                         throw x;
 392                     }
 393 
 394                     if (n > 0) {
 395                         synchronized (stateLock) {
 396                             /* Connection succeeded */
 397                             state = ChannelState.CONNECTED;
 398                             if (!isBound()) {
 399                                 InetSocketAddress boundIsa =
 400                                         Net.localAddress(fd);
 401                                 port = boundIsa.getPort();
 402                             }
 403 
 404                             /* Receive COMM_UP */
 405                             ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
 406                             try {
 407                                 receive(buf, null, null, true);
 408                             } finally {
 409                                 Util.releaseTemporaryDirectBuffer(buf);
 410                             }
 411 
 412                             /* cache remote addresses */
 413                             try {
 414                                 remoteAddresses = getRemoteAddresses();
 415                             } catch (IOException unused) { /* swallow exception */ }
 416 
 417                             return true;
 418                         }
 419                     } else  {
 420                         synchronized (stateLock) {
 421                             /* If nonblocking and no exception then connection
 422                              * pending; disallow another invocation */
 423                             if (!isBlocking())
 424                                 state = ChannelState.PENDING;
 425                             else
 426                                 assert false;
 427                         }
 428                     }
 429                 }
 430                 return false;
 431             }
 432         }
 433     }
 434 
 435     @Override
 436     public boolean connect(SocketAddress endpoint,
 437                            int maxOutStreams,
 438                            int maxInStreams)
 439             throws IOException {
 440         ensureOpenAndUnconnected();
 441         return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
 442                 create(maxInStreams, maxOutStreams)).connect(endpoint);
 443 
 444     }
 445 
 446     @Override
 447     public boolean isConnectionPending() {
 448         synchronized (stateLock) {
 449             return (state == ChannelState.PENDING);
 450         }
 451     }
 452 
 453     @Override
 454     public boolean finishConnect() throws IOException {
 455         synchronized (receiveLock) {
 456             synchronized (sendLock) {
 457                 synchronized (stateLock) {
 458                     if (!isOpen())
 459                         throw new ClosedChannelException();
 460                     if (isConnected())
 461                         return true;
 462                     if (state != ChannelState.PENDING)
 463                         throw new NoConnectionPendingException();
 464                 }
 465                 int n = 0;
 466                 try {
 467                     try {
 468                         begin();
 469                         synchronized (blockingLock()) {
 470                             synchronized (stateLock) {
 471                                 if (!isOpen()) {
 472                                     return false;
 473                                 }
 474                                 receiverThread = NativeThread.current();
 475                             }
 476                             if (!isBlocking()) {
 477                                 for (;;) {
 478                                     n = checkConnect(fd, false, readyToConnect);
 479                                     if (  (n == IOStatus.INTERRUPTED)
 480                                           && isOpen())
 481                                         continue;
 482                                     break;
 483                                 }
 484                             } else {
 485                                 for (;;) {
 486                                     n = checkConnect(fd, true, readyToConnect);
 487                                     if (n == 0) {
 488                                         // Loop in case of
 489                                         // spurious notifications
 490                                         continue;
 491                                     }
 492                                     if (  (n == IOStatus.INTERRUPTED)
 493                                           && isOpen())
 494                                         continue;
 495                                     break;
 496                                 }
 497                             }
 498                         }
 499                     } finally {
 500                         synchronized (stateLock) {
 501                             receiverThread = 0;
 502                             if (state == ChannelState.KILLPENDING) {
 503                                 kill();
 504                                 /* poll()/getsockopt() does not report
 505                                  * error (throws exception, with n = 0)
 506                                  * on Linux platform after dup2 and
 507                                  * signal-wakeup. Force n to 0 so the
 508                                  * end() can throw appropriate exception */
 509                                 n = 0;
 510                             }
 511                         }
 512                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
 513                         assert IOStatus.check(n);
 514                     }
 515                 } catch (IOException x) {
 516                     /* If an exception was thrown, close the channel after
 517                      * invoking end() so as to avoid bogus
 518                      * AsynchronousCloseExceptions */
 519                     close();
 520                     throw x;
 521                 }
 522 
 523                 if (n > 0) {
 524                     synchronized (stateLock) {
 525                         state = ChannelState.CONNECTED;
 526                         if (!isBound()) {
 527                             InetSocketAddress boundIsa =
 528                                     Net.localAddress(fd);
 529                             port = boundIsa.getPort();
 530                         }
 531 
 532                         /* Receive COMM_UP */
 533                         ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
 534                         try {
 535                             receive(buf, null, null, true);
 536                         } finally {
 537                             Util.releaseTemporaryDirectBuffer(buf);
 538                         }
 539 
 540                         /* cache remote addresses */
 541                         try {
 542                             remoteAddresses = getRemoteAddresses();
 543                         } catch (IOException unused) { /* swallow exception */ }
 544 
 545                         return true;
 546                     }
 547                 }
 548             }
 549         }
 550         return false;
 551     }
 552 
 553     @Override
 554     protected void implConfigureBlocking(boolean block) throws IOException {
 555         IOUtil.configureBlocking(fd, block);
 556     }
 557 
 558     @Override
 559     public void implCloseSelectableChannel() throws IOException {
 560         synchronized (stateLock) {
 561             SctpNet.preClose(fdVal);
 562 
 563             if (receiverThread != 0)
 564                 NativeThread.signal(receiverThread);
 565 
 566             if (senderThread != 0)
 567                 NativeThread.signal(senderThread);
 568 
 569             if (!isRegistered())
 570                 kill();
 571         }
 572     }
 573 
 574     @Override
 575     public FileDescriptor getFD() {
 576         return fd;
 577     }
 578 
 579     @Override
 580     public int getFDVal() {
 581         return fdVal;
 582     }
 583 
 584     /**
 585      * Translates native poll revent ops into a ready operation ops
 586      */
 587     private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
 588         int intOps = sk.nioInterestOps();
 589         int oldOps = sk.nioReadyOps();
 590         int newOps = initialOps;
 591 
 592         if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
 593             /* This should only happen if this channel is pre-closed while a
 594              * selection operation is in progress
 595              * ## Throw an error if this channel has not been pre-closed */
 596             return false;
 597         }
 598 
 599         if ((ops & (PollArrayWrapper.POLLERR
 600                     | PollArrayWrapper.POLLHUP)) != 0) {
 601             newOps = intOps;
 602             sk.nioReadyOps(newOps);
 603             /* No need to poll again in checkConnect,
 604              * the error will be detected there */
 605             readyToConnect = true;
 606             return (newOps & ~oldOps) != 0;
 607         }
 608 
 609         if (((ops & PollArrayWrapper.POLLIN) != 0) &&
 610             ((intOps & SelectionKey.OP_READ) != 0) &&
 611             isConnected())
 612             newOps |= SelectionKey.OP_READ;
 613 
 614         if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
 615             ((intOps & SelectionKey.OP_CONNECT) != 0) &&
 616             ((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
 617             newOps |= SelectionKey.OP_CONNECT;
 618             readyToConnect = true;
 619         }
 620 
 621         if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
 622             ((intOps & SelectionKey.OP_WRITE) != 0) &&
 623             isConnected())
 624             newOps |= SelectionKey.OP_WRITE;
 625 
 626         sk.nioReadyOps(newOps);
 627         return (newOps & ~oldOps) != 0;
 628     }
 629 
 630     @Override
 631     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
 632         return translateReadyOps(ops, sk.nioReadyOps(), sk);
 633     }
 634 
 635     @Override
 636     @SuppressWarnings("all")
 637     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
 638         return translateReadyOps(ops, 0, sk);
 639     }
 640 
 641     @Override
 642     public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
 643         int newOps = 0;
 644         if ((ops & SelectionKey.OP_READ) != 0)
 645             newOps |= PollArrayWrapper.POLLIN;
 646         if ((ops & SelectionKey.OP_WRITE) != 0)
 647             newOps |= PollArrayWrapper.POLLOUT;
 648         if ((ops & SelectionKey.OP_CONNECT) != 0)
 649             newOps |= PollArrayWrapper.POLLCONN;
 650         sk.selector.putEventOps(sk, newOps);
 651     }
 652 
 653     @Override
 654     public void kill() throws IOException {
 655         synchronized (stateLock) {
 656             if (state == ChannelState.KILLED)
 657                 return;
 658             if (state == ChannelState.UNINITIALIZED) {
 659                 state = ChannelState.KILLED;
 660                 return;
 661             }
 662             assert !isOpen() && !isRegistered();
 663 
 664             /* Postpone the kill if there is a waiting reader
 665              * or writer thread. */
 666             if (receiverThread == 0 && senderThread == 0) {
 667                 SctpNet.close(fdVal);
 668                 state = ChannelState.KILLED;
 669             } else {
 670                 state = ChannelState.KILLPENDING;
 671             }
 672         }
 673     }
 674 
 675     @Override
 676     public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
 677             throws IOException {
 678         if (name == null)
 679             throw new NullPointerException();
 680         if (!supportedOptions().contains(name))
 681             throw new UnsupportedOperationException("'" + name + "' not supported");
 682 
 683         synchronized (stateLock) {
 684             if (!isOpen())
 685                 throw new ClosedChannelException();
 686 
 687             SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
 688         }
 689         return this;
 690     }
 691 
 692     @Override
 693     @SuppressWarnings("unchecked")
 694     public <T> T getOption(SctpSocketOption<T> name) throws IOException {
 695         if (name == null)
 696             throw new NullPointerException();
 697         if (!supportedOptions().contains(name))
 698             throw new UnsupportedOperationException("'" + name + "' not supported");
 699 
 700         synchronized (stateLock) {
 701             if (!isOpen())
 702                 throw new ClosedChannelException();
 703 
 704             return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
 705         }
 706     }
 707 
 708     private static class DefaultOptionsHolder {
 709         static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
 710 
 711         private static Set<SctpSocketOption<?>> defaultOptions() {
 712             HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
 713             set.add(SCTP_DISABLE_FRAGMENTS);
 714             set.add(SCTP_EXPLICIT_COMPLETE);
 715             set.add(SCTP_FRAGMENT_INTERLEAVE);
 716             set.add(SCTP_INIT_MAXSTREAMS);
 717             set.add(SCTP_NODELAY);
 718             set.add(SCTP_PRIMARY_ADDR);
 719             set.add(SCTP_SET_PEER_PRIMARY_ADDR);
 720             set.add(SO_SNDBUF);
 721             set.add(SO_RCVBUF);
 722             set.add(SO_LINGER);
 723             return Collections.unmodifiableSet(set);
 724         }
 725     }
 726 
 727     @Override
 728     public final Set<SctpSocketOption<?>> supportedOptions() {
 729         return DefaultOptionsHolder.defaultOptions;
 730     }
 731 
 732     @Override
 733     public <T> MessageInfo receive(ByteBuffer buffer,
 734                                    T attachment,
 735                                    NotificationHandler<T> handler)
 736             throws IOException {
 737         return receive(buffer, attachment, handler, false);
 738     }
 739 
 740     private <T> MessageInfo receive(ByteBuffer buffer,
 741                                     T attachment,
 742                                     NotificationHandler<T> handler,
 743                                     boolean fromConnect)
 744             throws IOException {
 745         if (buffer == null)
 746             throw new IllegalArgumentException("buffer cannot be null");
 747 
 748         if (buffer.isReadOnly())
 749             throw new IllegalArgumentException("Read-only buffer");
 750 
 751         if (receiveInvoked.get())
 752             throw new IllegalReceiveException(
 753                     "cannot invoke receive from handler");
 754         receiveInvoked.set(Boolean.TRUE);
 755 
 756         try {
 757             ResultContainer resultContainer = new ResultContainer();
 758             do {
 759                 resultContainer.clear();
 760                 synchronized (receiveLock) {
 761                     if (!ensureReceiveOpen())
 762                         return null;
 763 
 764                     int n = 0;
 765                     try {
 766                         begin();
 767 
 768                         synchronized (stateLock) {
 769                             if(!isOpen())
 770                                 return null;
 771                             receiverThread = NativeThread.current();
 772                         }
 773 
 774                         do {
 775                             n = receive(fdVal, buffer, resultContainer, fromConnect);
 776                         } while ((n == IOStatus.INTERRUPTED) && isOpen());
 777                     } finally {
 778                         receiverCleanup();
 779                         end((n > 0) || (n == IOStatus.UNAVAILABLE));
 780                         assert IOStatus.check(n);
 781                     }
 782 
 783                     if (!resultContainer.isNotification()) {
 784                         /* message or nothing */
 785                         if (resultContainer.hasSomething()) {
 786                             /* Set the association before returning */
 787                             MessageInfoImpl info =
 788                                     resultContainer.getMessageInfo();
 789                             synchronized (stateLock) {
 790                                 assert association != null;
 791                                 info.setAssociation(association);
 792                             }
 793                             return info;
 794                         } else
 795                             /* Non-blocking may return null if nothing available*/
 796                             return null;
 797                     } else { /* notification */
 798                         synchronized (stateLock) {
 799                             handleNotificationInternal(
 800                                     resultContainer);
 801                         }
 802                     }
 803 
 804                     if (fromConnect)  {
 805                         /* If we reach here, then it was connect that invoked
 806                          * receive and received the COMM_UP. We have already
 807                          * handled the COMM_UP with the internal notification
 808                          * handler. Simply return. */
 809                         return null;
 810                     }
 811                 }  /* receiveLock */
 812             } while (handler == null ? true :
 813                 (invokeNotificationHandler(resultContainer, handler, attachment)
 814                  == HandlerResult.CONTINUE));
 815 
 816             return null;
 817         } finally {
 818             receiveInvoked.set(Boolean.FALSE);
 819         }
 820     }
 821 
 822     private int receive(int fd,
 823                         ByteBuffer dst,
 824                         ResultContainer resultContainer,
 825                         boolean peek)
 826             throws IOException {
 827         int pos = dst.position();
 828         int lim = dst.limit();
 829         assert (pos <= lim);
 830         int rem = (pos <= lim ? lim - pos : 0);
 831         if (dst instanceof DirectBuffer && rem > 0)
 832             return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
 833 
 834         /* Substitute a native buffer */
 835         int newSize = Math.max(rem, 1);
 836         ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
 837         try {
 838             int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
 839             bb.flip();
 840             if (n > 0 && rem > 0)
 841                 dst.put(bb);
 842             return n;
 843         } finally {
 844             Util.releaseTemporaryDirectBuffer(bb);
 845         }
 846     }
 847 
 848     private int receiveIntoNativeBuffer(int fd,
 849                                         ResultContainer resultContainer,
 850                                         ByteBuffer bb,
 851                                         int rem,
 852                                         int pos,
 853                                         boolean peek)
 854         throws IOException
 855     {
 856         int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
 857 
 858         if (n > 0)
 859             bb.position(pos + n);
 860         return n;
 861     }
 862 
 863     private InternalNotificationHandler internalNotificationHandler =
 864             new InternalNotificationHandler();
 865 
 866     private void handleNotificationInternal(ResultContainer resultContainer)
 867     {
 868         invokeNotificationHandler(resultContainer,
 869                 internalNotificationHandler, null);
 870     }
 871 
 872     private class InternalNotificationHandler
 873             extends AbstractNotificationHandler<Object>
 874     {
 875         @Override
 876         public HandlerResult handleNotification(
 877                 AssociationChangeNotification not, Object unused) {
 878             if (not.event().equals(
 879                     AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
 880                     association == null) {
 881                 AssociationChange sac = (AssociationChange) not;
 882                 association = new AssociationImpl
 883                        (sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
 884             }
 885             return HandlerResult.CONTINUE;
 886         }
 887     }
 888 
 889     private <T> HandlerResult invokeNotificationHandler
 890                                  (ResultContainer resultContainer,
 891                                   NotificationHandler<T> handler,
 892                                   T attachment) {
 893         SctpNotification notification = resultContainer.notification();
 894         synchronized (stateLock) {
 895             notification.setAssociation(association);
 896         }
 897 
 898         if (!(handler instanceof AbstractNotificationHandler)) {
 899             return handler.handleNotification(notification, attachment);
 900         }
 901 
 902         /* AbstractNotificationHandler */
 903         AbstractNotificationHandler<T> absHandler =
 904                 (AbstractNotificationHandler<T>)handler;
 905         switch(resultContainer.type()) {
 906             case ASSOCIATION_CHANGED :
 907                 return absHandler.handleNotification(
 908                         resultContainer.getAssociationChanged(), attachment);
 909             case PEER_ADDRESS_CHANGED :
 910                 return absHandler.handleNotification(
 911                         resultContainer.getPeerAddressChanged(), attachment);
 912             case SEND_FAILED :
 913                 return absHandler.handleNotification(
 914                         resultContainer.getSendFailed(), attachment);
 915             case SHUTDOWN :
 916                 return absHandler.handleNotification(
 917                         resultContainer.getShutdown(), attachment);
 918             default :
 919                 /* implementation specific handlers */
 920                 return absHandler.handleNotification(
 921                         resultContainer.notification(), attachment);
 922         }
 923     }
 924 
 925     private void checkAssociation(Association sendAssociation) {
 926         synchronized (stateLock) {
 927             if (sendAssociation != null && !sendAssociation.equals(association)) {
 928                 throw new IllegalArgumentException(
 929                         "Cannot send to another association");
 930             }
 931         }
 932     }
 933 
 934     private void checkStreamNumber(int streamNumber) {
 935         synchronized (stateLock) {
 936             if (association != null) {
 937                 if (streamNumber < 0 ||
 938                       streamNumber >= association.maxOutboundStreams())
 939                     throw new InvalidStreamException();
 940             }
 941         }
 942     }
 943 
 944     /* TODO: Add support for ttl and isComplete to both 121 12M
 945      *       SCTP_EOR not yet supported on reference platforms
 946      *       TTL support limited...
 947      */
 948     @Override
 949     public int send(ByteBuffer buffer, MessageInfo messageInfo)
 950             throws IOException {
 951         if (buffer == null)
 952             throw new IllegalArgumentException("buffer cannot be null");
 953 
 954         if (messageInfo == null)
 955             throw new IllegalArgumentException("messageInfo cannot be null");
 956 
 957         checkAssociation(messageInfo.association());
 958         checkStreamNumber(messageInfo.streamNumber());
 959 
 960         synchronized (sendLock) {
 961             ensureSendOpen();
 962 
 963             int n = 0;
 964             try {
 965                 begin();
 966 
 967                 synchronized (stateLock) {
 968                     if(!isOpen())
 969                         return 0;
 970                     senderThread = NativeThread.current();
 971                 }
 972 
 973                 do {
 974                     n = send(fdVal, buffer, messageInfo);
 975                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
 976 
 977                 return IOStatus.normalize(n);
 978             } finally {
 979                 senderCleanup();
 980                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
 981                 assert IOStatus.check(n);
 982             }
 983         }
 984     }
 985 
 986     private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
 987             throws IOException {
 988         int streamNumber = messageInfo.streamNumber();
 989         SocketAddress target = messageInfo.address();
 990         boolean unordered = messageInfo.isUnordered();
 991         int ppid = messageInfo.payloadProtocolID();
 992 
 993         if (src instanceof DirectBuffer)
 994             return sendFromNativeBuffer(fd, src, target, streamNumber,
 995                     unordered, ppid);
 996 
 997         /* Substitute a native buffer */
 998         int pos = src.position();
 999         int lim = src.limit();
1000         assert (pos <= lim && streamNumber >= 0);
1001 
1002         int rem = (pos <= lim ? lim - pos : 0);
1003         ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
1004         try {
1005             bb.put(src);
1006             bb.flip();
1007             /* Do not update src until we see how many bytes were written */
1008             src.position(pos);
1009 
1010             int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1011                     unordered, ppid);
1012             if (n > 0) {
1013                 /* now update src */
1014                 src.position(pos + n);
1015             }
1016             return n;
1017         } finally {
1018             Util.releaseTemporaryDirectBuffer(bb);
1019         }
1020     }
1021 
1022     private int sendFromNativeBuffer(int fd,
1023                                      ByteBuffer bb,
1024                                      SocketAddress target,
1025                                      int streamNumber,
1026                                      boolean unordered,
1027                                      int ppid)
1028             throws IOException {
1029         int pos = bb.position();
1030         int lim = bb.limit();
1031         assert (pos <= lim);
1032         int rem = (pos <= lim ? lim - pos : 0);
1033 
1034         int written = send0(fd, ((DirectBuffer)bb).address() + pos,
1035                             rem, target, -1 /*121*/, streamNumber, unordered, ppid);
1036         if (written > 0)
1037             bb.position(pos + written);
1038         return written;
1039     }
1040 
1041     @Override
1042     public SctpChannel shutdown() throws IOException {
1043         synchronized(stateLock) {
1044             if (isShutdown)
1045                 return this;
1046 
1047             ensureSendOpen();
1048             SctpNet.shutdown(fdVal, -1);
1049             if (senderThread != 0)
1050                 NativeThread.signal(senderThread);
1051             isShutdown = true;
1052         }
1053         return this;
1054     }
1055 
1056     @Override
1057     public Set<SocketAddress> getAllLocalAddresses()
1058             throws IOException {
1059         synchronized (stateLock) {
1060             if (!isOpen())
1061                 throw new ClosedChannelException();
1062             if (!isBound())
1063                 return Collections.emptySet();
1064 
1065             return SctpNet.getLocalAddresses(fdVal);
1066         }
1067     }
1068 
1069     @Override
1070     public Set<SocketAddress> getRemoteAddresses()
1071             throws IOException {
1072         synchronized (stateLock) {
1073             if (!isOpen())
1074                 throw new ClosedChannelException();
1075             if (!isConnected() || isShutdown)
1076                 return Collections.emptySet();
1077 
1078             try {
1079                 return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1080             } catch (SocketException unused) {
1081                 /* an open connected channel should always have remote addresses */
1082                 return remoteAddresses;
1083             }
1084         }
1085     }
1086 
1087     /* Native */
1088     private static native void initIDs();
1089 
1090     static native int receive0(int fd, ResultContainer resultContainer,
1091             long address, int length, boolean peek) throws IOException;
1092 
1093     static native int send0(int fd, long address, int length,
1094             SocketAddress target, int assocId, int streamNumber,
1095             boolean unordered, int ppid) throws IOException;
1096 
1097     private static native int checkConnect(FileDescriptor fd, boolean block,
1098             boolean ready) throws IOException;
1099 
1100     static {
1101         Util.load();   /* loads nio & net native libraries */
1102         java.security.AccessController.doPrivileged(
1103             new java.security.PrivilegedAction<Void>() {
1104                 public Void run() {
1105                     System.loadLibrary("sctp");
1106                     return null;
1107                 }
1108             });
1109         initIDs();
1110     }
1111 }