1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.channels.spi.AsynchronousChannelProvider;
  30 import java.io.Closeable;
  31 import java.io.IOException;
  32 import java.io.FileDescriptor;
  33 import java.util.*;
  34 import java.util.concurrent.*;
  35 import java.util.concurrent.locks.ReadWriteLock;
  36 import java.util.concurrent.locks.ReentrantReadWriteLock;
  37 import jdk.internal.misc.Unsafe;
  38 
  39 /**
  40  * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
  41  * completion port.
  42  */
  43 
  44 class Iocp extends AsynchronousChannelGroupImpl {
  45     private static final Unsafe unsafe = Unsafe.getUnsafe();
  46     private static final long INVALID_HANDLE_VALUE  = -1L;
  47 
  48     // maps completion key to channel
  49     private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
  50     private final Map<Integer,OverlappedChannel> keyToChannel =
  51         new HashMap<Integer,OverlappedChannel>();
  52     private int nextCompletionKey;
  53 
  54     // handle to completion port
  55     private final long port;
  56 
  57     // true if port has been closed
  58     private boolean closed;
  59 
  60     // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
  61     // relate to I/O operations where the completion notification was not
  62     // received in a timely manner after the channel is closed.
  63     private final Set<Long> staleIoSet = new HashSet<Long>();
  64 
  65     Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
  66         throws IOException
  67     {
  68         super(provider, pool);
  69         this.port =
  70           createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
  71         this.nextCompletionKey = 1;
  72     }
  73 
  74     Iocp start() {
  75         startThreads(new EventHandlerTask());
  76         return this;
  77     }
  78 
  79     /*
  80      * Channels implements this interface support overlapped I/O and can be
  81      * associated with a completion port.
  82      */
  83     static interface OverlappedChannel extends Closeable {
  84         /**
  85          * Returns a reference to the pending I/O result.
  86          */
  87         <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
  88     }
  89 
  90     // release all resources
  91     void implClose() {
  92         synchronized (this) {
  93             if (closed)
  94                 return;
  95             closed = true;
  96         }
  97         close0(port);
  98         synchronized (staleIoSet) {
  99             for (Long ov: staleIoSet) {
 100                 unsafe.freeMemory(ov);
 101             }
 102             staleIoSet.clear();
 103         }
 104     }
 105 
 106     @Override
 107     boolean isEmpty() {
 108         keyToChannelLock.writeLock().lock();
 109         try {
 110             return keyToChannel.isEmpty();
 111         } finally {
 112             keyToChannelLock.writeLock().unlock();
 113         }
 114     }
 115 
 116     @Override
 117     final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
 118         throws IOException
 119     {
 120         int key = associate(new OverlappedChannel() {
 121             public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 122                 return null;
 123             }
 124             public void close() throws IOException {
 125                 channel.close();
 126             }
 127         }, 0L);
 128         return Integer.valueOf(key);
 129     }
 130 
 131     @Override
 132     final void detachForeignChannel(Object key) {
 133         disassociate((Integer)key);
 134     }
 135 
 136     @Override
 137     void closeAllChannels() {
 138         /**
 139          * On Windows the close operation will close the socket/file handle
 140          * and then wait until all outstanding I/O operations have aborted.
 141          * This is necessary as each channel's cache of OVERLAPPED structures
 142          * can only be freed once all I/O operations have completed. As I/O
 143          * completion requires a lookup of the keyToChannel then we must close
 144          * the channels when not holding the write lock.
 145          */
 146         final int MAX_BATCH_SIZE = 32;
 147         OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
 148         int count;
 149         do {
 150             // grab a batch of up to 32 channels
 151             keyToChannelLock.writeLock().lock();
 152             count = 0;
 153             try {
 154                 for (Integer key: keyToChannel.keySet()) {
 155                     channels[count++] = keyToChannel.get(key);
 156                     if (count >= MAX_BATCH_SIZE)
 157                         break;
 158                 }
 159             } finally {
 160                 keyToChannelLock.writeLock().unlock();
 161             }
 162 
 163             // close them
 164             for (int i=0; i<count; i++) {
 165                 try {
 166                     channels[i].close();
 167                 } catch (IOException ignore) { }
 168             }
 169         } while (count > 0);
 170     }
 171 
 172     private void wakeup() {
 173         try {
 174             postQueuedCompletionStatus(port, 0);
 175         } catch (IOException e) {
 176             // should not happen
 177             throw new AssertionError(e);
 178         }
 179     }
 180 
 181     @Override
 182     void executeOnHandlerTask(Runnable task) {
 183         synchronized (this) {
 184             if (closed)
 185                 throw new RejectedExecutionException();
 186             offerTask(task);
 187             wakeup();
 188         }
 189 
 190     }
 191 
 192     @Override
 193     void shutdownHandlerTasks() {
 194         // shutdown all handler threads
 195         int nThreads = threadCount();
 196         while (nThreads-- > 0) {
 197             wakeup();
 198         }
 199     }
 200 
 201     /**
 202      * Associate the given handle with this group
 203      */
 204     int associate(OverlappedChannel ch, long handle) throws IOException {
 205         keyToChannelLock.writeLock().lock();
 206 
 207         // generate a completion key (if not shutdown)
 208         int key;
 209         try {
 210             if (isShutdown())
 211                 throw new ShutdownChannelGroupException();
 212 
 213             // generate unique key
 214             do {
 215                 key = nextCompletionKey++;
 216             } while ((key == 0) || keyToChannel.containsKey(key));
 217 
 218             // associate with I/O completion port
 219             if (handle != 0L) {
 220                 createIoCompletionPort(handle, port, key, 0);
 221             }
 222 
 223             // setup mapping
 224             keyToChannel.put(key, ch);
 225         } finally {
 226             keyToChannelLock.writeLock().unlock();
 227         }
 228         return key;
 229     }
 230 
 231     /**
 232      * Disassociate channel from the group.
 233      */
 234     void disassociate(int key) {
 235         boolean checkForShutdown = false;
 236 
 237         keyToChannelLock.writeLock().lock();
 238         try {
 239             keyToChannel.remove(key);
 240 
 241             // last key to be removed so check if group is shutdown
 242             if (keyToChannel.isEmpty())
 243                 checkForShutdown = true;
 244 
 245         } finally {
 246             keyToChannelLock.writeLock().unlock();
 247         }
 248 
 249         // continue shutdown
 250         if (checkForShutdown && isShutdown()) {
 251             try {
 252                 shutdownNow();
 253             } catch (IOException ignore) { }
 254         }
 255     }
 256 
 257     /**
 258      * Invoked when a channel associated with this port is closed before
 259      * notifications for all outstanding I/O operations have been received.
 260      */
 261     void makeStale(Long overlapped) {
 262         synchronized (staleIoSet) {
 263             staleIoSet.add(overlapped);
 264         }
 265     }
 266 
 267     /**
 268      * Checks if the given OVERLAPPED is stale and if so, releases it.
 269      */
 270     private void checkIfStale(long ov) {
 271         synchronized (staleIoSet) {
 272             boolean removed = staleIoSet.remove(ov);
 273             if (removed) {
 274                 unsafe.freeMemory(ov);
 275             }
 276         }
 277     }
 278 
 279     /**
 280      * The handler for consuming the result of an asynchronous I/O operation.
 281      */
 282     static interface ResultHandler {
 283         /**
 284          * Invoked if the I/O operation completes successfully.
 285          */
 286         public void completed(int bytesTransferred, boolean canInvokeDirect);
 287 
 288         /**
 289          * Invoked if the I/O operation fails.
 290          */
 291         public void failed(int error, IOException ioe);
 292     }
 293 
 294     // Creates IOException for the given I/O error.
 295     private static IOException translateErrorToIOException(int error) {
 296         String msg = getErrorMessage(error);
 297         if (msg == null)
 298             msg = "Unknown error: 0x0" + Integer.toHexString(error);
 299         return new IOException(msg);
 300     }
 301 
 302     /**
 303      * Long-running task servicing system-wide or per-file completion port
 304      */
 305     private class EventHandlerTask implements Runnable {
 306         public void run() {
 307             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 308                 Invoker.getGroupAndInvokeCount();
 309             boolean canInvokeDirect = (myGroupAndInvokeCount != null);
 310             CompletionStatus ioResult = new CompletionStatus();
 311             boolean replaceMe = false;
 312 
 313             try {
 314                 for (;;) {
 315                     // reset invoke count
 316                     if (myGroupAndInvokeCount != null)
 317                         myGroupAndInvokeCount.resetInvokeCount();
 318 
 319                     // wait for I/O completion event
 320                     // A error here is fatal (thread will not be replaced)
 321                     replaceMe = false;
 322                     try {
 323                         getQueuedCompletionStatus(port, ioResult);
 324                     } catch (IOException x) {
 325                         // should not happen
 326                         x.printStackTrace();
 327                         return;
 328                     }
 329 
 330                     // handle wakeup to execute task or shutdown
 331                     if (ioResult.completionKey() == 0 &&
 332                         ioResult.overlapped() == 0L)
 333                     {
 334                         Runnable task = pollTask();
 335                         if (task == null) {
 336                             // shutdown request
 337                             return;
 338                         }
 339 
 340                         // run task
 341                         // (if error/exception then replace thread)
 342                         replaceMe = true;
 343                         task.run();
 344                         continue;
 345                     }
 346 
 347                     // map key to channel
 348                     OverlappedChannel ch = null;
 349                     keyToChannelLock.readLock().lock();
 350                     try {
 351                         ch = keyToChannel.get(ioResult.completionKey());
 352                         if (ch == null) {
 353                             checkIfStale(ioResult.overlapped());
 354                             continue;
 355                         }
 356                     } finally {
 357                         keyToChannelLock.readLock().unlock();
 358                     }
 359 
 360                     // lookup I/O request
 361                     PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
 362                     if (result == null) {
 363                         // we get here if the OVERLAPPED structure is associated
 364                         // with an I/O operation on a channel that was closed
 365                         // but the I/O operation event wasn't read in a timely
 366                         // manner. Alternatively, it may be related to a
 367                         // tryLock operation as the OVERLAPPED structures for
 368                         // these operations are not in the I/O cache.
 369                         checkIfStale(ioResult.overlapped());
 370                         continue;
 371                     }
 372 
 373                     // synchronize on result in case I/O completed immediately
 374                     // and was handled by initiator
 375                     synchronized (result) {
 376                         if (result.isDone()) {
 377                             continue;
 378                         }
 379                         // not handled by initiator
 380                     }
 381 
 382                     // invoke I/O result handler
 383                     int error = ioResult.error();
 384                     ResultHandler rh = (ResultHandler)result.getContext();
 385                     replaceMe = true; // (if error/exception then replace thread)
 386                     if (error == 0) {
 387                         rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
 388                     } else {
 389                         rh.failed(error, translateErrorToIOException(error));
 390                     }
 391                 }
 392             } finally {
 393                 // last thread to exit when shutdown releases resources
 394                 int remaining = threadExit(this, replaceMe);
 395                 if (remaining == 0 && isShutdown()) {
 396                     implClose();
 397                 }
 398             }
 399         }
 400     }
 401 
 402     /**
 403      * Container for data returned by GetQueuedCompletionStatus
 404      */
 405     private static class CompletionStatus {
 406         private int error;
 407         private int bytesTransferred;
 408         private int completionKey;
 409         private long overlapped;
 410 
 411         private CompletionStatus() { }
 412         int error() { return error; }
 413         int bytesTransferred() { return bytesTransferred; }
 414         int completionKey() { return completionKey; }
 415         long overlapped() { return overlapped; }
 416     }
 417 
    // -- native methods --

    // Invoked once from the static initializer, after IOUtil.load().
    // NOTE(review): presumably caches the field IDs of CompletionStatus --
    // confirm against the native implementation.
    private static native void initIDs();

    // Used in two ways in this class: the constructor passes
    // INVALID_HANDLE_VALUE, no existing port and the fixed thread count, and
    // keeps the returned value as the port handle; associate() passes a
    // channel's handle, the existing port and the channel's completion key,
    // and ignores the returned value.
    private static native long createIoCompletionPort(long handle,
        long existingPort, int completionKey, int concurrency) throws IOException;

    // Invoked by implClose() with the port handle.
    private static native void close0(long handle);

    // Invoked by the handler loop to wait for the next completion event on
    // the port; the event's details are read back from the given status.
    private static native void getQueuedCompletionStatus(long completionPort,
        CompletionStatus status) throws IOException;

    // Invoked by wakeup() with completion key 0 to wake up a handler thread.
    private static native void postQueuedCompletionStatus(long completionPort,
        int completionKey) throws IOException;

    // Returns the message for the given error code; the caller
    // (translateErrorToIOException) handles a null return.
    private static native String getErrorMessage(int error);
 434 
    // Class initialization: IOUtil.load() runs before the native initIDs().
    // NOTE(review): presumably load() makes the native library that defines
    // initIDs() available, so this order must be kept -- confirm.
    static {
        IOUtil.load();
        initIDs();
    }
 439 }