1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.*; 29 import java.nio.channels.spi.AsynchronousChannelProvider; 30 import java.io.Closeable; 31 import java.io.IOException; 32 import java.io.FileDescriptor; 33 import java.util.*; 34 import java.util.concurrent.*; 35 import java.util.concurrent.locks.ReadWriteLock; 36 import java.util.concurrent.locks.ReentrantReadWriteLock; 37 import jdk.internal.misc.Unsafe; 38 39 /** 40 * Windows implementation of AsynchronousChannelGroup encapsulating an I/O 41 * completion port. 42 */ 43 44 class Iocp extends AsynchronousChannelGroupImpl { 45 private static final Unsafe unsafe = Unsafe.getUnsafe(); 46 private static final long INVALID_HANDLE_VALUE = -1L; 47 48 // maps completion key to channel 49 private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock(); 50 private final Map<Integer,OverlappedChannel> keyToChannel = 51 new HashMap<Integer,OverlappedChannel>(); 52 private int nextCompletionKey; 53 54 // handle to completion port 55 private final long port; 56 57 // true if port has been closed 58 private boolean closed; 59 60 // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures 61 // relate to I/O operations where the completion notification was not 62 // received in a timely manner after the channel is closed. 63 private final Set<Long> staleIoSet = new HashSet<Long>(); 64 65 Iocp(AsynchronousChannelProvider provider, ThreadPool pool) 66 throws IOException 67 { 68 super(provider, pool); 69 this.port = 70 createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount()); 71 this.nextCompletionKey = 1; 72 } 73 74 Iocp start() { 75 startThreads(new EventHandlerTask()); 76 return this; 77 } 78 79 /* 80 * Channels implements this interface support overlapped I/O and can be 81 * associated with a completion port. 82 */ 83 static interface OverlappedChannel extends Closeable { 84 /** 85 * Returns a reference to the pending I/O result. 86 */ 87 <V,A> PendingFuture<V,A> getByOverlapped(long overlapped); 88 } 89 90 // release all resources 91 void implClose() { 92 synchronized (this) { 93 if (closed) 94 return; 95 closed = true; 96 } 97 close0(port); 98 synchronized (staleIoSet) { 99 for (Long ov: staleIoSet) { 100 unsafe.freeMemory(ov); 101 } 102 staleIoSet.clear(); 103 } 104 } 105 106 @Override 107 boolean isEmpty() { 108 keyToChannelLock.writeLock().lock(); 109 try { 110 return keyToChannel.isEmpty(); 111 } finally { 112 keyToChannelLock.writeLock().unlock(); 113 } 114 } 115 116 @Override 117 final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj) 118 throws IOException 119 { 120 int key = associate(new OverlappedChannel() { 121 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) { 122 return null; 123 } 124 public void close() throws IOException { 125 channel.close(); 126 } 127 }, 0L); 128 return Integer.valueOf(key); 129 } 130 131 @Override 132 final void detachForeignChannel(Object key) { 133 disassociate((Integer)key); 134 } 135 136 @Override 137 void closeAllChannels() { 138 /** 139 * On Windows the close operation will close the socket/file handle 140 * and then wait until all outstanding I/O operations have aborted. 141 * This is necessary as each channel's cache of OVERLAPPED structures 142 * can only be freed once all I/O operations have completed. As I/O 143 * completion requires a lookup of the keyToChannel then we must close 144 * the channels when not holding the write lock. 145 */ 146 final int MAX_BATCH_SIZE = 32; 147 OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE]; 148 int count; 149 do { 150 // grab a batch of up to 32 channels 151 keyToChannelLock.writeLock().lock(); 152 count = 0; 153 try { 154 for (Integer key: keyToChannel.keySet()) { 155 channels[count++] = keyToChannel.get(key); 156 if (count >= MAX_BATCH_SIZE) 157 break; 158 } 159 } finally { 160 keyToChannelLock.writeLock().unlock(); 161 } 162 163 // close them 164 for (int i=0; i<count; i++) { 165 try { 166 channels[i].close(); 167 } catch (IOException ignore) { } 168 } 169 } while (count > 0); 170 } 171 172 private void wakeup() { 173 try { 174 postQueuedCompletionStatus(port, 0); 175 } catch (IOException e) { 176 // should not happen 177 throw new AssertionError(e); 178 } 179 } 180 181 @Override 182 void executeOnHandlerTask(Runnable task) { 183 synchronized (this) { 184 if (closed) 185 throw new RejectedExecutionException(); 186 offerTask(task); 187 wakeup(); 188 } 189 190 } 191 192 @Override 193 void shutdownHandlerTasks() { 194 // shutdown all handler threads 195 int nThreads = threadCount(); 196 while (nThreads-- > 0) { 197 wakeup(); 198 } 199 } 200 201 /** 202 * Associate the given handle with this group 203 */ 204 int associate(OverlappedChannel ch, long handle) throws IOException { 205 keyToChannelLock.writeLock().lock(); 206 207 // generate a completion key (if not shutdown) 208 int key; 209 try { 210 if (isShutdown()) 211 throw new ShutdownChannelGroupException(); 212 213 // generate unique key 214 do { 215 key = nextCompletionKey++; 216 } while ((key == 0) || keyToChannel.containsKey(key)); 217 218 // associate with I/O completion port 219 if (handle != 0L) { 220 createIoCompletionPort(handle, port, key, 0); 221 } 222 223 // setup mapping 224 keyToChannel.put(key, ch); 225 } finally { 226 keyToChannelLock.writeLock().unlock(); 227 } 228 return key; 229 } 230 231 /** 232 * Disassociate channel from the group. 233 */ 234 void disassociate(int key) { 235 boolean checkForShutdown = false; 236 237 keyToChannelLock.writeLock().lock(); 238 try { 239 keyToChannel.remove(key); 240 241 // last key to be removed so check if group is shutdown 242 if (keyToChannel.isEmpty()) 243 checkForShutdown = true; 244 245 } finally { 246 keyToChannelLock.writeLock().unlock(); 247 } 248 249 // continue shutdown 250 if (checkForShutdown && isShutdown()) { 251 try { 252 shutdownNow(); 253 } catch (IOException ignore) { } 254 } 255 } 256 257 /** 258 * Invoked when a channel associated with this port is closed before 259 * notifications for all outstanding I/O operations have been received. 260 */ 261 void makeStale(Long overlapped) { 262 synchronized (staleIoSet) { 263 staleIoSet.add(overlapped); 264 } 265 } 266 267 /** 268 * Checks if the given OVERLAPPED is stale and if so, releases it. 269 */ 270 private void checkIfStale(long ov) { 271 synchronized (staleIoSet) { 272 boolean removed = staleIoSet.remove(ov); 273 if (removed) { 274 unsafe.freeMemory(ov); 275 } 276 } 277 } 278 279 /** 280 * The handler for consuming the result of an asynchronous I/O operation. 281 */ 282 static interface ResultHandler { 283 /** 284 * Invoked if the I/O operation completes successfully. 285 */ 286 public void completed(int bytesTransferred, boolean canInvokeDirect); 287 288 /** 289 * Invoked if the I/O operation fails. 290 */ 291 public void failed(int error, IOException ioe); 292 } 293 294 // Creates IOException for the given I/O error. 295 private static IOException translateErrorToIOException(int error) { 296 String msg = getErrorMessage(error); 297 if (msg == null) 298 msg = "Unknown error: 0x0" + Integer.toHexString(error); 299 return new IOException(msg); 300 } 301 302 /** 303 * Long-running task servicing system-wide or per-file completion port 304 */ 305 private class EventHandlerTask implements Runnable { 306 public void run() { 307 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 308 Invoker.getGroupAndInvokeCount(); 309 boolean canInvokeDirect = (myGroupAndInvokeCount != null); 310 CompletionStatus ioResult = new CompletionStatus(); 311 boolean replaceMe = false; 312 313 try { 314 for (;;) { 315 // reset invoke count 316 if (myGroupAndInvokeCount != null) 317 myGroupAndInvokeCount.resetInvokeCount(); 318 319 // wait for I/O completion event 320 // A error here is fatal (thread will not be replaced) 321 replaceMe = false; 322 try { 323 getQueuedCompletionStatus(port, ioResult); 324 } catch (IOException x) { 325 // should not happen 326 x.printStackTrace(); 327 return; 328 } 329 330 // handle wakeup to execute task or shutdown 331 if (ioResult.completionKey() == 0 && 332 ioResult.overlapped() == 0L) 333 { 334 Runnable task = pollTask(); 335 if (task == null) { 336 // shutdown request 337 return; 338 } 339 340 // run task 341 // (if error/exception then replace thread) 342 replaceMe = true; 343 task.run(); 344 continue; 345 } 346 347 // map key to channel 348 OverlappedChannel ch = null; 349 keyToChannelLock.readLock().lock(); 350 try { 351 ch = keyToChannel.get(ioResult.completionKey()); 352 if (ch == null) { 353 checkIfStale(ioResult.overlapped()); 354 continue; 355 } 356 } finally { 357 keyToChannelLock.readLock().unlock(); 358 } 359 360 // lookup I/O request 361 PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped()); 362 if (result == null) { 363 // we get here if the OVERLAPPED structure is associated 364 // with an I/O operation on a channel that was closed 365 // but the I/O operation event wasn't read in a timely 366 // manner. Alternatively, it may be related to a 367 // tryLock operation as the OVERLAPPED structures for 368 // these operations are not in the I/O cache. 369 checkIfStale(ioResult.overlapped()); 370 continue; 371 } 372 373 // synchronize on result in case I/O completed immediately 374 // and was handled by initiator 375 synchronized (result) { 376 if (result.isDone()) { 377 continue; 378 } 379 // not handled by initiator 380 } 381 382 // invoke I/O result handler 383 int error = ioResult.error(); 384 ResultHandler rh = (ResultHandler)result.getContext(); 385 replaceMe = true; // (if error/exception then replace thread) 386 if (error == 0) { 387 rh.completed(ioResult.bytesTransferred(), canInvokeDirect); 388 } else { 389 rh.failed(error, translateErrorToIOException(error)); 390 } 391 } 392 } finally { 393 // last thread to exit when shutdown releases resources 394 int remaining = threadExit(this, replaceMe); 395 if (remaining == 0 && isShutdown()) { 396 implClose(); 397 } 398 } 399 } 400 } 401 402 /** 403 * Container for data returned by GetQueuedCompletionStatus 404 */ 405 private static class CompletionStatus { 406 private int error; 407 private int bytesTransferred; 408 private int completionKey; 409 private long overlapped; 410 411 private CompletionStatus() { } 412 int error() { return error; } 413 int bytesTransferred() { return bytesTransferred; } 414 int completionKey() { return completionKey; } 415 long overlapped() { return overlapped; } 416 } 417 418 // -- native methods -- 419 420 private static native void initIDs(); 421 422 private static native long createIoCompletionPort(long handle, 423 long existingPort, int completionKey, int concurrency) throws IOException; 424 425 private static native void close0(long handle); 426 427 private static native void getQueuedCompletionStatus(long completionPort, 428 CompletionStatus status) throws IOException; 429 430 private static native void postQueuedCompletionStatus(long completionPort, 431 int completionKey) throws IOException; 432 433 private static native String getErrorMessage(int error); 434 435 static { 436 IOUtil.load(); 437 initIDs(); 438 } 439 }