1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.ByteBuffer;
  29 import java.nio.channels.*;
  30 import java.net.SocketOption;
  31 import java.net.StandardSocketOptions;
  32 import java.net.SocketAddress;
  33 import java.net.InetSocketAddress;
  34 import java.io.IOException;
  35 import java.io.FileDescriptor;
  36 import java.util.Set;
  37 import java.util.HashSet;
  38 import java.util.Collections;
  39 import java.util.concurrent.*;
  40 import java.util.concurrent.locks.*;
  41 import sun.net.NetHooks;
  42 import sun.net.ExtendedOptionsImpl;
  43 
  44 /**
  45  * Base implementation of AsynchronousSocketChannel
  46  */
  47 
  48 abstract class AsynchronousSocketChannelImpl
  49     extends AsynchronousSocketChannel
  50     implements Cancellable, Groupable
  51 {
    // File descriptor of the underlying socket; assigned exactly once by
    // each constructor.
    protected final FileDescriptor fd;

    // protects state, localAddress, and remoteAddress
    protected final Object stateLock = new Object();

    // Local and remote endpoints, null until known. They are volatile, so
    // they can also be read without holding stateLock (read() and write()
    // check remoteAddress that way).
    protected volatile InetSocketAddress localAddress = null;
    protected volatile InetSocketAddress remoteAddress = null;

    // State, increases monotonically
    // ST_UNINITIALIZED is only the field's initial value; both constructors
    // replace it with ST_UNCONNECTED or ST_CONNECTED.
    static final int ST_UNINITIALIZED = -1;
    static final int ST_UNCONNECTED = 0;
    // NOTE(review): ST_PENDING is never assigned in this class; presumably
    // subclasses set it while a connect is outstanding -- confirm.
    static final int ST_PENDING = 1;
    static final int ST_CONNECTED = 2;
    protected volatile int state = ST_UNINITIALIZED;

    // reading state
    // The three flags below are written while holding readLock.
    private final Object readLock = new Object();
    private boolean reading;        // set by read() before calling implRead, cleared by enableReading
    private boolean readShutdown;   // set by shutdownInput()
    private boolean readKilled;     // further reading disallowed due to timeout

    // writing state
    // The three flags below are written while holding writeLock.
    private final Object writeLock = new Object();
    private boolean writing;        // set by write() before calling implWrite, cleared by enableWriting
    private boolean writeShutdown;  // set by shutdownOutput()
    private boolean writeKilled;    // further writing disallowed due to timeout

    // close support
    // The read lock is held between begin() and end(); close() takes the
    // write lock while it flips 'open' to false.
    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
    private volatile boolean open = true;

    // set true when exclusive binding is on and SO_REUSEADDR is emulated
    // NOTE(review): written by setOption and read by getOption while holding
    // only the shared read side of closeLock, and the field is not volatile,
    // so cross-thread visibility is not established by the code shown here.
    private boolean isReuseAddress;
  85 
  86     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)
  87         throws IOException
  88     {
  89         super(group.provider());
  90         this.fd = Net.socket(true);
  91         this.state = ST_UNCONNECTED;
  92     }
  93 
  94     // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl
  95     AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,
  96                                   FileDescriptor fd,
  97                                   InetSocketAddress remote)
  98         throws IOException
  99     {
 100         super(group.provider());
 101         this.fd = fd;
 102         this.state = ST_CONNECTED;
 103         this.localAddress = Net.localAddress(fd);
 104         this.remoteAddress = remote;
 105     }
 106 
    /**
     * Tells whether this channel is open.
     *
     * @return the current value of the volatile {@code open} flag, which
     *         {@link #close()} sets to {@code false}
     */
    @Override
    public final boolean isOpen() {
        return open;
    }
 111 
 112     /**
 113      * Marks beginning of access to file descriptor/handle
 114      */
 115     final void begin() throws IOException {
 116         closeLock.readLock().lock();
 117         if (!isOpen())
 118             throw new ClosedChannelException();
 119     }
 120 
    /**
     * Marks end of access to file descriptor/handle.
     *
     * <p> Releases the read side of {@code closeLock} that {@link #begin()}
     * acquired.
     */
    final void end() {
        closeLock.readLock().unlock();
    }
 127 
    /**
     * Invoked to close socket and release other resources.
     *
     * <p> {@link #close()} calls this only on the invocation that flips the
     * channel from open to closed, and only after it has released the write
     * side of {@code closeLock}.
     *
     * @throws IOException  propagated to the caller of {@code close()}
     */
    abstract void implClose() throws IOException;
 132 
 133     @Override
 134     public final void close() throws IOException {
 135         // synchronize with any threads initiating asynchronous operations
 136         closeLock.writeLock().lock();
 137         try {
 138             if (!open)
 139                 return;     // already closed
 140             open = false;
 141         } finally {
 142             closeLock.writeLock().unlock();
 143         }
 144         implClose();
 145     }
 146 
 147     final void enableReading(boolean killed) {
 148         synchronized (readLock) {
 149             reading = false;
 150             if (killed)
 151                 readKilled = true;
 152         }
 153     }
 154 
 155     final void enableReading() {
 156         enableReading(false);
 157     }
 158 
 159     final void enableWriting(boolean killed) {
 160         synchronized (writeLock) {
 161             writing = false;
 162             if (killed)
 163                 writeKilled = true;
 164         }
 165     }
 166 
 167     final void enableWriting() {
 168         enableWriting(false);
 169     }
 170 
    /**
     * Disallows all further reads: once {@code readKilled} is set, the
     * private read method throws {@code IllegalStateException}. The
     * read-pending flag is left unchanged.
     */
    final void killReading() {
        synchronized (readLock) {
            readKilled = true;
        }
    }
 176 
    /**
     * Disallows all further writes: once {@code writeKilled} is set, the
     * private write method throws {@code IllegalStateException}. The
     * write-pending flag is left unchanged.
     */
    final void killWriting() {
        synchronized (writeLock) {
            writeKilled = true;
        }
    }
 182 
    /**
     * Disallows both reading and writing on this channel.
     * NOTE(review): presumably invoked by subclasses when a pending connect
     * is cancelled; no caller is visible in this class.
     */
    final void killConnect() {
        // when a connect is cancelled then the connection may have been
        // established so prevent reading or writing.
        killReading();
        killWriting();
    }
 189 
    /**
     * Invoked by connect to initiate the connect operation.
     *
     * <p> Both public {@code connect} methods delegate here without any
     * validation of {@code remote}.
     *
     * @param  remote      the address passed to {@code connect}
     * @param  attachment  object handed to the handler; {@code null} for the
     *                     Future-returning form
     * @param  handler     completion handler, or {@code null} for the
     *                     Future-returning form
     * @return the value that {@code connect(SocketAddress)} returns to its
     *         caller; the handler-based {@code connect} ignores it
     */
    abstract <A> Future<Void> implConnect(SocketAddress remote,
                                          A attachment,
                                          CompletionHandler<Void,? super A> handler);
 196 
 197     @Override
 198     public final Future<Void> connect(SocketAddress remote) {
 199         return implConnect(remote, null, null);
 200     }
 201 
 202     @Override
 203     public final <A> void connect(SocketAddress remote,
 204                                   A attachment,
 205                                   CompletionHandler<Void,? super A> handler)
 206     {
 207         if (handler == null)
 208             throw new NullPointerException("'handler' is null");
 209         implConnect(remote, attachment, handler);
 210     }
 211 
    /**
     * Invoked by read to initiate the I/O operation.
     *
     * <p> The private {@code read} method calls this only after it has found
     * the channel open and connected, not killed or shut down for reading,
     * with no read already pending and with space to read into. It sets the
     * read-pending flag before the call.
     *
     * @param  isScatteringRead  {@code true} when {@code dsts} is the
     *                           destination, {@code false} when {@code dst} is
     * @param  dst       destination buffer; {@code null} for a scattering read
     * @param  dsts      destination buffers; {@code null} for a plain read
     * @param  timeout   timeout value passed through unchanged from the caller
     * @param  unit      unit of {@code timeout}
     * @param  attachment  object handed to the handler; may be {@code null}
     * @param  handler   completion handler, or {@code null} for the
     *                   Future-returning form
     * @return the value returned to the caller of {@code read(ByteBuffer)};
     *         the handler-based read methods ignore it
     */
    abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                                     ByteBuffer dst,
                                                     ByteBuffer[] dsts,
                                                     long timeout,
                                                     TimeUnit unit,
                                                     A attachment,
                                                     CompletionHandler<V,? super A> handler);
 222 
 223     @SuppressWarnings("unchecked")
 224     private <V extends Number,A> Future<V> read(boolean isScatteringRead,
 225                                                 ByteBuffer dst,
 226                                                 ByteBuffer[] dsts,
 227                                                 long timeout,
 228                                                 TimeUnit unit,
 229                                                 A att,
 230                                                 CompletionHandler<V,? super A> handler)
 231     {
 232         if (!isOpen()) {
 233             Throwable e = new ClosedChannelException();
 234             if (handler == null)
 235                 return CompletedFuture.withFailure(e);
 236             Invoker.invoke(this, handler, att, null, e);
 237             return null;
 238         }
 239 
 240         if (remoteAddress == null)
 241             throw new NotYetConnectedException();
 242 
 243         boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();
 244         boolean shutdown = false;
 245 
 246         // check and update state
 247         synchronized (readLock) {
 248             if (readKilled)
 249                 throw new IllegalStateException("Reading not allowed due to timeout or cancellation");
 250             if (reading)
 251                 throw new ReadPendingException();
 252             if (readShutdown) {
 253                 shutdown = true;
 254             } else {
 255                 if (hasSpaceToRead) {
 256                     reading = true;
 257                 }
 258             }
 259         }
 260 
 261         // immediately complete with -1 if shutdown for read
 262         // immediately complete with 0 if no space remaining
 263         if (shutdown || !hasSpaceToRead) {
 264             Number result;
 265             if (isScatteringRead) {
 266                 result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);
 267             } else {
 268                 result = (shutdown) ? -1 : 0;
 269             }
 270             if (handler == null)
 271                 return CompletedFuture.withResult((V)result);
 272             Invoker.invoke(this, handler, att, (V)result, null);
 273             return null;
 274         }
 275 
 276         return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);
 277     }
 278 
 279     @Override
 280     public final Future<Integer> read(ByteBuffer dst) {
 281         if (dst.isReadOnly())
 282             throw new IllegalArgumentException("Read-only buffer");
 283         return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);
 284     }
 285 
 286     @Override
 287     public final <A> void read(ByteBuffer dst,
 288                                long timeout,
 289                                TimeUnit unit,
 290                                A attachment,
 291                                CompletionHandler<Integer,? super A> handler)
 292     {
 293         if (handler == null)
 294             throw new NullPointerException("'handler' is null");
 295         if (dst.isReadOnly())
 296             throw new IllegalArgumentException("Read-only buffer");
 297         read(false, dst, null, timeout, unit, attachment, handler);
 298     }
 299 
 300     @Override
 301     public final <A> void read(ByteBuffer[] dsts,
 302                                int offset,
 303                                int length,
 304                                long timeout,
 305                                TimeUnit unit,
 306                                A attachment,
 307                                CompletionHandler<Long,? super A> handler)
 308     {
 309         if (handler == null)
 310             throw new NullPointerException("'handler' is null");
 311         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 312             throw new IndexOutOfBoundsException();
 313         ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);
 314         for (int i=0; i<bufs.length; i++) {
 315             if (bufs[i].isReadOnly())
 316                 throw new IllegalArgumentException("Read-only buffer");
 317         }
 318         read(true, null, bufs, timeout, unit, attachment, handler);
 319     }
 320 
    /**
     * Invoked by write to initiate the I/O operation.
     *
     * <p> The private {@code write} method calls this only after it has found
     * the channel open and connected, not killed or shut down for writing,
     * with no write already pending and with data to write. It sets the
     * write-pending flag before the call.
     *
     * @param  isGatheringWrite  {@code true} when {@code srcs} is the source,
     *                           {@code false} when {@code src} is
     * @param  src       source buffer; {@code null} for a gathering write
     * @param  srcs      source buffers; {@code null} for a plain write
     * @param  timeout   timeout value passed through unchanged from the caller
     * @param  unit      unit of {@code timeout}
     * @param  attachment  object handed to the handler; may be {@code null}
     * @param  handler   completion handler, or {@code null} for the
     *                   Future-returning form
     * @return the value returned to the caller of {@code write(ByteBuffer)};
     *         the handler-based write methods ignore it
     */
    abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                                      ByteBuffer src,
                                                      ByteBuffer[] srcs,
                                                      long timeout,
                                                      TimeUnit unit,
                                                      A attachment,
                                                      CompletionHandler<V,? super A> handler);
 331 
 332     @SuppressWarnings("unchecked")
 333     private <V extends Number,A> Future<V> write(boolean isGatheringWrite,
 334                                                  ByteBuffer src,
 335                                                  ByteBuffer[] srcs,
 336                                                  long timeout,
 337                                                  TimeUnit unit,
 338                                                  A att,
 339                                                  CompletionHandler<V,? super A> handler)
 340     {
 341         boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();
 342 
 343         boolean closed = false;
 344         if (isOpen()) {
 345             if (remoteAddress == null)
 346                 throw new NotYetConnectedException();
 347             // check and update state
 348             synchronized (writeLock) {
 349                 if (writeKilled)
 350                     throw new IllegalStateException("Writing not allowed due to timeout or cancellation");
 351                 if (writing)
 352                     throw new WritePendingException();
 353                 if (writeShutdown) {
 354                     closed = true;
 355                 } else {
 356                     if (hasDataToWrite)
 357                         writing = true;
 358                 }
 359             }
 360         } else {
 361             closed = true;
 362         }
 363 
 364         // channel is closed or shutdown for write
 365         if (closed) {
 366             Throwable e = new ClosedChannelException();
 367             if (handler == null)
 368                 return CompletedFuture.withFailure(e);
 369             Invoker.invoke(this, handler, att, null, e);
 370             return null;
 371         }
 372 
 373         // nothing to write so complete immediately
 374         if (!hasDataToWrite) {
 375             Number result = (isGatheringWrite) ? (Number)0L : (Number)0;
 376             if (handler == null)
 377                 return CompletedFuture.withResult((V)result);
 378             Invoker.invoke(this, handler, att, (V)result, null);
 379             return null;
 380         }
 381 
 382         return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);
 383     }
 384 
    /**
     * Future-returning write from a single buffer. Delegates to the private
     * write method with a zero timeout and no attachment or handler.
     *
     * @param  src  the source buffer
     * @return a Future for the number of bytes written
     */
    @Override
    public final Future<Integer> write(ByteBuffer src) {
        return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);
    }
 389 
 390     @Override
 391     public final <A> void write(ByteBuffer src,
 392                                 long timeout,
 393                                 TimeUnit unit,
 394                                 A attachment,
 395                                 CompletionHandler<Integer,? super A> handler)
 396     {
 397         if (handler == null)
 398             throw new NullPointerException("'handler' is null");
 399         write(false, src, null, timeout, unit, attachment, handler);
 400     }
 401 
 402     @Override
 403     public final <A> void  write(ByteBuffer[] srcs,
 404                                  int offset,
 405                                  int length,
 406                                  long timeout,
 407                                  TimeUnit unit,
 408                                  A attachment,
 409                                  CompletionHandler<Long,? super A> handler)
 410     {
 411         if (handler == null)
 412             throw new NullPointerException("'handler' is null");
 413         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 414             throw new IndexOutOfBoundsException();
 415         srcs = Util.subsequence(srcs, offset, length);
 416         write(true, null, srcs, timeout, unit, attachment, handler);
 417     }
 418 
 419     @Override
 420     public final AsynchronousSocketChannel bind(SocketAddress local)
 421         throws IOException
 422     {
 423         try {
 424             begin();
 425             synchronized (stateLock) {
 426                 if (state == ST_PENDING)
 427                     throw new ConnectionPendingException();
 428                 if (localAddress != null)
 429                     throw new AlreadyBoundException();
 430                 InetSocketAddress isa = (local == null) ?
 431                     new InetSocketAddress(0) : Net.checkAddress(local);
 432                 SecurityManager sm = System.getSecurityManager();
 433                 if (sm != null) {
 434                     sm.checkListen(isa.getPort());
 435                 }
 436                 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
 437                 Net.bind(fd, isa.getAddress(), isa.getPort());
 438                 localAddress = Net.localAddress(fd);
 439             }
 440         } finally {
 441             end();
 442         }
 443         return this;
 444     }
 445 
 446     @Override
 447     public final SocketAddress getLocalAddress() throws IOException {
 448         if (!isOpen())
 449             throw new ClosedChannelException();
 450          return Net.getRevealedLocalAddress(localAddress);
 451     }
 452 
 453     @Override
 454     public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)
 455         throws IOException
 456     {
 457         if (name == null)
 458             throw new NullPointerException();
 459         if (!supportedOptions().contains(name))
 460             throw new UnsupportedOperationException("'" + name + "' not supported");
 461 
 462         try {
 463             begin();
 464             if (writeShutdown)
 465                 throw new IOException("Connection has been shutdown for writing");
 466             if (name == StandardSocketOptions.SO_REUSEADDR &&
 467                     Net.useExclusiveBind())
 468             {
 469                 // SO_REUSEADDR emulated when using exclusive bind
 470                 isReuseAddress = (Boolean)value;
 471             } else {
 472                 Net.setSocketOption(fd, Net.UNSPEC, name, value);
 473             }
 474             return this;
 475         } finally {
 476             end();
 477         }
 478     }
 479 
 480     @Override
 481     @SuppressWarnings("unchecked")
 482     public final <T> T getOption(SocketOption<T> name) throws IOException {
 483         if (name == null)
 484             throw new NullPointerException();
 485         if (!supportedOptions().contains(name))
 486             throw new UnsupportedOperationException("'" + name + "' not supported");
 487 
 488         try {
 489             begin();
 490             if (name == StandardSocketOptions.SO_REUSEADDR &&
 491                     Net.useExclusiveBind())
 492             {
 493                 // SO_REUSEADDR emulated when using exclusive bind
 494                 return (T)Boolean.valueOf(isReuseAddress);
 495             }
 496             return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
 497         } finally {
 498             end();
 499         }
 500     }
 501 
 502     private static class DefaultOptionsHolder {
 503         static final Set<SocketOption<?>> defaultOptions = defaultOptions();
 504 
 505         private static Set<SocketOption<?>> defaultOptions() {
 506             HashSet<SocketOption<?>> set = new HashSet<>(5);
 507             set.add(StandardSocketOptions.SO_SNDBUF);
 508             set.add(StandardSocketOptions.SO_RCVBUF);
 509             set.add(StandardSocketOptions.SO_KEEPALIVE);
 510             set.add(StandardSocketOptions.SO_REUSEADDR);
 511             set.add(StandardSocketOptions.TCP_NODELAY);
 512             if (ExtendedOptionsImpl.flowSupported()) {
 513                 set.add(jdk.net.ExtendedSocketOptions.SO_FLOW_SLA);
 514             }
 515             return Collections.unmodifiableSet(set);
 516         }
 517     }
 518 
    /**
     * Returns the socket options supported by this channel.
     *
     * @return the shared unmodifiable set held by
     *         {@code DefaultOptionsHolder}; the same instance is returned on
     *         every call
     */
    @Override
    public final Set<SocketOption<?>> supportedOptions() {
        return DefaultOptionsHolder.defaultOptions;
    }
 523 
 524     @Override
 525     public final SocketAddress getRemoteAddress() throws IOException {
 526         if (!isOpen())
 527             throw new ClosedChannelException();
 528         return remoteAddress;
 529     }
 530 
 531     @Override
 532     public final AsynchronousSocketChannel shutdownInput() throws IOException {
 533         try {
 534             begin();
 535             if (remoteAddress == null)
 536                 throw new NotYetConnectedException();
 537             synchronized (readLock) {
 538                 if (!readShutdown) {
 539                     Net.shutdown(fd, Net.SHUT_RD);
 540                     readShutdown = true;
 541                 }
 542             }
 543         } finally {
 544             end();
 545         }
 546         return this;
 547     }
 548 
 549     @Override
 550     public final AsynchronousSocketChannel shutdownOutput() throws IOException {
 551         try {
 552             begin();
 553             if (remoteAddress == null)
 554                 throw new NotYetConnectedException();
 555             synchronized (writeLock) {
 556                 if (!writeShutdown) {
 557                     Net.shutdown(fd, Net.SHUT_WR);
 558                     writeShutdown = true;
 559                 }
 560             }
 561         } finally {
 562             end();
 563         }
 564         return this;
 565     }
 566 
 567     @Override
 568     public final String toString() {
 569         StringBuilder sb = new StringBuilder();
 570         sb.append(this.getClass().getName());
 571         sb.append('[');
 572         synchronized (stateLock) {
 573             if (!isOpen()) {
 574                 sb.append("closed");
 575             } else {
 576                 switch (state) {
 577                 case ST_UNCONNECTED:
 578                     sb.append("unconnected");
 579                     break;
 580                 case ST_PENDING:
 581                     sb.append("connection-pending");
 582                     break;
 583                 case ST_CONNECTED:
 584                     sb.append("connected");
 585                     if (readShutdown)
 586                         sb.append(" ishut");
 587                     if (writeShutdown)
 588                         sb.append(" oshut");
 589                     break;
 590                 }
 591                 if (localAddress != null) {
 592                     sb.append(" local=");
 593                     sb.append(
 594                             Net.getRevealedLocalAddressAsString(localAddress));
 595                 }
 596                 if (remoteAddress != null) {
 597                     sb.append(" remote=");
 598                     sb.append(remoteAddress.toString());
 599                 }
 600             }
 601         }
 602         sb.append(']');
 603         return sb.toString();
 604     }
 605 }