src/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
rev 6099 : [mq]: io-trace


  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 

  37 
  38 /**
  39  * An implementation of SocketChannels
  40  */
  41 
  42 class SocketChannelImpl
  43     extends SocketChannel
  44     implements SelChImpl
  45 {
  46 
  47     // Used to make native read and write calls
  48     private static NativeDispatcher nd;
  49 
  50     // Our file descriptor object
  51     private final FileDescriptor fd;
  52 
  53     // fd value needed for dev/poll. This value will remain valid
  54     // even after the value in the file descriptor object has been set to -1
  55     private final int fdVal;
  56 


  63 
  64     // Lock held by current writing or connecting thread
  65     private final Object writeLock = new Object();
  66 
  67     // Lock held by any thread that modifies the state fields declared below
  68     // DO NOT invoke a blocking I/O operation while holding this lock!
  69     private final Object stateLock = new Object();
  70 
  71     // -- The following fields are protected by stateLock
  72 
  73     // State, increases monotonically
  74     private static final int ST_UNINITIALIZED = -1;
  75     private static final int ST_UNCONNECTED = 0;
  76     private static final int ST_PENDING = 1;
  77     private static final int ST_CONNECTED = 2;
  78     private static final int ST_KILLPENDING = 3;
  79     private static final int ST_KILLED = 4;
  80     private int state = ST_UNINITIALIZED;
  81 
  82     // Binding
  83     private SocketAddress localAddress;
  84     private SocketAddress remoteAddress;
  85 
  86     // Input/Output open
  87     private boolean isInputOpen = true;
  88     private boolean isOutputOpen = true;
  89     private boolean readyToConnect = false;
  90 
  91     // Socket adaptor, created on demand
  92     private Socket socket;
  93 
  94     // -- End of fields protected by stateLock
  95 
  96 
  97     // Constructor for normal connecting sockets
  98     //
  99     SocketChannelImpl(SelectorProvider sp) throws IOException {
             // Pass the provider on to the SocketChannel superclass.
 100         super(sp);
             // Obtain the descriptor for this channel from Net.socket.
             // NOTE(review): the meaning of the 'true' argument is not visible
             // here -- confirm against Net.socket.
 101         this.fd = Net.socket(true);
             // Keep the integer value of the descriptor in its own field; the
             // field comment on fdVal says it stays valid after the value in
             // the FileDescriptor object is set to -1.
 102         this.fdVal = IOUtil.fdVal(fd);
             // Leave ST_UNINITIALIZED (the field initializer) for ST_UNCONNECTED.
             // stateLock is not taken here.
             // NOTE(review): presumably safe because the object is not yet
             // visible to other threads -- confirm.
 103         this.state = ST_UNCONNECTED;
 104     }


 261                 kill();
 262         }
 263     }
 264 
 265     private void writerCleanup() throws IOException {
             // Invoked from the finally blocks of write(ByteBuffer) and
             // write(ByteBuffer[], int, int) after the native write loop exits.
 266         synchronized (stateLock) {
                 // Forget the writer thread recorded before the write started.
 267             writerThread = 0;
                 // NOTE(review): presumably a close was requested while the
                 // write was in flight and is completed here -- confirm
                 // against kill().
 268             if (state == ST_KILLPENDING)
 269                 kill();
 270         }
 271     }
 272 
 273     public int read(ByteBuffer buf) throws IOException {
 274 
 275         if (buf == null)
 276             throw new NullPointerException();
 277 
 278         synchronized (readLock) {
 279             if (!ensureReadOpen())
 280                 return -1;





 281             int n = 0;
 282             try {
 283 
 284                 // Set up the interruption machinery; see
 285                 // AbstractInterruptibleChannel for details
 286                 //
 287                 begin();
 288 
 289                 synchronized (stateLock) {
 290                     if (!isOpen()) {
 291                     // Either the current thread is already interrupted, so
 292                     // begin() closed the channel, or another thread closed the
 293                     // channel since we checked it a few bytecodes ago.  In
 294                     // either case the value returned here is irrelevant since
 295                     // the invocation of end() in the finally block will throw
 296                     // an appropriate exception.
 297                     //
 298                         return 0;
 299 
 300                     }


 350                 // so the connection gets cut off as usual).
 351                 //
 352                 // For socket channels there is the additional wrinkle that
 353                 // asynchronous shutdown works much like asynchronous close,
 354                 // except that the channel is shutdown rather than completely
 355                 // closed.  This is analogous to the first two cases above,
 356                 // except that the shutdown operation plays the role of
 357                 // nd.preClose().
 358                 for (;;) {
 359                     n = IOUtil.read(fd, buf, -1, nd, readLock);
 360                     if ((n == IOStatus.INTERRUPTED) && isOpen()) {
 361                         // The system call was interrupted but the channel
 362                         // is still open, so retry
 363                         continue;
 364                     }
 365                     return IOStatus.normalize(n);
 366                 }
 367 
 368             } finally {
 369                 readerCleanup();        // Clear reader thread





 370                 // The end method, which is defined in our superclass
 371                 // AbstractInterruptibleChannel, resets the interruption
 372                 // machinery.  If its argument is true then it returns
 373                 // normally; otherwise it checks the interrupt and open state
 374                 // of this channel and throws an appropriate exception if
 375                 // necessary.
 376                 //
 377                 // So, if we actually managed to do any I/O in the above try
 378                 // block then we pass true to the end method.  We also pass
 379                 // true if the channel was in non-blocking mode when the I/O
 380                 // operation was initiated but no data could be transferred;
 381                 // this prevents spurious exceptions from being thrown in the
 382                 // rare event that a channel is closed or a thread is
 383                 // interrupted at the exact moment that a non-blocking I/O
 384                 // request is made.
 385                 //
 386                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 387 
 388                 // Extra case for socket channels: Asynchronous shutdown
 389                 //
 390                 synchronized (stateLock) {
 391                     if ((n <= 0) && (!isInputOpen))
 392                         return IOStatus.EOF;
 393                 }
 394 
 395                 assert IOStatus.check(n);
 396 
 397             }
 398         }
 399     }
 400 
 401     public long read(ByteBuffer[] dsts, int offset, int length)
 402         throws IOException
 403     {
             // Scattering read into the sub-sequence of dsts selected by offset
             // and length.  Returns the normalized result of IOUtil.read, -1
             // when ensureReadOpen() returns false, or IOStatus.EOF when nothing
             // was transferred and isInputOpen is false.
             //
             // Reject a negative or out-of-range sub-sequence.  The comparison is
             // written as "offset > dsts.length - length" so that no
             // offset + length sum is computed.  A null dsts surfaces as a
             // NullPointerException from dsts.length.
 404         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 405             throw new IndexOutOfBoundsException();
 406         synchronized (readLock) {
 407             if (!ensureReadOpen())
 408                 return -1;
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 409             long n = 0;





 410             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 411                 begin();
 412                 synchronized (stateLock) {
                         // Channel was closed since ensureReadOpen(); the value
                         // returned is a placeholder because end() in the
                         // finally block throws the appropriate exception.
 413                     if (!isOpen())
 414                         return 0;
                         // Record the reading thread; readerCleanup() clears it.
 415                     readerThread = NativeThread.current();
 416                 }
 417 
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 418                 for (;;) {
 419                     n = IOUtil.read(fd, dsts, offset, length, nd);
 420                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 421                         continue;
 422                     return IOStatus.normalize(n);
 423                 }
 424             } finally {
 425                 readerCleanup();



                     // Pass true when data was transferred or the non-blocking
                     // read had nothing available, so end() does not throw.
 426                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of input: report EOF.  This return
                     // sits inside finally, so it overrides the value returned
                     // from the try block and discards an exception propagating
                     // out of it.
 427                 synchronized (stateLock) {
 428                     if ((n <= 0) && (!isInputOpen))
 429                         return IOStatus.EOF;
 430                 }
                     // Checked only when assertions are enabled.
 431                 assert IOStatus.check(n);
 432             }
 433         }
 434     }
 435 
 436     public int write(ByteBuffer buf) throws IOException {
             // Writes from buf and returns the normalized result of
             // IOUtil.write.  Throws NullPointerException for a null buffer and
             // AsynchronousCloseException when nothing was written and
             // isOutputOpen is false.
 437         if (buf == null)
 438             throw new NullPointerException();
 439         synchronized (writeLock) {
 440             ensureWriteOpen();
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 441             int n = 0;




 442             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 443                 begin();
 444                 synchronized (stateLock) {
                         // Channel already closed: the value returned is a
                         // placeholder, end() in the finally block decides the
                         // outcome.
 445                     if (!isOpen())
 446                         return 0;
                         // Record the writing thread; writerCleanup() resets it.
 447                     writerThread = NativeThread.current();
 448                 }
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 449                 for (;;) {
 450                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
 451                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 452                         continue;
 453                     return IOStatus.normalize(n);
 454                 }
 455             } finally {
 456                 writerCleanup();

                     // Pass true when data was transferred or the non-blocking
                     // write could not proceed, so end() does not throw.
 457                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of output.  A throw inside finally
                     // replaces the value returned from the try block and any
                     // exception propagating out of it.
 458                 synchronized (stateLock) {
 459                     if ((n <= 0) && (!isOutputOpen))
 460                         throw new AsynchronousCloseException();
 461                 }
                     // Checked only when assertions are enabled.
 462                 assert IOStatus.check(n);
 463             }
 464         }
 465     }
 466 
 467     public long write(ByteBuffer[] srcs, int offset, int length)
 468         throws IOException
 469     {
             // Gathering write from the sub-sequence of srcs selected by offset
             // and length.  Returns the normalized result of IOUtil.write;
             // throws AsynchronousCloseException when nothing was written and
             // isOutputOpen is false.
             //
             // Reject a negative or out-of-range sub-sequence.  A null srcs
             // surfaces as a NullPointerException from srcs.length.
 470         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 471             throw new IndexOutOfBoundsException();
 472         synchronized (writeLock) {
 473             ensureWriteOpen();
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 474             long n = 0;



 475             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 476                 begin();
 477                 synchronized (stateLock) {
                         // Channel already closed: the value returned is a
                         // placeholder, end() in the finally block decides the
                         // outcome.
 478                     if (!isOpen())
 479                         return 0;
                         // Record the writing thread; writerCleanup() resets it.
 480                     writerThread = NativeThread.current();
 481                 }
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 482                 for (;;) {
 483                     n = IOUtil.write(fd, srcs, offset, length, nd);
 484                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 485                         continue;
 486                     return IOStatus.normalize(n);
 487                 }
 488             } finally {
 489                 writerCleanup();

                     // Pass true when data was transferred or the non-blocking
                     // write could not proceed, so end() does not throw.
 490                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of output.  A throw inside finally
                     // replaces the value returned from the try block and any
                     // exception propagating out of it.
 491                 synchronized (stateLock) {
 492                     if ((n <= 0) && (!isOutputOpen))
 493                         throw new AsynchronousCloseException();
 494                 }
                     // Checked only when assertions are enabled.
 495                 assert IOStatus.check(n);
 496             }
 497         }
 498     }
 499 
 500     // package-private
 501     int sendOutOfBandData(byte b) throws IOException {
 502         synchronized (writeLock) {
 503             ensureWriteOpen();
 504             int n = 0;
 505             try {
 506                 begin();
 507                 synchronized (stateLock) {
 508                     if (!isOpen())
 509                         return 0;




  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 import sun.misc.IoTrace;
  37 import sun.misc.IoTraceContext;
  38 
  39 /**
  40  * An implementation of SocketChannels
  41  */
  42 
  43 class SocketChannelImpl
  44     extends SocketChannel
  45     implements SelChImpl
  46 {
  47 
  48     // Used to make native read and write calls
  49     private static NativeDispatcher nd;
  50 
  51     // Our file descriptor object
  52     private final FileDescriptor fd;
  53 
  54     // fd value needed for dev/poll. This value will remain valid
  55     // even after the value in the file descriptor object has been set to -1
  56     private final int fdVal;
  57 


  64 
  65     // Lock held by current writing or connecting thread
  66     private final Object writeLock = new Object();
  67 
  68     // Lock held by any thread that modifies the state fields declared below
  69     // DO NOT invoke a blocking I/O operation while holding this lock!
  70     private final Object stateLock = new Object();
  71 
  72     // -- The following fields are protected by stateLock
  73 
  74     // State, increases monotonically
  75     private static final int ST_UNINITIALIZED = -1;
  76     private static final int ST_UNCONNECTED = 0;
  77     private static final int ST_PENDING = 1;
  78     private static final int ST_CONNECTED = 2;
  79     private static final int ST_KILLPENDING = 3;
  80     private static final int ST_KILLED = 4;
  81     private int state = ST_UNINITIALIZED;
  82 
  83     // Binding
  84     private InetSocketAddress localAddress;
  85     private InetSocketAddress remoteAddress;
  86 
  87     // Input/Output open
  88     private boolean isInputOpen = true;
  89     private boolean isOutputOpen = true;
  90     private boolean readyToConnect = false;
  91 
  92     // Socket adaptor, created on demand
  93     private Socket socket;
  94 
  95     // -- End of fields protected by stateLock
  96 
  97 
  98     // Constructor for normal connecting sockets
  99     //
 100     SocketChannelImpl(SelectorProvider sp) throws IOException {
             // Pass the provider on to the SocketChannel superclass.
 101         super(sp);
             // Obtain the descriptor for this channel from Net.socket.
             // NOTE(review): the meaning of the 'true' argument is not visible
             // here -- confirm against Net.socket.
 102         this.fd = Net.socket(true);
             // Keep the integer value of the descriptor in its own field; the
             // field comment on fdVal says it stays valid after the value in
             // the FileDescriptor object is set to -1.
 103         this.fdVal = IOUtil.fdVal(fd);
             // Leave ST_UNINITIALIZED (the field initializer) for ST_UNCONNECTED.
             // stateLock is not taken here.
             // NOTE(review): presumably safe because the object is not yet
             // visible to other threads -- confirm.
 104         this.state = ST_UNCONNECTED;
 105     }


 262                 kill();
 263         }
 264     }
 265 
 266     private void writerCleanup() throws IOException {
             // Invoked from the finally blocks of write(ByteBuffer) and
             // write(ByteBuffer[], int, int) after the native write loop exits.
 267         synchronized (stateLock) {
                 // Forget the writer thread recorded before the write started.
 268             writerThread = 0;
                 // NOTE(review): presumably a close was requested while the
                 // write was in flight and is completed here -- confirm
                 // against kill().
 269             if (state == ST_KILLPENDING)
 270                 kill();
 271         }
 272     }
 273 
 274     public int read(ByteBuffer buf) throws IOException {
 275 
 276         if (buf == null)
 277             throw new NullPointerException();
 278 
 279         synchronized (readLock) {
 280             if (!ensureReadOpen())
 281                 return -1;
 282             IoTraceContext traceContext = null;
 283             if (isBlocking()) {
 284                 traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 285                                                       remoteAddress.getPort(), 0);
 286             }
 287             int n = 0;
 288             try {
 289 
 290                 // Set up the interruption machinery; see
 291                 // AbstractInterruptibleChannel for details
 292                 //
 293                 begin();
 294 
 295                 synchronized (stateLock) {
 296                     if (!isOpen()) {
 297                     // Either the current thread is already interrupted, so
 298                     // begin() closed the channel, or another thread closed the
 299                     // channel since we checked it a few bytecodes ago.  In
 300                     // either case the value returned here is irrelevant since
 301                     // the invocation of end() in the finally block will throw
 302                     // an appropriate exception.
 303                     //
 304                         return 0;
 305 
 306                     }


 356                 // so the connection gets cut off as usual).
 357                 //
 358                 // For socket channels there is the additional wrinkle that
 359                 // asynchronous shutdown works much like asynchronous close,
 360                 // except that the channel is shutdown rather than completely
 361                 // closed.  This is analogous to the first two cases above,
 362                 // except that the shutdown operation plays the role of
 363                 // nd.preClose().
 364                 for (;;) {
 365                     n = IOUtil.read(fd, buf, -1, nd, readLock);
 366                     if ((n == IOStatus.INTERRUPTED) && isOpen()) {
 367                         // The system call was interrupted but the channel
 368                         // is still open, so retry
 369                         continue;
 370                     }
 371                     return IOStatus.normalize(n);
 372                 }
 373 
 374             } finally {
 375                 readerCleanup();        // Clear reader thread
 376 
 377                 if (isBlocking()) {
 378                     IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
 379                 }
 380 
 381                 // The end method, which is defined in our superclass
 382                 // AbstractInterruptibleChannel, resets the interruption
 383                 // machinery.  If its argument is true then it returns
 384                 // normally; otherwise it checks the interrupt and open state
 385                 // of this channel and throws an appropriate exception if
 386                 // necessary.
 387                 //
 388                 // So, if we actually managed to do any I/O in the above try
 389                 // block then we pass true to the end method.  We also pass
 390                 // true if the channel was in non-blocking mode when the I/O
 391                 // operation was initiated but no data could be transferred;
 392                 // this prevents spurious exceptions from being thrown in the
 393                 // rare event that a channel is closed or a thread is
 394                 // interrupted at the exact moment that a non-blocking I/O
 395                 // request is made.
 396                 //
 397                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 398 
 399                 // Extra case for socket channels: Asynchronous shutdown
 400                 //
 401                 synchronized (stateLock) {
 402                     if ((n <= 0) && (!isInputOpen))
 403                         return IOStatus.EOF;
 404                 }
 405 
 406                 assert IOStatus.check(n);
 407 
 408             }
 409         }
 410     }
 411 
 412     public long read(ByteBuffer[] dsts, int offset, int length)
 413         throws IOException
 414     {
             // Scattering read into the sub-sequence of dsts selected by offset
             // and length, bracketed by IoTrace socket-read events when the
             // channel is in blocking mode.  Returns the normalized result of
             // IOUtil.read, -1 when ensureReadOpen() returns false, or
             // IOStatus.EOF when nothing was transferred and isInputOpen is
             // false.
             //
             // Reject a negative or out-of-range sub-sequence.  A null dsts
             // surfaces as a NullPointerException from dsts.length.
 415         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 416             throw new IndexOutOfBoundsException();
 417         synchronized (readLock) {
 418             if (!ensureReadOpen())
 419                 return -1;
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 420             long n = 0;
                 // Only blocking reads are traced.  remoteAddress is read here
                 // without holding stateLock although it is declared among the
                 // fields protected by that lock.
                 // NOTE(review): the trailing 0 argument is presumably a
                 // timeout -- confirm against IoTrace.socketReadBegin.
 421             IoTraceContext traceContext = null;
 422             if (isBlocking()) {
 423                 traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 424                                                       remoteAddress.getPort(), 0);
 425             }
 426             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 427                 begin();
 428                 synchronized (stateLock) {
                         // Channel was closed since ensureReadOpen(); the value
                         // returned is a placeholder because end() in the
                         // finally block throws the appropriate exception.
 429                     if (!isOpen())
 430                         return 0;
                         // Record the reading thread; readerCleanup() clears it.
 431                     readerThread = NativeThread.current();
 432                 }
 433 
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 434                 for (;;) {
 435                     n = IOUtil.read(fd, dsts, offset, length, nd);
 436                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 437                         continue;
 438                     return IOStatus.normalize(n);
 439                 }
 440             } finally {
 441                 readerCleanup();
                     // Close the trace event; negative status codes are reported
                     // as 0 bytes.
                     // NOTE(review): isBlocking() is evaluated again here, so if
                     // the blocking mode changed during the read the begin/end
                     // calls may not pair up (traceContext can be null on this
                     // path) -- confirm IoTrace tolerates that.
 442                 if (isBlocking()) {
 443                     IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
 444                 }
                     // Pass true when data was transferred or the non-blocking
                     // read had nothing available, so end() does not throw.
 445                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of input: report EOF.  This return
                     // sits inside finally, so it overrides the value returned
                     // from the try block and discards an exception propagating
                     // out of it.
 446                 synchronized (stateLock) {
 447                     if ((n <= 0) && (!isInputOpen))
 448                         return IOStatus.EOF;
 449                 }
                     // Checked only when assertions are enabled.
 450                 assert IOStatus.check(n);
 451             }
 452         }
 453     }
 454 
 455     public int write(ByteBuffer buf) throws IOException {
             // Writes from buf, bracketed by IoTrace socket-write events, and
             // returns the normalized result of IOUtil.write.  Throws
             // NullPointerException for a null buffer and
             // AsynchronousCloseException when nothing was written and
             // isOutputOpen is false.
 456         if (buf == null)
 457             throw new NullPointerException();
 458         synchronized (writeLock) {
 459             ensureWriteOpen();
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 460             int n = 0;
                 // Unlike the read paths, the write trace is started regardless
                 // of isBlocking().  remoteAddress is read here without holding
                 // stateLock although it is declared among the fields protected
                 // by that lock.
 461             IoTraceContext traceContext =
 462                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 463                                          remoteAddress.getPort());
 464 
 465             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 466                 begin();
 467                 synchronized (stateLock) {
                         // Channel already closed: the value returned is a
                         // placeholder, end() in the finally block decides the
                         // outcome.
 468                     if (!isOpen())
 469                         return 0;
                         // Record the writing thread; writerCleanup() resets it.
 470                     writerThread = NativeThread.current();
 471                 }
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 472                 for (;;) {
 473                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
 474                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 475                         continue;
 476                     return IOStatus.normalize(n);
 477                 }
 478             } finally {
 479                 writerCleanup();
                     // Close the trace event; negative status codes are reported
                     // as 0 bytes.
 480                 IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                     // Pass true when data was transferred or the non-blocking
                     // write could not proceed, so end() does not throw.
 481                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of output.  A throw inside finally
                     // replaces the value returned from the try block and any
                     // exception propagating out of it.
 482                 synchronized (stateLock) {
 483                     if ((n <= 0) && (!isOutputOpen))
 484                         throw new AsynchronousCloseException();
 485                 }
                     // Checked only when assertions are enabled.
 486                 assert IOStatus.check(n);
 487             }
 488         }
 489     }
 490 
 491     public long write(ByteBuffer[] srcs, int offset, int length)
 492         throws IOException
 493     {
             // Gathering write from the sub-sequence of srcs selected by offset
             // and length, bracketed by IoTrace socket-write events.  Returns
             // the normalized result of IOUtil.write; throws
             // AsynchronousCloseException when nothing was written and
             // isOutputOpen is false.
             //
             // Reject a negative or out-of-range sub-sequence.  A null srcs
             // surfaces as a NullPointerException from srcs.length.
 494         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 495             throw new IndexOutOfBoundsException();
 496         synchronized (writeLock) {
 497             ensureWriteOpen();
                 // n holds the raw IOStatus/byte count; it is read again in the
                 // finally block, so it must be declared outside the try.
 498             long n = 0;
                 // Unlike the read paths, the write trace is started regardless
                 // of isBlocking().  remoteAddress is read here without holding
                 // stateLock although it is declared among the fields protected
                 // by that lock.
 499             IoTraceContext traceContext =
 500                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 501                                          remoteAddress.getPort());
 502             try {
                     // Set up the interruption machinery (see the comments in
                     // read(ByteBuffer)).
 503                 begin();
 504                 synchronized (stateLock) {
                         // Channel already closed: the value returned is a
                         // placeholder, end() in the finally block decides the
                         // outcome.
 505                     if (!isOpen())
 506                         return 0;
                         // Record the writing thread; writerCleanup() resets it.
 507                     writerThread = NativeThread.current();
 508                 }
                     // Retry while the call reports INTERRUPTED and the channel
                     // is still open.
 509                 for (;;) {
 510                     n = IOUtil.write(fd, srcs, offset, length, nd);
 511                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 512                         continue;
 513                     return IOStatus.normalize(n);
 514                 }
 515             } finally {
 516                 writerCleanup();
                     // Close the trace event; negative status codes are reported
                     // as 0 bytes.
 517                 IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                     // Pass true when data was transferred or the non-blocking
                     // write could not proceed, so end() does not throw.
 518                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of output.  A throw inside finally
                     // replaces the value returned from the try block and any
                     // exception propagating out of it.
 519                 synchronized (stateLock) {
 520                     if ((n <= 0) && (!isOutputOpen))
 521                         throw new AsynchronousCloseException();
 522                 }
                     // Checked only when assertions are enabled.
 523                 assert IOStatus.check(n);
 524             }
 525         }
 526     }
 527 
 528     // package-private
 529     int sendOutOfBandData(byte b) throws IOException {
 530         synchronized (writeLock) {
 531             ensureWriteOpen();
 532             int n = 0;
 533             try {
 534                 begin();
 535                 synchronized (stateLock) {
 536                     if (!isOpen())
 537                         return 0;