1 /* 2 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 26 * @summary Unit test for AsynchronousChannelGroup 27 * @key randomness 28 */ 29 30 import java.nio.ByteBuffer; 31 import java.nio.channels.*; 32 import java.net.*; 33 import java.util.*; 34 import java.util.concurrent.*; 35 import java.util.concurrent.atomic.*; 36 import java.io.IOException; 37 38 /** 39 * Tests that the completion handler is invoked by a thread with 40 * the expected identity. 41 */ 42 43 public class Identity { 44 static final Random rand = new Random(); 45 static final CountDownLatch done = new CountDownLatch(1); 46 static final AtomicBoolean failed = new AtomicBoolean(false); 47 48 static void fail(String msg) { 49 failed.set(true); 50 done.countDown(); 51 throw new RuntimeException(msg); 52 } 53 54 // thread-local identifies the thread 55 private static final ThreadLocal<Integer> myGroup = 56 new ThreadLocal<Integer>() { 57 @Override protected Integer initialValue() { 58 return Integer.valueOf(-1); 59 } 60 }; 61 62 // creates a ThreadFactory that constructs groups with the given identity 63 static final ThreadFactory createThreadFactory(final int groupId) { 64 return new ThreadFactory() { 65 @Override 66 public Thread newThread(final Runnable r) { 67 Thread t = new Thread(new Runnable() { 68 public void run() { 69 myGroup.set(groupId); 70 r.run(); 71 }}); 72 t.setDaemon(true); 73 return t; 74 } 75 }; 76 } 77 78 public static void main(String[] args) throws Exception { 79 // create listener to accept connections 80 final AsynchronousServerSocketChannel listener = 81 AsynchronousServerSocketChannel.open() 82 .bind(new InetSocketAddress(0)); 83 listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { 84 public void completed(final AsynchronousSocketChannel ch, Void att) { 85 listener.accept((Void)null, this); 86 final ByteBuffer buf = ByteBuffer.allocate(100); 87 ch.read(buf, ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 88 public void completed(Integer bytesRead, AsynchronousSocketChannel ch) { 89 if (bytesRead < 0) { 90 try { ch.close(); } catch (IOException ignore) { } 91 } else { 92 buf.clear(); 93 ch.read(buf, ch, this); 94 } 95 } 96 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 97 try { ch.close(); } catch (IOException ignore) { } 98 } 99 }); 100 } 101 public void failed(Throwable exc, Void att) { 102 } 103 }); 104 int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort(); 105 SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port); 106 107 // create 3-10 channels, each in its own group 108 final int groupCount = 3 + rand.nextInt(8); 109 AsynchronousChannelGroup[] groups = new AsynchronousChannelGroup[groupCount]; 110 final AsynchronousSocketChannel[] channels = new AsynchronousSocketChannel[groupCount]; 111 for (int i=0; i<groupCount; i++) { 112 ThreadFactory factory = createThreadFactory(i); 113 AsynchronousChannelGroup group; 114 if (rand.nextBoolean()) { 115 int nThreads = 1 + rand.nextInt(10); 116 group = AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory); 117 } else { 118 ExecutorService pool = Executors.newCachedThreadPool(factory); 119 group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5)); 120 } 121 groups[i] = group; 122 123 // create channel in group and connect it to the server 124 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group); 125 ch.connect(sa).get(); 126 channels[i] = ch; 127 } 128 129 // randomly write to each channel, ensuring that the completion handler 130 // is always invoked by a thread with the right identity. 131 final AtomicInteger writeCount = new AtomicInteger(100); 132 channels[0].write(getBuffer(), 0, new CompletionHandler<Integer,Integer>() { 133 public void completed(Integer bytesWritten, Integer groupId) { 134 if (bytesWritten != 1) 135 fail("Expected 1 byte to be written"); 136 if (!myGroup.get().equals(groupId)) 137 fail("Handler invoked by thread with the wrong identity"); 138 if (writeCount.decrementAndGet() > 0) { 139 int id = rand.nextInt(groupCount); 140 channels[id].write(getBuffer(), id, this); 141 } else { 142 done.countDown(); 143 } 144 } 145 public void failed(Throwable exc, Integer groupId) { 146 fail(exc.getMessage()); 147 } 148 }); 149 150 // wait until done 151 done.await(); 152 153 // clean-up 154 for (AsynchronousSocketChannel ch: channels) 155 ch.close(); 156 for (AsynchronousChannelGroup group: groups) 157 group.shutdownNow(); 158 listener.close(); 159 160 if (failed.get()) 161 throw new RuntimeException("Test failed - see log for details"); 162 } 163 164 static ByteBuffer getBuffer() { 165 ByteBuffer buf; 166 if (rand.nextBoolean()) { 167 buf = ByteBuffer.allocateDirect(1); 168 } else { 169 buf = ByteBuffer.allocate(1); 170 } 171 buf.put((byte)0); 172 buf.flip(); 173 return buf; 174 } 175 }