1 /* 2 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.ByteBuffer; 29 import java.nio.channels.*; 30 import java.net.SocketOption; 31 import java.net.StandardSocketOption; 32 import java.net.SocketAddress; 33 import java.net.InetSocketAddress; 34 import java.io.IOException; 35 import java.io.FileDescriptor; 36 import java.util.Set; 37 import java.util.HashSet; 38 import java.util.Collections; 39 import java.util.concurrent.*; 40 import java.util.concurrent.locks.*; 41 import sun.net.NetHooks; 42 43 /** 44 * Base implementation of AsynchronousSocketChannel 45 */ 46 47 abstract class AsynchronousSocketChannelImpl 48 extends AsynchronousSocketChannel 49 implements Cancellable, Groupable 50 { 51 protected final FileDescriptor fd; 52 53 // protects state, localAddress, and remoteAddress 54 protected final Object stateLock = new Object(); 55 56 protected volatile SocketAddress localAddress = null; 57 protected volatile SocketAddress remoteAddress = null; 58 59 // State, increases monotonically 60 static final int ST_UNINITIALIZED = -1; 61 static final int ST_UNCONNECTED = 0; 62 static final int ST_PENDING = 1; 63 static final int ST_CONNECTED = 2; 64 protected volatile int state = ST_UNINITIALIZED; 65 66 // reading state 67 private final Object readLock = new Object(); 68 private boolean reading; 69 private boolean readShutdown; 70 private boolean readKilled; // further reading disallowed due to timeout 71 72 // writing state 73 private final Object writeLock = new Object(); 74 private boolean writing; 75 private boolean writeShutdown; 76 private boolean writeKilled; // further writing disallowed due to timeout 77 78 // close support 79 private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); 80 private volatile boolean open = true; 81 82 AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group) 83 throws IOException 84 { 85 super(group.provider()); 86 this.fd = Net.socket(true); 87 this.state = ST_UNCONNECTED; 88 } 89 90 // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl 91 AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group, 92 FileDescriptor fd, 93 InetSocketAddress remote) 94 throws IOException 95 { 96 super(group.provider()); 97 this.fd = fd; 98 this.state = ST_CONNECTED; 99 this.localAddress = Net.localAddress(fd); 100 this.remoteAddress = remote; 101 } 102 103 @Override 104 public final boolean isOpen() { 105 return open; 106 } 107 108 /** 109 * Marks beginning of access to file descriptor/handle 110 */ 111 final void begin() throws IOException { 112 closeLock.readLock().lock(); 113 if (!isOpen()) 114 throw new ClosedChannelException(); 115 } 116 117 /** 118 * Marks end of access to file descriptor/handle 119 */ 120 final void end() { 121 closeLock.readLock().unlock(); 122 } 123 124 /** 125 * Invoked to close socket and release other resources. 126 */ 127 abstract void implClose() throws IOException; 128 129 @Override 130 public final void close() throws IOException { 131 // synchronize with any threads initiating asynchronous operations 132 closeLock.writeLock().lock(); 133 try { 134 if (!open) 135 return; // already closed 136 open = false; 137 } finally { 138 closeLock.writeLock().unlock(); 139 } 140 implClose(); 141 } 142 143 final void enableReading(boolean killed) { 144 synchronized (readLock) { 145 reading = false; 146 if (killed) 147 readKilled = true; 148 } 149 } 150 151 final void enableReading() { 152 enableReading(false); 153 } 154 155 final void enableWriting(boolean killed) { 156 synchronized (writeLock) { 157 writing = false; 158 if (killed) 159 writeKilled = true; 160 } 161 } 162 163 final void enableWriting() { 164 enableWriting(false); 165 } 166 167 final void killReading() { 168 synchronized (readLock) { 169 readKilled = true; 170 } 171 } 172 173 final void killWriting() { 174 synchronized (writeLock) { 175 writeKilled = true; 176 } 177 } 178 179 final void killConnect() { 180 // when a connect is cancelled then the connection may have been 181 // established so prevent reading or writing. 182 killReading(); 183 killWriting(); 184 } 185 186 /** 187 * Invoked by connect to initiate the connect operation. 188 */ 189 abstract <A> Future<Void> implConnect(SocketAddress remote, 190 A attachment, 191 CompletionHandler<Void,? super A> handler); 192 193 @Override 194 public final Future<Void> connect(SocketAddress remote) { 195 return implConnect(remote, null, null); 196 } 197 198 @Override 199 public final <A> void connect(SocketAddress remote, 200 A attachment, 201 CompletionHandler<Void,? super A> handler) 202 { 203 if (handler == null) 204 throw new NullPointerException("'handler' is null"); 205 implConnect(remote, attachment, handler); 206 } 207 208 /** 209 * Invoked by read to initiate the I/O operation. 210 */ 211 abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 212 ByteBuffer dst, 213 ByteBuffer[] dsts, 214 long timeout, 215 TimeUnit unit, 216 A attachment, 217 CompletionHandler<V,? super A> handler); 218 219 @SuppressWarnings("unchecked") 220 private <V extends Number,A> Future<V> read(boolean isScatteringRead, 221 ByteBuffer dst, 222 ByteBuffer[] dsts, 223 long timeout, 224 TimeUnit unit, 225 A att, 226 CompletionHandler<V,? super A> handler) 227 { 228 if (!isOpen()) { 229 Throwable e = new ClosedChannelException(); 230 if (handler == null) 231 return CompletedFuture.withFailure(e); 232 Invoker.invoke(this, handler, att, null, e); 233 return null; 234 } 235 236 if (remoteAddress == null) 237 throw new NotYetConnectedException(); 238 239 boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining(); 240 boolean shutdown = false; 241 242 // check and update state 243 synchronized (readLock) { 244 if (readKilled) 245 throw new IllegalStateException("Reading not allowed due to timeout or cancellation"); 246 if (reading) 247 throw new ReadPendingException(); 248 if (readShutdown) { 249 shutdown = true; 250 } else { 251 if (hasSpaceToRead) { 252 reading = true; 253 } 254 } 255 } 256 257 // immediately complete with -1 if shutdown for read 258 // immediately complete with 0 if no space remaining 259 if (shutdown || !hasSpaceToRead) { 260 Number result; 261 if (isScatteringRead) { 262 result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L); 263 } else { 264 result = (shutdown) ? -1 : 0; 265 } 266 if (handler == null) 267 return CompletedFuture.withResult((V)result); 268 Invoker.invoke(this, handler, att, (V)result, null); 269 return null; 270 } 271 272 return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler); 273 } 274 275 @Override 276 public final Future<Integer> read(ByteBuffer dst) { 277 if (dst.isReadOnly()) 278 throw new IllegalArgumentException("Read-only buffer"); 279 return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null); 280 } 281 282 @Override 283 public final <A> void read(ByteBuffer dst, 284 long timeout, 285 TimeUnit unit, 286 A attachment, 287 CompletionHandler<Integer,? super A> handler) 288 { 289 if (handler == null) 290 throw new NullPointerException("'handler' is null"); 291 if (dst.isReadOnly()) 292 throw new IllegalArgumentException("Read-only buffer"); 293 read(false, dst, null, timeout, unit, attachment, handler); 294 } 295 296 @Override 297 public final <A> void read(ByteBuffer[] dsts, 298 int offset, 299 int length, 300 long timeout, 301 TimeUnit unit, 302 A attachment, 303 CompletionHandler<Long,? super A> handler) 304 { 305 if (handler == null) 306 throw new NullPointerException("'handler' is null"); 307 if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) 308 throw new IndexOutOfBoundsException(); 309 ByteBuffer[] bufs = Util.subsequence(dsts, offset, length); 310 for (int i=0; i<bufs.length; i++) { 311 if (bufs[i].isReadOnly()) 312 throw new IllegalArgumentException("Read-only buffer"); 313 } 314 read(true, null, bufs, timeout, unit, attachment, handler); 315 } 316 317 /** 318 * Invoked by write to initiate the I/O operation. 319 */ 320 abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, 321 ByteBuffer src, 322 ByteBuffer[] srcs, 323 long timeout, 324 TimeUnit unit, 325 A attachment, 326 CompletionHandler<V,? super A> handler); 327 328 @SuppressWarnings("unchecked") 329 private <V extends Number,A> Future<V> write(boolean isGatheringWrite, 330 ByteBuffer src, 331 ByteBuffer[] srcs, 332 long timeout, 333 TimeUnit unit, 334 A att, 335 CompletionHandler<V,? super A> handler) 336 { 337 boolean hasDataToWrite = isGatheringWrite || src.hasRemaining(); 338 339 boolean closed = false; 340 if (isOpen()) { 341 if (remoteAddress == null) 342 throw new NotYetConnectedException(); 343 // check and update state 344 synchronized (writeLock) { 345 if (writeKilled) 346 throw new IllegalStateException("Writing not allowed due to timeout or cancellation"); 347 if (writing) 348 throw new WritePendingException(); 349 if (writeShutdown) { 350 closed = true; 351 } else { 352 if (hasDataToWrite) 353 writing = true; 354 } 355 } 356 } else { 357 closed = true; 358 } 359 360 // channel is closed or shutdown for write 361 if (closed) { 362 Throwable e = new ClosedChannelException(); 363 if (handler == null) 364 return CompletedFuture.withFailure(e); 365 Invoker.invoke(this, handler, att, null, e); 366 return null; 367 } 368 369 // nothing to write so complete immediately 370 if (!hasDataToWrite) { 371 Number result = (isGatheringWrite) ? (Number)0L : (Number)0; 372 if (handler == null) 373 return CompletedFuture.withResult((V)result); 374 Invoker.invoke(this, handler, att, (V)result, null); 375 return null; 376 } 377 378 return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler); 379 } 380 381 @Override 382 public final Future<Integer> write(ByteBuffer src) { 383 return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null); 384 } 385 386 @Override 387 public final <A> void write(ByteBuffer src, 388 long timeout, 389 TimeUnit unit, 390 A attachment, 391 CompletionHandler<Integer,? super A> handler) 392 { 393 if (handler == null) 394 throw new NullPointerException("'handler' is null"); 395 write(false, src, null, timeout, unit, attachment, handler); 396 } 397 398 @Override 399 public final <A> void write(ByteBuffer[] srcs, 400 int offset, 401 int length, 402 long timeout, 403 TimeUnit unit, 404 A attachment, 405 CompletionHandler<Long,? super A> handler) 406 { 407 if (handler == null) 408 throw new NullPointerException("'handler' is null"); 409 if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) 410 throw new IndexOutOfBoundsException(); 411 srcs = Util.subsequence(srcs, offset, length); 412 write(true, null, srcs, timeout, unit, attachment, handler); 413 } 414 415 @Override 416 public final AsynchronousSocketChannel bind(SocketAddress local) 417 throws IOException 418 { 419 try { 420 begin(); 421 synchronized (stateLock) { 422 if (state == ST_PENDING) 423 throw new ConnectionPendingException(); 424 if (localAddress != null) 425 throw new AlreadyBoundException(); 426 InetSocketAddress isa = (local == null) ? 427 new InetSocketAddress(0) : Net.checkAddress(local); 428 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); 429 Net.bind(fd, isa.getAddress(), isa.getPort()); 430 localAddress = Net.localAddress(fd); 431 } 432 } finally { 433 end(); 434 } 435 return this; 436 } 437 438 @Override 439 public final SocketAddress getLocalAddress() throws IOException { 440 if (!isOpen()) 441 throw new ClosedChannelException(); 442 return localAddress; 443 } 444 445 @Override 446 public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value) 447 throws IOException 448 { 449 if (name == null) 450 throw new NullPointerException(); 451 if (!supportedOptions().contains(name)) 452 throw new UnsupportedOperationException("'" + name + "' not supported"); 453 454 try { 455 begin(); 456 if (writeShutdown) 457 throw new IOException("Connection has been shutdown for writing"); 458 Net.setSocketOption(fd, Net.UNSPEC, name, value); 459 return this; 460 } finally { 461 end(); 462 } 463 } 464 465 @Override 466 @SuppressWarnings("unchecked") 467 public final <T> T getOption(SocketOption<T> name) throws IOException { 468 if (name == null) 469 throw new NullPointerException(); 470 if (!supportedOptions().contains(name)) 471 throw new UnsupportedOperationException("'" + name + "' not supported"); 472 473 try { 474 begin(); 475 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); 476 } finally { 477 end(); 478 } 479 } 480 481 private static class DefaultOptionsHolder { 482 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 483 484 private static Set<SocketOption<?>> defaultOptions() { 485 HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5); 486 set.add(StandardSocketOption.SO_SNDBUF); 487 set.add(StandardSocketOption.SO_RCVBUF); 488 set.add(StandardSocketOption.SO_KEEPALIVE); 489 set.add(StandardSocketOption.SO_REUSEADDR); 490 set.add(StandardSocketOption.TCP_NODELAY); 491 return Collections.unmodifiableSet(set); 492 } 493 } 494 495 @Override 496 public final Set<SocketOption<?>> supportedOptions() { 497 return DefaultOptionsHolder.defaultOptions; 498 } 499 500 @Override 501 public final SocketAddress getRemoteAddress() throws IOException { 502 if (!isOpen()) 503 throw new ClosedChannelException(); 504 return remoteAddress; 505 } 506 507 @Override 508 public final AsynchronousSocketChannel shutdownInput() throws IOException { 509 try { 510 begin(); 511 if (remoteAddress == null) 512 throw new NotYetConnectedException(); 513 synchronized (readLock) { 514 if (!readShutdown) { 515 Net.shutdown(fd, Net.SHUT_RD); 516 readShutdown = true; 517 } 518 } 519 } finally { 520 end(); 521 } 522 return this; 523 } 524 525 @Override 526 public final AsynchronousSocketChannel shutdownOutput() throws IOException { 527 try { 528 begin(); 529 if (remoteAddress == null) 530 throw new NotYetConnectedException(); 531 synchronized (writeLock) { 532 if (!writeShutdown) { 533 Net.shutdown(fd, Net.SHUT_WR); 534 writeShutdown = true; 535 } 536 } 537 } finally { 538 end(); 539 } 540 return this; 541 } 542 543 @Override 544 public final String toString() { 545 StringBuilder sb = new StringBuilder(); 546 sb.append(this.getClass().getName()); 547 sb.append('['); 548 synchronized (stateLock) { 549 if (!isOpen()) { 550 sb.append("closed"); 551 } else { 552 switch (state) { 553 case ST_UNCONNECTED: 554 sb.append("unconnected"); 555 break; 556 case ST_PENDING: 557 sb.append("connection-pending"); 558 break; 559 case ST_CONNECTED: 560 sb.append("connected"); 561 if (readShutdown) 562 sb.append(" ishut"); 563 if (writeShutdown) 564 sb.append(" oshut"); 565 break; 566 } 567 if (localAddress != null) { 568 sb.append(" local="); 569 sb.append(localAddress.toString()); 570 } 571 if (remoteAddress != null) { 572 sb.append(" remote="); 573 sb.append(remoteAddress.toString()); 574 } 575 } 576 } 577 sb.append(']'); 578 return sb.toString(); 579 } 580 }