/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.rdma;

import java.io.FileDescriptor;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NoConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.ext.RdmaSocketOptions;
import sun.nio.ch.IOStatus;
import sun.nio.ch.IOUtil;
import sun.nio.ch.Net;
import sun.nio.ch.NativeThread;
import sun.nio.ch.SelChImpl;
import sun.nio.ch.SelectionKeyImpl;

/**
 * A {@link SocketChannel} whose socket is created through {@code RdmaNet} and
 * whose reads, writes and close go through an {@code RdmaSocketDispatcher}.
 *
 * <p> Methods that take more than one lock always acquire them in the order
 * {@code readLock}, {@code writeLock}, {@code stateLock}.
 *
 * <p> The connection life cycle is tracked by {@code state}, which moves
 * through {@code ST_UNCONNECTED}, {@code ST_CONNECTIONPENDING},
 * {@code ST_CONNECTED}, {@code ST_CLOSING}, {@code ST_KILLPENDING} and
 * {@code ST_KILLED}.
 */
public class RdmaSocketChannelImpl
    extends SocketChannel
    implements SelChImpl
{
    // The protocol family of the socket
    private final ProtocolFamily family;

    // Dispatcher handed to IOUtil for reads/writes and used to pre-close and
    // close the file descriptor; assigned once in the static initializer
    private static RdmaSocketDispatcher nd;
    private final FileDescriptor fd;
    // value returned by IOUtil.fdVal(fd)
    private final int fdVal;

    // Held for the duration of read, poll-for-read, bind, connect,
    // finishConnect and blocking-mode changes
    private final ReentrantLock readLock = new ReentrantLock();
    // Held for the duration of write, bind, connect, finishConnect and
    // blocking-mode changes
    private final ReentrantLock writeLock = new ReentrantLock();

    // Every write to state, localAddress, remoteAddress, readerThread,
    // writerThread and socket in this class is made while holding this lock
    private final Object stateLock = new Object();

    // Set to true by shutdownInput / shutdownOutput; never reset
    private volatile boolean isInputClosed;
    private volatile boolean isOutputClosed;

    // NOTE(review): not read or written anywhere in this class -- confirm
    // whether it is still required (e.g. by native code) before removing
    private boolean isReuseAddress;

    // State values. Their numeric order is significant:
    // ensureOpenAndConnected treats anything below ST_CONNECTED as "not yet
    // connected" and anything above it as "closed".
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTIONPENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_CLOSING = 3;
    private static final int ST_KILLPENDING = 4;
    private static final int ST_KILLED = 5;
    private volatile int state;  // need stateLock to change

    // Native thread ids of the threads currently blocked in a read-side
    // (read, connect, finishConnect, poll) or write-side operation; 0 if none
    private long readerThread;
    private long writerThread;

    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Socket adaptor, created lazily by socket()
    private Socket socket;

    // The exception thrown by initIDs() in the static initializer, or null if
    // initIDs() completed normally
    private static final UnsupportedOperationException unsupported;

    /**
     * Returns {@code sp} unchanged when native initialization succeeded.
     * It is evaluated as the argument of {@code super(...)}, so a failure
     * surfaces before the superclass constructor runs.
     *
     * @throws UnsupportedOperationException if {@code initIDs()} failed in the
     *         static initializer; the recorded exception is attached as cause
     */
    private static final SelectorProvider checkSupported(SelectorProvider sp) {
        if (unsupported != null)
            throw new UnsupportedOperationException(unsupported.getMessage(),
                                                    unsupported);
        else
            return sp;
    }

    /**
     * Creates an unconnected channel with a new socket obtained from
     * {@code RdmaNet.socket} for the given protocol family.
     *
     * @param sp     the provider passed to the superclass
     * @param family the protocol family; must be {@code INET} or {@code INET6}
     * @throws NullPointerException if {@code family} is null
     * @throws UnsupportedOperationException if native initialization failed,
     *         the family is neither {@code INET} nor {@code INET6}, or the
     *         family is {@code INET6} and {@code Net.isIPv6Available()} is
     *         false
     * @throws IOException declared for the socket-creation call
     */
    protected RdmaSocketChannelImpl(SelectorProvider sp, ProtocolFamily family)
        throws IOException {
        super(checkSupported(sp));

        Objects.requireNonNull(family, "'family' is null");
        if ((family != StandardProtocolFamily.INET) &&
            (family != StandardProtocolFamily.INET6)) {
            throw new UnsupportedOperationException(
                    "Protocol family not supported");
        }
        if (family == StandardProtocolFamily.INET6) {
            if (!Net.isIPv6Available()) {
                throw new UnsupportedOperationException("IPv6 not available");
            }
        }

        this.family = family;
        this.fd = RdmaNet.socket(family, true);
        this.fdVal = IOUtil.fdVal(fd);
    }

    /**
     * Creates a channel around an existing file descriptor and places it
     * directly in the connected state.
     *
     * @param sp  the provider passed to the superclass
     * @param fd  the file descriptor of the socket
     * @param isa recorded as the remote address
     * @throws UnsupportedOperationException if native initialization failed
     * @throws IOException declared for the local-address lookup
     */
    RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd,
                          InetSocketAddress isa) throws IOException {
        super(checkSupported(sp));
        // the family is derived from IPv6 availability, not from the fd
        this.family = Net.isIPv6Available()
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        synchronized (stateLock) {
            this.localAddress = RdmaNet.localAddress(fd);
            this.remoteAddress = isa;
            this.state = ST_CONNECTED;
        }
    }

    /**
     * @throws ClosedChannelException if {@code isOpen()} is false
     */
    private void ensureOpen() throws ClosedChannelException {
        if (!isOpen())
            throw new ClosedChannelException();
    }

    /**
     * Checks that the channel state is exactly {@code ST_CONNECTED}.
     *
     * @throws NotYetConnectedException if the state is below ST_CONNECTED
     * @throws ClosedChannelException if the state is above ST_CONNECTED
     */
    private void ensureOpenAndConnected() throws ClosedChannelException {
        // single read of the volatile field so both comparisons see one value
        int state = this.state;
        if (state < ST_CONNECTED) {
            throw new NotYetConnectedException();
        } else if (state > ST_CONNECTED) {
            throw new ClosedChannelException();
        }
    }

    /**
     * Returns the socket adaptor for this channel, creating it with
     * {@code RdmaSocketAdaptor.create} on first use.
     */
    @Override
    public Socket socket() {
        synchronized (stateLock) {
            if (socket == null)
                socket = RdmaSocketAdaptor.create(this);
            return socket;
        }
    }

    /**
     * Returns the recorded local address after passing it through
     * {@code Net.getRevealedLocalAddress}.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getLocalAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            return Net.getRevealedLocalAddress(localAddress);
        }
    }

    /**
     * Returns the recorded remote address (null if none has been recorded).
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getRemoteAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            return remoteAddress;
        }
    }

    /**
     * Sets a socket option through {@code RdmaNet.setSocketOption}.
     *
     * @throws NullPointerException if {@code name} is null
     * @throws UnsupportedOperationException if {@code name} is not in
     *         {@link #supportedOptions()}
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public <T> SocketChannel setOption(SocketOption<T> name, T value)
        throws IOException {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name
                    + "' not supported");

        synchronized (stateLock) {
            ensureOpen();
            RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
            return this;
        }
    }

    /**
     * Reads a socket option through {@code RdmaNet.getSocketOption}.
     *
     * @throws NullPointerException if {@code name} is null
     * @throws UnsupportedOperationException if {@code name} is not in
     *         {@link #supportedOptions()}
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getOption(SocketOption<T> name)
        throws IOException {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name
                    + "' not supported");

        synchronized (stateLock) {
            ensureOpen();
            return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
        }
    }

    /**
     * Holds the unmodifiable set of supported options in a static field of a
     * nested class, so the set is built when this holder is initialized
     * rather than when the enclosing class is.
     */
    private static class DefaultOptionsHolder {
        static final Set<SocketOption<?>> defaultOptions = defaultOptions();

        // SO_SNDBUF, SO_RCVBUF, SO_REUSEADDR and TCP_NODELAY plus whatever
        // RdmaSocketOptions.getInstance().options() contributes
        private static Set<SocketOption<?>> defaultOptions() {
            HashSet<SocketOption<?>> set = new HashSet<>();
            set.add(StandardSocketOptions.SO_SNDBUF);
            set.add(StandardSocketOptions.SO_RCVBUF);
            set.add(StandardSocketOptions.SO_REUSEADDR);
            set.add(StandardSocketOptions.TCP_NODELAY);
            RdmaSocketOptions rdmaOptions =
                    RdmaSocketOptions.getInstance();
            set.addAll(rdmaOptions.options());
            return Collections.unmodifiableSet(set);
        }
    }

    /**
     * Returns the unmodifiable set of options accepted by
     * {@link #setOption} and {@link #getOption}.
     */
    // NOTE(review): no @Override here although the other channel methods
    // carry one -- confirm and add if this implements the interface method
    public Set<SocketOption<?>> supportedOptions() {
        return DefaultOptionsHolder.defaultOptions;
    }

    /**
     * Marks the start of a read-side operation. In blocking mode it installs
     * the interrupt hook and records the current native thread id; in
     * non-blocking mode it only checks the state.
     *
     * @throws NotYetConnectedException if the channel is not yet connected
     * @throws ClosedChannelException if the channel is closed
     */
    private void beginRead(boolean blocking) throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();

            synchronized (stateLock) {
                ensureOpenAndConnected();
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        } else {
            ensureOpenAndConnected();
        }
    }

    /**
     * Marks the end of a read-side operation. In blocking mode it clears the
     * recorded reader thread, wakes a closer waiting on {@code stateLock} and
     * removes the interrupt hook. It does nothing in non-blocking mode.
     *
     * @param completed passed to {@code end(boolean)}
     * @throws AsynchronousCloseException declared for the {@code end} call
     */
    private void endRead(boolean blocking, boolean completed)
        throws AsynchronousCloseException
    {
        if (blocking) {
            synchronized (stateLock) {
                readerThread = 0;
                // notify any thread waiting in implCloseSelectableChannel
                if (state == ST_CLOSING) {
                    stateLock.notifyAll();
                }
            }
            // remove hook for Thread.interrupt
            end(completed);
        }
    }

    /**
     * Reads into {@code buf} while holding {@code readLock}.
     *
     * @return {@code IOStatus.EOF} if input is (or becomes) shut down and no
     *         bytes were read; otherwise the read result passed through
     *         {@code IOStatus.normalize}
     * @throws NullPointerException if {@code buf} is null
     */
    @Override
    public int read(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginRead(blocking);

                // check if input is shutdown
                if (isInputClosed)
                    return IOStatus.EOF;

                if (blocking) {
                    // retry while the read reports INTERRUPTED and the
                    // channel is still open
                    do {
                        n = IOUtil.read(fd, buf, -1, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.read(fd, buf, -1, nd);
                }
            } finally {
                endRead(blocking, n > 0);
                // input shut down during the read: report EOF. Returning
                // from finally also supersedes an exception raised in the
                // try block.
                if (n <= 0 && isInputClosed)
                    return IOStatus.EOF;
            }
            return IOStatus.normalize(n);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Scattering read into {@code dsts[offset .. offset+length)} while
     * holding {@code readLock}; same EOF handling as
     * {@link #read(ByteBuffer)}.
     *
     * @throws IndexOutOfBoundsException if the sub-range is out of bounds
     */
    @Override
    public long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException
    {
        Objects.checkFromIndexSize(offset, length, dsts.length);

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            long n = 0;
            try {
                beginRead(blocking);

                // check if input is shutdown
                if (isInputClosed)
                    return IOStatus.EOF;

                if (blocking) {
                    do {
                        n = IOUtil.read(fd, dsts, offset, length, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                }
            } finally {
                endRead(blocking, n > 0);
                // input shut down during the read: report EOF
                if (n <= 0 && isInputClosed)
                    return IOStatus.EOF;
            }
            return IOStatus.normalize(n);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Marks the start of a write operation. In blocking mode it installs the
     * interrupt hook, rejects the write if output is already shut down and
     * records the current native thread id. In non-blocking mode it only
     * checks the connection state.
     *
     * @throws NotYetConnectedException if the channel is not yet connected
     * @throws ClosedChannelException if the channel is closed or, in blocking
     *         mode, output has been shut down
     */
    private void beginWrite(boolean blocking) throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();

            synchronized (stateLock) {
                ensureOpenAndConnected();
                if (isOutputClosed)
                    throw new ClosedChannelException();
                // record thread so it can be signalled if needed
                writerThread = NativeThread.current();
            }
        } else {
            ensureOpenAndConnected();
        }
    }

    /**
     * Marks the end of a write operation; the write-side counterpart of
     * {@link #endRead}.
     *
     * @param completed passed to {@code end(boolean)}
     * @throws AsynchronousCloseException declared for the {@code end} call
     */
    private void endWrite(boolean blocking, boolean completed)
        throws AsynchronousCloseException {
        if (blocking) {
            synchronized (stateLock) {
                writerThread = 0;
                // notify any thread waiting in implCloseSelectableChannel
                if (state == ST_CLOSING) {
                    stateLock.notifyAll();
                }
            }
            // remove hook for Thread.interrupt
            end(completed);
        }
    }

    /**
     * Writes from {@code buf} while holding {@code writeLock}.
     *
     * @return the write result passed through {@code IOStatus.normalize}
     * @throws NullPointerException if {@code buf} is null
     * @throws AsynchronousCloseException if nothing was written and output
     *         has been shut down by the time the write ends
     */
    @Override
    public int write(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);

        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    // retry while the write reports INTERRUPTED and the
                    // channel is still open
                    do {
                        n = IOUtil.write(fd, buf, -1, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.write(fd, buf, -1, nd);
                }
            } finally {
                endWrite(blocking, n > 0);
                // output shut down and nothing written
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Gathering write from {@code srcs[offset .. offset+length)} while
     * holding {@code writeLock}; same shutdown handling as
     * {@link #write(ByteBuffer)}.
     *
     * @throws IndexOutOfBoundsException if the sub-range is out of bounds
     */
    @Override
    public long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException {
        Objects.checkFromIndexSize(offset, length, srcs.length);

        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            long n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    do {
                        n = IOUtil.write(fd, srcs, offset, length, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.write(fd, srcs, offset, length, nd);
                }
            } finally {
                endWrite(blocking, n > 0);
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Sends one byte through the native {@code sendOutOfBandData} call,
     * following the same locking and begin/end protocol as a regular write.
     *
     * @return the native result passed through {@code IOStatus.normalize}
     */
    int sendOutOfBandData(byte b) throws IOException {
        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    do {
                        n = sendOutOfBandData(fd, b);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = sendOutOfBandData(fd, b);
                }
            } finally {
                endWrite(blocking, n > 0);
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Applies the blocking mode to the fd through
     * {@code RdmaNet.configureBlocking}. It holds the read, write and state
     * locks so the mode cannot change in the middle of an I/O operation.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    RdmaNet.configureBlocking(fd, block);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the recorded local address without an open check or the
     * {@code Net.getRevealedLocalAddress} step; may be null.
     */
    InetSocketAddress localAddress() {
        synchronized (stateLock) {
            return localAddress;
        }
    }

    /**
     * Returns the recorded remote address without an open check; may be null.
     */
    InetSocketAddress remoteAddress() {
        synchronized (stateLock) {
            return remoteAddress;
        }
    }

    /**
     * Binds the socket to {@code local}, or to port 0 on the wildcard address
     * when {@code local} is null, and then records the local address
     * reported by {@code RdmaNet.localAddress}.
     *
     * @throws ClosedChannelException if the channel is closed
     * @throws ConnectionPendingException if a connect is in progress
     * @throws AlreadyBoundException if a local address is already recorded
     */
    @Override
    public SocketChannel bind(SocketAddress local) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    if (state == ST_CONNECTIONPENDING)
                        throw new ConnectionPendingException();
                    if (localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress isa = (local == null) ?
                        new InetSocketAddress(0)
                        : RdmaNet.checkAddress(local, family);
                    SecurityManager sm = System.getSecurityManager();
                    if (sm != null) {
                        sm.checkListen(isa.getPort());
                    }
                    RdmaNet.bind(family, fd, isa.getAddress(), isa.getPort());
                    localAddress = RdmaNet.localAddress(fd);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
        return this;
    }

    /**
     * Returns true if the state is {@code ST_CONNECTED} (lock-free read of
     * the volatile state).
     */
    @Override
    public boolean isConnected() {
        return (state == ST_CONNECTED);
    }

    /**
     * Returns true if the state is {@code ST_CONNECTIONPENDING} (lock-free
     * read of the volatile state).
     */
    @Override
    public boolean isConnectionPending() {
        return (state == ST_CONNECTIONPENDING);
    }

    /**
     * Marks the start of a connect: moves the state to
     * {@code ST_CONNECTIONPENDING} and records {@code isa} as the remote
     * address. In blocking mode it also installs the interrupt hook and
     * records the current thread as the reader thread.
     *
     * @throws ClosedChannelException if the channel is closed
     * @throws AlreadyConnectedException if already connected
     * @throws ConnectionPendingException if a connect is already in progress
     */
    private void beginConnect(boolean blocking, InetSocketAddress isa)
        throws IOException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();
        }
        synchronized (stateLock) {
            ensureOpen();
            int state = this.state;
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_CONNECTIONPENDING)
                throw new ConnectionPendingException();
            assert state == ST_UNCONNECTED;
            this.state = ST_CONNECTIONPENDING;

            remoteAddress = isa;

            if (blocking) {
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        }
    }

    /**
     * Marks the end of a connect. When {@code completed} is true and the
     * state is still {@code ST_CONNECTIONPENDING}, it records the local
     * address and moves the state to {@code ST_CONNECTED}.
     */
    private void endConnect(boolean blocking, boolean completed)
        throws IOException {
        // beginConnect recorded the thread in readerThread, so the read-side
        // end routine is the one that clears it
        endRead(blocking, completed);

        if (completed) {
            synchronized (stateLock) {
                if (state == ST_CONNECTIONPENDING) {
                    localAddress = RdmaNet.localAddress(fd);
                    state = ST_CONNECTED;
                }
            }
        }
    }

    /**
     * Connects the socket to {@code sa}. If the address is the wildcard
     * address, the connect is issued to {@code InetAddress.getLocalHost()}
     * while the recorded remote address remains {@code sa}.
     *
     * @return true if {@code RdmaNet.connect} returned a positive value
     * @throws IOException if the connect fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean connect(SocketAddress sa) throws IOException {
        InetSocketAddress isa = RdmaNet.checkAddress(sa, family);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        InetAddress ia = isa.getAddress();
        if (ia.isAnyLocalAddress())
            ia = InetAddress.getLocalHost();

        try {
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    int n = 0;
                    boolean blocking = isBlocking();
                    try {
                        beginConnect(blocking, isa);
                        do {
                            n = RdmaNet.connect(family, fd, ia, isa.getPort());
                        } while (n == IOStatus.INTERRUPTED && isOpen());
                    } finally {
                        endConnect(blocking, (n > 0));
                    }
                    assert IOStatus.check(n);
                    return n > 0;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }

    /**
     * Marks the start of a finish-connect (or poll-for-connect) operation.
     *
     * @throws ClosedChannelException if the channel is closed
     * @throws NoConnectionPendingException if the state is not
     *         {@code ST_CONNECTIONPENDING}
     */
    private void beginFinishConnect(boolean blocking)
        throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();
        }
        synchronized (stateLock) {
            ensureOpen();
            if (state != ST_CONNECTIONPENDING)
                throw new NoConnectionPendingException();
            if (blocking) {
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        }
    }

    /**
     * Marks the end of a finish-connect operation. When {@code completed} is
     * true and the state is still {@code ST_CONNECTIONPENDING}, it records
     * the local address and moves the state to {@code ST_CONNECTED}.
     */
    private void endFinishConnect(boolean blocking, boolean completed)
        throws IOException
    {
        endRead(blocking, completed);

        if (completed) {
            synchronized (stateLock) {
                if (state == ST_CONNECTIONPENDING) {
                    localAddress = RdmaNet.localAddress(fd);
                    state = ST_CONNECTED;
                }
            }
        }
    }

    /**
     * Completes a pending connect using the native {@code checkConnect}.
     * In blocking mode it repeats the check while the result is 0 or
     * INTERRUPTED and the channel is open; in non-blocking mode it checks
     * once.
     *
     * @return true if the channel is (now) connected
     * @throws IOException if the check fails; the channel is closed before
     *         the exception is rethrown
     */
    @Override
    public boolean finishConnect() throws IOException {
        try {
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    // no-op if already connected
                    if (isConnected())
                        return true;

                    boolean blocking = isBlocking();
                    boolean connected = false;
                    try {
                        beginFinishConnect(blocking);
                        int n = 0;
                        if (blocking) {
                            do {
                                n = checkConnect(fd, true);
                            } while ((n == 0 || n == IOStatus.INTERRUPTED)
                                      && isOpen());
                        } else {
                            n = checkConnect(fd, false);
                        }
                        connected = (n > 0);
                    } finally {
                        endFinishConnect(blocking, connected);
                    }
                    // in blocking mode a normal return implies connected
                    assert (blocking && connected) ^ !blocking;
                    return connected;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }

    /**
     * Closes the channel in stages. It moves the state to {@code ST_CLOSING},
     * waits for in-flight I/O to finish, then moves to
     * {@code ST_KILLPENDING}. The fd is closed here via {@link #kill()} only
     * if the channel is not registered with a selector.
     *
     * <p> An interrupt received while waiting is remembered and the thread's
     * interrupt status is restored before returning.
     */
    @Override
    protected void implCloseSelectableChannel() throws IOException {
        assert !isOpen();
        boolean blocking;
        boolean connected;
        boolean interrupted = false;

        // set state to ST_CLOSING
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            blocking = isBlocking();
            connected = (state == ST_CONNECTED);
            state = ST_CLOSING;
        }

        // wait for any outstanding I/O operations to complete
        if (blocking) {
            synchronized (stateLock) {
                assert state == ST_CLOSING;
                long reader = readerThread;
                long writer = writerThread;
                if (reader != 0 || writer != 0) {
                    // pre-close first, then signal the blocked threads
                    nd.preClose(fd);
                    connected = false; // fd is no longer connected socket

                    if (reader != 0)
                        NativeThread.signal(reader);
                    if (writer != 0)
                        NativeThread.signal(writer);

                    // wait for blocking I/O operations to end
                    // (endRead/endWrite clear the ids and notifyAll)
                    while (readerThread != 0 || writerThread != 0) {
                        try {
                            stateLock.wait();
                        } catch (InterruptedException e) {
                            // keep waiting; status is restored on exit
                            interrupted = true;
                        }
                    }
                }
            }
        } else {
            // non-blocking mode: wait for read/write to complete
            readLock.lock();
            try {
                writeLock.lock();
                writeLock.unlock();
            } finally {
                readLock.unlock();
            }
        }

        // set state to ST_KILLPENDING
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            // if connected and the channel is registered with a Selector then
            // shutdown the output if possible so that the peer reads EOF.
            if (connected && isRegistered()) {
                try {
                    RdmaNet.shutdown(fd, Net.SHUT_WR);
                } catch (IOException ignore) { }
            }
            state = ST_KILLPENDING;
        }

        // close socket if not registered with Selector
        if (!isRegistered())
            kill();

        // restore interrupt status
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /**
     * Closes the file descriptor through the dispatcher if, and only if, the
     * state is {@code ST_KILLPENDING}; the state then becomes
     * {@code ST_KILLED}. In any other state this is a no-op.
     */
    @Override
    public void kill() throws IOException {
        synchronized (stateLock) {
            if (state == ST_KILLPENDING) {
                state = ST_KILLED;
                nd.close(fd);
            }
        }
    }

    /**
     * Shuts down the read side of the connection. The first call shuts the
     * socket down with {@code SHUT_RD}, signals a recorded reader thread and
     * sets the input-closed flag; later calls do nothing.
     *
     * @throws ClosedChannelException if the channel is closed
     * @throws NotYetConnectedException if the channel is not connected
     */
    @Override
    public SocketChannel shutdownInput() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (!isInputClosed) {
                RdmaNet.shutdown(fd, Net.SHUT_RD);
                long thread = readerThread;
                if (thread != 0)
                    NativeThread.signal(thread);
                isInputClosed = true;
            }
            return this;
        }
    }

    /**
     * Shuts down the write side of the connection. The first call shuts the
     * socket down with {@code SHUT_WR}, signals a recorded writer thread and
     * sets the output-closed flag; later calls do nothing.
     *
     * @throws ClosedChannelException if the channel is closed
     * @throws NotYetConnectedException if the channel is not connected
     */
    @Override
    public SocketChannel shutdownOutput() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (!isOutputClosed) {
                RdmaNet.shutdown(fd, Net.SHUT_WR);
                long thread = writerThread;
                if (thread != 0)
                    NativeThread.signal(thread);
                isOutputClosed = true;
            }
            return this;
        }
    }

    /** Returns true unless {@link #shutdownInput()} has taken effect. */
    boolean isInputOpen() {
        return !isInputClosed;
    }

    /** Returns true unless {@link #shutdownOutput()} has taken effect. */
    boolean isOutputOpen() {
        return !isOutputClosed;
    }

    /**
     * Poll this channel's socket for reading up to the given timeout.
     * The assertion requires the caller to hold {@code blockingLock()} and
     * the channel to be in blocking mode.
     *
     * @return {@code true} if the socket is polled, i.e. the poll reported a
     *         non-zero event set
     */
    boolean pollRead(long timeout) throws IOException {
        boolean blocking = isBlocking();
        assert Thread.holdsLock(blockingLock()) && blocking;

        readLock.lock();
        try {
            boolean polled = false;
            try {
                beginRead(blocking);
                int events = RdmaNet.poll(fd, Net.POLLIN, timeout);
                polled = (events != 0);
            } finally {
                endRead(blocking, polled);
            }
            return polled;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Poll this channel's socket for a connection, up to the given timeout.
     * The assertion requires the caller to hold {@code blockingLock()} and
     * the channel to be in blocking mode.
     *
     * @return {@code true} if the socket is polled, i.e. the poll reported a
     *         non-zero event set
     */
    boolean pollConnected(long timeout) throws IOException {
        boolean blocking = isBlocking();
        assert Thread.holdsLock(blockingLock()) && blocking;

        readLock.lock();
        try {
            writeLock.lock();
            try {
                boolean polled = false;
                try {
                    beginFinishConnect(blocking);
                    int events = RdmaNet.poll(fd, Net.POLLCONN, timeout);
                    polled = (events != 0);
                } finally {
                    // invoke endFinishConnect with completed = false so that
                    // the state is not changed to ST_CONNECTED. The socket
                    // adaptor will use finishConnect to finish.
                    endFinishConnect(blocking, /*completed*/false);
                }
                return polled;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Translates native poll events into ready operations on {@code ski}.
     * <ul>
     * <li> {@code POLLNVAL}: returns false and leaves the ready set as is.
     * <li> {@code POLLERR} or {@code POLLHUP}: the ready set becomes the
     *      whole interest set.
     * <li> Otherwise {@code POLLIN} and {@code POLLOUT} map to
     *      {@code OP_READ} and {@code OP_WRITE} while connected, and
     *      {@code POLLCONN} maps to {@code OP_CONNECT} while a connection is
     *      pending. Each is added only if it is in the interest set.
     * </ul>
     *
     * @param ops        the native poll events
     * @param initialOps the ready set to start from
     * @param ski        the key whose ready set is updated
     * @return true if the new ready set contains a bit that the previous
     *         ready set did not
     */
    public boolean translateReadyOps(int ops, int initialOps,
                                     SelectionKeyImpl ski) {
        int intOps = ski.nioInterestOps();
        int oldOps = ski.nioReadyOps();
        int newOps = initialOps;

        if ((ops & Net.POLLNVAL) != 0) {
            return false;
        }

        if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
            newOps = intOps;
            ski.nioReadyOps(newOps);
            return (newOps & ~oldOps) != 0;
        }

        // read once so OP_READ and OP_WRITE use the same snapshot
        boolean connected = isConnected();
        if (((ops & Net.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_READ) != 0) && connected)
            newOps |= SelectionKey.OP_READ;

        if (((ops & Net.POLLCONN) != 0) &&
            ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
            newOps |= SelectionKey.OP_CONNECT;

        if (((ops & Net.POLLOUT) != 0) &&
            ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
            newOps |= SelectionKey.OP_WRITE;

        ski.nioReadyOps(newOps);
        return (newOps & ~oldOps) != 0;
    }

    /**
     * Translates {@code ops} starting from the key's current ready set, so
     * existing ready bits are kept.
     */
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
        return translateReadyOps(ops, ski.nioReadyOps(), ski);
    }

    /**
     * Translates {@code ops} starting from an empty ready set, so the key's
     * ready set is replaced.
     */
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
        return translateReadyOps(ops, 0, ski);
    }

    /**
     * Maps an interest set to native poll events: {@code OP_READ} to
     * {@code POLLIN}, {@code OP_WRITE} to {@code POLLOUT} and
     * {@code OP_CONNECT} to {@code POLLCONN}.
     */
    public int translateInterestOps(int ops) {
        int newOps = 0;
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= Net.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= Net.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= Net.POLLCONN;
        return newOps;
    }

    /** Returns the file descriptor of this channel's socket. */
    public FileDescriptor getFD() {
        return fd;
    }

    /** Returns the value obtained from {@code IOUtil.fdVal} for the fd. */
    public int getFDVal() {
        return fdVal;
    }

    /**
     * Returns the superclass name followed by a bracketed summary. The
     * summary is {@code closed}, or the connection state with {@code ishut}
     * and {@code oshut} markers and the local and remote addresses when they
     * are set.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClass().getSuperclass().getName());
        sb.append('[');
        if (!isOpen())
            sb.append("closed");
        else {
            synchronized (stateLock) {
                switch (state) {
                case ST_UNCONNECTED:
                    sb.append("unconnected");
                    break;
                case ST_CONNECTIONPENDING:
                    sb.append("connection-pending");
                    break;
                case ST_CONNECTED:
                    sb.append("connected");
                    if (isInputClosed)
                        sb.append(" ishut");
                    if (isOutputClosed)
                        sb.append(" oshut");
                    break;
                }
                InetSocketAddress addr = localAddress();
                if (addr != null) {
                    sb.append(" local=");
                    sb.append(Net.getRevealedLocalAddressAsString(addr));
                }
                if (remoteAddress() != null) {
                    sb.append(" remote=");
                    sb.append(remoteAddress().toString());
                }
            }
        }
        sb.append(']');
        return sb.toString();
    }

    // -- Native methods --

    // Called once from the static initializer; an
    // UnsupportedOperationException thrown here is recorded in 'unsupported'
    private static native void initIDs() throws UnsupportedOperationException;

    // Used by finishConnect; a positive result is treated as connected and,
    // in blocking mode, 0 or INTERRUPTED is retried
    private static native int checkConnect(FileDescriptor fd, boolean block)
        throws IOException;

    private static native int sendOutOfBandData(FileDescriptor fd, byte data)
        throws IOException;

    // Loads the native libraries, runs initIDs() and records (rather than
    // propagates) an UnsupportedOperationException so that class
    // initialization succeeds and the failure is reported by checkSupported
    // when a channel is constructed.
    static {
        IOUtil.load();
        System.loadLibrary("extnet");
        UnsupportedOperationException uoe = null;
        try {
            initIDs();
        } catch (UnsupportedOperationException e) {
            uoe = e;
        }
        unsupported = uoe;
        nd = new RdmaSocketDispatcher();
    }
}