/*
 * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

/* @test
 * @bug 6834246
 * @summary Stress test connections through the loopback interface
 */

import java.nio.ByteBuffer;
import java.net.*;
import java.nio.channels.*;
import java.util.Random;
import java.io.IOException;

/**
 * Opens a random number of asynchronous socket connections to a local
 * listener, pumps bytes through each of them for a fixed period, and then
 * checks that every receiving end read exactly as many bytes as its sending
 * end wrote.
 */
public class StressLoopback {
    static final Random rand = new Random();

    /**
     * Runs the stress test.
     *
     * @param args ignored
     * @throws Exception if setup fails, or a RuntimeException if any
     *         connection read a different number of bytes than was written
     */
    public static void main(String[] args) throws Exception {
        // setup listener
        AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
        try {
            int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
            // NOTE(review): the summary says "loopback", but getLocalHost()
            // resolves the local host name, which may map to a non-loopback
            // address -- confirm this is the intended target.
            InetAddress lh = InetAddress.getLocalHost();
            SocketAddress remote = new InetSocketAddress(lh, port);

            // create sources and sinks (between 2 and 10 connections)
            int count = 2 + rand.nextInt(9);
            Source[] source = new Source[count];
            Sink[] sink = new Sink[count];
            for (int i = 0; i < count; i++) {
                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
                ch.connect(remote).get();
                source[i] = new Source(ch);
                sink[i] = new Sink(listener.accept().get());
            }

            // start the sinks and sources
            for (int i = 0; i < count; i++) {
                sink[i].start();
                source[i].start();
            }

            // let the test run for a while
            Thread.sleep(20*1000);

            // wait until everyone is done
            boolean failed = false;
            for (int i = 0; i < count; i++) {
                long nwrote = source[i].finish();
                long nread = sink[i].finish();
                // Report each connection on its own result; a mismatch on an
                // earlier connection must not label later ones as failed.
                boolean matched = (nread == nwrote);
                if (!matched)
                    failed = true;
                System.out.format("%d -> %d (%s)\n",
                    nwrote, nread, matched ? "PASS" : "FAIL");
            }
            if (failed)
                throw new RuntimeException("Test failed - see log for details");
        } finally {
            // The listener is only needed to accept the connections above.
            closeUnchecked(listener);
        }
    }

    /**
     * Writes bytes to a channel until "done". When done the channel is closed.
     */
    static class Source {
        private final AsynchronousByteChannel channel;
        private final ByteBuffer sentBuffer;
        // Updated only from the completion handler. The next write is issued
        // only after the previous one completes, so there is a single writer
        // at a time; volatile makes the total visible to finish().
        private volatile long bytesSent;
        private volatile boolean finished;

        /**
         * @param channel the channel to write to; closed when writing stops
         */
        Source(AsynchronousByteChannel channel) {
            this.channel = channel;
            int size = 1024 + rand.nextInt(10000);
            this.sentBuffer = (rand.nextBoolean()) ?
                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
        }

        /**
         * Starts writing the whole buffer repeatedly. Each completed write
         * adds to the byte count and, unless finish() has been called,
         * initiates the next write.
         */
        void start() {
            sentBuffer.position(0);
            sentBuffer.limit(sentBuffer.capacity());
            channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () {
                @Override
                public void completed(Integer nwrote, Void att) {
                    bytesSent += nwrote;
                    if (finished) {
                        closeUnchecked(channel);
                    } else {
                        // buffer contents are irrelevant; always offer it all
                        sentBuffer.position(0);
                        sentBuffer.limit(sentBuffer.capacity());
                        channel.write(sentBuffer, null, this);
                    }
                }
                @Override
                public void failed(Throwable exc, Void att) {
                    exc.printStackTrace();
                    closeUnchecked(channel);
                }
                // Kept from the original test. The released
                // java.nio.channels.CompletionHandler declares only
                // completed and failed, so this is not an override there.
                public void cancelled(Void att) {
                }
            });
        }

        /**
         * Signals the writer to stop and blocks until the channel is closed.
         *
         * @return the total number of bytes written
         */
        long finish() {
            finished = true;
            waitUntilClosed(channel);
            return bytesSent;
        }
    }

    /**
     * Read bytes from a channel until EOF is received.
     */
    static class Sink {
        private final AsynchronousByteChannel channel;
        private final ByteBuffer readBuffer;
        // Updated only from the completion handler, which issues the next
        // read only after the previous one completes (single writer).
        private volatile long bytesRead;

        /**
         * @param channel the channel to read from; closed on EOF or failure
         */
        Sink(AsynchronousByteChannel channel) {
            this.channel = channel;
            int size = 1024 + rand.nextInt(10000);
            this.readBuffer = (rand.nextBoolean()) ?
                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
        }

        /**
         * Starts reading. Each completed read adds to the byte count and
         * initiates the next read; EOF (a negative count) closes the channel.
         */
        void start() {
            channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () {
                @Override
                public void completed(Integer nread, Void att) {
                    if (nread < 0) {
                        closeUnchecked(channel);
                    } else {
                        bytesRead += nread;
                        readBuffer.clear();
                        channel.read(readBuffer, null, this);
                    }
                }
                @Override
                public void failed(Throwable exc, Void att) {
                    exc.printStackTrace();
                    closeUnchecked(channel);
                }
                // Kept from the original test. The released
                // java.nio.channels.CompletionHandler declares only
                // completed and failed, so this is not an override there.
                public void cancelled(Void att) {
                }
            });
        }

        /**
         * Blocks until the channel is closed.
         *
         * @return the total number of bytes read
         */
        long finish() {
            waitUntilClosed(channel);
            return bytesRead;
        }
    }

    /**
     * Polls the channel every 100ms until it reports that it is closed.
     * An interrupt does not cut the wait short, but the calling thread's
     * interrupt status is restored before this method returns.
     *
     * @param c the channel to wait on
     */
    static void waitUntilClosed(Channel c) {
        boolean interrupted = false;
        try {
            while (c.isOpen()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Remember the interrupt rather than re-asserting it
                    // now, which would make every later sleep() throw
                    // immediately and turn this loop into a busy spin.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the channel, deliberately ignoring any IOException: the test
     * only needs the channel to end up closed.
     *
     * @param c the channel to close
     */
    static void closeUnchecked(Channel c) {
        try {
            c.close();
        } catch (IOException ignore) { }
    }
}