/*
 * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package rdma.ch;

import java.io.FileDescriptor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NoConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.ext.RdmaSocketOptions;
import sun.nio.ch.IOStatus;
import sun.nio.ch.IOUtil;
import sun.nio.ch.Net;
import sun.nio.ch.NativeThread;
import sun.nio.ch.SelChImpl;
import sun.nio.ch.SelectionKeyImpl;

/**
 * A {@link SocketChannel} whose socket-level operations are delegated to
 * {@code RdmaNet} and whose reads, writes, pre-close and close go through an
 * {@code RdmaSocketDispatcher}.
 *
 * <p> Three locks are used. Where more than one is taken, the methods in this
 * class acquire them in the order {@code readLock}, {@code writeLock},
 * {@code stateLock}. The channel state is an int that moves through the
 * {@code ST_*} constants declared below.
 */
public class RdmaSocketChannelImpl
    extends SocketChannel
    implements SelChImpl
{
    // Dispatcher used for read/write/preClose/close on fd; assigned once in
    // the static initializer at the bottom of this class.
    private static RdmaSocketDispatcher nd;
    // File descriptor of the underlying socket, and its int value as
    // returned by IOUtil.fdVal.
    private final FileDescriptor fd;
    private final int fdVal;

    // Held for the duration of a read, poll-for-read, bind, connect,
    // finishConnect and blocking-mode change.
    private final ReentrantLock readLock = new ReentrantLock();
    // Held for the duration of a write, bind, connect, finishConnect and
    // blocking-mode change.
    private final ReentrantLock writeLock = new ReentrantLock();

    // Monitor taken whenever the mutable fields below are changed; also used
    // for wait/notifyAll between close and in-flight blocking I/O.
    private final Object stateLock = new Object();

    // Set (under stateLock) by shutdownInput/shutdownOutput; read without
    // the lock by the I/O paths, hence volatile.
    private volatile boolean isInputClosed;
    private volatile boolean isOutputClosed;

    // Remembered SO_REUSEADDR value; only used when
    // RdmaNet.useExclusiveBind() is true (see setOption/getOption).
    private boolean isReuseAddress;

    // Channel states. As used in this class:
    //   ST_UNCONNECTED -> ST_CONNECTIONPENDING   (beginConnect)
    //   ST_CONNECTIONPENDING -> ST_CONNECTED     (endConnect/endFinishConnect)
    //   any state below ST_CLOSING -> ST_CLOSING -> ST_KILLPENDING
    //                                            (implCloseSelectableChannel)
    //   ST_KILLPENDING -> ST_KILLED              (kill)
    // The numeric ordering matters: ensureOpenAndConnected compares against
    // ST_CONNECTED with < and >.
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTIONPENDING = 1;
    private static final int ST_CONNECTED = 2;
    private static final int ST_CLOSING = 3;
    private static final int ST_KILLPENDING = 4;
    private static final int ST_KILLED = 5;
    private volatile int state;  // need stateLock to change

    // Native thread IDs (from NativeThread.current()) of the threads
    // currently inside a blocking read/connect/poll or a blocking write;
    // 0 when there is none. Used by close and shutdown to signal them.
    private long readerThread;
    private long writerThread;

    // Local and remote socket addresses; null until bound / connect started.
    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Socket adaptor, created lazily by socket().
    private Socket socket;

    /**
     * Creates an unconnected channel on a new socket obtained from
     * {@code RdmaNet.socket(true)}.
     * NOTE(review): the {@code true} argument presumably requests a stream
     * socket, as in sun.nio.ch.Net -- confirm against RdmaNet.
     *
     * @param sp the provider that created this channel
     * @throws IOException if the socket cannot be created
     */
    protected RdmaSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd = RdmaNet.socket(true);
        this.fdVal = IOUtil.fdVal(fd);
    }

    /**
     * Creates a channel around an existing file descriptor. The state is
     * left at its default ({@code ST_UNCONNECTED}).
     *
     * @param sp    the provider that created this channel
     * @param fd    the socket's file descriptor
     * @param bound if {@code true}, the local address is looked up from
     *              {@code fd} immediately
     * @throws IOException if the local address lookup fails
     */
    protected RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
        throws IOException
    {
        super(sp);
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        if (bound) {
            synchronized (stateLock) {
                this.localAddress = RdmaNet.localAddress(fd);
            }
        }
    }

    /**
     * Creates a channel around a file descriptor that is already connected
     * to {@code isa}; the channel starts in {@code ST_CONNECTED}.
     *
     * @param sp  the provider that created this channel
     * @param fd  the connected socket's file descriptor
     * @param isa the remote address the socket is connected to
     * @throws IOException if the local address lookup fails
     */
    RdmaSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
        throws IOException
    {
        super(sp);
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        synchronized (stateLock) {
            this.localAddress = RdmaNet.localAddress(fd);
            this.remoteAddress = isa;
            this.state = ST_CONNECTED;
        }
    }

    /**
     * Checks that the channel is open.
     *
     * @throws ClosedChannelException if {@code isOpen()} is false
     */
    private void ensureOpen() throws ClosedChannelException {
        if (!isOpen())
            throw new ClosedChannelException();
    }

    /**
     * Checks that the channel is in exactly the connected state, based on a
     * single read of the volatile {@code state} field.
     *
     * @throws NotYetConnectedException if the state is below ST_CONNECTED
     * @throws ClosedChannelException   if the state is above ST_CONNECTED
     *         (closing, kill-pending or killed)
     */
    private void ensureOpenAndConnected() throws ClosedChannelException {
        int state = this.state;
        if (state < ST_CONNECTED) {
            throw new NotYetConnectedException();
        } else if (state > ST_CONNECTED) {
            throw new ClosedChannelException();
        }
    }

    /**
     * Returns the socket adaptor for this channel, creating it through
     * {@code RdmaSocketAdaptor.create} on first use. The same instance is
     * returned on every subsequent call.
     */
    @Override
    public Socket socket() {
        synchronized (stateLock) {
            if (socket == null)
                socket = RdmaSocketAdaptor.create(this);
            return socket;
        }
    }

    /**
     * Returns the local address as transformed by
     * {@code RdmaNet.getRevealedLocalAddress}.
     * NOTE(review): presumably this masks the address when a security
     * manager denies access, as sun.nio.ch.Net does -- confirm.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getLocalAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            return RdmaNet.getRevealedLocalAddress(localAddress);
        }
    }

    /**
     * Returns the remote address recorded by the connect path or the
     * connected-socket constructor; {@code null} if none was recorded.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getRemoteAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            return remoteAddress;
        }
    }

    /**
     * Sets a socket option.
     *
     * @param name  the option; must be non-null and in supportedOptions()
     * @param value the value to set
     * @return this channel
     * @throws NullPointerException          if {@code name} is null
     * @throws UnsupportedOperationException if the option is not supported
     * @throws ClosedChannelException        if the channel is closed
     * @throws IOException                   as declared by
     *         RdmaNet.setSocketOption
     */
    @Override
    public <T> SocketChannel setOption(SocketOption<T> name, T value)
        throws IOException
    {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");

        synchronized (stateLock) {
            ensureOpen();

            // With exclusive bind, SO_REUSEADDR is only remembered in a
            // field and never applied to the socket.
            // NOTE(review): a null value is unboxed here and therefore
            // surfaces as a NullPointerException in this branch.
            if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
                isReuseAddress = (Boolean)value;
                return this;
            }

            RdmaNet.setSocketOption(fd, RdmaNet.UNSPEC, name, value);
            return this;
        }
    }

    /**
     * Returns the value of a socket option.
     *
     * @param name the option; must be non-null and in supportedOptions()
     * @return the option value
     * @throws NullPointerException          if {@code name} is null
     * @throws UnsupportedOperationException if the option is not supported
     * @throws ClosedChannelException        if the channel is closed
     * @throws IOException                   as declared by
     *         RdmaNet.getSocketOption
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getOption(SocketOption<T> name)
        throws IOException
    {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");

        synchronized (stateLock) {
            ensureOpen();

            // Mirror of setOption: with exclusive bind the remembered value
            // is returned instead of querying the socket.
            if (name == StandardSocketOptions.SO_REUSEADDR && RdmaNet.useExclusiveBind()) {
                return (T)Boolean.valueOf(isReuseAddress);
            }

            return (T) RdmaNet.getSocketOption(fd, RdmaNet.UNSPEC, name);
        }
    }

    /**
     * Holds the set of supported options. The set is built once, when this
     * nested class is initialized, and is unmodifiable.
     */
    private static class DefaultOptionsHolder {
        static final Set<SocketOption<?>> defaultOptions = defaultOptions();

        // Five standard options plus whatever RdmaSocketOptions reports.
        private static Set<SocketOption<?>> defaultOptions() {
            HashSet<SocketOption<?>> set = new HashSet<>();
            set.add(StandardSocketOptions.SO_SNDBUF);
            set.add(StandardSocketOptions.SO_RCVBUF);
            set.add(StandardSocketOptions.SO_KEEPALIVE);
            set.add(StandardSocketOptions.SO_REUSEADDR);
            set.add(StandardSocketOptions.TCP_NODELAY);
            RdmaSocketOptions rdmaOptions =
                    RdmaSocketOptions.getInstance();
            set.addAll(rdmaOptions.options());
            return Collections.unmodifiableSet(set);
        }
    }

    /**
     * Returns the unmodifiable set of options accepted by setOption and
     * getOption.
     */
    public Set<SocketOption<?>> supportedOptions() {
        return DefaultOptionsHolder.defaultOptions;
    }

    /**
     * Marks the start of a read (or poll-for-read). Must be paired with
     * {@link #endRead}.
     *
     * <p> In blocking mode this calls {@code begin()}, verifies the channel
     * is connected while holding stateLock, and records the current native
     * thread in {@code readerThread}. In non-blocking mode only the state
     * check is performed.
     *
     * @throws NotYetConnectedException if the channel is not yet connected
     * @throws ClosedChannelException   if the channel is closing or closed
     */
    private void beginRead(boolean blocking) throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();

            synchronized (stateLock) {
                ensureOpenAndConnected();
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        } else {
            ensureOpenAndConnected();
        }
    }
    /**
     * Marks the end of a read started with beginRead (also used by the
     * connect paths, which record their thread in {@code readerThread}).
     * Does nothing in non-blocking mode.
     *
     * @param blocking  the blocking mode that was passed to the begin method
     * @param completed passed through to {@code end(boolean)}
     * @throws AsynchronousCloseException as declared; can only originate
     *         from {@code end(completed)}
     */
    private void endRead(boolean blocking, boolean completed)
        throws AsynchronousCloseException
    {
        if (blocking) {
            synchronized (stateLock) {
                readerThread = 0;
                // notify any thread waiting in implCloseSelectableChannel
                if (state == ST_CLOSING) {
                    stateLock.notifyAll();
                }
            }
            // remove hook for Thread.interrupt
            end(completed);
        }
    }

    /**
     * Reads into {@code buf} while holding readLock.
     *
     * @return the normalized byte count from IOUtil.read, or
     *         {@code IOStatus.EOF} if input has been shut down
     * @throws NullPointerException     if {@code buf} is null
     * @throws NotYetConnectedException if the channel is not yet connected
     * @throws ClosedChannelException   if the channel is closing or closed
     */
    @Override
    public int read(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginRead(blocking);

                // check if input is shutdown
                if (isInputClosed)
                    return IOStatus.EOF;

                if (blocking) {
                    // retry while the read reports INTERRUPTED and the
                    // channel is still open
                    do {
                        n = IOUtil.read(fd, buf, -1, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.read(fd, buf, -1, nd);
                }
            } finally {
                endRead(blocking, n > 0);
                // NOTE(review): returning from finally discards any
                // exception in flight from the try block when input was
                // shut down and nothing was read.
                if (n <= 0 && isInputClosed)
                    return IOStatus.EOF;
            }
            return IOStatus.normalize(n);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Scattering read into {@code dsts[offset .. offset+length)} while
     * holding readLock. Same structure as {@link #read(ByteBuffer)}.
     *
     * @return the normalized byte count, or {@code IOStatus.EOF} if input
     *         has been shut down
     * @throws IndexOutOfBoundsException if offset/length are out of bounds
     *         for {@code dsts}
     * @throws NotYetConnectedException  if the channel is not yet connected
     * @throws ClosedChannelException    if the channel is closing or closed
     */
    @Override
    public long read(ByteBuffer[] dsts, int offset, int length)
        throws IOException
    {
        Objects.checkFromIndexSize(offset, length, dsts.length);

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            long n = 0;
            try {
                beginRead(blocking);

                // check if input is shutdown
                if (isInputClosed)
                    return IOStatus.EOF;

                if (blocking) {
                    do {
                        n = IOUtil.read(fd, dsts, offset, length, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                }
            } finally {
                endRead(blocking, n > 0);
                // see the single-buffer read: this return overrides any
                // pending exception
                if (n <= 0 && isInputClosed)
                    return IOStatus.EOF;
            }
            return IOStatus.normalize(n);
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Marks the start of a write. Must be paired with {@link #endWrite}.
     *
     * <p> In blocking mode this calls {@code begin()}, then under stateLock
     * verifies the channel is connected and that output has not been shut
     * down, and records the current native thread in {@code writerThread}.
     * In non-blocking mode only the connected check is performed (the
     * output-shutdown flag is not consulted here).
     *
     * @throws NotYetConnectedException if the channel is not yet connected
     * @throws ClosedChannelException   if the channel is closing or closed,
     *         or (blocking mode) output has been shut down
     */
    private void beginWrite(boolean blocking) throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();

            synchronized (stateLock) {
                ensureOpenAndConnected();
                if (isOutputClosed)
                    throw new ClosedChannelException();
                // record thread so it can be signalled if needed
                writerThread = NativeThread.current();
            }
        } else {
            ensureOpenAndConnected();
        }
    }

    /**
     * Marks the end of a write started with beginWrite. Does nothing in
     * non-blocking mode.
     *
     * @param blocking  the blocking mode that was passed to beginWrite
     * @param completed passed through to {@code end(boolean)}
     * @throws AsynchronousCloseException as declared; can only originate
     *         from {@code end(completed)}
     */
    private void endWrite(boolean blocking, boolean completed)
        throws AsynchronousCloseException
    {
        if (blocking) {
            synchronized (stateLock) {
                writerThread = 0;
                // notify any thread waiting in implCloseSelectableChannel
                if (state == ST_CLOSING) {
                    stateLock.notifyAll();
                }
            }
            // remove hook for Thread.interrupt
            end(completed);
        }
    }

    /**
     * Writes from {@code buf} while holding writeLock.
     *
     * @return the normalized byte count from IOUtil.write
     * @throws NullPointerException       if {@code buf} is null
     * @throws NotYetConnectedException   if the channel is not yet connected
     * @throws ClosedChannelException     if the channel is closing or closed
     * @throws AsynchronousCloseException if nothing was written and output
     *         has been shut down by the time the write ends
     */
    @Override
    public int write(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);

        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    // retry while the write reports INTERRUPTED and the
                    // channel is still open
                    do {
                        n = IOUtil.write(fd, buf, -1, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.write(fd, buf, -1, nd);
                }
            } finally {
                endWrite(blocking, n > 0);
                // NOTE(review): throwing from finally replaces any
                // exception in flight from the try block.
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Gathering write from {@code srcs[offset .. offset+length)} while
     * holding writeLock. Same structure as {@link #write(ByteBuffer)}.
     *
     * @return the normalized byte count from IOUtil.write
     * @throws IndexOutOfBoundsException  if offset/length are out of bounds
     *         for {@code srcs}
     * @throws NotYetConnectedException   if the channel is not yet connected
     * @throws ClosedChannelException     if the channel is closing or closed
     * @throws AsynchronousCloseException if nothing was written and output
     *         has been shut down by the time the write ends
     */
    @Override
    public long write(ByteBuffer[] srcs, int offset, int length)
        throws IOException
    {
        Objects.checkFromIndexSize(offset, length, srcs.length);

        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            long n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    do {
                        n = IOUtil.write(fd, srcs, offset, length, nd);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = IOUtil.write(fd, srcs, offset, length, nd);
                }
            } finally {
                endWrite(blocking, n > 0);
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Sends one byte through the native {@code sendOutOfBandData} method,
     * using the same begin/end protocol and writeLock as an ordinary write.
     *
     * @param b the byte to send
     * @return the normalized result of the native call
     * @throws AsynchronousCloseException if nothing was sent and output has
     *         been shut down by the time the call ends
     */
    int sendOutOfBandData(byte b) throws IOException {
        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginWrite(blocking);
                if (blocking) {
                    do {
                        n = sendOutOfBandData(fd, b);
                    } while (n == IOStatus.INTERRUPTED && isOpen());
                } else {
                    n = sendOutOfBandData(fd, b);
                }
            } finally {
                endWrite(blocking, n > 0);
                if (n <= 0 && isOutputClosed)
                    throw new AsynchronousCloseException();
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Applies the blocking mode to the socket via
     * {@code RdmaNet.configureBlocking}. Both I/O locks are held so that no
     * read or write is in progress while the mode changes.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    protected void implConfigureBlocking(boolean block) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    RdmaNet.configureBlocking(fd, block);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Returns the recorded local address (possibly null) without the
     * open-check or transformation done by getLocalAddress().
     */
    InetSocketAddress localAddress() {
        synchronized (stateLock) {
            return localAddress;
        }
    }

    /**
     * Returns the recorded remote address (possibly null) without the
     * open-check done by getRemoteAddress().
     */
    InetSocketAddress remoteAddress() {
        synchronized (stateLock) {
            return remoteAddress;
        }
    }

    /**
     * Binds the socket to a local address.
     *
     * @param local the address to bind to; {@code null} binds to
     *              {@code new InetSocketAddress(0)} (wildcard address,
     *              system-chosen port)
     * @return this channel
     * @throws ClosedChannelException     if the channel is closed
     * @throws ConnectionPendingException if a connect is in progress
     * @throws AlreadyBoundException      if a local address is already set
     * @throws SecurityException          if a security manager is installed
     *         and its checkListen rejects the port
     */
    @Override
    public SocketChannel bind(SocketAddress local) throws IOException {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                synchronized (stateLock) {
                    ensureOpen();
                    if (state == ST_CONNECTIONPENDING)
                        throw new ConnectionPendingException();
                    if (localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress isa = (local == null) ?
                        new InetSocketAddress(0) : RdmaNet.checkAddress(local);
                    SecurityManager sm = System.getSecurityManager();
                    if (sm != null) {
                        sm.checkListen(isa.getPort());
                    }
                    RdmaNet.bind(fd, isa.getAddress(), isa.getPort());
                    // re-read the address from the socket rather than
                    // trusting the requested one
                    localAddress = RdmaNet.localAddress(fd);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
        return this;
    }

    /**
     * Returns whether the state is exactly {@code ST_CONNECTED}; reads the
     * volatile state field without taking stateLock.
     */
    @Override
    public boolean isConnected() {
        return (state == ST_CONNECTED);
    }

    /**
     * Returns whether the state is exactly {@code ST_CONNECTIONPENDING};
     * reads the volatile state field without taking stateLock.
     */
    @Override
    public boolean isConnectionPending() {
        return (state == ST_CONNECTIONPENDING);
    }

    /**
     * Marks the start of a connect: moves the state from ST_UNCONNECTED to
     * ST_CONNECTIONPENDING and records the remote address. In blocking mode
     * it also calls {@code begin()} and records the current thread in
     * {@code readerThread} so close can signal it.
     *
     * @throws ClosedChannelException     if the channel is closed
     * @throws AlreadyConnectedException  if already connected
     * @throws ConnectionPendingException if a connect is already pending
     */
    private void beginConnect(boolean blocking, InetSocketAddress isa)
        throws IOException
    {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();
        }
        synchronized (stateLock) {
            ensureOpen();
            int state = this.state;
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_CONNECTIONPENDING)
                throw new ConnectionPendingException();
            assert state == ST_UNCONNECTED;
            this.state = ST_CONNECTIONPENDING;

            remoteAddress = isa;

            if (blocking) {
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        }
    }

    /**
     * Marks the end of a connect attempt. Reuses endRead to clear
     * {@code readerThread} and call {@code end(completed)}; then, if the
     * connection was established and the state is still pending, records
     * the local address and moves to ST_CONNECTED.
     *
     * @param completed {@code true} if the connection was established
     */
    private void endConnect(boolean blocking, boolean completed)
        throws IOException
    {
        endRead(blocking, completed);

        if (completed) {
            synchronized (stateLock) {
                // the state may have moved on (e.g. to ST_CLOSING) meanwhile
                if (state == ST_CONNECTIONPENDING) {
                    localAddress = RdmaNet.localAddress(fd);
                    state = ST_CONNECTED;
                }
            }
        }
    }

    /**
     * Connects this channel's socket to {@code sa}.
     *
     * <p> If the target is the wildcard ("any local") address, the
     * connection is made to {@code InetAddress.getLocalHost()} instead. Any
     * IOException raised while connecting causes the channel to be closed
     * before the exception is rethrown.
     *
     * @return {@code true} if the connection was established by this call
     *         (RdmaNet.connect returned a positive value), otherwise
     *         {@code false}
     * @throws AlreadyConnectedException  if already connected
     * @throws ConnectionPendingException if a connect is already pending
     * @throws ClosedChannelException     if the channel is closed
     * @throws SecurityException          if a security manager is installed
     *         and its checkConnect rejects the target
     */
    @Override
    public boolean connect(SocketAddress sa) throws IOException {
        InetSocketAddress isa = RdmaNet.checkAddress(sa);
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        InetAddress ia = isa.getAddress();
        if (ia.isAnyLocalAddress())
            ia = InetAddress.getLocalHost();

        try {
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    int n = 0;
                    boolean blocking = isBlocking();
                    try {
                        beginConnect(blocking, isa);
                        // retried on INTERRUPTED in both blocking and
                        // non-blocking mode
                        do {
                            n = RdmaNet.connect(fd, ia, isa.getPort());
                        } while (n == IOStatus.INTERRUPTED && isOpen());
                    } finally {
                        endConnect(blocking, (n > 0));
                    }
                    assert IOStatus.check(n);
                    return n > 0;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }

    /**
     * Marks the start of a finishConnect (or poll-for-connect). Requires the
     * state to be ST_CONNECTIONPENDING. In blocking mode it also calls
     * {@code begin()} and records the current thread in
     * {@code readerThread}.
     *
     * @throws ClosedChannelException       if the channel is closed
     * @throws NoConnectionPendingException if no connect is pending
     */
    private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();
        }
        synchronized (stateLock) {
            ensureOpen();
            if (state != ST_CONNECTIONPENDING)
                throw new NoConnectionPendingException();
            if (blocking) {
                // record thread so it can be signalled if needed
                readerThread = NativeThread.current();
            }
        }
    }

    /**
     * Marks the end of a finishConnect (or poll-for-connect). The body is
     * the same as {@link #endConnect}: clear the recorded thread via
     * endRead, then on success move from ST_CONNECTIONPENDING to
     * ST_CONNECTED and record the local address.
     *
     * @param completed {@code true} if the connection was established
     */
    private void endFinishConnect(boolean blocking, boolean completed)
        throws IOException
    {
        endRead(blocking, completed);

        if (completed) {
            synchronized (stateLock) {
                if (state == ST_CONNECTIONPENDING) {
                    localAddress = RdmaNet.localAddress(fd);
                    state = ST_CONNECTED;
                }
            }
        }
    }

    /**
     * Completes a pending connect.
     *
     * <p> Returns {@code true} immediately if already connected. In blocking
     * mode the native {@code checkConnect} is retried while it returns 0 or
     * INTERRUPTED and the channel is open; in non-blocking mode it is called
     * once. Any IOException causes the channel to be closed before the
     * exception is rethrown.
     *
     * @return {@code true} if the channel is now connected
     * @throws NoConnectionPendingException if no connect is pending
     * @throws ClosedChannelException       if the channel is closed
     */
    @Override
    public boolean finishConnect() throws IOException {
        try {
            readLock.lock();
            try {
                writeLock.lock();
                try {
                    // no-op if already connected
                    if (isConnected())
                        return true;

                    boolean blocking = isBlocking();
                    boolean connected = false;
                    try {
                        beginFinishConnect(blocking);
                        int n = 0;
                        if (blocking) {
                            do {
                                n = checkConnect(fd, true);
                            } while ((n == 0 || n == IOStatus.INTERRUPTED) && isOpen());
                        } else {
                            n = checkConnect(fd, false);
                        }
                        connected = (n > 0);
                    } finally {
                        endFinishConnect(blocking, connected);
                    }
                    // in blocking mode, reaching here implies connected
                    assert (blocking && connected) ^ !blocking;
                    return connected;
                } finally {
                    writeLock.unlock();
                }
            } finally {
                readLock.unlock();
            }
        } catch (IOException ioe) {
            // connect failed, close the channel
            close();
            throw ioe;
        }
    }

    /**
     * Closes the channel's socket; expected to run after the channel has
     * been marked closed (see the leading assert).
     *
     * <p> Steps, in order:
     * <ol>
     * <li>set the state to ST_CLOSING;</li>
     * <li>blocking mode: if a reader or writer thread is recorded, pre-close
     *     the fd, signal those threads, and wait on stateLock until both
     *     fields are cleared; non-blocking mode: acquire and release
     *     readLock and writeLock so any in-progress I/O has finished;</li>
     * <li>if the socket is still considered connected and the channel is
     *     registered, shut down output (errors ignored), then set the state
     *     to ST_KILLPENDING;</li>
     * <li>if the channel is not registered, call kill() to close the fd;
     *     otherwise the fd is left for a later kill() call;</li>
     * <li>re-assert the interrupt status if the wait was interrupted.</li>
     * </ol>
     */
    @Override
    protected void implCloseSelectableChannel() throws IOException {
        assert !isOpen();

        boolean blocking;
        boolean connected;
        boolean interrupted = false;

        // set state to ST_CLOSING
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            blocking = isBlocking();
            connected = (state == ST_CONNECTED);
            state = ST_CLOSING;
        }

        // wait for any outstanding I/O operations to complete
        if (blocking) {
            synchronized (stateLock) {
                assert state == ST_CLOSING;
                long reader = readerThread;
                long writer = writerThread;
                if (reader != 0 || writer != 0) {
                    nd.preClose(fd);
                    connected = false; // fd is no longer connected socket

                    if (reader != 0)
                        NativeThread.signal(reader);
                    if (writer != 0)
                        NativeThread.signal(writer);

                    // wait for blocking I/O operations to end
                    // (endRead/endWrite call notifyAll in ST_CLOSING)
                    while (readerThread != 0 || writerThread != 0) {
                        try {
                            stateLock.wait();
                        } catch (InterruptedException e) {
                            // keep waiting; status is restored at the end
                            interrupted = true;
                        }
                    }
                }
            }
        } else {
            // non-blocking mode: wait for read/write to complete
            readLock.lock();
            try {
                writeLock.lock();
                writeLock.unlock();
            } finally {
                readLock.unlock();
            }
        }

        // set state to ST_KILLPENDING
        synchronized (stateLock) {
            assert state == ST_CLOSING;
            // if connected, and the channel is registered with a Selector, we
            // shutdown the output so that the peer reads EOF
            if (connected && isRegistered()) {
                try {
                    RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
                } catch (IOException ignore) { }
            }
            state = ST_KILLPENDING;
        }

        // close socket if not registered with Selector
        if (!isRegistered())
            kill();

        // restore interrupt status
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /**
     * Closes the file descriptor if, and only if, the state is
     * ST_KILLPENDING, moving it to ST_KILLED. Calls in any other state are
     * no-ops, so the fd is closed at most once.
     */
    @Override
    public void kill() throws IOException {
        synchronized (stateLock) {
            if (state == ST_KILLPENDING) {
                state = ST_KILLED;
                nd.close(fd);
            }
        }
    }

    /**
     * Shuts down the socket for reading. The first successful call shuts
     * down the read side of the fd, signals the recorded reader thread (if
     * any), and sets the input-closed flag; later calls do nothing.
     *
     * @return this channel
     * @throws ClosedChannelException   if the channel is closed
     * @throws NotYetConnectedException if the channel is not connected
     */
    @Override
    public SocketChannel shutdownInput() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (!isInputClosed) {
                RdmaNet.shutdown(fd, RdmaNet.SHUT_RD);
                long thread = readerThread;
                if (thread != 0)
                    NativeThread.signal(thread);
                isInputClosed = true;
            }
            return this;
        }
    }

    /**
     * Shuts down the socket for writing. The first successful call shuts
     * down the write side of the fd, signals the recorded writer thread (if
     * any), and sets the output-closed flag; later calls do nothing.
     *
     * @return this channel
     * @throws ClosedChannelException   if the channel is closed
     * @throws NotYetConnectedException if the channel is not connected
     */
    @Override
    public SocketChannel shutdownOutput() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            if (!isConnected())
                throw new NotYetConnectedException();
            if (!isOutputClosed) {
                RdmaNet.shutdown(fd, RdmaNet.SHUT_WR);
                long thread = writerThread;
                if (thread != 0)
                    NativeThread.signal(thread);
                isOutputClosed = true;
            }
            return this;
        }
    }

    /** Returns {@code true} unless shutdownInput has taken effect. */
    boolean isInputOpen() {
        return !isInputClosed;
    }

    /** Returns {@code true} unless shutdownOutput has taken effect. */
    boolean isOutputOpen() {
        return !isOutputClosed;
    }

    /**
     * Poll this channel's socket for reading up to the given timeout.
     * The caller is expected to hold blockingLock() with the channel in
     * blocking mode (asserted).
     * @return {@code true} if the socket is polled
     */
    boolean pollRead(long timeout) throws IOException {
        boolean blocking = isBlocking();
        assert Thread.holdsLock(blockingLock()) && blocking;

        readLock.lock();
        try {
            boolean polled = false;
            try {
                beginRead(blocking);
                int events = RdmaNet.poll(fd, RdmaNet.POLLIN, timeout);
                polled = (events != 0);
            } finally {
                endRead(blocking, polled);
            }
            return polled;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Poll this channel's socket for a connection, up to the given timeout.
     * The caller is expected to hold blockingLock() with the channel in
     * blocking mode (asserted).
     * @return {@code true} if the socket is polled
     */
    boolean pollConnected(long timeout) throws IOException {
        boolean blocking = isBlocking();
        assert Thread.holdsLock(blockingLock()) && blocking;

        readLock.lock();
        try {
            writeLock.lock();
            try {
                boolean polled = false;
                try {
                    beginFinishConnect(blocking);
                    int events = RdmaNet.poll(fd, RdmaNet.POLLCONN, timeout);
                    polled = (events != 0);
                } finally {
                    // invoke endFinishConnect with completed = false so that
                    // the state is not changed to ST_CONNECTED. The socket
                    // adaptor will use finishConnect to finish.
                    endFinishConnect(blocking, /*completed*/false);
                }
                return polled;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Translates native poll events into the selection key's ready set.
     *
     * <ul>
     * <li>POLLNVAL: returns {@code false} and leaves the key untouched.</li>
     * <li>POLLERR or POLLHUP: the ready set becomes the key's whole interest
     *     set.</li>
     * <li>Otherwise OP_READ / OP_WRITE are added when the matching event is
     *     set, the key is interested, and the channel is connected;
     *     OP_CONNECT is added when POLLCONN is set, the key is interested,
     *     and a connection is pending.</li>
     * </ul>
     * NOTE(review): the event masks here come from sun.nio.ch.Net, whereas
     * pollRead/pollConnected use the RdmaNet constants -- confirm the two
     * sets of values agree.
     *
     * @param ops        the native poll events
     * @param initialOps the ready set to start from
     * @param ski        the key to update
     * @return {@code true} if the new ready set contains an operation that
     *         was not in the key's previous ready set
     */
    public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
        int intOps = ski.nioInterestOps();
        int oldOps = ski.nioReadyOps();
        int newOps = initialOps;

        if ((ops & Net.POLLNVAL) != 0) {
            return false;
        }

        if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
            newOps = intOps;
            ski.nioReadyOps(newOps);
            return (newOps & ~oldOps) != 0;
        }

        boolean connected = isConnected();
        if (((ops & Net.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_READ) != 0) && connected)
            newOps |= SelectionKey.OP_READ;

        if (((ops & Net.POLLCONN) != 0) &&
            ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
            newOps |= SelectionKey.OP_CONNECT;

        if (((ops & Net.POLLOUT) != 0) &&
            ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
            newOps |= SelectionKey.OP_WRITE;

        ski.nioReadyOps(newOps);
        return (newOps & ~oldOps) != 0;
    }

    /**
     * Translates {@code ops} starting from the key's current ready set, so
     * previously ready operations are kept.
     */
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
        return translateReadyOps(ops, ski.nioReadyOps(), ski);
    }

    /**
     * Translates {@code ops} starting from an empty ready set, so the key's
     * ready set is replaced.
     */
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
        return translateReadyOps(ops, 0, ski);
    }

    /**
     * Maps selection-key interest operations to native poll events:
     * OP_READ to POLLIN, OP_WRITE to POLLOUT, OP_CONNECT to POLLCONN.
     * Other bits in {@code ops} are ignored.
     */
    public int translateInterestOps(int ops) {
        int newOps = 0;
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= Net.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= Net.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= Net.POLLCONN;
        return newOps;
    }

    /** Returns the socket's file descriptor object. */
    public FileDescriptor getFD() {
        return fd;
    }

    /** Returns the int value of the socket's file descriptor. */
    public int getFDVal() {
        return fdVal;
    }

    /**
     * Returns a description of the form
     * {@code <superclass-name>[<state> local=<addr> remote=<addr>]}, or
     * {@code <superclass-name>[closed]} when the channel is not open. Note
     * that the name printed is that of the superclass of the runtime class,
     * not of this class itself.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClass().getSuperclass().getName());
        sb.append('[');
        if (!isOpen())
            sb.append("closed");
        else {
            synchronized (stateLock) {
                // states above ST_CONNECTED print no state word
                switch (state) {
                case ST_UNCONNECTED:
                    sb.append("unconnected");
                    break;
                case ST_CONNECTIONPENDING:
                    sb.append("connection-pending");
                    break;
                case ST_CONNECTED:
                    sb.append("connected");
                    if (isInputClosed)
                        sb.append(" ishut");
                    if (isOutputClosed)
                        sb.append(" oshut");
                    break;
                }
                InetSocketAddress addr = localAddress();
                if (addr != null) {
                    sb.append(" local=");
                    sb.append(RdmaNet.getRevealedLocalAddressAsString(addr));
                }
                if (remoteAddress() != null) {
                    sb.append(" remote=");
                    sb.append(remoteAddress().toString());
                }
            }
        }
        sb.append(']');
        return sb.toString();
    }

    // -- Native methods --

    // One-time native initialization, invoked from the static initializer.
    private static native void initIDs();

    // Checks the progress of a pending connect on fd. As used by
    // finishConnect: a positive result means connected; 0 or
    // IOStatus.INTERRUPTED in blocking mode means "try again".
    private static native int checkConnect(FileDescriptor fd, boolean block)
        throws IOException;

    // Sends a single byte of out-of-band data on fd; the result is treated
    // like a write count by sendOutOfBandData(byte).
    private static native int sendOutOfBandData(FileDescriptor fd, byte data)
        throws IOException;

    // Load the native library, run native initialization, then create the
    // shared dispatcher -- in that order.
    static {
        IOUtil.load();
        initIDs();
        nd = new RdmaSocketDispatcher();
    }

}