1 /*
   2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.channels.*;
  30 import java.net.SocketOption;
  31 import java.net.StandardSocketOption;
  32 import java.net.SocketAddress;
  33 import java.net.InetSocketAddress;
  34 import java.io.IOException;
  35 import java.io.FileDescriptor;
  36 import java.util.Set;
  37 import java.util.HashSet;
  38 import java.util.Collections;
  39 import java.util.concurrent.*;
  40 import java.util.concurrent.locks.*;
  41 import sun.net.NetHooks;
  42 
  43 /**
  44  * Base implementation of AsynchronousSocketChannel
  45  */
  46 
  47 abstract class AsynchronousSocketChannelImpl
  48     extends AsynchronousSocketChannel
  49     implements Cancellable, Groupable
  50 {
    // File descriptor of the underlying socket; created by the constructor or
    // supplied by the caller for an accepted connection.
    protected final FileDescriptor fd;

    // protects state, localAddress, and remoteAddress
    protected final Object stateLock = new Object();

    // Both addresses start out null. They are volatile and are also read
    // without stateLock (e.g. by getLocalAddress, getRemoteAddress and the
    // read/write paths, which treat a null remoteAddress as "not connected").
    protected volatile SocketAddress localAddress = null;
    protected volatile SocketAddress remoteAddress = null;

    // State, increases monotonically
    static final int ST_UNINITIALIZED = -1;
    static final int ST_UNCONNECTED = 0;
    static final int ST_PENDING = 1;
    static final int ST_CONNECTED = 2;
    protected volatile int state = ST_UNINITIALIZED;

    // reading state (updated while holding readLock)
    private final Object readLock = new Object();
    private boolean reading;        // a read has been initiated and not yet re-enabled
    private boolean readShutdown;   // set by shutdownInput
    private boolean readKilled;     // further reading disallowed due to timeout or cancellation

    // writing state (updated while holding writeLock)
    private final Object writeLock = new Object();
    private boolean writing;        // a write has been initiated and not yet re-enabled
    private boolean writeShutdown;  // set by shutdownOutput
    private boolean writeKilled;    // further writing disallowed due to timeout or cancellation

    // close support: begin()/end() hold the read lock around access to the
    // file descriptor, close() takes the write lock to flip the open flag
    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
    private volatile boolean open = true;
  81 
  82     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)
  83         throws IOException
  84     {
  85         super(group.provider());
  86         this.fd = Net.socket(true);
  87         this.state = ST_UNCONNECTED;
  88     }
  89 
  90     // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl
  91     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,
  92                                   FileDescriptor fd,
  93                                   InetSocketAddress remote)
  94         throws IOException
  95     {
  96         super(group.provider());
  97         this.fd = fd;
  98         this.state = ST_CONNECTED;
  99         this.localAddress = Net.localAddress(fd);
 100         this.remoteAddress = remote;
 101     }
 102 
 103     @Override
 104     public final boolean isOpen() {
 105         return open;
 106     }
 107 
 108     /**
 109      * Marks beginning of access to file descriptor/handle
 110      */
 111     final void begin() throws IOException {
 112         closeLock.readLock().lock();
 113         if (!isOpen())
 114             throw new ClosedChannelException();
 115     }
 116 
 117     /**
 118      * Marks end of access to file descriptor/handle
 119      */
 120     final void end() {
 121         closeLock.readLock().unlock();
 122     }
 123 
    /**
     * Invoked to close socket and release other resources.
     *
     * <p> {@link #close} calls this at most once: only the invocation that
     * flips the channel from open to closed reaches it, and it does so after
     * releasing the close lock.
     *
     * @throws IOException  propagated unchanged to the caller of {@code close}
     */
    abstract void implClose() throws IOException;
 128 
 129     @Override
 130     public final void close() throws IOException {
 131         // synchronize with any threads initiating asynchronous operations
 132         closeLock.writeLock().lock();
 133         try {
 134             if (!open)
 135                 return;     // already closed
 136             open = false;
 137         } finally {
 138             closeLock.writeLock().unlock();
 139         }
 140         implClose();
 141     }
 142 
 143     final void enableReading(boolean killed) {
 144         synchronized (readLock) {
 145             reading = false;
 146             if (killed)
 147                 readKilled = true;
 148         }
 149     }
 150 
 151     final void enableReading() {
 152         enableReading(false);
 153     }
 154 
 155     final void enableWriting(boolean killed) {
 156         synchronized (writeLock) {
 157             writing = false;
 158             if (killed)
 159                 writeKilled = true;
 160         }
 161     }
 162 
 163     final void enableWriting() {
 164         enableWriting(false);
 165     }
 166 
 167     final void killReading() {
 168         synchronized (readLock) {
 169             readKilled = true;
 170         }
 171     }
 172 
 173     final void killWriting() {
 174         synchronized (writeLock) {
 175             writeKilled = true;
 176         }
 177     }
 178 
 179     final void killConnect() {
 180         // when a connect is cancelled then the connection may have been
 181         // established so prevent reading or writing.
 182         killReading();
 183         killWriting();
 184     }
 185 
    /**
     * Invoked by connect to initiate the connect operation.
     *
     * <p> Both {@code connect} methods delegate here without inspecting
     * {@code remote} or the channel state. The {@code Future}-returning form
     * passes a {@code null} attachment and handler and hands this method's
     * result back to its caller; the handler form passes a non-null handler
     * and ignores the result.
     *
     * @param  remote      the address to connect to, exactly as supplied by the caller
     * @param  attachment  object to pass to the handler; may be {@code null}
     * @param  handler     completion handler, or {@code null} for the {@code Future} form
     * @return the value returned by the {@code Future}-returning {@code connect}
     */
    abstract <A> Future<Void> implConnect(SocketAddress remote,
                                          A attachment,
                                          CompletionHandler<Void,? super A> handler);
 192 
 193     @Override
 194     public final Future<Void> connect(SocketAddress remote) {
 195         return implConnect(remote, null, null);
 196     }
 197 
 198     @Override
 199     public final <A> void connect(SocketAddress remote,
 200                                   A attachment,
 201                                   CompletionHandler<Void,? super A> handler)
 202     {
 203         if (handler == null)
 204             throw new NullPointerException("'handler' is null");
 205         implConnect(remote, attachment, handler);
 206     }
 207 
    /**
     * Invoked by read to initiate the I/O operation.
     *
     * <p> The private {@code read} method in this class only reaches this
     * call after it has observed the channel open and connected, found
     * reading neither killed, pending nor shut down, and set the
     * read-pending flag. Reads that can be completed immediately (input shut
     * down, or a single destination buffer with no space remaining) never
     * get here.
     * NOTE(review): presumably the implementation is expected to call
     * {@code enableReading} once the operation finishes - confirm against
     * the concrete subclasses.
     *
     * @param  isScatteringRead  {@code true} when reading into {@code dsts},
     *                           {@code false} when reading into {@code dst}
     * @param  dst         the destination buffer; {@code null} for a scattering read
     * @param  dsts        the destination buffers; {@code null} for a non-scattering read
     * @param  timeout     timeout as given by the caller ({@code 0L} for the
     *                     {@code Future}-returning read)
     * @param  unit        unit of {@code timeout}
     * @param  attachment  object to pass to the handler; may be {@code null}
     * @param  handler     completion handler, or {@code null} for the {@code Future} form
     * @return the value returned to the caller of the {@code Future}-returning read
     */
    abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                                     ByteBuffer dst,
                                                     ByteBuffer[] dsts,
                                                     long timeout,
                                                     TimeUnit unit,
                                                     A attachment,
                                                     CompletionHandler<V,? super A> handler);
 218 
 219     @SuppressWarnings("unchecked")
 220     private <V extends Number,A> Future<V> read(boolean isScatteringRead,
 221                                                 ByteBuffer dst,
 222                                                 ByteBuffer[] dsts,
 223                                                 long timeout,
 224                                                 TimeUnit unit,
 225                                                 A att,
 226                                                 CompletionHandler<V,? super A> handler)
 227     {
 228         if (!isOpen()) {
 229             Throwable e = new ClosedChannelException();
 230             if (handler == null)
 231                 return CompletedFuture.withFailure(e);
 232             Invoker.invoke(this, handler, att, null, e);
 233             return null;
 234         }
 235 
 236         if (remoteAddress == null)
 237             throw new NotYetConnectedException();
 238 
 239         boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
 240         boolean shutdown = false;
 241 
 242         // check and update state
 243         synchronized (readLock) {
 244             if (readKilled)
 245                 throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
 246             if (reading)
 247                 throw new ReadPendingException();
 248             if (readShutdown) {
 249                 shutdown = true;
 250             } else {
 251                 if (hasSpaceToRead) {
 252                     reading = true;
 253                 }
 254             }
 255         }
 256 
 257         // immediately complete with -1 if shutdown for read
 258         // immediately complete with 0 if no space remaining
 259         if (shutdown || !hasSpaceToRead) {
 260             Number result;
 261             if (isScatteringRead) {
 262                 result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
 263             } else {
 264                 result = (shutdown) ? -1 : 0;
 265             }
 266             if (handler == null)
 267                 return CompletedFuture.withResult((V)result);
 268             Invoker.invoke(this, handler, att, (V)result, null);
 269             return null;
 270         }
 271 
 272         return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
 273     }
 274 
 275     @Override
 276     public final Future<Integer> read(ByteBuffer dst) {
 277         if (dst.isReadOnly())
 278             throw new IllegalArgumentException("Read-only buffer");
 279         return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
 280     }
 281 
 282     @Override
 283     public final <A> void read(ByteBuffer dst,
 284                                long timeout,
 285                                TimeUnit unit,
 286                                A attachment,
 287                                CompletionHandler<Integer,? super A> handler)
 288     {
 289         if (handler == null)
 290             throw new NullPointerException("'handler' is null");
 291         if (dst.isReadOnly())
 292             throw new IllegalArgumentException("Read-only buffer");
 293         read(false, dst, null, timeout, unit, attachment, handler);
 294     }
 295 
 296     @Override
 297     public final <A> void read(ByteBuffer[] dsts,
 298                                int offset,
 299                                int length,
 300                                long timeout,
 301                                TimeUnit unit,
 302                                A attachment,
 303                                CompletionHandler<Long,? super A> handler)
 304     {
 305         if (handler == null)
 306             throw new NullPointerException("'handler' is null");
 307         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 308             throw new IndexOutOfBoundsException();
 309         ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
 310         for (int i=0; i<bufs.length; i++) {
 311             if (bufs[i].isReadOnly())
 312                 throw new IllegalArgumentException("Read-only buffer");
 313         }
 314         read(true, null, bufs, timeout, unit, attachment, handler);
 315     }
 316 
    /**
     * Invoked by write to initiate the I/O operation.
     *
     * <p> The private {@code write} method in this class only reaches this
     * call after it has observed the channel open and connected, found
     * writing neither killed, pending nor shut down, and set the
     * write-pending flag. Writes on a closed or output-shutdown channel, and
     * non-gathering writes of a buffer with nothing remaining, are completed
     * there and never get here.
     * NOTE(review): presumably the implementation is expected to call
     * {@code enableWriting} once the operation finishes - confirm against
     * the concrete subclasses.
     *
     * @param  isGatheringWrite  {@code true} when writing from {@code srcs},
     *                           {@code false} when writing from {@code src}
     * @param  src         the source buffer; {@code null} for a gathering write
     * @param  srcs        the source buffers; {@code null} for a non-gathering write
     * @param  timeout     timeout as given by the caller ({@code 0L} for the
     *                     {@code Future}-returning write)
     * @param  unit        unit of {@code timeout}
     * @param  attachment  object to pass to the handler; may be {@code null}
     * @param  handler     completion handler, or {@code null} for the {@code Future} form
     * @return the value returned to the caller of the {@code Future}-returning write
     */
    abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                                      ByteBuffer src,
                                                      ByteBuffer[] srcs,
                                                      long timeout,
                                                      TimeUnit unit,
                                                      A attachment,
                                                      CompletionHandler<V,? super A> handler);
 327 
 328     @SuppressWarnings("unchecked")
 329     private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
 330                                                  ByteBuffer src,
 331                                                  ByteBuffer[] srcs,
 332                                                  long timeout,
 333                                                  TimeUnit unit,
 334                                                  A att,
 335                                                  CompletionHandler<V,? super A> handler)
 336     {
 337         boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
 338 
 339         boolean closed = false;
 340         if (isOpen()) {
 341             if (remoteAddress == null)
 342                 throw new NotYetConnectedException();
 343             // check and update state
 344             synchronized (writeLock) {
 345                 if (writeKilled)
 346                     throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
 347                 if (writing)
 348                     throw new WritePendingException();
 349                 if (writeShutdown) {
 350                     closed = true;
 351                 } else {
 352                     if (hasDataToWrite)
 353                         writing = true;
 354                 }
 355             }
 356         } else {
 357             closed = true;
 358         }
 359 
 360         // channel is closed or shutdown for write
 361         if (closed) {
 362             Throwable e = new ClosedChannelException();
 363             if (handler == null)
 364                 return CompletedFuture.withFailure(e);
 365             Invoker.invoke(this, handler, att, null, e);
 366             return null;
 367         }
 368 
 369         // nothing to write so complete immediately
 370         if (!hasDataToWrite) {
 371             Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
 372             if (handler == null)
 373                 return CompletedFuture.withResult((V)result);
 374             Invoker.invoke(this, handler, att, (V)result, null);
 375             return null;
 376         }
 377 
 378         return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
 379     }
 380 
 381     @Override
 382     public final Future<Integer> write(ByteBuffer src) {
 383         return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
 384     }
 385 
 386     @Override
 387     public final <A> void write(ByteBuffer src,
 388                                 long timeout,
 389                                 TimeUnit unit,
 390                                 A attachment,
 391                                 CompletionHandler<Integer,? super A> handler)
 392     {
 393         if (handler == null)
 394             throw new NullPointerException("'handler' is null");
 395         write(false, src, null, timeout, unit, attachment, handler);
 396     }
 397 
 398     @Override
 399     public final <A> void  write(ByteBuffer[] srcs,
 400                                  int offset,
 401                                  int length,
 402                                  long timeout,
 403                                  TimeUnit unit,
 404                                  A attachment,
 405                                  CompletionHandler<Long,? super A> handler)
 406     {
 407         if (handler == null)
 408             throw new NullPointerException("'handler' is null");
 409         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 410             throw new IndexOutOfBoundsException();
 411         srcs = Util.subsequence(srcs, offset, length);
 412         write(true, null, srcs, timeout, unit, attachment, handler);
 413     }
 414 
 415     @Override
 416     public final AsynchronousSocketChannel bind(SocketAddress local)
 417         throws IOException
 418     {
 419         try {
 420             begin();
 421             synchronized (stateLock) {
 422                 if (state == ST_PENDING)
 423                     throw new ConnectionPendingException();
 424                 if (localAddress != null)
 425                     throw new AlreadyBoundException();
 426                 InetSocketAddress isa = (local == null) ?
 427                     new InetSocketAddress(0) : Net.checkAddress(local);
 428                 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
 429                 Net.bind(fd, isa.getAddress(), isa.getPort());
 430                 localAddress = Net.localAddress(fd);
 431             }
 432         } finally {
 433             end();
 434         }
 435         return this;
 436     }
 437 
 438     @Override
 439     public final SocketAddress getLocalAddress() throws IOException {
 440         if (!isOpen())
 441             throw new ClosedChannelException();
 442         return localAddress;
 443     }
 444 
 445     @Override
 446     public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)
 447         throws IOException
 448     {
 449         if (name == null)
 450             throw new NullPointerException();
 451         if (!supportedOptions().contains(name))
 452             throw new UnsupportedOperationException("'" + name + "' not supported");
 453 
 454         try {
 455             begin();
 456             if (writeShutdown)
 457                 throw new IOException("Connection has been shutdown for writing");
 458             Net.setSocketOption(fd, Net.UNSPEC, name, value);
 459             return this;
 460         } finally {
 461             end();
 462         }
 463     }
 464 
 465     @Override
 466     @SuppressWarnings("unchecked")
 467     public final <T> T getOption(SocketOption<T> name) throws IOException {
 468         if (name == null)
 469             throw new NullPointerException();
 470         if (!supportedOptions().contains(name))
 471             throw new UnsupportedOperationException("'" + name + "' not supported");
 472 
 473         try {
 474             begin();
 475             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
 476         } finally {
 477             end();
 478         }
 479     }
 480 
 481     private static class DefaultOptionsHolder {
 482         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 483 
 484         private static Set<SocketOption<?>> defaultOptions() {
 485             HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5);
 486             set.add(StandardSocketOption.SO_SNDBUF);
 487             set.add(StandardSocketOption.SO_RCVBUF);
 488             set.add(StandardSocketOption.SO_KEEPALIVE);
 489             set.add(StandardSocketOption.SO_REUSEADDR);
 490             set.add(StandardSocketOption.TCP_NODELAY);
 491             return Collections.unmodifiableSet(set);
 492         }
 493     }
 494 
 495     @Override
 496     public final Set<SocketOption<?>> supportedOptions() {
 497         return DefaultOptionsHolder.defaultOptions;
 498     }
 499 
 500     @Override
 501     public final SocketAddress getRemoteAddress() throws IOException {
 502         if (!isOpen())
 503             throw new ClosedChannelException();
 504         return remoteAddress;
 505     }
 506 
 507     @Override
 508     public final AsynchronousSocketChannel shutdownInput() throws IOException {
 509         try {
 510             begin();
 511             if (remoteAddress == null)
 512                 throw new NotYetConnectedException();
 513             synchronized (readLock) {
 514                 if (!readShutdown) {
 515                     Net.shutdown(fd, Net.SHUT_RD);
 516                     readShutdown = true;
 517                 }
 518             }
 519         } finally {
 520             end();
 521         }
 522         return this;
 523     }
 524 
 525     @Override
 526     public final AsynchronousSocketChannel shutdownOutput() throws IOException {
 527         try {
 528             begin();
 529             if (remoteAddress == null)
 530                 throw new NotYetConnectedException();
 531             synchronized (writeLock) {
 532                 if (!writeShutdown) {
 533                     Net.shutdown(fd, Net.SHUT_WR);
 534                     writeShutdown = true;
 535                 }
 536             }
 537         } finally {
 538             end();
 539         }
 540         return this;
 541     }
 542 
 543     @Override
 544     public final String toString() {
 545         StringBuilder sb = new StringBuilder();
 546         sb.append(this.getClass().getName());
 547         sb.append('[');
 548         synchronized (stateLock) {
 549             if (!isOpen()) {
 550                 sb.append("closed");
 551             } else {
 552                 switch (state) {
 553                 case ST_UNCONNECTED:
 554                     sb.append("unconnected");
 555                     break;
 556                 case ST_PENDING:
 557                     sb.append("connection-pending");
 558                     break;
 559                 case ST_CONNECTED:
 560                     sb.append("connected");
 561                     if (readShutdown)
 562                         sb.append(" ishut");
 563                     if (writeShutdown)
 564                         sb.append(" oshut");
 565                     break;
 566                 }
 567                 if (localAddress != null) {
 568                     sb.append(" local=");
 569                     sb.append(localAddress.toString());
 570                 }
 571                 if (remoteAddress != null) {
 572                     sb.append(" remote=");
 573                     sb.append(remoteAddress.toString());
 574                 }
 575             }
 576         }
 577         sb.append(']');
 578         return sb.toString();
 579     }
 580 }