1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.channels.spi.AsynchronousChannelProvider; 30 import java.io.Closeable; 31 import java.io.IOException; 32 import java.io.FileDescriptor; 33 import java.util.*; 34 import java.util.concurrent.*; 35 import java.util.concurrent.locks.ReadWriteLock; 36 import java.util.concurrent.locks.ReentrantReadWriteLock; 37 import java.security.AccessController; 38 import sun.security.action.GetPropertyAction; 39 import sun.misc.Unsafe; 40 41 /** 42 * Windows implementation of AsynchronousChannelGroup encapsulating an I/O 43 * completion port. 44 */ 45 46 class Iocp extends AsynchronousChannelGroupImpl { 47 private static final Unsafe unsafe = Unsafe.getUnsafe(); 48 private static final long INVALID_HANDLE_VALUE = -1L; 49 private static final boolean supportsThreadAgnosticIo; 50 51 // maps completion key to channel 52 private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock(); 53 private final Map<Integer,OverlappedChannel> keyToChannel = 54 new HashMap<Integer,OverlappedChannel>(); 55 private int nextCompletionKey; 56 57 // handle to completion port 58 private final long port; 59 60 // true if port has been closed 61 private boolean closed; 62 63 // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures 64 // relate to I/O operations where the completion notification was not 65 // received in a timely manner after the channel is closed. 66 private final Set<Long> staleIoSet = new HashSet<Long>(); 67 68 Iocp(AsynchronousChannelProvider provider, ThreadPool pool) 69 throws IOException 70 { 71 super(provider, pool); 72 this.port = 73 createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount()); 74 this.nextCompletionKey = 1; 75 } 76 77 Iocp start() { 78 startThreads(new EventHandlerTask()); 79 return this; 80 } 81 82 /* 83 * Channels implements this interface support overlapped I/O and can be 84 * associated with a completion port. 85 */ 86 static interface OverlappedChannel extends Closeable { 87 /** 88 * Returns a reference to the pending I/O result. 89 */ 90 <V,A> PendingFuture<V,A> getByOverlapped(long overlapped); 91 } 92 93 /** 94 * Indicates if this operating system supports thread agnostic I/O. 95 */ 96 static boolean supportsThreadAgnosticIo() { 97 return supportsThreadAgnosticIo; 98 } 99 100 // release all resources 101 void implClose() { 102 synchronized (this) { 103 if (closed) 104 return; 105 closed = true; 106 } 107 close0(port); 108 synchronized (staleIoSet) { 109 for (Long ov: staleIoSet) { 110 unsafe.freeMemory(ov); 111 } 112 staleIoSet.clear(); 113 } 114 } 115 116 @Override 117 boolean isEmpty() { 118 keyToChannelLock.writeLock().lock(); 119 try { 120 return keyToChannel.isEmpty(); 121 } finally { 122 keyToChannelLock.writeLock().unlock(); 123 } 124 } 125 126 @Override 127 final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj) 128 throws IOException 129 { 130 int key = associate(new OverlappedChannel() { 131 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 132 return null; 133 } 134 public void close() throws IOException { 135 channel.close(); 136 } 137 }, 0L); 138 return Integer.valueOf(key); 139 } 140 141 @Override 142 final void detachForeignChannel(Object key) { 143 disassociate((Integer)key); 144 } 145 146 @Override 147 void closeAllChannels() { 148 /** 149 * On Windows the close operation will close the socket/file handle 150 * and then wait until all outstanding I/O operations have aborted. 151 * This is necessary as each channel's cache of OVERLAPPED structures 152 * can only be freed once all I/O operations have completed. As I/O 153 * completion requires a lookup of the keyToChannel then we must close 154 * the channels when not holding the write lock. 155 */ 156 final int MAX_BATCH_SIZE = 32; 157 OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE]; 158 int count; 159 do { 160 // grab a batch of up to 32 channels 161 keyToChannelLock.writeLock().lock(); 162 count = 0; 163 try { 164 for (Integer key: keyToChannel.keySet()) { 165 channels[count++] = keyToChannel.get(key); 166 if (count >= MAX_BATCH_SIZE) 167 break; 168 } 169 } finally { 170 keyToChannelLock.writeLock().unlock(); 171 } 172 173 // close them 174 for (int i=0; i<count; i++) { 175 try { 176 channels[i].close(); 177 } catch (IOException ignore) { } 178 } 179 } while (count > 0); 180 } 181 182 private void wakeup() { 183 try { 184 postQueuedCompletionStatus(port, 0); 185 } catch (IOException e) { 186 // should not happen 187 throw new AssertionError(e); 188 } 189 } 190 191 @Override 192 void executeOnHandlerTask(Runnable task) { 193 synchronized (this) { 194 if (closed) 195 throw new RejectedExecutionException(); 196 offerTask(task); 197 wakeup(); 198 } 199 200 } 201 202 @Override 203 void shutdownHandlerTasks() { 204 // shutdown all handler threads 205 int nThreads = threadCount(); 206 while (nThreads-- > 0) { 207 wakeup(); 208 } 209 } 210 211 /** 212 * Associate the given handle with this group 213 */ 214 int associate(OverlappedChannel ch, long handle) throws IOException { 215 keyToChannelLock.writeLock().lock(); 216 217 // generate a completion key (if not shutdown) 218 int key; 219 try { 220 if (isShutdown()) 221 throw new ShutdownChannelGroupException(); 222 223 // generate unique key 224 do { 225 key = nextCompletionKey++; 226 } while ((key == 0) || keyToChannel.containsKey(key)); 227 228 // associate with I/O completion port 229 if (handle != 0L) { 230 createIoCompletionPort(handle, port, key, 0); 231 } 232 233 // setup mapping 234 keyToChannel.put(key, ch); 235 } finally { 236 keyToChannelLock.writeLock().unlock(); 237 } 238 return key; 239 } 240 241 /** 242 * Disassociate channel from the group. 243 */ 244 void disassociate(int key) { 245 boolean checkForShutdown = false; 246 247 keyToChannelLock.writeLock().lock(); 248 try { 249 keyToChannel.remove(key); 250 251 // last key to be removed so check if group is shutdown 252 if (keyToChannel.isEmpty()) 253 checkForShutdown = true; 254 255 } finally { 256 keyToChannelLock.writeLock().unlock(); 257 } 258 259 // continue shutdown 260 if (checkForShutdown && isShutdown()) { 261 try { 262 shutdownNow(); 263 } catch (IOException ignore) { } 264 } 265 } 266 267 /** 268 * Invoked when a channel associated with this port is closed before 269 * notifications for all outstanding I/O operations have been received. 270 */ 271 void makeStale(Long overlapped) { 272 synchronized (staleIoSet) { 273 staleIoSet.add(overlapped); 274 } 275 } 276 277 /** 278 * Checks if the given OVERLAPPED is stale and if so, releases it. 279 */ 280 private void checkIfStale(long ov) { 281 synchronized (staleIoSet) { 282 boolean removed = staleIoSet.remove(ov); 283 if (removed) { 284 unsafe.freeMemory(ov); 285 } 286 } 287 } 288 289 /** 290 * The handler for consuming the result of an asynchronous I/O operation. 291 */ 292 static interface ResultHandler { 293 /** 294 * Invoked if the I/O operation completes successfully. 295 */ 296 public void completed(int bytesTransferred, boolean canInvokeDirect); 297 298 /** 299 * Invoked if the I/O operation fails. 300 */ 301 public void failed(int error, IOException ioe); 302 } 303 304 // Creates IOException for the given I/O error. 305 private static IOException translateErrorToIOException(int error) { 306 String msg = getErrorMessage(error); 307 if (msg == null) 308 msg = "Unknown error: 0x0" + Integer.toHexString(error); 309 return new IOException(msg); 310 } 311 312 /** 313 * Long-running task servicing system-wide or per-file completion port 314 */ 315 private class EventHandlerTask implements Runnable { 316 public void run() { 317 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 318 Invoker.getGroupAndInvokeCount(); 319 boolean canInvokeDirect = (myGroupAndInvokeCount != null); 320 CompletionStatus ioResult = new CompletionStatus(); 321 boolean replaceMe = false; 322 323 try { 324 for (;;) { 325 // reset invoke count 326 if (myGroupAndInvokeCount != null) 327 myGroupAndInvokeCount.resetInvokeCount(); 328 329 // wait for I/O completion event 330 // A error here is fatal (thread will not be replaced) 331 replaceMe = false; 332 try { 333 getQueuedCompletionStatus(port, ioResult); 334 } catch (IOException x) { 335 // should not happen 336 x.printStackTrace(); 337 return; 338 } 339 340 // handle wakeup to execute task or shutdown 341 if (ioResult.completionKey() == 0 && 342 ioResult.overlapped() == 0L) 343 { 344 Runnable task = pollTask(); 345 if (task == null) { 346 // shutdown request 347 return; 348 } 349 350 // run task 351 // (if error/exception then replace thread) 352 replaceMe = true; 353 task.run(); 354 continue; 355 } 356 357 // map key to channel 358 OverlappedChannel ch = null; 359 keyToChannelLock.readLock().lock(); 360 try { 361 ch = keyToChannel.get(ioResult.completionKey()); 362 if (ch == null) { 363 checkIfStale(ioResult.overlapped()); 364 continue; 365 } 366 } finally { 367 keyToChannelLock.readLock().unlock(); 368 } 369 370 // lookup I/O request 371 PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped()); 372 if (result == null) { 373 // we get here if the OVERLAPPED structure is associated 374 // with an I/O operation on a channel that was closed 375 // but the I/O operation event wasn't read in a timely 376 // manner. Alternatively, it may be related to a 377 // tryLock operation as the OVERLAPPED structures for 378 // these operations are not in the I/O cache. 379 checkIfStale(ioResult.overlapped()); 380 continue; 381 } 382 383 // synchronize on result in case I/O completed immediately 384 // and was handled by initiator 385 synchronized (result) { 386 if (result.isDone()) { 387 continue; 388 } 389 // not handled by initiator 390 } 391 392 // invoke I/O result handler 393 int error = ioResult.error(); 394 ResultHandler rh = (ResultHandler)result.getContext(); 395 replaceMe = true; // (if error/exception then replace thread) 396 if (error == 0) { 397 rh.completed(ioResult.bytesTransferred(), canInvokeDirect); 398 } else { 399 rh.failed(error, translateErrorToIOException(error)); 400 } 401 } 402 } finally { 403 // last thread to exit when shutdown releases resources 404 int remaining = threadExit(this, replaceMe); 405 if (remaining == 0 && isShutdown()) { 406 implClose(); 407 } 408 } 409 } 410 } 411 412 /** 413 * Container for data returned by GetQueuedCompletionStatus 414 */ 415 private static class CompletionStatus { 416 private int error; 417 private int bytesTransferred; 418 private int completionKey; 419 private long overlapped; 420 421 private CompletionStatus() { } 422 int error() { return error; } 423 int bytesTransferred() { return bytesTransferred; } 424 int completionKey() { return completionKey; } 425 long overlapped() { return overlapped; } 426 } 427 428 // -- native methods -- 429 430 private static native void initIDs(); 431 432 private static native long createIoCompletionPort(long handle, 433 long existingPort, int completionKey, int concurrency) throws IOException; 434 435 private static native void close0(long handle); 436 437 private static native void getQueuedCompletionStatus(long completionPort, 438 CompletionStatus status) throws IOException; 439 440 private static native void postQueuedCompletionStatus(long completionPort, 441 int completionKey) throws IOException; 442 443 private static native String getErrorMessage(int error); 444 445 static { 446 IOUtil.load(); 447 initIDs(); 448 449 // thread agnostic I/O on Vista/2008 or newer 450 String osversion = AccessController.doPrivileged( 451 new GetPropertyAction("os.version")); 452 String vers[] = osversion.split("\\."); 453 supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6; 454 } 455 }