1 /*
   2  * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package rdma.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.InetAddress;
  31 import java.net.InetSocketAddress;
  32 import java.net.Socket;
  33 import java.net.SocketAddress;
  34 import java.net.SocketOption;
  35 import java.net.StandardSocketOptions;
  36 import java.nio.channels.AlreadyBoundException;
  37 import java.nio.channels.ConnectionPendingException;
  38 import java.nio.channels.NotYetConnectedException;
  39 import java.nio.channels.SocketChannel;
  40 import java.nio.channels.spi.SelectorProvider;
  41 import java.util.Collections;
  42 import java.util.HashSet;
  43 import java.util.Objects;
  44 import java.util.Set;
  45 import sun.nio.ch.NativeThread;
  46 import sun.nio.ch.IOStatus;
  47 import sun.nio.ch.IOUtil;
  48 import sun.nio.ch.SocketChannelImpl;
  49 import sun.net.ext.RdmaSocketOptions;
  50 
  51 public class RdmaSocketChannelImpl
  52     extends SocketChannelImpl
  53 {
    /**
     * Constructor for normal connecting sockets.
     *
     * <p>Delegates directly to the superclass constructor; no additional
     * state is initialized here.
     *
     * @param sp the selector provider, passed through to the superclass
     * @throws IOException if the superclass constructor throws it
     */
    protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
    }
  59 
    /**
     * Creates a channel around an existing file descriptor.
     *
     * <p>All arguments are forwarded unchanged to the superclass constructor.
     *
     * @param sp    the selector provider
     * @param fd    the file descriptor to use for this channel
     * @param bound forwarded to the superclass; presumably tells it whether
     *              {@code fd} is already bound (TODO confirm against the
     *              superclass constructor)
     * @throws IOException if the superclass constructor throws it
     */
    protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
        throws IOException
    {
        super(sp, fd, bound);
    }
  65 
    /**
     * Package-private constructor taking an existing file descriptor and a
     * socket address.
     *
     * <p>All arguments are forwarded unchanged to the superclass constructor.
     *
     * @param sp  the selector provider
     * @param fd  the file descriptor to use for this channel
     * @param isa forwarded to the superclass; presumably the remote address
     *            of an already connected socket (TODO confirm against the
     *            superclass constructor)
     * @throws IOException if the superclass constructor throws it
     */
    RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
        throws IOException
    {
        super(sp, fd, isa);
    }
  71 
    /**
     * Creates the file descriptor for this channel by calling
     * {@code RdmaNet.socket(true)}.
     *
     * <p>NOTE(review): the meaning of the {@code true} argument is defined by
     * {@code RdmaNet.socket}, which is not visible here; presumably it
     * requests a stream socket -- confirm.
     *
     * <p>NOTE(review): this looks like a hook meant to replace a superclass
     * method, but it carries no {@code @Override}; verify against the
     * superclass.
     *
     * @return the descriptor returned by {@code RdmaNet.socket}
     * @throws IOException if {@code RdmaNet.socket} throws it
     */
    protected FileDescriptor createFD() throws IOException {
        return RdmaNet.socket(true);
    }
  75 
    /**
     * Obtains the local address for the given file descriptor by delegating
     * to {@code RdmaNet.localAddress}.
     *
     * <p>NOTE(review): this looks like a hook meant to replace a superclass
     * method, but it carries no {@code @Override}; verify against the
     * superclass.
     *
     * @param fd the file descriptor to query
     * @return the address returned by {@code RdmaNet.localAddress(fd)}
     * @throws IOException if {@code RdmaNet.localAddress} throws it
     */
    protected InetSocketAddress createLocalAddress(FileDescriptor fd)
        throws IOException {
        return RdmaNet.localAddress(fd);
    }
  80 
  81     @Override
  82     public SocketAddress getLocalAddress() throws IOException {
  83         synchronized (stateLock) {
  84             ensureOpen();
  85             return RdmaNet.getRevealedLocalAddress(localAddress);
  86         }
  87     }
  88 
  89     @Override
  90     public <T> SocketChannel setOption(SocketOption<T> name, T value)
  91         throws IOException
  92     {
  93         Objects.requireNonNull(name);
  94         if (!supportedOptions().contains(name))
  95             throw new UnsupportedOperationException("'" + name + "' not supported");
  96 
  97         synchronized (stateLock) {
  98             ensureOpen();
  99 
 100             if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
 101                 isReuseAddress = (Boolean)value;
 102                 return this;
 103             }
 104 
 105             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 106             return this;
 107         }
 108     }
 109 
 110     @Override
 111     @SuppressWarnings("unchecked")
 112     public <T> T getOption(SocketOption<T> name)
 113         throws IOException
 114     {
 115         Objects.requireNonNull(name);
 116         if (!supportedOptions().contains(name))
 117             throw new UnsupportedOperationException("'" + name + "' not supported");
 118 
 119         synchronized (stateLock) {
 120             ensureOpen();
 121 
 122             if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
 123                 return (T)Boolean.valueOf(isReuseAddress);
 124             }
 125 
 126             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 127         }
 128     }
 129 
 130     private static class DefaultOptionsHolder {
 131         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 132 
 133         private static Set<SocketOption<?>> defaultOptions() {
 134             HashSet<SocketOption<?>> set = new HashSet<>();
 135             set.add(StandardSocketOptions.SO_SNDBUF);
 136             set.add(StandardSocketOptions.SO_RCVBUF);
 137             set.add(StandardSocketOptions.SO_KEEPALIVE);
 138             set.add(StandardSocketOptions.SO_REUSEADDR);
 139             set.add(StandardSocketOptions.TCP_NODELAY);
 140             RdmaSocketOptions rdmaOptions =
 141                     RdmaSocketOptions.getInstance();
 142             set.addAll(rdmaOptions.options());
 143             return Collections.unmodifiableSet(set);
 144         }
 145     }
 146 
 147     public Set<SocketOption<?>> supportedOptions() {
 148          return DefaultOptionsHolder.defaultOptions;
 149     }
 150 
    /**
     * Switches the blocking mode of the underlying socket.
     *
     * <p>Acquires {@code readLock}, then {@code writeLock}, then
     * {@code stateLock}; with all three held it checks that the channel is
     * open and calls {@code RdmaNet.configureBlocking(fd, block)}. The locks
     * are released in the reverse order of acquisition.
     *
     * @param block the blocking mode passed on to
     *              {@code RdmaNet.configureBlocking}
     * @throws IOException if {@code ensureOpen()} rejects the channel or the
     *                     mode change fails
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    RdmaNet.configureBlocking(fd, block);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }
 168 
 169     @Override
 170     public SocketChannel bind(SocketAddress local) throws IOException {
 171         readLock.lock();
 172         try {
 173             writeLock.lock();
 174             try {
 175                 synchronized (stateLock) {
 176                     ensureOpen();
 177                     if (state == ST_CONNECTIONPENDING)
 178                         throw new ConnectionPendingException();
 179                     if (localAddress != null)
 180                         throw new AlreadyBoundException();
 181                     InetSocketAddress isa = (local == null) ?
 182                         new InetSocketAddress(0) : RdmaNet.checkAddress(local);
 183                     SecurityManager sm = System.getSecurityManager();
 184                     if (sm != null) {
 185                         sm.checkListen(isa.getPort());
 186                     }
 187                     RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
 188                     localAddress = RdmaNet.localAddress(fd);
 189                 }
 190             } finally {
 191                 writeLock.unlock();
 192             }
 193         } finally {
 194             readLock.unlock();
 195         }
 196         return this;
 197     }
 198 
 199     protected void endConnect(boolean blocking, boolean completed)
 200         throws IOException
 201     {
 202         endRead(blocking, completed);
 203 
 204         if (completed) {
 205             synchronized (stateLock) {
 206                 if (state == ST_CONNECTIONPENDING) {
 207                     localAddress = RdmaNet.localAddress(fd);
 208                     state = ST_CONNECTED;
 209                 }
 210             }
 211         }
 212     }
 213 
 214     @Override
 215     public boolean connect(SocketAddress sa) throws IOException {
 216         InetSocketAddress isa = RdmaNet.checkAddress(sa);
 217         SecurityManager sm = System.getSecurityManager();
 218         if (sm != null)
 219             sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
 220 
 221         InetAddress ia = isa.getAddress();
 222         if (ia.isAnyLocalAddress())
 223             ia = InetAddress.getLocalHost();
 224 
 225         try {
 226             readLock.lock();
 227             try {
 228                 writeLock.lock();
 229                 try {
 230                     int n = 0;
 231                     boolean blocking = isBlocking();
 232                     try {
 233                         beginConnect(blocking, isa);
 234                         do {
 235                             n = RdmaNet.connect(fd, ia, isa.getPort());
 236                         } while (n == IOStatus.INTERRUPTED && isOpen());
 237                     } finally {
 238                         endConnect(blocking, (n > 0));
 239                     }
 240                     assert IOStatus.check(n);
 241                     return n > 0;
 242                 } finally {
 243                     writeLock.unlock();
 244                 }
 245             } finally {
 246                 readLock.unlock();
 247             }
 248         } catch (IOException ioe) {
 249             // connect failed, close the channel
 250             close();
 251             throw ioe;
 252         }
 253     }
 254 
 255     protected void endFinishConnect(boolean blocking, boolean completed)
 256         throws IOException
 257     {
 258         endRead(blocking, completed);
 259 
 260         if (completed) {
 261             synchronized (stateLock) {
 262                 if (state == ST_CONNECTIONPENDING) {
 263                     localAddress = RdmaNet.localAddress(fd);
 264                     state = ST_CONNECTED;
 265                 }
 266             }
 267         }
 268     }
 269 
    /**
     * Closes this channel in three steps:
     * <ol>
     *   <li>moves the state to {@code ST_CLOSING}, remembering the blocking
     *       mode and whether the channel was connected;</li>
     *   <li>waits for in-flight I/O: in blocking mode by pre-closing the
     *       descriptor, signalling the recorded reader/writer threads and
     *       waiting on {@code stateLock} until both thread ids are cleared;
     *       in non-blocking mode by acquiring and releasing the read and
     *       write locks;</li>
     *   <li>moves the state to {@code ST_KILLPENDING}, shutting down output
     *       first if still connected and registered, and calls {@code kill()}
     *       when the channel is not registered.</li>
     * </ol>
     * An interrupt received while waiting is remembered and the thread's
     * interrupt status is restored before returning.
     *
     * @throws IOException if one of the close steps throws it (a failure of
     *                     the output shutdown is deliberately ignored)
     */
    @Override
    protected void implCloseSelectableChannel() throws IOException {
        assert !isOpen();

        boolean blocking;
        boolean connected;
        boolean interrupted = false;

        // set state to ST_CLOSING
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            blocking = isBlocking();
            connected = (state == ST_CONNECTED);
            state = ST_CLOSING;
        }

        // wait for any outstanding I/O operations to complete
        if (blocking) {
            synchronized (stateLock) {
                assert state == ST_CLOSING;
                // snapshot the thread ids; the fields themselves are re-read
                // in the wait loop below
                long reader = readerThread;
                long writer = writerThread;
                if (reader != 0 || writer != 0) {
                    // NOTE(review): assumes nd is an RdmaSocketDispatcher, as
                    // assigned in this class's static initializer
                    ((RdmaSocketDispatcher)nd).preClose(fd);
                    connected = false; // fd is no longer connected socket

                    if (reader != 0)
                        NativeThread.signal(reader);
                    if (writer != 0)
                        NativeThread.signal(writer);

                    // wait for blocking I/O operations to end
                    while (readerThread != 0 || writerThread != 0) {
                        try {
                            stateLock.wait();
                        } catch (InterruptedException e) {
                            // keep waiting; the interrupt is re-asserted at
                            // the end of this method
                            interrupted = true;
                        }
                    }
                }
            }
        } else {
            // non-blocking mode: wait for read/write to complete
            // (the locks are taken and released only to wait for their
            // current holders; nothing is done while holding them)
            readLock.lock();
            try {
                writeLock.lock();
                writeLock.unlock();
            } finally {
                readLock.unlock();
            }
        }

        // set state to ST_KILLPENDING
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            // if connected, and the channel is registered with a Selector, we
            // shutdown the output so that the peer reads EOF
            if (connected && isRegistered()) {
                try {
                    RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
                } catch (IOException ignore) { }
            }
            state = ST_KILLPENDING;
        }

        // close socket if not registered with Selector
        if (!isRegistered())
            kill();

        // restore interrupt status
        if (interrupted)
            Thread.currentThread().interrupt();
    }
 343 
 344     @Override
 345     public SocketChannel shutdownInput() throws IOException {
 346         synchronized (stateLock) {
 347             ensureOpen();
 348             if (!isConnected())
 349                 throw new NotYetConnectedException();
 350             if (!isInputClosed) {
 351                 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD);
 352                 long thread = readerThread;
 353                 if (thread != 0)
 354                     NativeThread.signal(thread);
 355                 isInputClosed = true;
 356             }
 357             return this;
 358         }
 359     }
 360 
 361     @Override
 362     public SocketChannel shutdownOutput() throws IOException {
 363         synchronized (stateLock) {
 364             ensureOpen();
 365             if (!isConnected())
 366                 throw new NotYetConnectedException();
 367             if (!isOutputClosed) {
 368                 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
 369                 long thread = writerThread;
 370                 if (thread != 0)
 371                     NativeThread.signal(thread);
 372                 isOutputClosed = true;
 373             }
 374             return this;
 375         }
 376     }
 377 
 378     /**
 379      * Poll this channel's socket for reading up to the given timeout.
 380      * @return {@code true} if the socket is polled
 381      */
 382     boolean pollRead(long timeout) throws IOException {
 383         boolean blocking = isBlocking();
 384         assert Thread.holdsLock(blockingLock()) && blocking;
 385 
 386         readLock.lock();
 387         try {
 388             boolean polled = false;
 389             try {
 390                 beginRead(blocking);
 391                 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout);
 392                 polled = (events != 0);
 393             } finally {
 394                 endRead(blocking, polled);
 395             }
 396             return polled;
 397         } finally {
 398             readLock.unlock();
 399         }
 400     }
 401 
 402     /**
 403      * Poll this channel's socket for a connection, up to the given timeout.
 404      * @return {@code true} if the socket is polled
 405      */
 406     boolean pollConnected(long timeout) throws IOException {
 407         boolean blocking = isBlocking();
 408         assert Thread.holdsLock(blockingLock()) && blocking;
 409 
 410         readLock.lock();
 411         try {
 412             writeLock.lock();
 413             try {
 414                 boolean polled = false;
 415                 try {
 416                     beginFinishConnect(blocking);
 417                     int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout);
 418                     polled = (events != 0);
 419                 } finally {
 420                     // invoke endFinishConnect with completed = false so that
 421                     // the state is not changed to ST_CONNECTED. The socket
 422                     // adaptor will use finishConnect to finish.
 423                     endFinishConnect(blocking, /*completed*/false);
 424                 }
 425                 return polled;
 426             } finally {
 427                 writeLock.unlock();
 428             }
 429         } finally {
 430             readLock.unlock();
 431         }
 432     }
 433 
 434     @Override
 435     public String toString() {
 436         StringBuilder sb = new StringBuilder();
 437         sb.append(this.getClass().getSuperclass().getName());
 438         sb.append('[');
 439         if (!isOpen())
 440             sb.append("closed");
 441         else {
 442             synchronized (stateLock) {
 443                 switch (state) {
 444                 case ST_UNCONNECTED:
 445                     sb.append("unconnected");
 446                     break;
 447                 case ST_CONNECTIONPENDING:
 448                     sb.append("connection-pending");
 449                     break;
 450                 case ST_CONNECTED:
 451                     sb.append("connected");
 452                     if (isInputClosed)
 453                         sb.append(" ishut");
 454                     if (isOutputClosed)
 455                         sb.append(" oshut");
 456                     break;
 457                 }
 458                 InetSocketAddress addr = localAddress();
 459                 if (addr != null) {
 460                     sb.append(" local=");
 461                     sb.append(RdmaNet.getRevealedLocalAddressAsString(addr));
 462                 }
 463                 if (remoteAddress() != null) {
 464                     sb.append(" remote=");
 465                     sb.append(remoteAddress().toString());
 466                 }
 467             }
 468         }
 469         sb.append(']');
 470         return sb.toString();
 471     }
 472 
 473 
 474     // -- Native methods --
 475 
    /**
     * Native method declaration; the implementation lives in native code that
     * is not part of this file.
     *
     * <p>NOTE(review): nothing in this class calls this method. Presumably it
     * checks the outcome of a pending connect on {@code fd} -- confirm against
     * the native implementation, or remove if it is unused.
     *
     * @param fd    the socket's file descriptor
     * @param block presumably whether to block while checking (TODO confirm)
     * @return an integer status defined by the native implementation
     * @throws IOException if the native implementation raises it
     */
    private static native int checkConnect(FileDescriptor fd, boolean block)
        throws IOException;
 478 
    /**
     * Native method declaration; the implementation lives in native code that
     * is not part of this file.
     *
     * <p>NOTE(review): nothing in this class calls this method. Presumably it
     * sends {@code data} as out-of-band data on {@code fd} -- confirm against
     * the native implementation, or remove if it is unused.
     *
     * @param fd   the socket's file descriptor
     * @param data the byte handed to the native implementation
     * @return an integer status defined by the native implementation
     * @throws IOException if the native implementation raises it
     */
    private static native int sendOutOfBandData(FileDescriptor fd, byte data)
        throws IOException;
 481 
    // Class initialization: calls IOUtil.load() (presumably loads the native
    // library backing the native methods -- confirm) and installs an
    // RdmaSocketDispatcher as the dispatcher.
    //
    // NOTE(review): nd is not declared in this class, so it is presumably a
    // static field inherited from SocketChannelImpl. If so, this assignment
    // replaces the dispatcher for every user of that field, not only for RDMA
    // channels -- confirm that this is intended.
    static {
        IOUtil.load();
        nd = new RdmaSocketDispatcher();
    }
 486 
 487 }