1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.rdma;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.InetSocketAddress;
  31 import java.net.ServerSocket;
  32 import java.net.SocketAddress;
  33 import java.net.SocketOption;
  34 import java.net.StandardSocketOptions;
  35 import java.nio.channels.AlreadyBoundException;
  36 import java.nio.channels.AsynchronousCloseException;
  37 import java.nio.channels.ClosedChannelException;
  38 import java.nio.channels.NotYetBoundException;
  39 import java.nio.channels.SelectionKey;
  40 import java.nio.channels.ServerSocketChannel;
  41 import java.nio.channels.SocketChannel;
  42 import java.nio.channels.spi.SelectorProvider;
  43 import java.util.Collections;
  44 import java.util.HashSet;
  45 import java.util.Objects;
  46 import java.util.Set;
  47 import java.util.concurrent.locks.ReentrantLock;
  48 import sun.nio.ch.IOStatus;
  49 import sun.nio.ch.IOUtil;
  50 import sun.nio.ch.NativeThread;
  51 import sun.nio.ch.Net;
  52 import sun.nio.ch.SelChImpl;
  53 import sun.nio.ch.SelectionKeyImpl;
  54 import sun.net.ext.RdmaSocketOptions;
  55 
  56 public class RdmaServerSocketChannelImpl
  57     extends ServerSocketChannel
  58     implements SelChImpl
  59 {
  60     private static RdmaSocketDispatcher nd;
  61 
  62     private final FileDescriptor fd;
  63     private final int fdVal;
  64 
  65     private final ReentrantLock acceptLock = new ReentrantLock();
  66 
  67     private final Object stateLock = new Object();
  68 
  69     private static final int ST_INUSE = 0;
  70     private static final int ST_CLOSING = 1;
  71     private static final int ST_KILLPENDING = 2;
  72     private static final int ST_KILLED = 3;
  73     private int state;
  74 
  75     private long thread;
  76 
  77     private InetSocketAddress localAddress;
  78 
  79     private boolean isReuseAddress;
  80 
  81     private ServerSocket socket;
  82 
  83     private static final UnsupportedOperationException unsupported;
  84 
  85     private static final SelectorProvider checkSupported(SelectorProvider sp) {
  86         if (unsupported != null)
  87             throw new UnsupportedOperationException(unsupported.getMessage(), unsupported);
  88         else
  89             return sp;
  90     }
  91 
  92     RdmaServerSocketChannelImpl(SelectorProvider sp) throws IOException {
  93         super(checkSupported(sp));
  94         this.fd =  RdmaNet.serverSocket();
  95         this.fdVal = IOUtil.fdVal(fd);
  96     }
  97 
  98     RdmaServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
  99         throws IOException
 100     {
 101         super(checkSupported(sp));
 102         this.fd =  fd;
 103         this.fdVal = IOUtil.fdVal(fd);
 104         if (bound) {
 105             synchronized (stateLock) {
 106                 localAddress = RdmaNet.localAddress(fd);
 107             }
 108         }
 109     }
 110 
 111     private void ensureOpen() throws ClosedChannelException {
 112         if (!isOpen())
 113             throw new ClosedChannelException();
 114     }
 115 
 116     @Override
 117     public ServerSocket socket() {
 118         synchronized (stateLock) {
 119             if (socket == null)
 120                 socket = RdmaServerSocketAdaptor.create(this);
 121             return socket;
 122         }
 123     }
 124 
 125     @Override
 126     public SocketAddress getLocalAddress() throws IOException {
 127         synchronized (stateLock) {
 128             ensureOpen();
 129             return (localAddress == null)
 130                     ? null
 131                     : Net.getRevealedLocalAddress(localAddress);
 132         }
 133     }
 134 
 135     @Override
 136     public <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
 137         throws IOException
 138     {
 139         Objects.requireNonNull(name);
 140         if (!supportedOptions().contains(name))
 141             throw new UnsupportedOperationException("'" + name + "' not supported");
 142         synchronized (stateLock) {
 143             ensureOpen();
 144 
 145             if (name == StandardSocketOptions.SO_REUSEADDR &&
 146                     RdmaNet.useExclusiveBind()) {
 147                 isReuseAddress = (Boolean)value;
 148                 return this;
 149             }
 150 
 151             if (isBound() && (name == StandardSocketOptions.SO_REUSEADDR))
 152                 throw new UnsupportedOperationException(
 153                         "RDMA server socket channel cannot set the socket option "
 154                         + name.toString() + " after bind.");
 155 
 156             RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
 157             return this;
 158         }
 159     }
 160 
 161     @Override
 162     @SuppressWarnings("unchecked")
 163     public <T> T getOption(SocketOption<T> name)
 164         throws IOException
 165     {
 166         Objects.requireNonNull(name);
 167         if (!supportedOptions().contains(name))
 168             throw new UnsupportedOperationException("'" + name + "' not supported");
 169 
 170         synchronized (stateLock) {
 171             ensureOpen();
 172             if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
 173                 return (T)Boolean.valueOf(isReuseAddress);
 174             }
 175             return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
 176         }
 177     }
 178 
 179     private static class DefaultOptionsHolder {
 180         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 181 
 182         private static Set<SocketOption<?>> defaultOptions() {
 183             HashSet<SocketOption<?>> set = new HashSet<>(2);
 184             set.add(StandardSocketOptions.SO_RCVBUF);
 185             set.add(StandardSocketOptions.SO_REUSEADDR);
 186             if (RdmaNet.isRdmaAvailable()) {
 187                 RdmaSocketOptions rdmaOptions =
 188                         RdmaSocketOptions.getInstance();
 189                 set.addAll(rdmaOptions.options());
 190             }
 191             return Collections.unmodifiableSet(set);
 192         }
 193     }
 194 
 195     public final Set<SocketOption<?>> supportedOptions() {
 196         return DefaultOptionsHolder.defaultOptions;
 197     }
 198 
 199     @Override
 200     public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
 201         synchronized (stateLock) {
 202             ensureOpen();
 203             if (localAddress != null)
 204                 throw new AlreadyBoundException();
 205             InetSocketAddress isa = (local == null)
 206                                     ? new InetSocketAddress(0)
 207                                     : Net.checkAddress(local);
 208             SecurityManager sm = System.getSecurityManager();
 209             if (sm != null)
 210                 sm.checkListen(isa.getPort());
 211             RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
 212             RdmaNet.listen(fd, backlog < 1 ? 50 : backlog);
 213             localAddress = RdmaNet.localAddress(fd);
 214         }
 215         return this;
 216     }
 217 
 218     private void begin(boolean blocking) throws ClosedChannelException {
 219         if (blocking)
 220             begin();
 221         synchronized (stateLock) {
 222             ensureOpen();
 223             if (localAddress == null)
 224                 throw new NotYetBoundException();
 225             if (blocking)
 226                 thread = NativeThread.current();
 227         }
 228     }
 229 
 230     private void end(boolean blocking, boolean completed)
 231         throws AsynchronousCloseException
 232     {
 233         if (blocking) {
 234             synchronized (stateLock) {
 235                 thread = 0;
 236                 if (state == ST_CLOSING) {
 237                     stateLock.notifyAll();
 238                 }
 239             }
 240             end(completed);
 241         }
 242     }
 243 
 244     @Override
 245     public SocketChannel accept() throws IOException {
 246         acceptLock.lock();
 247         try {
 248             int n = 0;
 249             FileDescriptor newfd = new FileDescriptor();
 250             InetSocketAddress[] isaa = new InetSocketAddress[1];
 251 
 252             boolean blocking = isBlocking();
 253             try {
 254                 begin(blocking);
 255                 do {
 256                     n = accept(this.fd, newfd, isaa);
 257                 } while (n == IOStatus.INTERRUPTED && isOpen());
 258             } finally {
 259                 end(blocking, n > 0);
 260                 assert IOStatus.check(n);
 261             }
 262 
 263             if (n < 1)
 264                 return null;
 265 
 266             // newly accepted socket is initially in blocking mode
 267             RdmaNet.configureBlocking(newfd, true);
 268 
 269             InetSocketAddress isa = isaa[0];
 270             SocketChannel sc = new RdmaSocketChannelImpl(provider(), newfd, isa);
 271 
 272             // check permitted to accept connections from the remote address
 273             SecurityManager sm = System.getSecurityManager();
 274             if (sm != null) {
 275                 try {
 276                     sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
 277                 } catch (SecurityException x) {
 278                     sc.close();
 279                     throw x;
 280                 }
 281             }
 282             return sc;
 283 
 284         } finally {
 285             acceptLock.unlock();
 286         }
 287     }
 288 
 289     @Override
 290     protected void implConfigureBlocking(boolean block) throws IOException {
 291         acceptLock.lock();
 292         try {
 293             synchronized (stateLock) {
 294                 ensureOpen();
 295                 RdmaNet.configureBlocking(fd, block);
 296             }
 297         } finally {
 298             acceptLock.unlock();
 299         }
 300     }
 301 
 302     @Override
 303     protected void implCloseSelectableChannel() throws IOException {
 304         assert !isOpen();
 305 
 306         boolean interrupted = false;
 307         boolean blocking;
 308 
 309         // set state to ST_CLOSING
 310         synchronized (stateLock) {
 311             assert state < ST_CLOSING;
 312             state = ST_CLOSING;
 313             blocking = isBlocking();
 314         }
 315 
 316         // wait for any outstanding accept to complete
 317         if (blocking) {
 318             synchronized (stateLock) {
 319                 assert state == ST_CLOSING;
 320                 long th = thread;
 321                 if (th != 0) {
 322                     nd.preClose(fd);
 323                     NativeThread.signal(th);
 324 
 325                     // wait for accept operation to end
 326                     while (thread != 0) {
 327                         try {
 328                             stateLock.wait();
 329                         } catch (InterruptedException e) {
 330                             interrupted = true;
 331                         }
 332                     }
 333                 }
 334             }
 335         } else {
 336             // non-blocking mode: wait for accept to complete
 337             acceptLock.lock();
 338             acceptLock.unlock();
 339         }
 340 
 341         // set state to ST_KILLPENDING
 342         synchronized (stateLock) {
 343             assert state == ST_CLOSING;
 344             state = ST_KILLPENDING;
 345         }
 346 
 347         // close socket if not registered with Selector
 348         if (!isRegistered())
 349             kill();
 350 
 351         // restore interrupt status
 352         if (interrupted)
 353             Thread.currentThread().interrupt();
 354     }
 355 
 356     @Override
 357     public void kill() throws IOException {
 358         synchronized (stateLock) {
 359             if (state == ST_KILLPENDING) {
 360                 state = ST_KILLED;
 361                 nd.close(fd);
 362             }
 363         }
 364     }
 365 
 366     boolean isBound() {
 367         synchronized (stateLock) {
 368             return localAddress != null;
 369         }
 370     }
 371 
 372     InetSocketAddress localAddress() {
 373         synchronized (stateLock) {
 374             return localAddress;
 375         }
 376     }
 377 
 378     /**
 379      * Poll this channel's socket for a new connection up to the given timeout.
 380      * @return {@code true} if there is a connection to accept
 381      */
 382     boolean pollAccept(long timeout) throws IOException {
 383         assert Thread.holdsLock(blockingLock()) && isBlocking();
 384         acceptLock.lock();
 385         try {
 386             boolean polled = false;
 387             try {
 388                 begin(true);
 389                 int events = RdmaNet.poll(fd, Net.POLLIN, timeout);
 390                 polled = (events != 0);
 391             } finally {
 392                 end(true, polled);
 393             }
 394             return polled;
 395         } finally {
 396             acceptLock.unlock();
 397         }
 398     }
 399 
 400     public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
 401         int intOps = ski.nioInterestOps();
 402         int oldOps = ski.nioReadyOps();
 403         int newOps = initialOps;
 404 
 405         if ((ops & Net.POLLNVAL) != 0) {
 406             return false;
 407         }
 408 
 409         if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
 410             newOps = intOps;
 411             ski.nioReadyOps(newOps);
 412             return (newOps & ~oldOps) != 0;
 413         }
 414 
 415         if (((ops & Net.POLLIN) != 0) &&
 416             ((intOps & SelectionKey.OP_ACCEPT) != 0))
 417                 newOps |= SelectionKey.OP_ACCEPT;
 418 
 419         ski.nioReadyOps(newOps);
 420         return (newOps & ~oldOps) != 0;
 421     }
 422 
 423     public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
 424         return translateReadyOps(ops, ski.nioReadyOps(), ski);
 425     }
 426 
 427     public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
 428         return translateReadyOps(ops, 0, ski);
 429     }
 430 
 431     public int translateInterestOps(int ops) {
 432         int newOps = 0;
 433         if ((ops & SelectionKey.OP_ACCEPT) != 0)
 434             newOps |= Net.POLLIN;
 435         return newOps;
 436     }
 437 
 438     public FileDescriptor getFD() {
 439         return fd;
 440     }
 441 
 442     public int getFDVal() {
 443         return fdVal;
 444     }
 445 
 446     public String toString() {
 447         StringBuilder sb = new StringBuilder();
 448         sb.append(this.getClass().getName());
 449         sb.append('[');
 450         if (!isOpen()) {
 451             sb.append("closed");
 452         } else {
 453             synchronized (stateLock) {
 454                 InetSocketAddress addr = localAddress;
 455                 if (addr == null) {
 456                     sb.append("unbound");
 457                 } else {
 458                     sb.append(Net.getRevealedLocalAddressAsString(addr));
 459                 }
 460             }
 461         }
 462         sb.append(']');
 463         return sb.toString();
 464     }
 465 
 466     private int accept(FileDescriptor ssfd,
 467                        FileDescriptor newfd,
 468                        InetSocketAddress[] isaa)
 469         throws IOException
 470     {
 471         return accept0(ssfd, newfd, isaa);
 472     }
 473 
 474     // -- Native methods --
 475 
 476     private native int accept0(FileDescriptor ssfd,
 477                                FileDescriptor newfd,
 478                                InetSocketAddress[] isaa)
 479         throws IOException;
 480 
 481     private static native void initIDs()throws UnsupportedOperationException;
 482 
 483     static {
 484         IOUtil.load();
 485         System.loadLibrary("extnet");
 486         UnsupportedOperationException uoe = null;
 487         try {
 488             initIDs();
 489         } catch (UnsupportedOperationException e) {
 490             uoe = e;
 491         }
 492         unsupported = uoe;
 493         nd = new RdmaSocketDispatcher();
 494     }
 495 }