1 /* 2 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package rdma.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.Socket; 33 import java.net.SocketAddress; 34 import java.net.SocketOption; 35 import java.net.StandardSocketOptions; 36 import java.nio.ByteBuffer; 37 import java.nio.channels.AlreadyBoundException; 38 import java.nio.channels.AlreadyConnectedException; 39 import java.nio.channels.AsynchronousCloseException; 40 import java.nio.channels.ClosedChannelException; 41 import java.nio.channels.ConnectionPendingException; 42 import java.nio.channels.NoConnectionPendingException; 43 import java.nio.channels.NotYetConnectedException; 44 import java.nio.channels.SelectionKey; 45 import java.nio.channels.SocketChannel; 46 import java.nio.channels.spi.SelectorProvider; 47 import java.util.Collections; 48 import java.util.HashSet; 49 import java.util.Objects; 50 import java.util.Set; 51 import java.util.concurrent.locks.ReentrantLock; 52 import sun.net.ext.RdmaSocketOptions; 53 import sun.nio.ch.ExtendedSocketOption; 54 import sun.nio.ch.IOStatus; 55 import sun.nio.ch.IOUtil; 56 import sun.nio.ch.Net; 57 import sun.nio.ch.NativeThread; 58 import sun.nio.ch.SelChImpl; 59 import sun.nio.ch.SelectionKeyImpl; 60 61 public class RdmaSocketChannelImpl 62 extends SocketChannel 63 implements SelChImpl 64 { 65 private static RdmaSocketDispatcher nd; 66 private final FileDescriptor fd; 67 private final int fdVal; 68 69 private final ReentrantLock readLock = new ReentrantLock(); 70 private final ReentrantLock writeLock = new ReentrantLock(); 71 72 private final Object stateLock = new Object(); 73 74 private volatile boolean isInputClosed; 75 private volatile boolean isOutputClosed; 76 77 private boolean isReuseAddress; 78 79 private static final int ST_UNCONNECTED = 0; 80 private static final int ST_CONNECTIONPENDING = 1; 81 private static final int ST_CONNECTED = 2; 82 private static final int ST_CLOSING = 3; 83 private static final int ST_KILLPENDING = 4; 84 private static final int ST_KILLED = 5; 85 private volatile int state; // need stateLock to change 86 87 private long readerThread; 88 private long writerThread; 89 90 private InetSocketAddress localAddress; 91 private InetSocketAddress remoteAddress; 92 93 private Socket socket; 94 95 protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException { 96 super(sp); 97 this.fd = RdmaNet.socket(true); 98 this.fdVal = IOUtil.fdVal(fd); 99 } 100 101 protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 102 throws IOException 103 { 104 super(sp); 105 this.fd = fd; 106 this.fdVal = IOUtil.fdVal(fd); 107 if (bound) { 108 synchronized (stateLock) { 109 this.localAddress = RdmaNet.localAddress(fd); 110 } 111 } 112 } 113 114 RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) 115 throws IOException 116 { 117 super(sp); 118 this.fd = fd; 119 this.fdVal = IOUtil.fdVal(fd); 120 synchronized (stateLock) { 121 this.localAddress = RdmaNet.localAddress(fd); 122 this.remoteAddress = isa; 123 this.state = ST_CONNECTED; 124 } 125 } 126 127 private void ensureOpen() throws ClosedChannelException { 128 if (!isOpen()) 129 throw new ClosedChannelException(); 130 } 131 132 private void ensureOpenAndConnected() throws ClosedChannelException { 133 int state = this.state; 134 if (state < ST_CONNECTED) { 135 throw new NotYetConnectedException(); 136 } else if (state > ST_CONNECTED) { 137 throw new ClosedChannelException(); 138 } 139 } 140 141 @Override 142 public Socket socket() { 143 synchronized (stateLock) { 144 if (socket == null) 145 socket = RdmaSocketAdaptor.create(this); 146 return socket; 147 } 148 } 149 150 @Override 151 public SocketAddress getLocalAddress() throws IOException { 152 synchronized (stateLock) { 153 ensureOpen(); 154 return RdmaNet.getRevealedLocalAddress(localAddress); 155 } 156 } 157 158 @Override 159 public SocketAddress getRemoteAddress() throws IOException { 160 synchronized (stateLock) { 161 ensureOpen(); 162 return remoteAddress; 163 } 164 } 165 166 @Override 167 public <T> SocketChannel setOption(SocketOption<T> name, T value) 168 throws IOException 169 { 170 Objects.requireNonNull(name); 171 if (!supportedOptions().contains(name)) 172 throw new UnsupportedOperationException("'" + name + "' not supported"); 173 174 synchronized (stateLock) { 175 ensureOpen(); 176 177 if (name == StandardSocketOptions.SO_REUSEADDR && 178 RdmaNet.useExclusiveBind()) { 179 isReuseAddress = (Boolean)value; 180 return this; 181 } 182 183 if (isConnected() && (name == StandardSocketOptions.SO_REUSEADDR || 184 name == StandardSocketOptions.SO_SNDBUF || 185 name == StandardSocketOptions.SO_RCVBUF)) 186 throw new UnsupportedOperationException( 187 "RDMA socket channel cannot set the socket option " 188 + name.toString() + " after connect."); 189 190 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value); 191 return this; 192 } 193 } 194 195 @Override 196 @SuppressWarnings("unchecked") 197 public <T> T getOption(SocketOption<T> name) 198 throws IOException 199 { 200 Objects.requireNonNull(name); 201 if (!supportedOptions().contains(name)) 202 throw new UnsupportedOperationException("'" + name + "' not supported"); 203 204 synchronized (stateLock) { 205 ensureOpen(); 206 207 if (name == StandardSocketOptions.SO_REUSEADDR 208 && RdmaNet.useExclusiveBind()) { 209 return (T)Boolean.valueOf(isReuseAddress); 210 } 211 212 return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name); 213 } 214 } 215 216 private static class DefaultOptionsHolder { 217 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 218 219 private static Set<SocketOption<?>> defaultOptions() { 220 HashSet<SocketOption<?>> set = new HashSet<>(); 221 set.add(StandardSocketOptions.SO_SNDBUF); 222 set.add(StandardSocketOptions.SO_RCVBUF); 223 set.add(StandardSocketOptions.SO_REUSEADDR); 224 set.add(StandardSocketOptions.SO_LINGER); 225 set.add(StandardSocketOptions.TCP_NODELAY); 226 RdmaSocketOptions rdmaOptions = 227 RdmaSocketOptions.getInstance(); 228 set.addAll(rdmaOptions.options()); 229 return Collections.unmodifiableSet(set); 230 } 231 } 232 233 public Set<SocketOption<?>> supportedOptions() { 234 return DefaultOptionsHolder.defaultOptions; 235 } 236 237 private void beginRead(boolean blocking) throws ClosedChannelException { 238 if (blocking) { 239 // set hook for Thread.interrupt 240 begin(); 241 242 synchronized (stateLock) { 243 ensureOpenAndConnected(); 244 // record thread so it can be signalled if needed 245 readerThread = NativeThread.current(); 246 } 247 } else { 248 ensureOpenAndConnected(); 249 } 250 } 251 private void endRead(boolean blocking, boolean completed) 252 throws AsynchronousCloseException 253 { 254 if (blocking) { 255 synchronized (stateLock) { 256 readerThread = 0; 257 // notify any thread waiting in implCloseSelectableChannel 258 if (state == ST_CLOSING) { 259 stateLock.notifyAll(); 260 } 261 } 262 // remove hook for Thread.interrupt 263 end(completed); 264 } 265 } 266 267 @Override 268 public int read(ByteBuffer buf) throws IOException { 269 Objects.requireNonNull(buf); 270 271 readLock.lock(); 272 try { 273 boolean blocking = isBlocking(); 274 int n = 0; 275 try { 276 beginRead(blocking); 277 278 // check if input is shutdown 279 if (isInputClosed) 280 return IOStatus.EOF; 281 282 if (blocking) { 283 do { 284 n = IOUtil.read(fd, buf, -1, nd); 285 } while (n == IOStatus.INTERRUPTED && isOpen()); 286 } else { 287 n = IOUtil.read(fd, buf, -1, nd); 288 } 289 } finally { 290 endRead(blocking, n > 0); 291 if (n <= 0 && isInputClosed) 292 return IOStatus.EOF; 293 } 294 return IOStatus.normalize(n); 295 } finally { 296 readLock.unlock(); 297 } 298 } 299 300 @Override 301 public long read(ByteBuffer[] dsts, int offset, int length) 302 throws IOException 303 { 304 Objects.checkFromIndexSize(offset, length, dsts.length); 305 306 readLock.lock(); 307 try { 308 boolean blocking = isBlocking(); 309 long n = 0; 310 try { 311 beginRead(blocking); 312 313 // check if input is shutdown 314 if (isInputClosed) 315 return IOStatus.EOF; 316 317 if (blocking) { 318 do { 319 n = IOUtil.read(fd, dsts, offset, length, nd); 320 } while (n == IOStatus.INTERRUPTED && isOpen()); 321 } else { 322 n = IOUtil.read(fd, dsts, offset, length, nd); 323 } 324 } finally { 325 endRead(blocking, n > 0); 326 if (n <= 0 && isInputClosed) 327 return IOStatus.EOF; 328 } 329 return IOStatus.normalize(n); 330 } finally { 331 readLock.unlock(); 332 } 333 } 334 335 private void beginWrite(boolean blocking) throws ClosedChannelException { 336 if (blocking) { 337 // set hook for Thread.interrupt 338 begin(); 339 340 synchronized (stateLock) { 341 ensureOpenAndConnected(); 342 if (isOutputClosed) 343 throw new ClosedChannelException(); 344 // record thread so it can be signalled if needed 345 writerThread = NativeThread.current(); 346 } 347 } else { 348 ensureOpenAndConnected(); 349 } 350 } 351 352 private void endWrite(boolean blocking, boolean completed) 353 throws AsynchronousCloseException 354 { 355 if (blocking) { 356 synchronized (stateLock) { 357 writerThread = 0; 358 // notify any thread waiting in implCloseSelectableChannel 359 if (state == ST_CLOSING) { 360 stateLock.notifyAll(); 361 } 362 } 363 // remove hook for Thread.interrupt 364 end(completed); 365 } 366 } 367 368 @Override 369 public int write(ByteBuffer buf) throws IOException { 370 Objects.requireNonNull(buf); 371 372 writeLock.lock(); 373 try { 374 boolean blocking = isBlocking(); 375 int n = 0; 376 try { 377 beginWrite(blocking); 378 if (blocking) { 379 do { 380 n = IOUtil.write(fd, buf, -1, nd); 381 } while (n == IOStatus.INTERRUPTED && isOpen()); 382 } else { 383 n = IOUtil.write(fd, buf, -1, nd); 384 } 385 } finally { 386 endWrite(blocking, n > 0); 387 if (n <= 0 && isOutputClosed) 388 throw new AsynchronousCloseException(); 389 } 390 return IOStatus.normalize(n); 391 } finally { 392 writeLock.unlock(); 393 } 394 } 395 396 @Override 397 public long write(ByteBuffer[] srcs, int offset, int length) 398 throws IOException 399 { 400 Objects.checkFromIndexSize(offset, length, srcs.length); 401 402 writeLock.lock(); 403 try { 404 boolean blocking = isBlocking(); 405 long n = 0; 406 try { 407 beginWrite(blocking); 408 if (blocking) { 409 do { 410 n = IOUtil.write(fd, srcs, offset, length, nd); 411 } while (n == IOStatus.INTERRUPTED && isOpen()); 412 } else { 413 n = IOUtil.write(fd, srcs, offset, length, nd); 414 } 415 } finally { 416 endWrite(blocking, n > 0); 417 if (n <= 0 && isOutputClosed) 418 throw new AsynchronousCloseException(); 419 } 420 return IOStatus.normalize(n); 421 } finally { 422 writeLock.unlock(); 423 } 424 } 425 426 int sendOutOfBandData(byte b) throws IOException { 427 writeLock.lock(); 428 try { 429 boolean blocking = isBlocking(); 430 int n = 0; 431 try { 432 beginWrite(blocking); 433 if (blocking) { 434 do { 435 n = sendOutOfBandData(fd, b); 436 } while (n == IOStatus.INTERRUPTED && isOpen()); 437 } else { 438 n = sendOutOfBandData(fd, b); 439 } 440 } finally { 441 endWrite(blocking, n > 0); 442 if (n <= 0 && isOutputClosed) 443 throw new AsynchronousCloseException(); 444 } 445 return IOStatus.normalize(n); 446 } finally { 447 writeLock.unlock(); 448 } 449 } 450 451 @Override 452 protected void implConfigureBlocking(boolean block) throws IOException { 453 readLock.lock(); 454 try { 455 writeLock.lock(); 456 try { 457 synchronized (stateLock) { 458 ensureOpen(); 459 RdmaNet.configureBlocking(fd, block); 460 } 461 } finally { 462 writeLock.unlock(); 463 } 464 } finally { 465 readLock.unlock(); 466 } 467 } 468 469 InetSocketAddress localAddress() { 470 synchronized (stateLock) { 471 return localAddress; 472 } 473 } 474 475 InetSocketAddress remoteAddress() { 476 synchronized (stateLock) { 477 return remoteAddress; 478 } 479 } 480 481 @Override 482 public SocketChannel bind(SocketAddress local) throws IOException { 483 readLock.lock(); 484 try { 485 writeLock.lock(); 486 try { 487 synchronized (stateLock) { 488 ensureOpen(); 489 if (state == ST_CONNECTIONPENDING) 490 throw new ConnectionPendingException(); 491 if (localAddress != null) 492 throw new AlreadyBoundException(); 493 InetSocketAddress isa = (local == null) ? 494 new InetSocketAddress(0) : RdmaNet.checkAddress(local); 495 SecurityManager sm = System.getSecurityManager(); 496 if (sm != null) { 497 sm.checkListen(isa.getPort()); 498 } 499 RdmaNet.bind(fd, isa.getAddress(), isa.getPort()); 500 localAddress = RdmaNet.localAddress(fd); 501 } 502 } finally { 503 writeLock.unlock(); 504 } 505 } finally { 506 readLock.unlock(); 507 } 508 return this; 509 } 510 511 @Override 512 public boolean isConnected() { 513 return (state == ST_CONNECTED); 514 } 515 516 @Override 517 public boolean isConnectionPending() { 518 return (state == ST_CONNECTIONPENDING); 519 } 520 521 private void beginConnect(boolean blocking, InetSocketAddress isa) 522 throws IOException 523 { 524 if (blocking) { 525 // set hook for Thread.interrupt 526 begin(); 527 } 528 synchronized (stateLock) { 529 ensureOpen(); 530 int state = this.state; 531 if (state == ST_CONNECTED) 532 throw new AlreadyConnectedException(); 533 if (state == ST_CONNECTIONPENDING) 534 throw new ConnectionPendingException(); 535 assert state == ST_UNCONNECTED; 536 this.state = ST_CONNECTIONPENDING; 537 538 remoteAddress = isa; 539 540 if (blocking) { 541 // record thread so it can be signalled if needed 542 readerThread = NativeThread.current(); 543 } 544 } 545 } 546 547 private void endConnect(boolean blocking, boolean completed) 548 throws IOException 549 { 550 endRead(blocking, completed); 551 552 if (completed) { 553 synchronized (stateLock) { 554 if (state == ST_CONNECTIONPENDING) { 555 localAddress = RdmaNet.localAddress(fd); 556 state = ST_CONNECTED; 557 } 558 } 559 } 560 } 561 562 @Override 563 public boolean connect(SocketAddress sa) throws IOException { 564 InetSocketAddress isa = RdmaNet.checkAddress(sa); 565 SecurityManager sm = System.getSecurityManager(); 566 if (sm != null) 567 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 568 569 InetAddress ia = isa.getAddress(); 570 if (ia.isAnyLocalAddress()) 571 ia = InetAddress.getLocalHost(); 572 573 try { 574 readLock.lock(); 575 try { 576 writeLock.lock(); 577 try { 578 int n = 0; 579 boolean blocking = isBlocking(); 580 try { 581 beginConnect(blocking, isa); 582 do { 583 n = RdmaNet.connect(fd, ia, isa.getPort()); 584 } while (n == IOStatus.INTERRUPTED && isOpen()); 585 } finally { 586 endConnect(blocking, (n > 0)); 587 } 588 assert IOStatus.check(n); 589 return n > 0; 590 } finally { 591 writeLock.unlock(); 592 } 593 } finally { 594 readLock.unlock(); 595 } 596 } catch (IOException ioe) { 597 // connect failed, close the channel 598 close(); 599 throw ioe; 600 } 601 } 602 603 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { 604 if (blocking) { 605 // set hook for Thread.interrupt 606 begin(); 607 } 608 synchronized (stateLock) { 609 ensureOpen(); 610 if (state != ST_CONNECTIONPENDING) 611 throw new NoConnectionPendingException(); 612 if (blocking) { 613 // record thread so it can be signalled if needed 614 readerThread = NativeThread.current(); 615 } 616 } 617 } 618 619 private void endFinishConnect(boolean blocking, boolean completed) 620 throws IOException 621 { 622 endRead(blocking, completed); 623 624 if (completed) { 625 synchronized (stateLock) { 626 if (state == ST_CONNECTIONPENDING) { 627 localAddress = RdmaNet.localAddress(fd); 628 state = ST_CONNECTED; 629 } 630 } 631 } 632 } 633 634 @Override 635 public boolean finishConnect() throws IOException { 636 try { 637 readLock.lock(); 638 try { 639 writeLock.lock(); 640 try { 641 // no-op if already connected 642 if (isConnected()) 643 return true; 644 645 boolean blocking = isBlocking(); 646 boolean connected = false; 647 try { 648 beginFinishConnect(blocking); 649 int n = 0; 650 if (blocking) { 651 do { 652 n = checkConnect(fd, true); 653 } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen()); 654 } else { 655 n = checkConnect(fd, false); 656 } 657 connected = (n > 0); 658 } finally { 659 endFinishConnect(blocking, connected); 660 } 661 assert (blocking && connected) ^ !blocking; 662 return connected; 663 } finally { 664 writeLock.unlock(); 665 } 666 } finally { 667 readLock.unlock(); 668 } 669 } catch (IOException ioe) { 670 // connect failed, close the channel 671 close(); 672 throw ioe; 673 } 674 } 675 676 @Override 677 protected void implCloseSelectableChannel() throws IOException { 678 assert !isOpen(); 679 680 boolean blocking; 681 boolean connected; 682 boolean interrupted = false; 683 684 // set state to ST_CLOSING 685 synchronized (stateLock) { 686 assert state < ST_CLOSING; 687 blocking = isBlocking(); 688 connected = (state == ST_CONNECTED); 689 state = ST_CLOSING; 690 } 691 692 // wait for any outstanding I/O operations to complete 693 if (blocking) { 694 synchronized (stateLock) { 695 assert state == ST_CLOSING; 696 long reader = readerThread; 697 long writer = writerThread; 698 if (reader != 0 || writer != 0) { 699 nd.preClose(fd); 700 connected = false; // fd is no longer connected socket 701 702 if (reader != 0) 703 NativeThread.signal(reader); 704 if (writer != 0) 705 NativeThread.signal(writer); 706 707 // wait for blocking I/O operations to end 708 while (readerThread != 0 || writerThread != 0) { 709 try { 710 stateLock.wait(); 711 } catch (InterruptedException e) { 712 interrupted = true; 713 } 714 } 715 } 716 } 717 } else { 718 // non-blocking mode: wait for read/write to complete 719 readLock.lock(); 720 try { 721 writeLock.lock(); 722 writeLock.unlock(); 723 } finally { 724 readLock.unlock(); 725 } 726 } 727 728 // set state to ST_KILLPENDING 729 synchronized (stateLock) { 730 assert state == ST_CLOSING; 731 // if connected and the channel is registered with a Selector then 732 // shutdown the output if possible so that the peer reads EOF. If 733 // SO_LINGER is enabled and set to a non-zero value then it needs to 734 // be disabled so that the Selector does not wait when it closes 735 // the socket. 736 if (connected && isRegistered()) { 737 try { 738 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER; 739 int interval = (int) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, opt); 740 if (interval != 0) { 741 if (interval > 0) { 742 // disable SO_LINGER 743 RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, opt, -1); 744 } 745 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); 746 } 747 } catch (IOException ignore) { } 748 } 749 state = ST_KILLPENDING; 750 } 751 752 // close socket if not registered with Selector 753 if (!isRegistered()) 754 kill(); 755 756 // restore interrupt status 757 if (interrupted) 758 Thread.currentThread().interrupt(); 759 } 760 761 @Override 762 public void kill() throws IOException { 763 synchronized (stateLock) { 764 if (state == ST_KILLPENDING) { 765 state = ST_KILLED; 766 nd.close(fd); 767 } 768 } 769 } 770 771 @Override 772 public SocketChannel shutdownInput() throws IOException { 773 synchronized (stateLock) { 774 ensureOpen(); 775 if (!isConnected()) 776 throw new NotYetConnectedException(); 777 if (!isInputClosed) { 778 RdmaNet.shutdown(fd, RdmaNet.SHUT_RD); 779 long thread = readerThread; 780 if (thread != 0) 781 NativeThread.signal(thread); 782 isInputClosed = true; 783 } 784 return this; 785 } 786 } 787 788 @Override 789 public SocketChannel shutdownOutput() throws IOException { 790 synchronized (stateLock) { 791 ensureOpen(); 792 if (!isConnected()) 793 throw new NotYetConnectedException(); 794 if (!isOutputClosed) { 795 RdmaNet.shutdown(fd, RdmaNet.SHUT_WR); 796 long thread = writerThread; 797 if (thread != 0) 798 NativeThread.signal(thread); 799 isOutputClosed = true; 800 } 801 return this; 802 } 803 } 804 805 boolean isInputOpen() { 806 return !isInputClosed; 807 } 808 809 boolean isOutputOpen() { 810 return !isOutputClosed; 811 } 812 813 /** 814 * Poll this channel's socket for reading up to the given timeout. 815 * @return {@code true} if the socket is polled 816 */ 817 boolean pollRead(long timeout) throws IOException { 818 boolean blocking = isBlocking(); 819 assert Thread.holdsLock(blockingLock()) && blocking; 820 821 readLock.lock(); 822 try { 823 boolean polled = false; 824 try { 825 beginRead(blocking); 826 int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout); 827 polled = (events != 0); 828 } finally { 829 endRead(blocking, polled); 830 } 831 return polled; 832 } finally { 833 readLock.unlock(); 834 } 835 } 836 837 /** 838 * Poll this channel's socket for a connection, up to the given timeout. 839 * @return {@code true} if the socket is polled 840 */ 841 boolean pollConnected(long timeout) throws IOException { 842 boolean blocking = isBlocking(); 843 assert Thread.holdsLock(blockingLock()) && blocking; 844 845 readLock.lock(); 846 try { 847 writeLock.lock(); 848 try { 849 boolean polled = false; 850 try { 851 beginFinishConnect(blocking); 852 int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout); 853 polled = (events != 0); 854 } finally { 855 // invoke endFinishConnect with completed = false so that 856 // the state is not changed to ST_CONNECTED. The socket 857 // adaptor will use finishConnect to finish. 858 endFinishConnect(blocking, /*completed*/false); 859 } 860 return polled; 861 } finally { 862 writeLock.unlock(); 863 } 864 } finally { 865 readLock.unlock(); 866 } 867 } 868 869 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 870 int intOps = ski.nioInterestOps(); 871 int oldOps = ski.nioReadyOps(); 872 int newOps = initialOps; 873 874 if ((ops & Net.POLLNVAL) != 0) { 875 return false; 876 } 877 878 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 879 newOps = intOps; 880 ski.nioReadyOps(newOps); 881 return (newOps & ~oldOps) != 0; 882 } 883 884 boolean connected = isConnected(); 885 if (((ops & Net.POLLIN) != 0) && 886 ((intOps & SelectionKey.OP_READ) != 0) && connected) 887 newOps |= SelectionKey.OP_READ; 888 889 if (((ops & Net.POLLCONN) != 0) && 890 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending()) 891 newOps |= SelectionKey.OP_CONNECT; 892 893 if (((ops & Net.POLLOUT) != 0) && 894 ((intOps & SelectionKey.OP_WRITE) != 0) && connected) 895 newOps |= SelectionKey.OP_WRITE; 896 897 ski.nioReadyOps(newOps); 898 return (newOps & ~oldOps) != 0; 899 } 900 901 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 902 return translateReadyOps(ops, ski.nioReadyOps(), ski); 903 } 904 905 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 906 return translateReadyOps(ops, 0, ski); 907 } 908 909 public int translateInterestOps(int ops) { 910 int newOps = 0; 911 if ((ops & SelectionKey.OP_READ) != 0) 912 newOps |= Net.POLLIN; 913 if ((ops & SelectionKey.OP_WRITE) != 0) 914 newOps |= Net.POLLOUT; 915 if ((ops & SelectionKey.OP_CONNECT) != 0) 916 newOps |= Net.POLLCONN; 917 return newOps; 918 } 919 920 public FileDescriptor getFD() { 921 return fd; 922 } 923 924 public int getFDVal() { 925 return fdVal; 926 } 927 928 @Override 929 public String toString() { 930 StringBuilder sb = new StringBuilder(); 931 sb.append(this.getClass().getSuperclass().getName()); 932 sb.append('['); 933 if (!isOpen()) 934 sb.append("closed"); 935 else { 936 synchronized (stateLock) { 937 switch (state) { 938 case ST_UNCONNECTED: 939 sb.append("unconnected"); 940 break; 941 case ST_CONNECTIONPENDING: 942 sb.append("connection-pending"); 943 break; 944 case ST_CONNECTED: 945 sb.append("connected"); 946 if (isInputClosed) 947 sb.append(" ishut"); 948 if (isOutputClosed) 949 sb.append(" oshut"); 950 break; 951 } 952 InetSocketAddress addr = localAddress(); 953 if (addr != null) { 954 sb.append(" local="); 955 sb.append(RdmaNet.getRevealedLocalAddressAsString(addr)); 956 } 957 if (remoteAddress() != null) { 958 sb.append(" remote="); 959 sb.append(remoteAddress().toString()); 960 } 961 } 962 } 963 sb.append(']'); 964 return sb.toString(); 965 } 966 967 // -- Native methods -- 968 969 private static native void initIDs(); 970 971 private static native int checkConnect(FileDescriptor fd, boolean block) 972 throws IOException; 973 974 private static native int sendOutOfBandData(FileDescriptor fd, byte data) 975 throws IOException; 976 977 static { 978 IOUtil.load(); 979 initIDs(); 980 nd = new RdmaSocketDispatcher(); 981 } 982 983 }