1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.net.*; 31 import java.util.concurrent.*; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 import java.security.AccessController; 35 import sun.net.NetHooks; 36 import sun.security.action.GetPropertyAction; 37 38 /** 39 * Unix implementation of AsynchronousSocketChannel 40 */ 41 42 class UnixAsynchronousSocketChannelImpl 43 extends AsynchronousSocketChannelImpl implements Port.PollableChannel 44 { 45 private static final NativeDispatcher nd = new SocketDispatcher(); 46 private static enum OpType { CONNECT, READ, WRITE }; 47 48 private static final boolean disableSynchronousRead; 49 static { 50 String propValue = AccessController.doPrivileged( 51 new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false")); 52 disableSynchronousRead = (propValue.length() == 0) ? 53 true : Boolean.valueOf(propValue); 54 } 55 56 private final Port port; 57 private final int fdVal; 58 59 // used to ensure that the context for I/O operations that complete 60 // ascynrhonously is visible to the pooled threads handling I/O events. 61 private final Object updateLock = new Object(); 62 63 // pending connect (updateLock) 64 private boolean connectPending; 65 private CompletionHandler<Void,Object> connectHandler; 66 private Object connectAttachment; 67 private PendingFuture<Void,Object> connectFuture; 68 69 // pending remote address (stateLock) 70 private SocketAddress pendingRemote; 71 72 // pending read (updateLock) 73 private boolean readPending; 74 private boolean isScatteringRead; 75 private ByteBuffer readBuffer; 76 private ByteBuffer[] readBuffers; 77 private CompletionHandler<Number,Object> readHandler; 78 private Object readAttachment; 79 private PendingFuture<Number,Object> readFuture; 80 private Future<?> readTimer; 81 82 // pending write (updateLock) 83 private boolean writePending; 84 private boolean isGatheringWrite; 85 private ByteBuffer writeBuffer; 86 private ByteBuffer[] writeBuffers; 87 private CompletionHandler<Number,Object> writeHandler; 88 private Object writeAttachment; 89 private PendingFuture<Number,Object> writeFuture; 90 private Future<?> writeTimer; 91 92 93 UnixAsynchronousSocketChannelImpl(Port port) 94 throws IOException 95 { 96 super(port); 97 98 // set non-blocking 99 try { 100 IOUtil.configureBlocking(fd, false); 101 } catch (IOException x) { 102 nd.close(fd); 103 throw x; 104 } 105 106 this.port = port; 107 this.fdVal = IOUtil.fdVal(fd); 108 109 // add mapping from file descriptor to this channel 110 port.register(fdVal, this); 111 } 112 113 // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl 114 UnixAsynchronousSocketChannelImpl(Port port, 115 FileDescriptor fd, 116 InetSocketAddress remote) 117 throws IOException 118 { 119 super(port, fd, remote); 120 121 this.fdVal = IOUtil.fdVal(fd); 122 IOUtil.configureBlocking(fd, false); 123 124 try { 125 port.register(fdVal, this); 126 } catch (ShutdownChannelGroupException x) { 127 // ShutdownChannelGroupException thrown if we attempt to register a 128 // new channel after the group is shutdown 129 throw new IOException(x); 130 } 131 132 this.port = port; 133 } 134 135 @Override 136 public AsynchronousChannelGroupImpl group() { 137 return port; 138 } 139 140 // register events for outstanding I/O operations, caller already owns updateLock 141 private void updateEvents() { 142 assert Thread.holdsLock(updateLock); 143 int events = 0; 144 if (readPending) 145 events |= Net.POLLIN; 146 if (connectPending || writePending) 147 events |= Net.POLLOUT; 148 if (events != 0) 149 port.startPoll(fdVal, events); 150 } 151 152 // register events for outstanding I/O operations 153 private void lockAndUpdateEvents() { 154 synchronized (updateLock) { 155 updateEvents(); 156 } 157 } 158 159 // invoke to finish read and/or write operations 160 private void finish(boolean mayInvokeDirect, 161 boolean readable, 162 boolean writable) 163 { 164 boolean finishRead = false; 165 boolean finishWrite = false; 166 boolean finishConnect = false; 167 168 // map event to pending result 169 synchronized (updateLock) { 170 if (readable && this.readPending) { 171 this.readPending = false; 172 finishRead = true; 173 } 174 if (writable) { 175 if (this.writePending) { 176 this.writePending = false; 177 finishWrite = true; 178 } else if (this.connectPending) { 179 this.connectPending = false; 180 finishConnect = true; 181 } 182 } 183 } 184 185 // complete the I/O operation. Special case for when channel is 186 // ready for both reading and writing. In that case, submit task to 187 // complete write if write operation has a completion handler. 188 if (finishRead) { 189 if (finishWrite) 190 finishWrite(false); 191 finishRead(mayInvokeDirect); 192 return; 193 } 194 if (finishWrite) { 195 finishWrite(mayInvokeDirect); 196 } 197 if (finishConnect) { 198 finishConnect(mayInvokeDirect); 199 } 200 } 201 202 /** 203 * Invoked by event handler thread when file descriptor is polled 204 */ 205 @Override 206 public void onEvent(int events, boolean mayInvokeDirect) { 207 boolean readable = (events & Net.POLLIN) > 0; 208 boolean writable = (events & Net.POLLOUT) > 0; 209 if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) { 210 readable = true; 211 writable = true; 212 } 213 finish(mayInvokeDirect, readable, writable); 214 } 215 216 @Override 217 void implClose() throws IOException { 218 // remove the mapping 219 port.unregister(fdVal); 220 221 // close file descriptor 222 nd.close(fd); 223 224 // All outstanding I/O operations are required to fail 225 finish(false, true, true); 226 } 227 228 @Override 229 public void onCancel(PendingFuture<?,?> task) { 230 if (task.getContext() == OpType.CONNECT) 231 killConnect(); 232 if (task.getContext() == OpType.READ) 233 killReading(); 234 if (task.getContext() == OpType.WRITE) 235 killWriting(); 236 } 237 238 // -- connect -- 239 240 private void setConnected() throws IOException { 241 synchronized (stateLock) { 242 state = ST_CONNECTED; 243 localAddress = Net.localAddress(fd); 244 remoteAddress = (InetSocketAddress)pendingRemote; 245 } 246 } 247 248 private void finishConnect(boolean mayInvokeDirect) { 249 Throwable e = null; 250 try { 251 begin(); 252 checkConnect(fdVal); 253 setConnected(); 254 } catch (Throwable x) { 255 if (x instanceof ClosedChannelException) 256 x = new AsynchronousCloseException(); 257 e = x; 258 } finally { 259 end(); 260 } 261 if (e != null) { 262 // close channel if connection cannot be established 263 try { 264 close(); 265 } catch (Throwable suppressed) { 266 e.addSuppressed(suppressed); 267 } 268 } 269 270 // invoke handler and set result 271 CompletionHandler<Void,Object> handler = connectHandler; 272 Object att = connectAttachment; 273 PendingFuture<Void,Object> future = connectFuture; 274 if (handler == null) { 275 future.setResult(null, e); 276 } else { 277 if (mayInvokeDirect) { 278 Invoker.invokeUnchecked(handler, att, null, e); 279 } else { 280 Invoker.invokeIndirectly(this, handler, att, null, e); 281 } 282 } 283 } 284 285 @Override 286 @SuppressWarnings("unchecked") 287 <A> Future<Void> implConnect(SocketAddress remote, 288 A attachment, 289 CompletionHandler<Void,? super A> handler) 290 { 291 if (!isOpen()) { 292 Throwable e = new ClosedChannelException(); 293 if (handler == null) { 294 return CompletedFuture.withFailure(e); 295 } else { 296 Invoker.invoke(this, handler, attachment, null, e); 297 return null; 298 } 299 } 300 301 InetSocketAddress isa = Net.checkAddress(remote); 302 303 // permission check 304 SecurityManager sm = System.getSecurityManager(); 305 if (sm != null) 306 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 307 308 // check and set state 309 boolean notifyBeforeTcpConnect; 310 synchronized (stateLock) { 311 if (state == ST_CONNECTED) 312 throw new AlreadyConnectedException(); 313 if (state == ST_PENDING) 314 throw new ConnectionPendingException(); 315 state = ST_PENDING; 316 pendingRemote = remote; 317 notifyBeforeTcpConnect = (localAddress == null); 318 } 319 320 Throwable e = null; 321 try { 322 begin(); 323 // notify hook if unbound 324 if (notifyBeforeTcpConnect) 325 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 326 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); 327 if (n == IOStatus.UNAVAILABLE) { 328 // connection could not be established immediately 329 PendingFuture<Void,A> result = null; 330 synchronized (updateLock) { 331 if (handler == null) { 332 result = new PendingFuture<Void,A>(this, OpType.CONNECT); 333 this.connectFuture = (PendingFuture<Void,Object>)result; 334 } else { 335 this.connectHandler = (CompletionHandler<Void,Object>)handler; 336 this.connectAttachment = attachment; 337 } 338 this.connectPending = true; 339 updateEvents(); 340 } 341 return result; 342 } 343 setConnected(); 344 } catch (Throwable x) { 345 if (x instanceof ClosedChannelException) 346 x = new AsynchronousCloseException(); 347 e = x; 348 } finally { 349 end(); 350 } 351 352 // close channel if connect fails 353 if (e != null) { 354 try { 355 close(); 356 } catch (Throwable suppressed) { 357 e.addSuppressed(suppressed); 358 } 359 } 360 if (handler == null) { 361 return CompletedFuture.withResult(null, e); 362 } else { 363 Invoker.invoke(this, handler, attachment, null, e); 364 return null; 365 } 366 } 367 368 // -- read -- 369 370 private void finishRead(boolean mayInvokeDirect) { 371 int n = -1; 372 Throwable exc = null; 373 374 // copy fields as we can't access them after reading is re-enabled. 375 boolean scattering = isScatteringRead; 376 CompletionHandler<Number,Object> handler = readHandler; 377 Object att = readAttachment; 378 PendingFuture<Number,Object> future = readFuture; 379 Future<?> timeout = readTimer; 380 381 try { 382 begin(); 383 384 if (scattering) { 385 n = (int)IOUtil.read(fd, readBuffers, nd); 386 } else { 387 n = IOUtil.read(fd, readBuffer, -1, nd); 388 } 389 if (n == IOStatus.UNAVAILABLE) { 390 // spurious wakeup, is this possible? 391 synchronized (updateLock) { 392 readPending = true; 393 } 394 return; 395 } 396 397 // allow objects to be GC'ed. 398 this.readBuffer = null; 399 this.readBuffers = null; 400 this.readAttachment = null; 401 402 // allow another read to be initiated 403 enableReading(); 404 405 } catch (Throwable x) { 406 enableReading(); 407 if (x instanceof ClosedChannelException) 408 x = new AsynchronousCloseException(); 409 exc = x; 410 } finally { 411 // restart poll in case of concurrent write 412 if (!(exc instanceof AsynchronousCloseException)) 413 lockAndUpdateEvents(); 414 end(); 415 } 416 417 // cancel the associated timer 418 if (timeout != null) 419 timeout.cancel(false); 420 421 // create result 422 Number result = (exc != null) ? null : (scattering) ? 423 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 424 425 // invoke handler or set result 426 if (handler == null) { 427 future.setResult(result, exc); 428 } else { 429 if (mayInvokeDirect) { 430 Invoker.invokeUnchecked(handler, att, result, exc); 431 } else { 432 Invoker.invokeIndirectly(this, handler, att, result, exc); 433 } 434 } 435 } 436 437 private Runnable readTimeoutTask = new Runnable() { 438 public void run() { 439 CompletionHandler<Number,Object> handler = null; 440 Object att = null; 441 PendingFuture<Number,Object> future = null; 442 443 synchronized (updateLock) { 444 if (!readPending) 445 return; 446 readPending = false; 447 handler = readHandler; 448 att = readAttachment; 449 future = readFuture; 450 } 451 452 // kill further reading before releasing waiters 453 enableReading(true); 454 455 // invoke handler or set result 456 Exception exc = new InterruptedByTimeoutException(); 457 if (handler == null) { 458 future.setFailure(exc); 459 } else { 460 AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this; 461 Invoker.invokeIndirectly(ch, handler, att, null, exc); 462 } 463 } 464 }; 465 466 /** 467 * Initiates a read or scattering read operation 468 */ 469 @Override 470 @SuppressWarnings("unchecked") 471 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 472 ByteBuffer dst, 473 ByteBuffer[] dsts, 474 long timeout, 475 TimeUnit unit, 476 A attachment, 477 CompletionHandler<V,? super A> handler) 478 { 479 // A synchronous read is not attempted if disallowed by system property 480 // or, we are using a fixed thread pool and the completion handler may 481 // not be invoked directly (because the thread is not a pooled thread or 482 // there are too many handlers on the stack). 483 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null; 484 boolean invokeDirect = false; 485 boolean attemptRead = false; 486 if (!disableSynchronousRead) { 487 if (handler == null) { 488 attemptRead = true; 489 } else { 490 myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount(); 491 invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 492 // okay to attempt read with user thread pool 493 attemptRead = invokeDirect || !port.isFixedThreadPool(); 494 } 495 } 496 497 int n = IOStatus.UNAVAILABLE; 498 Throwable exc = null; 499 boolean pending = false; 500 501 try { 502 begin(); 503 504 if (attemptRead) { 505 if (isScatteringRead) { 506 n = (int)IOUtil.read(fd, dsts, nd); 507 } else { 508 n = IOUtil.read(fd, dst, -1, nd); 509 } 510 } 511 512 if (n == IOStatus.UNAVAILABLE) { 513 PendingFuture<V,A> result = null; 514 synchronized (updateLock) { 515 this.isScatteringRead = isScatteringRead; 516 this.readBuffer = dst; 517 this.readBuffers = dsts; 518 if (handler == null) { 519 this.readHandler = null; 520 result = new PendingFuture<V,A>(this, OpType.READ); 521 this.readFuture = (PendingFuture<Number,Object>)result; 522 this.readAttachment = null; 523 } else { 524 this.readHandler = (CompletionHandler<Number,Object>)handler; 525 this.readAttachment = attachment; 526 this.readFuture = null; 527 } 528 if (timeout > 0L) { 529 this.readTimer = port.schedule(readTimeoutTask, timeout, unit); 530 } 531 this.readPending = true; 532 updateEvents(); 533 } 534 pending = true; 535 return result; 536 } 537 } catch (Throwable x) { 538 if (x instanceof ClosedChannelException) 539 x = new AsynchronousCloseException(); 540 exc = x; 541 } finally { 542 if (!pending) 543 enableReading(); 544 end(); 545 } 546 547 Number result = (exc != null) ? null : (isScatteringRead) ? 548 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 549 550 // read completed immediately 551 if (handler != null) { 552 if (invokeDirect) { 553 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 554 } else { 555 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 556 } 557 return null; 558 } else { 559 return CompletedFuture.withResult((V)result, exc); 560 } 561 } 562 563 // -- write -- 564 565 private void finishWrite(boolean mayInvokeDirect) { 566 int n = -1; 567 Throwable exc = null; 568 569 // copy fields as we can't access them after reading is re-enabled. 570 boolean gathering = this.isGatheringWrite; 571 CompletionHandler<Number,Object> handler = this.writeHandler; 572 Object att = this.writeAttachment; 573 PendingFuture<Number,Object> future = this.writeFuture; 574 Future<?> timer = this.writeTimer; 575 576 try { 577 begin(); 578 579 if (gathering) { 580 n = (int)IOUtil.write(fd, writeBuffers, nd); 581 } else { 582 n = IOUtil.write(fd, writeBuffer, -1, nd); 583 } 584 if (n == IOStatus.UNAVAILABLE) { 585 // spurious wakeup, is this possible? 586 synchronized (updateLock) { 587 writePending = true; 588 } 589 return; 590 } 591 592 // allow objects to be GC'ed. 593 this.writeBuffer = null; 594 this.writeBuffers = null; 595 this.writeAttachment = null; 596 597 // allow another write to be initiated 598 enableWriting(); 599 600 } catch (Throwable x) { 601 enableWriting(); 602 if (x instanceof ClosedChannelException) 603 x = new AsynchronousCloseException(); 604 exc = x; 605 } finally { 606 // restart poll in case of concurrent write 607 if (!(exc instanceof AsynchronousCloseException)) 608 lockAndUpdateEvents(); 609 end(); 610 } 611 612 // cancel the associated timer 613 if (timer != null) 614 timer.cancel(false); 615 616 // create result 617 Number result = (exc != null) ? null : (gathering) ? 618 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 619 620 // invoke handler or set result 621 if (handler == null) { 622 future.setResult(result, exc); 623 } else { 624 if (mayInvokeDirect) { 625 Invoker.invokeUnchecked(handler, att, result, exc); 626 } else { 627 Invoker.invokeIndirectly(this, handler, att, result, exc); 628 } 629 } 630 } 631 632 private Runnable writeTimeoutTask = new Runnable() { 633 public void run() { 634 CompletionHandler<Number,Object> handler = null; 635 Object att = null; 636 PendingFuture<Number,Object> future = null; 637 638 synchronized (updateLock) { 639 if (!writePending) 640 return; 641 writePending = false; 642 handler = writeHandler; 643 att = writeAttachment; 644 future = writeFuture; 645 } 646 647 // kill further writing before releasing waiters 648 enableWriting(true); 649 650 // invoke handler or set result 651 Exception exc = new InterruptedByTimeoutException(); 652 if (handler != null) { 653 Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this, 654 handler, att, null, exc); 655 } else { 656 future.setFailure(exc); 657 } 658 } 659 }; 660 661 /** 662 * Initiates a read or scattering read operation 663 */ 664 @Override 665 @SuppressWarnings("unchecked") 666 <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, 667 ByteBuffer src, 668 ByteBuffer[] srcs, 669 long timeout, 670 TimeUnit unit, 671 A attachment, 672 CompletionHandler<V,? super A> handler) 673 { 674 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 675 Invoker.getGroupAndInvokeCount(); 676 boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 677 boolean attemptWrite = (handler == null) || invokeDirect || 678 !port.isFixedThreadPool(); // okay to attempt write with user thread pool 679 680 int n = IOStatus.UNAVAILABLE; 681 Throwable exc = null; 682 boolean pending = false; 683 684 try { 685 begin(); 686 687 if (attemptWrite) { 688 if (isGatheringWrite) { 689 n = (int)IOUtil.write(fd, srcs, nd); 690 } else { 691 n = IOUtil.write(fd, src, -1, nd); 692 } 693 } 694 695 if (n == IOStatus.UNAVAILABLE) { 696 PendingFuture<V,A> result = null; 697 synchronized (updateLock) { 698 this.isGatheringWrite = isGatheringWrite; 699 this.writeBuffer = src; 700 this.writeBuffers = srcs; 701 if (handler == null) { 702 this.writeHandler = null; 703 result = new PendingFuture<V,A>(this, OpType.WRITE); 704 this.writeFuture = (PendingFuture<Number,Object>)result; 705 this.writeAttachment = null; 706 } else { 707 this.writeHandler = (CompletionHandler<Number,Object>)handler; 708 this.writeAttachment = attachment; 709 this.writeFuture = null; 710 } 711 if (timeout > 0L) { 712 this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit); 713 } 714 this.writePending = true; 715 updateEvents(); 716 } 717 pending = true; 718 return result; 719 } 720 } catch (Throwable x) { 721 if (x instanceof ClosedChannelException) 722 x = new AsynchronousCloseException(); 723 exc = x; 724 } finally { 725 if (!pending) 726 enableWriting(); 727 end(); 728 } 729 730 Number result = (exc != null) ? null : (isGatheringWrite) ? 731 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 732 733 // write completed immediately 734 if (handler != null) { 735 if (invokeDirect) { 736 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 737 } else { 738 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 739 } 740 return null; 741 } else { 742 return CompletedFuture.withResult((V)result, exc); 743 } 744 } 745 746 // -- Native methods -- 747 748 private static native void checkConnect(int fdVal) throws IOException; 749 750 static { 751 IOUtil.load(); 752 } 753 }