1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package rdma.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.InetAddress;
  31 import java.net.InetSocketAddress;
  32 import java.net.Socket;
  33 import java.net.SocketAddress;
  34 import java.net.SocketOption;
  35 import java.net.StandardSocketOptions;
  36 import java.nio.ByteBuffer;
  37 import java.nio.channels.AlreadyBoundException;
  38 import java.nio.channels.AlreadyConnectedException;
  39 import java.nio.channels.AsynchronousCloseException;
  40 import java.nio.channels.ClosedChannelException;
  41 import java.nio.channels.ConnectionPendingException;
  42 import java.nio.channels.NoConnectionPendingException;
  43 import java.nio.channels.NotYetConnectedException;
  44 import java.nio.channels.SelectionKey;
  45 import java.nio.channels.SocketChannel;
  46 import java.nio.channels.spi.SelectorProvider;
  47 import java.util.Collections;
  48 import java.util.HashSet;
  49 import java.util.Objects;
  50 import java.util.Set;
  51 import java.util.concurrent.locks.ReentrantLock;
  52 import sun.net.ext.RdmaSocketOptions;
  53 import sun.nio.ch.IOStatus;
  54 import sun.nio.ch.IOUtil;
  55 import sun.nio.ch.Net;
  56 import sun.nio.ch.NativeThread;
  57 import sun.nio.ch.SelChImpl;
  58 import sun.nio.ch.SelectionKeyImpl;
  59 
  60 public class RdmaSocketChannelImpl
  61     extends SocketChannel
  62     implements SelChImpl
  63 {
  64     private static RdmaSocketDispatcher nd;
  65     private final FileDescriptor fd;
  66     private final int fdVal;
  67 
  68     private final ReentrantLock readLock = new ReentrantLock();
  69     private final ReentrantLock writeLock = new ReentrantLock();
  70 
  71     private final Object stateLock = new Object();
  72 
  73     private volatile boolean isInputClosed;
  74     private volatile boolean isOutputClosed;
  75 
  76     private boolean isReuseAddress;
  77 
  78     private static final int ST_UNCONNECTED = 0;
  79     private static final int ST_CONNECTIONPENDING = 1;
  80     private static final int ST_CONNECTED = 2;
  81     private static final int ST_CLOSING = 3;
  82     private static final int ST_KILLPENDING = 4;
  83     private static final int ST_KILLED = 5;
  84     private volatile int state;  // need stateLock to change
  85 
  86     private long readerThread;
  87     private long writerThread;
  88 
  89     private InetSocketAddress localAddress;
  90     private InetSocketAddress remoteAddress;
  91 
  92     private Socket socket;
  93 
  94     protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException {
  95         super(sp);
  96         this.fd = RdmaNet.socket(true);
  97         this.fdVal = IOUtil.fdVal(fd);
  98     }
  99 
 100     protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
 101         throws IOException
 102     {
 103         super(sp);
 104         this.fd = fd;
 105         this.fdVal = IOUtil.fdVal(fd);
 106         if (bound) {
 107             synchronized (stateLock) {
 108                 this.localAddress = RdmaNet.localAddress(fd);
 109             }
 110         }
 111     }
 112 
 113     RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
 114         throws IOException
 115     {
 116         super(sp);
 117         this.fd = fd;
 118         this.fdVal = IOUtil.fdVal(fd);
 119         synchronized (stateLock) {
 120             this.localAddress = RdmaNet.localAddress(fd);
 121             this.remoteAddress = isa;
 122             this.state = ST_CONNECTED;
 123         }
 124     }
 125 
 126     private void ensureOpen() throws ClosedChannelException {
 127         if (!isOpen())
 128             throw new ClosedChannelException();
 129     }
 130 
 131     private void ensureOpenAndConnected() throws ClosedChannelException {
 132         int state = this.state;
 133         if (state < ST_CONNECTED) {
 134             throw new NotYetConnectedException();
 135         } else if (state > ST_CONNECTED) {
 136             throw new ClosedChannelException();
 137         }
 138     }
 139 
 140     @Override
 141     public Socket socket() {
 142         synchronized (stateLock) {
 143             if (socket == null)
 144                 socket = RdmaSocketAdaptor.create(this);
 145             return socket;
 146         }
 147     }
 148 
 149     @Override
 150     public SocketAddress getLocalAddress() throws IOException {
 151         synchronized (stateLock) {
 152             ensureOpen();
 153             return RdmaNet.getRevealedLocalAddress(localAddress);
 154         }
 155     }
 156 
 157     @Override
 158     public SocketAddress getRemoteAddress() throws IOException {
 159         synchronized (stateLock) {
 160             ensureOpen();
 161             return remoteAddress;
 162         }
 163     }
 164 
 165     @Override
 166     public <T> SocketChannel setOption(SocketOption<T> name, T value)
 167         throws IOException
 168     {
 169         Objects.requireNonNull(name);
 170         if (!supportedOptions().contains(name))
 171             throw new UnsupportedOperationException("'" + name + "' not supported");
 172 
 173         synchronized (stateLock) {
 174             ensureOpen();
 175 
 176             if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
 177                 isReuseAddress = (Boolean)value;
 178                 return this;
 179             }
 180 
 181             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 182             return this;
 183         }
 184     }
 185 
 186     @Override
 187     @SuppressWarnings("unchecked")
 188     public <T> T getOption(SocketOption<T> name)
 189         throws IOException
 190     {
 191         Objects.requireNonNull(name);
 192         if (!supportedOptions().contains(name))
 193             throw new UnsupportedOperationException("'" + name + "' not supported");
 194 
 195         synchronized (stateLock) {
 196             ensureOpen();
 197 
 198             if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
 199                 return (T)Boolean.valueOf(isReuseAddress);
 200             }
 201 
 202             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 203         }
 204     }
 205 
 206     private static class DefaultOptionsHolder {
 207         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 208 
 209         private static Set<SocketOption<?>> defaultOptions() {
 210             HashSet<SocketOption<?>> set = new HashSet<>();
 211             set.add(StandardSocketOptions.SO_SNDBUF);
 212             set.add(StandardSocketOptions.SO_RCVBUF);
 213             set.add(StandardSocketOptions.SO_KEEPALIVE);
 214             set.add(StandardSocketOptions.SO_REUSEADDR);
 215             set.add(StandardSocketOptions.TCP_NODELAY);
 216             RdmaSocketOptions rdmaOptions =
 217                     RdmaSocketOptions.getInstance();
 218             set.addAll(rdmaOptions.options());
 219             return Collections.unmodifiableSet(set);
 220         }
 221     }
 222 
 223     public Set<SocketOption<?>> supportedOptions() {
 224          return DefaultOptionsHolder.defaultOptions;
 225     }
 226 
 227     private void beginRead(boolean blocking) throws ClosedChannelException {
 228         if (blocking) {
 229             // set hook for Thread.interrupt
 230             begin();
 231 
 232             synchronized (stateLock) {
 233                 ensureOpenAndConnected();
 234                 // record thread so it can be signalled if needed
 235                 readerThread = NativeThread.current();
 236             }
 237         } else {
 238             ensureOpenAndConnected();
 239         }
 240     }
 241     private void endRead(boolean blocking, boolean completed)
 242         throws AsynchronousCloseException
 243     {
 244         if (blocking) {
 245             synchronized (stateLock) {
 246                 readerThread = 0;
 247                 // notify any thread waiting in implCloseSelectableChannel
 248                 if (state == ST_CLOSING) {
 249                     stateLock.notifyAll();
 250                 }
 251             }
 252             // remove hook for Thread.interrupt
 253             end(completed);
 254         }
 255     }
 256 
 257     @Override
 258     public int read(ByteBuffer buf) throws IOException {
 259         Objects.requireNonNull(buf);
 260 
 261         readLock.lock();
 262         try {
 263             boolean blocking = isBlocking();
 264             int n = 0;
 265             try {
 266                 beginRead(blocking);
 267 
 268                 // check if input is shutdown
 269                 if (isInputClosed)
 270                     return IOStatus.EOF;
 271 
 272                 if (blocking) {
 273                     do {
 274                         n = IOUtil.read(fd, buf, -1, nd);
 275                     } while (n == IOStatus.INTERRUPTED && isOpen());
 276                 } else {
 277                     n = IOUtil.read(fd, buf, -1, nd);
 278                 }
 279             } finally {
 280                 endRead(blocking, n > 0);
 281                 if (n <= 0 && isInputClosed)
 282                     return IOStatus.EOF;
 283             }
 284             return IOStatus.normalize(n);
 285         } finally {
 286             readLock.unlock();
 287         }
 288     }
 289 
 290     @Override
 291     public long read(ByteBuffer[] dsts, int offset, int length)
 292         throws IOException
 293     {
 294         Objects.checkFromIndexSize(offset, length, dsts.length);
 295 
 296         readLock.lock();
 297         try {
 298             boolean blocking = isBlocking();
 299             long n = 0;
 300             try {
 301                 beginRead(blocking);
 302 
 303                 // check if input is shutdown
 304                 if (isInputClosed)
 305                     return IOStatus.EOF;
 306 
 307                 if (blocking) {
 308                     do {
 309                         n = IOUtil.read(fd, dsts, offset, length, nd);
 310                     } while (n == IOStatus.INTERRUPTED && isOpen());
 311                 } else {
 312                     n = IOUtil.read(fd, dsts, offset, length, nd);
 313                 }
 314             } finally {
 315                 endRead(blocking, n > 0);
 316                 if (n <= 0 && isInputClosed)
 317                     return IOStatus.EOF;
 318             }
 319             return IOStatus.normalize(n);
 320         } finally {
 321             readLock.unlock();
 322         }
 323     }
 324 
 325     private void beginWrite(boolean blocking) throws ClosedChannelException {
 326         if (blocking) {
 327             // set hook for Thread.interrupt
 328             begin();
 329 
 330             synchronized (stateLock) {
 331                 ensureOpenAndConnected();
 332                 if (isOutputClosed)
 333                     throw new ClosedChannelException();
 334                 // record thread so it can be signalled if needed
 335                 writerThread = NativeThread.current();
 336             }
 337         } else {
 338             ensureOpenAndConnected();
 339         }
 340     }
 341 
 342     private void endWrite(boolean blocking, boolean completed)
 343         throws AsynchronousCloseException
 344     {
 345         if (blocking) {
 346             synchronized (stateLock) {
 347                 writerThread = 0;
 348                 // notify any thread waiting in implCloseSelectableChannel
 349                 if (state == ST_CLOSING) {
 350                     stateLock.notifyAll();
 351                 }
 352             }
 353             // remove hook for Thread.interrupt
 354             end(completed);
 355         }
 356     }
 357 
 358     @Override
 359     public int write(ByteBuffer buf) throws IOException {
 360         Objects.requireNonNull(buf);
 361 
 362         writeLock.lock();
 363         try {
 364             boolean blocking = isBlocking();
 365             int n = 0;
 366             try {
 367                 beginWrite(blocking);
 368                 if (blocking) {
 369                     do {
 370                         n = IOUtil.write(fd, buf, -1, nd);
 371                     } while (n == IOStatus.INTERRUPTED && isOpen());
 372                 } else {
 373                     n = IOUtil.write(fd, buf, -1, nd);
 374                 }
 375             } finally {
 376                 endWrite(blocking, n > 0);
 377                 if (n <= 0 && isOutputClosed)
 378                     throw new AsynchronousCloseException();
 379             }
 380             return IOStatus.normalize(n);
 381         } finally {
 382             writeLock.unlock();
 383         }
 384     }
 385 
 386     @Override
 387     public long write(ByteBuffer[] srcs, int offset, int length)
 388         throws IOException
 389     {
 390         Objects.checkFromIndexSize(offset, length, srcs.length);
 391 
 392         writeLock.lock();
 393         try {
 394             boolean blocking = isBlocking();
 395             long n = 0;
 396             try {
 397                 beginWrite(blocking);
 398                 if (blocking) {
 399                     do {
 400                         n = IOUtil.write(fd, srcs, offset, length, nd);
 401                     } while (n == IOStatus.INTERRUPTED && isOpen());
 402                 } else {
 403                     n = IOUtil.write(fd, srcs, offset, length, nd);
 404                 }
 405             } finally {
 406                 endWrite(blocking, n > 0);
 407                 if (n <= 0 && isOutputClosed)
 408                     throw new AsynchronousCloseException();
 409             }
 410             return IOStatus.normalize(n);
 411         } finally {
 412             writeLock.unlock();
 413         }
 414     }
 415 
 416     int sendOutOfBandData(byte b) throws IOException {
 417         writeLock.lock();
 418         try {
 419             boolean blocking = isBlocking();
 420             int n = 0;
 421             try {
 422                 beginWrite(blocking);
 423                 if (blocking) {
 424                     do {
 425                         n = sendOutOfBandData(fd, b);
 426                     } while (n == IOStatus.INTERRUPTED && isOpen());
 427                 } else {
 428                     n = sendOutOfBandData(fd, b);
 429                 }
 430             } finally {
 431                 endWrite(blocking, n > 0);
 432                 if (n <= 0 && isOutputClosed)
 433                     throw new AsynchronousCloseException();
 434             }
 435             return IOStatus.normalize(n);
 436         } finally {
 437             writeLock.unlock();
 438         }
 439     }
 440 
 441     @Override
 442     protected void implConfigureBlocking(boolean block) throws IOException {
 443         readLock.lock();
 444         try {
 445             writeLock.lock();
 446             try {
 447                 synchronized (stateLock) {
 448                     ensureOpen();
 449                     RdmaNet.configureBlocking(fd, block);
 450                 }
 451             } finally {
 452                 writeLock.unlock();
 453             }
 454         } finally {
 455             readLock.unlock();
 456         }
 457     }
 458 
 459     InetSocketAddress localAddress() {
 460         synchronized (stateLock) {
 461             return localAddress;
 462         }
 463     }
 464 
 465     InetSocketAddress remoteAddress() {
 466         synchronized (stateLock) {
 467             return remoteAddress;
 468         }
 469     }
 470 
 471     @Override
 472     public SocketChannel bind(SocketAddress local) throws IOException {
 473         readLock.lock();
 474         try {
 475             writeLock.lock();
 476             try {
 477                 synchronized (stateLock) {
 478                     ensureOpen();
 479                     if (state == ST_CONNECTIONPENDING)
 480                         throw new ConnectionPendingException();
 481                     if (localAddress != null)
 482                         throw new AlreadyBoundException();
 483                     InetSocketAddress isa = (local == null) ?
 484                         new InetSocketAddress(0) : RdmaNet.checkAddress(local);
 485                     SecurityManager sm = System.getSecurityManager();
 486                     if (sm != null) {
 487                         sm.checkListen(isa.getPort());
 488                     }
 489                     RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
 490                     localAddress = RdmaNet.localAddress(fd);
 491                 }
 492             } finally {
 493                 writeLock.unlock();
 494             }
 495         } finally {
 496             readLock.unlock();
 497         }
 498         return this;
 499     }
 500 
 501     @Override
 502     public boolean isConnected() {
 503         return (state == ST_CONNECTED);
 504     }
 505 
 506     @Override
 507     public boolean isConnectionPending() {
 508         return (state == ST_CONNECTIONPENDING);
 509     }
 510 
 511     private void beginConnect(boolean blocking, InetSocketAddress isa)
 512         throws IOException
 513     {
 514         if (blocking) {
 515             // set hook for Thread.interrupt
 516             begin();
 517         }
 518         synchronized (stateLock) {
 519             ensureOpen();
 520             int state = this.state;
 521             if (state == ST_CONNECTED)
 522                 throw new AlreadyConnectedException();
 523             if (state == ST_CONNECTIONPENDING)
 524                 throw new ConnectionPendingException();
 525             assert state == ST_UNCONNECTED;
 526             this.state = ST_CONNECTIONPENDING;
 527 
 528             remoteAddress = isa;
 529 
 530             if (blocking) {
 531                 // record thread so it can be signalled if needed
 532                 readerThread = NativeThread.current();
 533             }
 534         }
 535     }
 536 
 537     private void endConnect(boolean blocking, boolean completed)
 538         throws IOException
 539     {
 540         endRead(blocking, completed);
 541 
 542         if (completed) {
 543             synchronized (stateLock) {
 544                 if (state == ST_CONNECTIONPENDING) {
 545                     localAddress = RdmaNet.localAddress(fd);
 546                     state = ST_CONNECTED;
 547                 }
 548             }
 549         }
 550     }
 551 
 552     @Override
 553     public boolean connect(SocketAddress sa) throws IOException {
 554         InetSocketAddress isa = RdmaNet.checkAddress(sa);
 555         SecurityManager sm = System.getSecurityManager();
 556         if (sm != null)
 557             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 558 
 559         InetAddress ia = isa.getAddress();
 560         if (ia.isAnyLocalAddress())
 561             ia = InetAddress.getLocalHost();
 562 
 563         try {
 564             readLock.lock();
 565             try {
 566                 writeLock.lock();
 567                 try {
 568                     int n = 0;
 569                     boolean blocking = isBlocking();
 570                     try {
 571                         beginConnect(blocking, isa);
 572                         do {
 573                             n = RdmaNet.connect(fd, ia, isa.getPort());
 574                         } while (n == IOStatus.INTERRUPTED && isOpen());
 575                     } finally {
 576                         endConnect(blocking, (n > 0));
 577                     }
 578                     assert IOStatus.check(n);
 579                     return n > 0;
 580                 } finally {
 581                     writeLock.unlock();
 582                 }
 583             } finally {
 584                 readLock.unlock();
 585             }
 586         } catch (IOException ioe) {
 587             // connect failed, close the channel
 588             close();
 589             throw ioe;
 590         }
 591     }
 592 
 593     private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
 594         if (blocking) {
 595             // set hook for Thread.interrupt
 596             begin();
 597         }
 598         synchronized (stateLock) {
 599             ensureOpen();
 600             if (state != ST_CONNECTIONPENDING)
 601                 throw new NoConnectionPendingException();
 602             if (blocking) {
 603                 // record thread so it can be signalled if needed
 604                 readerThread = NativeThread.current();
 605             }
 606         }
 607     }
 608 
 609     private void endFinishConnect(boolean blocking, boolean completed)
 610         throws IOException
 611     {
 612         endRead(blocking, completed);
 613 
 614         if (completed) {
 615             synchronized (stateLock) {
 616                 if (state == ST_CONNECTIONPENDING) {
 617                     localAddress = RdmaNet.localAddress(fd);
 618                     state = ST_CONNECTED;
 619                 }
 620             }
 621         }
 622     }
 623 
 624     @Override
 625     public boolean finishConnect() throws IOException {
 626         try {
 627             readLock.lock();
 628             try {
 629                 writeLock.lock();
 630                 try {
 631                     // no-op if already connected
 632                     if (isConnected())
 633                         return true;
 634 
 635                     boolean blocking = isBlocking();
 636                     boolean connected = false;
 637                     try {
 638                         beginFinishConnect(blocking);
 639                         int n = 0;
 640                         if (blocking) {
 641                             do {
 642                                 n = checkConnect(fd, true);
 643                             } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
 644                         } else {
 645                             n = checkConnect(fd, false);
 646                         }
 647                         connected = (n > 0);
 648                     } finally {
 649                         endFinishConnect(blocking, connected);
 650                     }
 651                     assert (blocking && connected) ^ !blocking;
 652                     return connected;
 653                 } finally {
 654                     writeLock.unlock();
 655                 }
 656             } finally {
 657                 readLock.unlock();
 658             }
 659         } catch (IOException ioe) {
 660             // connect failed, close the channel
 661             close();
 662             throw ioe;
 663         }
 664     }
 665 
 666     @Override
 667     protected void implCloseSelectableChannel() throws IOException {
 668         assert !isOpen();
 669 
 670         boolean blocking;
 671         boolean connected;
 672         boolean interrupted = false;
 673 
 674         // set state to ST_CLOSING
 675         synchronized (stateLock) {
 676             assert state < ST_CLOSING;
 677             blocking = isBlocking();
 678             connected = (state == ST_CONNECTED);
 679             state = ST_CLOSING;
 680         }
 681 
 682         // wait for any outstanding I/O operations to complete
 683         if (blocking) {
 684             synchronized (stateLock) {
 685                 assert state == ST_CLOSING;
 686                 long reader = readerThread;
 687                 long writer = writerThread;
 688                 if (reader != 0 || writer != 0) {
 689                     nd.preClose(fd);
 690                     connected = false; // fd is no longer connected socket
 691 
 692                     if (reader != 0)
 693                         NativeThread.signal(reader);
 694                     if (writer != 0)
 695                         NativeThread.signal(writer);
 696 
 697                     // wait for blocking I/O operations to end
 698                     while (readerThread != 0 || writerThread != 0) {
 699                         try {
 700                             stateLock.wait();
 701                         } catch (InterruptedException e) {
 702                             interrupted = true;
 703                         }
 704                     }
 705                 }
 706             }
 707         } else {
 708             // non-blocking mode: wait for read/write to complete
 709             readLock.lock();
 710             try {
 711                 writeLock.lock();
 712                 writeLock.unlock();
 713             } finally {
 714                 readLock.unlock();
 715             }
 716         }
 717 
 718         // set state to ST_KILLPENDING
 719         synchronized (stateLock) {
 720             assert state == ST_CLOSING;
 721             // if connected, and the channel is registered with a Selector, we
 722             // shutdown the output so that the peer reads EOF
 723             if (connected && isRegistered()) {
 724                 try {
 725                     RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 726                 } catch (IOException ignore) { }
 727             }
 728             state = ST_KILLPENDING;
 729         }
 730 
 731         // close socket if not registered with Selector
 732         if (!isRegistered())
 733             kill();
 734 
 735         // restore interrupt status
 736         if (interrupted)
 737             Thread.currentThread().interrupt();
 738     }
 739 
 740     @Override
 741     public void kill() throws IOException {
 742         synchronized (stateLock) {
 743             if (state == ST_KILLPENDING) {
 744                 state = ST_KILLED;
 745                 nd.close(fd);
 746             }
 747         }
 748     }
 749 
 750     @Override
 751     public SocketChannel shutdownInput() throws IOException {
 752         synchronized (stateLock) {
 753             ensureOpen();
 754             if (!isConnected())
 755                 throw new NotYetConnectedException();
 756             if (!isInputClosed) {
 757                 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD);
 758                 long thread = readerThread;
 759                 if (thread != 0)
 760                     NativeThread.signal(thread);
 761                 isInputClosed = true;
 762             }
 763             return this;
 764         }
 765     }
 766 
 767     @Override
 768     public SocketChannel shutdownOutput() throws IOException {
 769         synchronized (stateLock) {
 770             ensureOpen();
 771             if (!isConnected())
 772                 throw new NotYetConnectedException();
 773             if (!isOutputClosed) {
 774                 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 775                 long thread = writerThread;
 776                 if (thread != 0)
 777                     NativeThread.signal(thread);
 778                 isOutputClosed = true;
 779             }
 780             return this;
 781         }
 782     }
 783 
 784     boolean isInputOpen() {
 785         return !isInputClosed;
 786     }
 787 
 788     boolean isOutputOpen() {
 789         return !isOutputClosed;
 790     }
 791 
 792     /**
 793      * Poll this channel's socket for reading up to the given timeout.
 794      * @return {@code true} if the socket is polled
 795      */
 796     boolean pollRead(long timeout) throws IOException {
 797         boolean blocking = isBlocking();
 798         assert Thread.holdsLock(blockingLock()) && blocking;
 799 
 800         readLock.lock();
 801         try {
 802             boolean polled = false;
 803             try {
 804                 beginRead(blocking);
 805                 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout);
 806                 polled = (events != 0);
 807             } finally {
 808                 endRead(blocking, polled);
 809             }
 810             return polled;
 811         } finally {
 812             readLock.unlock();
 813         }
 814     }
 815 
 816     /**
 817      * Poll this channel's socket for a connection, up to the given timeout.
 818      * @return {@code true} if the socket is polled
 819      */
 820     boolean pollConnected(long timeout) throws IOException {
 821         boolean blocking = isBlocking();
 822         assert Thread.holdsLock(blockingLock()) && blocking;
 823 
 824         readLock.lock();
 825         try {
 826             writeLock.lock();
 827             try {
 828                 boolean polled = false;
 829                 try {
 830                     beginFinishConnect(blocking);
 831                     int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout);
 832                     polled = (events != 0);
 833                 } finally {
 834                     // invoke endFinishConnect with completed = false so that
 835                     // the state is not changed to ST_CONNECTED. The socket
 836                     // adaptor will use finishConnect to finish.
 837                     endFinishConnect(blocking, /*completed*/false);
 838                 }
 839                 return polled;
 840             } finally {
 841                 writeLock.unlock();
 842             }
 843         } finally {
 844             readLock.unlock();
 845         }
 846     }
 847 
 848     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 849         int intOps = ski.nioInterestOps();
 850         int oldOps = ski.nioReadyOps();
 851         int newOps = initialOps;
 852 
 853         if ((ops & Net.POLLNVAL) != 0) {
 854             return false;
 855         }
 856 
 857         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 858             newOps = intOps;
 859             ski.nioReadyOps(newOps);
 860             return (newOps & ~oldOps) != 0;
 861         }
 862 
 863         boolean connected = isConnected();
 864         if (((ops & Net.POLLIN) != 0) &&
 865             ((intOps & SelectionKey.OP_READ) != 0) && connected)
 866             newOps |= SelectionKey.OP_READ;
 867 
 868         if (((ops & Net.POLLCONN) != 0) &&
 869             ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
 870             newOps |= SelectionKey.OP_CONNECT;
 871 
 872         if (((ops & Net.POLLOUT) != 0) &&
 873             ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
 874             newOps |= SelectionKey.OP_WRITE;
 875 
 876         ski.nioReadyOps(newOps);
 877         return (newOps & ~oldOps) != 0;
 878     }
 879 
 880     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 881         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 882     }
 883 
 884     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 885         return translateReadyOps(ops, 0, ski);
 886     }
 887 
 888     public int translateInterestOps(int ops) {
 889         int newOps = 0;
 890         if ((ops & SelectionKey.OP_READ) != 0)
 891             newOps |= Net.POLLIN;
 892         if ((ops & SelectionKey.OP_WRITE) != 0)
 893             newOps |= Net.POLLOUT;
 894         if ((ops & SelectionKey.OP_CONNECT) != 0)
 895             newOps |= Net.POLLCONN;
 896         return newOps;
 897     }
 898 
 899     public FileDescriptor getFD() {
 900         return fd;
 901     }
 902 
 903     public int getFDVal() {
 904         return fdVal;
 905     }
 906 
 907     @Override
 908     public String toString() {
 909         StringBuilder sb = new StringBuilder();
 910         sb.append(this.getClass().getSuperclass().getName());
 911         sb.append('[');
 912         if (!isOpen())
 913             sb.append("closed");
 914         else {
 915             synchronized (stateLock) {
 916                 switch (state) {
 917                 case ST_UNCONNECTED:
 918                     sb.append("unconnected");
 919                     break;
 920                 case ST_CONNECTIONPENDING:
 921                     sb.append("connection-pending");
 922                     break;
 923                 case ST_CONNECTED:
 924                     sb.append("connected");
 925                     if (isInputClosed)
 926                         sb.append(" ishut");
 927                     if (isOutputClosed)
 928                         sb.append(" oshut");
 929                     break;
 930                 }
 931                 InetSocketAddress addr = localAddress();
 932                 if (addr != null) {
 933                     sb.append(" local=");
 934                     sb.append(RdmaNet.getRevealedLocalAddressAsString(addr));
 935                 }
 936                 if (remoteAddress() != null) {
 937                     sb.append(" remote=");
 938                     sb.append(remoteAddress().toString());
 939                 }
 940             }
 941         }
 942         sb.append(']');
 943         return sb.toString();
 944     }
 945 
 946     // -- Native methods --
 947 
 948     private static native void initIDs();
 949 
 950     private static native int checkConnect(FileDescriptor fd, boolean block)
 951         throws IOException;
 952 
 953     private static native int sendOutOfBandData(FileDescriptor fd, byte data)
 954         throws IOException;
 955 
 956     static {
 957         IOUtil.load();
 958         initIDs();
 959         nd = new RdmaSocketDispatcher();
 960     }
 961 
 962 }