/* * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package rdma.ch; import java.io.FileDescriptor; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketOption; import java.net.StandardSocketOptions; import java.nio.channels.AlreadyBoundException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.NotYetConnectedException; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; import sun.nio.ch.NativeThread; import sun.nio.ch.IOStatus; import sun.nio.ch.IOUtil; import sun.nio.ch.SocketChannelImpl; import sun.net.ext.RdmaSocketOptions; public class RdmaSocketChannelImpl extends SocketChannelImpl { // Constructor for normal connecting sockets protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); } protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException { super(sp, fd, bound); } RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) throws IOException { super(sp, fd, isa); } protected FileDescriptor createFD() throws IOException { return RdmaNet.socket(true); } protected InetSocketAddress createLocalAddress(FileDescriptor fd) throws IOException { return RdmaNet.localAddress(fd); } @Override public SocketAddress getLocalAddress() throws IOException { synchronized (stateLock) { ensureOpen(); return RdmaNet.getRevealedLocalAddress(localAddress); } } @Override public SocketChannel setOption(SocketOption name, T value) throws IOException { Objects.requireNonNull(name); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ensureOpen(); if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) { isReuseAddress = (Boolean)value; return this; } RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); return this; } } @Override @SuppressWarnings("unchecked") public T getOption(SocketOption name) throws IOException { Objects.requireNonNull(name); if (!supportedOptions().contains(name)) throw new UnsupportedOperationException("'" + name + "' not supported"); synchronized (stateLock) { ensureOpen(); if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) { return (T)Boolean.valueOf(isReuseAddress); } return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); } } private static class DefaultOptionsHolder { static final Set> defaultOptions = defaultOptions(); private static Set> defaultOptions() { HashSet> set = new HashSet<>(); set.add(StandardSocketOptions.SO_SNDBUF); set.add(StandardSocketOptions.SO_RCVBUF); set.add(StandardSocketOptions.SO_KEEPALIVE); set.add(StandardSocketOptions.SO_REUSEADDR); set.add(StandardSocketOptions.TCP_NODELAY); RdmaSocketOptions rdmaOptions = RdmaSocketOptions.getInstance(); set.addAll(rdmaOptions.options()); return Collections.unmodifiableSet(set); } } public Set> supportedOptions() { return DefaultOptionsHolder.defaultOptions; } @Override protected void implConfigureBlocking(boolean block) throws IOException { readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { ensureOpen(); RdmaNet.configureBlocking(fd, block); } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } @Override public SocketChannel bind(SocketAddress local) throws IOException { readLock.lock(); try { writeLock.lock(); try { synchronized (stateLock) { ensureOpen(); if (state == ST_CONNECTIONPENDING) throw new ConnectionPendingException(); if (localAddress != null) throw new AlreadyBoundException(); InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) : RdmaNet.checkAddress(local); SecurityManager sm = System.getSecurityManager(); if (sm != null) { sm.checkListen(isa.getPort()); } RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); localAddress = RdmaNet.localAddress(fd); } } finally { writeLock.unlock(); } } finally { readLock.unlock(); } return this; } protected void endConnect(boolean blocking, boolean completed) throws IOException { endRead(blocking, completed); if (completed) { synchronized (stateLock) { if (state == ST_CONNECTIONPENDING) { localAddress = RdmaNet.localAddress(fd); state = ST_CONNECTED; } } } } @Override public boolean connect(SocketAddress sa) throws IOException { InetSocketAddress isa = RdmaNet.checkAddress(sa); SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); InetAddress ia = isa.getAddress(); if (ia.isAnyLocalAddress()) ia = InetAddress.getLocalHost(); try { readLock.lock(); try { writeLock.lock(); try { int n = 0; boolean blocking = isBlocking(); try { beginConnect(blocking, isa); do { n = RdmaNet.connect(fd, ia, isa.getPort()); } while (n == IOStatus.INTERRUPTED && isOpen()); } finally { endConnect(blocking, (n > 0)); } assert IOStatus.check(n); return n > 0; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } catch (IOException ioe) { // connect failed, close the channel close(); throw ioe; } } protected void endFinishConnect(boolean blocking, boolean completed) throws IOException { endRead(blocking, completed); if (completed) { synchronized (stateLock) { if (state == ST_CONNECTIONPENDING) { localAddress = RdmaNet.localAddress(fd); state = ST_CONNECTED; } } } } @Override protected void implCloseSelectableChannel() throws IOException { assert !isOpen(); boolean blocking; boolean connected; boolean interrupted = false; // set state to ST_CLOSING synchronized (stateLock) { assert state < ST_CLOSING; blocking = isBlocking(); connected = (state == ST_CONNECTED); state = ST_CLOSING; } // wait for any outstanding I/O operations to complete if (blocking) { synchronized (stateLock) { assert state == ST_CLOSING; long reader = readerThread; long writer = writerThread; if (reader != 0 || writer != 0) { ((RdmaSocketDispatcher)nd).preClose(fd); connected = false; // fd is no longer connected socket if (reader != 0) NativeThread.signal(reader); if (writer != 0) NativeThread.signal(writer); // wait for blocking I/O operations to end while (readerThread != 0 || writerThread != 0) { try { stateLock.wait(); } catch (InterruptedException e) { interrupted = true; } } } } } else { // non-blocking mode: wait for read/write to complete readLock.lock(); try { writeLock.lock(); writeLock.unlock(); } finally { readLock.unlock(); } } // set state to ST_KILLPENDING synchronized (stateLock) { assert state == ST_CLOSING; // if connected, and the channel is registered with a Selector, we // shutdown the output so that the peer reads EOF if (connected && isRegistered()) { try { RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); } catch (IOException ignore) { } } state = ST_KILLPENDING; } // close socket if not registered with Selector if (!isRegistered()) kill(); // restore interrupt status if (interrupted) Thread.currentThread().interrupt(); } @Override public SocketChannel shutdownInput() throws IOException { synchronized (stateLock) { ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); if (!isInputClosed) { RdmaNet.shutdown(fd, RdmaNet.SHUT_RD); long thread = readerThread; if (thread != 0) NativeThread.signal(thread); isInputClosed = true; } return this; } } @Override public SocketChannel shutdownOutput() throws IOException { synchronized (stateLock) { ensureOpen(); if (!isConnected()) throw new NotYetConnectedException(); if (!isOutputClosed) { RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); long thread = writerThread; if (thread != 0) NativeThread.signal(thread); isOutputClosed = true; } return this; } } /** * Poll this channel's socket for reading up to the given timeout. * @return {@code true} if the socket is polled */ boolean pollRead(long timeout) throws IOException { boolean blocking = isBlocking(); assert Thread.holdsLock(blockingLock()) && blocking; readLock.lock(); try { boolean polled = false; try { beginRead(blocking); int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout); polled = (events != 0); } finally { endRead(blocking, polled); } return polled; } finally { readLock.unlock(); } } /** * Poll this channel's socket for a connection, up to the given timeout. * @return {@code true} if the socket is polled */ boolean pollConnected(long timeout) throws IOException { boolean blocking = isBlocking(); assert Thread.holdsLock(blockingLock()) && blocking; readLock.lock(); try { writeLock.lock(); try { boolean polled = false; try { beginFinishConnect(blocking); int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout); polled = (events != 0); } finally { // invoke endFinishConnect with completed = false so that // the state is not changed to ST_CONNECTED. The socket // adaptor will use finishConnect to finish. endFinishConnect(blocking, /*completed*/false); } return polled; } finally { writeLock.unlock(); } } finally { readLock.unlock(); } } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.getClass().getSuperclass().getName()); sb.append('['); if (!isOpen()) sb.append("closed"); else { synchronized (stateLock) { switch (state) { case ST_UNCONNECTED: sb.append("unconnected"); break; case ST_CONNECTIONPENDING: sb.append("connection-pending"); break; case ST_CONNECTED: sb.append("connected"); if (isInputClosed) sb.append(" ishut"); if (isOutputClosed) sb.append(" oshut"); break; } InetSocketAddress addr = localAddress(); if (addr != null) { sb.append(" local="); sb.append(RdmaNet.getRevealedLocalAddressAsString(addr)); } if (remoteAddress() != null) { sb.append(" remote="); sb.append(remoteAddress().toString()); } } } sb.append(']'); return sb.toString(); } // -- Native methods -- private static native int checkConnect(FileDescriptor fd, boolean block) throws IOException; private static native int sendOutOfBandData(FileDescriptor fd, byte data) throws IOException; static { IOUtil.load(); nd = new RdmaSocketDispatcher(); } }