1 /* 2 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package rdma.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.Socket; 33 import java.net.SocketAddress; 34 import java.net.SocketOption; 35 import java.net.StandardSocketOptions; 36 import java.nio.ByteBuffer; 37 import java.nio.channels.AlreadyBoundException; 38 import java.nio.channels.AlreadyConnectedException; 39 import java.nio.channels.AsynchronousCloseException; 40 import java.nio.channels.ClosedChannelException; 41 import java.nio.channels.ConnectionPendingException; 42 import java.nio.channels.NoConnectionPendingException; 43 import java.nio.channels.NotYetConnectedException; 44 import java.nio.channels.SelectionKey; 45 import java.nio.channels.SocketChannel; 46 import java.nio.channels.spi.SelectorProvider; 47 import java.util.Collections; 48 import java.util.HashSet; 49 import java.util.Objects; 50 import java.util.Set; 51 import java.util.concurrent.locks.ReentrantLock; 52 import sun.net.ext.RdmaSocketOptions; 53 import sun.nio.ch.IOStatus; 54 import sun.nio.ch.IOUtil; 55 import sun.nio.ch.Net; 56 import sun.nio.ch.NativeThread; 57 import sun.nio.ch.SelChImpl; 58 import sun.nio.ch.SelectionKeyImpl; 59 60 public class RdmaSocketChannelImpl 61 extends SocketChannel 62 implements SelChImpl 63 { 64 private static RdmaSocketDispatcher nd; 65 private final FileDescriptor fd; 66 private final int fdVal; 67 68 private final ReentrantLock readLock = new ReentrantLock(); 69 private final ReentrantLock writeLock = new ReentrantLock(); 70 71 private final Object stateLock = new Object(); 72 73 private volatile boolean isInputClosed; 74 private volatile boolean isOutputClosed; 75 76 private boolean isReuseAddress; 77 78 private static final int ST_UNCONNECTED = 0; 79 private static final int ST_CONNECTIONPENDING = 1; 80 private static final int ST_CONNECTED = 2; 81 private static final int ST_CLOSING = 3; 82 private static final int ST_KILLPENDING = 4; 83 private static final int ST_KILLED = 5; 84 private volatile int state; // need stateLock to change 85 86 private long readerThread; 87 private long writerThread; 88 89 private InetSocketAddress localAddress; 90 private InetSocketAddress remoteAddress; 91 92 private Socket socket; 93 94 protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException { 95 super(sp); 96 this.fd = RdmaNet.socket(true); 97 this.fdVal = IOUtil.fdVal(fd); 98 } 99 100 protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 101 throws IOException 102 { 103 super(sp); 104 this.fd = fd; 105 this.fdVal = IOUtil.fdVal(fd); 106 if (bound) { 107 synchronized (stateLock) { 108 this.localAddress = RdmaNet.localAddress(fd); 109 } 110 } 111 } 112 113 RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) 114 throws IOException 115 { 116 super(sp); 117 this.fd = fd; 118 this.fdVal = IOUtil.fdVal(fd); 119 synchronized (stateLock) { 120 this.localAddress = RdmaNet.localAddress(fd); 121 this.remoteAddress = isa; 122 this.state = ST_CONNECTED; 123 } 124 } 125 126 private void ensureOpen() throws ClosedChannelException { 127 if (!isOpen()) 128 throw new ClosedChannelException(); 129 } 130 131 private void ensureOpenAndConnected() throws ClosedChannelException { 132 int state = this.state; 133 if (state < ST_CONNECTED) { 134 throw new NotYetConnectedException(); 135 } else if (state > ST_CONNECTED) { 136 throw new ClosedChannelException(); 137 } 138 } 139 140 @Override 141 public Socket socket() { 142 synchronized (stateLock) { 143 if (socket == null) 144 socket = RdmaSocketAdaptor.create(this); 145 return socket; 146 } 147 } 148 149 @Override 150 public SocketAddress getLocalAddress() throws IOException { 151 synchronized (stateLock) { 152 ensureOpen(); 153 return RdmaNet.getRevealedLocalAddress(localAddress); 154 } 155 } 156 157 @Override 158 public SocketAddress getRemoteAddress() throws IOException { 159 synchronized (stateLock) { 160 ensureOpen(); 161 return remoteAddress; 162 } 163 } 164 165 @Override 166 public <T> SocketChannel setOption(SocketOption<T> name, T value) 167 throws IOException 168 { 169 Objects.requireNonNull(name); 170 if (!supportedOptions().contains(name)) 171 throw new UnsupportedOperationException("'" + name + "' not supported"); 172 173 synchronized (stateLock) { 174 ensureOpen(); 175 176 if (name == StandardSocketOptions.SO_REUSEADDR && 177 RdmaNet.useExclusiveBind()) { 178 isReuseAddress = (Boolean)value; 179 return this; 180 } 181 182 if (isConnected() && (name == StandardSocketOptions.SO_REUSEADDR || 183 name == StandardSocketOptions.SO_SNDBUF || 184 name == StandardSocketOptions.SO_RCVBUF)) 185 throw new UnsupportedOperationException( 186 "RDMA socket channel cannot set the socket option " 187 + name.toString() + " after connect."); 188 189 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); 190 return this; 191 } 192 } 193 194 @Override 195 @SuppressWarnings("unchecked") 196 public <T> T getOption(SocketOption<T> name) 197 throws IOException 198 { 199 Objects.requireNonNull(name); 200 if (!supportedOptions().contains(name)) 201 throw new UnsupportedOperationException("'" + name + "' not supported"); 202 203 synchronized (stateLock) { 204 ensureOpen(); 205 206 if (name == StandardSocketOptions.SO_REUSEADDR 207 && RdmaNet.useExclusiveBind()) { 208 return (T)Boolean.valueOf(isReuseAddress); 209 } 210 211 return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); 212 } 213 } 214 215 private static class DefaultOptionsHolder { 216 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 217 218 private static Set<SocketOption<?>> defaultOptions() { 219 HashSet<SocketOption<?>> set = new HashSet<>(); 220 set.add(StandardSocketOptions.SO_SNDBUF); 221 set.add(StandardSocketOptions.SO_RCVBUF); 222 set.add(StandardSocketOptions.SO_REUSEADDR); 223 set.add(StandardSocketOptions.SO_LINGER); 224 set.add(StandardSocketOptions.TCP_NODELAY); 225 RdmaSocketOptions rdmaOptions = 226 RdmaSocketOptions.getInstance(); 227 set.addAll(rdmaOptions.options()); 228 return Collections.unmodifiableSet(set); 229 } 230 } 231 232 public Set<SocketOption<?>> supportedOptions() { 233 return DefaultOptionsHolder.defaultOptions; 234 } 235 236 private void beginRead(boolean blocking) throws ClosedChannelException { 237 if (blocking) { 238 // set hook for Thread.interrupt 239 begin(); 240 241 synchronized (stateLock) { 242 ensureOpenAndConnected(); 243 // record thread so it can be signalled if needed 244 readerThread = NativeThread.current(); 245 } 246 } else { 247 ensureOpenAndConnected(); 248 } 249 } 250 private void endRead(boolean blocking, boolean completed) 251 throws AsynchronousCloseException 252 { 253 if (blocking) { 254 synchronized (stateLock) { 255 readerThread = 0; 256 // notify any thread waiting in implCloseSelectableChannel 257 if (state == ST_CLOSING) { 258 stateLock.notifyAll(); 259 } 260 } 261 // remove hook for Thread.interrupt 262 end(completed); 263 } 264 } 265 266 @Override 267 public int read(ByteBuffer buf) throws IOException { 268 Objects.requireNonNull(buf); 269 270 readLock.lock(); 271 try { 272 boolean blocking = isBlocking(); 273 int n = 0; 274 try { 275 beginRead(blocking); 276 277 // check if input is shutdown 278 if (isInputClosed) 279 return IOStatus.EOF; 280 281 if (blocking) { 282 do { 283 n = IOUtil.read(fd, buf, -1, nd); 284 } while (n == IOStatus.INTERRUPTED && isOpen()); 285 } else { 286 n = IOUtil.read(fd, buf, -1, nd); 287 } 288 } finally { 289 endRead(blocking, n > 0); 290 if (n <= 0 && isInputClosed) 291 return IOStatus.EOF; 292 } 293 return IOStatus.normalize(n); 294 } finally { 295 readLock.unlock(); 296 } 297 } 298 299 @Override 300 public long read(ByteBuffer[] dsts, int offset, int length) 301 throws IOException 302 { 303 Objects.checkFromIndexSize(offset, length, dsts.length); 304 305 readLock.lock(); 306 try { 307 boolean blocking = isBlocking(); 308 long n = 0; 309 try { 310 beginRead(blocking); 311 312 // check if input is shutdown 313 if (isInputClosed) 314 return IOStatus.EOF; 315 316 if (blocking) { 317 do { 318 n = IOUtil.read(fd, dsts, offset, length, nd); 319 } while (n == IOStatus.INTERRUPTED && isOpen()); 320 } else { 321 n = IOUtil.read(fd, dsts, offset, length, nd); 322 } 323 } finally { 324 endRead(blocking, n > 0); 325 if (n <= 0 && isInputClosed) 326 return IOStatus.EOF; 327 } 328 return IOStatus.normalize(n); 329 } finally { 330 readLock.unlock(); 331 } 332 } 333 334 private void beginWrite(boolean blocking) throws ClosedChannelException { 335 if (blocking) { 336 // set hook for Thread.interrupt 337 begin(); 338 339 synchronized (stateLock) { 340 ensureOpenAndConnected(); 341 if (isOutputClosed) 342 throw new ClosedChannelException(); 343 // record thread so it can be signalled if needed 344 writerThread = NativeThread.current(); 345 } 346 } else { 347 ensureOpenAndConnected(); 348 } 349 } 350 351 private void endWrite(boolean blocking, boolean completed) 352 throws AsynchronousCloseException 353 { 354 if (blocking) { 355 synchronized (stateLock) { 356 writerThread = 0; 357 // notify any thread waiting in implCloseSelectableChannel 358 if (state == ST_CLOSING) { 359 stateLock.notifyAll(); 360 } 361 } 362 // remove hook for Thread.interrupt 363 end(completed); 364 } 365 } 366 367 @Override 368 public int write(ByteBuffer buf) throws IOException { 369 Objects.requireNonNull(buf); 370 371 writeLock.lock(); 372 try { 373 boolean blocking = isBlocking(); 374 int n = 0; 375 try { 376 beginWrite(blocking); 377 if (blocking) { 378 do { 379 n = IOUtil.write(fd, buf, -1, nd); 380 } while (n == IOStatus.INTERRUPTED && isOpen()); 381 } else { 382 n = IOUtil.write(fd, buf, -1, nd); 383 } 384 } finally { 385 endWrite(blocking, n > 0); 386 if (n <= 0 && isOutputClosed) 387 throw new AsynchronousCloseException(); 388 } 389 return IOStatus.normalize(n); 390 } finally { 391 writeLock.unlock(); 392 } 393 } 394 395 @Override 396 public long write(ByteBuffer[] srcs, int offset, int length) 397 throws IOException 398 { 399 Objects.checkFromIndexSize(offset, length, srcs.length); 400 401 writeLock.lock(); 402 try { 403 boolean blocking = isBlocking(); 404 long n = 0; 405 try { 406 beginWrite(blocking); 407 if (blocking) { 408 do { 409 n = IOUtil.write(fd, srcs, offset, length, nd); 410 } while (n == IOStatus.INTERRUPTED && isOpen()); 411 } else { 412 n = IOUtil.write(fd, srcs, offset, length, nd); 413 } 414 } finally { 415 endWrite(blocking, n > 0); 416 if (n <= 0 && isOutputClosed) 417 throw new AsynchronousCloseException(); 418 } 419 return IOStatus.normalize(n); 420 } finally { 421 writeLock.unlock(); 422 } 423 } 424 425 int sendOutOfBandData(byte b) throws IOException { 426 writeLock.lock(); 427 try { 428 boolean blocking = isBlocking(); 429 int n = 0; 430 try { 431 beginWrite(blocking); 432 if (blocking) { 433 do { 434 n = sendOutOfBandData(fd, b); 435 } while (n == IOStatus.INTERRUPTED && isOpen()); 436 } else { 437 n = sendOutOfBandData(fd, b); 438 } 439 } finally { 440 endWrite(blocking, n > 0); 441 if (n <= 0 && isOutputClosed) 442 throw new AsynchronousCloseException(); 443 } 444 return IOStatus.normalize(n); 445 } finally { 446 writeLock.unlock(); 447 } 448 } 449 450 @Override 451 protected void implConfigureBlocking(boolean block) throws IOException { 452 readLock.lock(); 453 try { 454 writeLock.lock(); 455 try { 456 synchronized (stateLock) { 457 ensureOpen(); 458 RdmaNet.configureBlocking(fd, block); 459 } 460 } finally { 461 writeLock.unlock(); 462 } 463 } finally { 464 readLock.unlock(); 465 } 466 } 467 468 InetSocketAddress localAddress() { 469 synchronized (stateLock) { 470 return localAddress; 471 } 472 } 473 474 InetSocketAddress remoteAddress() { 475 synchronized (stateLock) { 476 return remoteAddress; 477 } 478 } 479 480 @Override 481 public SocketChannel bind(SocketAddress local) throws IOException { 482 readLock.lock(); 483 try { 484 writeLock.lock(); 485 try { 486 synchronized (stateLock) { 487 ensureOpen(); 488 if (state == ST_CONNECTIONPENDING) 489 throw new ConnectionPendingException(); 490 if (localAddress != null) 491 throw new AlreadyBoundException(); 492 InetSocketAddress isa = (local == null) ? 493 new InetSocketAddress(0) : RdmaNet.checkAddress(local); 494 SecurityManager sm = System.getSecurityManager(); 495 if (sm != null) { 496 sm.checkListen(isa.getPort()); 497 } 498 RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); 499 localAddress = RdmaNet.localAddress(fd); 500 } 501 } finally { 502 writeLock.unlock(); 503 } 504 } finally { 505 readLock.unlock(); 506 } 507 return this; 508 } 509 510 @Override 511 public boolean isConnected() { 512 return (state == ST_CONNECTED); 513 } 514 515 @Override 516 public boolean isConnectionPending() { 517 return (state == ST_CONNECTIONPENDING); 518 } 519 520 private void beginConnect(boolean blocking, InetSocketAddress isa) 521 throws IOException 522 { 523 if (blocking) { 524 // set hook for Thread.interrupt 525 begin(); 526 } 527 synchronized (stateLock) { 528 ensureOpen(); 529 int state = this.state; 530 if (state == ST_CONNECTED) 531 throw new AlreadyConnectedException(); 532 if (state == ST_CONNECTIONPENDING) 533 throw new ConnectionPendingException(); 534 assert state == ST_UNCONNECTED; 535 this.state = ST_CONNECTIONPENDING; 536 537 remoteAddress = isa; 538 539 if (blocking) { 540 // record thread so it can be signalled if needed 541 readerThread = NativeThread.current(); 542 } 543 } 544 } 545 546 private void endConnect(boolean blocking, boolean completed) 547 throws IOException 548 { 549 endRead(blocking, completed); 550 551 if (completed) { 552 synchronized (stateLock) { 553 if (state == ST_CONNECTIONPENDING) { 554 localAddress = RdmaNet.localAddress(fd); 555 state = ST_CONNECTED; 556 } 557 } 558 } 559 } 560 561 @Override 562 public boolean connect(SocketAddress sa) throws IOException { 563 InetSocketAddress isa = RdmaNet.checkAddress(sa); 564 SecurityManager sm = System.getSecurityManager(); 565 if (sm != null) 566 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 567 568 InetAddress ia = isa.getAddress(); 569 if (ia.isAnyLocalAddress()) 570 ia = InetAddress.getLocalHost(); 571 572 try { 573 readLock.lock(); 574 try { 575 writeLock.lock(); 576 try { 577 int n = 0; 578 boolean blocking = isBlocking(); 579 try { 580 beginConnect(blocking, isa); 581 do { 582 n = RdmaNet.connect(fd, ia, isa.getPort()); 583 } while (n == IOStatus.INTERRUPTED && isOpen()); 584 } finally { 585 endConnect(blocking, (n > 0)); 586 } 587 assert IOStatus.check(n); 588 return n > 0; 589 } finally { 590 writeLock.unlock(); 591 } 592 } finally { 593 readLock.unlock(); 594 } 595 } catch (IOException ioe) { 596 // connect failed, close the channel 597 close(); 598 throw ioe; 599 } 600 } 601 602 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { 603 if (blocking) { 604 // set hook for Thread.interrupt 605 begin(); 606 } 607 synchronized (stateLock) { 608 ensureOpen(); 609 if (state != ST_CONNECTIONPENDING) 610 throw new NoConnectionPendingException(); 611 if (blocking) { 612 // record thread so it can be signalled if needed 613 readerThread = NativeThread.current(); 614 } 615 } 616 } 617 618 private void endFinishConnect(boolean blocking, boolean completed) 619 throws IOException 620 { 621 endRead(blocking, completed); 622 623 if (completed) { 624 synchronized (stateLock) { 625 if (state == ST_CONNECTIONPENDING) { 626 localAddress = RdmaNet.localAddress(fd); 627 state = ST_CONNECTED; 628 } 629 } 630 } 631 } 632 633 @Override 634 public boolean finishConnect() throws IOException { 635 try { 636 readLock.lock(); 637 try { 638 writeLock.lock(); 639 try { 640 // no-op if already connected 641 if (isConnected()) 642 return true; 643 644 boolean blocking = isBlocking(); 645 boolean connected = false; 646 try { 647 beginFinishConnect(blocking); 648 int n = 0; 649 if (blocking) { 650 do { 651 n = checkConnect(fd, true); 652 } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen()); 653 } else { 654 n = checkConnect(fd, false); 655 } 656 connected = (n > 0); 657 } finally { 658 endFinishConnect(blocking, connected); 659 } 660 assert (blocking && connected) ^ !blocking; 661 return connected; 662 } finally { 663 writeLock.unlock(); 664 } 665 } finally { 666 readLock.unlock(); 667 } 668 } catch (IOException ioe) { 669 // connect failed, close the channel 670 close(); 671 throw ioe; 672 } 673 } 674 675 @Override 676 protected void implCloseSelectableChannel() throws IOException { 677 assert !isOpen(); 678 679 boolean blocking; 680 boolean connected; 681 boolean interrupted = false; 682 683 // set state to ST_CLOSING 684 synchronized (stateLock) { 685 assert state < ST_CLOSING; 686 blocking = isBlocking(); 687 connected = (state == ST_CONNECTED); 688 state = ST_CLOSING; 689 } 690 691 // wait for any outstanding I/O operations to complete 692 if (blocking) { 693 synchronized (stateLock) { 694 assert state == ST_CLOSING; 695 long reader = readerThread; 696 long writer = writerThread; 697 if (reader != 0 || writer != 0) { 698 nd.preClose(fd); 699 connected = false; // fd is no longer connected socket 700 701 if (reader != 0) 702 NativeThread.signal(reader); 703 if (writer != 0) 704 NativeThread.signal(writer); 705 706 // wait for blocking I/O operations to end 707 while (readerThread != 0 || writerThread != 0) { 708 try { 709 stateLock.wait(); 710 } catch (InterruptedException e) { 711 interrupted = true; 712 } 713 } 714 } 715 } 716 } else { 717 // non-blocking mode: wait for read/write to complete 718 readLock.lock(); 719 try { 720 writeLock.lock(); 721 writeLock.unlock(); 722 } finally { 723 readLock.unlock(); 724 } 725 } 726 727 // set state to ST_KILLPENDING 728 synchronized (stateLock) { 729 assert state == ST_CLOSING; 730 // if connected and the channel is registered with a Selector then 731 // shutdown the output if possible so that the peer reads EOF. If 732 // SO_LINGER is enabled and set to a non-zero value then it needs to 733 // be disabled so that the Selector does not wait when it closes 734 // the socket. 735 if (connected && isRegistered()) { 736 try { 737 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER; 738 int interval = (int) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, opt); 739 if (interval != 0) { 740 if (interval > 0) { 741 // disable SO_LINGER 742 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, opt, -1); 743 } 744 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); 745 } 746 } catch (IOException ignore) { } 747 } 748 state = ST_KILLPENDING; 749 } 750 751 // close socket if not registered with Selector 752 if (!isRegistered()) 753 kill(); 754 755 // restore interrupt status 756 if (interrupted) 757 Thread.currentThread().interrupt(); 758 } 759 760 @Override 761 public void kill() throws IOException { 762 synchronized (stateLock) { 763 if (state == ST_KILLPENDING) { 764 state = ST_KILLED; 765 nd.close(fd); 766 } 767 } 768 } 769 770 @Override 771 public SocketChannel shutdownInput() throws IOException { 772 synchronized (stateLock) { 773 ensureOpen(); 774 if (!isConnected()) 775 throw new NotYetConnectedException(); 776 if (!isInputClosed) { 777 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD); 778 long thread = readerThread; 779 if (thread != 0) 780 NativeThread.signal(thread); 781 isInputClosed = true; 782 } 783 return this; 784 } 785 } 786 787 @Override 788 public SocketChannel shutdownOutput() throws IOException { 789 synchronized (stateLock) { 790 ensureOpen(); 791 if (!isConnected()) 792 throw new NotYetConnectedException(); 793 if (!isOutputClosed) { 794 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); 795 long thread = writerThread; 796 if (thread != 0) 797 NativeThread.signal(thread); 798 isOutputClosed = true; 799 } 800 return this; 801 } 802 } 803 804 boolean isInputOpen() { 805 return !isInputClosed; 806 } 807 808 boolean isOutputOpen() { 809 return !isOutputClosed; 810 } 811 812 /** 813 * Poll this channel's socket for reading up to the given timeout. 814 * @return {@code true} if the socket is polled 815 */ 816 boolean pollRead(long timeout) throws IOException { 817 boolean blocking = isBlocking(); 818 assert Thread.holdsLock(blockingLock()) && blocking; 819 820 readLock.lock(); 821 try { 822 boolean polled = false; 823 try { 824 beginRead(blocking); 825 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout); 826 polled = (events != 0); 827 } finally { 828 endRead(blocking, polled); 829 } 830 return polled; 831 } finally { 832 readLock.unlock(); 833 } 834 } 835 836 /** 837 * Poll this channel's socket for a connection, up to the given timeout. 838 * @return {@code true} if the socket is polled 839 */ 840 boolean pollConnected(long timeout) throws IOException { 841 boolean blocking = isBlocking(); 842 assert Thread.holdsLock(blockingLock()) && blocking; 843 844 readLock.lock(); 845 try { 846 writeLock.lock(); 847 try { 848 boolean polled = false; 849 try { 850 beginFinishConnect(blocking); 851 int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout); 852 polled = (events != 0); 853 } finally { 854 // invoke endFinishConnect with completed = false so that 855 // the state is not changed to ST_CONNECTED. The socket 856 // adaptor will use finishConnect to finish. 857 endFinishConnect(blocking, /*completed*/false); 858 } 859 return polled; 860 } finally { 861 writeLock.unlock(); 862 } 863 } finally { 864 readLock.unlock(); 865 } 866 } 867 868 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 869 int intOps = ski.nioInterestOps(); 870 int oldOps = ski.nioReadyOps(); 871 int newOps = initialOps; 872 873 if ((ops & Net.POLLNVAL) != 0) { 874 return false; 875 } 876 877 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 878 newOps = intOps; 879 ski.nioReadyOps(newOps); 880 return (newOps & ~oldOps) != 0; 881 } 882 883 boolean connected = isConnected(); 884 if (((ops & Net.POLLIN) != 0) && 885 ((intOps & SelectionKey.OP_READ) != 0) && connected) 886 newOps |= SelectionKey.OP_READ; 887 888 if (((ops & Net.POLLCONN) != 0) && 889 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending()) 890 newOps |= SelectionKey.OP_CONNECT; 891 892 if (((ops & Net.POLLOUT) != 0) && 893 ((intOps & SelectionKey.OP_WRITE) != 0) && connected) 894 newOps |= SelectionKey.OP_WRITE; 895 896 ski.nioReadyOps(newOps); 897 return (newOps & ~oldOps) != 0; 898 } 899 900 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 901 return translateReadyOps(ops, ski.nioReadyOps(), ski); 902 } 903 904 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 905 return translateReadyOps(ops, 0, ski); 906 } 907 908 public int translateInterestOps(int ops) { 909 int newOps = 0; 910 if ((ops & SelectionKey.OP_READ) != 0) 911 newOps |= Net.POLLIN; 912 if ((ops & SelectionKey.OP_WRITE) != 0) 913 newOps |= Net.POLLOUT; 914 if ((ops & SelectionKey.OP_CONNECT) != 0) 915 newOps |= Net.POLLCONN; 916 return newOps; 917 } 918 919 public FileDescriptor getFD() { 920 return fd; 921 } 922 923 public int getFDVal() { 924 return fdVal; 925 } 926 927 @Override 928 public String toString() { 929 StringBuilder sb = new StringBuilder(); 930 sb.append(this.getClass().getSuperclass().getName()); 931 sb.append('['); 932 if (!isOpen()) 933 sb.append("closed"); 934 else { 935 synchronized (stateLock) { 936 switch (state) { 937 case ST_UNCONNECTED: 938 sb.append("unconnected"); 939 break; 940 case ST_CONNECTIONPENDING: 941 sb.append("connection-pending"); 942 break; 943 case ST_CONNECTED: 944 sb.append("connected"); 945 if (isInputClosed) 946 sb.append(" ishut"); 947 if (isOutputClosed) 948 sb.append(" oshut"); 949 break; 950 } 951 InetSocketAddress addr = localAddress(); 952 if (addr != null) { 953 sb.append(" local="); 954 sb.append(RdmaNet.getRevealedLocalAddressAsString(addr)); 955 } 956 if (remoteAddress() != null) { 957 sb.append(" remote="); 958 sb.append(remoteAddress().toString()); 959 } 960 } 961 } 962 sb.append(']'); 963 return sb.toString(); 964 } 965 966 // -- Native methods -- 967 968 private static native void initIDs(); 969 970 private static native int checkConnect(FileDescriptor fd, boolean block) 971 throws IOException; 972 973 private static native int sendOutOfBandData(FileDescriptor fd, byte data) 974 throws IOException; 975 976 static { 977 IOUtil.load(); 978 initIDs(); 979 nd = new RdmaSocketDispatcher(); 980 } 981 982 }