1 /*
   2  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6822643 6830721 6842687
  26  * @summary Unit test for AsynchronousFileChannel
  27  * @key randomness
  28  */
  29 
  30 import java.nio.file.*;
  31 import java.nio.channels.*;
  32 import java.nio.ByteBuffer;
  33 import java.io.File;
  34 import java.io.IOException;
  35 import java.util.*;
  36 import java.util.concurrent.*;
  37 import java.util.concurrent.atomic.AtomicReference;
  38 import static java.nio.file.StandardOpenOption.*;
  39 
  40 public class Basic {
  41 
  42     private static final Random rand = new Random();
  43 
  44     public static void main(String[] args) throws IOException {
  45         // create temporary file
  46         File blah = File.createTempFile("blah", null);
  47         blah.deleteOnExit();
  48 
  49         AsynchronousFileChannel ch = AsynchronousFileChannel
  50             .open(blah.toPath(), READ, WRITE);
  51         try {
  52             // run tests
  53             testUsingCompletionHandlers(ch);
  54             testUsingWaitOnResult(ch);
  55             testInterruptHandlerThread(ch);
  56         } finally {
  57             ch.close();
  58         }
  59 
  60         // run test that expects channel to be closed
  61         testClosedChannel(ch);
  62 
  63         // these tests open the file themselves
  64         testLocking(blah.toPath());
  65         testCustomThreadPool(blah.toPath());
  66         testAsynchronousClose(blah.toPath());
  67         testCancel(blah.toPath());
  68         testTruncate(blah.toPath());
  69 
  70         // eagerly clean-up
  71         blah.delete();
  72     }
  73 
  74     /*
  75      * Generate buffer with random contents
  76      * Writes buffer to file using a CompletionHandler to consume the result
  77      *    of each write operation
  78      * Reads file to EOF to a new buffer using a CompletionHandler to consume
  79      *    the result of each read operation
  80      * Compares buffer contents
  81      */
  82     static void testUsingCompletionHandlers(AsynchronousFileChannel ch)
  83         throws IOException
  84     {
  85         System.out.println("testUsingCompletionHandlers");
  86 
  87         ch.truncate(0L);
  88 
  89         // generate buffer with random elements and write it to file
  90         ByteBuffer src = genBuffer();
  91         writeFully(ch, src, 0L);
  92 
  93         // read to EOF or buffer is full
  94         ByteBuffer dst = (rand.nextBoolean()) ?
  95             ByteBuffer.allocateDirect(src.capacity()) :
  96             ByteBuffer.allocate(src.capacity());
  97         readAll(ch, dst, 0L);
  98 
  99         // check buffers are the same
 100         src.flip();
 101         dst.flip();
 102         if (!src.equals(dst)) {
 103             throw new RuntimeException("Contents differ");
 104         }
 105     }
 106 
 107     /*
 108      * Generate buffer with random contents
 109      * Writes buffer to file, invoking the Future's get method to wait for
 110      *    each write operation to complete
 111      * Reads file to EOF to a new buffer, invoking the Future's get method to
 112      *    wait for each write operation to complete
 113      * Compares buffer contents
 114      */
 115     static void testUsingWaitOnResult(AsynchronousFileChannel ch)
 116         throws IOException
 117     {
 118         System.out.println("testUsingWaitOnResult");
 119 
 120         ch.truncate(0L);
 121 
 122         // generate buffer
 123         ByteBuffer src = genBuffer();
 124 
 125         // write buffer completely to file
 126         long position = 0L;
 127         while (src.hasRemaining()) {
 128             Future<Integer> result = ch.write(src, position);
 129             try {
 130                 int n = result.get();
 131                 // update position
 132                 position += n;
 133             } catch (ExecutionException x) {
 134                 throw new RuntimeException(x.getCause());
 135             } catch (InterruptedException x) {
 136                 throw new RuntimeException(x);
 137             }
 138         }
 139 
 140         // read file into new buffer
 141         ByteBuffer dst = (rand.nextBoolean()) ?
 142             ByteBuffer.allocateDirect(src.capacity()) :
 143             ByteBuffer.allocate(src.capacity());
 144         position = 0L;
 145         int n;
 146         do {
 147             Future<Integer> result = ch.read(dst, position);
 148             try {
 149                 n = result.get();
 150 
 151                 // update position
 152                 if (n > 0) position += n;
 153             } catch (ExecutionException x) {
 154                 throw new RuntimeException(x.getCause());
 155             } catch (InterruptedException x) {
 156                 throw new RuntimeException(x);
 157             }
 158         } while (n > 0);
 159 
 160         // check buffers are the same
 161         src.flip();
 162         dst.flip();
 163         if (!src.equals(dst)) {
 164             throw new RuntimeException("Contents differ");
 165         }
 166     }
 167 
 168     // exercise lock methods
 169     static void testLocking(Path file) throws IOException {
 170         System.out.println("testLocking");
 171 
 172         AsynchronousFileChannel ch = AsynchronousFileChannel
 173             .open(file, READ, WRITE);
 174         FileLock fl;
 175         try {
 176             // test 1 - acquire lock and check that tryLock throws
 177             // OverlappingFileLockException
 178             try {
 179                 fl = ch.lock().get();
 180             } catch (ExecutionException x) {
 181                 throw new RuntimeException(x);
 182             } catch (InterruptedException x) {
 183                 throw new RuntimeException("Should not be interrupted");
 184             }
 185             if (!fl.acquiredBy().equals(ch))
 186                 throw new RuntimeException("FileLock#acquiredBy returned incorrect channel");
 187             try {
 188                 ch.tryLock();
 189                 throw new RuntimeException("OverlappingFileLockException expected");
 190             } catch (OverlappingFileLockException x) {
 191             }
 192             fl.release();
 193 
 194             // test 2 - acquire try and check that lock throws OverlappingFileLockException
 195             fl = ch.tryLock();
 196             if (fl == null)
 197                 throw new RuntimeException("Unable to acquire lock");
 198             try {
 199                 ch.lock((Void)null, new CompletionHandler<FileLock,Void> () {
 200                     public void completed(FileLock result, Void att) {
 201                     }
 202                     public void failed(Throwable exc, Void att) {
 203                     }
 204                 });
 205                 throw new RuntimeException("OverlappingFileLockException expected");
 206             } catch (OverlappingFileLockException x) {
 207             }
 208         } finally {
 209             ch.close();
 210         }
 211 
 212         // test 3 - channel is closed so FileLock should no longer be valid
 213         if (fl.isValid())
 214             throw new RuntimeException("FileLock expected to be invalid");
 215     }
 216 
 217     // interrupt should not close channel
 218     static void testInterruptHandlerThread(final AsynchronousFileChannel ch) {
 219         System.out.println("testInterruptHandlerThread");
 220 
 221         ByteBuffer buf = ByteBuffer.allocateDirect(100);
 222         final CountDownLatch latch = new CountDownLatch(1);
 223 
 224         ch.read(buf, 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 225             public void completed(Integer result, Void att) {
 226                 try {
 227                     Thread.currentThread().interrupt();
 228                     long size = ch.size();
 229                     latch.countDown();
 230                 } catch (IOException x) {
 231                     x.printStackTrace();
 232                 }
 233             }
 234             public void failed(Throwable exc, Void att) {
 235             }
 236         });
 237 
 238         // wait for handler to complete
 239         await(latch);
 240     }
 241 
 242     // invoke method on closed channel
 243     static void testClosedChannel(AsynchronousFileChannel ch) {
 244         System.out.println("testClosedChannel");
 245 
 246         if (ch.isOpen())
 247             throw new RuntimeException("Channel should be closed");
 248 
 249         ByteBuffer buf = ByteBuffer.allocateDirect(100);
 250 
 251         // check read fails with ClosedChannelException
 252         try {
 253             ch.read(buf, 0L).get();
 254             throw new RuntimeException("ExecutionException expected");
 255         } catch (ExecutionException x) {
 256             if (!(x.getCause() instanceof ClosedChannelException))
 257                 throw new RuntimeException("Cause of ClosedChannelException expected");
 258         } catch (InterruptedException x) {
 259         }
 260 
 261         // check write fails with ClosedChannelException
 262         try {
 263             ch.write(buf, 0L).get();
 264             throw new RuntimeException("ExecutionException expected");
 265         } catch (ExecutionException x) {
 266             if (!(x.getCause() instanceof ClosedChannelException))
 267                 throw new RuntimeException("Cause of ClosedChannelException expected");
 268         } catch (InterruptedException x) {
 269         }
 270 
 271         // check lock fails with ClosedChannelException
 272         try {
 273             ch.lock().get();
 274             throw new RuntimeException("ExecutionException expected");
 275         } catch (ExecutionException x) {
 276             if (!(x.getCause() instanceof ClosedChannelException))
 277                 throw new RuntimeException("Cause of ClosedChannelException expected");
 278         } catch (InterruptedException x) {
 279         }
 280     }
 281 
 282 
 283     // exercise custom thread pool
 284     static void testCustomThreadPool(Path file) throws IOException {
 285         System.out.println("testCustomThreadPool");
 286 
 287         // records threads that are created
 288         final List<Thread> threads = new ArrayList<Thread>();
 289 
 290         ThreadFactory threadFactory = new ThreadFactory() {
 291              @Override
 292              public Thread newThread(Runnable r) {
 293                  Thread t = new Thread(r);
 294                  t.setDaemon(true);
 295                  synchronized (threads) {
 296                      threads.add(t);
 297                  }
 298                  return t;
 299              }
 300         };
 301 
 302         // exercise tests with varied number of threads
 303         for (int nThreads=1; nThreads<=5; nThreads++) {
 304             synchronized (threads) {
 305                 threads.clear();
 306             }
 307             ExecutorService executor = Executors.newFixedThreadPool(nThreads, threadFactory);
 308             Set<StandardOpenOption> opts = EnumSet.of(WRITE);
 309             AsynchronousFileChannel ch = AsynchronousFileChannel.open(file, opts, executor);
 310             try {
 311                 for (int i=0; i<10; i++) {
 312                     // do I/O operation to see which thread invokes the completion handler
 313                     final AtomicReference<Thread> invoker = new AtomicReference<Thread>();
 314                     final CountDownLatch latch = new CountDownLatch(1);
 315 
 316                     ch.write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 317                         public void completed(Integer result, Void att) {
 318                             invoker.set(Thread.currentThread());
 319                             latch.countDown();
 320                         }
 321                         public void failed(Throwable exc, Void att) {
 322                         }
 323                     });
 324                     await(latch);
 325 
 326                     // check invoker
 327                     boolean found = false;
 328                     synchronized (threads) {
 329                         for (Thread t: threads) {
 330                             if (t == invoker.get()) {
 331                                 found = true;
 332                                 break;
 333                             }
 334                         }
 335                     }
 336                     if (!found)
 337                         throw new RuntimeException("Invoker thread not found");
 338                 }
 339             } finally {
 340                 ch.close();
 341                 executor.shutdown();
 342             }
 343         }
 344 
 345 
 346         // test sharing a thread pool between many channels
 347         ExecutorService executor = Executors
 348             .newFixedThreadPool(1+rand.nextInt(10), threadFactory);
 349         final int n = 50 + rand.nextInt(50);
 350         AsynchronousFileChannel[] channels = new AsynchronousFileChannel[n];
 351         try {
 352             for (int i=0; i<n; i++) {
 353                 Set<StandardOpenOption> opts = EnumSet.of(WRITE);
 354                 channels[i] = AsynchronousFileChannel.open(file, opts, executor);
 355                 final CountDownLatch latch = new CountDownLatch(1);
 356                 channels[i].write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 357                     public void completed(Integer result, Void att) {
 358                         latch.countDown();
 359                     }
 360                     public void failed(Throwable exc, Void att) {
 361                     }
 362                 });
 363                 await(latch);
 364 
 365                 // close ~half the channels
 366                 if (rand.nextBoolean())
 367                     channels[i].close();
 368             }
 369         } finally {
 370             // close remaining channels
 371             for (int i=0; i<n; i++) {
 372                 if (channels[i] != null) channels[i].close();
 373             }
 374             executor.shutdown();
 375         }
 376     }
 377 
 378     // exercise asynchronous close
 379     static void testAsynchronousClose(Path file) throws IOException {
 380         System.out.println("testAsynchronousClose");
 381 
 382         // create file
 383         AsynchronousFileChannel ch = AsynchronousFileChannel
 384             .open(file, WRITE, TRUNCATE_EXISTING);
 385         long size = 0L;
 386         do {
 387             ByteBuffer buf = genBuffer();
 388             int n = buf.remaining();
 389             writeFully(ch, buf, size);
 390             size += n;
 391         } while (size < (50L * 1024L * 1024L));
 392 
 393         ch.close();
 394 
 395         ch = AsynchronousFileChannel.open(file, WRITE, SYNC);
 396 
 397         // randomize number of writers, buffer size, and positions
 398 
 399         int nwriters = 1 + rand.nextInt(8);
 400         ByteBuffer[] buf = new ByteBuffer[nwriters];
 401         long[] position = new long[nwriters];
 402         for (int i=0; i<nwriters; i++) {
 403             buf[i] = genBuffer();
 404             position[i] = rand.nextInt((int)size);
 405         }
 406 
 407         // initiate I/O
 408         Future[] result = new Future[nwriters];
 409         for (int i=0; i<nwriters; i++) {
 410             result[i] = ch.write(buf[i], position[i]);
 411         }
 412 
 413         // close file
 414         ch.close();
 415 
 416         // write operations should complete or fail with AsynchronousCloseException
 417         for (int i=0; i<nwriters; i++) {
 418             try {
 419                 result[i].get();
 420             } catch (ExecutionException x) {
 421                 Throwable cause = x.getCause();
 422                 if (!(cause instanceof AsynchronousCloseException))
 423                     throw new RuntimeException(cause);
 424             } catch (CancellationException  x) {
 425                 throw new RuntimeException(x);   // should not happen
 426             } catch (InterruptedException x) {
 427                 throw new RuntimeException(x);   // should not happen
 428             }
 429         }
 430     }
 431 
 432     // exercise cancel method
 433     static void testCancel(Path file) throws IOException {
 434         System.out.println("testCancel");
 435 
 436         for (int i=0; i<2; i++) {
 437             boolean mayInterruptIfRunning = (i == 0) ? false : true;
 438 
 439             // open with SYNC option to improve chances that write will not
 440             // complete immediately
 441             AsynchronousFileChannel ch = AsynchronousFileChannel
 442                 .open(file, WRITE, SYNC);
 443 
 444             // start write operation
 445             Future<Integer> res = ch.write(genBuffer(), 0L);
 446 
 447             // cancel operation
 448             boolean cancelled = res.cancel(mayInterruptIfRunning);
 449 
 450             // check post-conditions
 451             if (!res.isDone())
 452                 throw new RuntimeException("isDone should return true");
 453             if (res.isCancelled() != cancelled)
 454                 throw new RuntimeException("isCancelled not consistent");
 455             try {
 456                 res.get();
 457                 if (cancelled)
 458                     throw new RuntimeException("CancellationException expected");
 459             } catch (CancellationException x) {
 460                 if (!cancelled)
 461                     throw new RuntimeException("CancellationException not expected");
 462             } catch (ExecutionException x) {
 463                 throw new RuntimeException(x);
 464             } catch (InterruptedException x) {
 465                 throw new RuntimeException(x);
 466             }
 467             try {
 468                 res.get(1, TimeUnit.SECONDS);
 469                 if (cancelled)
 470                     throw new RuntimeException("CancellationException expected");
 471             } catch (CancellationException x) {
 472                 if (!cancelled)
 473                     throw new RuntimeException("CancellationException not expected");
 474             } catch (ExecutionException x) {
 475                 throw new RuntimeException(x);
 476             } catch (TimeoutException x) {
 477                 throw new RuntimeException(x);
 478             } catch (InterruptedException x) {
 479                 throw new RuntimeException(x);
 480             }
 481 
 482             ch.close();
 483         }
 484     }
 485 
 486     // exercise truncate method
 487     static void testTruncate(Path file) throws IOException {
 488         System.out.println("testTruncate");
 489 
 490         // basic tests
 491         AsynchronousFileChannel ch = AsynchronousFileChannel
 492             .open(file, CREATE, WRITE, TRUNCATE_EXISTING);
 493         try {
 494             writeFully(ch, genBuffer(), 0L);
 495             long size = ch.size();
 496 
 497             // attempt to truncate to a size greater than the current size
 498             if (ch.truncate(size + 1L).size() != size)
 499                 throw new RuntimeException("Unexpected size after truncation");
 500 
 501             // truncate file
 502             if (ch.truncate(size - 1L).size() != (size - 1L))
 503                 throw new RuntimeException("Unexpected size after truncation");
 504 
 505             // invalid size
 506             try {
 507                 ch.truncate(-1L);
 508                 throw new RuntimeException("IllegalArgumentException expected");
 509             } catch (IllegalArgumentException e) { }
 510 
 511         } finally {
 512             ch.close();
 513         }
 514 
 515         // channel is closed
 516         try {
 517             ch.truncate(0L);
 518             throw new RuntimeException("ClosedChannelException expected");
 519         } catch (ClosedChannelException  e) { }
 520 
 521         // channel is read-only
 522         ch = AsynchronousFileChannel.open(file, READ);
 523         try {
 524             try {
 525             ch.truncate(0L);
 526                 throw new RuntimeException("NonWritableChannelException expected");
 527             } catch (NonWritableChannelException  e) { }
 528         } finally {
 529             ch.close();
 530         }
 531     }
 532 
 533     // returns ByteBuffer with random bytes
 534     static ByteBuffer genBuffer() {
 535         int size = 1024 + rand.nextInt(16000);
 536         byte[] buf = new byte[size];
 537         boolean useDirect = rand.nextBoolean();
 538         if (useDirect) {
 539             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 540             bb.put(buf);
 541             bb.flip();
 542             return bb;
 543         } else {
 544             return ByteBuffer.wrap(buf);
 545         }
 546     }
 547 
 548     // writes all remaining bytes in the buffer to the given channel at the
 549     // given position
 550     static void writeFully(final AsynchronousFileChannel ch,
 551                            final ByteBuffer src,
 552                            long position)
 553     {
 554         final CountDownLatch latch = new CountDownLatch(1);
 555 
 556         // use position as attachment
 557         ch.write(src, position, position, new CompletionHandler<Integer,Long>() {
 558             public void completed(Integer result, Long position) {
 559                 int n = result;
 560                 if (src.hasRemaining()) {
 561                     long p = position + n;
 562                     ch.write(src, p, p, this);
 563                 } else {
 564                     latch.countDown();
 565                 }
 566             }
 567             public void failed(Throwable exc, Long position) {
 568             }
 569         });
 570 
 571         // wait for writes to complete
 572         await(latch);
 573     }
 574 
 575     static void readAll(final AsynchronousFileChannel ch,
 576                         final ByteBuffer dst,
 577                        long position)
 578     {
 579         final CountDownLatch latch = new CountDownLatch(1);
 580 
 581         // use position as attachment
 582         ch.read(dst, position, position, new CompletionHandler<Integer,Long>() {
 583             public void completed(Integer result, Long position) {
 584                 int n = result;
 585                 if (n > 0) {
 586                     long p = position + n;
 587                     ch.read(dst, p, p, this);
 588                 } else {
 589                     latch.countDown();
 590                 }
 591             }
 592             public void failed(Throwable exc, Long position) {
 593             }
 594         });
 595 
 596         // wait for reads to complete
 597         await(latch);
 598     }
 599 
 600     static void await(CountDownLatch latch) {
 601         // wait until done
 602         boolean done = false;
 603         while (!done) {
 604             try {
 605                 latch.await();
 606                 done = true;
 607             } catch (InterruptedException x) { }
 608         }
 609     }
 610 }