/*
 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.nio.ByteBuffer;
import java.net.*;
import java.util.concurrent.*;
import java.io.IOException;
import java.io.FileDescriptor;
import sun.net.NetHooks;
import sun.net.util.SocketExceptions;
import sun.security.action.GetPropertyAction;

/**
 * Unix implementation of AsynchronousSocketChannel.
 *
 * <p> The channel's file descriptor is put into non-blocking mode and
 * registered with a {@link Port}. Connect, read and write are first attempted
 * on the initiating thread (subject to the conditions documented on each
 * method). When the attempt reports {@code IOStatus.UNAVAILABLE} the
 * operation's context (buffers, handler or future, attachment, timer) is
 * stored in the "pending" fields below while holding {@code updateLock}, and
 * the descriptor is polled. The operation is then completed from
 * {@link #onEvent}, from {@link #implClose}, or by a timeout task.
 */

class UnixAsynchronousSocketChannelImpl
    extends AsynchronousSocketChannelImpl implements Port.PollableChannel
{
    private static final NativeDispatcher nd = new SocketDispatcher();

    // identifies the kind of operation a PendingFuture belongs to; stored as
    // the future's context and inspected in onCancel
    private enum OpType { CONNECT, READ, WRITE }

    // true if reads must never be attempted on the initiating thread. Enabled
    // when the system property is set to the empty string or to "true".
    private static final boolean disableSynchronousRead;
    static {
        String propValue = GetPropertyAction.privilegedGetProperty(
            "sun.nio.ch.disableSynchronousRead", "false");
        disableSynchronousRead =
            propValue.isEmpty() || Boolean.parseBoolean(propValue);
    }

    private final Port port;
    private final int fdVal;

    // used to ensure that the context for I/O operations that complete
    // asynchronously is visible to the pooled threads handling I/O events.
    private final Object updateLock = new Object();

    // pending connect (updateLock)
    private boolean connectPending;
    private CompletionHandler<Void,Object> connectHandler;
    private Object connectAttachment;
    private PendingFuture<Void,Object> connectFuture;

    // pending remote address (stateLock)
    private SocketAddress pendingRemote;

    // pending read (updateLock)
    private boolean readPending;
    private boolean isScatteringRead;
    private ByteBuffer readBuffer;
    private ByteBuffer[] readBuffers;
    private CompletionHandler<Number,Object> readHandler;
    private Object readAttachment;
    private PendingFuture<Number,Object> readFuture;
    private Future<?> readTimer;

    // pending write (updateLock)
    private boolean writePending;
    private boolean isGatheringWrite;
    private ByteBuffer writeBuffer;
    private ByteBuffer[] writeBuffers;
    private CompletionHandler<Number,Object> writeHandler;
    private Object writeAttachment;
    private PendingFuture<Number,Object> writeFuture;
    private Future<?> writeTimer;


    /**
     * Creates an unconnected channel bound to the given port.
     *
     * <p> The inherited {@code fd} is switched to non-blocking mode; if that
     * fails the descriptor is closed before the exception is rethrown. The
     * descriptor is then registered with the port.
     *
     * @param  port  the port (channel group) that polls this channel
     * @throws IOException if the descriptor cannot be made non-blocking, or
     *         if thrown by the superclass constructor
     */
    UnixAsynchronousSocketChannelImpl(Port port)
        throws IOException
    {
        super(port);

        // set non-blocking
        try {
            IOUtil.configureBlocking(fd, false);
        } catch (IOException x) {
            nd.close(fd);
            throw x;
        }

        this.port = port;
        this.fdVal = IOUtil.fdVal(fd);

        // add mapping from file descriptor to this channel
        port.register(fdVal, this);
    }

    /**
     * Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl.
     *
     * <p> Unlike the single-argument constructor this one does not close
     * {@code fd} on failure.
     * NOTE(review): presumably the accepting server channel owns cleanup of
     * the descriptor in that case; confirm against the caller.
     *
     * @param  port    the port (channel group) that polls this channel
     * @param  fd      the file descriptor of the already-created socket
     * @param  remote  the remote address, passed to the superclass
     * @throws IOException if the descriptor cannot be made non-blocking, or
     *         wrapping a {@link ShutdownChannelGroupException} when the group
     *         has been shut down
     */
    UnixAsynchronousSocketChannelImpl(Port port,
                                      FileDescriptor fd,
                                      InetSocketAddress remote)
        throws IOException
    {
        super(port, fd, remote);

        this.fdVal = IOUtil.fdVal(fd);
        IOUtil.configureBlocking(fd, false);

        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // ShutdownChannelGroupException thrown if we attempt to register a
            // new channel after the group is shutdown
            throw new IOException(x);
        }

        this.port = port;
    }

    /**
     * Returns the port this channel was created with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return port;
    }

    /**
     * Registers poll events for outstanding I/O operations: POLLIN for a
     * pending read, POLLOUT for a pending connect or write. Nothing is
     * registered when no operation is pending. Caller already owns updateLock.
     */
    private void updateEvents() {
        assert Thread.holdsLock(updateLock);
        int events = 0;
        if (readPending)
            events |= Net.POLLIN;
        if (connectPending || writePending)
            events |= Net.POLLOUT;
        if (events != 0)
            port.startPoll(fdVal, events);
    }

    /**
     * Registers poll events for outstanding I/O operations, acquiring
     * updateLock first.
     */
    private void lockAndUpdateEvents() {
        synchronized (updateLock) {
            updateEvents();
        }
    }

    /**
     * Invoked to finish read and/or write (or connect) operations.
     *
     * <p> The pending flags are claimed while holding updateLock so that each
     * pending operation is finished by exactly one caller; the I/O itself and
     * the handler invocation happen after the lock is released.
     *
     * @param mayInvokeDirect  whether completion handlers may be invoked
     *                         directly on the current thread
     * @param readable         finish a pending read, if any
     * @param writable         finish a pending write or, when no write is
     *                         pending, a pending connect
     */
    private void finish(boolean mayInvokeDirect,
                        boolean readable,
                        boolean writable)
    {
        boolean finishRead = false;
        boolean finishWrite = false;
        boolean finishConnect = false;

        // map event to pending result
        synchronized (updateLock) {
            if (readable && this.readPending) {
                this.readPending = false;
                finishRead = true;
            }
            if (writable) {
                if (this.writePending) {
                    this.writePending = false;
                    finishWrite = true;
                } else if (this.connectPending) {
                    this.connectPending = false;
                    finishConnect = true;
                }
            }
        }

        // complete the I/O operation. Special case for when channel is
        // ready for both reading and writing. In that case, submit task to
        // complete write if write operation has a completion handler.
        if (finishRead) {
            if (finishWrite)
                finishWrite(false);
            finishRead(mayInvokeDirect);
            return;
        }
        if (finishWrite) {
            finishWrite(mayInvokeDirect);
        }
        if (finishConnect) {
            finishConnect(mayInvokeDirect);
        }
    }

    /**
     * Invoked by event handler thread when file descriptor is polled.
     *
     * <p> POLLERR and POLLHUP are treated as both readable and writable so
     * that every pending operation is finished; the I/O call made while
     * finishing is expected to surface the underlying error.
     *
     * @param events           the poll events reported for the descriptor
     * @param mayInvokeDirect  whether completion handlers may be invoked
     *                         directly on the current thread
     */
    @Override
    public void onEvent(int events, boolean mayInvokeDirect) {
        boolean readable = (events & Net.POLLIN) > 0;
        boolean writable = (events & Net.POLLOUT) > 0;
        if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
            readable = true;
            writable = true;
        }
        finish(mayInvokeDirect, readable, writable);
    }

    /**
     * Closes the channel: removes the descriptor-to-channel mapping from the
     * port, closes the file descriptor, then finishes every pending operation
     * (never invoking handlers directly on this thread).
     *
     * @throws IOException if closing the file descriptor fails
     */
    @Override
    void implClose() throws IOException {
        // remove the mapping
        port.unregister(fdVal);

        // close file descriptor
        nd.close(fd);

        // All outstanding I/O operations are required to fail
        finish(false, true, true);
    }

    /**
     * Invoked when a pending future is cancelled. Dispatches on the future's
     * context (the {@code OpType} it was created with) to the matching
     * inherited kill method.
     *
     * @param task  the cancelled future
     */
    @Override
    public void onCancel(PendingFuture<?,?> task) {
        if (task.getContext() == OpType.CONNECT)
            killConnect();
        if (task.getContext() == OpType.READ)
            killReading();
        if (task.getContext() == OpType.WRITE)
            killWriting();
    }

    // -- connect --

    /**
     * Marks the channel connected: under stateLock, sets the state to
     * ST_CONNECTED, records the local address of the descriptor and promotes
     * the pending remote address to the remote address.
     *
     * @throws IOException if the local address cannot be obtained
     */
    private void setConnected() throws IOException {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            localAddress = Net.localAddress(fd);
            remoteAddress = (InetSocketAddress)pendingRemote;
        }
    }

    /**
     * Completes a pending connect.
     *
     * <p> Checks the outcome of the connection attempt with the native
     * {@code checkConnect} and, on success, marks the channel connected. A
     * {@link ClosedChannelException} is reported as an
     * {@link AsynchronousCloseException}; an {@link IOException} is passed
     * through {@code SocketExceptions.of} together with the pending remote
     * address. On any failure the channel is closed, with a failure to close
     * added as a suppressed exception.
     *
     * @param mayInvokeDirect  whether the completion handler may be invoked
     *                         directly on the current thread
     */
    private void finishConnect(boolean mayInvokeDirect) {
        Throwable e = null;
        try {
            begin();
            checkConnect(fdVal);
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }
        if (e != null) {
            if (e instanceof IOException) {
                var isa = (InetSocketAddress)pendingRemote;
                e = SocketExceptions.of((IOException)e, isa);
            }
            // close channel if connection cannot be established
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }

        // copy the context, then drop the channel's references so that the
        // handler and attachment can be GC'ed once the handler has run. The
        // connect has completed (connected or closed), so these fields are
        // not read again.
        CompletionHandler<Void,Object> handler = connectHandler;
        Object att = connectAttachment;
        PendingFuture<Void,Object> future = connectFuture;
        connectHandler = null;
        connectAttachment = null;

        // invoke handler and set result
        if (handler == null) {
            future.setResult(null, e);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, null, e);
            } else {
                Invoker.invokeIndirectly(this, handler, att, null, e);
            }
        }
    }

    /**
     * Initiates a connect operation.
     *
     * <p> The connection is attempted immediately. If it cannot be
     * established immediately the operation context is stored under
     * updateLock, the descriptor is polled for POLLOUT and the operation is
     * later completed by {@link #finishConnect}. If the attempt fails the
     * channel is closed.
     *
     * @param  remote      the address to connect to
     * @param  attachment  the object passed to the handler; unused when
     *                     {@code handler} is null
     * @param  handler     the completion handler, or null to obtain a Future
     * @return a Future for the result when {@code handler} is null (pending
     *         or already completed); otherwise null
     * @throws AlreadyConnectedException if the channel is already connected
     * @throws ConnectionPendingException if a connect is already in progress
     */
    @Override
    @SuppressWarnings("unchecked")
    <A> Future<Void> implConnect(SocketAddress remote,
                                 A attachment,
                                 CompletionHandler<Void,? super A> handler)
    {
        if (!isOpen()) {
            Throwable e = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(e);
            } else {
                Invoker.invoke(this, handler, attachment, null, e);
                return null;
            }
        }

        InetSocketAddress isa = Net.checkAddress(remote);

        // permission check
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        // check and set state
        boolean notifyBeforeTcpConnect;
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
            state = ST_PENDING;
            pendingRemote = remote;
            notifyBeforeTcpConnect = (localAddress == null);
        }

        Throwable e = null;
        try {
            begin();
            // notify hook if unbound
            if (notifyBeforeTcpConnect)
                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
            if (n == IOStatus.UNAVAILABLE) {
                // connection could not be established immediately
                PendingFuture<Void,A> result = null;
                synchronized (updateLock) {
                    if (handler == null) {
                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                        this.connectFuture = (PendingFuture<Void,Object>)result;
                    } else {
                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
                        this.connectAttachment = attachment;
                    }
                    this.connectPending = true;
                    updateEvents();
                }
                return result;
            }
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }

        // close channel if connect fails
        if (e != null) {
            if (e instanceof IOException) {
                e = SocketExceptions.of((IOException)e, isa);
            }
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }
        if (handler == null) {
            return CompletedFuture.withResult(null, e);
        } else {
            Invoker.invoke(this, handler, attachment, null, e);
            return null;
        }
    }

    // -- read --

    /**
     * Completes a pending read: performs the read into the stored buffer(s),
     * releases the stored context, re-enables reading, cancels the timeout
     * timer and delivers the result to the handler or future.
     *
     * <p> If the read reports {@code IOStatus.UNAVAILABLE} the operation is
     * marked pending again and polling is restarted without delivering a
     * result.
     *
     * @param mayInvokeDirect  whether the completion handler may be invoked
     *                         directly on the current thread
     */
    private void finishRead(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean scattering = isScatteringRead;
        CompletionHandler<Number,Object> handler = readHandler;
        Object att = readAttachment;
        PendingFuture<Number,Object> future = readFuture;
        Future<?> timeout = readTimer;

        try {
            begin();

            if (scattering) {
                n = (int)IOUtil.read(fd, readBuffers, nd);
            } else {
                n = IOUtil.read(fd, readBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    readPending = true;
                }
                return;
            }

            // allow objects to be GC'ed. The handler is released as well so
            // that a completed read does not keep it (and whatever it
            // captures) reachable from the channel.
            this.readBuffer = null;
            this.readBuffers = null;
            this.readAttachment = null;
            this.readHandler = null;

            // allow another read to be initiated
            enableReading();

        } catch (Throwable x) {
            enableReading();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timeout != null)
            timeout.cancel(false);

        // create result
        Number result = (exc != null) ? null : (scattering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    /**
     * Task scheduled when a read is initiated with a positive timeout. If the
     * read is still pending when it runs, the task claims the operation,
     * kills further reading and fails the operation with
     * {@link InterruptedByTimeoutException}. If the read has already been
     * claimed by another thread the task does nothing.
     */
    private final Runnable readTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                if (!readPending)
                    return;
                readPending = false;
                handler = readHandler;
                att = readAttachment;
                future = readFuture;
            }

            // kill further reading before releasing waiters
            enableReading(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler == null) {
                future.setFailure(exc);
            } else {
                AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
                Invoker.invokeIndirectly(ch, handler, att, null, exc);
            }
        }
    };

    /**
     * Initiates a read or scattering read operation.
     *
     * @param  isScatteringRead  true to read into {@code dsts}, false to read
     *                           into {@code dst}
     * @param  dst         the buffer for a non-scattering read
     * @param  dsts        the buffers for a scattering read
     * @param  timeout     the timeout; a timer is scheduled only when it is
     *                     greater than zero and the read becomes pending
     * @param  unit        the unit of {@code timeout}
     * @param  attachment  the object passed to the handler
     * @param  handler     the completion handler, or null to obtain a Future
     * @return a Future for the result when {@code handler} is null (pending
     *         or already completed); otherwise null
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, nd);
                } else {
                    n = IOUtil.read(fd, dst, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                // no data available (or no attempt made): store the context
                // and let the port complete the read
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // completed (or failed) immediately: allow another read
            if (!pending)
                enableReading();
            end();
        }

        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- write --

    /**
     * Completes a pending write: performs the write from the stored
     * buffer(s), releases the stored context, re-enables writing, cancels the
     * timeout timer and delivers the result to the handler or future.
     *
     * <p> If the write reports {@code IOStatus.UNAVAILABLE} the operation is
     * marked pending again and polling is restarted without delivering a
     * result.
     *
     * @param mayInvokeDirect  whether the completion handler may be invoked
     *                         directly on the current thread
     */
    private void finishWrite(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after writing is re-enabled.
        boolean gathering = this.isGatheringWrite;
        CompletionHandler<Number,Object> handler = this.writeHandler;
        Object att = this.writeAttachment;
        PendingFuture<Number,Object> future = this.writeFuture;
        Future<?> timer = this.writeTimer;

        try {
            begin();

            if (gathering) {
                n = (int)IOUtil.write(fd, writeBuffers, nd);
            } else {
                n = IOUtil.write(fd, writeBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                synchronized (updateLock) {
                    writePending = true;
                }
                return;
            }

            // allow objects to be GC'ed. The handler is released as well so
            // that a completed write does not keep it (and whatever it
            // captures) reachable from the channel.
            this.writeBuffer = null;
            this.writeBuffers = null;
            this.writeAttachment = null;
            this.writeHandler = null;

            // allow another write to be initiated
            enableWriting();

        } catch (Throwable x) {
            enableWriting();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent read
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timer != null)
            timer.cancel(false);

        // create result
        Number result = (exc != null) ? null : (gathering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }

    /**
     * Task scheduled when a write is initiated with a positive timeout. If
     * the write is still pending when it runs, the task claims the operation,
     * kills further writing and fails the operation with
     * {@link InterruptedByTimeoutException}. If the write has already been
     * claimed by another thread the task does nothing.
     */
    private final Runnable writeTimeoutTask = new Runnable() {
        public void run() {
            CompletionHandler<Number,Object> handler = null;
            Object att = null;
            PendingFuture<Number,Object> future = null;

            synchronized (updateLock) {
                if (!writePending)
                    return;
                writePending = false;
                handler = writeHandler;
                att = writeAttachment;
                future = writeFuture;
            }

            // kill further writing before releasing waiters
            enableWriting(true);

            // invoke handler or set result
            Exception exc = new InterruptedByTimeoutException();
            if (handler != null) {
                Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
                    handler, att, null, exc);
            } else {
                future.setFailure(exc);
            }
        }
    };

    /**
     * Initiates a write or gathering write operation.
     *
     * <p> The write is attempted on the initiating thread when no handler is
     * supplied, when the handler may be invoked directly on this thread, or
     * when the port does not use a fixed thread pool. Otherwise, or when the
     * attempt reports {@code IOStatus.UNAVAILABLE}, the operation becomes
     * pending and is completed by {@link #finishWrite}.
     *
     * @param  isGatheringWrite  true to write from {@code srcs}, false to
     *                           write from {@code src}
     * @param  src         the buffer for a non-gathering write
     * @param  srcs        the buffers for a gathering write
     * @param  timeout     the timeout; a timer is scheduled only when it is
     *                     greater than zero and the write becomes pending
     * @param  unit        the unit of {@code timeout}
     * @param  attachment  the object passed to the handler
     * @param  handler     the completion handler, or null to obtain a Future
     * @return a Future for the result when {@code handler} is null (pending
     *         or already completed); otherwise null
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                             ByteBuffer src,
                                             ByteBuffer[] srcs,
                                             long timeout,
                                             TimeUnit unit,
                                             A attachment,
                                             CompletionHandler<V,? super A> handler)
    {
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
        boolean attemptWrite = (handler == null) || invokeDirect ||
            !port.isFixedThreadPool();  // okay to attempt write with user thread pool

        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptWrite) {
                if (isGatheringWrite) {
                    n = (int)IOUtil.write(fd, srcs, nd);
                } else {
                    n = IOUtil.write(fd, src, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                // socket not writable (or no attempt made): store the context
                // and let the port complete the write
                PendingFuture<V,A> result = null;
                synchronized (updateLock) {
                    this.isGatheringWrite = isGatheringWrite;
                    this.writeBuffer = src;
                    this.writeBuffers = srcs;
                    if (handler == null) {
                        this.writeHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.WRITE);
                        this.writeFuture = (PendingFuture<Number,Object>)result;
                        this.writeAttachment = null;
                    } else {
                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
                        this.writeAttachment = attachment;
                        this.writeFuture = null;
                    }
                    if (timeout > 0L) {
                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                    }
                    this.writePending = true;
                    updateEvents();
                }
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // completed (or failed) immediately: allow another write
            if (!pending)
                enableWriting();
            end();
        }

        Number result = (exc != null) ? null : (isGatheringWrite) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // write completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }

    // -- Native methods --

    /**
     * Checks the outcome of a non-blocking connect on the given descriptor.
     *
     * @param  fdVal  the native file descriptor value
     * @throws IOException if the connection was not established
     */
    private static native void checkConnect(int fdVal) throws IOException;

    static {
        IOUtil.load();
    }
}