1 /* 2 * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.io.UncheckedIOException; 31 import java.lang.invoke.MethodHandles; 32 import java.lang.invoke.VarHandle; 33 import java.lang.ref.Cleaner.Cleanable; 34 import java.lang.reflect.Method; 35 import java.net.DatagramSocket; 36 import java.net.Inet4Address; 37 import java.net.Inet6Address; 38 import java.net.InetAddress; 39 import java.net.InetSocketAddress; 40 import java.net.NetworkInterface; 41 import java.net.PortUnreachableException; 42 import java.net.ProtocolFamily; 43 import java.net.SocketAddress; 44 import java.net.SocketOption; 45 import java.net.SocketTimeoutException; 46 import java.net.StandardProtocolFamily; 47 import java.net.StandardSocketOptions; 48 import java.nio.ByteBuffer; 49 import java.nio.channels.AlreadyBoundException; 50 import java.nio.channels.AlreadyConnectedException; 51 import java.nio.channels.AsynchronousCloseException; 52 import java.nio.channels.ClosedChannelException; 53 import java.nio.channels.DatagramChannel; 54 import java.nio.channels.IllegalBlockingModeException; 55 import java.nio.channels.MembershipKey; 56 import java.nio.channels.NotYetConnectedException; 57 import java.nio.channels.SelectionKey; 58 import java.nio.channels.spi.AbstractSelectableChannel; 59 import java.nio.channels.spi.SelectorProvider; 60 import java.security.AccessController; 61 import java.security.PrivilegedExceptionAction; 62 import java.util.Collections; 63 import java.util.HashMap; 64 import java.util.HashSet; 65 import java.util.Map; 66 import java.util.Objects; 67 import java.util.Set; 68 import java.util.concurrent.locks.ReentrantLock; 69 import java.util.function.Consumer; 70 71 import jdk.internal.ref.CleanerFactory; 72 import sun.net.ResourceManager; 73 import sun.net.ext.ExtendedSocketOptions; 74 import sun.net.util.IPAddressUtil; 75 76 /** 77 * An implementation of DatagramChannels. 78 */ 79 80 class DatagramChannelImpl 81 extends DatagramChannel 82 implements SelChImpl 83 { 84 // Used to make native read and write calls 85 private static final NativeDispatcher nd = new DatagramDispatcher(); 86 87 // true if interruptible (can be false to emulate legacy DatagramSocket) 88 private final boolean interruptible; 89 90 // The protocol family of the socket 91 private final ProtocolFamily family; 92 93 // Our file descriptor 94 private final FileDescriptor fd; 95 private final int fdVal; 96 97 // Native sockaddrs and cached InetSocketAddress for receive, protected by readLock 98 private NativeSocketAddress sourceSockAddr; 99 private NativeSocketAddress cachedSockAddr; 100 private InetSocketAddress cachedInetSocketAddress; 101 102 // Native sockaddr and cached objects for send, protected by writeLock 103 private final NativeSocketAddress targetSockAddr; 104 private InetSocketAddress previousTarget; 105 private int previousSockAddrLength; 106 107 // Cleaner to close file descriptor and free native socket address 108 private final Cleanable cleaner; 109 110 // Lock held by current reading or connecting thread 111 private final ReentrantLock readLock = new ReentrantLock(); 112 113 // Lock held by current writing or connecting thread 114 private final ReentrantLock writeLock = new ReentrantLock(); 115 116 // Lock held by any thread that modifies the state fields declared below 117 // DO NOT invoke a blocking I/O operation while holding this lock! 118 private final Object stateLock = new Object(); 119 120 // -- The following fields are protected by stateLock 121 122 // State (does not necessarily increase monotonically) 123 private static final int ST_UNCONNECTED = 0; 124 private static final int ST_CONNECTED = 1; 125 private static final int ST_CLOSING = 2; 126 private static final int ST_CLOSED = 3; 127 private int state; 128 129 // IDs of native threads doing reads and writes, for signalling 130 private long readerThread; 131 private long writerThread; 132 133 // Local and remote (connected) address 134 private InetSocketAddress localAddress; 135 private InetSocketAddress remoteAddress; 136 137 // Local address prior to connecting 138 private InetSocketAddress initialLocalAddress; 139 140 // Socket adaptor, created lazily 141 private static final VarHandle SOCKET; 142 static { 143 try { 144 MethodHandles.Lookup l = MethodHandles.lookup(); 145 SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class); 146 } catch (Exception e) { 147 throw new InternalError(e); 148 } 149 } 150 private volatile DatagramSocket socket; 151 152 // Multicast support 153 private MembershipRegistry registry; 154 155 // set true when socket is bound and SO_REUSEADDRESS is emulated 156 private boolean reuseAddressEmulated; 157 158 // set true/false when socket is already bound and SO_REUSEADDR is emulated 159 private boolean isReuseAddress; 160 161 // -- End of fields protected by stateLock 162 163 164 DatagramChannelImpl(SelectorProvider sp, boolean interruptible) throws IOException { 165 this(sp, (Net.isIPv6Available() 166 ? StandardProtocolFamily.INET6 167 : StandardProtocolFamily.INET), 168 interruptible); 169 } 170 171 DatagramChannelImpl(SelectorProvider sp, ProtocolFamily family, boolean interruptible) 172 throws IOException 173 { 174 super(sp); 175 176 Objects.requireNonNull(family, "'family' is null"); 177 if ((family != StandardProtocolFamily.INET) && 178 (family != StandardProtocolFamily.INET6)) { 179 throw new UnsupportedOperationException("Protocol family not supported"); 180 } 181 if (family == StandardProtocolFamily.INET6 && !Net.isIPv6Available()) { 182 throw new UnsupportedOperationException("IPv6 not available"); 183 } 184 185 FileDescriptor fd = null; 186 NativeSocketAddress[] sockAddrs = null; 187 188 ResourceManager.beforeUdpCreate(); 189 boolean initialized = false; 190 try { 191 this.interruptible = interruptible; 192 this.family = family; 193 this.fd = fd = Net.socket(family, false); 194 this.fdVal = IOUtil.fdVal(fd); 195 196 sockAddrs = NativeSocketAddress.allocate(3); 197 readLock.lock(); 198 try { 199 this.sourceSockAddr = sockAddrs[0]; 200 this.cachedSockAddr = sockAddrs[1]; 201 } finally { 202 readLock.unlock(); 203 } 204 this.targetSockAddr = sockAddrs[2]; 205 206 initialized = true; 207 } finally { 208 if (!initialized) { 209 if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs); 210 if (fd != null) nd.close(fd); 211 ResourceManager.afterUdpClose(); 212 } 213 } 214 215 Runnable releaser = releaserFor(fd, sockAddrs); 216 this.cleaner = CleanerFactory.cleaner().register(this, releaser); 217 } 218 219 DatagramChannelImpl(SelectorProvider sp, FileDescriptor fd) 220 throws IOException 221 { 222 super(sp); 223 224 NativeSocketAddress[] sockAddrs = null; 225 226 ResourceManager.beforeUdpCreate(); 227 boolean initialized = false; 228 try { 229 this.interruptible = true; 230 this.family = Net.isIPv6Available() 231 ? StandardProtocolFamily.INET6 232 : StandardProtocolFamily.INET; 233 this.fd = fd; 234 this.fdVal = IOUtil.fdVal(fd); 235 236 sockAddrs = NativeSocketAddress.allocate(3); 237 readLock.lock(); 238 try { 239 this.sourceSockAddr = sockAddrs[0]; 240 this.cachedSockAddr = sockAddrs[1]; 241 } finally { 242 readLock.unlock(); 243 } 244 this.targetSockAddr = sockAddrs[2]; 245 246 initialized = true; 247 } finally { 248 if (!initialized) { 249 if (sockAddrs != null) NativeSocketAddress.freeAll(sockAddrs); 250 nd.close(fd); 251 ResourceManager.afterUdpClose(); 252 } 253 } 254 255 Runnable releaser = releaserFor(fd, sockAddrs); 256 this.cleaner = CleanerFactory.cleaner().register(this, releaser); 257 258 synchronized (stateLock) { 259 this.localAddress = Net.localAddress(fd); 260 } 261 } 262 263 // @throws ClosedChannelException if channel is closed 264 private void ensureOpen() throws ClosedChannelException { 265 if (!isOpen()) 266 throw new ClosedChannelException(); 267 } 268 269 @Override 270 public DatagramSocket socket() { 271 DatagramSocket socket = this.socket; 272 if (socket == null) { 273 socket = DatagramSocketAdaptor.create(this); 274 if (!SOCKET.compareAndSet(this, null, socket)) { 275 socket = this.socket; 276 } 277 } 278 return socket; 279 } 280 281 @Override 282 public SocketAddress getLocalAddress() throws IOException { 283 synchronized (stateLock) { 284 ensureOpen(); 285 // Perform security check before returning address 286 return Net.getRevealedLocalAddress(localAddress); 287 } 288 } 289 290 @Override 291 public SocketAddress getRemoteAddress() throws IOException { 292 synchronized (stateLock) { 293 ensureOpen(); 294 return remoteAddress; 295 } 296 } 297 298 /** 299 * Returns the protocol family to specify to set/getSocketOption for the 300 * given socket option. 301 */ 302 private ProtocolFamily familyFor(SocketOption<?> name) { 303 assert Thread.holdsLock(stateLock); 304 305 // unspecified (most options) 306 if (SocketOptionRegistry.findOption(name, Net.UNSPEC) != null) 307 return Net.UNSPEC; 308 309 // IPv4 socket 310 if (family == StandardProtocolFamily.INET) 311 return StandardProtocolFamily.INET; 312 313 // IPv6 socket that is unbound 314 if (localAddress == null) 315 return StandardProtocolFamily.INET6; 316 317 // IPv6 socket bound to wildcard or IPv6 address 318 InetAddress address = localAddress.getAddress(); 319 if (address.isAnyLocalAddress() || (address instanceof Inet6Address)) 320 return StandardProtocolFamily.INET6; 321 322 // IPv6 socket bound to IPv4 address 323 if (Net.canUseIPv6OptionsWithIPv4LocalAddress()) { 324 // IPV6_XXX options can be used 325 return StandardProtocolFamily.INET6; 326 } else { 327 // IPV6_XXX options cannot be used 328 return StandardProtocolFamily.INET; 329 } 330 } 331 332 @Override 333 public <T> DatagramChannel setOption(SocketOption<T> name, T value) 334 throws IOException 335 { 336 Objects.requireNonNull(name); 337 if (!supportedOptions().contains(name)) 338 throw new UnsupportedOperationException("'" + name + "' not supported"); 339 if (!name.type().isInstance(value)) 340 throw new IllegalArgumentException("Invalid value '" + value + "'"); 341 342 synchronized (stateLock) { 343 ensureOpen(); 344 345 ProtocolFamily family = familyFor(name); 346 347 // Some platforms require both IPV6_XXX and IP_XXX socket options to 348 // be set when the channel's socket is IPv6 and it is used to send 349 // IPv4 multicast datagrams. The IP_XXX socket options are set on a 350 // best effort basis. 351 boolean needToSetIPv4Option = (family != Net.UNSPEC) 352 && (this.family == StandardProtocolFamily.INET6) 353 && Net.shouldSetBothIPv4AndIPv6Options(); 354 355 // outgoing multicast interface 356 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 357 assert family != Net.UNSPEC; 358 NetworkInterface interf = (NetworkInterface) value; 359 if (family == StandardProtocolFamily.INET6) { 360 int index = interf.getIndex(); 361 if (index == -1) 362 throw new IOException("Network interface cannot be identified"); 363 Net.setInterface6(fd, index); 364 } 365 if (family == StandardProtocolFamily.INET || needToSetIPv4Option) { 366 // need IPv4 address to identify interface 367 Inet4Address target = Net.anyInet4Address(interf); 368 if (target != null) { 369 try { 370 Net.setInterface4(fd, Net.inet4AsInt(target)); 371 } catch (IOException ioe) { 372 if (family == StandardProtocolFamily.INET) throw ioe; 373 } 374 } else if (family == StandardProtocolFamily.INET) { 375 throw new IOException("Network interface not configured for IPv4"); 376 } 377 } 378 return this; 379 } 380 381 // SO_REUSEADDR needs special handling as it may be emulated 382 if (name == StandardSocketOptions.SO_REUSEADDR 383 && Net.useExclusiveBind() && localAddress != null) { 384 reuseAddressEmulated = true; 385 this.isReuseAddress = (Boolean)value; 386 } 387 388 // remaining options don't need any special handling 389 Net.setSocketOption(fd, family, name, value); 390 if (needToSetIPv4Option && family != StandardProtocolFamily.INET) { 391 try { 392 Net.setSocketOption(fd, StandardProtocolFamily.INET, name, value); 393 } catch (IOException ignore) { } 394 } 395 396 return this; 397 } 398 } 399 400 @Override 401 @SuppressWarnings("unchecked") 402 public <T> T getOption(SocketOption<T> name) 403 throws IOException 404 { 405 Objects.requireNonNull(name); 406 if (!supportedOptions().contains(name)) 407 throw new UnsupportedOperationException("'" + name + "' not supported"); 408 409 synchronized (stateLock) { 410 ensureOpen(); 411 412 ProtocolFamily family = familyFor(name); 413 414 if (name == StandardSocketOptions.IP_MULTICAST_IF) { 415 if (family == StandardProtocolFamily.INET) { 416 int address = Net.getInterface4(fd); 417 if (address == 0) 418 return null; // default interface 419 420 InetAddress ia = Net.inet4FromInt(address); 421 NetworkInterface ni = NetworkInterface.getByInetAddress(ia); 422 if (ni == null) 423 throw new IOException("Unable to map address to interface"); 424 return (T) ni; 425 } else { 426 int index = Net.getInterface6(fd); 427 if (index == 0) 428 return null; // default interface 429 430 NetworkInterface ni = NetworkInterface.getByIndex(index); 431 if (ni == null) 432 throw new IOException("Unable to map index to interface"); 433 return (T) ni; 434 } 435 } 436 437 if (name == StandardSocketOptions.SO_REUSEADDR && reuseAddressEmulated) { 438 return (T) Boolean.valueOf(isReuseAddress); 439 } 440 441 // no special handling 442 return (T) Net.getSocketOption(fd, family, name); 443 } 444 } 445 446 private static class DefaultOptionsHolder { 447 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 448 449 private static Set<SocketOption<?>> defaultOptions() { 450 HashSet<SocketOption<?>> set = new HashSet<>(); 451 set.add(StandardSocketOptions.SO_SNDBUF); 452 set.add(StandardSocketOptions.SO_RCVBUF); 453 set.add(StandardSocketOptions.SO_REUSEADDR); 454 if (Net.isReusePortAvailable()) { 455 set.add(StandardSocketOptions.SO_REUSEPORT); 456 } 457 set.add(StandardSocketOptions.SO_BROADCAST); 458 set.add(StandardSocketOptions.IP_TOS); 459 set.add(StandardSocketOptions.IP_MULTICAST_IF); 460 set.add(StandardSocketOptions.IP_MULTICAST_TTL); 461 set.add(StandardSocketOptions.IP_MULTICAST_LOOP); 462 set.addAll(ExtendedSocketOptions.datagramSocketOptions()); 463 return Collections.unmodifiableSet(set); 464 } 465 } 466 467 @Override 468 public final Set<SocketOption<?>> supportedOptions() { 469 return DefaultOptionsHolder.defaultOptions; 470 } 471 472 /** 473 * Marks the beginning of a read operation that might block. 474 * 475 * @param blocking true if configured blocking 476 * @param mustBeConnected true if the socket must be connected 477 * @return remote address if connected 478 * @throws ClosedChannelException if the channel is closed 479 * @throws NotYetConnectedException if mustBeConnected and not connected 480 * @throws IOException if socket not bound and cannot be bound 481 */ 482 private SocketAddress beginRead(boolean blocking, boolean mustBeConnected) 483 throws IOException 484 { 485 if (blocking && interruptible) { 486 // set hook for Thread.interrupt 487 begin(); 488 } 489 SocketAddress remote; 490 synchronized (stateLock) { 491 ensureOpen(); 492 remote = remoteAddress; 493 if ((remote == null) && mustBeConnected) 494 throw new NotYetConnectedException(); 495 if (localAddress == null) 496 bindInternal(null); 497 if (blocking) 498 readerThread = NativeThread.current(); 499 } 500 return remote; 501 } 502 503 /** 504 * Marks the end of a read operation that may have blocked. 505 * 506 * @throws AsynchronousCloseException if the channel was closed asynchronously 507 */ 508 private void endRead(boolean blocking, boolean completed) 509 throws AsynchronousCloseException 510 { 511 if (blocking) { 512 synchronized (stateLock) { 513 readerThread = 0; 514 if (state == ST_CLOSING) { 515 tryFinishClose(); 516 } 517 } 518 if (interruptible) { 519 // remove hook for Thread.interrupt (may throw AsynchronousCloseException) 520 end(completed); 521 } else if (!completed && !isOpen()) { 522 throw new AsynchronousCloseException(); 523 } 524 } 525 } 526 527 @Override 528 public SocketAddress receive(ByteBuffer dst) throws IOException { 529 if (dst.isReadOnly()) 530 throw new IllegalArgumentException("Read-only buffer"); 531 readLock.lock(); 532 try { 533 boolean blocking = isBlocking(); 534 SocketAddress sender = null; 535 try { 536 SocketAddress remote = beginRead(blocking, false); 537 boolean connected = (remote != null); 538 SecurityManager sm = System.getSecurityManager(); 539 if (connected || (sm == null)) { 540 // connected or no security manager 541 int n = receive(dst, connected); 542 if (blocking) { 543 while (IOStatus.okayToRetry(n) && isOpen()) { 544 park(Net.POLLIN); 545 n = receive(dst, connected); 546 } 547 } 548 if (n >= 0) { 549 // sender address is in socket address buffer 550 sender = sourceSocketAddress(); 551 } 552 } else { 553 // security manager and unconnected 554 sender = untrustedReceive(dst); 555 } 556 return sender; 557 } finally { 558 endRead(blocking, (sender != null)); 559 } 560 } finally { 561 readLock.unlock(); 562 } 563 } 564 565 /** 566 * Receives a datagram into an untrusted buffer. When there is a security 567 * manager set, and the socket is not connected, datagrams have to be received 568 * into a buffer that is not accessible to the user. The datagram is copied 569 * into the user's buffer when the sender address is accepted by the security 570 * manager. 571 */ 572 private SocketAddress untrustedReceive(ByteBuffer dst) throws IOException { 573 SecurityManager sm = System.getSecurityManager(); 574 assert readLock.isHeldByCurrentThread() 575 && sm != null && remoteAddress == null; 576 577 ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining()); 578 try { 579 boolean blocking = isBlocking(); 580 for (;;) { 581 int n = receive(bb, false); 582 if (blocking) { 583 while (IOStatus.okayToRetry(n) && isOpen()) { 584 park(Net.POLLIN); 585 n = receive(bb, false); 586 } 587 } 588 if (n >= 0) { 589 // sender address is in socket address buffer 590 InetSocketAddress isa = sourceSocketAddress(); 591 try { 592 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); 593 bb.flip(); 594 dst.put(bb); 595 return isa; 596 } catch (SecurityException se) { 597 // ignore datagram 598 bb.clear(); 599 } 600 } else { 601 return null; 602 } 603 } 604 } finally { 605 Util.releaseTemporaryDirectBuffer(bb); 606 } 607 } 608 609 /** 610 * Receives a datagram into the given buffer. 611 * 612 * @apiNote This method is for use by the socket adaptor. The buffer is 613 * assumed to be trusted, meaning it is not accessible to user code. 614 * 615 * @throws IllegalBlockingModeException if the channel is non-blocking 616 * @throws SocketTimeoutException if the timeout elapses 617 */ 618 SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException { 619 readLock.lock(); 620 try { 621 ensureOpen(); 622 if (!isBlocking()) 623 throw new IllegalBlockingModeException(); 624 SecurityManager sm = System.getSecurityManager(); 625 boolean connected = isConnected(); 626 SocketAddress sender; 627 do { 628 if (nanos > 0) { 629 sender = trustedBlockingReceive(dst, nanos); 630 } else { 631 sender = trustedBlockingReceive(dst); 632 } 633 // check sender when security manager set and not connected 634 if (sm != null && !connected) { 635 InetSocketAddress isa = (InetSocketAddress) sender; 636 try { 637 sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); 638 } catch (SecurityException e) { 639 sender = null; 640 } 641 } 642 } while (sender == null); 643 return sender; 644 } finally { 645 readLock.unlock(); 646 } 647 } 648 649 /** 650 * Receives a datagram into given buffer. This method is used to support 651 * the socket adaptor. The buffer is assumed to be trusted. 652 * @throws SocketTimeoutException if the timeout elapses 653 */ 654 private SocketAddress trustedBlockingReceive(ByteBuffer dst) 655 throws IOException 656 { 657 assert readLock.isHeldByCurrentThread() && isBlocking(); 658 SocketAddress sender = null; 659 try { 660 SocketAddress remote = beginRead(true, false); 661 boolean connected = (remote != null); 662 int n = receive(dst, connected); 663 while (IOStatus.okayToRetry(n) && isOpen()) { 664 park(Net.POLLIN); 665 n = receive(dst, connected); 666 } 667 if (n >= 0) { 668 // sender address is in socket address buffer 669 sender = sourceSocketAddress(); 670 } 671 return sender; 672 } finally { 673 endRead(true, (sender != null)); 674 } 675 } 676 677 /** 678 * Receives a datagram into given buffer with a timeout. This method is 679 * used to support the socket adaptor. The buffer is assumed to be trusted. 680 * @throws SocketTimeoutException if the timeout elapses 681 */ 682 private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos) 683 throws IOException 684 { 685 assert readLock.isHeldByCurrentThread() && isBlocking(); 686 SocketAddress sender = null; 687 try { 688 SocketAddress remote = beginRead(true, false); 689 boolean connected = (remote != null); 690 691 // change socket to non-blocking 692 lockedConfigureBlocking(false); 693 try { 694 long startNanos = System.nanoTime(); 695 int n = receive(dst, connected); 696 while (n == IOStatus.UNAVAILABLE && isOpen()) { 697 long remainingNanos = nanos - (System.nanoTime() - startNanos); 698 if (remainingNanos <= 0) { 699 throw new SocketTimeoutException("Receive timed out"); 700 } 701 park(Net.POLLIN, remainingNanos); 702 n = receive(dst, connected); 703 } 704 if (n >= 0) { 705 // sender address is in socket address buffer 706 sender = sourceSocketAddress(); 707 } 708 return sender; 709 } finally { 710 // restore socket to blocking mode (if channel is open) 711 tryLockedConfigureBlocking(true); 712 } 713 } finally { 714 endRead(true, (sender != null)); 715 } 716 } 717 718 private int receive(ByteBuffer dst, boolean connected) throws IOException { 719 int pos = dst.position(); 720 int lim = dst.limit(); 721 assert (pos <= lim); 722 int rem = (pos <= lim ? lim - pos : 0); 723 if (dst instanceof DirectBuffer && rem > 0) 724 return receiveIntoNativeBuffer(dst, rem, pos, connected); 725 726 // Substitute a native buffer. If the supplied buffer is empty 727 // we must instead use a nonempty buffer, otherwise the call 728 // will not block waiting for a datagram on some platforms. 729 int newSize = Math.max(rem, 1); 730 ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); 731 try { 732 int n = receiveIntoNativeBuffer(bb, newSize, 0, connected); 733 bb.flip(); 734 if (n > 0 && rem > 0) 735 dst.put(bb); 736 return n; 737 } finally { 738 Util.releaseTemporaryDirectBuffer(bb); 739 } 740 } 741 742 private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos, 743 boolean connected) 744 throws IOException 745 { 746 int n = receive0(fd, 747 ((DirectBuffer)bb).address() + pos, rem, 748 sourceSockAddr.address(), 749 connected); 750 if (n > 0) 751 bb.position(pos + n); 752 return n; 753 } 754 755 /** 756 * Return an InetSocketAddress to represent the source/sender socket address 757 * in sourceSockAddr. Returns the cached InetSocketAddress if the source 758 * address is the same as the cached address. 759 */ 760 private InetSocketAddress sourceSocketAddress() throws IOException { 761 assert readLock.isHeldByCurrentThread(); 762 if (cachedInetSocketAddress != null && sourceSockAddr.equals(cachedSockAddr)) { 763 return cachedInetSocketAddress; 764 } 765 InetSocketAddress isa = sourceSockAddr.decode(); 766 // swap sourceSockAddr and cachedSockAddr 767 NativeSocketAddress tmp = cachedSockAddr; 768 cachedSockAddr = sourceSockAddr; 769 sourceSockAddr = tmp; 770 cachedInetSocketAddress = isa; 771 return isa; 772 } 773 774 @Override 775 public int send(ByteBuffer src, SocketAddress target) 776 throws IOException 777 { 778 Objects.requireNonNull(src); 779 InetSocketAddress isa = Net.checkAddress(target, family); 780 781 writeLock.lock(); 782 try { 783 boolean blocking = isBlocking(); 784 int n; 785 boolean completed = false; 786 try { 787 SocketAddress remote = beginWrite(blocking, false); 788 if (remote != null) { 789 // connected 790 if (!target.equals(remote)) { 791 throw new AlreadyConnectedException(); 792 } 793 n = IOUtil.write(fd, src, -1, nd); 794 if (blocking) { 795 while (IOStatus.okayToRetry(n) && isOpen()) { 796 park(Net.POLLOUT); 797 n = IOUtil.write(fd, src, -1, nd); 798 } 799 } 800 completed = (n > 0); 801 } else { 802 // not connected 803 SecurityManager sm = System.getSecurityManager(); 804 InetAddress ia = isa.getAddress(); 805 if (sm != null) { 806 if (ia.isMulticastAddress()) { 807 sm.checkMulticast(ia); 808 } else { 809 sm.checkConnect(ia.getHostAddress(), isa.getPort()); 810 } 811 } 812 if (ia.isLinkLocalAddress()) 813 isa = IPAddressUtil.toScopedAddress(isa); 814 n = send(fd, src, isa); 815 if (blocking) { 816 while (IOStatus.okayToRetry(n) && isOpen()) { 817 park(Net.POLLOUT); 818 n = send(fd, src, isa); 819 } 820 } 821 completed = (n >= 0); 822 } 823 } finally { 824 endWrite(blocking, completed); 825 } 826 assert n >= 0 || n == IOStatus.UNAVAILABLE; 827 return IOStatus.normalize(n); 828 } finally { 829 writeLock.unlock(); 830 } 831 } 832 833 /** 834 * Sends a datagram from the bytes in given buffer. 835 * 836 * @apiNote This method is for use by the socket adaptor. 837 * 838 * @throws IllegalBlockingModeException if the channel is non-blocking 839 */ 840 void blockingSend(ByteBuffer src, SocketAddress target) throws IOException { 841 writeLock.lock(); 842 try { 843 ensureOpen(); 844 if (!isBlocking()) 845 throw new IllegalBlockingModeException(); 846 send(src, target); 847 } finally { 848 writeLock.unlock(); 849 } 850 } 851 852 private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target) 853 throws IOException 854 { 855 if (src instanceof DirectBuffer) 856 return sendFromNativeBuffer(fd, src, target); 857 858 // Substitute a native buffer 859 int pos = src.position(); 860 int lim = src.limit(); 861 assert (pos <= lim); 862 int rem = (pos <= lim ? lim - pos : 0); 863 864 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); 865 try { 866 bb.put(src); 867 bb.flip(); 868 // Do not update src until we see how many bytes were written 869 src.position(pos); 870 871 int n = sendFromNativeBuffer(fd, bb, target); 872 if (n > 0) { 873 // now update src 874 src.position(pos + n); 875 } 876 return n; 877 } finally { 878 Util.releaseTemporaryDirectBuffer(bb); 879 } 880 } 881 882 private int sendFromNativeBuffer(FileDescriptor fd, ByteBuffer bb, 883 InetSocketAddress target) 884 throws IOException 885 { 886 int pos = bb.position(); 887 int lim = bb.limit(); 888 assert (pos <= lim); 889 int rem = (pos <= lim ? lim - pos : 0); 890 891 int written; 892 try { 893 int addressLen = targetSocketAddress(target); 894 written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, 895 targetSockAddr.address(), addressLen); 896 } catch (PortUnreachableException pue) { 897 if (isConnected()) 898 throw pue; 899 written = rem; 900 } 901 if (written > 0) 902 bb.position(pos + written); 903 return written; 904 } 905 906 /** 907 * Encodes the given InetSocketAddress into targetSockAddr, returning the 908 * length of the sockaddr structure (sizeof struct sockaddr or sockaddr6). 909 */ 910 private int targetSocketAddress(InetSocketAddress isa) { 911 assert writeLock.isHeldByCurrentThread(); 912 // Nothing to do if target address is already in the buffer. Use 913 // identity rather than equals as Inet6Address.equals ignores scope_id. 914 if (isa == previousTarget) 915 return previousSockAddrLength; 916 previousTarget = null; 917 int len = targetSockAddr.encode(family, isa); 918 previousTarget = isa; 919 previousSockAddrLength = len; 920 return len; 921 } 922 923 @Override 924 public int read(ByteBuffer buf) throws IOException { 925 Objects.requireNonNull(buf); 926 927 readLock.lock(); 928 try { 929 boolean blocking = isBlocking(); 930 int n = 0; 931 try { 932 beginRead(blocking, true); 933 n = IOUtil.read(fd, buf, -1, nd); 934 if (blocking) { 935 while (IOStatus.okayToRetry(n) && isOpen()) { 936 park(Net.POLLIN); 937 n = IOUtil.read(fd, buf, -1, nd); 938 } 939 } 940 } finally { 941 endRead(blocking, n > 0); 942 assert IOStatus.check(n); 943 } 944 return IOStatus.normalize(n); 945 } finally { 946 readLock.unlock(); 947 } 948 } 949 950 @Override 951 public long read(ByteBuffer[] dsts, int offset, int length) 952 throws IOException 953 { 954 Objects.checkFromIndexSize(offset, length, dsts.length); 955 956 readLock.lock(); 957 try { 958 boolean blocking = isBlocking(); 959 long n = 0; 960 try { 961 beginRead(blocking, true); 962 n = IOUtil.read(fd, dsts, offset, length, nd); 963 if (blocking) { 964 while (IOStatus.okayToRetry(n) && isOpen()) { 965 park(Net.POLLIN); 966 n = IOUtil.read(fd, dsts, offset, length, nd); 967 } 968 } 969 } finally { 970 endRead(blocking, n > 0); 971 assert IOStatus.check(n); 972 } 973 return IOStatus.normalize(n); 974 } finally { 975 readLock.unlock(); 976 } 977 } 978 979 /** 980 * Marks the beginning of a write operation that might block. 981 * @param blocking true if configured blocking 982 * @param mustBeConnected true if the socket must be connected 983 * @return remote address if connected 984 * @throws ClosedChannelException if the channel is closed 985 * @throws NotYetConnectedException if mustBeConnected and not connected 986 * @throws IOException if socket not bound and cannot be bound 987 */ 988 private SocketAddress beginWrite(boolean blocking, boolean mustBeConnected) 989 throws IOException 990 { 991 if (blocking && interruptible) { 992 // set hook for Thread.interrupt 993 begin(); 994 } 995 SocketAddress remote; 996 synchronized (stateLock) { 997 ensureOpen(); 998 remote = remoteAddress; 999 if ((remote == null) && mustBeConnected) 1000 throw new NotYetConnectedException(); 1001 if (localAddress == null) 1002 bindInternal(null); 1003 if (blocking) 1004 writerThread = NativeThread.current(); 1005 } 1006 return remote; 1007 } 1008 1009 /** 1010 * Marks the end of a write operation that may have blocked. 1011 * 1012 * @throws AsynchronousCloseException if the channel was closed asynchronously 1013 */ 1014 private void endWrite(boolean blocking, boolean completed) 1015 throws AsynchronousCloseException 1016 { 1017 if (blocking) { 1018 synchronized (stateLock) { 1019 writerThread = 0; 1020 if (state == ST_CLOSING) { 1021 tryFinishClose(); 1022 } 1023 } 1024 1025 if (interruptible) { 1026 // remove hook for Thread.interrupt (may throw AsynchronousCloseException) 1027 end(completed); 1028 } else if (!completed && !isOpen()) { 1029 throw new AsynchronousCloseException(); 1030 } 1031 } 1032 } 1033 1034 @Override 1035 public int write(ByteBuffer buf) throws IOException { 1036 Objects.requireNonNull(buf); 1037 1038 writeLock.lock(); 1039 try { 1040 boolean blocking = isBlocking(); 1041 int n = 0; 1042 try { 1043 beginWrite(blocking, true); 1044 n = IOUtil.write(fd, buf, -1, nd); 1045 if (blocking) { 1046 while (IOStatus.okayToRetry(n) && isOpen()) { 1047 park(Net.POLLOUT); 1048 n = IOUtil.write(fd, buf, -1, nd); 1049 } 1050 } 1051 } finally { 1052 endWrite(blocking, n > 0); 1053 assert IOStatus.check(n); 1054 } 1055 return IOStatus.normalize(n); 1056 } finally { 1057 writeLock.unlock(); 1058 } 1059 } 1060 1061 @Override 1062 public long write(ByteBuffer[] srcs, int offset, int length) 1063 throws IOException 1064 { 1065 Objects.checkFromIndexSize(offset, length, srcs.length); 1066 1067 writeLock.lock(); 1068 try { 1069 boolean blocking = isBlocking(); 1070 long n = 0; 1071 try { 1072 beginWrite(blocking, true); 1073 n = IOUtil.write(fd, srcs, offset, length, nd); 1074 if (blocking) { 1075 while (IOStatus.okayToRetry(n) && isOpen()) { 1076 park(Net.POLLOUT); 1077 n = IOUtil.write(fd, srcs, offset, length, nd); 1078 } 1079 } 1080 } finally { 1081 endWrite(blocking, n > 0); 1082 assert IOStatus.check(n); 1083 } 1084 return IOStatus.normalize(n); 1085 } finally { 1086 writeLock.unlock(); 1087 } 1088 } 1089 1090 @Override 1091 protected void implConfigureBlocking(boolean block) throws IOException { 1092 readLock.lock(); 1093 try { 1094 writeLock.lock(); 1095 try { 1096 lockedConfigureBlocking(block); 1097 } finally { 1098 writeLock.unlock(); 1099 } 1100 } finally { 1101 readLock.unlock(); 1102 } 1103 } 1104 1105 /** 1106 * Adjusts the blocking mode. readLock or writeLock must already be held. 1107 */ 1108 private void lockedConfigureBlocking(boolean block) throws IOException { 1109 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 1110 synchronized (stateLock) { 1111 ensureOpen(); 1112 IOUtil.configureBlocking(fd, block); 1113 } 1114 } 1115 1116 /** 1117 * Adjusts the blocking mode if the channel is open. readLock or writeLock 1118 * must already be held. 1119 * 1120 * @return {@code true} if the blocking mode was adjusted, {@code false} if 1121 * the blocking mode was not adjusted because the channel is closed 1122 */ 1123 private boolean tryLockedConfigureBlocking(boolean block) throws IOException { 1124 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 1125 synchronized (stateLock) { 1126 if (isOpen()) { 1127 IOUtil.configureBlocking(fd, block); 1128 return true; 1129 } else { 1130 return false; 1131 } 1132 } 1133 } 1134 1135 InetSocketAddress localAddress() { 1136 synchronized (stateLock) { 1137 return localAddress; 1138 } 1139 } 1140 1141 InetSocketAddress remoteAddress() { 1142 synchronized (stateLock) { 1143 return remoteAddress; 1144 } 1145 } 1146 1147 @Override 1148 public DatagramChannel bind(SocketAddress local) throws IOException { 1149 readLock.lock(); 1150 try { 1151 writeLock.lock(); 1152 try { 1153 synchronized (stateLock) { 1154 ensureOpen(); 1155 if (localAddress != null) 1156 throw new AlreadyBoundException(); 1157 bindInternal(local); 1158 } 1159 } finally { 1160 writeLock.unlock(); 1161 } 1162 } finally { 1163 readLock.unlock(); 1164 } 1165 return this; 1166 } 1167 1168 private void bindInternal(SocketAddress local) throws IOException { 1169 assert Thread.holdsLock(stateLock )&& (localAddress == null); 1170 1171 InetSocketAddress isa; 1172 if (local == null) { 1173 // only Inet4Address allowed with IPv4 socket 1174 if (family == StandardProtocolFamily.INET) { 1175 isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); 1176 } else { 1177 isa = new InetSocketAddress(0); 1178 } 1179 } else { 1180 isa = Net.checkAddress(local, family); 1181 } 1182 SecurityManager sm = System.getSecurityManager(); 1183 if (sm != null) 1184 sm.checkListen(isa.getPort()); 1185 1186 Net.bind(family, fd, isa.getAddress(), isa.getPort()); 1187 localAddress = Net.localAddress(fd); 1188 } 1189 1190 @Override 1191 public boolean isConnected() { 1192 synchronized (stateLock) { 1193 return (state == ST_CONNECTED); 1194 } 1195 } 1196 1197 @Override 1198 public DatagramChannel connect(SocketAddress sa) throws IOException { 1199 return connect(sa, true); 1200 } 1201 1202 /** 1203 * Connects the channel's socket. 1204 * 1205 * @param sa the remote address to which this channel is to be connected 1206 * @param check true to check if the channel is already connected. 1207 */ 1208 DatagramChannel connect(SocketAddress sa, boolean check) throws IOException { 1209 InetSocketAddress isa = Net.checkAddress(sa, family); 1210 SecurityManager sm = System.getSecurityManager(); 1211 if (sm != null) { 1212 InetAddress ia = isa.getAddress(); 1213 if (ia.isMulticastAddress()) { 1214 sm.checkMulticast(ia); 1215 } else { 1216 sm.checkConnect(ia.getHostAddress(), isa.getPort()); 1217 sm.checkAccept(ia.getHostAddress(), isa.getPort()); 1218 } 1219 } 1220 1221 readLock.lock(); 1222 try { 1223 writeLock.lock(); 1224 try { 1225 synchronized (stateLock) { 1226 ensureOpen(); 1227 if (check && state == ST_CONNECTED) 1228 throw new AlreadyConnectedException(); 1229 1230 // ensure that the socket is bound 1231 if (localAddress == null) { 1232 bindInternal(null); 1233 } 1234 1235 // capture local address before connect 1236 initialLocalAddress = localAddress; 1237 1238 int n = Net.connect(family, 1239 fd, 1240 isa.getAddress(), 1241 isa.getPort()); 1242 if (n <= 0) 1243 throw new Error(); // Can't happen 1244 1245 // connected 1246 remoteAddress = isa; 1247 state = ST_CONNECTED; 1248 1249 // refresh local address 1250 localAddress = Net.localAddress(fd); 1251 1252 // flush any packets already received. 1253 boolean blocking = isBlocking(); 1254 if (blocking) { 1255 IOUtil.configureBlocking(fd, false); 1256 } 1257 try { 1258 ByteBuffer buf = ByteBuffer.allocate(100); 1259 while (receive(buf, false) >= 0) { 1260 buf.clear(); 1261 } 1262 } finally { 1263 if (blocking) { 1264 IOUtil.configureBlocking(fd, true); 1265 } 1266 } 1267 } 1268 } finally { 1269 writeLock.unlock(); 1270 } 1271 } finally { 1272 readLock.unlock(); 1273 } 1274 return this; 1275 } 1276 1277 @Override 1278 public DatagramChannel disconnect() throws IOException { 1279 readLock.lock(); 1280 try { 1281 writeLock.lock(); 1282 try { 1283 synchronized (stateLock) { 1284 if (!isOpen() || (state != ST_CONNECTED)) 1285 return this; 1286 1287 // disconnect socket 1288 boolean isIPv6 = (family == StandardProtocolFamily.INET6); 1289 disconnect0(fd, isIPv6); 1290 1291 // no longer connected 1292 remoteAddress = null; 1293 state = ST_UNCONNECTED; 1294 1295 // refresh localAddress, should be same as it was prior to connect 1296 localAddress = Net.localAddress(fd); 1297 try { 1298 if (!localAddress.equals(initialLocalAddress)) { 1299 // Workaround connect(2) issues on Linux and macOS 1300 repairSocket(initialLocalAddress); 1301 assert (localAddress != null) 1302 && localAddress.equals(Net.localAddress(fd)) 1303 && localAddress.equals(initialLocalAddress); 1304 } 1305 } finally { 1306 initialLocalAddress = null; 1307 } 1308 } 1309 } finally { 1310 writeLock.unlock(); 1311 } 1312 } finally { 1313 readLock.unlock(); 1314 } 1315 return this; 1316 } 1317 1318 /** 1319 * "Repair" the channel's socket after a disconnect that didn't restore the 1320 * local address. 1321 * 1322 * On Linux, connect(2) dissolves the association but changes the local port 1323 * to 0 when it was initially bound to an ephemeral port. The workaround here 1324 * is to rebind to the original port. 1325 * 1326 * On macOS, connect(2) dissolves the association but rebinds the socket to 1327 * the wildcard address when it was initially bound to a specific address. 1328 * The workaround here is to re-create the socket. 1329 */ 1330 private void repairSocket(InetSocketAddress target) 1331 throws IOException 1332 { 1333 assert Thread.holdsLock(stateLock); 1334 1335 // Linux: try to bind the socket to the original address/port 1336 if (localAddress.getPort() == 0) { 1337 assert localAddress.getAddress().equals(target.getAddress()); 1338 Net.bind(family, fd, target.getAddress(), target.getPort()); 1339 localAddress = Net.localAddress(fd); 1340 return; 1341 } 1342 1343 // capture the value of all existing socket options 1344 Map<SocketOption<?>, Object> map = new HashMap<>(); 1345 for (SocketOption<?> option : supportedOptions()) { 1346 Object value = getOption(option); 1347 if (value != null) { 1348 map.put(option, value); 1349 } 1350 } 1351 1352 // macOS: re-create the socket. 1353 FileDescriptor newfd = Net.socket(family, false); 1354 try { 1355 // copy the socket options that are protocol family agnostic 1356 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) { 1357 SocketOption<?> option = e.getKey(); 1358 if (SocketOptionRegistry.findOption(option, Net.UNSPEC) != null) { 1359 Object value = e.getValue(); 1360 try { 1361 Net.setSocketOption(newfd, Net.UNSPEC, option, value); 1362 } catch (IOException ignore) { } 1363 } 1364 } 1365 1366 // copy the blocking mode 1367 if (!isBlocking()) { 1368 IOUtil.configureBlocking(newfd, false); 1369 } 1370 1371 // dup this channel's socket to the new socket. If this succeeds then 1372 // fd will reference the new socket. If it fails then it will still 1373 // reference the old socket. 1374 nd.dup(newfd, fd); 1375 } finally { 1376 // release the file descriptor 1377 nd.close(newfd); 1378 } 1379 1380 // bind to the original local address 1381 try { 1382 Net.bind(family, fd, target.getAddress(), target.getPort()); 1383 } catch (IOException ioe) { 1384 // bind failed, socket is left unbound 1385 localAddress = null; 1386 throw ioe; 1387 } 1388 1389 // restore local address 1390 localAddress = Net.localAddress(fd); 1391 1392 // restore all socket options (including those set in first pass) 1393 for (Map.Entry<SocketOption<?>, Object> e : map.entrySet()) { 1394 @SuppressWarnings("unchecked") 1395 SocketOption<Object> option = (SocketOption<Object>) e.getKey(); 1396 Object value = e.getValue(); 1397 try { 1398 setOption(option, value); 1399 } catch (IOException ignore) { } 1400 } 1401 1402 // restore multicast group membership 1403 MembershipRegistry registry = this.registry; 1404 if (registry != null) { 1405 registry.forEach(k -> { 1406 if (k instanceof MembershipKeyImpl.Type6) { 1407 MembershipKeyImpl.Type6 key6 = (MembershipKeyImpl.Type6) k; 1408 Net.join6(fd, key6.groupAddress(), key6.index(), key6.source()); 1409 } else { 1410 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4) k; 1411 Net.join4(fd, key4.groupAddress(), key4.interfaceAddress(), key4.source()); 1412 } 1413 }); 1414 } 1415 1416 // reset registration in all Selectors that this channel is registered with 1417 AbstractSelectableChannels.forEach(this, SelectionKeyImpl::reset); 1418 } 1419 1420 /** 1421 * Defines static methods to access AbstractSelectableChannel non-public members. 1422 */ 1423 private static class AbstractSelectableChannels { 1424 private static final Method FOREACH; 1425 static { 1426 try { 1427 PrivilegedExceptionAction<Method> pae = () -> { 1428 Method m = AbstractSelectableChannel.class.getDeclaredMethod("forEach", Consumer.class); 1429 m.setAccessible(true); 1430 return m; 1431 }; 1432 FOREACH = AccessController.doPrivileged(pae); 1433 } catch (Exception e) { 1434 throw new InternalError(e); 1435 } 1436 } 1437 static void forEach(AbstractSelectableChannel ch, Consumer<SelectionKeyImpl> action) { 1438 try { 1439 FOREACH.invoke(ch, action); 1440 } catch (Exception e) { 1441 throw new InternalError(e); 1442 } 1443 } 1444 } 1445 1446 /** 1447 * Joins channel's socket to the given group/interface and 1448 * optional source address. 1449 */ 1450 private MembershipKey innerJoin(InetAddress group, 1451 NetworkInterface interf, 1452 InetAddress source) 1453 throws IOException 1454 { 1455 if (!group.isMulticastAddress()) 1456 throw new IllegalArgumentException("Group not a multicast address"); 1457 1458 // check multicast address is compatible with this socket 1459 if (group instanceof Inet4Address) { 1460 if (family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group()) 1461 throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group"); 1462 } else if (group instanceof Inet6Address) { 1463 if (family != StandardProtocolFamily.INET6) 1464 throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group"); 1465 } else { 1466 throw new IllegalArgumentException("Address type not supported"); 1467 } 1468 1469 // check source address 1470 if (source != null) { 1471 if (source.isAnyLocalAddress()) 1472 throw new IllegalArgumentException("Source address is a wildcard address"); 1473 if (source.isMulticastAddress()) 1474 throw new IllegalArgumentException("Source address is multicast address"); 1475 if (source.getClass() != group.getClass()) 1476 throw new IllegalArgumentException("Source address is different type to group"); 1477 } 1478 1479 SecurityManager sm = System.getSecurityManager(); 1480 if (sm != null) 1481 sm.checkMulticast(group); 1482 1483 synchronized (stateLock) { 1484 ensureOpen(); 1485 1486 // check the registry to see if we are already a member of the group 1487 if (registry == null) { 1488 registry = new MembershipRegistry(); 1489 } else { 1490 // return existing membership key 1491 MembershipKey key = registry.checkMembership(group, interf, source); 1492 if (key != null) 1493 return key; 1494 } 1495 1496 MembershipKeyImpl key; 1497 if ((family == StandardProtocolFamily.INET6) && 1498 ((group instanceof Inet6Address) || Net.canJoin6WithIPv4Group())) 1499 { 1500 int index = interf.getIndex(); 1501 if (index == -1) 1502 throw new IOException("Network interface cannot be identified"); 1503 1504 // need multicast and source address as byte arrays 1505 byte[] groupAddress = Net.inet6AsByteArray(group); 1506 byte[] sourceAddress = (source == null) ? null : 1507 Net.inet6AsByteArray(source); 1508 1509 // join the group 1510 int n = Net.join6(fd, groupAddress, index, sourceAddress); 1511 if (n == IOStatus.UNAVAILABLE) 1512 throw new UnsupportedOperationException(); 1513 1514 key = new MembershipKeyImpl.Type6(this, group, interf, source, 1515 groupAddress, index, sourceAddress); 1516 1517 } else { 1518 // need IPv4 address to identify interface 1519 Inet4Address target = Net.anyInet4Address(interf); 1520 if (target == null) 1521 throw new IOException("Network interface not configured for IPv4"); 1522 1523 int groupAddress = Net.inet4AsInt(group); 1524 int targetAddress = Net.inet4AsInt(target); 1525 int sourceAddress = (source == null) ? 0 : Net.inet4AsInt(source); 1526 1527 // join the group 1528 int n = Net.join4(fd, groupAddress, targetAddress, sourceAddress); 1529 if (n == IOStatus.UNAVAILABLE) 1530 throw new UnsupportedOperationException(); 1531 1532 key = new MembershipKeyImpl.Type4(this, group, interf, source, 1533 groupAddress, targetAddress, sourceAddress); 1534 } 1535 1536 registry.add(key); 1537 return key; 1538 } 1539 } 1540 1541 @Override 1542 public MembershipKey join(InetAddress group, 1543 NetworkInterface interf) 1544 throws IOException 1545 { 1546 return innerJoin(group, interf, null); 1547 } 1548 1549 @Override 1550 public MembershipKey join(InetAddress group, 1551 NetworkInterface interf, 1552 InetAddress source) 1553 throws IOException 1554 { 1555 Objects.requireNonNull(source); 1556 return innerJoin(group, interf, source); 1557 } 1558 1559 // package-private 1560 void drop(MembershipKeyImpl key) { 1561 assert key.channel() == this; 1562 1563 synchronized (stateLock) { 1564 if (!key.isValid()) 1565 return; 1566 1567 try { 1568 if (key instanceof MembershipKeyImpl.Type6) { 1569 MembershipKeyImpl.Type6 key6 = 1570 (MembershipKeyImpl.Type6)key; 1571 Net.drop6(fd, key6.groupAddress(), key6.index(), key6.source()); 1572 } else { 1573 MembershipKeyImpl.Type4 key4 = (MembershipKeyImpl.Type4)key; 1574 Net.drop4(fd, key4.groupAddress(), key4.interfaceAddress(), 1575 key4.source()); 1576 } 1577 } catch (IOException ioe) { 1578 // should not happen 1579 throw new AssertionError(ioe); 1580 } 1581 1582 key.invalidate(); 1583 registry.remove(key); 1584 } 1585 } 1586 1587 /** 1588 * Finds an existing membership of a multicast group. Returns null if this 1589 * channel's socket is not a member of the group. 1590 * 1591 * @apiNote This method is for use by the socket adaptor 1592 */ 1593 MembershipKey findMembership(InetAddress group, NetworkInterface interf) { 1594 synchronized (stateLock) { 1595 if (registry != null) { 1596 return registry.checkMembership(group, interf, null); 1597 } else { 1598 return null; 1599 } 1600 } 1601 } 1602 1603 /** 1604 * Block datagrams from the given source. 1605 */ 1606 void block(MembershipKeyImpl key, InetAddress source) 1607 throws IOException 1608 { 1609 assert key.channel() == this; 1610 assert key.sourceAddress() == null; 1611 1612 synchronized (stateLock) { 1613 if (!key.isValid()) 1614 throw new IllegalStateException("key is no longer valid"); 1615 if (source.isAnyLocalAddress()) 1616 throw new IllegalArgumentException("Source address is a wildcard address"); 1617 if (source.isMulticastAddress()) 1618 throw new IllegalArgumentException("Source address is multicast address"); 1619 if (source.getClass() != key.group().getClass()) 1620 throw new IllegalArgumentException("Source address is different type to group"); 1621 1622 int n; 1623 if (key instanceof MembershipKeyImpl.Type6) { 1624 MembershipKeyImpl.Type6 key6 = 1625 (MembershipKeyImpl.Type6)key; 1626 n = Net.block6(fd, key6.groupAddress(), key6.index(), 1627 Net.inet6AsByteArray(source)); 1628 } else { 1629 MembershipKeyImpl.Type4 key4 = 1630 (MembershipKeyImpl.Type4)key; 1631 n = Net.block4(fd, key4.groupAddress(), key4.interfaceAddress(), 1632 Net.inet4AsInt(source)); 1633 } 1634 if (n == IOStatus.UNAVAILABLE) { 1635 // ancient kernel 1636 throw new UnsupportedOperationException(); 1637 } 1638 } 1639 } 1640 1641 /** 1642 * Unblock the given source. 1643 */ 1644 void unblock(MembershipKeyImpl key, InetAddress source) { 1645 assert key.channel() == this; 1646 assert key.sourceAddress() == null; 1647 1648 synchronized (stateLock) { 1649 if (!key.isValid()) 1650 throw new IllegalStateException("key is no longer valid"); 1651 1652 try { 1653 if (key instanceof MembershipKeyImpl.Type6) { 1654 MembershipKeyImpl.Type6 key6 = 1655 (MembershipKeyImpl.Type6)key; 1656 Net.unblock6(fd, key6.groupAddress(), key6.index(), 1657 Net.inet6AsByteArray(source)); 1658 } else { 1659 MembershipKeyImpl.Type4 key4 = 1660 (MembershipKeyImpl.Type4)key; 1661 Net.unblock4(fd, key4.groupAddress(), key4.interfaceAddress(), 1662 Net.inet4AsInt(source)); 1663 } 1664 } catch (IOException ioe) { 1665 // should not happen 1666 throw new AssertionError(ioe); 1667 } 1668 } 1669 } 1670 1671 /** 1672 * Closes the socket if there are no I/O operations in progress and the 1673 * channel is not registered with a Selector. 1674 */ 1675 private boolean tryClose() throws IOException { 1676 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 1677 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 1678 state = ST_CLOSED; 1679 try { 1680 // close socket 1681 cleaner.clean(); 1682 } catch (UncheckedIOException ioe) { 1683 throw ioe.getCause(); 1684 } 1685 return true; 1686 } else { 1687 return false; 1688 } 1689 } 1690 1691 /** 1692 * Invokes tryClose to attempt to close the socket. 1693 * 1694 * This method is used for deferred closing by I/O and Selector operations. 1695 */ 1696 private void tryFinishClose() { 1697 try { 1698 tryClose(); 1699 } catch (IOException ignore) { } 1700 } 1701 1702 /** 1703 * Closes this channel when configured in blocking mode. 1704 * 1705 * If there is an I/O operation in progress then the socket is pre-closed 1706 * and the I/O threads signalled, in which case the final close is deferred 1707 * until all I/O operations complete. 1708 */ 1709 private void implCloseBlockingMode() throws IOException { 1710 synchronized (stateLock) { 1711 assert state < ST_CLOSING; 1712 state = ST_CLOSING; 1713 1714 // if member of any multicast groups then invalidate the keys 1715 if (registry != null) 1716 registry.invalidateAll(); 1717 1718 if (!tryClose()) { 1719 long reader = readerThread; 1720 long writer = writerThread; 1721 if (reader != 0 || writer != 0) { 1722 nd.preClose(fd); 1723 if (reader != 0) 1724 NativeThread.signal(reader); 1725 if (writer != 0) 1726 NativeThread.signal(writer); 1727 } 1728 } 1729 } 1730 } 1731 1732 /** 1733 * Closes this channel when configured in non-blocking mode. 1734 * 1735 * If the channel is registered with a Selector then the close is deferred 1736 * until the channel is flushed from all Selectors. 1737 */ 1738 private void implCloseNonBlockingMode() throws IOException { 1739 synchronized (stateLock) { 1740 assert state < ST_CLOSING; 1741 state = ST_CLOSING; 1742 1743 // if member of any multicast groups then invalidate the keys 1744 if (registry != null) 1745 registry.invalidateAll(); 1746 } 1747 1748 // wait for any read/write operations to complete before trying to close 1749 readLock.lock(); 1750 readLock.unlock(); 1751 writeLock.lock(); 1752 writeLock.unlock(); 1753 synchronized (stateLock) { 1754 if (state == ST_CLOSING) { 1755 tryClose(); 1756 } 1757 } 1758 } 1759 1760 /** 1761 * Invoked by implCloseChannel to close the channel. 1762 */ 1763 @Override 1764 protected void implCloseSelectableChannel() throws IOException { 1765 assert !isOpen(); 1766 if (isBlocking()) { 1767 implCloseBlockingMode(); 1768 } else { 1769 implCloseNonBlockingMode(); 1770 } 1771 } 1772 1773 @Override 1774 public void kill() { 1775 synchronized (stateLock) { 1776 if (state == ST_CLOSING) { 1777 tryFinishClose(); 1778 } 1779 } 1780 } 1781 1782 /** 1783 * Translates native poll revent set into a ready operation set 1784 */ 1785 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1786 int intOps = ski.nioInterestOps(); 1787 int oldOps = ski.nioReadyOps(); 1788 int newOps = initialOps; 1789 1790 if ((ops & Net.POLLNVAL) != 0) { 1791 // This should only happen if this channel is pre-closed while a 1792 // selection operation is in progress 1793 // ## Throw an error if this channel has not been pre-closed 1794 return false; 1795 } 1796 1797 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1798 newOps = intOps; 1799 ski.nioReadyOps(newOps); 1800 return (newOps & ~oldOps) != 0; 1801 } 1802 1803 if (((ops & Net.POLLIN) != 0) && 1804 ((intOps & SelectionKey.OP_READ) != 0)) 1805 newOps |= SelectionKey.OP_READ; 1806 1807 if (((ops & Net.POLLOUT) != 0) && 1808 ((intOps & SelectionKey.OP_WRITE) != 0)) 1809 newOps |= SelectionKey.OP_WRITE; 1810 1811 ski.nioReadyOps(newOps); 1812 return (newOps & ~oldOps) != 0; 1813 } 1814 1815 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1816 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1817 } 1818 1819 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1820 return translateReadyOps(ops, 0, ski); 1821 } 1822 1823 /** 1824 * Translates an interest operation set into a native poll event set 1825 */ 1826 public int translateInterestOps(int ops) { 1827 int newOps = 0; 1828 if ((ops & SelectionKey.OP_READ) != 0) 1829 newOps |= Net.POLLIN; 1830 if ((ops & SelectionKey.OP_WRITE) != 0) 1831 newOps |= Net.POLLOUT; 1832 if ((ops & SelectionKey.OP_CONNECT) != 0) 1833 newOps |= Net.POLLIN; 1834 return newOps; 1835 } 1836 1837 public FileDescriptor getFD() { 1838 return fd; 1839 } 1840 1841 public int getFDVal() { 1842 return fdVal; 1843 } 1844 1845 /** 1846 * Returns an action to release the given file descriptor and socket addresses. 1847 */ 1848 private static Runnable releaserFor(FileDescriptor fd, NativeSocketAddress... sockAddrs) { 1849 return () -> { 1850 try { 1851 nd.close(fd); 1852 } catch (IOException ioe) { 1853 throw new UncheckedIOException(ioe); 1854 } finally { 1855 // decrement socket count and release memory 1856 ResourceManager.afterUdpClose(); 1857 NativeSocketAddress.freeAll(sockAddrs); 1858 } 1859 }; 1860 } 1861 1862 // -- Native methods -- 1863 1864 private static native void disconnect0(FileDescriptor fd, boolean isIPv6) 1865 throws IOException; 1866 1867 private static native int receive0(FileDescriptor fd, long address, int len, 1868 long senderAddress, boolean connected) 1869 throws IOException; 1870 1871 private static native int send0(FileDescriptor fd, long address, int len, 1872 long targetAddress, int targetAddressLen) 1873 throws IOException; 1874 1875 static { 1876 IOUtil.load(); 1877 } 1878 }