1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.ByteBuffer; 30 import java.net.*; 31 import java.util.concurrent.*; 32 import java.io.IOException; 33 import java.io.FileDescriptor; 34 import sun.net.NetHooks; 35 import sun.security.action.GetPropertyAction; 36 37 /** 38 * Unix implementation of AsynchronousSocketChannel 39 */ 40 41 class UnixAsynchronousSocketChannelImpl 42 extends AsynchronousSocketChannelImpl implements Port.PollableChannel 43 { 44 private static final NativeDispatcher nd = new SocketDispatcher(); 45 private static enum OpType { CONNECT, READ, WRITE }; 46 47 private static final boolean disableSynchronousRead; 48 static { 49 String propValue = GetPropertyAction 50 .getProperty("sun.nio.ch.disableSynchronousRead", "false"); 51 disableSynchronousRead = (propValue.length() == 0) ? 52 true : Boolean.valueOf(propValue); 53 } 54 55 private final Port port; 56 private final int fdVal; 57 58 // used to ensure that the context for I/O operations that complete 59 // ascynrhonously is visible to the pooled threads handling I/O events. 60 private final Object updateLock = new Object(); 61 62 // pending connect (updateLock) 63 private boolean connectPending; 64 private CompletionHandler<Void,Object> connectHandler; 65 private Object connectAttachment; 66 private PendingFuture<Void,Object> connectFuture; 67 68 // pending remote address (stateLock) 69 private SocketAddress pendingRemote; 70 71 // pending read (updateLock) 72 private boolean readPending; 73 private boolean isScatteringRead; 74 private ByteBuffer readBuffer; 75 private ByteBuffer[] readBuffers; 76 private CompletionHandler<Number,Object> readHandler; 77 private Object readAttachment; 78 private PendingFuture<Number,Object> readFuture; 79 private Future<?> readTimer; 80 81 // pending write (updateLock) 82 private boolean writePending; 83 private boolean isGatheringWrite; 84 private ByteBuffer writeBuffer; 85 private ByteBuffer[] writeBuffers; 86 private CompletionHandler<Number,Object> writeHandler; 87 private Object writeAttachment; 88 private PendingFuture<Number,Object> writeFuture; 89 private Future<?> writeTimer; 90 91 92 UnixAsynchronousSocketChannelImpl(Port port) 93 throws IOException 94 { 95 super(port); 96 97 // set non-blocking 98 try { 99 IOUtil.configureBlocking(fd, false); 100 } catch (IOException x) { 101 nd.close(fd); 102 throw x; 103 } 104 105 this.port = port; 106 this.fdVal = IOUtil.fdVal(fd); 107 108 // add mapping from file descriptor to this channel 109 port.register(fdVal, this); 110 } 111 112 // Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl 113 UnixAsynchronousSocketChannelImpl(Port port, 114 FileDescriptor fd, 115 InetSocketAddress remote) 116 throws IOException 117 { 118 super(port, fd, remote); 119 120 this.fdVal = IOUtil.fdVal(fd); 121 IOUtil.configureBlocking(fd, false); 122 123 try { 124 port.register(fdVal, this); 125 } catch (ShutdownChannelGroupException x) { 126 // ShutdownChannelGroupException thrown if we attempt to register a 127 // new channel after the group is shutdown 128 throw new IOException(x); 129 } 130 131 this.port = port; 132 } 133 134 @Override 135 public AsynchronousChannelGroupImpl group() { 136 return port; 137 } 138 139 // register events for outstanding I/O operations, caller already owns updateLock 140 private void updateEvents() { 141 assert Thread.holdsLock(updateLock); 142 int events = 0; 143 if (readPending) 144 events |= Net.POLLIN; 145 if (connectPending || writePending) 146 events |= Net.POLLOUT; 147 if (events != 0) 148 port.startPoll(fdVal, events); 149 } 150 151 // register events for outstanding I/O operations 152 private void lockAndUpdateEvents() { 153 synchronized (updateLock) { 154 updateEvents(); 155 } 156 } 157 158 // invoke to finish read and/or write operations 159 private void finish(boolean mayInvokeDirect, 160 boolean readable, 161 boolean writable) 162 { 163 boolean finishRead = false; 164 boolean finishWrite = false; 165 boolean finishConnect = false; 166 167 // map event to pending result 168 synchronized (updateLock) { 169 if (readable && this.readPending) { 170 this.readPending = false; 171 finishRead = true; 172 } 173 if (writable) { 174 if (this.writePending) { 175 this.writePending = false; 176 finishWrite = true; 177 } else if (this.connectPending) { 178 this.connectPending = false; 179 finishConnect = true; 180 } 181 } 182 } 183 184 // complete the I/O operation. Special case for when channel is 185 // ready for both reading and writing. In that case, submit task to 186 // complete write if write operation has a completion handler. 187 if (finishRead) { 188 if (finishWrite) 189 finishWrite(false); 190 finishRead(mayInvokeDirect); 191 return; 192 } 193 if (finishWrite) { 194 finishWrite(mayInvokeDirect); 195 } 196 if (finishConnect) { 197 finishConnect(mayInvokeDirect); 198 } 199 } 200 201 /** 202 * Invoked by event handler thread when file descriptor is polled 203 */ 204 @Override 205 public void onEvent(int events, boolean mayInvokeDirect) { 206 boolean readable = (events & Net.POLLIN) > 0; 207 boolean writable = (events & Net.POLLOUT) > 0; 208 if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) { 209 readable = true; 210 writable = true; 211 } 212 finish(mayInvokeDirect, readable, writable); 213 } 214 215 @Override 216 void implClose() throws IOException { 217 // remove the mapping 218 port.unregister(fdVal); 219 220 // close file descriptor 221 nd.close(fd); 222 223 // All outstanding I/O operations are required to fail 224 finish(false, true, true); 225 } 226 227 @Override 228 public void onCancel(PendingFuture<?,?> task) { 229 if (task.getContext() == OpType.CONNECT) 230 killConnect(); 231 if (task.getContext() == OpType.READ) 232 killReading(); 233 if (task.getContext() == OpType.WRITE) 234 killWriting(); 235 } 236 237 // -- connect -- 238 239 private void setConnected() throws IOException { 240 synchronized (stateLock) { 241 state = ST_CONNECTED; 242 localAddress = Net.localAddress(fd); 243 remoteAddress = (InetSocketAddress)pendingRemote; 244 } 245 } 246 247 private void finishConnect(boolean mayInvokeDirect) { 248 Throwable e = null; 249 try { 250 begin(); 251 checkConnect(fdVal); 252 setConnected(); 253 } catch (Throwable x) { 254 if (x instanceof ClosedChannelException) 255 x = new AsynchronousCloseException(); 256 e = x; 257 } finally { 258 end(); 259 } 260 if (e != null) { 261 // close channel if connection cannot be established 262 try { 263 close(); 264 } catch (Throwable suppressed) { 265 e.addSuppressed(suppressed); 266 } 267 } 268 269 // invoke handler and set result 270 CompletionHandler<Void,Object> handler = connectHandler; 271 Object att = connectAttachment; 272 PendingFuture<Void,Object> future = connectFuture; 273 if (handler == null) { 274 future.setResult(null, e); 275 } else { 276 if (mayInvokeDirect) { 277 Invoker.invokeUnchecked(handler, att, null, e); 278 } else { 279 Invoker.invokeIndirectly(this, handler, att, null, e); 280 } 281 } 282 } 283 284 @Override 285 @SuppressWarnings("unchecked") 286 <A> Future<Void> implConnect(SocketAddress remote, 287 A attachment, 288 CompletionHandler<Void,? super A> handler) 289 { 290 if (!isOpen()) { 291 Throwable e = new ClosedChannelException(); 292 if (handler == null) { 293 return CompletedFuture.withFailure(e); 294 } else { 295 Invoker.invoke(this, handler, attachment, null, e); 296 return null; 297 } 298 } 299 300 InetSocketAddress isa = Net.checkAddress(remote); 301 302 // permission check 303 SecurityManager sm = System.getSecurityManager(); 304 if (sm != null) 305 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 306 307 // check and set state 308 boolean notifyBeforeTcpConnect; 309 synchronized (stateLock) { 310 if (state == ST_CONNECTED) 311 throw new AlreadyConnectedException(); 312 if (state == ST_PENDING) 313 throw new ConnectionPendingException(); 314 state = ST_PENDING; 315 pendingRemote = remote; 316 notifyBeforeTcpConnect = (localAddress == null); 317 } 318 319 Throwable e = null; 320 try { 321 begin(); 322 // notify hook if unbound 323 if (notifyBeforeTcpConnect) 324 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 325 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); 326 if (n == IOStatus.UNAVAILABLE) { 327 // connection could not be established immediately 328 PendingFuture<Void,A> result = null; 329 synchronized (updateLock) { 330 if (handler == null) { 331 result = new PendingFuture<Void,A>(this, OpType.CONNECT); 332 this.connectFuture = (PendingFuture<Void,Object>)result; 333 } else { 334 this.connectHandler = (CompletionHandler<Void,Object>)handler; 335 this.connectAttachment = attachment; 336 } 337 this.connectPending = true; 338 updateEvents(); 339 } 340 return result; 341 } 342 setConnected(); 343 } catch (Throwable x) { 344 if (x instanceof ClosedChannelException) 345 x = new AsynchronousCloseException(); 346 e = x; 347 } finally { 348 end(); 349 } 350 351 // close channel if connect fails 352 if (e != null) { 353 try { 354 close(); 355 } catch (Throwable suppressed) { 356 e.addSuppressed(suppressed); 357 } 358 } 359 if (handler == null) { 360 return CompletedFuture.withResult(null, e); 361 } else { 362 Invoker.invoke(this, handler, attachment, null, e); 363 return null; 364 } 365 } 366 367 // -- read -- 368 369 private void finishRead(boolean mayInvokeDirect) { 370 int n = -1; 371 Throwable exc = null; 372 373 // copy fields as we can't access them after reading is re-enabled. 374 boolean scattering = isScatteringRead; 375 CompletionHandler<Number,Object> handler = readHandler; 376 Object att = readAttachment; 377 PendingFuture<Number,Object> future = readFuture; 378 Future<?> timeout = readTimer; 379 380 try { 381 begin(); 382 383 if (scattering) { 384 n = (int)IOUtil.read(fd, readBuffers, nd); 385 } else { 386 n = IOUtil.read(fd, readBuffer, -1, nd); 387 } 388 if (n == IOStatus.UNAVAILABLE) { 389 // spurious wakeup, is this possible? 390 synchronized (updateLock) { 391 readPending = true; 392 } 393 return; 394 } 395 396 // allow objects to be GC'ed. 397 this.readBuffer = null; 398 this.readBuffers = null; 399 this.readAttachment = null; 400 401 // allow another read to be initiated 402 enableReading(); 403 404 } catch (Throwable x) { 405 enableReading(); 406 if (x instanceof ClosedChannelException) 407 x = new AsynchronousCloseException(); 408 exc = x; 409 } finally { 410 // restart poll in case of concurrent write 411 if (!(exc instanceof AsynchronousCloseException)) 412 lockAndUpdateEvents(); 413 end(); 414 } 415 416 // cancel the associated timer 417 if (timeout != null) 418 timeout.cancel(false); 419 420 // create result 421 Number result = (exc != null) ? null : (scattering) ? 422 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 423 424 // invoke handler or set result 425 if (handler == null) { 426 future.setResult(result, exc); 427 } else { 428 if (mayInvokeDirect) { 429 Invoker.invokeUnchecked(handler, att, result, exc); 430 } else { 431 Invoker.invokeIndirectly(this, handler, att, result, exc); 432 } 433 } 434 } 435 436 private Runnable readTimeoutTask = new Runnable() { 437 public void run() { 438 CompletionHandler<Number,Object> handler = null; 439 Object att = null; 440 PendingFuture<Number,Object> future = null; 441 442 synchronized (updateLock) { 443 if (!readPending) 444 return; 445 readPending = false; 446 handler = readHandler; 447 att = readAttachment; 448 future = readFuture; 449 } 450 451 // kill further reading before releasing waiters 452 enableReading(true); 453 454 // invoke handler or set result 455 Exception exc = new InterruptedByTimeoutException(); 456 if (handler == null) { 457 future.setFailure(exc); 458 } else { 459 AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this; 460 Invoker.invokeIndirectly(ch, handler, att, null, exc); 461 } 462 } 463 }; 464 465 /** 466 * Initiates a read or scattering read operation 467 */ 468 @Override 469 @SuppressWarnings("unchecked") 470 <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 471 ByteBuffer dst, 472 ByteBuffer[] dsts, 473 long timeout, 474 TimeUnit unit, 475 A attachment, 476 CompletionHandler<V,? super A> handler) 477 { 478 // A synchronous read is not attempted if disallowed by system property 479 // or, we are using a fixed thread pool and the completion handler may 480 // not be invoked directly (because the thread is not a pooled thread or 481 // there are too many handlers on the stack). 482 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null; 483 boolean invokeDirect = false; 484 boolean attemptRead = false; 485 if (!disableSynchronousRead) { 486 if (handler == null) { 487 attemptRead = true; 488 } else { 489 myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount(); 490 invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 491 // okay to attempt read with user thread pool 492 attemptRead = invokeDirect || !port.isFixedThreadPool(); 493 } 494 } 495 496 int n = IOStatus.UNAVAILABLE; 497 Throwable exc = null; 498 boolean pending = false; 499 500 try { 501 begin(); 502 503 if (attemptRead) { 504 if (isScatteringRead) { 505 n = (int)IOUtil.read(fd, dsts, nd); 506 } else { 507 n = IOUtil.read(fd, dst, -1, nd); 508 } 509 } 510 511 if (n == IOStatus.UNAVAILABLE) { 512 PendingFuture<V,A> result = null; 513 synchronized (updateLock) { 514 this.isScatteringRead = isScatteringRead; 515 this.readBuffer = dst; 516 this.readBuffers = dsts; 517 if (handler == null) { 518 this.readHandler = null; 519 result = new PendingFuture<V,A>(this, OpType.READ); 520 this.readFuture = (PendingFuture<Number,Object>)result; 521 this.readAttachment = null; 522 } else { 523 this.readHandler = (CompletionHandler<Number,Object>)handler; 524 this.readAttachment = attachment; 525 this.readFuture = null; 526 } 527 if (timeout > 0L) { 528 this.readTimer = port.schedule(readTimeoutTask, timeout, unit); 529 } 530 this.readPending = true; 531 updateEvents(); 532 } 533 pending = true; 534 return result; 535 } 536 } catch (Throwable x) { 537 if (x instanceof ClosedChannelException) 538 x = new AsynchronousCloseException(); 539 exc = x; 540 } finally { 541 if (!pending) 542 enableReading(); 543 end(); 544 } 545 546 Number result = (exc != null) ? null : (isScatteringRead) ? 547 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 548 549 // read completed immediately 550 if (handler != null) { 551 if (invokeDirect) { 552 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 553 } else { 554 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 555 } 556 return null; 557 } else { 558 return CompletedFuture.withResult((V)result, exc); 559 } 560 } 561 562 // -- write -- 563 564 private void finishWrite(boolean mayInvokeDirect) { 565 int n = -1; 566 Throwable exc = null; 567 568 // copy fields as we can't access them after reading is re-enabled. 569 boolean gathering = this.isGatheringWrite; 570 CompletionHandler<Number,Object> handler = this.writeHandler; 571 Object att = this.writeAttachment; 572 PendingFuture<Number,Object> future = this.writeFuture; 573 Future<?> timer = this.writeTimer; 574 575 try { 576 begin(); 577 578 if (gathering) { 579 n = (int)IOUtil.write(fd, writeBuffers, nd); 580 } else { 581 n = IOUtil.write(fd, writeBuffer, -1, nd); 582 } 583 if (n == IOStatus.UNAVAILABLE) { 584 // spurious wakeup, is this possible? 585 synchronized (updateLock) { 586 writePending = true; 587 } 588 return; 589 } 590 591 // allow objects to be GC'ed. 592 this.writeBuffer = null; 593 this.writeBuffers = null; 594 this.writeAttachment = null; 595 596 // allow another write to be initiated 597 enableWriting(); 598 599 } catch (Throwable x) { 600 enableWriting(); 601 if (x instanceof ClosedChannelException) 602 x = new AsynchronousCloseException(); 603 exc = x; 604 } finally { 605 // restart poll in case of concurrent write 606 if (!(exc instanceof AsynchronousCloseException)) 607 lockAndUpdateEvents(); 608 end(); 609 } 610 611 // cancel the associated timer 612 if (timer != null) 613 timer.cancel(false); 614 615 // create result 616 Number result = (exc != null) ? null : (gathering) ? 617 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 618 619 // invoke handler or set result 620 if (handler == null) { 621 future.setResult(result, exc); 622 } else { 623 if (mayInvokeDirect) { 624 Invoker.invokeUnchecked(handler, att, result, exc); 625 } else { 626 Invoker.invokeIndirectly(this, handler, att, result, exc); 627 } 628 } 629 } 630 631 private Runnable writeTimeoutTask = new Runnable() { 632 public void run() { 633 CompletionHandler<Number,Object> handler = null; 634 Object att = null; 635 PendingFuture<Number,Object> future = null; 636 637 synchronized (updateLock) { 638 if (!writePending) 639 return; 640 writePending = false; 641 handler = writeHandler; 642 att = writeAttachment; 643 future = writeFuture; 644 } 645 646 // kill further writing before releasing waiters 647 enableWriting(true); 648 649 // invoke handler or set result 650 Exception exc = new InterruptedByTimeoutException(); 651 if (handler != null) { 652 Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this, 653 handler, att, null, exc); 654 } else { 655 future.setFailure(exc); 656 } 657 } 658 }; 659 660 /** 661 * Initiates a read or scattering read operation 662 */ 663 @Override 664 @SuppressWarnings("unchecked") 665 <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, 666 ByteBuffer src, 667 ByteBuffer[] srcs, 668 long timeout, 669 TimeUnit unit, 670 A attachment, 671 CompletionHandler<V,? super A> handler) 672 { 673 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 674 Invoker.getGroupAndInvokeCount(); 675 boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port); 676 boolean attemptWrite = (handler == null) || invokeDirect || 677 !port.isFixedThreadPool(); // okay to attempt write with user thread pool 678 679 int n = IOStatus.UNAVAILABLE; 680 Throwable exc = null; 681 boolean pending = false; 682 683 try { 684 begin(); 685 686 if (attemptWrite) { 687 if (isGatheringWrite) { 688 n = (int)IOUtil.write(fd, srcs, nd); 689 } else { 690 n = IOUtil.write(fd, src, -1, nd); 691 } 692 } 693 694 if (n == IOStatus.UNAVAILABLE) { 695 PendingFuture<V,A> result = null; 696 synchronized (updateLock) { 697 this.isGatheringWrite = isGatheringWrite; 698 this.writeBuffer = src; 699 this.writeBuffers = srcs; 700 if (handler == null) { 701 this.writeHandler = null; 702 result = new PendingFuture<V,A>(this, OpType.WRITE); 703 this.writeFuture = (PendingFuture<Number,Object>)result; 704 this.writeAttachment = null; 705 } else { 706 this.writeHandler = (CompletionHandler<Number,Object>)handler; 707 this.writeAttachment = attachment; 708 this.writeFuture = null; 709 } 710 if (timeout > 0L) { 711 this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit); 712 } 713 this.writePending = true; 714 updateEvents(); 715 } 716 pending = true; 717 return result; 718 } 719 } catch (Throwable x) { 720 if (x instanceof ClosedChannelException) 721 x = new AsynchronousCloseException(); 722 exc = x; 723 } finally { 724 if (!pending) 725 enableWriting(); 726 end(); 727 } 728 729 Number result = (exc != null) ? null : (isGatheringWrite) ? 730 (Number)Long.valueOf(n) : (Number)Integer.valueOf(n); 731 732 // write completed immediately 733 if (handler != null) { 734 if (invokeDirect) { 735 Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc); 736 } else { 737 Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc); 738 } 739 return null; 740 } else { 741 return CompletedFuture.withResult((V)result, exc); 742 } 743 } 744 745 // -- Native methods -- 746 747 private static native void checkConnect(int fdVal) throws IOException; 748 749 static { 750 IOUtil.load(); 751 } 752 }