1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.rdma; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.net.ProtocolFamily; 32 import java.net.ServerSocket; 33 import java.net.SocketAddress; 34 import java.net.SocketOption; 35 import java.net.StandardSocketOptions; 36 import java.net.StandardProtocolFamily; 37 import java.nio.channels.AlreadyBoundException; 38 import java.nio.channels.AsynchronousCloseException; 39 import java.nio.channels.ClosedChannelException; 40 import java.nio.channels.NotYetBoundException; 41 import java.nio.channels.SelectionKey; 42 import java.nio.channels.ServerSocketChannel; 43 import java.nio.channels.SocketChannel; 44 import java.nio.channels.spi.SelectorProvider; 45 import java.util.Collections; 46 import java.util.HashSet; 47 import java.util.Objects; 48 import java.util.Set; 49 import java.util.concurrent.locks.ReentrantLock; 50 import sun.nio.ch.IOStatus; 51 import sun.nio.ch.IOUtil; 52 import sun.nio.ch.NativeThread; 53 import sun.nio.ch.Net; 54 import sun.nio.ch.SelChImpl; 55 import sun.nio.ch.SelectionKeyImpl; 56 import sun.net.ext.RdmaSocketOptions; 57 58 public class RdmaServerSocketChannelImpl 59 extends ServerSocketChannel 60 implements SelChImpl 61 { 62 //The protocol family of the socket 63 private final ProtocolFamily family; 64 65 private static RdmaSocketDispatcher nd; 66 67 private final FileDescriptor fd; 68 private final int fdVal; 69 70 private final ReentrantLock acceptLock = new ReentrantLock(); 71 72 private final Object stateLock = new Object(); 73 74 private static final int ST_INUSE = 0; 75 private static final int ST_CLOSING = 1; 76 private static final int ST_KILLPENDING = 2; 77 private static final int ST_KILLED = 3; 78 private int state; 79 80 private long thread; 81 82 private InetSocketAddress localAddress; 83 84 private boolean isReuseAddress; 85 86 private ServerSocket socket; 87 88 private static final UnsupportedOperationException unsupported; 89 90 private static final SelectorProvider checkSupported(SelectorProvider sp) { 91 if (unsupported != null) 92 throw new UnsupportedOperationException(unsupported.getMessage(), unsupported); 93 else 94 return sp; 95 } 96 97 RdmaServerSocketChannelImpl(SelectorProvider sp, ProtocolFamily family) 98 throws IOException { 99 super(checkSupported(sp)); 100 Objects.requireNonNull(family, "'family' is null"); 101 if ((family != StandardProtocolFamily.INET) && 102 (family != StandardProtocolFamily.INET6)) { 103 throw new UnsupportedOperationException( 104 "Protocol family not supported"); 105 } 106 if (family == StandardProtocolFamily.INET6) { 107 if (!Net.isIPv6Available()) { 108 throw new UnsupportedOperationException( 109 "IPv6 not available"); 110 } 111 } 112 this.family = family; 113 this.fd = RdmaNet.serverSocket(family, true); 114 this.fdVal = IOUtil.fdVal(fd); 115 } 116 117 private void ensureOpen() throws ClosedChannelException { 118 if (!isOpen()) 119 throw new ClosedChannelException(); 120 } 121 122 @Override 123 public ServerSocket socket() { 124 synchronized (stateLock) { 125 if (socket == null) 126 socket = RdmaServerSocketAdaptor.create(this); 127 return socket; 128 } 129 } 130 131 @Override 132 public SocketAddress getLocalAddress() throws IOException { 133 synchronized (stateLock) { 134 ensureOpen(); 135 return (localAddress == null) 136 ? null 137 : Net.getRevealedLocalAddress(localAddress); 138 } 139 } 140 141 @Override 142 public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) 143 throws IOException 144 { 145 Objects.requireNonNull(name); 146 if (!supportedOptions().contains(name)) 147 throw new UnsupportedOperationException("'" + name 148 + "' not supported"); 149 synchronized (stateLock) { 150 ensureOpen(); 151 if (isBound() && (name == StandardSocketOptions.SO_REUSEADDR)) 152 throw new UnsupportedOperationException( 153 "RDMA server socket channel cannot set the socket option " 154 + name.toString() + " after bind."); 155 156 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); 157 return this; 158 } 159 } 160 161 @Override 162 @SuppressWarnings("unchecked") 163 public <T> T getOption(SocketOption<T> name) 164 throws IOException 165 { 166 Objects.requireNonNull(name); 167 if (!supportedOptions().contains(name)) 168 throw new UnsupportedOperationException("'" + name 169 + "' not supported"); 170 171 synchronized (stateLock) { 172 ensureOpen(); 173 return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); 174 } 175 } 176 177 private static class DefaultOptionsHolder { 178 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 179 180 private static Set<SocketOption<?>> defaultOptions() { 181 HashSet<SocketOption<?>> set = new HashSet<>(2); 182 set.add(StandardSocketOptions.SO_RCVBUF); 183 set.add(StandardSocketOptions.SO_REUSEADDR); 184 if (RdmaNet.isRdmaAvailable()) { 185 RdmaSocketOptions rdmaOptions = 186 RdmaSocketOptions.getInstance(); 187 set.addAll(rdmaOptions.options()); 188 } 189 return Collections.unmodifiableSet(set); 190 } 191 } 192 193 public final Set<SocketOption<?>> supportedOptions() { 194 return DefaultOptionsHolder.defaultOptions; 195 } 196 197 @Override 198 public ServerSocketChannel bind(SocketAddress local, int backlog) 199 throws IOException { 200 synchronized (stateLock) { 201 ensureOpen(); 202 if (localAddress != null) 203 throw new AlreadyBoundException(); 204 InetSocketAddress isa = (local == null) 205 ? new InetSocketAddress(0) 206 : RdmaNet.checkAddress(local, family); 207 SecurityManager sm = System.getSecurityManager(); 208 if (sm != null) 209 sm.checkListen(isa.getPort()); 210 RdmaNet.bind(family, fd, isa.getAddress(), isa.getPort()); 211 RdmaNet.listen(fd, backlog < 1 ? 50 : backlog); 212 localAddress = RdmaNet.localAddress(fd); 213 } 214 return this; 215 } 216 217 private void begin(boolean blocking) throws ClosedChannelException { 218 if (blocking) 219 begin(); 220 synchronized (stateLock) { 221 ensureOpen(); 222 if (localAddress == null) 223 throw new NotYetBoundException(); 224 if (blocking) 225 thread = NativeThread.current(); 226 } 227 } 228 229 private void end(boolean blocking, boolean completed) 230 throws AsynchronousCloseException { 231 if (blocking) { 232 synchronized (stateLock) { 233 thread = 0; 234 if (state == ST_CLOSING) { 235 stateLock.notifyAll(); 236 } 237 } 238 end(completed); 239 } 240 } 241 242 @Override 243 public SocketChannel accept() throws IOException { 244 acceptLock.lock(); 245 try { 246 int n = 0; 247 FileDescriptor newfd = new FileDescriptor(); 248 InetSocketAddress[] isaa = new InetSocketAddress[1]; 249 250 boolean blocking = isBlocking(); 251 try { 252 begin(blocking); 253 do { 254 n = accept(this.fd, newfd, isaa); 255 } while (n == IOStatus.INTERRUPTED && isOpen()); 256 } finally { 257 end(blocking, n > 0); 258 assert IOStatus.check(n); 259 } 260 261 if (n < 1) 262 return null; 263 264 // newly accepted socket is initially in blocking mode 265 RdmaNet.configureBlocking(newfd, true); 266 267 InetSocketAddress isa = isaa[0]; 268 SocketChannel sc = new RdmaSocketChannelImpl(provider(), 269 newfd, isa); 270 271 // check permitted to accept connections from the remote address 272 SecurityManager sm = System.getSecurityManager(); 273 if (sm != null) { 274 try { 275 sm.checkAccept(isa.getAddress().getHostAddress(), 276 isa.getPort()); 277 } catch (SecurityException x) { 278 sc.close(); 279 throw x; 280 } 281 } 282 return sc; 283 284 } finally { 285 acceptLock.unlock(); 286 } 287 } 288 289 @Override 290 protected void implConfigureBlocking(boolean block) throws IOException { 291 acceptLock.lock(); 292 try { 293 synchronized (stateLock) { 294 ensureOpen(); 295 RdmaNet.configureBlocking(fd, block); 296 } 297 } finally { 298 acceptLock.unlock(); 299 } 300 } 301 302 @Override 303 protected void implCloseSelectableChannel() throws IOException { 304 assert !isOpen(); 305 306 boolean interrupted = false; 307 boolean blocking; 308 309 // set state to ST_CLOSING 310 synchronized (stateLock) { 311 assert state < ST_CLOSING; 312 state = ST_CLOSING; 313 blocking = isBlocking(); 314 } 315 316 // wait for any outstanding accept to complete 317 if (blocking) { 318 synchronized (stateLock) { 319 assert state == ST_CLOSING; 320 long th = thread; 321 if (th != 0) { 322 nd.preClose(fd); 323 NativeThread.signal(th); 324 325 // wait for accept operation to end 326 while (thread != 0) { 327 try { 328 stateLock.wait(); 329 } catch (InterruptedException e) { 330 interrupted = true; 331 } 332 } 333 } 334 } 335 } else { 336 // non-blocking mode: wait for accept to complete 337 acceptLock.lock(); 338 acceptLock.unlock(); 339 } 340 341 // set state to ST_KILLPENDING 342 synchronized (stateLock) { 343 assert state == ST_CLOSING; 344 state = ST_KILLPENDING; 345 } 346 347 // close socket if not registered with Selector 348 if (!isRegistered()) 349 kill(); 350 351 // restore interrupt status 352 if (interrupted) 353 Thread.currentThread().interrupt(); 354 } 355 356 @Override 357 public void kill() throws IOException { 358 synchronized (stateLock) { 359 if (state == ST_KILLPENDING) { 360 state = ST_KILLED; 361 nd.close(fd); 362 } 363 } 364 } 365 366 boolean isBound() { 367 synchronized (stateLock) { 368 return localAddress != null; 369 } 370 } 371 372 InetSocketAddress localAddress() { 373 synchronized (stateLock) { 374 return localAddress; 375 } 376 } 377 378 /** 379 * Poll this channel's socket for a new connection up to the given timeout. 380 * @return {@code true} if there is a connection to accept 381 */ 382 boolean pollAccept(long timeout) throws IOException { 383 assert Thread.holdsLock(blockingLock()) && isBlocking(); 384 acceptLock.lock(); 385 try { 386 boolean polled = false; 387 try { 388 begin(true); 389 int events = RdmaNet.poll(fd, Net.POLLIN, timeout); 390 polled = (events != 0); 391 } finally { 392 end(true, polled); 393 } 394 return polled; 395 } finally { 396 acceptLock.unlock(); 397 } 398 } 399 400 public boolean translateReadyOps(int ops, int initialOps, 401 SelectionKeyImpl ski) { 402 int intOps = ski.nioInterestOps(); 403 int oldOps = ski.nioReadyOps(); 404 int newOps = initialOps; 405 406 if ((ops & Net.POLLNVAL) != 0) { 407 return false; 408 } 409 410 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 411 newOps = intOps; 412 ski.nioReadyOps(newOps); 413 return (newOps & ~oldOps) != 0; 414 } 415 416 if (((ops & Net.POLLIN) != 0) && 417 ((intOps & SelectionKey.OP_ACCEPT) != 0)) 418 newOps |= SelectionKey.OP_ACCEPT; 419 420 ski.nioReadyOps(newOps); 421 return (newOps & ~oldOps) != 0; 422 } 423 424 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 425 return translateReadyOps(ops, ski.nioReadyOps(), ski); 426 } 427 428 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 429 return translateReadyOps(ops, 0, ski); 430 } 431 432 public int translateInterestOps(int ops) { 433 int newOps = 0; 434 if ((ops & SelectionKey.OP_ACCEPT) != 0) 435 newOps |= Net.POLLIN; 436 return newOps; 437 } 438 439 public FileDescriptor getFD() { 440 return fd; 441 } 442 443 public int getFDVal() { 444 return fdVal; 445 } 446 447 public String toString() { 448 StringBuilder sb = new StringBuilder(); 449 sb.append(this.getClass().getName()); 450 sb.append('['); 451 if (!isOpen()) { 452 sb.append("closed"); 453 } else { 454 synchronized (stateLock) { 455 InetSocketAddress addr = localAddress; 456 if (addr == null) { 457 sb.append("unbound"); 458 } else { 459 sb.append(Net.getRevealedLocalAddressAsString(addr)); 460 } 461 } 462 } 463 sb.append(']'); 464 return sb.toString(); 465 } 466 467 private int accept(FileDescriptor ssfd, FileDescriptor newfd, 468 InetSocketAddress[] isaa) throws IOException { 469 return accept0(ssfd, newfd, isaa); 470 } 471 472 // -- Native methods -- 473 474 private native int accept0(FileDescriptor ssfd, FileDescriptor newfd, 475 InetSocketAddress[] isaa) throws IOException; 476 477 private static native void initIDs()throws UnsupportedOperationException; 478 479 static { 480 IOUtil.load(); 481 System.loadLibrary("extnet"); 482 UnsupportedOperationException uoe = null; 483 try { 484 initIDs(); 485 } catch (UnsupportedOperationException e) { 486 uoe = e; 487 } 488 unsupported = uoe; 489 nd = new RdmaSocketDispatcher(); 490 } 491 }