1 /*
   2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package sun.nio.ch;
  27 
  28 import java.nio.channels.*;
  29 import java.nio.channels.spi.AsynchronousChannelProvider;
  30 import java.io.Closeable;
  31 import java.io.IOException;
  32 import java.io.FileDescriptor;
  33 import java.util.*;
  34 import java.util.concurrent.*;
  35 import java.util.concurrent.locks.ReadWriteLock;
  36 import java.util.concurrent.locks.ReentrantReadWriteLock;
  37 import java.security.AccessController;
  38 import sun.security.action.GetPropertyAction;
  39 import sun.misc.Unsafe;
  40 
  41 /**
  42  * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
  43  * completion port.
  44  */
  45 
  46 class Iocp extends AsynchronousChannelGroupImpl {
  47     private static final Unsafe unsafe = Unsafe.getUnsafe();
  48     private static final long INVALID_HANDLE_VALUE  = -1L;
  49     private static final boolean supportsThreadAgnosticIo;
  50 
  51     // maps completion key to channel
  52     private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
  53     private final Map<Integer,OverlappedChannel> keyToChannel =
  54         new HashMap<Integer,OverlappedChannel>();
  55     private int nextCompletionKey;
  56 
  57     // handle to completion port
  58     private final long port;
  59 
  60     // true if port has been closed
  61     private boolean closed;
  62 
  63     // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
  64     // relate to I/O operations where the completion notification was not
  65     // received in a timely manner after the channel is closed.
  66     private final Set<Long> staleIoSet = new HashSet<Long>();
  67 
  68     Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
  69         throws IOException
  70     {
  71         super(provider, pool);
  72         this.port =
  73           createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
  74         this.nextCompletionKey = 1;
  75     }
  76 
  77     Iocp start() {
  78         startThreads(new EventHandlerTask());
  79         return this;
  80     }
  81 
  82     /*
  83      * Channels implements this interface support overlapped I/O and can be
  84      * associated with a completion port.
  85      */
  86     static interface OverlappedChannel extends Closeable {
  87         /**
  88          * Returns a reference to the pending I/O result.
  89          */
  90         <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
  91     }
  92 
  93     /**
  94      * Indicates if this operating system supports thread agnostic I/O.
  95      */
  96     static boolean supportsThreadAgnosticIo() {
  97         return supportsThreadAgnosticIo;
  98     }
  99 
 100     // release all resources
 101     void implClose() {
 102         synchronized (this) {
 103             if (closed)
 104                 return;
 105             closed = true;
 106         }
 107         close0(port);
 108         synchronized (staleIoSet) {
 109             for (Long ov: staleIoSet) {
 110                 unsafe.freeMemory(ov);
 111             }
 112             staleIoSet.clear();
 113         }
 114     }
 115 
 116     @Override
 117     boolean isEmpty() {
 118         keyToChannelLock.writeLock().lock();
 119         try {
 120             return keyToChannel.isEmpty();
 121         } finally {
 122             keyToChannelLock.writeLock().unlock();
 123         }
 124     }
 125 
 126     @Override
 127     final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
 128         throws IOException
 129     {
 130         int key = associate(new OverlappedChannel() {
 131             public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
 132                 return null;
 133             }
 134             public void close() throws IOException {
 135                 channel.close();
 136             }
 137         }, 0L);
 138         return Integer.valueOf(key);
 139     }
 140 
 141     @Override
 142     final void detachForeignChannel(Object key) {
 143         disassociate((Integer)key);
 144     }
 145 
 146     @Override
 147     void closeAllChannels() {
 148         /**
 149          * On Windows the close operation will close the socket/file handle
 150          * and then wait until all outstanding I/O operations have aborted.
 151          * This is necessary as each channel's cache of OVERLAPPED structures
 152          * can only be freed once all I/O operations have completed. As I/O
 153          * completion requires a lookup of the keyToChannel then we must close
 154          * the channels when not holding the write lock.
 155          */
 156         final int MAX_BATCH_SIZE = 32;
 157         OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
 158         int count;
 159         do {
 160             // grab a batch of up to 32 channels
 161             keyToChannelLock.writeLock().lock();
 162             count = 0;
 163             try {
 164                 for (Integer key: keyToChannel.keySet()) {
 165                     channels[count++] = keyToChannel.get(key);
 166                     if (count >= MAX_BATCH_SIZE)
 167                         break;
 168                 }
 169             } finally {
 170                 keyToChannelLock.writeLock().unlock();
 171             }
 172 
 173             // close them
 174             for (int i=0; i<count; i++) {
 175                 try {
 176                     channels[i].close();
 177                 } catch (IOException ignore) { }
 178             }
 179         } while (count > 0);
 180     }
 181 
 182     private void wakeup() {
 183         try {
 184             postQueuedCompletionStatus(port, 0);
 185         } catch (IOException e) {
 186             // should not happen
 187             throw new AssertionError(e);
 188         }
 189     }
 190 
 191     @Override
 192     void executeOnHandlerTask(Runnable task) {
 193         synchronized (this) {
 194             if (closed)
 195                 throw new RejectedExecutionException();
 196             offerTask(task);
 197             wakeup();
 198         }
 199 
 200     }
 201 
 202     @Override
 203     void shutdownHandlerTasks() {
 204         // shutdown all handler threads
 205         int nThreads = threadCount();
 206         while (nThreads-- > 0) {
 207             wakeup();
 208         }
 209     }
 210 
 211     /**
 212      * Associate the given handle with this group
 213      */
 214     int associate(OverlappedChannel ch, long handle) throws IOException {
 215         keyToChannelLock.writeLock().lock();
 216 
 217         // generate a completion key (if not shutdown)
 218         int key;
 219         try {
 220             if (isShutdown())
 221                 throw new ShutdownChannelGroupException();
 222 
 223             // generate unique key
 224             do {
 225                 key = nextCompletionKey++;
 226             } while ((key == 0) || keyToChannel.containsKey(key));
 227 
 228             // associate with I/O completion port
 229             if (handle != 0L) {
 230                 createIoCompletionPort(handle, port, key, 0);
 231             }
 232 
 233             // setup mapping
 234             keyToChannel.put(key, ch);
 235         } finally {
 236             keyToChannelLock.writeLock().unlock();
 237         }
 238         return key;
 239     }
 240 
 241     /**
 242      * Disassociate channel from the group.
 243      */
 244     void disassociate(int key) {
 245         boolean checkForShutdown = false;
 246 
 247         keyToChannelLock.writeLock().lock();
 248         try {
 249             keyToChannel.remove(key);
 250 
 251             // last key to be removed so check if group is shutdown
 252             if (keyToChannel.isEmpty())
 253                 checkForShutdown = true;
 254 
 255         } finally {
 256             keyToChannelLock.writeLock().unlock();
 257         }
 258 
 259         // continue shutdown
 260         if (checkForShutdown && isShutdown()) {
 261             try {
 262                 shutdownNow();
 263             } catch (IOException ignore) { }
 264         }
 265     }
 266 
 267     /**
 268      * Invoked when a channel associated with this port is closed before
 269      * notifications for all outstanding I/O operations have been received.
 270      */
 271     void makeStale(Long overlapped) {
 272         synchronized (staleIoSet) {
 273             staleIoSet.add(overlapped);
 274         }
 275     }
 276 
 277     /**
 278      * Checks if the given OVERLAPPED is stale and if so, releases it.
 279      */
 280     private void checkIfStale(long ov) {
 281         synchronized (staleIoSet) {
 282             boolean removed = staleIoSet.remove(ov);
 283             if (removed) {
 284                 unsafe.freeMemory(ov);
 285             }
 286         }
 287     }
 288 
 289     /**
 290      * The handler for consuming the result of an asynchronous I/O operation.
 291      */
 292     static interface ResultHandler {
 293         /**
 294          * Invoked if the I/O operation completes successfully.
 295          */
 296         public void completed(int bytesTransferred, boolean canInvokeDirect);
 297 
 298         /**
 299          * Invoked if the I/O operation fails.
 300          */
 301         public void failed(int error, IOException ioe);
 302     }
 303 
 304     // Creates IOException for the given I/O error.
 305     private static IOException translateErrorToIOException(int error) {
 306         String msg = getErrorMessage(error);
 307         if (msg == null)
 308             msg = "Unknown error: 0x0" + Integer.toHexString(error);
 309         return new IOException(msg);
 310     }
 311 
 312     /**
 313      * Long-running task servicing system-wide or per-file completion port
 314      */
 315     private class EventHandlerTask implements Runnable {
 316         public void run() {
 317             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
 318                 Invoker.getGroupAndInvokeCount();
 319             boolean canInvokeDirect = (myGroupAndInvokeCount != null);
 320             CompletionStatus ioResult = new CompletionStatus();
 321             boolean replaceMe = false;
 322 
 323             try {
 324                 for (;;) {
 325                     // reset invoke count
 326                     if (myGroupAndInvokeCount != null)
 327                         myGroupAndInvokeCount.resetInvokeCount();
 328 
 329                     // wait for I/O completion event
 330                     // A error here is fatal (thread will not be replaced)
 331                     replaceMe = false;
 332                     try {
 333                         getQueuedCompletionStatus(port, ioResult);
 334                     } catch (IOException x) {
 335                         // should not happen
 336                         x.printStackTrace();
 337                         return;
 338                     }
 339 
 340                     // handle wakeup to execute task or shutdown
 341                     if (ioResult.completionKey() == 0 &&
 342                         ioResult.overlapped() == 0L)
 343                     {
 344                         Runnable task = pollTask();
 345                         if (task == null) {
 346                             // shutdown request
 347                             return;
 348                         }
 349 
 350                         // run task
 351                         // (if error/exception then replace thread)
 352                         replaceMe = true;
 353                         task.run();
 354                         continue;
 355                     }
 356 
 357                     // map key to channel
 358                     OverlappedChannel ch = null;
 359                     keyToChannelLock.readLock().lock();
 360                     try {
 361                         ch = keyToChannel.get(ioResult.completionKey());
 362                         if (ch == null) {
 363                             checkIfStale(ioResult.overlapped());
 364                             continue;
 365                         }
 366                     } finally {
 367                         keyToChannelLock.readLock().unlock();
 368                     }
 369 
 370                     // lookup I/O request
 371                     PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
 372                     if (result == null) {
 373                         // we get here if the OVERLAPPED structure is associated
 374                         // with an I/O operation on a channel that was closed
 375                         // but the I/O operation event wasn't read in a timely
 376                         // manner. Alternatively, it may be related to a
 377                         // tryLock operation as the OVERLAPPED structures for
 378                         // these operations are not in the I/O cache.
 379                         checkIfStale(ioResult.overlapped());
 380                         continue;
 381                     }
 382 
 383                     // synchronize on result in case I/O completed immediately
 384                     // and was handled by initiator
 385                     synchronized (result) {
 386                         if (result.isDone()) {
 387                             continue;
 388                         }
 389                         // not handled by initiator
 390                     }
 391 
 392                     // invoke I/O result handler
 393                     int error = ioResult.error();
 394                     ResultHandler rh = (ResultHandler)result.getContext();
 395                     replaceMe = true; // (if error/exception then replace thread)
 396                     if (error == 0) {
 397                         rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
 398                     } else {
 399                         rh.failed(error, translateErrorToIOException(error));
 400                     }
 401                 }
 402             } finally {
 403                 // last thread to exit when shutdown releases resources
 404                 int remaining = threadExit(this, replaceMe);
 405                 if (remaining == 0 && isShutdown()) {
 406                     implClose();
 407                 }
 408             }
 409         }
 410     }
 411 
 412     /**
 413      * Container for data returned by GetQueuedCompletionStatus
 414      */
 415     private static class CompletionStatus {
 416         private int error;
 417         private int bytesTransferred;
 418         private int completionKey;
 419         private long overlapped;
 420 
 421         private CompletionStatus() { }
 422         int error() { return error; }
 423         int bytesTransferred() { return bytesTransferred; }
 424         int completionKey() { return completionKey; }
 425         long overlapped() { return overlapped; }
 426     }
 427 
 428     // -- native methods --
 429 
 430     private static native void initIDs();
 431 
 432     private static native long createIoCompletionPort(long handle,
 433         long existingPort, int completionKey, int concurrency) throws IOException;
 434 
 435     private static native void close0(long handle);
 436 
 437     private static native void getQueuedCompletionStatus(long completionPort,
 438         CompletionStatus status) throws IOException;
 439 
 440     private static native void postQueuedCompletionStatus(long completionPort,
 441         int completionKey) throws IOException;
 442 
 443     private static native String getErrorMessage(int error);
 444 
 445     static {
 446         IOUtil.load();
 447         initIDs();
 448 
 449         // thread agnostic I/O on Vista/2008 or newer
 450         String osversion = AccessController.doPrivileged(
 451             new GetPropertyAction("os.version"));
 452         String vers[] = osversion.split("\\.");
 453         supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6;
 454     }
 455 }