src/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
rev 5501 : imported patch io-trace


  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 
  37 
  38 /**
  39  * An implementation of SocketChannels
  40  */
  41 
  42 class SocketChannelImpl
  43     extends SocketChannel
  44     implements SelChImpl
  45 {
  46 
  47     // Used to make native read and write calls
         // (static, so shared by every channel instance; it is the dispatcher
         // argument handed to IOUtil.read and IOUtil.write by this class)
  48     private static NativeDispatcher nd;
  49 
  50     // Our file descriptor object
         // (assigned once at construction; the constructor for normal
         // connecting sockets obtains it from Net.socket(true))
  51     private final FileDescriptor fd;
  52 
  53     // fd value needed for dev/poll. This value will remain valid
  54     // even after the value in the file descriptor object has been set to -1
         // (the constructor for normal connecting sockets captures it with
         // IOUtil.fdVal(fd))
  55     private final int fdVal;
  56 


  63 
  64     // Lock held by current writing or connecting thread
         // (both write methods and sendOutOfBandData synchronize on it for the
         // whole operation)
  65     private final Object writeLock = new Object();
  66 
  67     // Lock held by any thread that modifies the state fields declared below
  68     // DO NOT invoke a blocking I/O operation while holding this lock!
  69     private final Object stateLock = new Object();
  70 
  71     // -- The following fields are protected by stateLock
  72 
  73     // State, increases monotonically
         // The field starts as ST_UNINITIALIZED; the constructor for normal
         // connecting sockets sets it to ST_UNCONNECTED.  writerCleanup() calls
         // kill() when it observes ST_KILLPENDING.
  74     private static final int ST_UNINITIALIZED = -1;
  75     private static final int ST_UNCONNECTED = 0;
  76     private static final int ST_PENDING = 1;
  77     private static final int ST_CONNECTED = 2;
  78     private static final int ST_KILLPENDING = 3;
  79     private static final int ST_KILLED = 4;
  80     private int state = ST_UNINITIALIZED;
  81 
  82     // Binding
  83     private SocketAddress localAddress;
  84     private SocketAddress remoteAddress;
  85 
  86     // Input/Output open
         // When isInputOpen is false, a read that transferred nothing returns
         // IOStatus.EOF; when isOutputOpen is false, a write that transferred
         // nothing throws AsynchronousCloseException.
  87     private boolean isInputOpen = true;
  88     private boolean isOutputOpen = true;
  89     private boolean readyToConnect = false;
  90 
  91     // Socket adaptor, created on demand
  92     private Socket socket;
  93 
  94     // -- End of fields protected by stateLock
  95 
  96 
  97     // Constructor for normal connecting sockets
  98     //
         // Creates the underlying socket with Net.socket(true), caches its
         // descriptor value in fdVal and moves the state from its initial
         // ST_UNINITIALIZED to ST_UNCONNECTED.
         // NOTE(review): the 'true' argument presumably selects a stream
         // socket -- confirm against Net.socket.
         //
  99     SocketChannelImpl(SelectorProvider sp) throws IOException {
 100         super(sp);
 101         this.fd = Net.socket(true);
 102         this.fdVal = IOUtil.fdVal(fd);
 103         this.state = ST_UNCONNECTED;
 104     }


 261                 kill();
 262         }
 263     }
 264 
 265     private void writerCleanup() throws IOException {
             // Invoked from the finally blocks of the write methods.  Under
             // stateLock it clears the recorded writer thread and, if the
             // state is ST_KILLPENDING, calls kill().
             // NOTE(review): presumably ST_KILLPENDING means a close was
             // requested while I/O was still in progress -- confirm against
             // kill() and the close path.
 266         synchronized (stateLock) {
 267             writerThread = 0;
 268             if (state == ST_KILLPENDING)
 269                 kill();
 270         }
 271     }
 272 
 273     public int read(ByteBuffer buf) throws IOException {
 274 
 275         if (buf == null)
 276             throw new NullPointerException();
 277 
 278         synchronized (readLock) {
 279             if (!ensureReadOpen())
 280                 return -1;





 281             int n = 0;
 282             try {
 283 
 284                 // Set up the interruption machinery; see
 285                 // AbstractInterruptibleChannel for details
 286                 //
 287                 begin();
 288 
 289                 synchronized (stateLock) {
 290                     if (!isOpen()) {
 291                     // Either the current thread is already interrupted, so
 292                     // begin() closed the channel, or another thread closed the
 293                     // channel since we checked it a few bytecodes ago.  In
 294                     // either case the value returned here is irrelevant since
 295                     // the invocation of end() in the finally block will throw
 296                     // an appropriate exception.
 297                     //
 298                         return 0;
 299 
 300                     }


 350                 // so the connection gets cut off as usual).
 351                 //
 352                 // For socket channels there is the additional wrinkle that
 353                 // asynchronous shutdown works much like asynchronous close,
 354                 // except that the channel is shutdown rather than completely
 355                 // closed.  This is analogous to the first two cases above,
 356                 // except that the shutdown operation plays the role of
 357                 // nd.preClose().
 358                 for (;;) {
 359                     n = IOUtil.read(fd, buf, -1, nd, readLock);
 360                     if ((n == IOStatus.INTERRUPTED) && isOpen()) {
 361                         // The system call was interrupted but the channel
 362                         // is still open, so retry
 363                         continue;
 364                     }
 365                     return IOStatus.normalize(n);
 366                 }
 367 
 368             } finally {
 369                 readerCleanup();        // Clear reader thread





 370                 // The end method, which is defined in our superclass
 371                 // AbstractInterruptibleChannel, resets the interruption
 372                 // machinery.  If its argument is true then it returns
 373                 // normally; otherwise it checks the interrupt and open state
 374                 // of this channel and throws an appropriate exception if
 375                 // necessary.
 376                 //
 377                 // So, if we actually managed to do any I/O in the above try
 378                 // block then we pass true to the end method.  We also pass
 379                 // true if the channel was in non-blocking mode when the I/O
 380                 // operation was initiated but no data could be transferred;
 381                 // this prevents spurious exceptions from being thrown in the
 382                 // rare event that a channel is closed or a thread is
 383                 // interrupted at the exact moment that a non-blocking I/O
 384                 // request is made.
 385                 //
 386                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 387 
 388                 // Extra case for socket channels: Asynchronous shutdown
 389                 //
 390                 synchronized (stateLock) {
 391                     if ((n <= 0) && (!isInputOpen))
 392                         return IOStatus.EOF;
 393                 }
 394 
 395                 assert IOStatus.check(n);
 396 
 397             }
 398         }
 399     }
 400 
 401     public long read(ByteBuffer[] dsts, int offset, int length)
 402         throws IOException
 403     {
             // Reads from the socket into the buffers dsts[offset] through
             // dsts[offset + length - 1] while holding readLock.
             //
             // Returns -1 if ensureReadOpen() reports false, IOStatus.EOF if
             // nothing was read and the input side is no longer open, and
             // otherwise the normalized result of IOUtil.read.  Throws
             // IndexOutOfBoundsException for an invalid offset/length pair.
             //
             // The range test is written as offset > dsts.length - length so
             // that it cannot overflow the way offset + length could.
 404         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 405             throw new IndexOutOfBoundsException();
 406         synchronized (readLock) {
 407             if (!ensureReadOpen())
 408                 return -1;
 409             long n = 0;





 410             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 411                 begin();
 412                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 413                     if (!isOpen())
 414                         return 0;
                         // Record the native thread performing the read;
                         // readerCleanup() clears it again.
 415                     readerThread = NativeThread.current();
 416                 }
 417 
 418                 for (;;) {
 419                     n = IOUtil.read(fd, dsts, offset, length, nd);
                         // The system call was interrupted but the channel is
                         // still open, so retry
 420                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 421                         continue;
 422                     return IOStatus.normalize(n);
 423                 }
 424             } finally {
 425                 readerCleanup();



                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 426                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown: nothing read and the input side
                     // no longer open is reported as EOF.  Returning from the
                     // finally block replaces whatever the try block was about
                     // to return or throw.
 427                 synchronized (stateLock) {
 428                     if ((n <= 0) && (!isInputOpen))
 429                         return IOStatus.EOF;
 430                 }
 431                 assert IOStatus.check(n);
 432             }
 433         }
 434     }
 435 
 436     public int write(ByteBuffer buf) throws IOException {
             // Writes the contents of buf to the socket while holding
             // writeLock and returns the normalized result of IOUtil.write.
             // Throws NullPointerException for a null buffer and
             // AsynchronousCloseException if nothing was written and the
             // output side is no longer open.
 437         if (buf == null)
 438             throw new NullPointerException();
 439         synchronized (writeLock) {
                 // NOTE(review): ensureWriteOpen() presumably throws when the
                 // channel cannot be written to -- confirm against its
                 // definition.
 440             ensureWriteOpen();
 441             int n = 0;




 442             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 443                 begin();
 444                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 445                     if (!isOpen())
 446                         return 0;
                         // Record the native thread performing the write;
                         // writerCleanup() resets it to 0.
 447                     writerThread = NativeThread.current();
 448                 }
 449                 for (;;) {
                         // NOTE(review): the -1 argument presumably means "use
                         // the buffer's own position" -- confirm against
                         // IOUtil.write.
 450                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
                         // Interrupted but the channel is still open: retry
 451                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 452                         continue;
 453                     return IOStatus.normalize(n);
 454                 }
 455             } finally {
 456                 writerCleanup();

                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 457                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of the output side: nothing
                     // written and output no longer open is reported as an
                     // AsynchronousCloseException.
 458                 synchronized (stateLock) {
 459                     if ((n <= 0) && (!isOutputOpen))
 460                         throw new AsynchronousCloseException();
 461                 }
 462                 assert IOStatus.check(n);
 463             }
 464         }
 465     }
 466 
 467     public long write(ByteBuffer[] srcs, int offset, int length)
 468         throws IOException
 469     {
             // Writes the buffers srcs[offset] through
             // srcs[offset + length - 1] to the socket while holding writeLock
             // and returns the normalized result of IOUtil.write.  Throws
             // IndexOutOfBoundsException for an invalid offset/length pair and
             // AsynchronousCloseException if nothing was written and the
             // output side is no longer open.
             //
             // The range test is written as offset > srcs.length - length so
             // that it cannot overflow the way offset + length could.
 470         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 471             throw new IndexOutOfBoundsException();
 472         synchronized (writeLock) {
                 // NOTE(review): ensureWriteOpen() presumably throws when the
                 // channel cannot be written to -- confirm against its
                 // definition.
 473             ensureWriteOpen();
 474             long n = 0;



 475             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 476                 begin();
 477                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 478                     if (!isOpen())
 479                         return 0;
                         // Record the native thread performing the write;
                         // writerCleanup() resets it to 0.
 480                     writerThread = NativeThread.current();
 481                 }
 482                 for (;;) {
 483                     n = IOUtil.write(fd, srcs, offset, length, nd);
                         // Interrupted but the channel is still open: retry
 484                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 485                         continue;
 486                     return IOStatus.normalize(n);
 487                 }
 488             } finally {
 489                 writerCleanup();

                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 490                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of the output side: nothing
                     // written and output no longer open is reported as an
                     // AsynchronousCloseException.
 491                 synchronized (stateLock) {
 492                     if ((n <= 0) && (!isOutputOpen))
 493                         throw new AsynchronousCloseException();
 494                 }
 495                 assert IOStatus.check(n);
 496             }
 497         }
 498     }
 499 
 500     // package-private
 501     int sendOutOfBandData(byte b) throws IOException {
 502         synchronized (writeLock) {
 503             ensureWriteOpen();
 504             int n = 0;
 505             try {
 506                 begin();
 507                 synchronized (stateLock) {
 508                     if (!isOpen())
 509                         return 0;




  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 import sun.misc.IoTrace;
  37 
  38 /**
  39  * An implementation of SocketChannels
  40  */
  41 
  42 class SocketChannelImpl
  43     extends SocketChannel
  44     implements SelChImpl
  45 {
  46 
  47     // Used to make native read and write calls
         // (static, so shared by every channel instance; it is the dispatcher
         // argument handed to IOUtil.read and IOUtil.write by this class)
  48     private static NativeDispatcher nd;
  49 
  50     // Our file descriptor object
         // (assigned once at construction; the constructor for normal
         // connecting sockets obtains it from Net.socket(true))
  51     private final FileDescriptor fd;
  52 
  53     // fd value needed for dev/poll. This value will remain valid
  54     // even after the value in the file descriptor object has been set to -1
         // (the constructor for normal connecting sockets captures it with
         // IOUtil.fdVal(fd))
  55     private final int fdVal;
  56 


  63 
  64     // Lock held by current writing or connecting thread
         // (both write methods and sendOutOfBandData synchronize on it for the
         // whole operation)
  65     private final Object writeLock = new Object();
  66 
  67     // Lock held by any thread that modifies the state fields declared below
  68     // DO NOT invoke a blocking I/O operation while holding this lock!
  69     private final Object stateLock = new Object();
  70 
  71     // -- The following fields are protected by stateLock
  72 
  73     // State, increases monotonically
         // The field starts as ST_UNINITIALIZED; the constructor for normal
         // connecting sockets sets it to ST_UNCONNECTED.  writerCleanup() calls
         // kill() when it observes ST_KILLPENDING.
  74     private static final int ST_UNINITIALIZED = -1;
  75     private static final int ST_UNCONNECTED = 0;
  76     private static final int ST_PENDING = 1;
  77     private static final int ST_CONNECTED = 2;
  78     private static final int ST_KILLPENDING = 3;
  79     private static final int ST_KILLED = 4;
  80     private int state = ST_UNINITIALIZED;
  81 
  82     // Binding
         // Typed as InetSocketAddress: the I/O tracing calls in the read and
         // write methods use remoteAddress.getAddress() and getPort().
  83     private InetSocketAddress localAddress;
  84     private InetSocketAddress remoteAddress;
  85 
  86     // Input/Output open
         // When isInputOpen is false, a read that transferred nothing returns
         // IOStatus.EOF; when isOutputOpen is false, a write that transferred
         // nothing throws AsynchronousCloseException.
  87     private boolean isInputOpen = true;
  88     private boolean isOutputOpen = true;
  89     private boolean readyToConnect = false;
  90 
  91     // Socket adaptor, created on demand
  92     private Socket socket;
  93 
  94     // -- End of fields protected by stateLock
  95 
  96 
  97     // Constructor for normal connecting sockets
  98     //
         // Creates the underlying socket with Net.socket(true), caches its
         // descriptor value in fdVal and moves the state from its initial
         // ST_UNINITIALIZED to ST_UNCONNECTED.
         // NOTE(review): the 'true' argument presumably selects a stream
         // socket -- confirm against Net.socket.
         //
  99     SocketChannelImpl(SelectorProvider sp) throws IOException {
 100         super(sp);
 101         this.fd = Net.socket(true);
 102         this.fdVal = IOUtil.fdVal(fd);
 103         this.state = ST_UNCONNECTED;
 104     }


 261                 kill();
 262         }
 263     }
 264 
 265     private void writerCleanup() throws IOException {
             // Invoked from the finally blocks of the write methods.  Under
             // stateLock it clears the recorded writer thread and, if the
             // state is ST_KILLPENDING, calls kill().
             // NOTE(review): presumably ST_KILLPENDING means a close was
             // requested while I/O was still in progress -- confirm against
             // kill() and the close path.
 266         synchronized (stateLock) {
 267             writerThread = 0;
 268             if (state == ST_KILLPENDING)
 269                 kill();
 270         }
 271     }
 272 
 273     public int read(ByteBuffer buf) throws IOException {
 274 
 275         if (buf == null)
 276             throw new NullPointerException();
 277 
 278         synchronized (readLock) {
 279             if (!ensureReadOpen())
 280                 return -1;
 281             Object traceContext = null;
 282             if (isBlocking()) {
 283                 traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 284                                                       remoteAddress.getPort(), 0);
 285             }
 286             int n = 0;
 287             try {
 288 
 289                 // Set up the interruption machinery; see
 290                 // AbstractInterruptibleChannel for details
 291                 //
 292                 begin();
 293 
 294                 synchronized (stateLock) {
 295                     if (!isOpen()) {
 296                     // Either the current thread is already interrupted, so
 297                     // begin() closed the channel, or another thread closed the
 298                     // channel since we checked it a few bytecodes ago.  In
 299                     // either case the value returned here is irrelevant since
 300                     // the invocation of end() in the finally block will throw
 301                     // an appropriate exception.
 302                     //
 303                         return 0;
 304 
 305                     }


 355                 // so the connection gets cut off as usual).
 356                 //
 357                 // For socket channels there is the additional wrinkle that
 358                 // asynchronous shutdown works much like asynchronous close,
 359                 // except that the channel is shutdown rather than completely
 360                 // closed.  This is analogous to the first two cases above,
 361                 // except that the shutdown operation plays the role of
 362                 // nd.preClose().
 363                 for (;;) {
 364                     n = IOUtil.read(fd, buf, -1, nd, readLock);
 365                     if ((n == IOStatus.INTERRUPTED) && isOpen()) {
 366                         // The system call was interrupted but the channel
 367                         // is still open, so retry
 368                         continue;
 369                     }
 370                     return IOStatus.normalize(n);
 371                 }
 372 
 373             } finally {
 374                 readerCleanup();        // Clear reader thread
 375 
 376                 if (isBlocking()) {
 377                     IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
 378                 }
 379 
 380                 // The end method, which is defined in our superclass
 381                 // AbstractInterruptibleChannel, resets the interruption
 382                 // machinery.  If its argument is true then it returns
 383                 // normally; otherwise it checks the interrupt and open state
 384                 // of this channel and throws an appropriate exception if
 385                 // necessary.
 386                 //
 387                 // So, if we actually managed to do any I/O in the above try
 388                 // block then we pass true to the end method.  We also pass
 389                 // true if the channel was in non-blocking mode when the I/O
 390                 // operation was initiated but no data could be transferred;
 391                 // this prevents spurious exceptions from being thrown in the
 392                 // rare event that a channel is closed or a thread is
 393                 // interrupted at the exact moment that a non-blocking I/O
 394                 // request is made.
 395                 //
 396                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 397 
 398                 // Extra case for socket channels: Asynchronous shutdown
 399                 //
 400                 synchronized (stateLock) {
 401                     if ((n <= 0) && (!isInputOpen))
 402                         return IOStatus.EOF;
 403                 }
 404 
 405                 assert IOStatus.check(n);
 406 
 407             }
 408         }
 409     }
 410 
 411     public long read(ByteBuffer[] dsts, int offset, int length)
 412         throws IOException
 413     {
             // Reads from the socket into the buffers dsts[offset] through
             // dsts[offset + length - 1] while holding readLock, reporting the
             // operation to IoTrace when the channel is in blocking mode.
             //
             // Returns -1 if ensureReadOpen() reports false, IOStatus.EOF if
             // nothing was read and the input side is no longer open, and
             // otherwise the normalized result of IOUtil.read.  Throws
             // IndexOutOfBoundsException for an invalid offset/length pair.
             //
             // The range test is written as offset > dsts.length - length so
             // that it cannot overflow the way offset + length could.
 414         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 415             throw new IndexOutOfBoundsException();
 416         synchronized (readLock) {
 417             if (!ensureReadOpen())
 418                 return -1;
 419             long n = 0;
                 // Tracing is started only when the channel is in blocking
                 // mode; otherwise traceContext stays null.
                 // NOTE(review): remoteAddress is declared among the fields
                 // protected by stateLock but is read here while holding only
                 // readLock -- confirm that this is safe.
                 // NOTE(review): the trailing 0 is presumably a timeout
                 // value -- confirm against IoTrace.socketReadBegin.
 420             Object traceContext = null;
 421             if (isBlocking()) {
 422                 traceContext = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 423                                                       remoteAddress.getPort(), 0);
 424             }
 425             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 426                 begin();
 427                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 428                     if (!isOpen())
 429                         return 0;
                         // Record the native thread performing the read;
                         // readerCleanup() clears it again.
 430                     readerThread = NativeThread.current();
 431                 }
 432 
 433                 for (;;) {
 434                     n = IOUtil.read(fd, dsts, offset, length, nd);
                         // The system call was interrupted but the channel is
                         // still open, so retry
 435                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 436                         continue;
 437                     return IOStatus.normalize(n);
 438                 }
 439             } finally {
 440                 readerCleanup();
                     // Report the number of bytes read to the tracer; status
                     // values that are not positive are reported as 0.
                     // NOTE(review): isBlocking() is evaluated a second time
                     // here; if the blocking mode can change while the read is
                     // in progress, the begin/end calls would not pair up --
                     // confirm.
 441                 if (isBlocking()) {
 442                     IoTrace.socketReadEnd(traceContext, n > 0 ? n : 0);
 443                 }
                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 444                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown: nothing read and the input side
                     // no longer open is reported as EOF.  Returning from the
                     // finally block replaces whatever the try block was about
                     // to return or throw.
 445                 synchronized (stateLock) {
 446                     if ((n <= 0) && (!isInputOpen))
 447                         return IOStatus.EOF;
 448                 }
 449                 assert IOStatus.check(n);
 450             }
 451         }
 452     }
 453 
 454     public int write(ByteBuffer buf) throws IOException {
             // Writes the contents of buf to the socket while holding
             // writeLock, reporting the operation to IoTrace, and returns the
             // normalized result of IOUtil.write.  Throws NullPointerException
             // for a null buffer and AsynchronousCloseException if nothing was
             // written and the output side is no longer open.
 455         if (buf == null)
 456             throw new NullPointerException();
 457         synchronized (writeLock) {
                 // NOTE(review): ensureWriteOpen() presumably throws when the
                 // channel cannot be written to -- confirm against its
                 // definition.
 458             ensureWriteOpen();
 459             int n = 0;
                 // Tracing is started unconditionally here, whereas the read
                 // methods only trace when isBlocking() is true.
                 // NOTE(review): remoteAddress is declared among the fields
                 // protected by stateLock but is read here while holding only
                 // writeLock -- confirm that this is safe.
 460             Object traceContext =
 461                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 462                                          remoteAddress.getPort());
 463 
 464             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 465                 begin();
 466                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 467                     if (!isOpen())
 468                         return 0;
                         // Record the native thread performing the write;
                         // writerCleanup() resets it to 0.
 469                     writerThread = NativeThread.current();
 470                 }
 471                 for (;;) {
                         // NOTE(review): the -1 argument presumably means "use
                         // the buffer's own position" -- confirm against
                         // IOUtil.write.
 472                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
                         // Interrupted but the channel is still open: retry
 473                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 474                         continue;
 475                     return IOStatus.normalize(n);
 476                 }
 477             } finally {
 478                 writerCleanup();
                     // Report the number of bytes written to the tracer;
                     // status values that are not positive are reported as 0.
 479                 IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 480                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of the output side: nothing
                     // written and output no longer open is reported as an
                     // AsynchronousCloseException.
 481                 synchronized (stateLock) {
 482                     if ((n <= 0) && (!isOutputOpen))
 483                         throw new AsynchronousCloseException();
 484                 }
 485                 assert IOStatus.check(n);
 486             }
 487         }
 488     }
 489 
 490     public long write(ByteBuffer[] srcs, int offset, int length)
 491         throws IOException
 492     {
             // Writes the buffers srcs[offset] through
             // srcs[offset + length - 1] to the socket while holding
             // writeLock, reporting the operation to IoTrace, and returns the
             // normalized result of IOUtil.write.  Throws
             // IndexOutOfBoundsException for an invalid offset/length pair and
             // AsynchronousCloseException if nothing was written and the
             // output side is no longer open.
             //
             // The range test is written as offset > srcs.length - length so
             // that it cannot overflow the way offset + length could.
 493         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 494             throw new IndexOutOfBoundsException();
 495         synchronized (writeLock) {
                 // NOTE(review): ensureWriteOpen() presumably throws when the
                 // channel cannot be written to -- confirm against its
                 // definition.
 496             ensureWriteOpen();
 497             long n = 0;
                 // Tracing is started unconditionally here, whereas the read
                 // methods only trace when isBlocking() is true.
                 // NOTE(review): remoteAddress is declared among the fields
                 // protected by stateLock but is read here while holding only
                 // writeLock -- confirm that this is safe.
 498             Object traceContext =
 499                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 500                                          remoteAddress.getPort());
 501             try {
                     // Set up the interruption machinery; see
                     // AbstractInterruptibleChannel for details
 502                 begin();
 503                 synchronized (stateLock) {
                         // If the channel is no longer open the value returned
                         // here is irrelevant: end() in the finally block
                         // throws the appropriate exception.
 504                     if (!isOpen())
 505                         return 0;
                         // Record the native thread performing the write;
                         // writerCleanup() resets it to 0.
 506                     writerThread = NativeThread.current();
 507                 }
 508                 for (;;) {
 509                     n = IOUtil.write(fd, srcs, offset, length, nd);
                         // Interrupted but the channel is still open: retry
 510                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 511                         continue;
 512                     return IOStatus.normalize(n);
 513                 }
 514             } finally {
 515                 writerCleanup();
                     // Report the number of bytes written to the tracer;
                     // status values that are not positive are reported as 0.
 516                 IoTrace.socketWriteEnd(traceContext, n > 0 ? n : 0);
                     // Pass true to end() when data was transferred or the
                     // status is UNAVAILABLE, so that end() raises no
                     // exception in those cases.
 517                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
                     // Asynchronous shutdown of the output side: nothing
                     // written and output no longer open is reported as an
                     // AsynchronousCloseException.
 518                 synchronized (stateLock) {
 519                     if ((n <= 0) && (!isOutputOpen))
 520                         throw new AsynchronousCloseException();
 521                 }
 522                 assert IOStatus.check(n);
 523             }
 524         }
 525     }
 526 
 527     // package-private
 528     int sendOutOfBandData(byte b) throws IOException {
 529         synchronized (writeLock) {
 530             ensureWriteOpen();
 531             int n = 0;
 532             try {
 533                 begin();
 534                 synchronized (stateLock) {
 535                     if (!isOpen())
 536                         return 0;