/*
 * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.net.*;
import java.util.concurrent.*;
import java.io.IOException;
import java.io.FileDescriptor;
import sun.net.NetHooks;
import sun.net.util.SocketExceptions;
import sun.security.action.GetPropertyAction;

/**
 * Unix implementation of AsynchronousSocketChannel.
 *
 * <p> The channel's file descriptor is put into non-blocking mode and
 * registered with a {@link Port}. Operations that cannot complete immediately
 * record their context (buffers, handler/future, timer) in fields guarded by
 * {@code updateLock}, ask the port to poll the descriptor, and are completed
 * later from {@link #onEvent(int, boolean)}.
 */

class UnixAsynchronousSocketChannelImpl
    extends AsynchronousSocketChannelImpl implements Port.PollableChannel
{
    private static final NativeDispatcher nd = new SocketDispatcher();

    // context object attached to a PendingFuture so that onCancel can tell
    // which kind of operation is being cancelled
    private static enum OpType { CONNECT, READ, WRITE };

    // true if implRead must never attempt the read on the initiating thread.
    // Controlled by the "sun.nio.ch.disableSynchronousRead" system property;
    // the property defaults to "false" and an empty value is treated as true.
    private static final boolean disableSynchronousRead;
    static {
        String propValue = GetPropertyAction.privilegedGetProperty(
            "sun.nio.ch.disableSynchronousRead", "false");
        disableSynchronousRead = (propValue.length() == 0) ?
            true : Boolean.valueOf(propValue);
    }

    private final Port port;
    private final int fdVal;

    // used to ensure that the context for I/O operations that complete
    // asynchronously is visible to the pooled threads handling I/O events.
    private final Object updateLock = new Object();

    // pending connect (updateLock)
    private boolean connectPending;
    private CompletionHandler<Void,Object> connectHandler;
    private Object connectAttachment;
    private PendingFuture<Void,Object> connectFuture;

    // pending remote address (stateLock)
    private SocketAddress pendingRemote;

    // pending read (updateLock)
    private boolean readPending;
    private boolean isScatteringRead;
    private ByteBuffer readBuffer;
    private ByteBuffer[] readBuffers;
    private CompletionHandler<Number,Object> readHandler;
    private Object readAttachment;
    private PendingFuture<Number,Object> readFuture;
    private Future<?> readTimer;

    // pending write (updateLock)
    private boolean writePending;
    private boolean isGatheringWrite;
    private ByteBuffer writeBuffer;
    private ByteBuffer[] writeBuffers;
    private CompletionHandler<Number,Object> writeHandler;
    private Object writeAttachment;
    private PendingFuture<Number,Object> writeFuture;
    private Future<?> writeTimer;


    /**
     * Creates a channel associated with the given port, switches its file
     * descriptor to non-blocking mode and registers the descriptor with the
     * port.
     *
     * <p> If either step fails the file descriptor is closed before the
     * exception propagates, because the partially constructed channel is
     * never returned to a caller that could close it.
     *
     * @param  port  the port (channel group) that will poll this channel
     * @throws IOException  if the descriptor cannot be made non-blocking
     * @throws ShutdownChannelGroupException  if the port rejects the
     *         registration because the group is shutdown
     */
    UnixAsynchronousSocketChannelImpl(Port port)
        throws IOException
    {
        super(port);

        // set non-blocking
        try {
            IOUtil.configureBlocking(fd, false);
        } catch (IOException x) {
            nd.close(fd);
            throw x;
        }

        this.port = port;
        this.fdVal = IOUtil.fdVal(fd);

        // add mapping from file descriptor to this channel
        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // registration refused: release the descriptor here, as is done
            // when configureBlocking fails, otherwise it would be leaked
            try {
                nd.close(fd);
            } catch (IOException suppressed) {
                x.addSuppressed(suppressed);
            }
            throw x;
        }
    }

    /**
     * Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl.
     *
     * <p> A {@code ShutdownChannelGroupException} from the port registration
     * is converted to an {@code IOException}.
     *
     * <p> NOTE(review): unlike the single-argument constructor, {@code fd} is
     * not closed here on failure; presumably the accepting caller owns the
     * descriptor and closes it -- confirm against the server channel.
     *
     * @param  port    the port (channel group) that will poll this channel
     * @param  fd      the file descriptor of the already created socket
     * @param  remote  the remote address, passed to the superclass
     * @throws IOException  if the descriptor cannot be made non-blocking or
     *         the group is shutdown
     */
    UnixAsynchronousSocketChannelImpl(Port port,
                                      FileDescriptor fd,
                                      InetSocketAddress remote)
        throws IOException
    {
        super(port, fd, remote);

        this.fdVal = IOUtil.fdVal(fd);
        IOUtil.configureBlocking(fd, false);

        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // ShutdownChannelGroupException thrown if we attempt to register a
            // new channel after the group is shutdown
            throw new IOException(x);
        }

        this.port = port;
    }

    /**
     * Returns the port that this channel is registered with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return port;
    }

    // register events for outstanding I/O operations, caller already owns updateLock
    private void updateEvents() {
        assert Thread.holdsLock(updateLock);
        int events = 0;
        if (readPending)
            events |= Net.POLLIN;
        // a pending connect is signalled by the socket becoming writable
        if (connectPending || writePending)
            events |= Net.POLLOUT;
        // nothing outstanding => do not ask the port to poll
        if (events != 0)
            port.startPoll(fdVal, events);
    }

    // register events for outstanding I/O operations
    private void lockAndUpdateEvents() {
        synchronized (updateLock) {
            updateEvents();
        }
    }

    /**
     * Invoked to finish pending read, write and/or connect operations.
     *
     * <p> The pending flags are claimed (cleared) while holding
     * {@code updateLock}; the operations themselves are completed after the
     * lock has been released. A writable event completes a pending write in
     * preference to a pending connect.
     *
     * @param  mayInvokeDirect  {@code true} if a completion handler may be
     *         invoked directly by the calling thread
     * @param  readable  {@code true} to complete a pending read
     * @param  writable  {@code true} to complete a pending write or connect
     */
    private void finish(boolean mayInvokeDirect,
                        boolean readable,
                        boolean writable)
    {
        boolean finishRead = false;
        boolean finishWrite = false;
        boolean finishConnect = false;

        // map event to pending result
        synchronized (updateLock) {
            if (readable && this.readPending) {
                this.readPending = false;
                finishRead = true;
            }
            if (writable) {
                if (this.writePending) {
                    this.writePending = false;
                    finishWrite = true;
                } else if (this.connectPending) {
                    this.connectPending = false;
                    finishConnect = true;
                }
            }
        }

        // complete the I/O operation. Special case for when channel is
        // ready for both reading and writing. In that case, submit task to
        // complete write if write operation has a completion handler.
        if (finishRead) {
            if (finishWrite)
                finishWrite(false);
            finishRead(mayInvokeDirect);
            return;
        }
        if (finishWrite) {
            finishWrite(mayInvokeDirect);
        }
        if (finishConnect) {
            finishConnect(mayInvokeDirect);
        }
    }

    /**
     * Invoked by event handler thread when file descriptor is polled.
     *
     * <p> An error or hang-up condition is treated as both readable and
     * writable so that every pending operation is driven to completion.
     *
     * @param  events  the poll events reported for the file descriptor
     * @param  mayInvokeDirect  {@code true} if a completion handler may be
     *         invoked directly by the calling thread
     */
    @Override
    public void onEvent(int events, boolean mayInvokeDirect) {
        boolean readable = (events & Net.POLLIN) > 0;
        boolean writable = (events & Net.POLLOUT) > 0;
        if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
            readable = true;
            writable = true;
        }
        finish(mayInvokeDirect, readable, writable);
    }

    /**
     * Closes this channel: removes the port mapping, closes the file
     * descriptor and then completes any pending operations. Handlers are
     * never invoked directly from here ({@code mayInvokeDirect} is false).
     */
    @Override
    void implClose() throws IOException {
        // remove the mapping
        port.unregister(fdVal);

        // close file descriptor
        nd.close(fd);

        // All outstanding I/O operations are required to fail
        finish(false, true, true);
    }

    /**
     * Invoked when a pending future created by this channel is cancelled.
     * The future's context is the {@code OpType} it was created with and
     * selects which inherited kill method is called.
     */
    @Override
    public void onCancel(PendingFuture<?,?> task) {
        if (task.getContext() == OpType.CONNECT)
            killConnect();
        if (task.getContext() == OpType.READ)
            killReading();
        if (task.getContext() == OpType.WRITE)
            killWriting();
    }

    // -- connect --

    // records that the connection is established: under stateLock moves the
    // channel to ST_CONNECTED and publishes the local and remote addresses
    private void setConnected() throws IOException {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            localAddress = Net.localAddress(fd);
            remoteAddress = (InetSocketAddress)pendingRemote;
        }
    }

    /**
     * Completes a pending connect after the socket has been polled.
     *
     * <p> On failure the channel is closed (a failure to close is added as a
     * suppressed exception) and the error is reported through the handler or
     * future recorded by {@code implConnect}.
     *
     * @param  mayInvokeDirect  {@code true} if the completion handler may be
     *         invoked directly by the calling thread
     */
    private void finishConnect(boolean mayInvokeDirect) {
        Throwable e = null;
        try {
            begin();
            checkConnect(fdVal);
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }
        if (e != null) {
            if (e instanceof IOException) {
                var isa = (InetSocketAddress)pendingRemote;
                e = SocketExceptions.of((IOException)e, isa);
            }
            // close channel if connection cannot be established
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }

        // invoke handler and set result
        CompletionHandler<Void,Object> handler = connectHandler;
        connectHandler = null;
        Object att = connectAttachment;
        PendingFuture<Void,Object> future = connectFuture;
        if (handler == null) {
            future.setResult(null, e);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, null, e);
            } else {
                Invoker.invokeIndirectly(this, handler, att, null, e);
            }
        }
    }

    /**
     * Initiates a connect operation.
     *
     * <p> If the connection is established (or fails) immediately the result
     * is delivered straight away; if {@code Net.connect} reports
     * {@code IOStatus.UNAVAILABLE} the operation is recorded as pending and
     * completed later by {@code finishConnect}. When the connect fails the
     * channel is closed.
     *
     * @param  remote      the address to connect to
     * @param  attachment  the object passed to the completion handler
     * @param  handler     the completion handler, or {@code null} if the
     *                     result is to be delivered through a {@code Future}
     * @return a future for the result when {@code handler} is {@code null},
     *         otherwise {@code null}
     * @throws AlreadyConnectedException   if the channel is already connected
     * @throws ConnectionPendingException  if a connect is already in progress
     */
    @Override
    @SuppressWarnings("unchecked")
    <A> Future<Void> implConnect(SocketAddress remote,
                                 A attachment,
                                 CompletionHandler<Void,? super A> handler)
    {
        if (!isOpen()) {
            Throwable e = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(e);
            } else {
                Invoker.invoke(this, handler, attachment, null, e);
                return null;
            }
        }

        InetSocketAddress isa = Net.checkAddress(remote);

        // permission check
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        // check and set state
        boolean notifyBeforeTcpConnect;
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
            state = ST_PENDING;
            pendingRemote = remote;
            notifyBeforeTcpConnect = (localAddress == null);
        }

        Throwable e = null;
        try {
            begin();
            // notify hook if unbound
            if (notifyBeforeTcpConnect)
                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
            if (n == IOStatus.UNAVAILABLE) {
                // connection could not be established immediately
                PendingFuture<Void,A> result = null;
                synchronized (updateLock) {
                    if (handler == null) {
                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                        this.connectFuture = (PendingFuture<Void,Object>)result;
                    } else {
                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
                        this.connectAttachment = attachment;
                    }
                    this.connectPending = true;
                    updateEvents();
                }
                return result;
            }
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }

        // close channel if connect fails
        if (e != null) {
            if (e instanceof IOException) {
                e = SocketExceptions.of((IOException)e, isa);
            }
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }
        if (handler == null) {
            return CompletedFuture.withResult(null, e);
        } else {
            Invoker.invoke(this, handler, attachment, null, e);
            return null;
        }
    }

    // -- read --

    /**
     * Completes a pending read after the socket has been polled as readable.
     *
     * <p> If the read reports {@code IOStatus.UNAVAILABLE} the operation is
     * marked pending again and this method returns without a result; the
     * {@code finally} block then re-registers the poll events. Otherwise the
     * read context is cleared, reading is re-enabled, the timer (if any) is
     * cancelled and the result or exception is delivered.
     *
     * @param  mayInvokeDirect  {@code true} if the completion handler may be
     *         invoked directly by the calling thread
     */
    private void finishRead(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean scattering = isScatteringRead;
        CompletionHandler<Number,Object> handler = readHandler;
        Object att = readAttachment;
        PendingFuture<Number,Object> future = readFuture;
        Future<?> timeout = readTimer;

        try {
            begin();

            if (scattering) {
                n = (int)IOUtil.read(fd, readBuffers, nd);
            } else {
                n = IOUtil.read(fd, readBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    readPending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.readBuffer = null;
            this.readBuffers = null;
            this.readAttachment = null;
            this.readHandler = null;

            // allow another read to be initiated
            enableReading();

        } catch (Throwable x) {
            enableReading();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timeout != null)
            timeout.cancel(false);

        // create result
        Number result = (exc != null) ? null : (scattering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    // Task scheduled by implRead when a timeout is specified. If the read is
    // still pending when it runs, the read is claimed (so that a later poll
    // event will not complete it) and failed with InterruptedByTimeoutException.
    private Runnable readTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                // read already completed (or being completed) => nothing to do
                if (!readPending)
                    return;
                readPending = false;
                handler = readHandler;
                att = readAttachment;
                future = readFuture;
            }

            // kill further reading before releasing waiters
            enableReading(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler == null) {
                future.setFailure(exc);
            } else {
                AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
                Invoker.invokeIndirectly(ch, handler, att, null, exc);
            }
        }
    };

    /**
     * Initiates a read or scattering read operation.
     *
     * <p> Where permitted the read is first attempted on the calling thread.
     * If it is not attempted, or reports {@code IOStatus.UNAVAILABLE}, the
     * read context is stored under {@code updateLock}, a timer is scheduled
     * when {@code timeout} is positive, and the descriptor is polled so that
     * {@code finishRead} can complete the operation later.
     *
     * @param  isScatteringRead  {@code true} to read into {@code dsts},
     *                           {@code false} to read into {@code dst}
     * @param  dst         the buffer for a non-scattering read
     * @param  dsts        the buffers for a scattering read
     * @param  timeout     the timeout; a timer is scheduled only if positive
     * @param  unit        the unit of {@code timeout}
     * @param  attachment  the object passed to the completion handler
     * @param  handler     the completion handler, or {@code null} if the
     *                     result is to be delivered through a {@code Future}
     * @return a future for the result when {@code handler} is {@code null},
     *         otherwise {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, nd);
                } else {
                    n = IOUtil.read(fd, dst, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the read completed or failed here, so allow another read to be
            // initiated; a pending read is re-enabled by finishRead instead
            if (!pending)
                enableReading();
            end();
        }

        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- write --

    /**
     * Completes a pending write after the socket has been polled as writable.
     *
     * <p> If the write reports {@code IOStatus.UNAVAILABLE} the operation is
     * marked pending again and this method returns without a result; the
     * {@code finally} block then re-registers the poll events. Otherwise the
     * write context is cleared, writing is re-enabled, the timer (if any) is
     * cancelled and the result or exception is delivered.
     *
     * @param  mayInvokeDirect  {@code true} if the completion handler may be
     *         invoked directly by the calling thread
     */
    private void finishWrite(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after writing is re-enabled.
        boolean gathering = this.isGatheringWrite;
        CompletionHandler<Number,Object> handler = this.writeHandler;
        Object att = this.writeAttachment;
        PendingFuture<Number,Object> future = this.writeFuture;
        Future<?> timer = this.writeTimer;

        try {
            begin();

            if (gathering) {
                n = (int)IOUtil.write(fd, writeBuffers, nd);
            } else {
                n = IOUtil.write(fd, writeBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    writePending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.writeBuffer = null;
            this.writeBuffers = null;
            this.writeAttachment = null;
            this.writeHandler = null;

            // allow another write to be initiated
            enableWriting();

        } catch (Throwable x) {
            enableWriting();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent read
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timer != null)
            timer.cancel(false);

        // create result
        Number result = (exc != null) ? null : (gathering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    // Task scheduled by implWrite when a timeout is specified. If the write is
    // still pending when it runs, the write is claimed (so that a later poll
    // event will not complete it) and failed with InterruptedByTimeoutException.
    private Runnable writeTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                // write already completed (or being completed) => nothing to do
                if (!writePending)
                    return;
                writePending = false;
                handler = writeHandler;
                att = writeAttachment;
                future = writeFuture;
            }

            // kill further writing before releasing waiters
            enableWriting(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler != null) {
                Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
                    handler, att, null, exc);
            } else {
                future.setFailure(exc);
            }
        }
    };

    /**
     * Initiates a write or gathering write operation.
     *
     * <p> Where permitted the write is first attempted on the calling thread.
     * If it is not attempted, or reports {@code IOStatus.UNAVAILABLE}, the
     * write context is stored under {@code updateLock}, a timer is scheduled
     * when {@code timeout} is positive, and the descriptor is polled so that
     * {@code finishWrite} can complete the operation later.
     *
     * @param  isGatheringWrite  {@code true} to write from {@code srcs},
     *                           {@code false} to write from {@code src}
     * @param  src         the buffer for a non-gathering write
     * @param  srcs        the buffers for a gathering write
     * @param  timeout     the timeout; a timer is scheduled only if positive
     * @param  unit        the unit of {@code timeout}
     * @param  attachment  the object passed to the completion handler
     * @param  handler     the completion handler, or {@code null} if the
     *                     result is to be delivered through a {@code Future}
     * @return a future for the result when {@code handler} is {@code null},
     *         otherwise {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                             ByteBuffer src,
                                             ByteBuffer[] srcs,
                                             long timeout,
                                             TimeUnit unit,
                                             A attachment,
                                             CompletionHandler<V,? super A> handler)
    {
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
        boolean attemptWrite = (handler == null) || invokeDirect ||
            !port.isFixedThreadPool();  // okay to attempt write with user thread pool

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptWrite) {
                if (isGatheringWrite) {
                    n = (int)IOUtil.write(fd, srcs, nd);
                } else {
                    n = IOUtil.write(fd, src, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isGatheringWrite = isGatheringWrite;
                    this.writeBuffer = src;
                    this.writeBuffers = srcs;
                    if (handler == null) {
                        this.writeHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.WRITE);
                        this.writeFuture = (PendingFuture<Number,Object>)result;
                        this.writeAttachment = null;
                    } else {
                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
                        this.writeAttachment = attachment;
                        this.writeFuture = null;
                    }
                    if (timeout > 0L) {
                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                    }
                    this.writePending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the write completed or failed here, so allow another write to be
            // initiated; a pending write is re-enabled by finishWrite instead
            if (!pending)
                enableWriting();
            end();
        }

        Number result = (exc != null) ? null : (isGatheringWrite) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // write completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- Native methods --

    // native check of the outcome of a connect on the given descriptor;
    // finishConnect treats an IOException from it as a failed connection
    private static native void checkConnect(int fdVal) throws IOException;

    static {
        IOUtil.load();
    }
}