1 /* 2 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package rdma.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetSocketAddress; 31 import java.net.ServerSocket; 32 import java.net.SocketAddress; 33 import java.net.SocketOption; 34 import java.net.StandardSocketOptions; 35 import java.nio.channels.AlreadyBoundException; 36 import java.nio.channels.AsynchronousCloseException; 37 import java.nio.channels.ClosedChannelException; 38 import java.nio.channels.NotYetBoundException; 39 import java.nio.channels.SelectionKey; 40 import java.nio.channels.ServerSocketChannel; 41 import java.nio.channels.SocketChannel; 42 import java.nio.channels.spi.SelectorProvider; 43 import java.util.Collections; 44 import java.util.HashSet; 45 import java.util.Objects; 46 import java.util.Set; 47 import java.util.concurrent.locks.ReentrantLock; 48 import sun.nio.ch.IOStatus; 49 import sun.nio.ch.IOUtil; 50 import sun.nio.ch.NativeThread; 51 import sun.nio.ch.SelChImpl; 52 import sun.nio.ch.SelectionKeyImpl; 53 import sun.net.ext.RdmaSocketOptions; 54 55 public class RdmaServerSocketChannelImpl 56 extends ServerSocketChannel 57 implements SelChImpl 58 { 59 private static RdmaSocketDispatcher nd; 60 61 private final FileDescriptor fd; 62 private final int fdVal; 63 64 private final ReentrantLock acceptLock = new ReentrantLock(); 65 66 private final Object stateLock = new Object(); 67 68 private static final int ST_INUSE = 0; 69 private static final int ST_CLOSING = 1; 70 private static final int ST_KILLPENDING = 2; 71 private static final int ST_KILLED = 3; 72 private int state; 73 74 private long thread; 75 76 private InetSocketAddress localAddress; 77 78 private boolean isReuseAddress; 79 80 private ServerSocket socket; 81 82 RdmaServerSocketChannelImpl(SelectorProvider sp) throws IOException { 83 super(sp); 84 this.fd = RdmaNet.serverSocket(true); 85 this.fdVal = IOUtil.fdVal(fd); 86 } 87 88 RdmaServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 89 throws IOException 90 { 91 super(sp); 92 this.fd = fd; 93 this.fdVal = IOUtil.fdVal(fd); 94 if (bound) { 95 synchronized (stateLock) { 96 localAddress = RdmaNet.localAddress(fd); 97 } 98 } 99 } 100 101 private void ensureOpen() throws ClosedChannelException { 102 if (!isOpen()) 103 throw new ClosedChannelException(); 104 } 105 106 @Override 107 public ServerSocket socket() { 108 synchronized (stateLock) { 109 if (socket == null) 110 socket = RdmaServerSocketAdaptor.create(this); 111 return socket; 112 } 113 } 114 115 @Override 116 public SocketAddress getLocalAddress() throws IOException { 117 synchronized (stateLock) { 118 ensureOpen(); 119 return (localAddress == null) 120 ? null 121 : RdmaNet.getRevealedLocalAddress(localAddress); 122 } 123 } 124 125 @Override 126 public <T> ServerSocketChannel setOption(SocketOption<T> name, T value) 127 throws IOException 128 { 129 Objects.requireNonNull(name); 130 if (!supportedOptions().contains(name)) 131 throw new UnsupportedOperationException("'" + name + "' not supported"); 132 synchronized (stateLock) { 133 ensureOpen(); 134 135 if (name == StandardSocketOptions.SO_REUSEADDR && 136 RdmaNet.useExclusiveBind()) { 137 isReuseAddress = (Boolean)value; 138 return this; 139 } 140 141 if (isBound() && (name == StandardSocketOptions.SO_REUSEADDR || 142 name == StandardSocketOptions.SO_RCVBUF)) 143 throw new UnsupportedOperationException( 144 "RDMA server socket channel cannot set the socket option " 145 + name.toString() + " after bind."); 146 147 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); 148 return this; 149 } 150 } 151 152 @Override 153 @SuppressWarnings("unchecked") 154 public <T> T getOption(SocketOption<T> name) 155 throws IOException 156 { 157 Objects.requireNonNull(name); 158 if (!supportedOptions().contains(name)) 159 throw new UnsupportedOperationException("'" + name + "' not supported"); 160 161 synchronized (stateLock) { 162 ensureOpen(); 163 if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) { 164 return (T)Boolean.valueOf(isReuseAddress); 165 } 166 return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); 167 } 168 } 169 170 private static class DefaultOptionsHolder { 171 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 172 173 private static Set<SocketOption<?>> defaultOptions() { 174 HashSet<SocketOption<?>> set = new HashSet<>(2); 175 set.add(StandardSocketOptions.SO_RCVBUF); 176 set.add(StandardSocketOptions.SO_REUSEADDR); 177 if (RdmaNet.isRdmaAvailable()) { 178 RdmaSocketOptions rdmaOptions = 179 RdmaSocketOptions.getInstance(); 180 set.addAll(rdmaOptions.options()); 181 } 182 return Collections.unmodifiableSet(set); 183 } 184 } 185 186 public final Set<SocketOption<?>> supportedOptions() { 187 return DefaultOptionsHolder.defaultOptions; 188 } 189 190 @Override 191 public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { 192 synchronized (stateLock) { 193 ensureOpen(); 194 if (localAddress != null) 195 throw new AlreadyBoundException(); 196 InetSocketAddress isa = (local == null) 197 ? new InetSocketAddress(0) 198 : RdmaNet.checkAddress(local); 199 SecurityManager sm = System.getSecurityManager(); 200 if (sm != null) 201 sm.checkListen(isa.getPort()); 202 RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); 203 RdmaNet.listen(fd, backlog < 1 ? 50 : backlog); 204 localAddress = RdmaNet.localAddress(fd); 205 } 206 return this; 207 } 208 209 private void begin(boolean blocking) throws ClosedChannelException { 210 if (blocking) 211 begin(); 212 synchronized (stateLock) { 213 ensureOpen(); 214 if (localAddress == null) 215 throw new NotYetBoundException(); 216 if (blocking) 217 thread = NativeThread.current(); 218 } 219 } 220 221 private void end(boolean blocking, boolean completed) 222 throws AsynchronousCloseException 223 { 224 if (blocking) { 225 synchronized (stateLock) { 226 thread = 0; 227 if (state == ST_CLOSING) { 228 stateLock.notifyAll(); 229 } 230 } 231 end(completed); 232 } 233 } 234 235 @Override 236 public SocketChannel accept() throws IOException { 237 acceptLock.lock(); 238 try { 239 int n = 0; 240 FileDescriptor newfd = new FileDescriptor(); 241 InetSocketAddress[] isaa = new InetSocketAddress[1]; 242 243 boolean blocking = isBlocking(); 244 try { 245 begin(blocking); 246 do { 247 n = accept(this.fd, newfd, isaa); 248 } while (n == IOStatus.INTERRUPTED && isOpen()); 249 } finally { 250 end(blocking, n > 0); 251 assert IOStatus.check(n); 252 } 253 254 if (n < 1) 255 return null; 256 257 // newly accepted socket is initially in blocking mode 258 RdmaNet.configureBlocking(newfd, true); 259 260 InetSocketAddress isa = isaa[0]; 261 SocketChannel sc = new RdmaSocketChannelImpl(provider(), newfd, isa); 262 263 // check permitted to accept connections from the remote address 264 SecurityManager sm = System.getSecurityManager(); 265 if (sm != null) { 266 try { 267 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); 268 } catch (SecurityException x) { 269 sc.close(); 270 throw x; 271 } 272 } 273 return sc; 274 275 } finally { 276 acceptLock.unlock(); 277 } 278 } 279 280 @Override 281 protected void implConfigureBlocking(boolean block) throws IOException { 282 acceptLock.lock(); 283 try { 284 synchronized (stateLock) { 285 ensureOpen(); 286 RdmaNet.configureBlocking(fd, block); 287 } 288 } finally { 289 acceptLock.unlock(); 290 } 291 } 292 293 @Override 294 protected void implCloseSelectableChannel() throws IOException { 295 assert !isOpen(); 296 297 boolean interrupted = false; 298 boolean blocking; 299 300 // set state to ST_CLOSING 301 synchronized (stateLock) { 302 assert state < ST_CLOSING; 303 state = ST_CLOSING; 304 blocking = isBlocking(); 305 } 306 307 // wait for any outstanding accept to complete 308 if (blocking) { 309 synchronized (stateLock) { 310 assert state == ST_CLOSING; 311 long th = thread; 312 if (th != 0) { 313 nd.preClose(fd); 314 NativeThread.signal(th); 315 316 // wait for accept operation to end 317 while (thread != 0) { 318 try { 319 stateLock.wait(); 320 } catch (InterruptedException e) { 321 interrupted = true; 322 } 323 } 324 } 325 } 326 } else { 327 // non-blocking mode: wait for accept to complete 328 acceptLock.lock(); 329 acceptLock.unlock(); 330 } 331 332 // set state to ST_KILLPENDING 333 synchronized (stateLock) { 334 assert state == ST_CLOSING; 335 state = ST_KILLPENDING; 336 } 337 338 // close socket if not registered with Selector 339 if (!isRegistered()) 340 kill(); 341 342 // restore interrupt status 343 if (interrupted) 344 Thread.currentThread().interrupt(); 345 } 346 347 @Override 348 public void kill() throws IOException { 349 synchronized (stateLock) { 350 if (state == ST_KILLPENDING) { 351 state = ST_KILLED; 352 nd.close(fd); 353 } 354 } 355 } 356 357 boolean isBound() { 358 synchronized (stateLock) { 359 return localAddress != null; 360 } 361 } 362 363 InetSocketAddress localAddress() { 364 synchronized (stateLock) { 365 return localAddress; 366 } 367 } 368 369 /** 370 * Poll this channel's socket for a new connection up to the given timeout. 371 * @return {@code true} if there is a connection to accept 372 */ 373 boolean pollAccept(long timeout) throws IOException { 374 assert Thread.holdsLock(blockingLock()) && isBlocking(); 375 acceptLock.lock(); 376 try { 377 boolean polled = false; 378 try { 379 begin(true); 380 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout); 381 polled = (events != 0); 382 } finally { 383 end(true, polled); 384 } 385 return polled; 386 } finally { 387 acceptLock.unlock(); 388 } 389 } 390 391 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 392 int intOps = ski.nioInterestOps(); 393 int oldOps = ski.nioReadyOps(); 394 int newOps = initialOps; 395 396 if ((ops & RdmaNet.POLLNVAL) != 0) { 397 return false; 398 } 399 400 if ((ops & (RdmaNet.POLLERR | RdmaNet.POLLHUP)) != 0) { 401 newOps = intOps; 402 ski.nioReadyOps(newOps); 403 return (newOps & ~oldOps) != 0; 404 } 405 406 if (((ops & RdmaNet.POLLIN) != 0) && 407 ((intOps & SelectionKey.OP_ACCEPT) != 0)) 408 newOps |= SelectionKey.OP_ACCEPT; 409 410 ski.nioReadyOps(newOps); 411 return (newOps & ~oldOps) != 0; 412 } 413 414 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 415 return translateReadyOps(ops, ski.nioReadyOps(), ski); 416 } 417 418 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 419 return translateReadyOps(ops, 0, ski); 420 } 421 422 public int translateInterestOps(int ops) { 423 int newOps = 0; 424 if ((ops & SelectionKey.OP_ACCEPT) != 0) 425 newOps |= RdmaNet.POLLIN; 426 return newOps; 427 } 428 429 public FileDescriptor getFD() { 430 return fd; 431 } 432 433 public int getFDVal() { 434 return fdVal; 435 } 436 437 public String toString() { 438 StringBuilder sb = new StringBuilder(); 439 sb.append(this.getClass().getName()); 440 sb.append('['); 441 if (!isOpen()) { 442 sb.append("closed"); 443 } else { 444 synchronized (stateLock) { 445 InetSocketAddress addr = localAddress; 446 if (addr == null) { 447 sb.append("unbound"); 448 } else { 449 sb.append(RdmaNet.getRevealedLocalAddressAsString(addr)); 450 } 451 } 452 } 453 sb.append(']'); 454 return sb.toString(); 455 } 456 457 private int accept(FileDescriptor ssfd, 458 FileDescriptor newfd, 459 InetSocketAddress[] isaa) 460 throws IOException 461 { 462 return accept0(ssfd, newfd, isaa); 463 } 464 465 // -- Native methods -- 466 467 private native int accept0(FileDescriptor ssfd, 468 FileDescriptor newfd, 469 InetSocketAddress[] isaa) 470 throws IOException; 471 472 private static native void initIDs(); 473 474 static { 475 IOUtil.load(); 476 initIDs(); 477 nd = new RdmaSocketDispatcher(); 478 } 479 }