/* * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package sun.nio.ch; import java.nio.channels.*; import java.nio.ByteBuffer; import java.nio.BufferOverflowException; import java.net.*; import java.util.concurrent.*; import java.io.IOException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import jdk.internal.misc.Unsafe; /** * Windows implementation of AsynchronousSocketChannel using overlapped I/O. * * Each channel is associated with an Iocp completion port. The connect, read * and write operations are implemented by the inner ConnectTask, ReadTask and * WriteTask classes; each is both the Runnable that initiates the operation * and the Iocp.ResultHandler that consumes its result. */ class WindowsAsynchronousSocketChannelImpl extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static int addressSize = unsafe.addressSize(); /* Returns value32 when the native address size is 4 bytes, otherwise value64. */ private static int dependsArch(int value32, int value64) { return (addressSize == 4) ? 
value32 : value64; } /* * Layout of the native WSABUF structure. Its total size and the offset of * the buf field depend on the native address size, hence dependsArch below. * * typedef struct _WSABUF { * u_long len; * char FAR * buf; * } WSABUF; */ private static final int SIZEOF_WSABUF = dependsArch(8, 16); private static final int OFFSETOF_LEN = 0; private static final int OFFSETOF_BUF = dependsArch(4, 8); // maximum vector size for scatter/gather I/O private static final int MAX_WSABUF = 16; private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF; // socket handle. Use begin()/end() around each usage of this handle. final long handle; // I/O completion port that the socket is associated with private final Iocp iocp; // completion key to identify channel when I/O completes private final int completionKey; // Pending I/O operations are tied to an OVERLAPPED structure that can only // be released when the I/O completion event is posted to the completion // port. Where I/O operations complete immediately then it is possible // there may be more than two OVERLAPPED structures in use. private final PendingIoCache ioCache; // per-channel arrays of WSABUF structures private final long readBufferArray; private final long writeBufferArray; /* Associates the socket handle with the completion port and allocates the two native WSABUF arrays. An IOException thrown by associate closes the socket and is rethrown. A ShutdownChannelGroupException does the same only when failIfGroupShutdown is true; otherwise construction continues with a completion key of 0. */ WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown) throws IOException { super(iocp); // associate socket with default completion port long h = IOUtil.fdVal(fd); int key = 0; try { key = iocp.associate(this, h); } catch (ShutdownChannelGroupException x) { if (failIfGroupShutdown) { closesocket0(h); throw x; } } catch (IOException x) { closesocket0(h); throw x; } this.handle = h; this.iocp = iocp; this.completionKey = key; this.ioCache = new PendingIoCache(); // allocate WSABUF arrays this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY); } /* Equivalent to the two-argument constructor with failIfGroupShutdown set to true. */ WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException { this(iocp, true); } @Override public AsynchronousChannelGroupImpl group() { return iocp; } /** * Invoked by Iocp when an I/O operation completes. Removes and returns the * pending result registered in the I/O cache for the given OVERLAPPED address. 
*/ @Override public PendingFuture getByOverlapped(long overlapped) { return ioCache.remove(overlapped); } // invoked by WindowsAsynchronousServerSocketChannelImpl long handle() { return handle; } // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection // is accepted void setConnected(InetSocketAddress localAddress, InetSocketAddress remoteAddress) { synchronized (stateLock) { state = ST_CONNECTED; this.localAddress = localAddress; this.remoteAddress = remoteAddress; } } @Override void implClose() throws IOException { // close socket (may cause outstanding async I/O operations to fail). closesocket0(handle); // waits until all I/O operations have completed ioCache.close(); // release arrays of WSABUF structures unsafe.freeMemory(readBufferArray); unsafe.freeMemory(writeBufferArray); // finally disassociate from the completion port (key can be 0 if // channel created when group is shutdown) if (completionKey != 0) iocp.disassociate(completionKey); } /* Inspects the context attached to the given future and calls killConnect, killReading or killWriting according to whether that context is a ConnectTask, ReadTask or WriteTask. */ @Override public void onCancel(PendingFuture task) { if (task.getContext() instanceof ConnectTask) killConnect(); if (task.getContext() instanceof ReadTask) killReading(); if (task.getContext() instanceof WriteTask) killWriting(); } /** * Implements the task to initiate a connection and the handler to * consume the result when the connection is established (or fails). */ private class ConnectTask implements Runnable, Iocp.ResultHandler { private final InetSocketAddress remote; private final PendingFuture result; ConnectTask(InetSocketAddress remote, PendingFuture result) { this.remote = remote; this.result = result; } /* Closes the enclosing channel, ignoring any IOException from close. */ private void closeChannel() { try { close(); } catch (IOException ignore) { } } /* Converts a throwable to an IOException: a ClosedChannelException is replaced by a new AsynchronousCloseException, any other IOException is returned as it is, and anything else is wrapped in a new IOException. */ private IOException toIOException(Throwable x) { if (x instanceof IOException) { if (x instanceof ClosedChannelException) x = new AsynchronousCloseException(); return (IOException)x; } return new IOException(x); } /** * Invoked after a connection is successfully established. Updates the * connect context of the socket handle and then, under stateLock, records * the connected state and the remote address. 
*/ private void afterConnect() throws IOException { updateConnectContext(handle); synchronized (stateLock) { state = ST_CONNECTED; remoteAddress = remote; } } /** * Task to initiate a connection. */ @Override public void run() { long overlapped = 0L; Throwable exc = null; try { begin(); // synchronize on result to allow this thread to handle the case // where the connection is established immediately. synchronized (result) { overlapped = ioCache.add(result); // initiate the connection int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(), remote.getPort(), overlapped); if (n == IOStatus.UNAVAILABLE) { // connection is pending return; } // connection established immediately afterConnect(); result.setResult(null); } } catch (Throwable x) { if (overlapped != 0L) ioCache.remove(overlapped); exc = x; } finally { end(); } if (exc != null) { closeChannel(); result.setFailure(toIOException(exc)); } Invoker.invoke(result); } /** * Invoked by handler thread when connection established. */ @Override public void completed(int bytesTransferred, boolean canInvokeDirect) { Throwable exc = null; try { begin(); afterConnect(); result.setResult(null); } catch (Throwable x) { // channel is closed or unable to finish connect exc = x; } finally { end(); } // can't close channel while in begin/end block if (exc != null) { closeChannel(); result.setFailure(toIOException(exc)); } if (canInvokeDirect) { Invoker.invokeUnchecked(result); } else { Invoker.invoke(result); } } /** * Invoked by handler thread when failed to establish connection. If the * channel is still open it is closed and the given exception is reported; * otherwise the failure is reported as an AsynchronousCloseException. 
*/ @Override public void failed(int error, IOException x) { if (isOpen()) { closeChannel(); result.setFailure(x); } else { result.setFailure(new AsynchronousCloseException()); } Invoker.invoke(result); } } /* Calls bind(sa) inside AccessController.doPrivileged and rethrows the exception carried by a PrivilegedActionException as an IOException. */ private void doPrivilegedBind(final SocketAddress sa) throws IOException { try { AccessController.doPrivileged(new PrivilegedExceptionAction() { public Void run() throws IOException { bind(sa); return null; } }); } catch (PrivilegedActionException e) { throw (IOException) e.getException(); } } /* Initiates a connection to the given remote address. A closed channel or a failed implicit bind is reported through a completed future when handler is null, otherwise through the handler (and null is returned). An already connected channel or a pending connect is rejected with AlreadyConnectedException or ConnectionPendingException. NOTE(review): the generic type parameters of this signature appear to have been stripped from this copy (A is used but not declared here); confirm against the upstream file. */ @Override Future implConnect(SocketAddress remote, A attachment, CompletionHandler handler) { if (!isOpen()) { Throwable exc = new ClosedChannelException(); if (handler == null) return CompletedFuture.withFailure(exc); Invoker.invoke(this, handler, attachment, null, exc); return null; } InetSocketAddress isa = Net.checkAddress(remote); // permission check SecurityManager sm = System.getSecurityManager(); if (sm != null) sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); // check and update state // ConnectEx requires the socket to be bound to a local address IOException bindException = null; synchronized (stateLock) { if (state == ST_CONNECTED) throw new AlreadyConnectedException(); if (state == ST_PENDING) throw new ConnectionPendingException(); if (localAddress == null) { try { SocketAddress any = new InetSocketAddress(0); if (sm == null) { bind(any); } else { doPrivilegedBind(any); } } catch (IOException x) { bindException = x; } } if (bindException == null) state = ST_PENDING; } // handle bind failure if (bindException != null) { try { close(); } catch (IOException ignore) { } if (handler == null) return CompletedFuture.withFailure(bindException); Invoker.invoke(this, handler, attachment, null, bindException); return null; } // setup task PendingFuture result = new PendingFuture(this, handler, attachment); ConnectTask task = new ConnectTask(isa, result); result.setContext(task); // initiate I/O if (Iocp.supportsThreadAgnosticIo()) { task.run(); } else { 
Invoker.invokeOnThreadInThreadPool(this, task); } return result; } /** * Implements the task to initiate a read and the handler to consume the * result when the read completes. */ private class ReadTask implements Runnable, Iocp.ResultHandler { private final ByteBuffer[] bufs; private final int numBufs; private final boolean scatteringRead; private final PendingFuture result; // set by run method private ByteBuffer[] shadow; /* numBufs is the length of bufs capped at MAX_WSABUF. */ ReadTask(ByteBuffer[] bufs, boolean scatteringRead, PendingFuture result) { this.bufs = bufs; this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; this.scatteringRead = scatteringRead; this.result = result; } /** * Invoked prior to read to prepare the WSABUF array. Where necessary, * it substitutes non-direct buffers with direct buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; long address = readBufferArray; /* NOTE(review): the source text between the following loop header and the comparison against len appears to be missing from this copy (the remainder of prepareBuffers and the start of the code that advances buffer positions by bytesRead); restore it from the upstream file before relying on this code. */ for (int i=0; i= len) { bytesRead -= len; int newPosition = pos + len; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by another } } else { // Buffers not completely filled if (bytesRead > 0) { assert(pos + bytesRead < (long)Integer.MAX_VALUE); int newPosition = pos + bytesRead; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by another } } break; } } // Put results from shadow into the slow buffers /* NOTE(review): text also appears to be missing between the following loop header and the implRead signature, including the generic type parameters of implRead; confirm against the upstream file. */ for (int i=0; i Future implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { // setup task PendingFuture result = new PendingFuture(this, handler, attachment); ByteBuffer[] bufs; if (isScatteringRead) { bufs = dsts; } else { bufs = new ByteBuffer[1]; bufs[0] = dst; } final ReadTask readTask = new ReadTask(bufs, isScatteringRead, result); result.setContext(readTask); // schedule timeout if (timeout > 0L) { Future timeoutTask = iocp.schedule(new Runnable() { public void run() { readTask.timeout(); } }, timeout, unit); 
result.setTimeoutTask(timeoutTask); } // initiate I/O (directly if thread-agnostic I/O is supported, otherwise // on a thread in the thread pool) if (Iocp.supportsThreadAgnosticIo()) { readTask.run(); } else { Invoker.invokeOnThreadInThreadPool(this, readTask); } return result; } /** * Implements the task to initiate a write and the handler to consume the * result when the write completes. */ private class WriteTask implements Runnable, Iocp.ResultHandler { private final ByteBuffer[] bufs; private final int numBufs; private final boolean gatheringWrite; private final PendingFuture result; // set by run method private ByteBuffer[] shadow; /* numBufs is the length of bufs capped at MAX_WSABUF. */ WriteTask(ByteBuffer[] bufs, boolean gatheringWrite, PendingFuture result) { this.bufs = bufs; this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length; this.gatheringWrite = gatheringWrite; this.result = result; } /** * Invoked prior to write to prepare the WSABUF array. Where necessary, * it substitutes non-direct buffers with direct buffers. */ void prepareBuffers() { shadow = new ByteBuffer[numBufs]; long address = writeBufferArray; /* NOTE(review): the source text between the following loop header and the comparison against len appears to be missing from this copy (the remainder of prepareBuffers and the start of the code that advances buffer positions by bytesWritten); restore it from the upstream file before relying on this code. */ for (int i=0; i= len) { bytesWritten -= len; int newPosition = pos + len; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by someone else } } else { // Buffer not completely written if (bytesWritten > 0) { assert(pos + bytesWritten < (long)Integer.MAX_VALUE); int newPosition = pos + bytesWritten; try { nextBuffer.position(newPosition); } catch (IllegalArgumentException x) { // position changed by someone else } } break; } } } /* NOTE(review): text appears to be missing between the loop header in releaseBuffers and the implWrite signature, including the generic type parameters of implWrite; confirm against the upstream file. */ void releaseBuffers() { for (int i=0; i Future implWrite(boolean gatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { // setup task PendingFuture result = new PendingFuture(this, handler, attachment); ByteBuffer[] bufs; if (gatheringWrite) { bufs = srcs; } else { bufs = new ByteBuffer[1]; bufs[0] = src; } final WriteTask writeTask = new WriteTask(bufs, gatheringWrite, result); result.setContext(writeTask); // schedule timeout if (timeout 
> 0L) { Future timeoutTask = iocp.schedule(new Runnable() { public void run() { writeTask.timeout(); } }, timeout, unit); result.setTimeoutTask(timeoutTask); } // initiate I/O (directly if thread-agnostic I/O is supported, otherwise // on a thread in the thread pool) if (Iocp.supportsThreadAgnosticIo()) { writeTask.run(); } else { Invoker.invokeOnThreadInThreadPool(this, writeTask); } return result; } // -- Native methods -- private static native void initIDs(); private static native int connect0(long socket, boolean preferIPv6, InetAddress remote, int remotePort, long overlapped) throws IOException; private static native void updateConnectContext(long socket) throws IOException; /* NOTE(review): the parameter name addres below is presumably a typo for address (compare write0); harmless for a native declaration. */ private static native int read0(long socket, int count, long addres, long overlapped) throws IOException; private static native int write0(long socket, int count, long address, long overlapped) throws IOException; private static native void shutdown0(long socket, int how) throws IOException; private static native void closesocket0(long socket) throws IOException; static { IOUtil.load(); initIDs(); } }