1 /* 2 * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.io.UncheckedIOException; 31 import java.lang.ref.Cleaner.Cleanable; 32 import java.net.DatagramSocket; 33 import java.net.Inet4Address; 34 import java.net.Inet6Address; 35 import java.net.InetAddress; 36 import java.net.InetSocketAddress; 37 import java.net.NetworkInterface; 38 import java.net.PortUnreachableException; 39 import java.net.ProtocolFamily; 40 import java.net.SocketAddress; 41 import java.net.SocketOption; 42 import java.net.StandardProtocolFamily; 43 import java.net.StandardSocketOptions; 44 import java.nio.ByteBuffer; 45 import java.nio.channels.AlreadyBoundException; 46 import java.nio.channels.AlreadyConnectedException; 47 import java.nio.channels.AsynchronousCloseException; 48 import java.nio.channels.ClosedChannelException; 49 import java.nio.channels.DatagramChannel; 50 import java.nio.channels.MembershipKey; 51 import java.nio.channels.NotYetConnectedException; 52 import java.nio.channels.SelectionKey; 53 import java.nio.channels.spi.SelectorProvider; 54 import java.util.Collections; 55 import java.util.HashSet; 56 import java.util.Objects; 57 import java.util.Set; 58 import java.util.concurrent.locks.ReentrantLock; 59 60 import jdk.internal.ref.CleanerFactory; 61 import sun.net.ResourceManager; 62 import sun.net.ext.ExtendedSocketOptions; 63 import sun.net.util.IPAddressUtil; 64 65 /** 66 * An implementation of DatagramChannels. 
*/

class DatagramChannelImpl
    extends DatagramChannel
    implements SelChImpl
{
    // Used to make native read and write calls
    private static final NativeDispatcher nd = new DatagramDispatcher();

    // The protocol family of the socket
    private final ProtocolFamily family;

    // Our file descriptor
    private final FileDescriptor fd;
    private final int fdVal;
    // Registered in the constructors; invoked by tryClose to close the socket
    private final Cleanable cleaner;

    // Cached InetAddress and port for unconnected DatagramChannels
    // used by receive0
    private InetAddress cachedSenderInetAddress;
    private int cachedSenderPort;

    // Lock held by current reading or connecting thread
    private final ReentrantLock readLock = new ReentrantLock();

    // Lock held by current writing or connecting thread
    private final ReentrantLock writeLock = new ReentrantLock();

    // Lock held by any thread that modifies the state fields declared below
    // DO NOT invoke a blocking I/O operation while holding this lock!
    private final Object stateLock = new Object();

    // -- The following fields are protected by stateLock

    // State (does not necessarily increase monotonically)
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTED = 1;
    private static final int ST_CLOSING = 2;
    private static final int ST_CLOSED = 3;
    private int state;

    // IDs of native threads doing reads and writes, for signalling
    private long readerThread;
    private long writerThread;

    // Binding and remote address (when connected)
    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Our socket adaptor, if any
    private DatagramSocket socket;

    // Multicast support
    private MembershipRegistry registry;

    // set true when socket is bound and SO_REUSEADDR is emulated
    private boolean reuseAddressEmulated;

    // set true/false when socket is already bound and SO_REUSEADDR is emulated
private boolean isReuseAddress; 127 128 // -- End of fields protected by stateLock 129 130 public DatagramChannelImpl(SelectorProvider sp) 131 throws IOException 132 { 133 super(sp); 134 ResourceManager.beforeUdpCreate(); 135 try { 136 this.family = Net.isIPv6Available() 137 ? StandardProtocolFamily.INET6 138 : StandardProtocolFamily.INET; 139 this.fd = Net.socket(family, false); 140 this.fdVal = IOUtil.fdVal(fd); 141 } catch (IOException ioe) { 142 ResourceManager.afterUdpClose(); 143 throw ioe; 144 } 145 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); 146 } 147 148 public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family) 149 throws IOException 150 { 151 super(sp); 152 Objects.requireNonNull(family, "'family' is null"); 153 if ((family != StandardProtocolFamily.INET) && 154 (family != StandardProtocolFamily.INET6)) { 155 throw new UnsupportedOperationException("Protocol family not supported"); 156 } 157 if (family == StandardProtocolFamily.INET6) { 158 if (!Net.isIPv6Available()) { 159 throw new UnsupportedOperationException("IPv6 not available"); 160 } 161 } 162 163 ResourceManager.beforeUdpCreate(); 164 try { 165 this.family = family; 166 this.fd = Net.socket(family, false); 167 this.fdVal = IOUtil.fdVal(fd); 168 } catch (IOException ioe) { 169 ResourceManager.afterUdpClose(); 170 throw ioe; 171 } 172 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); 173 } 174 175 public DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd) 176 throws IOException 177 { 178 super(sp); 179 180 // increment UDP count to match decrement when closing 181 ResourceManager.beforeUdpCreate(); 182 183 this.family = Net.isIPv6Available() 184 ? 
StandardProtocolFamily.INET6 185 : StandardProtocolFamily.INET; 186 this.fd = fd; 187 this.fdVal = IOUtil.fdVal(fd); 188 this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd)); 189 synchronized (stateLock) { 190 this.localAddress = Net.localAddress(fd); 191 } 192 } 193 194 // @throws ClosedChannelException if channel is closed 195 private void ensureOpen() throws ClosedChannelException { 196 if (!isOpen()) 197 throw new ClosedChannelException(); 198 } 199 200 @Override 201 public DatagramSocket socket() { 202 synchronized (stateLock) { 203 if (socket == null) 204 socket = DatagramSocketAdaptor.create(this); 205 return socket; 206 } 207 } 208 209 @Override 210 public SocketAddress getLocalAddress() throws IOException { 211 synchronized (stateLock) { 212 ensureOpen(); 213 // Perform security check before returning address 214 return Net.getRevealedLocalAddress(localAddress); 215 } 216 } 217 218 @Override 219 public SocketAddress getRemoteAddress() throws IOException { 220 synchronized (stateLock) { 221 ensureOpen(); 222 return remoteAddress; 223 } 224 } 225 226 @Override 227 public <T> DatagramChannel setOption(SocketOption<T> name, T value) 228 throws IOException 229 { 230 Objects.requireNonNull(name); 231 if (!supportedOptions().contains(name)) 232 throw new UnsupportedOperationException("'" + name + "' not supported"); 233 if (!name.type().isInstance(value)) 234 throw new IllegalArgumentException("Invalid value '" + value + "'"); 235 236 synchronized (stateLock) { 237 ensureOpen(); 238 239 if (name == StandardSocketOptions.IP_TOS || 240 name == StandardSocketOptions.IP_MULTICAST_TTL || 241 name == StandardSocketOptions.IP_MULTICAST_LOOP) 242 { 243 // options are protocol dependent 244 Net.setSocketOption(fd, family, name, value); 245 return this; 246 } 247 248 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 249 NetworkInterface interf = (NetworkInterface)value; 250 if (family == StandardProtocolFamily.INET6) { 251 int index = interf.getIndex(); 
252 if (index == -1) 253 throw new IOException("Network interface cannot be identified"); 254 Net.setInterface6(fd, index); 255 } else { 256 // need IPv4 address to identify interface 257 Inet4Address target = Net.anyInet4Address(interf); 258 if (target == null) 259 throw new IOException("Network interface not configured for IPv4"); 260 int targetAddress = Net.inet4AsInt(target); 261 Net.setInterface4(fd, targetAddress); 262 } 263 return this; 264 } 265 if (name == StandardSocketOptions.SO_REUSEADDR 266 && Net.useExclusiveBind() && localAddress != null) { 267 reuseAddressEmulated = true; 268 this.isReuseAddress = (Boolean)value; 269 } 270 271 // remaining options don't need any special handling 272 Net.setSocketOption(fd, Net.UNSPEC, name, value); 273 return this; 274 } 275 } 276 277 @Override 278 @SuppressWarnings("unchecked") 279 public <T> T getOption(SocketOption<T> name) 280 throws IOException 281 { 282 Objects.requireNonNull(name); 283 if (!supportedOptions().contains(name)) 284 throw new UnsupportedOperationException("'" + name + "' not supported"); 285 286 synchronized (stateLock) { 287 ensureOpen(); 288 289 if (name == StandardSocketOptions.IP_TOS || 290 name == StandardSocketOptions.IP_MULTICAST_TTL || 291 name == StandardSocketOptions.IP_MULTICAST_LOOP) 292 { 293 return (T) Net.getSocketOption(fd, family, name); 294 } 295 296 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 297 if (family == StandardProtocolFamily.INET) { 298 int address = Net.getInterface4(fd); 299 if (address == 0) 300 return null; // default interface 301 302 InetAddress ia = Net.inet4FromInt(address); 303 NetworkInterface ni = NetworkInterface.getByInetAddress(ia); 304 if (ni == null) 305 throw new IOException("Unable to map address to interface"); 306 return (T) ni; 307 } else { 308 int index = Net.getInterface6(fd); 309 if (index == 0) 310 return null; // default interface 311 312 NetworkInterface ni = NetworkInterface.getByIndex(index); 313 if (ni == null) 314 throw new 
IOException("Unable to map index to interface"); 315 return (T) ni; 316 } 317 } 318 319 if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) { 320 return (T)Boolean.valueOf(isReuseAddress); 321 } 322 323 // no special handling 324 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); 325 } 326 } 327 328 private static class DefaultOptionsHolder { 329 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 330 331 private static Set<SocketOption<?>> defaultOptions() { 332 HashSet<SocketOption<?>> set = new HashSet<>(); 333 set.add(StandardSocketOptions.SO_SNDBUF); 334 set.add(StandardSocketOptions.SO_RCVBUF); 335 set.add(StandardSocketOptions.SO_REUSEADDR); 336 if (Net.isReusePortAvailable()) { 337 set.add(StandardSocketOptions.SO_REUSEPORT); 338 } 339 set.add(StandardSocketOptions.SO_BROADCAST); 340 set.add(StandardSocketOptions.IP_TOS); 341 set.add(StandardSocketOptions.IP_MULTICAST_IF); 342 set.add(StandardSocketOptions.IP_MULTICAST_TTL); 343 set.add(StandardSocketOptions.IP_MULTICAST_LOOP); 344 set.addAll(ExtendedSocketOptions.datagramSocketOptions()); 345 return Collections.unmodifiableSet(set); 346 } 347 } 348 349 @Override 350 public final Set<SocketOption<?>> supportedOptions() { 351 return DefaultOptionsHolder.defaultOptions; 352 } 353 354 /** 355 * Marks the beginning of a read operation that might block. 
356 * 357 * @param blocking true if configured blocking 358 * @param mustBeConnected true if the socket must be connected 359 * @return remote address if connected 360 * @throws ClosedChannelException if the channel is closed 361 * @throws NotYetConnectedException if mustBeConnected and not connected 362 * @throws IOException if socket not bound and cannot be bound 363 */ 364 private SocketAddress beginRead(boolean blocking, boolean mustBeConnected) 365 throws IOException 366 { 367 if (blocking) { 368 // set hook for Thread.interrupt 369 begin(); 370 } 371 SocketAddress remote; 372 synchronized (stateLock) { 373 ensureOpen(); 374 remote = remoteAddress; 375 if ((remote == null) && mustBeConnected) 376 throw new NotYetConnectedException(); 377 if (localAddress == null) 378 bindInternal(null); 379 if (blocking) 380 readerThread = NativeThread.current(); 381 } 382 return remote; 383 } 384 385 /** 386 * Marks the end of a read operation that may have blocked. 387 * 388 * @throws AsynchronousCloseException if the channel was closed asynchronously 389 */ 390 private void endRead(boolean blocking, boolean completed) 391 throws AsynchronousCloseException 392 { 393 if (blocking) { 394 synchronized (stateLock) { 395 readerThread = 0; 396 if (state == ST_CLOSING) { 397 tryFinishClose(); 398 } 399 } 400 // remove hook for Thread.interrupt 401 end(completed); 402 } 403 } 404 405 private SocketAddress sender; // Set by receive0 (## ugh) 406 407 @Override 408 public SocketAddress receive(ByteBuffer dst) throws IOException { 409 if (dst.isReadOnly()) 410 throw new IllegalArgumentException("Read-only buffer"); 411 412 readLock.lock(); 413 try { 414 boolean blocking = isBlocking(); 415 int n = 0; 416 ByteBuffer bb = null; 417 try { 418 SocketAddress remote = beginRead(blocking, false); 419 boolean connected = (remote != null); 420 SecurityManager sm = System.getSecurityManager(); 421 if (connected || (sm == null)) { 422 // connected or no security manager 423 n = receive(fd, dst, 
connected); 424 if (blocking) { 425 while (IOStatus.okayToRetry(n) && isOpen()) { 426 park(Net.POLLIN); 427 n = receive(fd, dst, connected); 428 } 429 } else if (n == IOStatus.UNAVAILABLE) { 430 return null; 431 } 432 } else { 433 // Cannot receive into user's buffer when running with a 434 // security manager and not connected 435 bb = Util.getTemporaryDirectBuffer(dst.remaining()); 436 for (;;) { 437 n = receive(fd, bb, connected); 438 if (blocking) { 439 while (IOStatus.okayToRetry(n) && isOpen()) { 440 park(Net.POLLIN); 441 n = receive(fd, bb, connected); 442 } 443 } else if (n == IOStatus.UNAVAILABLE) { 444 return null; 445 } 446 InetSocketAddress isa = (InetSocketAddress)sender; 447 try { 448 sm.checkAccept(isa.getAddress().getHostAddress(), 449 isa.getPort()); 450 } catch (SecurityException se) { 451 // Ignore packet 452 bb.clear(); 453 n = 0; 454 continue; 455 } 456 bb.flip(); 457 dst.put(bb); 458 break; 459 } 460 } 461 assert sender != null; 462 return sender; 463 } finally { 464 if (bb != null) 465 Util.releaseTemporaryDirectBuffer(bb); 466 endRead(blocking, n > 0); 467 assert IOStatus.check(n); 468 } 469 } finally { 470 readLock.unlock(); 471 } 472 } 473 474 private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected) 475 throws IOException 476 { 477 int pos = dst.position(); 478 int lim = dst.limit(); 479 assert (pos <= lim); 480 int rem = (pos <= lim ? lim - pos : 0); 481 if (dst instanceof DirectBuffer && rem > 0) 482 return receiveIntoNativeBuffer(fd, dst, rem, pos, connected); 483 484 // Substitute a native buffer. If the supplied buffer is empty 485 // we must instead use a nonempty buffer, otherwise the call 486 // will not block waiting for a datagram on some platforms. 
487 int newSize = Math.max(rem, 1); 488 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 489 try { 490 int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected); 491 bb.flip(); 492 if (n > 0 && rem > 0) 493 dst.put(bb); 494 return n; 495 } finally { 496 Util.releaseTemporaryDirectBuffer(bb); 497 } 498 } 499 500 private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, 501 int rem, int pos, boolean connected) 502 throws IOException 503 { 504 int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected); 505 if (n > 0) 506 bb.position(pos + n); 507 return n; 508 } 509 510 @Override 511 public int send(ByteBuffer src, SocketAddress target) 512 throws IOException 513 { 514 Objects.requireNonNull(src); 515 InetSocketAddress isa = Net.checkAddress(target, family); 516 517 writeLock.lock(); 518 try { 519 boolean blocking = isBlocking(); 520 int n = 0; 521 try { 522 SocketAddress remote = beginWrite(blocking, false); 523 if (remote != null) { 524 // connected 525 if (!target.equals(remote)) { 526 throw new AlreadyConnectedException(); 527 } 528 n = IOUtil.write(fd, src, -1, nd); 529 if (blocking) { 530 while (IOStatus.okayToRetry(n) && isOpen()) { 531 park(Net.POLLOUT); 532 n = IOUtil.write(fd, src, -1, nd); 533 } 534 } 535 } else { 536 // not connected 537 SecurityManager sm = System.getSecurityManager(); 538 InetAddress ia = isa.getAddress(); 539 if (sm != null) { 540 if (ia.isMulticastAddress()) { 541 sm.checkMulticast(ia); 542 } else { 543 sm.checkConnect(ia.getHostAddress(), isa.getPort()); 544 } 545 } 546 if (ia.isLinkLocalAddress()) 547 isa = IPAddressUtil.toScopedAddress(isa); 548 n = send(fd, src, isa); 549 if (blocking) { 550 while (IOStatus.okayToRetry(n) && isOpen()) { 551 park(Net.POLLOUT); 552 n = send(fd, src, isa); 553 } 554 } 555 } 556 } finally { 557 endWrite(blocking, n > 0); 558 assert IOStatus.check(n); 559 } 560 return IOStatus.normalize(n); 561 } finally { 562 writeLock.unlock(); 563 } 564 } 565 566 private int 
send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target) 567 throws IOException 568 { 569 if (src instanceof DirectBuffer) 570 return sendFromNativeBuffer(fd, src, target); 571 572 // Substitute a native buffer 573 int pos = src.position(); 574 int lim = src.limit(); 575 assert (pos <= lim); 576 int rem = (pos <= lim ? lim - pos : 0); 577 578 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 579 try { 580 bb.put(src); 581 bb.flip(); 582 // Do not update src until we see how many bytes were written 583 src.position(pos); 584 585 int n = sendFromNativeBuffer(fd, bb, target); 586 if (n > 0) { 587 // now update src 588 src.position(pos + n); 589 } 590 return n; 591 } finally { 592 Util.releaseTemporaryDirectBuffer(bb); 593 } 594 } 595 596 private int sendFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, 597 InetSocketAddress target) 598 throws IOException 599 { 600 int pos = bb.position(); 601 int lim = bb.limit(); 602 assert (pos <= lim); 603 int rem = (pos <= lim ? lim - pos : 0); 604 605 boolean preferIPv6 = (family != StandardProtocolFamily.INET); 606 int written; 607 try { 608 written = send0(preferIPv6, fd, ((DirectBuffer)bb).address() + pos, 609 rem, target.getAddress(), target.getPort()); 610 } catch (PortUnreachableException pue) { 611 if (isConnected()) 612 throw pue; 613 written = rem; 614 } 615 if (written > 0) 616 bb.position(pos + written); 617 return written; 618 } 619 620 @Override 621 public int read(ByteBuffer buf) throws IOException { 622 Objects.requireNonNull(buf); 623 624 readLock.lock(); 625 try { 626 boolean blocking = isBlocking(); 627 int n = 0; 628 try { 629 beginRead(blocking, true); 630 n = IOUtil.read(fd, buf, -1, nd); 631 if (blocking) { 632 while (IOStatus.okayToRetry(n) && isOpen()) { 633 park(Net.POLLIN); 634 n = IOUtil.read(fd, buf, -1, nd); 635 } 636 } 637 } finally { 638 endRead(blocking, n > 0); 639 assert IOStatus.check(n); 640 } 641 return IOStatus.normalize(n); 642 } finally { 643 readLock.unlock(); 644 } 645 } 646 
647 @Override 648 public long read(ByteBuffer[] dsts, int offset, int length) 649 throws IOException 650 { 651 Objects.checkFromIndexSize(offset, length, dsts.length); 652 653 readLock.lock(); 654 try { 655 boolean blocking = isBlocking(); 656 long n = 0; 657 try { 658 beginRead(blocking, true); 659 n = IOUtil.read(fd, dsts, offset, length, nd); 660 if (blocking) { 661 while (IOStatus.okayToRetry(n) && isOpen()) { 662 park(Net.POLLIN); 663 n = IOUtil.read(fd, dsts, offset, length, nd); 664 } 665 } 666 } finally { 667 endRead(blocking, n > 0); 668 assert IOStatus.check(n); 669 } 670 return IOStatus.normalize(n); 671 } finally { 672 readLock.unlock(); 673 } 674 } 675 676 /** 677 * Marks the beginning of a write operation that might block. 678 * @param blocking true if configured blocking 679 * @param mustBeConnected true if the socket must be connected 680 * @return remote address if connected 681 * @throws ClosedChannelException if the channel is closed 682 * @throws NotYetConnectedException if mustBeConnected and not connected 683 * @throws IOException if socket not bound and cannot be bound 684 */ 685 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected) 686 throws IOException 687 { 688 if (blocking) { 689 // set hook for Thread.interrupt 690 begin(); 691 } 692 SocketAddress remote; 693 synchronized (stateLock) { 694 ensureOpen(); 695 remote = remoteAddress; 696 if ((remote == null) && mustBeConnected) 697 throw new NotYetConnectedException(); 698 if (localAddress == null) 699 bindInternal(null); 700 if (blocking) 701 writerThread = NativeThread.current(); 702 } 703 return remote; 704 } 705 706 /** 707 * Marks the end of a write operation that may have blocked. 
708 * 709 * @throws AsynchronousCloseException if the channel was closed asynchronously 710 */ 711 private void endWrite(boolean blocking, boolean completed) 712 throws AsynchronousCloseException 713 { 714 if (blocking) { 715 synchronized (stateLock) { 716 writerThread = 0; 717 if (state == ST_CLOSING) { 718 tryFinishClose(); 719 } 720 } 721 // remove hook for Thread.interrupt 722 end(completed); 723 } 724 } 725 726 @Override 727 public int write(ByteBuffer buf) throws IOException { 728 Objects.requireNonNull(buf); 729 730 writeLock.lock(); 731 try { 732 boolean blocking = isBlocking(); 733 int n = 0; 734 try { 735 beginWrite(blocking, true); 736 n = IOUtil.write(fd, buf, -1, nd); 737 if (blocking) { 738 while (IOStatus.okayToRetry(n) && isOpen()) { 739 park(Net.POLLOUT); 740 n = IOUtil.write(fd, buf, -1, nd); 741 } 742 } 743 } finally { 744 endWrite(blocking, n > 0); 745 assert IOStatus.check(n); 746 } 747 return IOStatus.normalize(n); 748 } finally { 749 writeLock.unlock(); 750 } 751 } 752 753 @Override 754 public long write(ByteBuffer[] srcs, int offset, int length) 755 throws IOException 756 { 757 Objects.checkFromIndexSize(offset, length, srcs.length); 758 759 writeLock.lock(); 760 try { 761 boolean blocking = isBlocking(); 762 long n = 0; 763 try { 764 beginWrite(blocking, true); 765 n = IOUtil.write(fd, srcs, offset, length, nd); 766 if (blocking) { 767 while (IOStatus.okayToRetry(n) && isOpen()) { 768 park(Net.POLLOUT); 769 n = IOUtil.write(fd, srcs, offset, length, nd); 770 } 771 } 772 } finally { 773 endWrite(blocking, n > 0); 774 assert IOStatus.check(n); 775 } 776 return IOStatus.normalize(n); 777 } finally { 778 writeLock.unlock(); 779 } 780 } 781 782 @Override 783 protected void implConfigureBlocking(boolean block) throws IOException { 784 readLock.lock(); 785 try { 786 writeLock.lock(); 787 try { 788 synchronized (stateLock) { 789 ensureOpen(); 790 IOUtil.configureBlocking(fd, block); 791 } 792 } finally { 793 writeLock.unlock(); 794 } 795 } finally 
{ 796 readLock.unlock(); 797 } 798 } 799 800 InetSocketAddress localAddress() { 801 synchronized (stateLock) { 802 return localAddress; 803 } 804 } 805 806 InetSocketAddress remoteAddress() { 807 synchronized (stateLock) { 808 return remoteAddress; 809 } 810 } 811 812 @Override 813 public DatagramChannel bind(SocketAddress local) throws IOException { 814 readLock.lock(); 815 try { 816 writeLock.lock(); 817 try { 818 synchronized (stateLock) { 819 ensureOpen(); 820 if (localAddress != null) 821 throw new AlreadyBoundException(); 822 bindInternal(local); 823 } 824 } finally { 825 writeLock.unlock(); 826 } 827 } finally { 828 readLock.unlock(); 829 } 830 return this; 831 } 832 833 private void bindInternal(SocketAddress local) throws IOException { 834 assert Thread.holdsLock(stateLock )&& (localAddress == null); 835 836 InetSocketAddress isa; 837 if (local == null) { 838 // only Inet4Address allowed with IPv4 socket 839 if (family == StandardProtocolFamily.INET) { 840 isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); 841 } else { 842 isa = new InetSocketAddress(0); 843 } 844 } else { 845 isa = Net.checkAddress(local, family); 846 } 847 SecurityManager sm = System.getSecurityManager(); 848 if (sm != null) 849 sm.checkListen(isa.getPort()); 850 851 Net.bind(family, fd, isa.getAddress(), isa.getPort()); 852 localAddress = Net.localAddress(fd); 853 } 854 855 @Override 856 public boolean isConnected() { 857 synchronized (stateLock) { 858 return (state == ST_CONNECTED); 859 } 860 } 861 862 @Override 863 public DatagramChannel connect(SocketAddress sa) throws IOException { 864 InetSocketAddress isa = Net.checkAddress(sa, family); 865 SecurityManager sm = System.getSecurityManager(); 866 if (sm != null) { 867 InetAddress ia = isa.getAddress(); 868 if (ia.isMulticastAddress()) { 869 sm.checkMulticast(ia); 870 } else { 871 sm.checkConnect(ia.getHostAddress(), isa.getPort()); 872 sm.checkAccept(ia.getHostAddress(), isa.getPort()); 873 } 874 } 875 876 
readLock.lock(); 877 try { 878 writeLock.lock(); 879 try { 880 synchronized (stateLock) { 881 ensureOpen(); 882 if (state == ST_CONNECTED) 883 throw new AlreadyConnectedException(); 884 885 // ensure that the socket is bound 886 if (localAddress == null) { 887 bindInternal(null); 888 } 889 890 int n = Net.connect(family, 891 fd, 892 isa.getAddress(), 893 isa.getPort()); 894 if (n <= 0) 895 throw new Error(); // Can't happen 896 897 // connected 898 remoteAddress = isa; 899 state = ST_CONNECTED; 900 901 // refresh local address 902 localAddress = Net.localAddress(fd); 903 904 // flush any packets already received. 905 boolean blocking = isBlocking(); 906 if (blocking) { 907 IOUtil.configureBlocking(fd, false); 908 } 909 try { 910 ByteBuffer buf = ByteBuffer.allocate(100); 911 while (receive(fd, buf, false) > 0) { 912 buf.clear(); 913 } 914 } finally { 915 if (blocking) { 916 IOUtil.configureBlocking(fd, true); 917 } 918 } 919 } 920 } finally { 921 writeLock.unlock(); 922 } 923 } finally { 924 readLock.unlock(); 925 } 926 return this; 927 } 928 929 @Override 930 public DatagramChannel disconnect() throws IOException { 931 readLock.lock(); 932 try { 933 writeLock.lock(); 934 try { 935 synchronized (stateLock) { 936 if (!isOpen() || (state != ST_CONNECTED)) 937 return this; 938 939 // disconnect socket 940 boolean isIPv6 = (family == StandardProtocolFamily.INET6); 941 disconnect0(fd, isIPv6); 942 943 // no longer connected 944 remoteAddress = null; 945 state = ST_UNCONNECTED; 946 947 // check whether rebind is needed 948 InetSocketAddress isa = Net.localAddress(fd); 949 if (isa.getPort() == 0) { 950 // On Linux, if bound to ephemeral port, 951 // disconnect does not preserve that port. 952 // In this case, try to rebind to the previous port. 
953 int port = localAddress.getPort(); 954 localAddress = isa; // in case Net.bind fails 955 Net.bind(family, fd, isa.getAddress(), port); 956 isa = Net.localAddress(fd); // refresh address 957 assert isa.getPort() == port; 958 } 959 960 // refresh localAddress 961 localAddress = isa; 962 } 963 } finally { 964 writeLock.unlock(); 965 } 966 } finally { 967 readLock.unlock(); 968 } 969 return this; 970 } 971 972 /** 973 * Joins channel's socket to the given group/interface and 974 * optional source address. 975 */ 976 private MembershipKey innerJoin(InetAddress group, 977 NetworkInterface interf, 978 InetAddress source) 979 throws IOException 980 { 981 if (!group.isMulticastAddress()) 982 throw new IllegalArgumentException("Group not a multicast address"); 983 984 // check multicast address is compatible with this socket 985 if (group instanceof Inet4Address) { 986 if (family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group()) 987 throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group"); 988 } else if (group instanceof Inet6Address) { 989 if (family != StandardProtocolFamily.INET6) 990 throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group"); 991 } else { 992 throw new IllegalArgumentException("Address type not supported"); 993 } 994 995 // check source address 996 if (source != null) { 997 if (source.isAnyLocalAddress()) 998 throw new IllegalArgumentException("Source address is a wildcard address"); 999 if (source.isMulticastAddress()) 1000 throw new IllegalArgumentException("Source address is multicast address"); 1001 if (source.getClass() != group.getClass()) 1002 throw new IllegalArgumentException("Source address is different type to group"); 1003 } 1004 1005 SecurityManager sm = System.getSecurityManager(); 1006 if (sm != null) 1007 sm.checkMulticast(group); 1008 1009 synchronized (stateLock) { 1010 ensureOpen(); 1011 1012 // check the registry to see if we are already a member of the group 
1013 if (registry == null) { 1014 registry = new MembershipRegistry(); 1015 } else { 1016 // return existing membership key 1017 MembershipKey key = registry.checkMembership(group, interf, source); 1018 if (key != null) 1019 return key; 1020 } 1021 1022 MembershipKeyImpl key; 1023 if ((family == StandardProtocolFamily.INET6) && 1024 ((group instanceof Inet6Address) || Net.canJoin6WithIPv4Group())) 1025 { 1026 int index = interf.getIndex(); 1027 if (index == -1) 1028 throw new IOException("Network interface cannot be identified"); 1029 1030 // need multicast and source address as byte arrays 1031 byte[] groupAddress = Net.inet6AsByteArray(group); 1032 byte[] sourceAddress = (source == null) ? null : 1033 Net.inet6AsByteArray(source); 1034 1035 // join the group 1036 int n = Net.join6(fd, groupAddress, index, sourceAddress); 1037 if (n == IOStatus.UNAVAILABLE) 1038 throw new UnsupportedOperationException(); 1039 1040 key = new MembershipKeyImpl.Type6(this, group, interf, source, 1041 groupAddress, index, sourceAddress); 1042 1043 } else { 1044 // need IPv4 address to identify interface 1045 Inet4Address target = Net.anyInet4Address(interf); 1046 if (target == null) 1047 throw new IOException("Network interface not configured for IPv4"); 1048 1049 int groupAddress = Net.inet4AsInt(group); 1050 int targetAddress = Net.inet4AsInt(target); 1051 int sourceAddress = (source == null) ? 
0 : Net.inet4AsInt(source); 1052 1053 // join the group 1054 int n = Net.join4(fd, groupAddress, targetAddress, sourceAddress); 1055 if (n == IOStatus.UNAVAILABLE) 1056 throw new UnsupportedOperationException(); 1057 1058 key = new MembershipKeyImpl.Type4(this, group, interf, source, 1059 groupAddress, targetAddress, sourceAddress); 1060 } 1061 1062 registry.add(key); 1063 return key; 1064 } 1065 } 1066 1067 @Override 1068 public MembershipKey join(InetAddress group, 1069 NetworkInterface interf) 1070 throws IOException 1071 { 1072 return innerJoin(group, interf, null); 1073 } 1074 1075 @Override 1076 public MembershipKey join(InetAddress group, 1077 NetworkInterface interf, 1078 InetAddress source) 1079 throws IOException 1080 { 1081 Objects.requireNonNull(source); 1082 return innerJoin(group, interf, source); 1083 } 1084 1085 // package-private 1086 void drop(MembershipKeyImpl key) { 1087 assert key.channel() == this; 1088 1089 synchronized (stateLock) { 1090 if (!key.isValid()) 1091 return; 1092 1093 try { 1094 if (key instanceof MembershipKeyImpl.Type6) { 1095 MembershipKeyImpl.Type6 key6 = 1096 (MembershipKeyImpl.Type6)key; 1097 Net.drop6(fd, key6.groupAddress(), key6.index(), key6.source()); 1098 } else { 1099 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4)key; 1100 Net.drop4(fd, key4.groupAddress(), key4.interfaceAddress(), 1101 key4.source()); 1102 } 1103 } catch (IOException ioe) { 1104 // should not happen 1105 throw new AssertionError(ioe); 1106 } 1107 1108 key.invalidate(); 1109 registry.remove(key); 1110 } 1111 } 1112 1113 /** 1114 * Block datagrams from given source if a memory to receive all 1115 * datagrams. 
1116 */ 1117 void block(MembershipKeyImpl key, InetAddress source) 1118 throws IOException 1119 { 1120 assert key.channel() == this; 1121 assert key.sourceAddress() == null; 1122 1123 synchronized (stateLock) { 1124 if (!key.isValid()) 1125 throw new IllegalStateException("key is no longer valid"); 1126 if (source.isAnyLocalAddress()) 1127 throw new IllegalArgumentException("Source address is a wildcard address"); 1128 if (source.isMulticastAddress()) 1129 throw new IllegalArgumentException("Source address is multicast address"); 1130 if (source.getClass() != key.group().getClass()) 1131 throw new IllegalArgumentException("Source address is different type to group"); 1132 1133 int n; 1134 if (key instanceof MembershipKeyImpl.Type6) { 1135 MembershipKeyImpl.Type6 key6 = 1136 (MembershipKeyImpl.Type6)key; 1137 n = Net.block6(fd, key6.groupAddress(), key6.index(), 1138 Net.inet6AsByteArray(source)); 1139 } else { 1140 MembershipKeyImpl.Type4 key4 = 1141 (MembershipKeyImpl.Type4)key; 1142 n = Net.block4(fd, key4.groupAddress(), key4.interfaceAddress(), 1143 Net.inet4AsInt(source)); 1144 } 1145 if (n == IOStatus.UNAVAILABLE) { 1146 // ancient kernel 1147 throw new UnsupportedOperationException(); 1148 } 1149 } 1150 } 1151 1152 /** 1153 * Unblock given source. 
1154 */ 1155 void unblock(MembershipKeyImpl key, InetAddress source) { 1156 assert key.channel() == this; 1157 assert key.sourceAddress() == null; 1158 1159 synchronized (stateLock) { 1160 if (!key.isValid()) 1161 throw new IllegalStateException("key is no longer valid"); 1162 1163 try { 1164 if (key instanceof MembershipKeyImpl.Type6) { 1165 MembershipKeyImpl.Type6 key6 = 1166 (MembershipKeyImpl.Type6)key; 1167 Net.unblock6(fd, key6.groupAddress(), key6.index(), 1168 Net.inet6AsByteArray(source)); 1169 } else { 1170 MembershipKeyImpl.Type4 key4 = 1171 (MembershipKeyImpl.Type4)key; 1172 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(), 1173 Net.inet4AsInt(source)); 1174 } 1175 } catch (IOException ioe) { 1176 // should not happen 1177 throw new AssertionError(ioe); 1178 } 1179 } 1180 } 1181 1182 /** 1183 * Closes the socket if there are no I/O operations in progress and the 1184 * channel is not registered with a Selector. 1185 */ 1186 private boolean tryClose() throws IOException { 1187 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 1188 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 1189 state = ST_CLOSED; 1190 try { 1191 // close socket 1192 cleaner.clean(); 1193 } catch (UncheckedIOException ioe) { 1194 throw ioe.getCause(); 1195 } 1196 return true; 1197 } else { 1198 return false; 1199 } 1200 } 1201 1202 /** 1203 * Invokes tryClose to attempt to close the socket. 1204 * 1205 * This method is used for deferred closing by I/O and Selector operations. 1206 */ 1207 private void tryFinishClose() { 1208 try { 1209 tryClose(); 1210 } catch (IOException ignore) { } 1211 } 1212 1213 /** 1214 * Closes this channel when configured in blocking mode. 1215 * 1216 * If there is an I/O operation in progress then the socket is pre-closed 1217 * and the I/O threads signalled, in which case the final close is deferred 1218 * until all I/O operations complete. 
1219 */ 1220 private void implCloseBlockingMode() throws IOException { 1221 synchronized (stateLock) { 1222 assert state < ST_CLOSING; 1223 state = ST_CLOSING; 1224 1225 // if member of any multicast groups then invalidate the keys 1226 if (registry != null) 1227 registry.invalidateAll(); 1228 1229 if (!tryClose()) { 1230 long reader = readerThread; 1231 long writer = writerThread; 1232 if (reader != 0 || writer != 0) { 1233 nd.preClose(fd); 1234 if (reader != 0) 1235 NativeThread.signal(reader); 1236 if (writer != 0) 1237 NativeThread.signal(writer); 1238 } 1239 } 1240 } 1241 } 1242 1243 /** 1244 * Closes this channel when configured in non-blocking mode. 1245 * 1246 * If the channel is registered with a Selector then the close is deferred 1247 * until the channel is flushed from all Selectors. 1248 */ 1249 private void implCloseNonBlockingMode() throws IOException { 1250 synchronized (stateLock) { 1251 assert state < ST_CLOSING; 1252 state = ST_CLOSING; 1253 1254 // if member of any multicast groups then invalidate the keys 1255 if (registry != null) 1256 registry.invalidateAll(); 1257 } 1258 1259 // wait for any read/write operations to complete before trying to close 1260 readLock.lock(); 1261 readLock.unlock(); 1262 writeLock.lock(); 1263 writeLock.unlock(); 1264 synchronized (stateLock) { 1265 if (state == ST_CLOSING) { 1266 tryClose(); 1267 } 1268 } 1269 } 1270 1271 /** 1272 * Invoked by implCloseChannel to close the channel. 
1273 */ 1274 @Override 1275 protected void implCloseSelectableChannel() throws IOException { 1276 assert !isOpen(); 1277 if (isBlocking()) { 1278 implCloseBlockingMode(); 1279 } else { 1280 implCloseNonBlockingMode(); 1281 } 1282 } 1283 1284 @Override 1285 public void kill() { 1286 synchronized (stateLock) { 1287 if (state == ST_CLOSING) { 1288 tryFinishClose(); 1289 } 1290 } 1291 } 1292 1293 /** 1294 * Translates native poll revent set into a ready operation set 1295 */ 1296 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1297 int intOps = ski.nioInterestOps(); 1298 int oldOps = ski.nioReadyOps(); 1299 int newOps = initialOps; 1300 1301 if ((ops & Net.POLLNVAL) != 0) { 1302 // This should only happen if this channel is pre-closed while a 1303 // selection operation is in progress 1304 // ## Throw an error if this channel has not been pre-closed 1305 return false; 1306 } 1307 1308 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1309 newOps = intOps; 1310 ski.nioReadyOps(newOps); 1311 return (newOps & ~oldOps) != 0; 1312 } 1313 1314 if (((ops & Net.POLLIN) != 0) && 1315 ((intOps & SelectionKey.OP_READ) != 0)) 1316 newOps |= SelectionKey.OP_READ; 1317 1318 if (((ops & Net.POLLOUT) != 0) && 1319 ((intOps & SelectionKey.OP_WRITE) != 0)) 1320 newOps |= SelectionKey.OP_WRITE; 1321 1322 ski.nioReadyOps(newOps); 1323 return (newOps & ~oldOps) != 0; 1324 } 1325 1326 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1327 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1328 } 1329 1330 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1331 return translateReadyOps(ops, 0, ski); 1332 } 1333 1334 /** 1335 * Poll this channel's socket for reading up to the given timeout. 
1336 * @return {@code true} if the socket is polled 1337 */ 1338 boolean pollRead(long timeout) throws IOException { 1339 boolean blocking = isBlocking(); 1340 assert Thread.holdsLock(blockingLock()) && blocking; 1341 1342 readLock.lock(); 1343 try { 1344 boolean polled = false; 1345 try { 1346 beginRead(blocking, false); 1347 int events = Net.poll(fd, Net.POLLIN, timeout); 1348 polled = (events != 0); 1349 } finally { 1350 endRead(blocking, polled); 1351 } 1352 return polled; 1353 } finally { 1354 readLock.unlock(); 1355 } 1356 } 1357 1358 /** 1359 * Translates an interest operation set into a native poll event set 1360 */ 1361 public int translateInterestOps(int ops) { 1362 int newOps = 0; 1363 if ((ops & SelectionKey.OP_READ) != 0) 1364 newOps |= Net.POLLIN; 1365 if ((ops & SelectionKey.OP_WRITE) != 0) 1366 newOps |= Net.POLLOUT; 1367 if ((ops & SelectionKey.OP_CONNECT) != 0) 1368 newOps |= Net.POLLIN; 1369 return newOps; 1370 } 1371 1372 public FileDescriptor getFD() { 1373 return fd; 1374 } 1375 1376 public int getFDVal() { 1377 return fdVal; 1378 } 1379 1380 /** 1381 * Returns an action to close the given file descriptor. 1382 */ 1383 private static Runnable closerFor(FileDescriptor fd) { 1384 return () -> { 1385 try { 1386 nd.close(fd); 1387 } catch (IOException ioe) { 1388 throw new UncheckedIOException(ioe); 1389 } finally { 1390 // decrement 1391 ResourceManager.afterUdpClose(); 1392 } 1393 }; 1394 } 1395 1396 // -- Native methods -- 1397 1398 private static native void initIDs(); 1399 1400 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) 1401 throws IOException; 1402 1403 private native int receive0(FileDescriptor fd, long address, int len, 1404 boolean connected) 1405 throws IOException; 1406 1407 private native int send0(boolean preferIPv6, FileDescriptor fd, long address, 1408 int len, InetAddress addr, int port) 1409 throws IOException; 1410 1411 static { 1412 IOUtil.load(); 1413 initIDs(); 1414 } 1415 }