/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/* @test
 * @bug 4607272 6842687
 * @summary Unit test for AsynchronousChannelGroup
 */

import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.io.IOException;

/**
 * Tests that the completion handler is invoked by a thread with
 * the expected identity.
 *
 * <p>Each channel group gets its own {@link ThreadFactory}; every thread it
 * creates stores the group's index in a thread-local before running its task.
 * The write completion handler then compares that thread-local with the index
 * of the group that owns the channel the write was issued on.
 */
public class Identity {
    static final Random rand = new Random();

    // released when the write sequence finishes or on the first failure
    static final CountDownLatch done = new CountDownLatch(1);

    // set once any check fails; inspected by main() after the latch opens
    static final AtomicBoolean failed = new AtomicBoolean(false);

    /**
     * Records a test failure, releases the main thread and aborts the caller.
     *
     * @param msg description of the failure
     * @throws RuntimeException always, carrying {@code msg}
     */
    static void fail(String msg) {
        failed.set(true);
        done.countDown();
        throw new RuntimeException(msg);
    }

    /**
     * Same as {@link #fail(String)} but keeps the triggering exception as the
     * cause so its type and stack trace are not lost.
     *
     * @param msg   description of the failure
     * @param cause the exception that triggered the failure
     * @throws RuntimeException always, carrying {@code msg} and {@code cause}
     */
    static void fail(String msg, Throwable cause) {
        failed.set(true);
        done.countDown();
        throw new RuntimeException(msg, cause);
    }

    // thread-local identifies the thread; -1 means "not created by one of
    // the factories returned by createThreadFactory"
    private static final ThreadLocal<Integer> myGroup =
        new ThreadLocal<Integer>() {
            @Override
            protected Integer initialValue() {
                return Integer.valueOf(-1);
            }
        };

    /**
     * Creates a ThreadFactory that constructs daemon threads tagged with the
     * given identity.
     *
     * @param groupId value stored in the thread-local of every thread created
     * @return a factory whose threads set the thread-local and then run the task
     */
    static final ThreadFactory createThreadFactory(final int groupId) {
        return new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable r) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // tag the thread before it runs any handler
                        myGroup.set(groupId);
                        r.run();
                    }
                });
                t.setDaemon(true);
                return t;
            }
        };
    }

    /**
     * Runs the test: starts a local listener that discards whatever it reads,
     * connects 3-10 channels (each bound to its own group), issues 100
     * one-byte writes on randomly chosen channels and checks in the completion
     * handler that the invoking thread carries the identity of the channel's
     * group.
     *
     * @param args ignored
     * @throws Exception if set-up fails or any identity check failed
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: the listener is closed even if set-up throws
        try (final AsynchronousServerSocketChannel listener =
                 AsynchronousServerSocketChannel.open()) {

            // Use the loopback address for both ends; resolving the local
            // host name is not reliable on every machine.
            InetAddress loopback = InetAddress.getLoopbackAddress();
            listener.bind(new InetSocketAddress(loopback, 0));
            listener.accept((Void) null,
                new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(final AsynchronousSocketChannel ch, Void att) {
                        // re-arm the accept before servicing this connection
                        listener.accept((Void) null, this);
                        drain(ch);
                    }

                    @Override
                    public void failed(Throwable exc, Void att) {
                        // nothing to do: an accept failure is not part of
                        // what this test checks
                    }
                });

            int port = ((InetSocketAddress) listener.getLocalAddress()).getPort();
            SocketAddress sa = new InetSocketAddress(loopback, port);

            // create 3-10 channels, each in its own group
            final int groupCount = 3 + rand.nextInt(8);
            final AsynchronousChannelGroup[] groups =
                new AsynchronousChannelGroup[groupCount];
            final AsynchronousSocketChannel[] channels =
                new AsynchronousSocketChannel[groupCount];

            try {
                for (int i = 0; i < groupCount; i++) {
                    ThreadFactory factory = createThreadFactory(i);
                    AsynchronousChannelGroup group;
                    if (rand.nextBoolean()) {
                        int nThreads = 1 + rand.nextInt(10);
                        group = AsynchronousChannelGroup
                            .withFixedThreadPool(nThreads, factory);
                    } else {
                        ExecutorService pool = Executors.newCachedThreadPool(factory);
                        group = AsynchronousChannelGroup
                            .withCachedThreadPool(pool, rand.nextInt(5));
                    }
                    groups[i] = group;

                    // create channel in group and connect it to the server;
                    // record it first so it is closed if connect fails
                    AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(group);
                    channels[i] = ch;
                    ch.connect(sa).get();
                }

                // randomly write to each channel, ensuring that the completion
                // handler is always invoked by a thread with the right identity.
                final AtomicInteger writeCount = new AtomicInteger(100);
                channels[0].write(getBuffer(), 0,
                    new CompletionHandler<Integer, Integer>() {
                        @Override
                        public void completed(Integer bytesWritten, Integer groupId) {
                            if (bytesWritten != 1) {
                                fail("Expected 1 byte to be written");
                            }
                            if (!myGroup.get().equals(groupId)) {
                                fail("Handler invoked by thread with the wrong identity");
                            }
                            if (writeCount.decrementAndGet() > 0) {
                                // chain the next write on a random channel; the
                                // attachment is that channel's group index
                                int id = rand.nextInt(groupCount);
                                channels[id].write(getBuffer(), id, this);
                            } else {
                                done.countDown();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, Integer groupId) {
                            fail("Write failed: " + exc, exc);
                        }
                    });

                // wait until done
                done.await();
            } finally {
                // clean-up; entries are null if set-up stopped part-way
                for (AsynchronousSocketChannel ch : channels) {
                    if (ch != null) {
                        ch.close();
                    }
                }
                for (AsynchronousChannelGroup group : groups) {
                    if (group != null) {
                        group.shutdownNow();
                    }
                }
            }

            if (failed.get()) {
                throw new RuntimeException("Test failed - see log for details");
            }
        }
    }

    /**
     * Reads and discards everything the peer sends on the given connection,
     * closing it at end-of-stream or on a read error.
     */
    private static void drain(AsynchronousSocketChannel connection) {
        final ByteBuffer buf = ByteBuffer.allocate(100);
        connection.read(buf, connection,
            new CompletionHandler<Integer, AsynchronousSocketChannel>() {
                @Override
                public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
                    if (bytesRead < 0) {
                        closeQuietly(ch);
                    } else {
                        // discard the data and keep reading
                        buf.clear();
                        ch.read(buf, ch, this);
                    }
                }

                @Override
                public void failed(Throwable exc, AsynchronousSocketChannel ch) {
                    closeQuietly(ch);
                }
            });
    }

    /** Closes the channel, deliberately ignoring any I/O error (best effort). */
    private static void closeQuietly(AsynchronousSocketChannel ch) {
        try {
            ch.close();
        } catch (IOException ignored) {
            // server side of a throw-away connection; nothing useful to do
        }
    }

    /**
     * Returns a buffer, randomly direct or heap, holding a single zero byte
     * and flipped so it is ready to be written.
     */
    static ByteBuffer getBuffer() {
        ByteBuffer buf;
        if (rand.nextBoolean()) {
            buf = ByteBuffer.allocateDirect(1);
        } else {
            buf = ByteBuffer.allocate(1);
        }
        buf.put((byte) 0);
        buf.flip();
        return buf;
    }
}