--- /dev/null 2018-06-20 11:01:03.657616567 -0700 +++ new/src/jdk.net/share/classes/rdma/ch/RdmaServerSocketChannelImpl.java 2018-06-22 12:23:57.806671903 -0700 @@ -0,0 +1,479 @@ +/* + * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package rdma.ch; + +import java.io.FileDescriptor; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.net.SocketOption; +import java.net.StandardSocketOptions; +import java.nio.channels.AlreadyBoundException; +import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetBoundException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import sun.nio.ch.IOStatus; +import sun.nio.ch.IOUtil; +import sun.nio.ch.NativeThread; +import sun.nio.ch.SelChImpl; +import sun.nio.ch.SelectionKeyImpl; +import sun.net.ext.RdmaSocketOptions; + +public class RdmaServerSocketChannelImpl + extends ServerSocketChannel + implements SelChImpl +{ + private static RdmaSocketDispatcher nd; + + private final FileDescriptor fd; + private final int fdVal; + + private final ReentrantLock acceptLock = new ReentrantLock(); + + private final Object stateLock = new Object(); + + private static final int ST_INUSE = 0; + private static final int ST_CLOSING = 1; + private static final int ST_KILLPENDING = 2; + private static final int ST_KILLED = 3; + private int state; + + private long thread; + + private InetSocketAddress localAddress; + + private boolean isReuseAddress; + + private ServerSocket socket; + + RdmaServerSocketChannelImpl(SelectorProvider sp) throws IOException { + super(sp); + this.fd = RdmaNet.serverSocket(true); + this.fdVal = IOUtil.fdVal(fd); + } + + RdmaServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) + throws IOException + { + super(sp); + this.fd = fd; + this.fdVal = IOUtil.fdVal(fd); + if (bound) { + synchronized (stateLock) { + localAddress = RdmaNet.localAddress(fd); + } + } + } + + private void ensureOpen() throws ClosedChannelException { + if (!isOpen()) + throw new ClosedChannelException(); + } + + @Override + public ServerSocket socket() { + synchronized (stateLock) { + if (socket == null) + socket = RdmaServerSocketAdaptor.create(this); + return socket; + } + } + + @Override + public SocketAddress getLocalAddress() throws IOException { + synchronized (stateLock) { + ensureOpen(); + return (localAddress == null) + ? null + : RdmaNet.getRevealedLocalAddress(localAddress); + } + } + + @Override + public ServerSocketChannel setOption(SocketOption name, T value) + throws IOException + { + Objects.requireNonNull(name); + if (!supportedOptions().contains(name)) + throw new UnsupportedOperationException("'" + name + "' not supported"); + synchronized (stateLock) { + ensureOpen(); + + if (name == StandardSocketOptions.SO_REUSEADDR && + RdmaNet.useExclusiveBind()) { + isReuseAddress = (Boolean)value; + return this; + } + + if (isBound() && (name == StandardSocketOptions.SO_REUSEADDR || + name == StandardSocketOptions.SO_RCVBUF)) + throw new UnsupportedOperationException( + "RDMA server socket channel cannot set the socket option " + + name.toString() + " after bind."); + + RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); + return this; + } + } + + @Override + @SuppressWarnings("unchecked") + public T getOption(SocketOption name) + throws IOException + { + Objects.requireNonNull(name); + if (!supportedOptions().contains(name)) + throw new UnsupportedOperationException("'" + name + "' not supported"); + + synchronized (stateLock) { + ensureOpen(); + if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) { + return (T)Boolean.valueOf(isReuseAddress); + } + return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); + } + } + + private static class DefaultOptionsHolder { + static final Set> defaultOptions = defaultOptions(); + + private static Set> defaultOptions() { + HashSet> set = new HashSet<>(2); + set.add(StandardSocketOptions.SO_RCVBUF); + set.add(StandardSocketOptions.SO_REUSEADDR); + if (RdmaNet.isRdmaAvailable()) { + RdmaSocketOptions rdmaOptions = + RdmaSocketOptions.getInstance(); + set.addAll(rdmaOptions.options()); + } + return Collections.unmodifiableSet(set); + } + } + + public final Set> supportedOptions() { + return DefaultOptionsHolder.defaultOptions; + } + + @Override + public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { + synchronized (stateLock) { + ensureOpen(); + if (localAddress != null) + throw new AlreadyBoundException(); + InetSocketAddress isa = (local == null) + ? new InetSocketAddress(0) + : RdmaNet.checkAddress(local); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) + sm.checkListen(isa.getPort()); + RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); + RdmaNet.listen(fd, backlog < 1 ? 50 : backlog); + localAddress = RdmaNet.localAddress(fd); + } + return this; + } + + private void begin(boolean blocking) throws ClosedChannelException { + if (blocking) + begin(); + synchronized (stateLock) { + ensureOpen(); + if (localAddress == null) + throw new NotYetBoundException(); + if (blocking) + thread = NativeThread.current(); + } + } + + private void end(boolean blocking, boolean completed) + throws AsynchronousCloseException + { + if (blocking) { + synchronized (stateLock) { + thread = 0; + if (state == ST_CLOSING) { + stateLock.notifyAll(); + } + } + end(completed); + } + } + + @Override + public SocketChannel accept() throws IOException { + acceptLock.lock(); + try { + int n = 0; + FileDescriptor newfd = new FileDescriptor(); + InetSocketAddress[] isaa = new InetSocketAddress[1]; + + boolean blocking = isBlocking(); + try { + begin(blocking); + do { + n = accept(this.fd, newfd, isaa); + } while (n == IOStatus.INTERRUPTED && isOpen()); + } finally { + end(blocking, n > 0); + assert IOStatus.check(n); + } + + if (n < 1) + return null; + + // newly accepted socket is initially in blocking mode + RdmaNet.configureBlocking(newfd, true); + + InetSocketAddress isa = isaa[0]; + SocketChannel sc = new RdmaSocketChannelImpl(provider(), newfd, isa); + + // check permitted to accept connections from the remote address + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + try { + sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); + } catch (SecurityException x) { + sc.close(); + throw x; + } + } + return sc; + + } finally { + acceptLock.unlock(); + } + } + + @Override + protected void implConfigureBlocking(boolean block) throws IOException { + acceptLock.lock(); + try { + synchronized (stateLock) { + ensureOpen(); + RdmaNet.configureBlocking(fd, block); + } + } finally { + acceptLock.unlock(); + } + } + + @Override + protected void implCloseSelectableChannel() throws IOException { + assert !isOpen(); + + boolean interrupted = false; + boolean blocking; + + // set state to ST_CLOSING + synchronized (stateLock) { + assert state < ST_CLOSING; + state = ST_CLOSING; + blocking = isBlocking(); + } + + // wait for any outstanding accept to complete + if (blocking) { + synchronized (stateLock) { + assert state == ST_CLOSING; + long th = thread; + if (th != 0) { + nd.preClose(fd); + NativeThread.signal(th); + + // wait for accept operation to end + while (thread != 0) { + try { + stateLock.wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } + } + } else { + // non-blocking mode: wait for accept to complete + acceptLock.lock(); + acceptLock.unlock(); + } + + // set state to ST_KILLPENDING + synchronized (stateLock) { + assert state == ST_CLOSING; + state = ST_KILLPENDING; + } + + // close socket if not registered with Selector + if (!isRegistered()) + kill(); + + // restore interrupt status + if (interrupted) + Thread.currentThread().interrupt(); + } + + @Override + public void kill() throws IOException { + synchronized (stateLock) { + if (state == ST_KILLPENDING) { + state = ST_KILLED; + nd.close(fd); + } + } + } + + boolean isBound() { + synchronized (stateLock) { + return localAddress != null; + } + } + + InetSocketAddress localAddress() { + synchronized (stateLock) { + return localAddress; + } + } + + /** + * Poll this channel's socket for a new connection up to the given timeout. + * @return {@code true} if there is a connection to accept + */ + boolean pollAccept(long timeout) throws IOException { + assert Thread.holdsLock(blockingLock()) && isBlocking(); + acceptLock.lock(); + try { + boolean polled = false; + try { + begin(true); + int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout); + polled = (events != 0); + } finally { + end(true, polled); + } + return polled; + } finally { + acceptLock.unlock(); + } + } + + public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { + int intOps = ski.nioInterestOps(); + int oldOps = ski.nioReadyOps(); + int newOps = initialOps; + + if ((ops & RdmaNet.POLLNVAL) != 0) { + return false; + } + + if ((ops & (RdmaNet.POLLERR | RdmaNet.POLLHUP)) != 0) { + newOps = intOps; + ski.nioReadyOps(newOps); + return (newOps & ~oldOps) != 0; + } + + if (((ops & RdmaNet.POLLIN) != 0) && + ((intOps & SelectionKey.OP_ACCEPT) != 0)) + newOps |= SelectionKey.OP_ACCEPT; + + ski.nioReadyOps(newOps); + return (newOps & ~oldOps) != 0; + } + + public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { + return translateReadyOps(ops, ski.nioReadyOps(), ski); + } + + public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { + return translateReadyOps(ops, 0, ski); + } + + public int translateInterestOps(int ops) { + int newOps = 0; + if ((ops & SelectionKey.OP_ACCEPT) != 0) + newOps |= RdmaNet.POLLIN; + return newOps; + } + + public FileDescriptor getFD() { + return fd; + } + + public int getFDVal() { + return fdVal; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.getClass().getName()); + sb.append('['); + if (!isOpen()) { + sb.append("closed"); + } else { + synchronized (stateLock) { + InetSocketAddress addr = localAddress; + if (addr == null) { + sb.append("unbound"); + } else { + sb.append(RdmaNet.getRevealedLocalAddressAsString(addr)); + } + } + } + sb.append(']'); + return sb.toString(); + } + + private int accept(FileDescriptor ssfd, + FileDescriptor newfd, + InetSocketAddress[] isaa) + throws IOException + { + return accept0(ssfd, newfd, isaa); + } + + // -- Native methods -- + + private native int accept0(FileDescriptor ssfd, + FileDescriptor newfd, + InetSocketAddress[] isaa) + throws IOException; + + private static native void initIDs(); + + static { + IOUtil.load(); + initIDs(); + nd = new RdmaSocketDispatcher(); + } +}