1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.rdma; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.net.ServerSocket; 32 import java.net.SocketAddress; 33 import java.net.SocketOption; 34 import java.net.StandardSocketOptions; 35 import java.nio.channels.AlreadyBoundException; 36 import java.nio.channels.AsynchronousCloseException; 37 import java.nio.channels.ClosedChannelException; 38 import java.nio.channels.NotYetBoundException; 39 import java.nio.channels.SelectionKey; 40 import java.nio.channels.ServerSocketChannel; 41 import java.nio.channels.SocketChannel; 42 import java.nio.channels.spi.SelectorProvider; 43 import java.util.Collections; 44 import java.util.HashSet; 45 import java.util.Objects; 46 import java.util.Set; 47 import java.util.concurrent.locks.ReentrantLock; 48 import sun.nio.ch.IOStatus; 49 import sun.nio.ch.IOUtil; 50 import sun.nio.ch.NativeThread; 51 import sun.nio.ch.Net; 52 import sun.nio.ch.SelChImpl; 53 import sun.nio.ch.SelectionKeyImpl; 54 import sun.net.ext.RdmaSocketOptions; 55 56 public class RdmaServerSocketChannelImpl 57 extends ServerSocketChannel 58 implements SelChImpl 59 { 60 private static RdmaSocketDispatcher nd; 61 62 private final FileDescriptor fd; 63 private final int fdVal; 64 65 private final ReentrantLock acceptLock = new ReentrantLock(); 66 67 private final Object stateLock = new Object(); 68 69 private static final int ST_INUSE = 0; 70 private static final int ST_CLOSING = 1; 71 private static final int ST_KILLPENDING = 2; 72 private static final int ST_KILLED = 3; 73 private int state; 74 75 private long thread; 76 77 private InetSocketAddress localAddress; 78 79 private boolean isReuseAddress; 80 81 private ServerSocket socket; 82 83 private static final UnsupportedOperationException unsupported; 84 85 private static final SelectorProvider checkSupported(SelectorProvider sp) { 86 if (unsupported != null) 87 throw new UnsupportedOperationException(unsupported.getMessage(), unsupported); 88 else 89 return sp; 90 } 91 92 RdmaServerSocketChannelImpl(SelectorProvider sp) throws IOException { 93 super(checkSupported(sp)); 94 this.fd = RdmaNet.serverSocket(); 95 this.fdVal = IOUtil.fdVal(fd); 96 } 97 98 RdmaServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 99 throws IOException 100 { 101 super(checkSupported(sp)); 102 this.fd = fd; 103 this.fdVal = IOUtil.fdVal(fd); 104 if (bound) { 105 synchronized (stateLock) { 106 localAddress = RdmaNet.localAddress(fd); 107 } 108 } 109 } 110 111 private void ensureOpen() throws ClosedChannelException { 112 if (!isOpen()) 113 throw new ClosedChannelException(); 114 } 115 116 @Override 117 public ServerSocket socket() { 118 synchronized (stateLock) { 119 if (socket == null) 120 socket = RdmaServerSocketAdaptor.create(this); 121 return socket; 122 } 123 } 124 125 @Override 126 public SocketAddress getLocalAddress() throws IOException { 127 synchronized (stateLock) { 128 ensureOpen(); 129 return (localAddress == null) 130 ? null 131 : Net.getRevealedLocalAddress(localAddress); 132 } 133 } 134 135 @Override 136 public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) 137 throws IOException 138 { 139 Objects.requireNonNull(name); 140 if (!supportedOptions().contains(name)) 141 throw new UnsupportedOperationException("'" + name + "' not supported"); 142 synchronized (stateLock) { 143 ensureOpen(); 144 145 if (name == StandardSocketOptions.SO_REUSEADDR && 146 RdmaNet.useExclusiveBind()) { 147 isReuseAddress = (Boolean)value; 148 return this; 149 } 150 151 if (isBound() && (name == StandardSocketOptions.SO_REUSEADDR)) 152 throw new UnsupportedOperationException( 153 "RDMA server socket channel cannot set the socket option " 154 + name.toString() + " after bind."); 155 156 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); 157 return this; 158 } 159 } 160 161 @Override 162 @SuppressWarnings("unchecked") 163 public <T> T getOption(SocketOption<T> name) 164 throws IOException 165 { 166 Objects.requireNonNull(name); 167 if (!supportedOptions().contains(name)) 168 throw new UnsupportedOperationException("'" + name + "' not supported"); 169 170 synchronized (stateLock) { 171 ensureOpen(); 172 if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) { 173 return (T)Boolean.valueOf(isReuseAddress); 174 } 175 return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); 176 } 177 } 178 179 private static class DefaultOptionsHolder { 180 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 181 182 private static Set<SocketOption<?>> defaultOptions() { 183 HashSet<SocketOption<?>> set = new HashSet<>(2); 184 set.add(StandardSocketOptions.SO_RCVBUF); 185 set.add(StandardSocketOptions.SO_REUSEADDR); 186 if (RdmaNet.isRdmaAvailable()) { 187 RdmaSocketOptions rdmaOptions = 188 RdmaSocketOptions.getInstance(); 189 set.addAll(rdmaOptions.options()); 190 } 191 return Collections.unmodifiableSet(set); 192 } 193 } 194 195 public final Set<SocketOption<?>> supportedOptions() { 196 return DefaultOptionsHolder.defaultOptions; 197 } 198 199 @Override 200 public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { 201 synchronized (stateLock) { 202 ensureOpen(); 203 if (localAddress != null) 204 throw new AlreadyBoundException(); 205 InetSocketAddress isa = (local == null) 206 ? new InetSocketAddress(0) 207 : Net.checkAddress(local); 208 SecurityManager sm = System.getSecurityManager(); 209 if (sm != null) 210 sm.checkListen(isa.getPort()); 211 RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); 212 RdmaNet.listen(fd, backlog < 1 ? 50 : backlog); 213 localAddress = RdmaNet.localAddress(fd); 214 } 215 return this; 216 } 217 218 private void begin(boolean blocking) throws ClosedChannelException { 219 if (blocking) 220 begin(); 221 synchronized (stateLock) { 222 ensureOpen(); 223 if (localAddress == null) 224 throw new NotYetBoundException(); 225 if (blocking) 226 thread = NativeThread.current(); 227 } 228 } 229 230 private void end(boolean blocking, boolean completed) 231 throws AsynchronousCloseException 232 { 233 if (blocking) { 234 synchronized (stateLock) { 235 thread = 0; 236 if (state == ST_CLOSING) { 237 stateLock.notifyAll(); 238 } 239 } 240 end(completed); 241 } 242 } 243 244 @Override 245 public SocketChannel accept() throws IOException { 246 acceptLock.lock(); 247 try { 248 int n = 0; 249 FileDescriptor newfd = new FileDescriptor(); 250 InetSocketAddress[] isaa = new InetSocketAddress[1]; 251 252 boolean blocking = isBlocking(); 253 try { 254 begin(blocking); 255 do { 256 n = accept(this.fd, newfd, isaa); 257 } while (n == IOStatus.INTERRUPTED && isOpen()); 258 } finally { 259 end(blocking, n > 0); 260 assert IOStatus.check(n); 261 } 262 263 if (n < 1) 264 return null; 265 266 // newly accepted socket is initially in blocking mode 267 RdmaNet.configureBlocking(newfd, true); 268 269 InetSocketAddress isa = isaa[0]; 270 SocketChannel sc = new RdmaSocketChannelImpl(provider(), newfd, isa); 271 272 // check permitted to accept connections from the remote address 273 SecurityManager sm = System.getSecurityManager(); 274 if (sm != null) { 275 try { 276 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); 277 } catch (SecurityException x) { 278 sc.close(); 279 throw x; 280 } 281 } 282 return sc; 283 284 } finally { 285 acceptLock.unlock(); 286 } 287 } 288 289 @Override 290 protected void implConfigureBlocking(boolean block) throws IOException { 291 acceptLock.lock(); 292 try { 293 synchronized (stateLock) { 294 ensureOpen(); 295 RdmaNet.configureBlocking(fd, block); 296 } 297 } finally { 298 acceptLock.unlock(); 299 } 300 } 301 302 @Override 303 protected void implCloseSelectableChannel() throws IOException { 304 assert !isOpen(); 305 306 boolean interrupted = false; 307 boolean blocking; 308 309 // set state to ST_CLOSING 310 synchronized (stateLock) { 311 assert state < ST_CLOSING; 312 state = ST_CLOSING; 313 blocking = isBlocking(); 314 } 315 316 // wait for any outstanding accept to complete 317 if (blocking) { 318 synchronized (stateLock) { 319 assert state == ST_CLOSING; 320 long th = thread; 321 if (th != 0) { 322 nd.preClose(fd); 323 NativeThread.signal(th); 324 325 // wait for accept operation to end 326 while (thread != 0) { 327 try { 328 stateLock.wait(); 329 } catch (InterruptedException e) { 330 interrupted = true; 331 } 332 } 333 } 334 } 335 } else { 336 // non-blocking mode: wait for accept to complete 337 acceptLock.lock(); 338 acceptLock.unlock(); 339 } 340 341 // set state to ST_KILLPENDING 342 synchronized (stateLock) { 343 assert state == ST_CLOSING; 344 state = ST_KILLPENDING; 345 } 346 347 // close socket if not registered with Selector 348 if (!isRegistered()) 349 kill(); 350 351 // restore interrupt status 352 if (interrupted) 353 Thread.currentThread().interrupt(); 354 } 355 356 @Override 357 public void kill() throws IOException { 358 synchronized (stateLock) { 359 if (state == ST_KILLPENDING) { 360 state = ST_KILLED; 361 nd.close(fd); 362 } 363 } 364 } 365 366 boolean isBound() { 367 synchronized (stateLock) { 368 return localAddress != null; 369 } 370 } 371 372 InetSocketAddress localAddress() { 373 synchronized (stateLock) { 374 return localAddress; 375 } 376 } 377 378 /** 379 * Poll this channel's socket for a new connection up to the given timeout. 380 * @return {@code true} if there is a connection to accept 381 */ 382 boolean pollAccept(long timeout) throws IOException { 383 assert Thread.holdsLock(blockingLock()) && isBlocking(); 384 acceptLock.lock(); 385 try { 386 boolean polled = false; 387 try { 388 begin(true); 389 int events = RdmaNet.poll(fd, Net.POLLIN, timeout); 390 polled = (events != 0); 391 } finally { 392 end(true, polled); 393 } 394 return polled; 395 } finally { 396 acceptLock.unlock(); 397 } 398 } 399 400 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 401 int intOps = ski.nioInterestOps(); 402 int oldOps = ski.nioReadyOps(); 403 int newOps = initialOps; 404 405 if ((ops & Net.POLLNVAL) != 0) { 406 return false; 407 } 408 409 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 410 newOps = intOps; 411 ski.nioReadyOps(newOps); 412 return (newOps & ~oldOps) != 0; 413 } 414 415 if (((ops & Net.POLLIN) != 0) && 416 ((intOps & SelectionKey.OP_ACCEPT) != 0)) 417 newOps |= SelectionKey.OP_ACCEPT; 418 419 ski.nioReadyOps(newOps); 420 return (newOps & ~oldOps) != 0; 421 } 422 423 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 424 return translateReadyOps(ops, ski.nioReadyOps(), ski); 425 } 426 427 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 428 return translateReadyOps(ops, 0, ski); 429 } 430 431 public int translateInterestOps(int ops) { 432 int newOps = 0; 433 if ((ops & SelectionKey.OP_ACCEPT) != 0) 434 newOps |= Net.POLLIN; 435 return newOps; 436 } 437 438 public FileDescriptor getFD() { 439 return fd; 440 } 441 442 public int getFDVal() { 443 return fdVal; 444 } 445 446 public String toString() { 447 StringBuilder sb = new StringBuilder(); 448 sb.append(this.getClass().getName()); 449 sb.append('['); 450 if (!isOpen()) { 451 sb.append("closed"); 452 } else { 453 synchronized (stateLock) { 454 InetSocketAddress addr = localAddress; 455 if (addr == null) { 456 sb.append("unbound"); 457 } else { 458 sb.append(Net.getRevealedLocalAddressAsString(addr)); 459 } 460 } 461 } 462 sb.append(']'); 463 return sb.toString(); 464 } 465 466 private int accept(FileDescriptor ssfd, 467 FileDescriptor newfd, 468 InetSocketAddress[] isaa) 469 throws IOException 470 { 471 return accept0(ssfd, newfd, isaa); 472 } 473 474 // -- Native methods -- 475 476 private native int accept0(FileDescriptor ssfd, 477 FileDescriptor newfd, 478 InetSocketAddress[] isaa) 479 throws IOException; 480 481 private static native void initIDs()throws UnsupportedOperationException; 482 483 static { 484 IOUtil.load(); 485 System.loadLibrary("extnet"); 486 UnsupportedOperationException uoe = null; 487 try { 488 initIDs(); 489 } catch (UnsupportedOperationException e) { 490 uoe = e; 491 } 492 unsupported = uoe; 493 nd = new RdmaSocketDispatcher(); 494 } 495 }