/*
 * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.io.FileDescriptor;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Cleaner.Cleanable;
import java.net.DatagramSocket;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import jdk.internal.ref.CleanerFactory;
import sun.net.ResourceManager;
import sun.net.ext.ExtendedSocketOptions;
import sun.net.util.IPAddressUtil;

/**
 * An implementation of DatagramChannels.
 *
 * <p> Three locks guard the channel: {@code readLock}, {@code writeLock} and
 * {@code stateLock}. The methods in this file that take more than one of them
 * (bind, connect, disconnect, implConfigureBlocking) acquire them in that
 * order.
 */

class DatagramChannelImpl
    extends DatagramChannel
    implements SelChImpl
{
    // Used to make native read and write calls
    private static final NativeDispatcher nd = new DatagramDispatcher();

    // The protocol family of the socket (fixed at construction)
    private final ProtocolFamily family;

    // Our file descriptor
    private final FileDescriptor fd;
    // Value returned by IOUtil.fdVal(fd) at construction
    private final int fdVal;
    // Registered with CleanerFactory.cleaner() at construction; tryClose()
    // invokes cleaner.clean() to close the socket
    private final Cleanable cleaner;

    // Cached InetAddress and port for unconnected DatagramChannels
    // used by receive0
    private InetAddress cachedSenderInetAddress;
    private int cachedSenderPort;

    // Lock held by current reading or connecting thread
    private final ReentrantLock readLock = new ReentrantLock();

    // Lock held by current writing or connecting thread
    private final ReentrantLock writeLock = new ReentrantLock();

    // Lock held by any thread that modifies the state fields declared below
    // DO NOT invoke a blocking I/O operation while holding this lock!
    private final Object stateLock = new Object();

    // -- The following fields are protected by stateLock

    // State (does not necessarily increase monotonically)
    private static final int ST_UNCONNECTED = 0;
    private static final int ST_CONNECTED = 1;
    private static final int ST_CLOSING = 2;
    private static final int ST_CLOSED = 3;
    private int state;

    // IDs of native threads doing reads and writes, for signalling.
    // Set from NativeThread.current() by beginRead/beginWrite when the channel
    // is in blocking mode and reset to 0 by endRead/endWrite.
    private long readerThread;
    private long writerThread;

    // Binding and remote address (when connected)
    private InetSocketAddress localAddress;
    private InetSocketAddress remoteAddress;

    // Our socket adaptor, if any (created on first call to socket())
    private DatagramSocket socket;

    // Multicast support (created on first join)
    private MembershipRegistry registry;

    // set true when socket is bound and SO_REUSEADDR is emulated
    private boolean reuseAddressEmulated;

    // set true/false when socket is already bound and SO_REUSEADDR is emulated
private boolean isReuseAddress;

    // -- End of fields protected by stateLock

    /**
     * Creates a channel backed by a newly created datagram socket. The
     * protocol family is INET6 when Net.isIPv6Available() returns true,
     * otherwise INET.
     *
     * @param  sp  the provider passed to the superclass constructor
     * @throws IOException if creating the socket fails; the UDP count taken
     *         by ResourceManager.beforeUdpCreate() is given back through
     *         afterUdpClose() before the exception is rethrown
     */
    public DatagramChannelImpl(SelectorProvider sp)
        throws IOException
    {
        super(sp);
        ResourceManager.beforeUdpCreate();
        try {
            this.family = Net.isIPv6Available()
                    ? StandardProtocolFamily.INET6
                    : StandardProtocolFamily.INET;
            this.fd = Net.socket(family, false);
            this.fdVal = IOUtil.fdVal(fd);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            throw ioe;
        }
        this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd));
    }

    /**
     * Creates a channel backed by a newly created datagram socket of the
     * given protocol family.
     *
     * @param  sp      the provider passed to the superclass constructor
     * @param  family  StandardProtocolFamily.INET or INET6
     * @throws NullPointerException if family is null
     * @throws UnsupportedOperationException if family is neither INET nor
     *         INET6, or is INET6 while Net.isIPv6Available() is false
     * @throws IOException if creating the socket fails; the UDP count taken
     *         by ResourceManager.beforeUdpCreate() is given back first
     */
    public DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family)
        throws IOException
    {
        super(sp);
        Objects.requireNonNull(family, "'family' is null");
        if ((family != StandardProtocolFamily.INET) &&
            (family != StandardProtocolFamily.INET6)) {
            throw new UnsupportedOperationException("Protocol family not supported");
        }
        if (family == StandardProtocolFamily.INET6) {
            if (!Net.isIPv6Available()) {
                throw new UnsupportedOperationException("IPv6 not available");
            }
        }

        ResourceManager.beforeUdpCreate();
        try {
            this.family = family;
            this.fd = Net.socket(family, false);
            this.fdVal = IOUtil.fdVal(fd);
        } catch (IOException ioe) {
            ResourceManager.afterUdpClose();
            throw ioe;
        }
        this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd));
    }

    /**
     * Creates a channel around an existing file descriptor. The protocol
     * family is chosen from Net.isIPv6Available() and the local address is
     * read from the descriptor with Net.localAddress(fd).
     *
     * @param  sp  the provider passed to the superclass constructor
     * @param  fd  the already-created socket's file descriptor
     * @throws IOException if the local address cannot be obtained
     */
    public DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd)
        throws IOException
    {
        super(sp);

        // increment UDP count to match decrement when closing
        ResourceManager.beforeUdpCreate();

        this.family = Net.isIPv6Available()
                ? StandardProtocolFamily.INET6
                : StandardProtocolFamily.INET;
        this.fd = fd;
        this.fdVal = IOUtil.fdVal(fd);
        this.cleaner = CleanerFactory.cleaner().register(this, closerFor(fd));
        synchronized (stateLock) {
            this.localAddress = Net.localAddress(fd);
        }
    }

    // @throws ClosedChannelException if channel is closed
    private void ensureOpen() throws ClosedChannelException {
        if (!isOpen())
            throw new ClosedChannelException();
    }

    /**
     * Returns the DatagramSocket adaptor for this channel, creating it with
     * DatagramSocketAdaptor.create on the first call. The same instance is
     * returned on later calls.
     */
    @Override
    public DatagramSocket socket() {
        synchronized (stateLock) {
            if (socket == null)
                socket = DatagramSocketAdaptor.create(this);
            return socket;
        }
    }

    /**
     * Returns the local address after passing it through
     * Net.getRevealedLocalAddress.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getLocalAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            // Perform security check before returning address
            return Net.getRevealedLocalAddress(localAddress);
        }
    }

    /**
     * Returns the remote address recorded by connect, or null when the
     * channel is not connected.
     *
     * @throws ClosedChannelException if the channel is closed
     */
    @Override
    public SocketAddress getRemoteAddress() throws IOException {
        synchronized (stateLock) {
            ensureOpen();
            return remoteAddress;
        }
    }

    /**
     * Sets a socket option.
     *
     * @throws NullPointerException if name is null
     * @throws UnsupportedOperationException if name is not in supportedOptions()
     * @throws IllegalArgumentException if value is not an instance of name.type()
     * @throws ClosedChannelException if the channel is closed
     * @throws IOException if the interface given for IP_MULTICAST_IF cannot be
     *         identified (IPv6 socket) or has no IPv4 address (IPv4 socket),
     *         or if setting the option fails
     */
    @Override
    public <T> DatagramChannel setOption(SocketOption<T> name, T value)
        throws IOException
    {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");
        if (!name.type().isInstance(value))
            throw new IllegalArgumentException("Invalid value '" + value + "'");

        synchronized (stateLock) {
            ensureOpen();

            if (name == StandardSocketOptions.IP_TOS ||
                name == StandardSocketOptions.IP_MULTICAST_TTL ||
                name == StandardSocketOptions.IP_MULTICAST_LOOP)
            {
                // options are protocol dependent
                Net.setSocketOption(fd, family, name, value);
                return this;
            }

            if (name == StandardSocketOptions.IP_MULTICAST_IF) {
                NetworkInterface interf = (NetworkInterface)value;
                if (family == StandardProtocolFamily.INET6) {
                    // IPv6 sockets identify the interface by index
                    int index = interf.getIndex();
                    if (index == -1)
                        throw new IOException("Network interface cannot be identified");
                    Net.setInterface6(fd, index);
                } else {
                    // need IPv4 address to identify interface
                    Inet4Address target = Net.anyInet4Address(interf);
                    if (target == null)
                        throw new IOException("Network interface not configured for IPv4");
                    int targetAddress = Net.inet4AsInt(target);
                    Net.setInterface4(fd, targetAddress);
                }
                return this;
            }
            if (name == StandardSocketOptions.SO_REUSEADDR
                && Net.useExclusiveBind() && localAddress != null) {
                // already bound with exclusive bind in use: remember the
                // requested value so that getOption can report it back
                reuseAddressEmulated = true;
                this.isReuseAddress = (Boolean)value;
            }

            // remaining options don't need any special handling
            Net.setSocketOption(fd, Net.UNSPEC, name, value);
            return this;
        }
    }

    /**
     * Returns the value of a socket option. For IP_MULTICAST_IF the result is
     * null when the socket reports the default interface (address or index 0).
     *
     * @throws NullPointerException if name is null
     * @throws UnsupportedOperationException if name is not in supportedOptions()
     * @throws ClosedChannelException if the channel is closed
     * @throws IOException if the interface reported for IP_MULTICAST_IF cannot
     *         be mapped to a NetworkInterface, or if reading the option fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getOption(SocketOption<T> name)
        throws IOException
    {
        Objects.requireNonNull(name);
        if (!supportedOptions().contains(name))
            throw new UnsupportedOperationException("'" + name + "' not supported");

        synchronized (stateLock) {
            ensureOpen();

            if (name == StandardSocketOptions.IP_TOS ||
                name == StandardSocketOptions.IP_MULTICAST_TTL ||
                name == StandardSocketOptions.IP_MULTICAST_LOOP)
            {
                // options are protocol dependent
                return (T) Net.getSocketOption(fd, family, name);
            }

            if (name == StandardSocketOptions.IP_MULTICAST_IF) {
                if (family == StandardProtocolFamily.INET) {
                    int address = Net.getInterface4(fd);
                    if (address == 0)
                        return null;    // default interface

                    InetAddress ia = Net.inet4FromInt(address);
                    NetworkInterface ni = NetworkInterface.getByInetAddress(ia);
                    if (ni == null)
                        throw new IOException("Unable to map address to interface");
                    return (T) ni;
                } else {
                    int index = Net.getInterface6(fd);
                    if (index == 0)
                        return null;    // default interface

                    NetworkInterface ni = NetworkInterface.getByIndex(index);
                    if (ni == null)
                        throw new IOException("Unable to map index to interface");
                    return (T) ni;
                }
            }

            // report the value remembered by setOption while emulating
            if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) {
                return (T)Boolean.valueOf(isReuseAddress);
            }

            // no special handling
            return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
        }
    }

    // Holder class so that the option set is built on first use of
    // supportedOptions() rather than when the enclosing class is initialized
    private static class DefaultOptionsHolder {
        static final Set<SocketOption<?>> defaultOptions = defaultOptions();

        // Builds the unmodifiable set of options reported by supportedOptions().
        // SO_REUSEPORT is included only when Net.isReusePortAvailable().
        private static Set<SocketOption<?>> defaultOptions() {
            HashSet<SocketOption<?>> set = new HashSet<>();
            set.add(StandardSocketOptions.SO_SNDBUF);
            set.add(StandardSocketOptions.SO_RCVBUF);
            set.add(StandardSocketOptions.SO_REUSEADDR);
            if (Net.isReusePortAvailable()) {
                set.add(StandardSocketOptions.SO_REUSEPORT);
            }
            set.add(StandardSocketOptions.SO_BROADCAST);
            set.add(StandardSocketOptions.IP_TOS);
            set.add(StandardSocketOptions.IP_MULTICAST_IF);
            set.add(StandardSocketOptions.IP_MULTICAST_TTL);
            set.add(StandardSocketOptions.IP_MULTICAST_LOOP);
            set.addAll(ExtendedSocketOptions.datagramSocketOptions());
            return Collections.unmodifiableSet(set);
        }
    }

    /**
     * Returns the unmodifiable set of socket options accepted by
     * setOption and getOption.
     */
    @Override
    public final Set<SocketOption<?>> supportedOptions() {
        return DefaultOptionsHolder.defaultOptions;
    }

    /**
     * Marks the beginning of a read operation that might block.
     *
     * If the socket is not yet bound then it is bound here with
     * bindInternal(null).
     *
     * @param blocking true if configured blocking
     * @param mustBeConnected true if the socket must be connected
     * @return remote address if connected
     * @throws ClosedChannelException if the channel is closed
     * @throws NotYetConnectedException if mustBeConnected and not connected
     * @throws IOException if socket not bound and cannot be bound
     */
    private SocketAddress beginRead(boolean blocking, boolean mustBeConnected)
        throws IOException
    {
        if (blocking) {
            // set hook for Thread.interrupt
            begin();
        }
        SocketAddress remote;
        synchronized (stateLock) {
            ensureOpen();
            remote = remoteAddress;
            if ((remote == null) && mustBeConnected)
                throw new NotYetConnectedException();
            if (localAddress == null)
                bindInternal(null);
            if (blocking)
                readerThread = NativeThread.current();
        }
        return remote;
    }

    /**
     * Marks the end of a read operation that may have blocked.
     *
     * Does nothing when the operation was non-blocking. Otherwise clears
     * readerThread and, if the channel is in ST_CLOSING, attempts the
     * deferred close before removing the interrupt hook.
     *
     * @param blocking true if the operation was started in blocking mode
     * @param completed the value passed on to end(boolean)
     * @throws AsynchronousCloseException if the channel was closed asynchronously
     */
    private void endRead(boolean blocking, boolean completed)
        throws AsynchronousCloseException
    {
        if (blocking) {
            synchronized (stateLock) {
                readerThread = 0;
                if (state == ST_CLOSING) {
                    tryFinishClose();
                }
            }
            // remove hook for Thread.interrupt
            end(completed);
        }
    }

    private SocketAddress sender;       // Set by receive0 (## ugh)

    private static final int INITIAL = -100; // some unambiguous negative value

    /**
     * Receives a datagram into dst and returns the sender's address.
     *
     * When the channel is unconnected and a security manager is installed,
     * the datagram is first received into a temporary direct buffer so that
     * datagrams rejected by checkAccept are discarded without touching dst;
     * the loop then goes on to the next datagram.
     *
     * @return the address stored in the sender field, or null if the channel
     *         is non-blocking and no datagram was available
     * @throws IllegalArgumentException if dst is read-only
     * @throws ClosedChannelException if the channel is closed
     * @throws IOException if the socket cannot be bound or the receive fails
     */
    @Override
    public SocketAddress receive(ByteBuffer dst) throws IOException {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = INITIAL;
            ByteBuffer bb = null;
            try {
                SocketAddress remote = beginRead(blocking, false);
                boolean connected = (remote != null);
                SecurityManager sm = System.getSecurityManager();
                if (connected || (sm == null)) {
                    // connected or no security manager
                    n = receive(fd, dst, connected);
                    if (blocking) {
                        // wait for the socket to become readable and retry
                        while (IOStatus.okayToRetry(n) && isOpen()) {
                            park(Net.POLLIN);
                            n = receive(fd, dst, connected);
                        }
                    } else if (n == IOStatus.UNAVAILABLE) {
                        return null;
                    }
                } else {
                    // Cannot receive into user's buffer when running with a
                    // security manager and not connected
                    bb = Util.getTemporaryDirectBuffer(dst.remaining());
                    for (;;) {
                        n = receive(fd, bb, connected);
                        if (blocking) {
                            while (IOStatus.okayToRetry(n) && isOpen()) {
                                park(Net.POLLIN);
                                n = receive(fd, bb, connected);
                            }
                        } else if (n == IOStatus.UNAVAILABLE) {
                            return null;
                        }
                        InetSocketAddress isa = (InetSocketAddress)sender;
                        try {
                            sm.checkAccept(isa.getAddress().getHostAddress(),
                                           isa.getPort());
                        } catch (SecurityException se) {
                            // Ignore packet
                            bb.clear();
                            n = INITIAL;
                            continue;
                        }
                        // accepted: copy the datagram into the caller's buffer
                        bb.flip();
                        dst.put(bb);
                        break;
                    }
                }
                assert sender != null;
                return sender;
            } finally {
                // runs on every exit path, including the early returns above
                if (bb != null)
                    Util.releaseTemporaryDirectBuffer(bb);
                endRead(blocking, n >= 0);
                assert IOStatus.check(n);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Receives a datagram into dst, going through a temporary direct buffer
     * when dst is not a direct buffer or has no remaining space.
     *
     * @return the value returned by receive0 (defined outside this view);
     *         callers in this file treat negative values as IOStatus codes
     */
    private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected)
        throws IOException
    {
        int pos = dst.position();
        int lim = dst.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);
        if (dst instanceof DirectBuffer && rem > 0)
            return receiveIntoNativeBuffer(fd, dst, rem, pos, connected);

        // Substitute a native buffer. If the supplied buffer is empty
        // we must instead use a nonempty buffer, otherwise the call
        // will not block waiting for a datagram on some platforms.
        int newSize = Math.max(rem, 1);
        ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
        try {
            int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected);
            bb.flip();
            // nothing is copied when dst had no room (rem == 0)
            if (n > 0 && rem > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.releaseTemporaryDirectBuffer(bb);
        }
    }

    /**
     * Calls receive0 on the memory of the direct buffer bb starting at pos,
     * for at most rem bytes, and advances bb's position by the result when
     * it is positive.
     */
    private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                        int rem, int pos, boolean connected)
        throws IOException
    {
        int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
        if (n > 0)
            bb.position(pos + n);
        return n;
    }

    /**
     * Sends the remaining bytes of src as a datagram to target.
     *
     * When the channel is connected, target must equal the connected remote
     * address and the datagram is written with IOUtil.write. Otherwise the
     * security manager, if any, is consulted and the datagram is sent with
     * the private send method.
     *
     * @return the result normalized by IOStatus.normalize
     * @throws NullPointerException if src is null
     * @throws AlreadyConnectedException if connected and target differs from
     *         the remote address
     * @throws ClosedChannelException if the channel is closed
     * @throws IOException if the socket cannot be bound or the send fails
     */
    @Override
    public int send(ByteBuffer src, SocketAddress target)
        throws IOException
    {
        Objects.requireNonNull(src);
        InetSocketAddress isa = Net.checkAddress(target, family);

        writeLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                SocketAddress remote = beginWrite(blocking, false);
                if (remote != null) {
                    // connected
                    if (!target.equals(remote)) {
                        throw new AlreadyConnectedException();
                    }
                    n = IOUtil.write(fd, src, -1, nd);
                    if (blocking) {
                        // wait for the socket to become writable and retry
                        while (IOStatus.okayToRetry(n) && isOpen()) {
                            park(Net.POLLOUT);
                            n = IOUtil.write(fd, src, -1, nd);
                        }
                    }
                } else {
                    // not connected
                    SecurityManager sm = System.getSecurityManager();
                    InetAddress ia = isa.getAddress();
                    if (sm != null) {
                        if (ia.isMulticastAddress()) {
                            sm.checkMulticast(ia);
                        } else {
                            sm.checkConnect(ia.getHostAddress(), isa.getPort());
                        }
                    }
                    if (ia.isLinkLocalAddress())
                        isa = IPAddressUtil.toScopedAddress(isa);
                    n = send(fd, src, isa);
                    if (blocking) {
                        while (IOStatus.okayToRetry(n) && isOpen()) {
                            park(Net.POLLOUT);
                            n = send(fd, src, isa);
                        }
                    }
                }
            } finally {
                endWrite(blocking, n > 0);
                assert IOStatus.check(n);
            }
            return IOStatus.normalize(n);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Sends the remaining bytes of src to target, copying them into a
     * temporary direct buffer first when src is not a direct buffer. The
     * position of src is advanced only by the number of bytes reported sent.
     */
    private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
        throws IOException
    {
        if (src instanceof DirectBuffer)
            return sendFromNativeBuffer(fd, src, target);

        // Substitute a native buffer
        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
        try {
            bb.put(src);
            bb.flip();
            // Do not update src until we see how many bytes were written
            src.position(pos);

            int n = sendFromNativeBuffer(fd, bb, target);
            if (n > 0) {
                // now update src
                src.position(pos + n);
            }
            return n;
        } finally {
            Util.releaseTemporaryDirectBuffer(bb);
        }
    }

    /**
     * Calls send0 on the remaining bytes of the direct buffer bb and advances
     * its position by the number of bytes reported sent.
     *
     * @throws PortUnreachableException only if the channel is connected;
     *         when unconnected the exception is swallowed and all remaining
     *         bytes are reported as sent
     */
    private int sendFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                     InetSocketAddress target)
        throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        boolean preferIPv6 = (family != StandardProtocolFamily.INET);
        int written;
        try {
            written = send0(preferIPv6, fd, ((DirectBuffer)bb).address() + pos,
                            rem, target.getAddress(), target.getPort());
        } catch (PortUnreachableException pue) {
            if (isConnected())
                throw pue;
            written = rem;
        }
        if (written > 0)
            bb.position(pos + written);
        return written;
    }

    /**
     * Reads a datagram from the connected peer into buf.
     *
     * @return the result normalized by IOStatus.normalize
     * @throws NullPointerException if buf is null
     * @throws NotYetConnectedException if the channel is not connected
     * @throws ClosedChannelException if the channel is closed
     * @throws IOException if the read fails
     */
    @Override
    public int read(ByteBuffer buf) throws IOException {
        Objects.requireNonNull(buf);

        readLock.lock();
        try {
            boolean blocking = isBlocking();
            int n = 0;
            try {
                beginRead(blocking, true);
                n = IOUtil.read(fd, buf, -1, nd);
                if (blocking) {
                    // wait for the socket to become readable and retry
                    while (IOStatus.okayToRetry(n) && isOpen()) {
                        park(Net.POLLIN);
                        n = IOUtil.read(fd, buf, -1, nd);
                    }
                }
            } finally {
                endRead(blocking, n > 0);
                assert IOStatus.check(n);
            }
            return IOStatus.normalize(n);
        } finally {
            readLock.unlock();
        }
    }

649 @Override 650 public long read(ByteBuffer[] dsts, int offset, int length) 651 throws IOException 652 { 653 Objects.checkFromIndexSize(offset, length, dsts.length); 654 655 readLock.lock(); 656 try { 657 boolean blocking = isBlocking(); 658 long n = 0; 659 try { 660 beginRead(blocking, true); 661 n = IOUtil.read(fd, dsts, offset, length, nd); 662 if (blocking) { 663 while (IOStatus.okayToRetry(n) && isOpen()) { 664 park(Net.POLLIN); 665 n = IOUtil.read(fd, dsts, offset, length, nd); 666 } 667 } 668 } finally { 669 endRead(blocking, n > 0); 670 assert IOStatus.check(n); 671 } 672 return IOStatus.normalize(n); 673 } finally { 674 readLock.unlock(); 675 } 676 } 677 678 /** 679 * Marks the beginning of a write operation that might block. 680 * @param blocking true if configured blocking 681 * @param mustBeConnected true if the socket must be connected 682 * @return remote address if connected 683 * @throws ClosedChannelException if the channel is closed 684 * @throws NotYetConnectedException if mustBeConnected and not connected 685 * @throws IOException if socket not bound and cannot be bound 686 */ 687 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected) 688 throws IOException 689 { 690 if (blocking) { 691 // set hook for Thread.interrupt 692 begin(); 693 } 694 SocketAddress remote; 695 synchronized (stateLock) { 696 ensureOpen(); 697 remote = remoteAddress; 698 if ((remote == null) && mustBeConnected) 699 throw new NotYetConnectedException(); 700 if (localAddress == null) 701 bindInternal(null); 702 if (blocking) 703 writerThread = NativeThread.current(); 704 } 705 return remote; 706 } 707 708 /** 709 * Marks the end of a write operation that may have blocked. 
710 * 711 * @throws AsynchronousCloseException if the channel was closed asynchronously 712 */ 713 private void endWrite(boolean blocking, boolean completed) 714 throws AsynchronousCloseException 715 { 716 if (blocking) { 717 synchronized (stateLock) { 718 writerThread = 0; 719 if (state == ST_CLOSING) { 720 tryFinishClose(); 721 } 722 } 723 // remove hook for Thread.interrupt 724 end(completed); 725 } 726 } 727 728 @Override 729 public int write(ByteBuffer buf) throws IOException { 730 Objects.requireNonNull(buf); 731 732 writeLock.lock(); 733 try { 734 boolean blocking = isBlocking(); 735 int n = 0; 736 try { 737 beginWrite(blocking, true); 738 n = IOUtil.write(fd, buf, -1, nd); 739 if (blocking) { 740 while (IOStatus.okayToRetry(n) && isOpen()) { 741 park(Net.POLLOUT); 742 n = IOUtil.write(fd, buf, -1, nd); 743 } 744 } 745 } finally { 746 endWrite(blocking, n > 0); 747 assert IOStatus.check(n); 748 } 749 return IOStatus.normalize(n); 750 } finally { 751 writeLock.unlock(); 752 } 753 } 754 755 @Override 756 public long write(ByteBuffer[] srcs, int offset, int length) 757 throws IOException 758 { 759 Objects.checkFromIndexSize(offset, length, srcs.length); 760 761 writeLock.lock(); 762 try { 763 boolean blocking = isBlocking(); 764 long n = 0; 765 try { 766 beginWrite(blocking, true); 767 n = IOUtil.write(fd, srcs, offset, length, nd); 768 if (blocking) { 769 while (IOStatus.okayToRetry(n) && isOpen()) { 770 park(Net.POLLOUT); 771 n = IOUtil.write(fd, srcs, offset, length, nd); 772 } 773 } 774 } finally { 775 endWrite(blocking, n > 0); 776 assert IOStatus.check(n); 777 } 778 return IOStatus.normalize(n); 779 } finally { 780 writeLock.unlock(); 781 } 782 } 783 784 @Override 785 protected void implConfigureBlocking(boolean block) throws IOException { 786 readLock.lock(); 787 try { 788 writeLock.lock(); 789 try { 790 synchronized (stateLock) { 791 ensureOpen(); 792 IOUtil.configureBlocking(fd, block); 793 } 794 } finally { 795 writeLock.unlock(); 796 } 797 } finally 
{ 798 readLock.unlock(); 799 } 800 } 801 802 InetSocketAddress localAddress() { 803 synchronized (stateLock) { 804 return localAddress; 805 } 806 } 807 808 InetSocketAddress remoteAddress() { 809 synchronized (stateLock) { 810 return remoteAddress; 811 } 812 } 813 814 @Override 815 public DatagramChannel bind(SocketAddress local) throws IOException { 816 readLock.lock(); 817 try { 818 writeLock.lock(); 819 try { 820 synchronized (stateLock) { 821 ensureOpen(); 822 if (localAddress != null) 823 throw new AlreadyBoundException(); 824 bindInternal(local); 825 } 826 } finally { 827 writeLock.unlock(); 828 } 829 } finally { 830 readLock.unlock(); 831 } 832 return this; 833 } 834 835 private void bindInternal(SocketAddress local) throws IOException { 836 assert Thread.holdsLock(stateLock )&& (localAddress == null); 837 838 InetSocketAddress isa; 839 if (local == null) { 840 // only Inet4Address allowed with IPv4 socket 841 if (family == StandardProtocolFamily.INET) { 842 isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); 843 } else { 844 isa = new InetSocketAddress(0); 845 } 846 } else { 847 isa = Net.checkAddress(local, family); 848 } 849 SecurityManager sm = System.getSecurityManager(); 850 if (sm != null) 851 sm.checkListen(isa.getPort()); 852 853 Net.bind(family, fd, isa.getAddress(), isa.getPort()); 854 localAddress = Net.localAddress(fd); 855 } 856 857 @Override 858 public boolean isConnected() { 859 synchronized (stateLock) { 860 return (state == ST_CONNECTED); 861 } 862 } 863 864 @Override 865 public DatagramChannel connect(SocketAddress sa) throws IOException { 866 InetSocketAddress isa = Net.checkAddress(sa, family); 867 SecurityManager sm = System.getSecurityManager(); 868 if (sm != null) { 869 InetAddress ia = isa.getAddress(); 870 if (ia.isMulticastAddress()) { 871 sm.checkMulticast(ia); 872 } else { 873 sm.checkConnect(ia.getHostAddress(), isa.getPort()); 874 sm.checkAccept(ia.getHostAddress(), isa.getPort()); 875 } 876 } 877 878 
readLock.lock(); 879 try { 880 writeLock.lock(); 881 try { 882 synchronized (stateLock) { 883 ensureOpen(); 884 if (state == ST_CONNECTED) 885 throw new AlreadyConnectedException(); 886 887 // ensure that the socket is bound 888 if (localAddress == null) { 889 bindInternal(null); 890 } 891 892 int n = Net.connect(family, 893 fd, 894 isa.getAddress(), 895 isa.getPort()); 896 if (n <= 0) 897 throw new Error(); // Can't happen 898 899 // connected 900 remoteAddress = isa; 901 state = ST_CONNECTED; 902 903 // refresh local address 904 localAddress = Net.localAddress(fd); 905 906 // flush any packets already received. 907 boolean blocking = isBlocking(); 908 if (blocking) { 909 IOUtil.configureBlocking(fd, false); 910 } 911 try { 912 ByteBuffer buf = ByteBuffer.allocate(100); 913 while (receive(fd, buf, false) > 0) { 914 buf.clear(); 915 } 916 } finally { 917 if (blocking) { 918 IOUtil.configureBlocking(fd, true); 919 } 920 } 921 } 922 } finally { 923 writeLock.unlock(); 924 } 925 } finally { 926 readLock.unlock(); 927 } 928 return this; 929 } 930 931 @Override 932 public DatagramChannel disconnect() throws IOException { 933 readLock.lock(); 934 try { 935 writeLock.lock(); 936 try { 937 synchronized (stateLock) { 938 if (!isOpen() || (state != ST_CONNECTED)) 939 return this; 940 941 // disconnect socket 942 boolean isIPv6 = (family == StandardProtocolFamily.INET6); 943 disconnect0(fd, isIPv6); 944 945 // no longer connected 946 remoteAddress = null; 947 state = ST_UNCONNECTED; 948 949 // check whether rebind is needed 950 InetSocketAddress isa = Net.localAddress(fd); 951 if (isa.getPort() == 0) { 952 // On Linux, if bound to ephemeral port, 953 // disconnect does not preserve that port. 954 // In this case, try to rebind to the previous port. 
955 int port = localAddress.getPort(); 956 localAddress = isa; // in case Net.bind fails 957 Net.bind(family, fd, isa.getAddress(), port); 958 isa = Net.localAddress(fd); // refresh address 959 assert isa.getPort() == port; 960 } 961 962 // refresh localAddress 963 localAddress = isa; 964 } 965 } finally { 966 writeLock.unlock(); 967 } 968 } finally { 969 readLock.unlock(); 970 } 971 return this; 972 } 973 974 /** 975 * Joins channel's socket to the given group/interface and 976 * optional source address. 977 */ 978 private MembershipKey innerJoin(InetAddress group, 979 NetworkInterface interf, 980 InetAddress source) 981 throws IOException 982 { 983 if (!group.isMulticastAddress()) 984 throw new IllegalArgumentException("Group not a multicast address"); 985 986 // check multicast address is compatible with this socket 987 if (group instanceof Inet4Address) { 988 if (family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group()) 989 throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group"); 990 } else if (group instanceof Inet6Address) { 991 if (family != StandardProtocolFamily.INET6) 992 throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group"); 993 } else { 994 throw new IllegalArgumentException("Address type not supported"); 995 } 996 997 // check source address 998 if (source != null) { 999 if (source.isAnyLocalAddress()) 1000 throw new IllegalArgumentException("Source address is a wildcard address"); 1001 if (source.isMulticastAddress()) 1002 throw new IllegalArgumentException("Source address is multicast address"); 1003 if (source.getClass() != group.getClass()) 1004 throw new IllegalArgumentException("Source address is different type to group"); 1005 } 1006 1007 SecurityManager sm = System.getSecurityManager(); 1008 if (sm != null) 1009 sm.checkMulticast(group); 1010 1011 synchronized (stateLock) { 1012 ensureOpen(); 1013 1014 // check the registry to see if we are already a member of the 
group 1015 if (registry == null) { 1016 registry = new MembershipRegistry(); 1017 } else { 1018 // return existing membership key 1019 MembershipKey key = registry.checkMembership(group, interf, source); 1020 if (key != null) 1021 return key; 1022 } 1023 1024 MembershipKeyImpl key; 1025 if ((family == StandardProtocolFamily.INET6) && 1026 ((group instanceof Inet6Address) || Net.canJoin6WithIPv4Group())) 1027 { 1028 int index = interf.getIndex(); 1029 if (index == -1) 1030 throw new IOException("Network interface cannot be identified"); 1031 1032 // need multicast and source address as byte arrays 1033 byte[] groupAddress = Net.inet6AsByteArray(group); 1034 byte[] sourceAddress = (source == null) ? null : 1035 Net.inet6AsByteArray(source); 1036 1037 // join the group 1038 int n = Net.join6(fd, groupAddress, index, sourceAddress); 1039 if (n == IOStatus.UNAVAILABLE) 1040 throw new UnsupportedOperationException(); 1041 1042 key = new MembershipKeyImpl.Type6(this, group, interf, source, 1043 groupAddress, index, sourceAddress); 1044 1045 } else { 1046 // need IPv4 address to identify interface 1047 Inet4Address target = Net.anyInet4Address(interf); 1048 if (target == null) 1049 throw new IOException("Network interface not configured for IPv4"); 1050 1051 int groupAddress = Net.inet4AsInt(group); 1052 int targetAddress = Net.inet4AsInt(target); 1053 int sourceAddress = (source == null) ? 
0 : Net.inet4AsInt(source); 1054 1055 // join the group 1056 int n = Net.join4(fd, groupAddress, targetAddress, sourceAddress); 1057 if (n == IOStatus.UNAVAILABLE) 1058 throw new UnsupportedOperationException(); 1059 1060 key = new MembershipKeyImpl.Type4(this, group, interf, source, 1061 groupAddress, targetAddress, sourceAddress); 1062 } 1063 1064 registry.add(key); 1065 return key; 1066 } 1067 } 1068 1069 @Override 1070 public MembershipKey join(InetAddress group, 1071 NetworkInterface interf) 1072 throws IOException 1073 { 1074 return innerJoin(group, interf, null); 1075 } 1076 1077 @Override 1078 public MembershipKey join(InetAddress group, 1079 NetworkInterface interf, 1080 InetAddress source) 1081 throws IOException 1082 { 1083 Objects.requireNonNull(source); 1084 return innerJoin(group, interf, source); 1085 } 1086 1087 // package-private 1088 void drop(MembershipKeyImpl key) { 1089 assert key.channel() == this; 1090 1091 synchronized (stateLock) { 1092 if (!key.isValid()) 1093 return; 1094 1095 try { 1096 if (key instanceof MembershipKeyImpl.Type6) { 1097 MembershipKeyImpl.Type6 key6 = 1098 (MembershipKeyImpl.Type6)key; 1099 Net.drop6(fd, key6.groupAddress(), key6.index(), key6.source()); 1100 } else { 1101 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4)key; 1102 Net.drop4(fd, key4.groupAddress(), key4.interfaceAddress(), 1103 key4.source()); 1104 } 1105 } catch (IOException ioe) { 1106 // should not happen 1107 throw new AssertionError(ioe); 1108 } 1109 1110 key.invalidate(); 1111 registry.remove(key); 1112 } 1113 } 1114 1115 /** 1116 * Block datagrams from given source if a memory to receive all 1117 * datagrams. 
1118 */ 1119 void block(MembershipKeyImpl key, InetAddress source) 1120 throws IOException 1121 { 1122 assert key.channel() == this; 1123 assert key.sourceAddress() == null; 1124 1125 synchronized (stateLock) { 1126 if (!key.isValid()) 1127 throw new IllegalStateException("key is no longer valid"); 1128 if (source.isAnyLocalAddress()) 1129 throw new IllegalArgumentException("Source address is a wildcard address"); 1130 if (source.isMulticastAddress()) 1131 throw new IllegalArgumentException("Source address is multicast address"); 1132 if (source.getClass() != key.group().getClass()) 1133 throw new IllegalArgumentException("Source address is different type to group"); 1134 1135 int n; 1136 if (key instanceof MembershipKeyImpl.Type6) { 1137 MembershipKeyImpl.Type6 key6 = 1138 (MembershipKeyImpl.Type6)key; 1139 n = Net.block6(fd, key6.groupAddress(), key6.index(), 1140 Net.inet6AsByteArray(source)); 1141 } else { 1142 MembershipKeyImpl.Type4 key4 = 1143 (MembershipKeyImpl.Type4)key; 1144 n = Net.block4(fd, key4.groupAddress(), key4.interfaceAddress(), 1145 Net.inet4AsInt(source)); 1146 } 1147 if (n == IOStatus.UNAVAILABLE) { 1148 // ancient kernel 1149 throw new UnsupportedOperationException(); 1150 } 1151 } 1152 } 1153 1154 /** 1155 * Unblock given source. 
1156 */ 1157 void unblock(MembershipKeyImpl key, InetAddress source) { 1158 assert key.channel() == this; 1159 assert key.sourceAddress() == null; 1160 1161 synchronized (stateLock) { 1162 if (!key.isValid()) 1163 throw new IllegalStateException("key is no longer valid"); 1164 1165 try { 1166 if (key instanceof MembershipKeyImpl.Type6) { 1167 MembershipKeyImpl.Type6 key6 = 1168 (MembershipKeyImpl.Type6)key; 1169 Net.unblock6(fd, key6.groupAddress(), key6.index(), 1170 Net.inet6AsByteArray(source)); 1171 } else { 1172 MembershipKeyImpl.Type4 key4 = 1173 (MembershipKeyImpl.Type4)key; 1174 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(), 1175 Net.inet4AsInt(source)); 1176 } 1177 } catch (IOException ioe) { 1178 // should not happen 1179 throw new AssertionError(ioe); 1180 } 1181 } 1182 } 1183 1184 /** 1185 * Closes the socket if there are no I/O operations in progress and the 1186 * channel is not registered with a Selector. 1187 */ 1188 private boolean tryClose() throws IOException { 1189 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 1190 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 1191 state = ST_CLOSED; 1192 try { 1193 // close socket 1194 cleaner.clean(); 1195 } catch (UncheckedIOException ioe) { 1196 throw ioe.getCause(); 1197 } 1198 return true; 1199 } else { 1200 return false; 1201 } 1202 } 1203 1204 /** 1205 * Invokes tryClose to attempt to close the socket. 1206 * 1207 * This method is used for deferred closing by I/O and Selector operations. 1208 */ 1209 private void tryFinishClose() { 1210 try { 1211 tryClose(); 1212 } catch (IOException ignore) { } 1213 } 1214 1215 /** 1216 * Closes this channel when configured in blocking mode. 1217 * 1218 * If there is an I/O operation in progress then the socket is pre-closed 1219 * and the I/O threads signalled, in which case the final close is deferred 1220 * until all I/O operations complete. 
*/
    private void implCloseBlockingMode() throws IOException {
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            state = ST_CLOSING;

            // if member of any multicast groups then invalidate the keys
            if (registry != null)
                registry.invalidateAll();

            // tryClose returns false when a reader/writer thread is recorded
            // or the channel is still registered with a Selector
            if (!tryClose()) {
                long reader = readerThread;
                long writer = writerThread;
                if (reader != 0 || writer != 0) {
                    // pre-close the socket, then signal each recorded I/O thread
                    nd.preClose(fd);
                    if (reader != 0)
                        NativeThread.signal(reader);
                    if (writer != 0)
                        NativeThread.signal(writer);
                }
            }
        }
    }

    /**
     * Closes this channel when configured in non-blocking mode.
     *
     * If the channel is registered with a Selector then the close is deferred
     * until the channel is flushed from all Selectors.
     */
    private void implCloseNonBlockingMode() throws IOException {
        synchronized (stateLock) {
            assert state < ST_CLOSING;
            state = ST_CLOSING;

            // if member of any multicast groups then invalidate the keys
            if (registry != null)
                registry.invalidateAll();
        }

        // wait for any read/write operations to complete before trying to close
        // (each lock is acquired and released at once, outside stateLock)
        readLock.lock();
        readLock.unlock();
        writeLock.lock();
        writeLock.unlock();
        synchronized (stateLock) {
            // stateLock was released above, so re-check the state; kill() may
            // already have completed the close in the meantime
            if (state == ST_CLOSING) {
                tryClose();
            }
        }
    }

    /**
     * Invoked by implCloseChannel to close the channel.
1275 */ 1276 @Override 1277 protected void implCloseSelectableChannel() throws IOException { 1278 assert !isOpen(); 1279 if (isBlocking()) { 1280 implCloseBlockingMode(); 1281 } else { 1282 implCloseNonBlockingMode(); 1283 } 1284 } 1285 1286 @Override 1287 public void kill() { 1288 synchronized (stateLock) { 1289 if (state == ST_CLOSING) { 1290 tryFinishClose(); 1291 } 1292 } 1293 } 1294 1295 /** 1296 * Translates native poll revent set into a ready operation set 1297 */ 1298 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1299 int intOps = ski.nioInterestOps(); 1300 int oldOps = ski.nioReadyOps(); 1301 int newOps = initialOps; 1302 1303 if ((ops & Net.POLLNVAL) != 0) { 1304 // This should only happen if this channel is pre-closed while a 1305 // selection operation is in progress 1306 // ## Throw an error if this channel has not been pre-closed 1307 return false; 1308 } 1309 1310 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1311 newOps = intOps; 1312 ski.nioReadyOps(newOps); 1313 return (newOps & ~oldOps) != 0; 1314 } 1315 1316 if (((ops & Net.POLLIN) != 0) && 1317 ((intOps & SelectionKey.OP_READ) != 0)) 1318 newOps |= SelectionKey.OP_READ; 1319 1320 if (((ops & Net.POLLOUT) != 0) && 1321 ((intOps & SelectionKey.OP_WRITE) != 0)) 1322 newOps |= SelectionKey.OP_WRITE; 1323 1324 ski.nioReadyOps(newOps); 1325 return (newOps & ~oldOps) != 0; 1326 } 1327 1328 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1329 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1330 } 1331 1332 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1333 return translateReadyOps(ops, 0, ski); 1334 } 1335 1336 /** 1337 * Poll this channel's socket for reading up to the given timeout. 
1338 * @return {@code true} if the socket is polled 1339 */ 1340 boolean pollRead(long timeout) throws IOException { 1341 boolean blocking = isBlocking(); 1342 assert Thread.holdsLock(blockingLock()) && blocking; 1343 1344 readLock.lock(); 1345 try { 1346 boolean polled = false; 1347 try { 1348 beginRead(blocking, false); 1349 int events = Net.poll(fd, Net.POLLIN, timeout); 1350 polled = (events != 0); 1351 } finally { 1352 endRead(blocking, polled); 1353 } 1354 return polled; 1355 } finally { 1356 readLock.unlock(); 1357 } 1358 } 1359 1360 /** 1361 * Translates an interest operation set into a native poll event set 1362 */ 1363 public int translateInterestOps(int ops) { 1364 int newOps = 0; 1365 if ((ops & SelectionKey.OP_READ) != 0) 1366 newOps |= Net.POLLIN; 1367 if ((ops & SelectionKey.OP_WRITE) != 0) 1368 newOps |= Net.POLLOUT; 1369 if ((ops & SelectionKey.OP_CONNECT) != 0) 1370 newOps |= Net.POLLIN; 1371 return newOps; 1372 } 1373 1374 public FileDescriptor getFD() { 1375 return fd; 1376 } 1377 1378 public int getFDVal() { 1379 return fdVal; 1380 } 1381 1382 /** 1383 * Returns an action to close the given file descriptor. 1384 */ 1385 private static Runnable closerFor(FileDescriptor fd) { 1386 return () -> { 1387 try { 1388 nd.close(fd); 1389 } catch (IOException ioe) { 1390 throw new UncheckedIOException(ioe); 1391 } finally { 1392 // decrement 1393 ResourceManager.afterUdpClose(); 1394 } 1395 }; 1396 } 1397 1398 // -- Native methods -- 1399 1400 private static native void initIDs(); 1401 1402 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) 1403 throws IOException; 1404 1405 private native int receive0(FileDescriptor fd, long address, int len, 1406 boolean connected) 1407 throws IOException; 1408 1409 private native int send0(boolean preferIPv6, FileDescriptor fd, long address, 1410 int len, InetAddress addr, int port) 1411 throws IOException; 1412 1413 static { 1414 IOUtil.load(); 1415 initIDs(); 1416 } 1417 }