1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package rdma.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.InetAddress;
  31 import java.net.InetSocketAddress;
  32 import java.net.Socket;
  33 import java.net.SocketAddress;
  34 import java.net.SocketOption;
  35 import java.net.StandardSocketOptions;
  36 import java.nio.ByteBuffer;
  37 import java.nio.channels.AlreadyBoundException;
  38 import java.nio.channels.AlreadyConnectedException;
  39 import java.nio.channels.AsynchronousCloseException;
  40 import java.nio.channels.ClosedChannelException;
  41 import java.nio.channels.ConnectionPendingException;
  42 import java.nio.channels.NoConnectionPendingException;
  43 import java.nio.channels.NotYetConnectedException;
  44 import java.nio.channels.SelectionKey;
  45 import java.nio.channels.SocketChannel;
  46 import java.nio.channels.spi.SelectorProvider;
  47 import java.util.Collections;
  48 import java.util.HashSet;
  49 import java.util.Objects;
  50 import java.util.Set;
  51 import java.util.concurrent.locks.ReentrantLock;
  52 import sun.net.ext.RdmaSocketOptions;
  53 import sun.nio.ch.ExtendedSocketOption;
  54 import sun.nio.ch.IOStatus;
  55 import sun.nio.ch.IOUtil;
  56 import sun.nio.ch.Net;
  57 import sun.nio.ch.NativeThread;
  58 import sun.nio.ch.SelChImpl;
  59 import sun.nio.ch.SelectionKeyImpl;
  60 
  61 public class RdmaSocketChannelImpl
  62     extends SocketChannel
  63     implements SelChImpl
  64 {
  65     private static RdmaSocketDispatcher nd;
  66     private final FileDescriptor fd;
  67     private final int fdVal;
  68 
  69     private final ReentrantLock readLock = new ReentrantLock();
  70     private final ReentrantLock writeLock = new ReentrantLock();
  71 
  72     private final Object stateLock = new Object();
  73 
  74     private volatile boolean isInputClosed;
  75     private volatile boolean isOutputClosed;
  76 
  77     private boolean isReuseAddress;
  78 
  79     private static final int ST_UNCONNECTED = 0;
  80     private static final int ST_CONNECTIONPENDING = 1;
  81     private static final int ST_CONNECTED = 2;
  82     private static final int ST_CLOSING = 3;
  83     private static final int ST_KILLPENDING = 4;
  84     private static final int ST_KILLED = 5;
  85     private volatile int state;  // need stateLock to change
  86 
  87     private long readerThread;
  88     private long writerThread;
  89 
  90     private InetSocketAddress localAddress;
  91     private InetSocketAddress remoteAddress;
  92 
  93     private Socket socket;
  94 
  95     protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException {
  96         super(sp);
  97         this.fd = RdmaNet.socket(true);
  98         this.fdVal = IOUtil.fdVal(fd);
  99     }
 100 
 101     protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
 102         throws IOException
 103     {
 104         super(sp);
 105         this.fd = fd;
 106         this.fdVal = IOUtil.fdVal(fd);
 107         if (bound) {
 108             synchronized (stateLock) {
 109                 this.localAddress = RdmaNet.localAddress(fd);
 110             }
 111         }
 112     }
 113 
 114     RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
 115         throws IOException
 116     {
 117         super(sp);
 118         this.fd = fd;
 119         this.fdVal = IOUtil.fdVal(fd);
 120         synchronized (stateLock) {
 121             this.localAddress = RdmaNet.localAddress(fd);
 122             this.remoteAddress = isa;
 123             this.state = ST_CONNECTED;
 124         }
 125     }
 126 
 127     private void ensureOpen() throws ClosedChannelException {
 128         if (!isOpen())
 129             throw new ClosedChannelException();
 130     }
 131 
 132     private void ensureOpenAndConnected() throws ClosedChannelException {
 133         int state = this.state;
 134         if (state < ST_CONNECTED) {
 135             throw new NotYetConnectedException();
 136         } else if (state > ST_CONNECTED) {
 137             throw new ClosedChannelException();
 138         }
 139     }
 140 
 141     @Override
 142     public Socket socket() {
 143         synchronized (stateLock) {
 144             if (socket == null)
 145                 socket = RdmaSocketAdaptor.create(this);
 146             return socket;
 147         }
 148     }
 149 
 150     @Override
 151     public SocketAddress getLocalAddress() throws IOException {
 152         synchronized (stateLock) {
 153             ensureOpen();
 154             return RdmaNet.getRevealedLocalAddress(localAddress);
 155         }
 156     }
 157 
 158     @Override
 159     public SocketAddress getRemoteAddress() throws IOException {
 160         synchronized (stateLock) {
 161             ensureOpen();
 162             return remoteAddress;
 163         }
 164     }
 165 
 166     @Override
 167     public <T> SocketChannel setOption(SocketOption<T> name, T value)
 168         throws IOException
 169     {
 170         Objects.requireNonNull(name);
 171         if (!supportedOptions().contains(name))
 172             throw new UnsupportedOperationException("'" + name + "' not supported");
 173 
 174         synchronized (stateLock) {
 175             ensureOpen();
 176 
 177             if (name == StandardSocketOptions.SO_REUSEADDR &&
 178                     RdmaNet.useExclusiveBind()) {
 179                 isReuseAddress = (Boolean)value;
 180                 return this;
 181             }
 182 
 183             if (isConnected() && (name == StandardSocketOptions.SO_REUSEADDR ||
 184                     name == StandardSocketOptions.SO_SNDBUF ||
 185                     name == StandardSocketOptions.SO_RCVBUF))
 186                 throw new UnsupportedOperationException(
 187                         "RDMA socket channel cannot set the socket option "
 188                         + name.toString() + " after connect.");
 189 
 190             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 191             return this;
 192         }
 193     }
 194 
 195     @Override
 196     @SuppressWarnings("unchecked")
 197     public <T> T getOption(SocketOption<T> name)
 198         throws IOException
 199     {
 200         Objects.requireNonNull(name);
 201         if (!supportedOptions().contains(name))
 202             throw new UnsupportedOperationException("'" + name + "' not supported");
 203 
 204         synchronized (stateLock) {
 205             ensureOpen();
 206 
 207             if (name == StandardSocketOptions.SO_REUSEADDR
 208                     && RdmaNet.useExclusiveBind()) {
 209                 return (T)Boolean.valueOf(isReuseAddress);
 210             }
 211 
 212             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 213         }
 214     }
 215 
 216     private static class DefaultOptionsHolder {
 217         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 218 
 219         private static Set<SocketOption<?>> defaultOptions() {
 220             HashSet<SocketOption<?>> set = new HashSet<>();
 221             set.add(StandardSocketOptions.SO_SNDBUF);
 222             set.add(StandardSocketOptions.SO_RCVBUF);
 223             set.add(StandardSocketOptions.SO_REUSEADDR);
 224             set.add(StandardSocketOptions.SO_LINGER);
 225             set.add(StandardSocketOptions.TCP_NODELAY);
 226             RdmaSocketOptions rdmaOptions =
 227                     RdmaSocketOptions.getInstance();
 228             set.addAll(rdmaOptions.options());
 229             return Collections.unmodifiableSet(set);
 230         }
 231     }
 232 
 233     public Set<SocketOption<?>> supportedOptions() {
 234          return DefaultOptionsHolder.defaultOptions;
 235     }
 236 
 237     private void beginRead(boolean blocking) throws ClosedChannelException {
 238         if (blocking) {
 239             // set hook for Thread.interrupt
 240             begin();
 241 
 242             synchronized (stateLock) {
 243                 ensureOpenAndConnected();
 244                 // record thread so it can be signalled if needed
 245                 readerThread = NativeThread.current();
 246             }
 247         } else {
 248             ensureOpenAndConnected();
 249         }
 250     }
 251     private void endRead(boolean blocking, boolean completed)
 252         throws AsynchronousCloseException
 253     {
 254         if (blocking) {
 255             synchronized (stateLock) {
 256                 readerThread = 0;
 257                 // notify any thread waiting in implCloseSelectableChannel
 258                 if (state == ST_CLOSING) {
 259                     stateLock.notifyAll();
 260                 }
 261             }
 262             // remove hook for Thread.interrupt
 263             end(completed);
 264         }
 265     }
 266 
 267     @Override
 268     public int read(ByteBuffer buf) throws IOException {
 269         Objects.requireNonNull(buf);
 270 
 271         readLock.lock();
 272         try {
 273             boolean blocking = isBlocking();
 274             int n = 0;
 275             try {
 276                 beginRead(blocking);
 277 
 278                 // check if input is shutdown
 279                 if (isInputClosed)
 280                     return IOStatus.EOF;
 281 
 282                 if (blocking) {
 283                     do {
 284                         n = IOUtil.read(fd, buf, -1, nd);
 285                     } while (n == IOStatus.INTERRUPTED && isOpen());
 286                 } else {
 287                     n = IOUtil.read(fd, buf, -1, nd);
 288                 }
 289             } finally {
 290                 endRead(blocking, n > 0);
 291                 if (n <= 0 && isInputClosed)
 292                     return IOStatus.EOF;
 293             }
 294             return IOStatus.normalize(n);
 295         } finally {
 296             readLock.unlock();
 297         }
 298     }
 299 
 300     @Override
 301     public long read(ByteBuffer[] dsts, int offset, int length)
 302         throws IOException
 303     {
 304         Objects.checkFromIndexSize(offset, length, dsts.length);
 305 
 306         readLock.lock();
 307         try {
 308             boolean blocking = isBlocking();
 309             long n = 0;
 310             try {
 311                 beginRead(blocking);
 312 
 313                 // check if input is shutdown
 314                 if (isInputClosed)
 315                     return IOStatus.EOF;
 316 
 317                 if (blocking) {
 318                     do {
 319                         n = IOUtil.read(fd, dsts, offset, length, nd);
 320                     } while (n == IOStatus.INTERRUPTED && isOpen());
 321                 } else {
 322                     n = IOUtil.read(fd, dsts, offset, length, nd);
 323                 }
 324             } finally {
 325                 endRead(blocking, n > 0);
 326                 if (n <= 0 && isInputClosed)
 327                     return IOStatus.EOF;
 328             }
 329             return IOStatus.normalize(n);
 330         } finally {
 331             readLock.unlock();
 332         }
 333     }
 334 
 335     private void beginWrite(boolean blocking) throws ClosedChannelException {
 336         if (blocking) {
 337             // set hook for Thread.interrupt
 338             begin();
 339 
 340             synchronized (stateLock) {
 341                 ensureOpenAndConnected();
 342                 if (isOutputClosed)
 343                     throw new ClosedChannelException();
 344                 // record thread so it can be signalled if needed
 345                 writerThread = NativeThread.current();
 346             }
 347         } else {
 348             ensureOpenAndConnected();
 349         }
 350     }
 351 
 352     private void endWrite(boolean blocking, boolean completed)
 353         throws AsynchronousCloseException
 354     {
 355         if (blocking) {
 356             synchronized (stateLock) {
 357                 writerThread = 0;
 358                 // notify any thread waiting in implCloseSelectableChannel
 359                 if (state == ST_CLOSING) {
 360                     stateLock.notifyAll();
 361                 }
 362             }
 363             // remove hook for Thread.interrupt
 364             end(completed);
 365         }
 366     }
 367 
 368     @Override
 369     public int write(ByteBuffer buf) throws IOException {
 370         Objects.requireNonNull(buf);
 371 
 372         writeLock.lock();
 373         try {
 374             boolean blocking = isBlocking();
 375             int n = 0;
 376             try {
 377                 beginWrite(blocking);
 378                 if (blocking) {
 379                     do {
 380                         n = IOUtil.write(fd, buf, -1, nd);
 381                     } while (n == IOStatus.INTERRUPTED && isOpen());
 382                 } else {
 383                     n = IOUtil.write(fd, buf, -1, nd);
 384                 }
 385             } finally {
 386                 endWrite(blocking, n > 0);
 387                 if (n <= 0 && isOutputClosed)
 388                     throw new AsynchronousCloseException();
 389             }
 390             return IOStatus.normalize(n);
 391         } finally {
 392             writeLock.unlock();
 393         }
 394     }
 395 
 396     @Override
 397     public long write(ByteBuffer[] srcs, int offset, int length)
 398         throws IOException
 399     {
 400         Objects.checkFromIndexSize(offset, length, srcs.length);
 401 
 402         writeLock.lock();
 403         try {
 404             boolean blocking = isBlocking();
 405             long n = 0;
 406             try {
 407                 beginWrite(blocking);
 408                 if (blocking) {
 409                     do {
 410                         n = IOUtil.write(fd, srcs, offset, length, nd);
 411                     } while (n == IOStatus.INTERRUPTED && isOpen());
 412                 } else {
 413                     n = IOUtil.write(fd, srcs, offset, length, nd);
 414                 }
 415             } finally {
 416                 endWrite(blocking, n > 0);
 417                 if (n <= 0 && isOutputClosed)
 418                     throw new AsynchronousCloseException();
 419             }
 420             return IOStatus.normalize(n);
 421         } finally {
 422             writeLock.unlock();
 423         }
 424     }
 425 
 426     int sendOutOfBandData(byte b) throws IOException {
 427         writeLock.lock();
 428         try {
 429             boolean blocking = isBlocking();
 430             int n = 0;
 431             try {
 432                 beginWrite(blocking);
 433                 if (blocking) {
 434                     do {
 435                         n = sendOutOfBandData(fd, b);
 436                     } while (n == IOStatus.INTERRUPTED && isOpen());
 437                 } else {
 438                     n = sendOutOfBandData(fd, b);
 439                 }
 440             } finally {
 441                 endWrite(blocking, n > 0);
 442                 if (n <= 0 && isOutputClosed)
 443                     throw new AsynchronousCloseException();
 444             }
 445             return IOStatus.normalize(n);
 446         } finally {
 447             writeLock.unlock();
 448         }
 449     }
 450 
 451     @Override
 452     protected void implConfigureBlocking(boolean block) throws IOException {
 453         readLock.lock();
 454         try {
 455             writeLock.lock();
 456             try {
 457                 synchronized (stateLock) {
 458                     ensureOpen();
 459                     RdmaNet.configureBlocking(fd, block);
 460                 }
 461             } finally {
 462                 writeLock.unlock();
 463             }
 464         } finally {
 465             readLock.unlock();
 466         }
 467     }
 468 
 469     InetSocketAddress localAddress() {
 470         synchronized (stateLock) {
 471             return localAddress;
 472         }
 473     }
 474 
 475     InetSocketAddress remoteAddress() {
 476         synchronized (stateLock) {
 477             return remoteAddress;
 478         }
 479     }
 480 
 481     @Override
 482     public SocketChannel bind(SocketAddress local) throws IOException {
 483         readLock.lock();
 484         try {
 485             writeLock.lock();
 486             try {
 487                 synchronized (stateLock) {
 488                     ensureOpen();
 489                     if (state == ST_CONNECTIONPENDING)
 490                         throw new ConnectionPendingException();
 491                     if (localAddress != null)
 492                         throw new AlreadyBoundException();
 493                     InetSocketAddress isa = (local == null) ?
 494                         new InetSocketAddress(0) : RdmaNet.checkAddress(local);
 495                     SecurityManager sm = System.getSecurityManager();
 496                     if (sm != null) {
 497                         sm.checkListen(isa.getPort());
 498                     }
 499                     RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
 500                     localAddress = RdmaNet.localAddress(fd);
 501                 }
 502             } finally {
 503                 writeLock.unlock();
 504             }
 505         } finally {
 506             readLock.unlock();
 507         }
 508         return this;
 509     }
 510 
 511     @Override
 512     public boolean isConnected() {
 513         return (state == ST_CONNECTED);
 514     }
 515 
 516     @Override
 517     public boolean isConnectionPending() {
 518         return (state == ST_CONNECTIONPENDING);
 519     }
 520 
 521     private void beginConnect(boolean blocking, InetSocketAddress isa)
 522         throws IOException
 523     {
 524         if (blocking) {
 525             // set hook for Thread.interrupt
 526             begin();
 527         }
 528         synchronized (stateLock) {
 529             ensureOpen();
 530             int state = this.state;
 531             if (state == ST_CONNECTED)
 532                 throw new AlreadyConnectedException();
 533             if (state == ST_CONNECTIONPENDING)
 534                 throw new ConnectionPendingException();
 535             assert state == ST_UNCONNECTED;
 536             this.state = ST_CONNECTIONPENDING;
 537 
 538             remoteAddress = isa;
 539 
 540             if (blocking) {
 541                 // record thread so it can be signalled if needed
 542                 readerThread = NativeThread.current();
 543             }
 544         }
 545     }
 546 
 547     private void endConnect(boolean blocking, boolean completed)
 548         throws IOException
 549     {
 550         endRead(blocking, completed);
 551 
 552         if (completed) {
 553             synchronized (stateLock) {
 554                 if (state == ST_CONNECTIONPENDING) {
 555                     localAddress = RdmaNet.localAddress(fd);
 556                     state = ST_CONNECTED;
 557                 }
 558             }
 559         }
 560     }
 561 
 562     @Override
 563     public boolean connect(SocketAddress sa) throws IOException {
 564         InetSocketAddress isa = RdmaNet.checkAddress(sa);
 565         SecurityManager sm = System.getSecurityManager();
 566         if (sm != null)
 567             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 568 
 569         InetAddress ia = isa.getAddress();
 570         if (ia.isAnyLocalAddress())
 571             ia = InetAddress.getLocalHost();
 572 
 573         try {
 574             readLock.lock();
 575             try {
 576                 writeLock.lock();
 577                 try {
 578                     int n = 0;
 579                     boolean blocking = isBlocking();
 580                     try {
 581                         beginConnect(blocking, isa);
 582                         do {
 583                             n = RdmaNet.connect(fd, ia, isa.getPort());
 584                         } while (n == IOStatus.INTERRUPTED && isOpen());
 585                     } finally {
 586                         endConnect(blocking, (n > 0));
 587                     }
 588                     assert IOStatus.check(n);
 589                     return n > 0;
 590                 } finally {
 591                     writeLock.unlock();
 592                 }
 593             } finally {
 594                 readLock.unlock();
 595             }
 596         } catch (IOException ioe) {
 597             // connect failed, close the channel
 598             close();
 599             throw ioe;
 600         }
 601     }
 602 
 603     private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
 604         if (blocking) {
 605             // set hook for Thread.interrupt
 606             begin();
 607         }
 608         synchronized (stateLock) {
 609             ensureOpen();
 610             if (state != ST_CONNECTIONPENDING)
 611                 throw new NoConnectionPendingException();
 612             if (blocking) {
 613                 // record thread so it can be signalled if needed
 614                 readerThread = NativeThread.current();
 615             }
 616         }
 617     }
 618 
 619     private void endFinishConnect(boolean blocking, boolean completed)
 620         throws IOException
 621     {
 622         endRead(blocking, completed);
 623 
 624         if (completed) {
 625             synchronized (stateLock) {
 626                 if (state == ST_CONNECTIONPENDING) {
 627                     localAddress = RdmaNet.localAddress(fd);
 628                     state = ST_CONNECTED;
 629                 }
 630             }
 631         }
 632     }
 633 
 634     @Override
 635     public boolean finishConnect() throws IOException {
 636         try {
 637             readLock.lock();
 638             try {
 639                 writeLock.lock();
 640                 try {
 641                     // no-op if already connected
 642                     if (isConnected())
 643                         return true;
 644 
 645                     boolean blocking = isBlocking();
 646                     boolean connected = false;
 647                     try {
 648                         beginFinishConnect(blocking);
 649                         int n = 0;
 650                         if (blocking) {
 651                             do {
 652                                 n = checkConnect(fd, true);
 653                             } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
 654                         } else {
 655                             n = checkConnect(fd, false);
 656                         }
 657                         connected = (n > 0);
 658                     } finally {
 659                         endFinishConnect(blocking, connected);
 660                     }
 661                     assert (blocking && connected) ^ !blocking;
 662                     return connected;
 663                 } finally {
 664                     writeLock.unlock();
 665                 }
 666             } finally {
 667                 readLock.unlock();
 668             }
 669         } catch (IOException ioe) {
 670             // connect failed, close the channel
 671             close();
 672             throw ioe;
 673         }
 674     }
 675 
 676     @Override
 677     protected void implCloseSelectableChannel() throws IOException {
 678         assert !isOpen();
 679 
 680         boolean blocking;
 681         boolean connected;
 682         boolean interrupted = false;
 683 
 684         // set state to ST_CLOSING
 685         synchronized (stateLock) {
 686             assert state < ST_CLOSING;
 687             blocking = isBlocking();
 688             connected = (state == ST_CONNECTED);
 689             state = ST_CLOSING;
 690         }
 691 
 692         // wait for any outstanding I/O operations to complete
 693         if (blocking) {
 694             synchronized (stateLock) {
 695                 assert state == ST_CLOSING;
 696                 long reader = readerThread;
 697                 long writer = writerThread;
 698                 if (reader != 0 || writer != 0) {
 699                     nd.preClose(fd);
 700                     connected = false; // fd is no longer connected socket
 701 
 702                     if (reader != 0)
 703                         NativeThread.signal(reader);
 704                     if (writer != 0)
 705                         NativeThread.signal(writer);
 706 
 707                     // wait for blocking I/O operations to end
 708                     while (readerThread != 0 || writerThread != 0) {
 709                         try {
 710                             stateLock.wait();
 711                         } catch (InterruptedException e) {
 712                             interrupted = true;
 713                         }
 714                     }
 715                 }
 716             }
 717         } else {
 718             // non-blocking mode: wait for read/write to complete
 719             readLock.lock();
 720             try {
 721                 writeLock.lock();
 722                 writeLock.unlock();
 723             } finally {
 724                 readLock.unlock();
 725             }
 726         }
 727 
 728         // set state to ST_KILLPENDING
 729         synchronized (stateLock) {
 730             assert state == ST_CLOSING;
 731             // if connected and the channel is registered with a Selector then
 732             // shutdown the output if possible so that the peer reads EOF. If
 733             // SO_LINGER is enabled and set to a non-zero value then it needs to
 734             // be disabled so that the Selector does not wait when it closes
 735             // the socket.
 736             if (connected && isRegistered()) {
 737                 try {
 738                     SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;
 739                     int interval = (int) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, opt);
 740                     if (interval != 0) {
 741                         if (interval > 0) {
 742                             // disable SO_LINGER
 743                             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, opt, -1);
 744                         }
 745                         RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 746                     }
 747                 } catch (IOException ignore) { }
 748             }
 749             state = ST_KILLPENDING;
 750         }
 751 
 752         // close socket if not registered with Selector
 753         if (!isRegistered())
 754             kill();
 755 
 756         // restore interrupt status
 757         if (interrupted)
 758             Thread.currentThread().interrupt();
 759     }
 760 
 761     @Override
 762     public void kill() throws IOException {
 763         synchronized (stateLock) {
 764             if (state == ST_KILLPENDING) {
 765                 state = ST_KILLED;
 766                 nd.close(fd);
 767             }
 768         }
 769     }
 770 
 771     @Override
 772     public SocketChannel shutdownInput() throws IOException {
 773         synchronized (stateLock) {
 774             ensureOpen();
 775             if (!isConnected())
 776                 throw new NotYetConnectedException();
 777             if (!isInputClosed) {
 778                 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD);
 779                 long thread = readerThread;
 780                 if (thread != 0)
 781                     NativeThread.signal(thread);
 782                 isInputClosed = true;
 783             }
 784             return this;
 785         }
 786     }
 787 
 788     @Override
 789     public SocketChannel shutdownOutput() throws IOException {
 790         synchronized (stateLock) {
 791             ensureOpen();
 792             if (!isConnected())
 793                 throw new NotYetConnectedException();
 794             if (!isOutputClosed) {
 795                 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 796                 long thread = writerThread;
 797                 if (thread != 0)
 798                     NativeThread.signal(thread);
 799                 isOutputClosed = true;
 800             }
 801             return this;
 802         }
 803     }
 804 
 805     boolean isInputOpen() {
 806         return !isInputClosed;
 807     }
 808 
 809     boolean isOutputOpen() {
 810         return !isOutputClosed;
 811     }
 812 
 813     /**
 814      * Poll this channel's socket for reading up to the given timeout.
 815      * @return {@code true} if the socket is polled
 816      */
 817     boolean pollRead(long timeout) throws IOException {
 818         boolean blocking = isBlocking();
 819         assert Thread.holdsLock(blockingLock()) && blocking;
 820 
 821         readLock.lock();
 822         try {
 823             boolean polled = false;
 824             try {
 825                 beginRead(blocking);
 826                 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout);
 827                 polled = (events != 0);
 828             } finally {
 829                 endRead(blocking, polled);
 830             }
 831             return polled;
 832         } finally {
 833             readLock.unlock();
 834         }
 835     }
 836 
 837     /**
 838      * Poll this channel's socket for a connection, up to the given timeout.
 839      * @return {@code true} if the socket is polled
 840      */
 841     boolean pollConnected(long timeout) throws IOException {
 842         boolean blocking = isBlocking();
 843         assert Thread.holdsLock(blockingLock()) && blocking;
 844 
 845         readLock.lock();
 846         try {
 847             writeLock.lock();
 848             try {
 849                 boolean polled = false;
 850                 try {
 851                     beginFinishConnect(blocking);
 852                     int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout);
 853                     polled = (events != 0);
 854                 } finally {
 855                     // invoke endFinishConnect with completed = false so that
 856                     // the state is not changed to ST_CONNECTED. The socket
 857                     // adaptor will use finishConnect to finish.
 858                     endFinishConnect(blocking, /*completed*/false);
 859                 }
 860                 return polled;
 861             } finally {
 862                 writeLock.unlock();
 863             }
 864         } finally {
 865             readLock.unlock();
 866         }
 867     }
 868 
 869     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 870         int intOps = ski.nioInterestOps();
 871         int oldOps = ski.nioReadyOps();
 872         int newOps = initialOps;
 873 
 874         if ((ops & Net.POLLNVAL) != 0) {
 875             return false;
 876         }
 877 
 878         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 879             newOps = intOps;
 880             ski.nioReadyOps(newOps);
 881             return (newOps & ~oldOps) != 0;
 882         }
 883 
 884         boolean connected = isConnected();
 885         if (((ops & Net.POLLIN) != 0) &&
 886             ((intOps & SelectionKey.OP_READ) != 0) && connected)
 887             newOps |= SelectionKey.OP_READ;
 888 
 889         if (((ops & Net.POLLCONN) != 0) &&
 890             ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
 891             newOps |= SelectionKey.OP_CONNECT;
 892 
 893         if (((ops & Net.POLLOUT) != 0) &&
 894             ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
 895             newOps |= SelectionKey.OP_WRITE;
 896 
 897         ski.nioReadyOps(newOps);
 898         return (newOps & ~oldOps) != 0;
 899     }
 900 
 901     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 902         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 903     }
 904 
 905     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 906         return translateReadyOps(ops, 0, ski);
 907     }
 908 
 909     public int translateInterestOps(int ops) {
 910         int newOps = 0;
 911         if ((ops & SelectionKey.OP_READ) != 0)
 912             newOps |= Net.POLLIN;
 913         if ((ops & SelectionKey.OP_WRITE) != 0)
 914             newOps |= Net.POLLOUT;
 915         if ((ops & SelectionKey.OP_CONNECT) != 0)
 916             newOps |= Net.POLLCONN;
 917         return newOps;
 918     }
 919 
 920     public FileDescriptor getFD() {
 921         return fd;
 922     }
 923 
 924     public int getFDVal() {
 925         return fdVal;
 926     }
 927 
 928     @Override
 929     public String toString() {
 930         StringBuilder sb = new StringBuilder();
 931         sb.append(this.getClass().getSuperclass().getName());
 932         sb.append('[');
 933         if (!isOpen())
 934             sb.append("closed");
 935         else {
 936             synchronized (stateLock) {
 937                 switch (state) {
 938                 case ST_UNCONNECTED:
 939                     sb.append("unconnected");
 940                     break;
 941                 case ST_CONNECTIONPENDING:
 942                     sb.append("connection-pending");
 943                     break;
 944                 case ST_CONNECTED:
 945                     sb.append("connected");
 946                     if (isInputClosed)
 947                         sb.append(" ishut");
 948                     if (isOutputClosed)
 949                         sb.append(" oshut");
 950                     break;
 951                 }
 952                 InetSocketAddress addr = localAddress();
 953                 if (addr != null) {
 954                     sb.append(" local=");
 955                     sb.append(RdmaNet.getRevealedLocalAddressAsString(addr));
 956                 }
 957                 if (remoteAddress() != null) {
 958                     sb.append(" remote=");
 959                     sb.append(remoteAddress().toString());
 960                 }
 961             }
 962         }
 963         sb.append(']');
 964         return sb.toString();
 965     }
 966 
 967     // -- Native methods --
 968 
 969     private static native void initIDs();
 970 
 971     private static native int checkConnect(FileDescriptor fd, boolean block)
 972         throws IOException;
 973 
 974     private static native int sendOutOfBandData(FileDescriptor fd, byte data)
 975         throws IOException;
 976 
 977     static {
 978         IOUtil.load();
 979         initIDs();
 980         nd = new RdmaSocketDispatcher();
 981     }
 982 
 983 }