1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.rdma;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.Inet4Address;
  31 import java.net.Inet6Address;
  32 import java.net.InetAddress;
  33 import java.net.InetSocketAddress;
  34 import java.net.ProtocolFamily;
  35 import java.net.Socket;
  36 import java.net.SocketAddress;
  37 import java.net.SocketOption;
  38 import java.net.StandardProtocolFamily;
  39 import java.net.StandardSocketOptions;
  40 import java.nio.ByteBuffer;
  41 import java.nio.channels.AlreadyBoundException;
  42 import java.nio.channels.AlreadyConnectedException;
  43 import java.nio.channels.AsynchronousCloseException;
  44 import java.nio.channels.ClosedChannelException;
  45 import java.nio.channels.ConnectionPendingException;
  46 import java.nio.channels.NoConnectionPendingException;
  47 import java.nio.channels.NotYetConnectedException;
  48 import java.nio.channels.SelectionKey;
  49 import java.nio.channels.SocketChannel;
  50 import java.nio.channels.spi.SelectorProvider;
  51 import java.util.Collections;
  52 import java.util.HashSet;
  53 import java.util.Objects;
  54 import java.util.Set;
  55 import java.util.concurrent.locks.ReentrantLock;
  56 import sun.net.ext.RdmaSocketOptions;
  57 import sun.nio.ch.IOStatus;
  58 import sun.nio.ch.IOUtil;
  59 import sun.nio.ch.Net;
  60 import sun.nio.ch.NativeThread;
  61 import sun.nio.ch.SelChImpl;
  62 import sun.nio.ch.SelectionKeyImpl;
  63 
  64 public class RdmaSocketChannelImpl
  65     extends SocketChannel
  66     implements SelChImpl
  67 {
    // The protocol family of the socket
    private final ProtocolFamily family;

    // Dispatcher passed to IOUtil for reads and writes, and used to pre-close
    // and close the file descriptor.
    private static RdmaSocketDispatcher nd;
    // The socket's file descriptor
    private final FileDescriptor fd;
    // Value of IOUtil.fdVal(fd), captured at construction
    private final int fdVal;

    // Lock held while reading; connect, finishConnect, bind and
    // implConfigureBlocking take it together with writeLock
    private final ReentrantLock readLock = new ReentrantLock();
    // Lock held while writing
    private final ReentrantLock writeLock = new ReentrantLock();

    // Lock held when changing state, the reader/writer thread fields, the
    // addresses and the socket adaptor
    private final Object stateLock = new Object();

    // Set to true by shutdownInput / shutdownOutput respectively
    private volatile boolean isInputClosed;
    private volatile boolean isOutputClosed;

    // NOTE(review): not referenced by the code around these declarations;
    // confirm whether it is still needed
    private boolean isReuseAddress;

    // Channel states. ensureOpenAndConnected relies on the numeric ordering:
    // values below ST_CONNECTED mean "not yet connected", values above it mean
    // the channel is closing or closed.
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTIONPENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_CLOSING = 3;
    private static final int ST_KILLPENDING = 4;
    private static final int ST_KILLED = 5;
    private volatile int state;  // need stateLock to change

    // NativeThread id of the thread in a blocking read, connect or poll
    // (readerThread) or blocking write (writerThread); 0 when there is none
    private long readerThread;
    private long writerThread;

    // Local and remote addresses, read and written under stateLock
    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Socket adaptor, created on first call to socket()
    private Socket socket;

    // When non-null, checkSupported refuses to construct a channel and throws
    // an UnsupportedOperationException that has this one as its cause
    private static final UnsupportedOperationException unsupported;
 102 
 103     private static final SelectorProvider checkSupported(SelectorProvider sp) {
 104         if (unsupported != null)
 105             throw new UnsupportedOperationException(unsupported.getMessage(),
 106                     unsupported);
 107         else
 108             return sp;
 109     }
 110 
 111     protected RdmaSocketChannelImpl(SelectorProvider sp, ProtocolFamily family)
 112             throws IOException {
 113         super(checkSupported(sp));
 114 
 115         Objects.requireNonNull(family, "'family' is null");
 116         if ((family != StandardProtocolFamily.INET) &&
 117             (family != StandardProtocolFamily.INET6)) {
 118             throw new UnsupportedOperationException(
 119                     "Protocol family not supported");
 120         }
 121         if (family == StandardProtocolFamily.INET6) {
 122             if (!Net.isIPv6Available()) {
 123                 throw new UnsupportedOperationException("IPv6 not available");
 124             }
 125         }
 126 
 127         this.family = family;
 128         this.fd = RdmaNet.socket(family, true);
 129         this.fdVal = IOUtil.fdVal(fd);
 130     }
 131 
 132     RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd,
 133             InetSocketAddress isa) throws IOException {
 134         super(checkSupported(sp));
 135         this.family = Net.isIPv6Available()
 136                 ? StandardProtocolFamily.INET6
 137                 : StandardProtocolFamily.INET;
 138         this.fd = fd;
 139         this.fdVal = IOUtil.fdVal(fd);
 140         synchronized (stateLock) {
 141             this.localAddress = RdmaNet.localAddress(fd);
 142             this.remoteAddress = isa;
 143             this.state = ST_CONNECTED;
 144         }
 145     }
 146 
 147     private void ensureOpen() throws ClosedChannelException {
 148         if (!isOpen())
 149             throw new ClosedChannelException();
 150     }
 151 
 152     private void ensureOpenAndConnected() throws ClosedChannelException {
 153         int state = this.state;
 154         if (state < ST_CONNECTED) {
 155             throw new NotYetConnectedException();
 156         } else if (state > ST_CONNECTED) {
 157             throw new ClosedChannelException();
 158         }
 159     }
 160 
 161     @Override
 162     public Socket socket() {
 163         synchronized (stateLock) {
 164             if (socket == null)
 165                 socket = RdmaSocketAdaptor.create(this);
 166             return socket;
 167         }
 168     }
 169 
 170     @Override
 171     public SocketAddress getLocalAddress() throws IOException {
 172         synchronized (stateLock) {
 173             ensureOpen();
 174             return Net.getRevealedLocalAddress(localAddress);
 175         }
 176     }
 177 
 178     @Override
 179     public SocketAddress getRemoteAddress() throws IOException {
 180         synchronized (stateLock) {
 181             ensureOpen();
 182             return remoteAddress;
 183         }
 184     }
 185 
 186     @Override
 187     public <T> SocketChannel setOption(SocketOption<T> name, T value)
 188             throws IOException {
 189         Objects.requireNonNull(name);
 190         if (!supportedOptions().contains(name))
 191             throw new UnsupportedOperationException("'" + name
 192                     + "' not supported");
 193 
 194         synchronized (stateLock) {
 195             ensureOpen();
 196             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 197             return this;
 198         }
 199     }
 200 
 201     @Override
 202     @SuppressWarnings("unchecked")
 203     public <T> T getOption(SocketOption<T> name)
 204             throws IOException {
 205         Objects.requireNonNull(name);
 206         if (!supportedOptions().contains(name))
 207             throw new UnsupportedOperationException("'" + name
 208                     + "' not supported");
 209 
 210         synchronized (stateLock) {
 211             ensureOpen();
 212             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 213         }
 214     }
 215 
 216     private static class DefaultOptionsHolder {
 217         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 218 
 219         private static Set<SocketOption<?>> defaultOptions() {
 220             HashSet<SocketOption<?>> set = new HashSet<>();
 221             set.add(StandardSocketOptions.SO_SNDBUF);
 222             set.add(StandardSocketOptions.SO_RCVBUF);
 223             set.add(StandardSocketOptions.SO_REUSEADDR);
 224             set.add(StandardSocketOptions.TCP_NODELAY);
 225             RdmaSocketOptions rdmaOptions =
 226                     RdmaSocketOptions.getInstance();
 227             set.addAll(rdmaOptions.options());
 228             return Collections.unmodifiableSet(set);
 229         }
 230     }
 231 
 232     public Set<SocketOption<?>> supportedOptions() {
 233          return DefaultOptionsHolder.defaultOptions;
 234     }
 235 
 236     private void beginRead(boolean blocking) throws ClosedChannelException {
 237         if (blocking) {
 238             // set hook for Thread.interrupt
 239             begin();
 240 
 241             synchronized (stateLock) {
 242                 ensureOpenAndConnected();
 243                 // record thread so it can be signalled if needed
 244                 readerThread = NativeThread.current();
 245             }
 246         } else {
 247             ensureOpenAndConnected();
 248         }
 249     }
 250     private void endRead(boolean blocking, boolean completed)
 251             throws AsynchronousCloseException
 252     {
 253         if (blocking) {
 254             synchronized (stateLock) {
 255                 readerThread = 0;
 256                 // notify any thread waiting in implCloseSelectableChannel
 257                 if (state == ST_CLOSING) {
 258                     stateLock.notifyAll();
 259                 }
 260             }
 261             // remove hook for Thread.interrupt
 262             end(completed);
 263         }
 264     }
 265 
 266     @Override
 267     public int read(ByteBuffer buf) throws IOException {
 268         Objects.requireNonNull(buf);
 269 
 270         readLock.lock();
 271         try {
 272             boolean blocking = isBlocking();
 273             int n = 0;
 274             try {
 275                 beginRead(blocking);
 276 
 277                 // check if input is shutdown
 278                 if (isInputClosed)
 279                     return IOStatus.EOF;
 280 
 281                 if (blocking) {
 282                     do {
 283                         n = IOUtil.read(fd, buf, -1, nd);
 284                     } while (n == IOStatus.INTERRUPTED && isOpen());
 285                 } else {
 286                     n = IOUtil.read(fd, buf, -1, nd);
 287                 }
 288             } finally {
 289                 endRead(blocking, n > 0);
 290                 if (n <= 0 && isInputClosed)
 291                     return IOStatus.EOF;
 292             }
 293             return IOStatus.normalize(n);
 294         } finally {
 295             readLock.unlock();
 296         }
 297     }
 298 
 299     @Override
 300     public long read(ByteBuffer[] dsts, int offset, int length)
 301             throws IOException
 302     {
 303         Objects.checkFromIndexSize(offset, length, dsts.length);
 304 
 305         readLock.lock();
 306         try {
 307             boolean blocking = isBlocking();
 308             long n = 0;
 309             try {
 310                 beginRead(blocking);
 311 
 312                 // check if input is shutdown
 313                 if (isInputClosed)
 314                     return IOStatus.EOF;
 315 
 316                 if (blocking) {
 317                     do {
 318                         n = IOUtil.read(fd, dsts, offset, length, nd);
 319                     } while (n == IOStatus.INTERRUPTED && isOpen());
 320                 } else {
 321                     n = IOUtil.read(fd, dsts, offset, length, nd);
 322                 }
 323             } finally {
 324                 endRead(blocking, n > 0);
 325                 if (n <= 0 && isInputClosed)
 326                     return IOStatus.EOF;
 327             }
 328             return IOStatus.normalize(n);
 329         } finally {
 330             readLock.unlock();
 331         }
 332     }
 333 
 334     private void beginWrite(boolean blocking) throws ClosedChannelException {
 335         if (blocking) {
 336             // set hook for Thread.interrupt
 337             begin();
 338 
 339             synchronized (stateLock) {
 340                 ensureOpenAndConnected();
 341                 if (isOutputClosed)
 342                     throw new ClosedChannelException();
 343                 // record thread so it can be signalled if needed
 344                 writerThread = NativeThread.current();
 345             }
 346         } else {
 347             ensureOpenAndConnected();
 348         }
 349     }
 350 
 351     private void endWrite(boolean blocking, boolean completed)
 352             throws AsynchronousCloseException {
 353         if (blocking) {
 354             synchronized (stateLock) {
 355                 writerThread = 0;
 356                 // notify any thread waiting in implCloseSelectableChannel
 357                 if (state == ST_CLOSING) {
 358                     stateLock.notifyAll();
 359                 }
 360             }
 361             // remove hook for Thread.interrupt
 362             end(completed);
 363         }
 364     }
 365 
 366     @Override
 367     public int write(ByteBuffer buf) throws IOException {
 368         Objects.requireNonNull(buf);
 369 
 370         writeLock.lock();
 371         try {
 372             boolean blocking = isBlocking();
 373             int n = 0;
 374             try {
 375                 beginWrite(blocking);
 376                 if (blocking) {
 377                     do {
 378                         n = IOUtil.write(fd, buf, -1, nd);
 379                     } while (n == IOStatus.INTERRUPTED && isOpen());
 380                 } else {
 381                     n = IOUtil.write(fd, buf, -1, nd);
 382                 }
 383             } finally {
 384                 endWrite(blocking, n > 0);
 385                 if (n <= 0 && isOutputClosed)
 386                     throw new AsynchronousCloseException();
 387             }
 388             return IOStatus.normalize(n);
 389         } finally {
 390             writeLock.unlock();
 391         }
 392     }
 393 
 394     @Override
 395     public long write(ByteBuffer[] srcs, int offset, int length)
 396             throws IOException {
 397         Objects.checkFromIndexSize(offset, length, srcs.length);
 398 
 399         writeLock.lock();
 400         try {
 401             boolean blocking = isBlocking();
 402             long n = 0;
 403             try {
 404                 beginWrite(blocking);
 405                 if (blocking) {
 406                     do {
 407                         n = IOUtil.write(fd, srcs, offset, length, nd);
 408                     } while (n == IOStatus.INTERRUPTED && isOpen());
 409                 } else {
 410                     n = IOUtil.write(fd, srcs, offset, length, nd);
 411                 }
 412             } finally {
 413                 endWrite(blocking, n > 0);
 414                 if (n <= 0 && isOutputClosed)
 415                     throw new AsynchronousCloseException();
 416             }
 417             return IOStatus.normalize(n);
 418         } finally {
 419             writeLock.unlock();
 420         }
 421     }
 422 
 423     int sendOutOfBandData(byte b) throws IOException {
 424         writeLock.lock();
 425         try {
 426             boolean blocking = isBlocking();
 427             int n = 0;
 428             try {
 429                 beginWrite(blocking);
 430                 if (blocking) {
 431                     do {
 432                         n = sendOutOfBandData(fd, b);
 433                     } while (n == IOStatus.INTERRUPTED && isOpen());
 434                 } else {
 435                     n = sendOutOfBandData(fd, b);
 436                 }
 437             } finally {
 438                 endWrite(blocking, n > 0);
 439                 if (n <= 0 && isOutputClosed)
 440                     throw new AsynchronousCloseException();
 441             }
 442             return IOStatus.normalize(n);
 443         } finally {
 444             writeLock.unlock();
 445         }
 446     }
 447 
    /**
     * Switches the socket between blocking and non-blocking mode via
     * {@code RdmaNet.configureBlocking}.
     *
     * Both I/O locks are taken first (readLock, then writeLock), followed by
     * stateLock, so the mode is not changed while a read or write is in
     * progress.
     *
     * @param block {@code true} for blocking mode
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    RdmaNet.configureBlocking(fd, block);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }
 465 
 466     InetSocketAddress localAddress() {
 467         synchronized (stateLock) {
 468             return localAddress;
 469         }
 470     }
 471 
 472     InetSocketAddress remoteAddress() {
 473         synchronized (stateLock) {
 474             return remoteAddress;
 475         }
 476     }
 477 
 478     @Override
 479     public SocketChannel bind(SocketAddress local) throws IOException {
 480         readLock.lock();
 481         try {
 482             writeLock.lock();
 483             try {
 484                 synchronized (stateLock) {
 485                     ensureOpen();
 486                     if (state == ST_CONNECTIONPENDING)
 487                         throw new ConnectionPendingException();
 488                     if (localAddress != null)
 489                         throw new AlreadyBoundException();
 490                     InetSocketAddress isa = (local == null) ?
 491                         new InetSocketAddress(0)
 492                         : RdmaNet.checkAddress(local, family);
 493                     SecurityManager sm = System.getSecurityManager();
 494                     if (sm != null) {
 495                         sm.checkListen(isa.getPort());
 496                     }
 497                     RdmaNet.bind(family, fd, isa.getAddress(), isa.getPort());
 498                     localAddress = RdmaNet.localAddress(fd);
 499                 }
 500             } finally {
 501                 writeLock.unlock();
 502             }
 503         } finally {
 504             readLock.unlock();
 505         }
 506         return this;
 507     }
 508 
 509     @Override
 510     public boolean isConnected() {
 511         return (state == ST_CONNECTED);
 512     }
 513 
 514     @Override
 515     public boolean isConnectionPending() {
 516         return (state == ST_CONNECTIONPENDING);
 517     }
 518 
 519     private void beginConnect(boolean blocking, InetSocketAddress isa)
 520             throws IOException {
 521         if (blocking) {
 522             // set hook for Thread.interrupt
 523             begin();
 524         }
 525         synchronized (stateLock) {
 526             ensureOpen();
 527             int state = this.state;
 528             if (state == ST_CONNECTED)
 529                 throw new AlreadyConnectedException();
 530             if (state == ST_CONNECTIONPENDING)
 531                 throw new ConnectionPendingException();
 532             assert state == ST_UNCONNECTED;
 533             this.state = ST_CONNECTIONPENDING;
 534 
 535             remoteAddress = isa;
 536 
 537             if (blocking) {
 538                 // record thread so it can be signalled if needed
 539                 readerThread = NativeThread.current();
 540             }
 541         }
 542     }
 543 
 544     private void endConnect(boolean blocking, boolean completed)
 545             throws IOException {
 546         endRead(blocking, completed);
 547 
 548         if (completed) {
 549             synchronized (stateLock) {
 550                 if (state == ST_CONNECTIONPENDING) {
 551                     localAddress = RdmaNet.localAddress(fd);
 552                     state = ST_CONNECTED;
 553                 }
 554             }
 555         }
 556     }
 557 
    /**
     * Connects this channel's socket to the given remote address.
     *
     * The address is validated and checked with the security manager (if
     * any) before any lock is taken. A wildcard remote address is replaced
     * by {@code InetAddress.getLocalHost()} for the native connect, while
     * the original {@code isa} is what gets recorded as the remote address.
     *
     * @param sa the remote address
     * @return {@code true} if {@code RdmaNet.connect} returned a positive
     *         value; {@code false} otherwise, in which case the channel is
     *         left in the connection-pending state
     * @throws IOException if the connect fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean connect(SocketAddress sa) throws IOException {
        InetSocketAddress isa = RdmaNet.checkAddress(sa, family);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        InetAddress ia = isa.getAddress();
        if (ia.isAnyLocalAddress())
            ia = InetAddress.getLocalHost();

        try {
            // lock order: readLock, then writeLock
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    int n = 0;
                    boolean blocking = isBlocking();
                    try {
                        beginConnect(blocking, isa);
                        // retry while the native connect reports an
                        // interrupt and the channel is still open
                        do {
                            n = RdmaNet.connect(family, fd, ia, isa.getPort());
                        } while (n == IOStatus.INTERRUPTED && isOpen());
                    } finally {
                        // n is still 0 here if beginConnect or the connect
                        // threw, so the state is not moved to connected
                        endConnect(blocking, (n > 0));
                    }
                    assert IOStatus.check(n);
                    return n > 0;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }
 598 
 599     private void beginFinishConnect(boolean blocking)
 600             throws ClosedChannelException {
 601         if (blocking) {
 602             // set hook for Thread.interrupt
 603             begin();
 604         }
 605         synchronized (stateLock) {
 606             ensureOpen();
 607             if (state != ST_CONNECTIONPENDING)
 608                 throw new NoConnectionPendingException();
 609             if (blocking) {
 610                 // record thread so it can be signalled if needed
 611                 readerThread = NativeThread.current();
 612             }
 613         }
 614     }
 615 
 616     private void endFinishConnect(boolean blocking, boolean completed)
 617             throws IOException
 618     {
 619         endRead(blocking, completed);
 620 
 621         if (completed) {
 622             synchronized (stateLock) {
 623                 if (state == ST_CONNECTIONPENDING) {
 624                     localAddress = RdmaNet.localAddress(fd);
 625                     state = ST_CONNECTED;
 626                 }
 627             }
 628         }
 629     }
 630 
    /**
     * Completes a connect that was started earlier.
     *
     * Returns immediately with {@code true} if the channel is already
     * connected. Otherwise {@code checkConnect} is consulted: once in
     * non-blocking mode, or repeatedly in blocking mode for as long as it
     * returns 0 or {@code IOStatus.INTERRUPTED} and the channel is open.
     *
     * @return {@code true} if the channel is now connected
     * @throws NoConnectionPendingException if no connect is pending
     * @throws IOException if the connect fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean finishConnect() throws IOException {
        try {
            // lock order: readLock, then writeLock
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    // no-op if already connected
                    if (isConnected())
                        return true;

                    boolean blocking = isBlocking();
                    boolean connected = false;
                    try {
                        beginFinishConnect(blocking);
                        int n = 0;
                        if (blocking) {
                            do {
                                n = checkConnect(fd, true);
                            } while ((n == 0 || n == IOStatus.INTERRUPTED)
                                    && isOpen());
                        } else {
                            n = checkConnect(fd, false);
                        }
                        connected = (n > 0);
                    } finally {
                        // connected is still false here if anything above
                        // threw, so the state is left unchanged
                        endFinishConnect(blocking, connected);
                    }
                    // reaching this point in blocking mode implies connected
                    assert (blocking && connected) ^ !blocking;
                    return connected;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }
 673 
    /**
     * Closes this channel; invoked once the channel is already marked as not
     * open (see the assertion on entry).
     *
     * Steps, in order:
     * 1. move the state to ST_CLOSING;
     * 2. blocking mode: if a reader or writer thread is recorded, pre-close
     *    the descriptor, signal those threads and wait until both thread
     *    fields are cleared; non-blocking mode: acquire and release readLock
     *    and writeLock so that in-flight operations finish;
     * 3. move the state to ST_KILLPENDING, shutting down output first when
     *    the channel is still connected and registered with a Selector;
     * 4. call kill() if the channel is not registered.
     *
     * An interrupt received while waiting does not abort the wait; the
     * interrupt status is restored before returning.
     */
    @Override
    protected void implCloseSelectableChannel() throws IOException {
        assert !isOpen();
        boolean blocking;
        boolean connected;
        boolean interrupted = false;

        // set state to ST_CLOSING
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            blocking = isBlocking();
            connected = (state == ST_CONNECTED);
            state = ST_CLOSING;
        }

        // wait for any outstanding I/O operations to complete
        if (blocking) {
            synchronized (stateLock) {
                assert state == ST_CLOSING;
                long reader = readerThread;
                long writer = writerThread;
                if (reader != 0 || writer != 0) {
                    nd.preClose(fd);
                    connected = false; // fd is no longer connected socket

                    if (reader != 0)
                        NativeThread.signal(reader);
                    if (writer != 0)
                        NativeThread.signal(writer);

                    // wait for blocking I/O operations to end
                    // (endRead/endWrite clear the field and notifyAll)
                    while (readerThread != 0 || writerThread != 0) {
                        try {
                            stateLock.wait();
                        } catch (InterruptedException e) {
                            // keep waiting; status is restored on exit
                            interrupted = true;
                        }
                    }
                }
            }
        } else {
            // non-blocking mode: wait for read/write to complete
            readLock.lock();
            try {
                writeLock.lock();
                writeLock.unlock();
            } finally {
                readLock.unlock();
            }
        }

        // set state to ST_KILLPENDING
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            // if connected and the channel is registered with a Selector then
            // shutdown the output if possible so that the peer reads EOF.
            if (connected && isRegistered()) {
                try {
                    RdmaNet.shutdown(fd, Net.SHUT_WR);
                } catch (IOException ignore) { }
            }
            state = ST_KILLPENDING;
        }

        // close socket if not registered with Selector
        if (!isRegistered())
            kill();

        // restore interrupt status
        if (interrupted)
            Thread.currentThread().interrupt();
    }
 746 
 747     @Override
 748     public void kill() throws IOException {
 749         synchronized (stateLock) {
 750             if (state == ST_KILLPENDING) {
 751                 state = ST_KILLED;
 752                 nd.close(fd);
 753             }
 754         }
 755     }
 756 
 757     @Override
 758     public SocketChannel shutdownInput() throws IOException {
 759         synchronized (stateLock) {
 760             ensureOpen();
 761             if (!isConnected())
 762                 throw new NotYetConnectedException();
 763             if (!isInputClosed) {
 764                 RdmaNet.shutdown(fd, Net.SHUT_RD);
 765                 long thread = readerThread;
 766                 if (thread != 0)
 767                     NativeThread.signal(thread);
 768                 isInputClosed = true;
 769             }
 770             return this;
 771         }
 772     }
 773 
 774     @Override
 775     public SocketChannel shutdownOutput() throws IOException {
 776         synchronized (stateLock) {
 777             ensureOpen();
 778             if (!isConnected())
 779                 throw new NotYetConnectedException();
 780             if (!isOutputClosed) {
 781                 RdmaNet.shutdown(fd, Net.SHUT_WR);
 782                 long thread = writerThread;
 783                 if (thread != 0)
 784                     NativeThread.signal(thread);
 785                 isOutputClosed = true;
 786             }
 787             return this;
 788         }
 789     }
 790 
 791     boolean isInputOpen() {
 792         return !isInputClosed;
 793     }
 794 
 795     boolean isOutputOpen() {
 796         return !isOutputClosed;
 797     }
 798 
 799     /**
 800      * Poll this channel's socket for reading up to the given timeout.
 801      * @return {@code true} if the socket is polled
 802      */
 803     boolean pollRead(long timeout) throws IOException {
 804         boolean blocking = isBlocking();
 805         assert Thread.holdsLock(blockingLock()) && blocking;
 806 
 807         readLock.lock();
 808         try {
 809             boolean polled = false;
 810             try {
 811                 beginRead(blocking);
 812                 int events = RdmaNet.poll(fd, Net.POLLIN, timeout);
 813                 polled = (events != 0);
 814             } finally {
 815                 endRead(blocking, polled);
 816             }
 817             return polled;
 818         } finally {
 819             readLock.unlock();
 820         }
 821     }
 822 
 823     /**
 824      * Poll this channel's socket for a connection, up to the given timeout.
 825      * @return {@code true} if the socket is polled
 826      */
 827     boolean pollConnected(long timeout) throws IOException {
 828         boolean blocking = isBlocking();
 829         assert Thread.holdsLock(blockingLock()) && blocking;
 830 
 831         readLock.lock();
 832         try {
 833             writeLock.lock();
 834             try {
 835                 boolean polled = false;
 836                 try {
 837                     beginFinishConnect(blocking);
 838                     int events = RdmaNet.poll(fd, Net.POLLCONN, timeout);
 839                     polled = (events != 0);
 840                 } finally {
 841                     // invoke endFinishConnect with completed = false so that
 842                     // the state is not changed to ST_CONNECTED. The socket
 843                     // adaptor will use finishConnect to finish.
 844                     endFinishConnect(blocking, /*completed*/false);
 845                 }
 846                 return polled;
 847             } finally {
 848                 writeLock.unlock();
 849             }
 850         } finally {
 851             readLock.unlock();
 852         }
 853     }
 854 
 855     public boolean translateReadyOps(int ops, int initialOps,
 856             SelectionKeyImpl ski) {
 857         int intOps = ski.nioInterestOps();
 858         int oldOps = ski.nioReadyOps();
 859         int newOps = initialOps;
 860 
 861         if ((ops & Net.POLLNVAL) != 0) {
 862             return false;
 863         }
 864 
 865         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 866             newOps = intOps;
 867             ski.nioReadyOps(newOps);
 868             return (newOps & ~oldOps) != 0;
 869         }
 870 
 871         boolean connected = isConnected();
 872         if (((ops & Net.POLLIN) != 0) &&
 873             ((intOps & SelectionKey.OP_READ) != 0) && connected)
 874             newOps |= SelectionKey.OP_READ;
 875 
 876         if (((ops & Net.POLLCONN) != 0) &&
 877             ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
 878             newOps |= SelectionKey.OP_CONNECT;
 879 
 880         if (((ops & Net.POLLOUT) != 0) &&
 881             ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
 882             newOps |= SelectionKey.OP_WRITE;
 883 
 884         ski.nioReadyOps(newOps);
 885         return (newOps & ~oldOps) != 0;
 886     }
 887 
 888     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 889         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 890     }
 891 
 892     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 893         return translateReadyOps(ops, 0, ski);
 894     }
 895 
 896     public int translateInterestOps(int ops) {
 897         int newOps = 0;
 898         if ((ops & SelectionKey.OP_READ) != 0)
 899             newOps |= Net.POLLIN;
 900         if ((ops & SelectionKey.OP_WRITE) != 0)
 901             newOps |= Net.POLLOUT;
 902         if ((ops & SelectionKey.OP_CONNECT) != 0)
 903             newOps |= Net.POLLCONN;
 904         return newOps;
 905     }
 906 
 907     public FileDescriptor getFD() {
 908         return fd;
 909     }
 910 
 911     public int getFDVal() {
 912         return fdVal;
 913     }
 914 
 915     @Override
 916     public String toString() {
 917         StringBuilder sb = new StringBuilder();
 918         sb.append(this.getClass().getSuperclass().getName());
 919         sb.append('[');
 920         if (!isOpen())
 921             sb.append("closed");
 922         else {
 923             synchronized (stateLock) {
 924                 switch (state) {
 925                 case ST_UNCONNECTED:
 926                     sb.append("unconnected");
 927                     break;
 928                 case ST_CONNECTIONPENDING:
 929                     sb.append("connection-pending");
 930                     break;
 931                 case ST_CONNECTED:
 932                     sb.append("connected");
 933                     if (isInputClosed)
 934                         sb.append(" ishut");
 935                     if (isOutputClosed)
 936                         sb.append(" oshut");
 937                     break;
 938                 }
 939                 InetSocketAddress addr = localAddress();
 940                 if (addr != null) {
 941                     sb.append(" local=");
 942                     sb.append(Net.getRevealedLocalAddressAsString(addr));
 943                 }
 944                 if (remoteAddress() != null) {
 945                     sb.append(" remote=");
 946                     sb.append(remoteAddress().toString());
 947                 }
 948             }
 949         }
 950         sb.append(']');
 951         return sb.toString();
 952     }
 953 
 954     // -- Native methods --
 955 
 956     private static native void initIDs() throws UnsupportedOperationException;
 957 
 958     private static native int checkConnect(FileDescriptor fd, boolean block)
 959             throws IOException;
 960 
 961     private static native int sendOutOfBandData(FileDescriptor fd, byte data)
 962             throws IOException;
 963 
 964     static {
 965         IOUtil.load();
 966         System.loadLibrary("extnet");
 967         UnsupportedOperationException uoe = null;
 968         try {
 969             initIDs();
 970         } catch (UnsupportedOperationException e) {
 971             uoe = e;
 972         }
 973         unsupported = uoe;
 974         nd = new RdmaSocketDispatcher();
 975     }
 976 }