src/share/classes/sun/nio/ch/SocketChannelImpl.java

Print this page
rev 6052 : [mq]: jdk.patch


  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 
  37 
  38 /**
  39  * An implementation of SocketChannels
  40  */
  41 
  42 class SocketChannelImpl
  43     extends SocketChannel
  44     implements SelChImpl
  45 {
  46 
  47     // Used to make native read and write calls
  48     private static NativeDispatcher nd;
  49 
  50     // Our file descriptor object
  51     private final FileDescriptor fd;
  52 
  53     // fd value needed for dev/poll. This value will remain valid
  54     // even after the value in the file descriptor object has been set to -1
  55     private final int fdVal;
  56 


  63 
  64     // Lock held by current writing or connecting thread
  65     private final Object writeLock = new Object();
  66 
  67     // Lock held by any thread that modifies the state fields declared below
  68     // DO NOT invoke a blocking I/O operation while holding this lock!
  69     private final Object stateLock = new Object();
  70 
  71     // -- The following fields are protected by stateLock
  72 
  73     // State, increases monotonically
  74     private static final int ST_UNINITIALIZED = -1;
  75     private static final int ST_UNCONNECTED = 0;
  76     private static final int ST_PENDING = 1;
  77     private static final int ST_CONNECTED = 2;
  78     private static final int ST_KILLPENDING = 3;
  79     private static final int ST_KILLED = 4;
  80     private int state = ST_UNINITIALIZED;
  81 
  82     // Binding
  83     private SocketAddress localAddress;
  84     private SocketAddress remoteAddress;
  85 
  86     // Input/Output open
  87     private boolean isInputOpen = true;
  88     private boolean isOutputOpen = true;
  89     private boolean readyToConnect = false;
  90 
  91     // Socket adaptor, created on demand
  92     private Socket socket;
  93 
  94     // -- End of fields protected by stateLock
  95 
  96 
  97     // Constructor for normal connecting sockets
  98     //
  99     SocketChannelImpl(SelectorProvider sp) throws IOException {
 100         super(sp);
 101         this.fd = Net.socket(true);
 102         this.fdVal = IOUtil.fdVal(fd);
 103         this.state = ST_UNCONNECTED;
 104     }


 261                 kill();
 262         }
 263     }
 264 
 265     private void writerCleanup() throws IOException {
 266         synchronized (stateLock) {
 267             writerThread = 0;
 268             if (state == ST_KILLPENDING)
 269                 kill();
 270         }
 271     }
 272 
 273     public int read(ByteBuffer buf) throws IOException {
 274 
 275         if (buf == null)
 276             throw new NullPointerException();
 277 
 278         synchronized (readLock) {
 279             if (!ensureReadOpen())
 280                 return -1;





 281             int n = 0;
 282             try {
 283 
 284                 // Set up the interruption machinery; see
 285                 // AbstractInterruptibleChannel for details
 286                 //
 287                 begin();
 288 
 289                 synchronized (stateLock) {
 290                     if (!isOpen()) {
 291                     // Either the current thread is already interrupted, so
 292                     // begin() closed the channel, or another thread closed the
 293                     // channel since we checked it a few bytecodes ago.  In
 294                     // either case the value returned here is irrelevant since
 295                     // the invocation of end() in the finally block will throw
 296                     // an appropriate exception.
 297                     //
 298                         return 0;
 299 
 300                     }


 368             } finally {
 369                 readerCleanup();        // Clear reader thread
 370                 // The end method, which is defined in our superclass
 371                 // AbstractInterruptibleChannel, resets the interruption
 372                 // machinery.  If its argument is true then it returns
 373                 // normally; otherwise it checks the interrupt and open state
 374                 // of this channel and throws an appropriate exception if
 375                 // necessary.
 376                 //
 377                 // So, if we actually managed to do any I/O in the above try
 378                 // block then we pass true to the end method.  We also pass
 379                 // true if the channel was in non-blocking mode when the I/O
 380                 // operation was initiated but no data could be transferred;
 381                 // this prevents spurious exceptions from being thrown in the
 382                 // rare event that a channel is closed or a thread is
 383                 // interrupted at the exact moment that a non-blocking I/O
 384                 // request is made.
 385                 //
 386                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 387 


 388                 // Extra case for socket channels: Asynchronous shutdown
 389                 //
 390                 synchronized (stateLock) {
 391                     if ((n <= 0) && (!isInputOpen))
 392                         return IOStatus.EOF;
 393                 }
 394 
 395                 assert IOStatus.check(n);
 396 
 397             }
 398         }
 399     }
 400 
 401     public long read(ByteBuffer[] dsts, int offset, int length)
 402         throws IOException
 403     {
 404         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 405             throw new IndexOutOfBoundsException();
 406         synchronized (readLock) {
 407             if (!ensureReadOpen())
 408                 return -1;
 409             long n = 0;





 410             try {
 411                 begin();
 412                 synchronized (stateLock) {
 413                     if (!isOpen())
 414                         return 0;
 415                     readerThread = NativeThread.current();
 416                 }
 417 
 418                 for (;;) {
 419                     n = IOUtil.read(fd, dsts, offset, length, nd);
 420                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 421                         continue;
 422                     return IOStatus.normalize(n);
 423                 }
 424             } finally {
 425                 readerCleanup();
 426                 end(n > 0 || (n == IOStatus.UNAVAILABLE));

 427                 synchronized (stateLock) {
 428                     if ((n <= 0) && (!isInputOpen))
 429                         return IOStatus.EOF;
 430                 }
 431                 assert IOStatus.check(n);
 432             }
 433         }
 434     }
 435 
 436     public int write(ByteBuffer buf) throws IOException {
 437         if (buf == null)
 438             throw new NullPointerException();
 439         synchronized (writeLock) {
 440             ensureWriteOpen();
 441             int n = 0;




 442             try {
 443                 begin();
 444                 synchronized (stateLock) {
 445                     if (!isOpen())
 446                         return 0;
 447                     writerThread = NativeThread.current();
 448                 }
 449                 for (;;) {
 450                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
 451                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 452                         continue;
 453                     return IOStatus.normalize(n);
 454                 }
 455             } finally {
 456                 writerCleanup();
 457                 end(n > 0 || (n == IOStatus.UNAVAILABLE));

 458                 synchronized (stateLock) {
 459                     if ((n <= 0) && (!isOutputOpen))
 460                         throw new AsynchronousCloseException();
 461                 }
 462                 assert IOStatus.check(n);
 463             }
 464         }
 465     }
 466 
 467     public long write(ByteBuffer[] srcs, int offset, int length)
 468         throws IOException
 469     {
 470         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 471             throw new IndexOutOfBoundsException();
 472         synchronized (writeLock) {
 473             ensureWriteOpen();
 474             long n = 0;



 475             try {
 476                 begin();
 477                 synchronized (stateLock) {
 478                     if (!isOpen())
 479                         return 0;
 480                     writerThread = NativeThread.current();
 481                 }
 482                 for (;;) {
 483                     n = IOUtil.write(fd, srcs, offset, length, nd);
 484                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 485                         continue;
 486                     return IOStatus.normalize(n);
 487                 }
 488             } finally {
 489                 writerCleanup();
 490                 end((n > 0) || (n == IOStatus.UNAVAILABLE));

 491                 synchronized (stateLock) {
 492                     if ((n <= 0) && (!isOutputOpen))
 493                         throw new AsynchronousCloseException();
 494                 }
 495                 assert IOStatus.check(n);
 496             }
 497         }
 498     }
 499 
 500     // package-private
 501     int sendOutOfBandData(byte b) throws IOException {
 502         synchronized (writeLock) {
 503             ensureWriteOpen();
 504             int n = 0;
 505             try {
 506                 begin();
 507                 synchronized (stateLock) {
 508                     if (!isOpen())
 509                         return 0;
 510                     writerThread = NativeThread.current();




  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.io.FileDescriptor;
  29 import java.io.IOException;
  30 import java.net.*;
  31 import java.nio.ByteBuffer;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.*;
  34 import java.util.*;
  35 import sun.net.NetHooks;
  36 import sun.misc.IoTrace;
  37 
  38 /**
  39  * An implementation of SocketChannels
  40  */
  41 
  42 class SocketChannelImpl
  43     extends SocketChannel
  44     implements SelChImpl
  45 {
  46 
  47     // Used to make native read and write calls
  48     private static NativeDispatcher nd;
  49 
  50     // Our file descriptor object
  51     private final FileDescriptor fd;
  52 
  53     // fd value needed for dev/poll. This value will remain valid
  54     // even after the value in the file descriptor object has been set to -1
  55     private final int fdVal;
  56 


  63 
  64     // Lock held by current writing or connecting thread
  65     private final Object writeLock = new Object();
  66 
  67     // Lock held by any thread that modifies the state fields declared below
  68     // DO NOT invoke a blocking I/O operation while holding this lock!
  69     private final Object stateLock = new Object();
  70 
  71     // -- The following fields are protected by stateLock
  72 
  73     // State, increases monotonically
  74     private static final int ST_UNINITIALIZED = -1;
  75     private static final int ST_UNCONNECTED = 0;
  76     private static final int ST_PENDING = 1;
  77     private static final int ST_CONNECTED = 2;
  78     private static final int ST_KILLPENDING = 3;
  79     private static final int ST_KILLED = 4;
  80     private int state = ST_UNINITIALIZED;
  81 
  82     // Binding
  83     private InetSocketAddress localAddress;
  84     private InetSocketAddress remoteAddress;
  85 
  86     // Input/Output open
  87     private boolean isInputOpen = true;
  88     private boolean isOutputOpen = true;
  89     private boolean readyToConnect = false;
  90 
  91     // Socket adaptor, created on demand
  92     private Socket socket;
  93 
  94     // -- End of fields protected by stateLock
  95 
  96 
  97     // Constructor for normal connecting sockets
  98     //
  99     SocketChannelImpl(SelectorProvider sp) throws IOException {
 100         super(sp);
 101         this.fd = Net.socket(true);
 102         this.fdVal = IOUtil.fdVal(fd);
 103         this.state = ST_UNCONNECTED;
 104     }


 261                 kill();
 262         }
 263     }
 264 
 265     private void writerCleanup() throws IOException {
 266         synchronized (stateLock) {
 267             writerThread = 0;
 268             if (state == ST_KILLPENDING)
 269                 kill();
 270         }
 271     }
 272 
 273     public int read(ByteBuffer buf) throws IOException {
 274 
 275         if (buf == null)
 276             throw new NullPointerException();
 277 
 278         synchronized (readLock) {
 279             if (!ensureReadOpen())
 280                 return -1;
 281             Object traceHandle = null;
 282             if (isBlocking()) {
 283                 traceHandle = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 284                                                       remoteAddress.getPort(), 0);
 285             }
 286             int n = 0;
 287             try {
 288 
 289                 // Set up the interruption machinery; see
 290                 // AbstractInterruptibleChannel for details
 291                 //
 292                 begin();
 293 
 294                 synchronized (stateLock) {
 295                     if (!isOpen()) {
 296                     // Either the current thread is already interrupted, so
 297                     // begin() closed the channel, or another thread closed the
 298                     // channel since we checked it a few bytecodes ago.  In
 299                     // either case the value returned here is irrelevant since
 300                     // the invocation of end() in the finally block will throw
 301                     // an appropriate exception.
 302                     //
 303                         return 0;
 304 
 305                     }


 373             } finally {
 374                 readerCleanup();        // Clear reader thread
 375                 // The end method, which is defined in our superclass
 376                 // AbstractInterruptibleChannel, resets the interruption
 377                 // machinery.  If its argument is true then it returns
 378                 // normally; otherwise it checks the interrupt and open state
 379                 // of this channel and throws an appropriate exception if
 380                 // necessary.
 381                 //
 382                 // So, if we actually managed to do any I/O in the above try
 383                 // block then we pass true to the end method.  We also pass
 384                 // true if the channel was in non-blocking mode when the I/O
 385                 // operation was initiated but no data could be transferred;
 386                 // this prevents spurious exceptions from being thrown in the
 387                 // rare event that a channel is closed or a thread is
 388                 // interrupted at the exact moment that a non-blocking I/O
 389                 // request is made.
 390                 //
 391                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 392 
 393                 IoTrace.socketReadEnd(traceHandle, IOStatus.normalize(n));
 394 
 395                 // Extra case for socket channels: Asynchronous shutdown
 396                 //
 397                 synchronized (stateLock) {
 398                     if ((n <= 0) && (!isInputOpen))
 399                         return IOStatus.EOF;
 400                 }
 401 
 402                 assert IOStatus.check(n);
 403 
 404             }
 405         }
 406     }
 407 
 408     public long read(ByteBuffer[] dsts, int offset, int length)
 409         throws IOException
 410     {
 411         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
 412             throw new IndexOutOfBoundsException();
 413         synchronized (readLock) {
 414             if (!ensureReadOpen())
 415                 return -1;
 416             long n = 0;
 417             Object traceHandle = null;
 418             if (isBlocking()) {
 419                 traceHandle = IoTrace.socketReadBegin(remoteAddress.getAddress(),
 420                                                       remoteAddress.getPort(), 0);
 421             }
 422             try {
 423                 begin();
 424                 synchronized (stateLock) {
 425                     if (!isOpen())
 426                         return 0;
 427                     readerThread = NativeThread.current();
 428                 }
 429 
 430                 for (;;) {
 431                     n = IOUtil.read(fd, dsts, offset, length, nd);
 432                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 433                         continue;
 434                     return IOStatus.normalize(n);
 435                 }
 436             } finally {
 437                 readerCleanup();
 438                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 439                 IoTrace.socketReadEnd(traceHandle, IOStatus.normalize(n));
 440                 synchronized (stateLock) {
 441                     if ((n <= 0) && (!isInputOpen))
 442                         return IOStatus.EOF;
 443                 }
 444                 assert IOStatus.check(n);
 445             }
 446         }
 447     }
 448 
 449     public int write(ByteBuffer buf) throws IOException {
 450         if (buf == null)
 451             throw new NullPointerException();
 452         synchronized (writeLock) {
 453             ensureWriteOpen();
 454             int n = 0;
 455             Object traceHandle =
 456                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 457                                          remoteAddress.getPort());
 458 
 459             try {
 460                 begin();
 461                 synchronized (stateLock) {
 462                     if (!isOpen())
 463                         return 0;
 464                     writerThread = NativeThread.current();
 465                 }
 466                 for (;;) {
 467                     n = IOUtil.write(fd, buf, -1, nd, writeLock);
 468                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 469                         continue;
 470                     return IOStatus.normalize(n);
 471                 }
 472             } finally {
 473                 writerCleanup();
 474                 end(n > 0 || (n == IOStatus.UNAVAILABLE));
 475                 IoTrace.socketWriteEnd(traceHandle, IOStatus.normalize(n));
 476                 synchronized (stateLock) {
 477                     if ((n <= 0) && (!isOutputOpen))
 478                         throw new AsynchronousCloseException();
 479                 }
 480                 assert IOStatus.check(n);
 481             }
 482         }
 483     }
 484 
 485     public long write(ByteBuffer[] srcs, int offset, int length)
 486         throws IOException
 487     {
 488         if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
 489             throw new IndexOutOfBoundsException();
 490         synchronized (writeLock) {
 491             ensureWriteOpen();
 492             long n = 0;
 493             Object traceHandle =
 494                 IoTrace.socketWriteBegin(remoteAddress.getAddress(),
 495                                          remoteAddress.getPort());
 496             try {
 497                 begin();
 498                 synchronized (stateLock) {
 499                     if (!isOpen())
 500                         return 0;
 501                     writerThread = NativeThread.current();
 502                 }
 503                 for (;;) {
 504                     n = IOUtil.write(fd, srcs, offset, length, nd);
 505                     if ((n == IOStatus.INTERRUPTED) && isOpen())
 506                         continue;
 507                     return IOStatus.normalize(n);
 508                 }
 509             } finally {
 510                 writerCleanup();
 511                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
 512                 IoTrace.socketWriteEnd(traceHandle, IOStatus.normalize(n));
 513                 synchronized (stateLock) {
 514                     if ((n <= 0) && (!isOutputOpen))
 515                         throw new AsynchronousCloseException();
 516                 }
 517                 assert IOStatus.check(n);
 518             }
 519         }
 520     }
 521 
 522     // package-private
 523     int sendOutOfBandData(byte b) throws IOException {
 524         synchronized (writeLock) {
 525             ensureWriteOpen();
 526             int n = 0;
 527             try {
 528                 begin();
 529                 synchronized (stateLock) {
 530                     if (!isOpen())
 531                         return 0;
 532                     writerThread = NativeThread.current();