/* * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Sun designates this * particular file as subject to the "Classpath" exception as provided * by Sun in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, * CA 95054 USA or visit www.sun.com if you need additional information or * have any questions. */ package sun.nio.ch; import java.nio.channels.*; import java.nio.ByteBuffer; import java.nio.BufferOverflowException; import java.net.*; import java.util.concurrent.*; import java.io.IOException; import sun.misc.Unsafe; /** * Windows implementation of AsynchronousSocketChannel using overlapped I/O. */ class WindowsAsynchronousSocketChannelImpl extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static int addressSize = unsafe.addressSize(); private static int dependsArch(int value32, int value64) { return (addressSize == 4) ? value32 : value64; } /* * typedef struct _WSABUF { * u_long len; * char FAR * buf; * } WSABUF; */ private static final int SIZEOF_WSABUF = dependsArch(8, 16); private static final int OFFSETOF_LEN = 0; private static final int OFFSETOF_BUF = dependsArch(4, 8); // maximum vector size for scatter/gather I/O private static final int MAX_WSABUF = 16; private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; // socket handle. Use begin()/end() around each usage of this handle. final long handle; // I/O completion port that the socket is associated with private final Iocp iocp; // completion key to identify channel when I/O completes private final int completionKey; // Pending I/O operations are tied to an OVERLAPPED structure that can only // be released when the I/O completion event is posted to the completion // port. Where I/O operations complete immediately then it is possible // there may be more than two OVERLAPPED structures in use. private final PendingIoCache ioCache; // per-channel arrays of WSABUF structures private final long readBufferArray; private final long writeBufferArray; WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) throws IOException { super(iocp); // associate socket with default completion port long h = IOUtil.fdVal(fd); int key = 0; try { key = iocp.associate(this, h); } catch (ShutdownChannelGroupException x) { if (failIfGroupShutdown) { closesocket0(h); throw x; } } catch (IOException x) { closesocket0(h); throw x; } this.handle = h; this.iocp = iocp; this.completionKey = key; this.ioCache = new PendingIoCache(); // allocate WSABUF arrays this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); } WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { this(iocp, true); } @Override public AsynchronousChannelGroupImpl group() { return iocp; } /** * Invoked by Iocp when an I/O operation competes. */ @Override public PendingFuture getByOverlapped(long overlapped) { return ioCache.remove(overlapped); } // invoked by WindowsAsynchronousServerSocketChannelImpl long handle() { return handle; } // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection // accept void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) { synchronized (stateLock) { state = ST_CONNECTED; this.localAddress = localAddress; this.remoteAddress = remoteAddress; } } @Override void implClose() throws IOException { // close socket (may cause outstanding async I/O operations to fail). closesocket0(handle); // waits until all I/O operations have completed ioCache.close(); // release arrays of WSABUF structures unsafe.freeMemory(readBufferArray); unsafe.freeMemory(writeBufferArray); // finally disassociate from the completion port (key can be 0 if // channel created when group is shutdown) if (completionKey != 0) iocp.disassociate(completionKey); } @Override public void onCancel(PendingFuture task) { if (task.getContext() instanceof ConnectTask) killConnect(); if (task.getContext() instanceof ReadTask) killReading(); if (task.getContext() instanceof WriteTask) killWriting(); } /** * Implements the task to initiate a connection and the handler to * consume the result when the connection is established (or fails). */ private class ConnectTask implements Runnable, Iocp.ResultHandler { private final InetSocketAddress remote; private final PendingFuture result; ConnectTask(InetSocketAddress remote, PendingFuture result) { this.remote = remote; this.result = result; } private void closeChannel() { try { close(); } catch (IOException ignore) { } } private IOException toIOException(Throwable x) { if (x instanceof IOException) { if (x instanceof ClosedChannelException) x = new AsynchronousCloseException(); return (IOException)x; } return new IOException(x); } /** * Invoke after a connection is successfully established. */ private void afterConnect() throws IOException { updateConnectContext(handle); synchronized (stateLock) { state = ST_CONNECTED; remoteAddress = remote; } } /** * Task to initiate a connection. */ @Override public void run() { long overlapped = 0L; Throwable exc = null; try { begin(); // synchronize on result to allow this thread handle the case // where the connection is established immediately. synchronized (result) { overlapped = ioCache.add(result); // initiate the connection int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), remote.getPort(), overlapped); if (n == IOStatus.UNAVAILABLE) { // connection is pending return; } // connection established immediately afterConnect(); result.setResult(null); } } catch (Throwable x) { exc = x; } finally { end(); } if (exc != null) { if (overlapped != 0L) ioCache.remove(overlapped); closeChannel(); result.setFailure(toIOException(exc)); } Invoker.invoke(result.handler(), result); } /** * Invoked by handler thread when connection established. */ @Override public void completed(int bytesTransferred) { Throwable exc = null; try { begin(); afterConnect(); result.setResult(null); } catch (Throwable x) { // channel is closed or unable to finish connect exc = x; } finally { end(); } // can't close channel while in begin/end block if (exc != null) { closeChannel(); result.setFailure(toIOException(exc)); } Invoker.invoke(result.handler(), result); } /** * Invoked by handler thread when failed to establish connection. */ @Override public void failed(int error, IOException x) { if (isOpen()) { closeChannel(); result.setFailure(x); } else { result.setFailure(new AsynchronousCloseException()); } Invoker.invoke(result.handler(), result); } } @Override public Future connect(SocketAddress remote, A attachment, CompletionHandler handler) { if (!isOpen()) { CompletedFuture result = CompletedFuture .withFailure(this, new ClosedChannelException(), attachment); Invoker.invoke(handler, result); return result; } InetSocketAddress isa = Net.checkAddress(remote); // permission check SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); // check and update state // ConnectEx requires the socket to be bound to a local address IOException bindException = null; synchronized (stateLock) { if (state == ST_CONNECTED) throw new AlreadyConnectedException(); if (state == ST_PENDING) throw new ConnectionPendingException(); if (localAddress == null) { try { bind(new InetSocketAddress(0)); } catch (IOException x) { bindException = x; } } if (bindException == null) state = ST_PENDING; } // handle bind failure if (bindException != null) { try { close(); } catch (IOException ignore) { } CompletedFuture result = CompletedFuture .withFailure(this, bindException, attachment); Invoker.invoke(handler, result); return result; } // setup task PendingFuture result = new PendingFuture(this, handler, attachment); ConnectTask task = new ConnectTask(isa, result); result.setContext(task); // initiate I/O (can only be done from thread in thread pool) Invoker.invokeOnThreadInThreadPool(this, task); return result; } /** * Implements the task to initiate a read and the handler to consume the * result when the read completes. */ private class ReadTask implements Runnable, Iocp.ResultHandler { private final ByteBuffer[] bufs; private final int numBufs; private final boolean scatteringRead; private final PendingFuture result; // set by run method private ByteBuffer[] shadow; ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture result) { this.bufs = bufs; this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; this.scatteringRead = scatteringRead; this.result = result; } /** * Invoked prior to read to prepare the WSABUF array. Where necessary, * it substitutes non-direct buffers with direct buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; long address = readBufferArray; for (int i=0; i= len) { bytesRead -= len; int newPosition = pos + len; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by another } } else { // Buffers not completely filled if (bytesRead > 0) { assert(pos + bytesRead < (long)Integer.MAX_VALUE); int newPosition = pos + bytesRead; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by another } } break; } } // Put results from shadow into the slow buffers for (int i=0; i Future readImpl(ByteBuffer[] bufs, boolean scatteringRead, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { // setup task PendingFuture result = new PendingFuture(this, handler, attachment); final ReadTask readTask = new ReadTask(bufs, scatteringRead, result); result.setContext(readTask); // schedule timeout if (timeout > 0L) { Future timeoutTask = iocp.schedule(new Runnable() { public void run() { readTask.timeout(); } }, timeout, unit); result.setTimeoutTask(timeoutTask); } // initiate I/O (can only be done from thread in thread pool) Invoker.invokeOnThreadInThreadPool(this, readTask); return result; } /** * Implements the task to initiate a write and the handler to consume the * result when the write completes. */ private class WriteTask implements Runnable, Iocp.ResultHandler { private final ByteBuffer[] bufs; private final int numBufs; private final boolean gatheringWrite; private final PendingFuture result; // set by run method private ByteBuffer[] shadow; WriteTask(ByteBuffer[] bufs, boolean gatheringWrite, PendingFuture result) { this.bufs = bufs; this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; this.gatheringWrite = gatheringWrite; this.result = result; } /** * Invoked prior to write to prepare the WSABUF array. Where necessary, * it substitutes non-direct buffers with direct buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; long address = writeBufferArray; for (int i=0; i= len) { bytesWritten -= len; int newPosition = pos + len; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by someone else } } else { // Buffers not completely filled if (bytesWritten > 0) { assert(pos + bytesWritten < (long)Integer.MAX_VALUE); int newPosition = pos + bytesWritten; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by someone else } } break; } } } void releaseBuffers() { for (int i=0; i Future writeImpl(ByteBuffer[] bufs, boolean gatheringWrite, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { // setup task PendingFuture result = new PendingFuture(this, handler, attachment); final WriteTask writeTask = new WriteTask(bufs, gatheringWrite, result); result.setContext(writeTask); // schedule timeout if (timeout > 0L) { Future timeoutTask = iocp.schedule(new Runnable() { public void run() { writeTask.timeout(); } }, timeout, unit); result.setTimeoutTask(timeoutTask); } // initiate I/O (can only be done from thread in thread pool) Invoker.invokeOnThreadInThreadPool(this, writeTask); return result; } // -- Native methods -- private static native void initIDs(); private static native int connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped) throws IOException; private static native void updateConnectContext(long socket) throws IOException; private static native int read0(long socket, int count, long addres, long overlapped) throws IOException; private static native int write0(long socket, int count, long address, long overlapped) throws IOException; private static native void shutdown0(long socket, int how) throws IOException; private static native void closesocket0(long socket) throws IOException; static { Util.load(); initIDs(); } }