1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package rdma.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.InetAddress;
  31 import java.net.InetSocketAddress;
  32 import java.net.Socket;
  33 import java.net.SocketAddress;
  34 import java.net.SocketOption;
  35 import java.net.StandardSocketOptions;
  36 import java.nio.ByteBuffer;
  37 import java.nio.channels.AlreadyBoundException;
  38 import java.nio.channels.AlreadyConnectedException;
  39 import java.nio.channels.AsynchronousCloseException;
  40 import java.nio.channels.ClosedChannelException;
  41 import java.nio.channels.ConnectionPendingException;
  42 import java.nio.channels.NoConnectionPendingException;
  43 import java.nio.channels.NotYetConnectedException;
  44 import java.nio.channels.SelectionKey;
  45 import java.nio.channels.SocketChannel;
  46 import java.nio.channels.spi.SelectorProvider;
  47 import java.util.Collections;
  48 import java.util.HashSet;
  49 import java.util.Objects;
  50 import java.util.Set;
  51 import java.util.concurrent.locks.ReentrantLock;
  52 import sun.net.ext.RdmaSocketOptions;
  53 import sun.nio.ch.IOStatus;
  54 import sun.nio.ch.IOUtil;
  55 import sun.nio.ch.Net;
  56 import sun.nio.ch.NativeThread;
  57 import sun.nio.ch.SelChImpl;
  58 import sun.nio.ch.SelectionKeyImpl;
  59 
  60 public class RdmaSocketChannelImpl
  61     extends SocketChannel
  62     implements SelChImpl
  63 {
  64     private static RdmaSocketDispatcher nd;
  65     private final FileDescriptor fd;
  66     private final int fdVal;
  67 
  68     private final ReentrantLock readLock = new ReentrantLock();
  69     private final ReentrantLock writeLock = new ReentrantLock();
  70 
  71     private final Object stateLock = new Object();
  72 
  73     private volatile boolean isInputClosed;
  74     private volatile boolean isOutputClosed;
  75 
  76     private boolean isReuseAddress;
  77 
  78     private static final int ST_UNCONNECTED = 0;
  79     private static final int ST_CONNECTIONPENDING = 1;
  80     private static final int ST_CONNECTED = 2;
  81     private static final int ST_CLOSING = 3;
  82     private static final int ST_KILLPENDING = 4;
  83     private static final int ST_KILLED = 5;
  84     private volatile int state;  // need stateLock to change
  85 
  86     private long readerThread;
  87     private long writerThread;
  88 
  89     private InetSocketAddress localAddress;
  90     private InetSocketAddress remoteAddress;
  91 
  92     private Socket socket;
  93 
  94     protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException {
  95         super(sp);
  96         this.fd = RdmaNet.socket(true);
  97         this.fdVal = IOUtil.fdVal(fd);
  98     }
  99 
 100     protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
 101         throws IOException
 102     {
 103         super(sp);
 104         this.fd = fd;
 105         this.fdVal = IOUtil.fdVal(fd);
 106         if (bound) {
 107             synchronized (stateLock) {
 108                 this.localAddress = RdmaNet.localAddress(fd);
 109             }
 110         }
 111     }
 112 
 113     RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
 114         throws IOException
 115     {
 116         super(sp);
 117         this.fd = fd;
 118         this.fdVal = IOUtil.fdVal(fd);
 119         synchronized (stateLock) {
 120             this.localAddress = RdmaNet.localAddress(fd);
 121             this.remoteAddress = isa;
 122             this.state = ST_CONNECTED;
 123         }
 124     }
 125 
 126     private void ensureOpen() throws ClosedChannelException {
 127         if (!isOpen())
 128             throw new ClosedChannelException();
 129     }
 130 
 131     private void ensureOpenAndConnected() throws ClosedChannelException {
 132         int state = this.state;
 133         if (state < ST_CONNECTED) {
 134             throw new NotYetConnectedException();
 135         } else if (state > ST_CONNECTED) {
 136             throw new ClosedChannelException();
 137         }
 138     }
 139 
 140     @Override
 141     public Socket socket() {
 142         synchronized (stateLock) {
 143             if (socket == null)
 144                 socket = RdmaSocketAdaptor.create(this);
 145             return socket;
 146         }
 147     }
 148 
 149     @Override
 150     public SocketAddress getLocalAddress() throws IOException {
 151         synchronized (stateLock) {
 152             ensureOpen();
 153             return RdmaNet.getRevealedLocalAddress(localAddress);
 154         }
 155     }
 156 
 157     @Override
 158     public SocketAddress getRemoteAddress() throws IOException {
 159         synchronized (stateLock) {
 160             ensureOpen();
 161             return remoteAddress;
 162         }
 163     }
 164 
 165     @Override
 166     public <T> SocketChannel setOption(SocketOption<T> name, T value)
 167         throws IOException
 168     {
 169         Objects.requireNonNull(name);
 170         if (!supportedOptions().contains(name))
 171             throw new UnsupportedOperationException("'" + name + "' not supported");
 172 
 173         synchronized (stateLock) {
 174             ensureOpen();
 175 
 176             if (name == StandardSocketOptions.SO_REUSEADDR &&
 177                     RdmaNet.useExclusiveBind()) {
 178                 isReuseAddress = (Boolean)value;
 179                 return this;
 180             }
 181 
 182             if (isConnected() && (name == StandardSocketOptions.SO_REUSEADDR ||
 183                     name == StandardSocketOptions.SO_SNDBUF ||
 184                     name == StandardSocketOptions.SO_RCVBUF))
 185                 throw new UnsupportedOperationException(
 186                         "RDMA socket channel cannot set the socket option "
 187                         + name.toString() + " after connect.");
 188 
 189             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 190             return this;
 191         }
 192     }
 193 
 194     @Override
 195     @SuppressWarnings("unchecked")
 196     public <T> T getOption(SocketOption<T> name)
 197         throws IOException
 198     {
 199         Objects.requireNonNull(name);
 200         if (!supportedOptions().contains(name))
 201             throw new UnsupportedOperationException("'" + name + "' not supported");
 202 
 203         synchronized (stateLock) {
 204             ensureOpen();
 205 
 206             if (name == StandardSocketOptions.SO_REUSEADDR
 207                     && RdmaNet.useExclusiveBind()) {
 208                 return (T)Boolean.valueOf(isReuseAddress);
 209             }
 210 
 211             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 212         }
 213     }
 214 
 215     private static class DefaultOptionsHolder {
 216         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 217 
 218         private static Set<SocketOption<?>> defaultOptions() {
 219             HashSet<SocketOption<?>> set = new HashSet<>();
 220             set.add(StandardSocketOptions.SO_SNDBUF);
 221             set.add(StandardSocketOptions.SO_RCVBUF);
 222             set.add(StandardSocketOptions.SO_REUSEADDR);
 223             set.add(StandardSocketOptions.SO_LINGER);
 224             set.add(StandardSocketOptions.TCP_NODELAY);
 225             RdmaSocketOptions rdmaOptions =
 226                     RdmaSocketOptions.getInstance();
 227             set.addAll(rdmaOptions.options());
 228             return Collections.unmodifiableSet(set);
 229         }
 230     }
 231 
 232     public Set<SocketOption<?>> supportedOptions() {
 233          return DefaultOptionsHolder.defaultOptions;
 234     }
 235 
 236     private void beginRead(boolean blocking) throws ClosedChannelException {
 237         if (blocking) {
 238             // set hook for Thread.interrupt
 239             begin();
 240 
 241             synchronized (stateLock) {
 242                 ensureOpenAndConnected();
 243                 // record thread so it can be signalled if needed
 244                 readerThread = NativeThread.current();
 245             }
 246         } else {
 247             ensureOpenAndConnected();
 248         }
 249     }
 250     private void endRead(boolean blocking, boolean completed)
 251         throws AsynchronousCloseException
 252     {
 253         if (blocking) {
 254             synchronized (stateLock) {
 255                 readerThread = 0;
 256                 // notify any thread waiting in implCloseSelectableChannel
 257                 if (state == ST_CLOSING) {
 258                     stateLock.notifyAll();
 259                 }
 260             }
 261             // remove hook for Thread.interrupt
 262             end(completed);
 263         }
 264     }
 265 
 266     @Override
 267     public int read(ByteBuffer buf) throws IOException {
 268         Objects.requireNonNull(buf);
 269 
 270         readLock.lock();
 271         try {
 272             boolean blocking = isBlocking();
 273             int n = 0;
 274             try {
 275                 beginRead(blocking);
 276 
 277                 // check if input is shutdown
 278                 if (isInputClosed)
 279                     return IOStatus.EOF;
 280 
 281                 if (blocking) {
 282                     do {
 283                         n = IOUtil.read(fd, buf, -1, nd);
 284                     } while (n == IOStatus.INTERRUPTED && isOpen());
 285                 } else {
 286                     n = IOUtil.read(fd, buf, -1, nd);
 287                 }
 288             } finally {
 289                 endRead(blocking, n > 0);
 290                 if (n <= 0 && isInputClosed)
 291                     return IOStatus.EOF;
 292             }
 293             return IOStatus.normalize(n);
 294         } finally {
 295             readLock.unlock();
 296         }
 297     }
 298 
 299     @Override
 300     public long read(ByteBuffer[] dsts, int offset, int length)
 301         throws IOException
 302     {
 303         Objects.checkFromIndexSize(offset, length, dsts.length);
 304 
 305         readLock.lock();
 306         try {
 307             boolean blocking = isBlocking();
 308             long n = 0;
 309             try {
 310                 beginRead(blocking);
 311 
 312                 // check if input is shutdown
 313                 if (isInputClosed)
 314                     return IOStatus.EOF;
 315 
 316                 if (blocking) {
 317                     do {
 318                         n = IOUtil.read(fd, dsts, offset, length, nd);
 319                     } while (n == IOStatus.INTERRUPTED && isOpen());
 320                 } else {
 321                     n = IOUtil.read(fd, dsts, offset, length, nd);
 322                 }
 323             } finally {
 324                 endRead(blocking, n > 0);
 325                 if (n <= 0 && isInputClosed)
 326                     return IOStatus.EOF;
 327             }
 328             return IOStatus.normalize(n);
 329         } finally {
 330             readLock.unlock();
 331         }
 332     }
 333 
 334     private void beginWrite(boolean blocking) throws ClosedChannelException {
 335         if (blocking) {
 336             // set hook for Thread.interrupt
 337             begin();
 338 
 339             synchronized (stateLock) {
 340                 ensureOpenAndConnected();
 341                 if (isOutputClosed)
 342                     throw new ClosedChannelException();
 343                 // record thread so it can be signalled if needed
 344                 writerThread = NativeThread.current();
 345             }
 346         } else {
 347             ensureOpenAndConnected();
 348         }
 349     }
 350 
 351     private void endWrite(boolean blocking, boolean completed)
 352         throws AsynchronousCloseException
 353     {
 354         if (blocking) {
 355             synchronized (stateLock) {
 356                 writerThread = 0;
 357                 // notify any thread waiting in implCloseSelectableChannel
 358                 if (state == ST_CLOSING) {
 359                     stateLock.notifyAll();
 360                 }
 361             }
 362             // remove hook for Thread.interrupt
 363             end(completed);
 364         }
 365     }
 366 
 367     @Override
 368     public int write(ByteBuffer buf) throws IOException {
 369         Objects.requireNonNull(buf);
 370 
 371         writeLock.lock();
 372         try {
 373             boolean blocking = isBlocking();
 374             int n = 0;
 375             try {
 376                 beginWrite(blocking);
 377                 if (blocking) {
 378                     do {
 379                         n = IOUtil.write(fd, buf, -1, nd);
 380                     } while (n == IOStatus.INTERRUPTED && isOpen());
 381                 } else {
 382                     n = IOUtil.write(fd, buf, -1, nd);
 383                 }
 384             } finally {
 385                 endWrite(blocking, n > 0);
 386                 if (n <= 0 && isOutputClosed)
 387                     throw new AsynchronousCloseException();
 388             }
 389             return IOStatus.normalize(n);
 390         } finally {
 391             writeLock.unlock();
 392         }
 393     }
 394 
 395     @Override
 396     public long write(ByteBuffer[] srcs, int offset, int length)
 397         throws IOException
 398     {
 399         Objects.checkFromIndexSize(offset, length, srcs.length);
 400 
 401         writeLock.lock();
 402         try {
 403             boolean blocking = isBlocking();
 404             long n = 0;
 405             try {
 406                 beginWrite(blocking);
 407                 if (blocking) {
 408                     do {
 409                         n = IOUtil.write(fd, srcs, offset, length, nd);
 410                     } while (n == IOStatus.INTERRUPTED && isOpen());
 411                 } else {
 412                     n = IOUtil.write(fd, srcs, offset, length, nd);
 413                 }
 414             } finally {
 415                 endWrite(blocking, n > 0);
 416                 if (n <= 0 && isOutputClosed)
 417                     throw new AsynchronousCloseException();
 418             }
 419             return IOStatus.normalize(n);
 420         } finally {
 421             writeLock.unlock();
 422         }
 423     }
 424 
 425     int sendOutOfBandData(byte b) throws IOException {
 426         writeLock.lock();
 427         try {
 428             boolean blocking = isBlocking();
 429             int n = 0;
 430             try {
 431                 beginWrite(blocking);
 432                 if (blocking) {
 433                     do {
 434                         n = sendOutOfBandData(fd, b);
 435                     } while (n == IOStatus.INTERRUPTED && isOpen());
 436                 } else {
 437                     n = sendOutOfBandData(fd, b);
 438                 }
 439             } finally {
 440                 endWrite(blocking, n > 0);
 441                 if (n <= 0 && isOutputClosed)
 442                     throw new AsynchronousCloseException();
 443             }
 444             return IOStatus.normalize(n);
 445         } finally {
 446             writeLock.unlock();
 447         }
 448     }
 449 
 450     @Override
 451     protected void implConfigureBlocking(boolean block) throws IOException {
 452         readLock.lock();
 453         try {
 454             writeLock.lock();
 455             try {
 456                 synchronized (stateLock) {
 457                     ensureOpen();
 458                     RdmaNet.configureBlocking(fd, block);
 459                 }
 460             } finally {
 461                 writeLock.unlock();
 462             }
 463         } finally {
 464             readLock.unlock();
 465         }
 466     }
 467 
 468     InetSocketAddress localAddress() {
 469         synchronized (stateLock) {
 470             return localAddress;
 471         }
 472     }
 473 
 474     InetSocketAddress remoteAddress() {
 475         synchronized (stateLock) {
 476             return remoteAddress;
 477         }
 478     }
 479 
 480     @Override
 481     public SocketChannel bind(SocketAddress local) throws IOException {
 482         readLock.lock();
 483         try {
 484             writeLock.lock();
 485             try {
 486                 synchronized (stateLock) {
 487                     ensureOpen();
 488                     if (state == ST_CONNECTIONPENDING)
 489                         throw new ConnectionPendingException();
 490                     if (localAddress != null)
 491                         throw new AlreadyBoundException();
 492                     InetSocketAddress isa = (local == null) ?
 493                         new InetSocketAddress(0) : RdmaNet.checkAddress(local);
 494                     SecurityManager sm = System.getSecurityManager();
 495                     if (sm != null) {
 496                         sm.checkListen(isa.getPort());
 497                     }
 498                     RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
 499                     localAddress = RdmaNet.localAddress(fd);
 500                 }
 501             } finally {
 502                 writeLock.unlock();
 503             }
 504         } finally {
 505             readLock.unlock();
 506         }
 507         return this;
 508     }
 509 
 510     @Override
 511     public boolean isConnected() {
 512         return (state == ST_CONNECTED);
 513     }
 514 
 515     @Override
 516     public boolean isConnectionPending() {
 517         return (state == ST_CONNECTIONPENDING);
 518     }
 519 
 520     private void beginConnect(boolean blocking, InetSocketAddress isa)
 521         throws IOException
 522     {
 523         if (blocking) {
 524             // set hook for Thread.interrupt
 525             begin();
 526         }
 527         synchronized (stateLock) {
 528             ensureOpen();
 529             int state = this.state;
 530             if (state == ST_CONNECTED)
 531                 throw new AlreadyConnectedException();
 532             if (state == ST_CONNECTIONPENDING)
 533                 throw new ConnectionPendingException();
 534             assert state == ST_UNCONNECTED;
 535             this.state = ST_CONNECTIONPENDING;
 536 
 537             remoteAddress = isa;
 538 
 539             if (blocking) {
 540                 // record thread so it can be signalled if needed
 541                 readerThread = NativeThread.current();
 542             }
 543         }
 544     }
 545 
 546     private void endConnect(boolean blocking, boolean completed)
 547         throws IOException
 548     {
 549         endRead(blocking, completed);
 550 
 551         if (completed) {
 552             synchronized (stateLock) {
 553                 if (state == ST_CONNECTIONPENDING) {
 554                     localAddress = RdmaNet.localAddress(fd);
 555                     state = ST_CONNECTED;
 556                 }
 557             }
 558         }
 559     }
 560 
 561     @Override
 562     public boolean connect(SocketAddress sa) throws IOException {
 563         InetSocketAddress isa = RdmaNet.checkAddress(sa);
 564         SecurityManager sm = System.getSecurityManager();
 565         if (sm != null)
 566             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 567 
 568         InetAddress ia = isa.getAddress();
 569         if (ia.isAnyLocalAddress())
 570             ia = InetAddress.getLocalHost();
 571 
 572         try {
 573             readLock.lock();
 574             try {
 575                 writeLock.lock();
 576                 try {
 577                     int n = 0;
 578                     boolean blocking = isBlocking();
 579                     try {
 580                         beginConnect(blocking, isa);
 581                         do {
 582                             n = RdmaNet.connect(fd, ia, isa.getPort());
 583                         } while (n == IOStatus.INTERRUPTED && isOpen());
 584                     } finally {
 585                         endConnect(blocking, (n > 0));
 586                     }
 587                     assert IOStatus.check(n);
 588                     return n > 0;
 589                 } finally {
 590                     writeLock.unlock();
 591                 }
 592             } finally {
 593                 readLock.unlock();
 594             }
 595         } catch (IOException ioe) {
 596             // connect failed, close the channel
 597             close();
 598             throw ioe;
 599         }
 600     }
 601 
 602     private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
 603         if (blocking) {
 604             // set hook for Thread.interrupt
 605             begin();
 606         }
 607         synchronized (stateLock) {
 608             ensureOpen();
 609             if (state != ST_CONNECTIONPENDING)
 610                 throw new NoConnectionPendingException();
 611             if (blocking) {
 612                 // record thread so it can be signalled if needed
 613                 readerThread = NativeThread.current();
 614             }
 615         }
 616     }
 617 
 618     private void endFinishConnect(boolean blocking, boolean completed)
 619         throws IOException
 620     {
 621         endRead(blocking, completed);
 622 
 623         if (completed) {
 624             synchronized (stateLock) {
 625                 if (state == ST_CONNECTIONPENDING) {
 626                     localAddress = RdmaNet.localAddress(fd);
 627                     state = ST_CONNECTED;
 628                 }
 629             }
 630         }
 631     }
 632 
 633     @Override
 634     public boolean finishConnect() throws IOException {
 635         try {
 636             readLock.lock();
 637             try {
 638                 writeLock.lock();
 639                 try {
 640                     // no-op if already connected
 641                     if (isConnected())
 642                         return true;
 643 
 644                     boolean blocking = isBlocking();
 645                     boolean connected = false;
 646                     try {
 647                         beginFinishConnect(blocking);
 648                         int n = 0;
 649                         if (blocking) {
 650                             do {
 651                                 n = checkConnect(fd, true);
 652                             } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
 653                         } else {
 654                             n = checkConnect(fd, false);
 655                         }
 656                         connected = (n > 0);
 657                     } finally {
 658                         endFinishConnect(blocking, connected);
 659                     }
 660                     assert (blocking && connected) ^ !blocking;
 661                     return connected;
 662                 } finally {
 663                     writeLock.unlock();
 664                 }
 665             } finally {
 666                 readLock.unlock();
 667             }
 668         } catch (IOException ioe) {
 669             // connect failed, close the channel
 670             close();
 671             throw ioe;
 672         }
 673     }
 674 
 675     @Override
 676     protected void implCloseSelectableChannel() throws IOException {
 677         assert !isOpen();
 678 
 679         boolean blocking;
 680         boolean connected;
 681         boolean interrupted = false;
 682 
 683         // set state to ST_CLOSING
 684         synchronized (stateLock) {
 685             assert state < ST_CLOSING;
 686             blocking = isBlocking();
 687             connected = (state == ST_CONNECTED);
 688             state = ST_CLOSING;
 689         }
 690 
 691         // wait for any outstanding I/O operations to complete
 692         if (blocking) {
 693             synchronized (stateLock) {
 694                 assert state == ST_CLOSING;
 695                 long reader = readerThread;
 696                 long writer = writerThread;
 697                 if (reader != 0 || writer != 0) {
 698                     nd.preClose(fd);
 699                     connected = false; // fd is no longer connected socket
 700 
 701                     if (reader != 0)
 702                         NativeThread.signal(reader);
 703                     if (writer != 0)
 704                         NativeThread.signal(writer);
 705 
 706                     // wait for blocking I/O operations to end
 707                     while (readerThread != 0 || writerThread != 0) {
 708                         try {
 709                             stateLock.wait();
 710                         } catch (InterruptedException e) {
 711                             interrupted = true;
 712                         }
 713                     }
 714                 }
 715             }
 716         } else {
 717             // non-blocking mode: wait for read/write to complete
 718             readLock.lock();
 719             try {
 720                 writeLock.lock();
 721                 writeLock.unlock();
 722             } finally {
 723                 readLock.unlock();
 724             }
 725         }
 726 
 727         // set state to ST_KILLPENDING
 728         synchronized (stateLock) {
 729             assert state == ST_CLOSING;
 730             // if connected and the channel is registered with a Selector then
 731             // shutdown the output if possible so that the peer reads EOF. If
 732             // SO_LINGER is enabled and set to a non-zero value then it needs to
 733             // be disabled so that the Selector does not wait when it closes
 734             // the socket.
 735             if (connected && isRegistered()) {
 736                 try {
 737                     SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;
 738                     int interval = (int) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, opt);
 739                     if (interval != 0) {
 740                         if (interval > 0) {
 741                             // disable SO_LINGER
 742                             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, opt, -1);
 743                         }
 744                         RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 745                     }
 746                 } catch (IOException ignore) { }
 747             }
 748             state = ST_KILLPENDING;
 749         }
 750 
 751         // close socket if not registered with Selector
 752         if (!isRegistered())
 753             kill();
 754 
 755         // restore interrupt status
 756         if (interrupted)
 757             Thread.currentThread().interrupt();
 758     }
 759 
 760     @Override
 761     public void kill() throws IOException {
 762         synchronized (stateLock) {
 763             if (state == ST_KILLPENDING) {
 764                 state = ST_KILLED;
 765                 nd.close(fd);
 766             }
 767         }
 768     }
 769 
 770     @Override
 771     public SocketChannel shutdownInput() throws IOException {
 772         synchronized (stateLock) {
 773             ensureOpen();
 774             if (!isConnected())
 775                 throw new NotYetConnectedException();
 776             if (!isInputClosed) {
 777                 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD);
 778                 long thread = readerThread;
 779                 if (thread != 0)
 780                     NativeThread.signal(thread);
 781                 isInputClosed = true;
 782             }
 783             return this;
 784         }
 785     }
 786 
 787     @Override
 788     public SocketChannel shutdownOutput() throws IOException {
 789         synchronized (stateLock) {
 790             ensureOpen();
 791             if (!isConnected())
 792                 throw new NotYetConnectedException();
 793             if (!isOutputClosed) {
 794                 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 795                 long thread = writerThread;
 796                 if (thread != 0)
 797                     NativeThread.signal(thread);
 798                 isOutputClosed = true;
 799             }
 800             return this;
 801         }
 802     }
 803 
 804     boolean isInputOpen() {
 805         return !isInputClosed;
 806     }
 807 
 808     boolean isOutputOpen() {
 809         return !isOutputClosed;
 810     }
 811 
 812     /**
 813      * Poll this channel's socket for reading up to the given timeout.
 814      * @return {@code true} if the socket is polled
 815      */
 816     boolean pollRead(long timeout) throws IOException {
 817         boolean blocking = isBlocking();
 818         assert Thread.holdsLock(blockingLock()) && blocking;
 819 
 820         readLock.lock();
 821         try {
 822             boolean polled = false;
 823             try {
 824                 beginRead(blocking);
 825                 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout);
 826                 polled = (events != 0);
 827             } finally {
 828                 endRead(blocking, polled);
 829             }
 830             return polled;
 831         } finally {
 832             readLock.unlock();
 833         }
 834     }
 835 
 836     /**
 837      * Poll this channel's socket for a connection, up to the given timeout.
 838      * @return {@code true} if the socket is polled
 839      */
 840     boolean pollConnected(long timeout) throws IOException {
 841         boolean blocking = isBlocking();
 842         assert Thread.holdsLock(blockingLock()) && blocking;
 843 
 844         readLock.lock();
 845         try {
 846             writeLock.lock();
 847             try {
 848                 boolean polled = false;
 849                 try {
 850                     beginFinishConnect(blocking);
 851                     int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout);
 852                     polled = (events != 0);
 853                 } finally {
 854                     // invoke endFinishConnect with completed = false so that
 855                     // the state is not changed to ST_CONNECTED. The socket
 856                     // adaptor will use finishConnect to finish.
 857                     endFinishConnect(blocking, /*completed*/false);
 858                 }
 859                 return polled;
 860             } finally {
 861                 writeLock.unlock();
 862             }
 863         } finally {
 864             readLock.unlock();
 865         }
 866     }
 867 
 868     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 869         int intOps = ski.nioInterestOps();
 870         int oldOps = ski.nioReadyOps();
 871         int newOps = initialOps;
 872 
 873         if ((ops & Net.POLLNVAL) != 0) {
 874             return false;
 875         }
 876 
 877         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 878             newOps = intOps;
 879             ski.nioReadyOps(newOps);
 880             return (newOps & ~oldOps) != 0;
 881         }
 882 
 883         boolean connected = isConnected();
 884         if (((ops & Net.POLLIN) != 0) &&
 885             ((intOps & SelectionKey.OP_READ) != 0) && connected)
 886             newOps |= SelectionKey.OP_READ;
 887 
 888         if (((ops & Net.POLLCONN) != 0) &&
 889             ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
 890             newOps |= SelectionKey.OP_CONNECT;
 891 
 892         if (((ops & Net.POLLOUT) != 0) &&
 893             ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
 894             newOps |= SelectionKey.OP_WRITE;
 895 
 896         ski.nioReadyOps(newOps);
 897         return (newOps & ~oldOps) != 0;
 898     }
 899 
 900     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 901         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 902     }
 903 
 904     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 905         return translateReadyOps(ops, 0, ski);
 906     }
 907 
 908     public int translateInterestOps(int ops) {
 909         int newOps = 0;
 910         if ((ops & SelectionKey.OP_READ) != 0)
 911             newOps |= Net.POLLIN;
 912         if ((ops & SelectionKey.OP_WRITE) != 0)
 913             newOps |= Net.POLLOUT;
 914         if ((ops & SelectionKey.OP_CONNECT) != 0)
 915             newOps |= Net.POLLCONN;
 916         return newOps;
 917     }
 918 
 919     public FileDescriptor getFD() {
 920         return fd;
 921     }
 922 
 923     public int getFDVal() {
 924         return fdVal;
 925     }
 926 
 927     @Override
 928     public String toString() {
 929         StringBuilder sb = new StringBuilder();
 930         sb.append(this.getClass().getSuperclass().getName());
 931         sb.append('[');
 932         if (!isOpen())
 933             sb.append("closed");
 934         else {
 935             synchronized (stateLock) {
 936                 switch (state) {
 937                 case ST_UNCONNECTED:
 938                     sb.append("unconnected");
 939                     break;
 940                 case ST_CONNECTIONPENDING:
 941                     sb.append("connection-pending");
 942                     break;
 943                 case ST_CONNECTED:
 944                     sb.append("connected");
 945                     if (isInputClosed)
 946                         sb.append(" ishut");
 947                     if (isOutputClosed)
 948                         sb.append(" oshut");
 949                     break;
 950                 }
 951                 InetSocketAddress addr = localAddress();
 952                 if (addr != null) {
 953                     sb.append(" local=");
 954                     sb.append(RdmaNet.getRevealedLocalAddressAsString(addr));
 955                 }
 956                 if (remoteAddress() != null) {
 957                     sb.append(" remote=");
 958                     sb.append(remoteAddress().toString());
 959                 }
 960             }
 961         }
 962         sb.append(']');
 963         return sb.toString();
 964     }
 965 
 966     // -- Native methods --
 967 
 968     private static native void initIDs();
 969 
 970     private static native int checkConnect(FileDescriptor fd, boolean block)
 971         throws IOException;
 972 
 973     private static native int sendOutOfBandData(FileDescriptor fd, byte data)
 974         throws IOException;
 975 
 976     static {
 977         IOUtil.load();
 978         initIDs();
 979         nd = new RdmaSocketDispatcher();
 980     }
 981 
 982 }