1 /*
   2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
  20  * CA 95054 USA or visit www.sun.com if you need additional information or
  21  * have any questions.
  22  */
  23 
  24 /* @test
  25  * @bug 6834246
  26  * @summary Stress test connections through the loopback interface
  27  */
  28 
  29 import java.nio.ByteBuffer;
  30 import java.net.*;
  31 import java.nio.channels.*;
  32 import java.util.Random;
  33 import java.io.IOException;
  34 
  35 public class StressLoopback {
  36     static final Random rand = new Random();
  37 
  38     public static void main(String[] args) throws Exception {
  39         // setup listener
  40         AsynchronousServerSocketChannel listener =
  41             AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
  42         int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
  43         InetAddress lh = InetAddress.getLocalHost();
  44         SocketAddress remote = new InetSocketAddress(lh, port);
  45 
  46         // create sources and sinks
  47         int count = 2 + rand.nextInt(9);
  48         Source[] source = new Source[count];
  49         Sink[] sink = new Sink[count];
  50         for (int i=0; i<count; i++) {
  51             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
  52             ch.connect(remote).get();
  53             source[i] = new Source(ch);
  54             sink[i] = new Sink(listener.accept().get());
  55         }
  56 
  57         // start the sinks and sources
  58         for (int i=0; i<count; i++) {
  59             sink[i].start();
  60             source[i].start();
  61         }
  62 
  63         // let the test run for a while
  64         Thread.sleep(20*1000);
  65 
  66         // wait until everyone is done
  67         boolean failed = false;
  68         for (int i=0; i<count; i++) {
  69             long nwrote = source[i].finish();
  70             long nread = sink[i].finish();
  71             if (nread != nwrote)
  72                 failed = true;
  73             System.out.format("%d -> %d (%s)\n",
  74                 nwrote, nread, (failed) ? "FAIL" : "PASS");
  75         }
  76         if (failed)
  77             throw new RuntimeException("Test failed - see log for details");
  78     }
  79 
  80     /**
  81      * Writes bytes to a channel until "done". When done the channel is closed.
  82      */
  83     static class Source {
  84         private final AsynchronousByteChannel channel;
  85         private final ByteBuffer sentBuffer;
  86         private volatile long bytesSent;
  87         private volatile boolean finished;
  88 
  89         Source(AsynchronousByteChannel channel) {
  90             this.channel = channel;
  91             int size = 1024 + rand.nextInt(10000);
  92             this.sentBuffer = (rand.nextBoolean()) ?
  93                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
  94         }
  95 
  96         void start() {
  97             sentBuffer.position(0);
  98             sentBuffer.limit(sentBuffer.capacity());
  99             channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () {
 100                 public void completed(Integer nwrote, Void att) {
 101                     bytesSent += nwrote;
 102                     if (finished) {
 103                         closeUnchecked(channel);
 104                     } else {
 105                         sentBuffer.position(0);
 106                         sentBuffer.limit(sentBuffer.capacity());
 107                         channel.write(sentBuffer, null, this);
 108                     }
 109                 }
 110                 public void failed(Throwable exc, Void att) {
 111                     exc.printStackTrace();
 112                     closeUnchecked(channel);
 113                 }
 114                 public void cancelled(Void att) {
 115                 }
 116             });
 117         }
 118 
 119         long finish() {
 120             finished = true;
 121             waitUntilClosed(channel);
 122             return bytesSent;
 123         }
 124     }
 125 
 126     /**
 127      * Read bytes from a channel until EOF is received.
 128      */
 129     static class Sink {
 130         private final AsynchronousByteChannel channel;
 131         private final ByteBuffer readBuffer;
 132         private volatile long bytesRead;
 133 
 134         Sink(AsynchronousByteChannel channel) {
 135             this.channel = channel;
 136             int size = 1024 + rand.nextInt(10000);
 137             this.readBuffer = (rand.nextBoolean()) ?
 138                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
 139         }
 140 
 141         void start() {
 142             channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () {
 143                 public void completed(Integer nread, Void att) {
 144                     if (nread < 0) {
 145                         closeUnchecked(channel);
 146                     } else {
 147                         bytesRead += nread;
 148                         readBuffer.clear();
 149                         channel.read(readBuffer, null, this);
 150                     }
 151                 }
 152                 public void failed(Throwable exc, Void att) {
 153                     exc.printStackTrace();
 154                     closeUnchecked(channel);
 155                 }
 156                 public void cancelled(Void att) {
 157                 }
 158             });
 159         }
 160 
 161         long finish() {
 162             waitUntilClosed(channel);
 163             return bytesRead;
 164         }
 165     }
 166 
 167     static void waitUntilClosed(Channel c) {
 168         while (c.isOpen()) {
 169             try {
 170                 Thread.sleep(100);
 171             } catch (InterruptedException ignore) { }
 172         }
 173     }
 174 
 175     static void closeUnchecked(Channel c) {
 176         try {
 177             c.close();
 178         } catch (IOException ignore) { }
 179     }
 180 }