1 /*
   2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 6834246 6842687
  26  * @summary Stress test connections through the loopback interface
  27  * @run main StressLoopback
  28  * @run main/othervm -Djdk.net.useFastTcpLoopback StressLoopback
  29  * @key randomness
  30  */
  31 
  32 import java.nio.ByteBuffer;
  33 import java.net.*;
  34 import java.nio.channels.*;
  35 import java.util.Random;
  36 import java.io.IOException;
  37 
  38 public class StressLoopback {
  39     static final Random rand = new Random();
  40 
  41     public static void main(String[] args) throws Exception {
  42         // setup listener
  43         AsynchronousServerSocketChannel listener =
  44             AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
  45         int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
  46         InetAddress lh = InetAddress.getLocalHost();
  47         SocketAddress remote = new InetSocketAddress(lh, port);
  48 
  49         // create sources and sinks
  50         int count = 2 + rand.nextInt(9);
  51         Source[] source = new Source[count];
  52         Sink[] sink = new Sink[count];
  53         for (int i=0; i<count; i++) {
  54             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
  55             ch.connect(remote).get();
  56             source[i] = new Source(ch);
  57             sink[i] = new Sink(listener.accept().get());
  58         }
  59 
  60         // start the sinks and sources
  61         for (int i=0; i<count; i++) {
  62             sink[i].start();
  63             source[i].start();
  64         }
  65 
  66         // let the test run for a while
  67         Thread.sleep(20*1000);
  68 
  69         // wait until everyone is done
  70         boolean failed = false;
  71         long total = 0L;
  72         for (int i=0; i<count; i++) {
  73             long nwrote = source[i].finish();
  74             long nread = sink[i].finish();
  75             if (nread != nwrote)
  76                 failed = true;
  77             System.out.format("%d -> %d (%s)\n",
  78                 nwrote, nread, (failed) ? "FAIL" : "PASS");
  79             total += nwrote;
  80         }
  81         if (failed)
  82             throw new RuntimeException("Test failed - see log for details");
  83         System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
  84     }
  85 
  86     /**
  87      * Writes bytes to a channel until "done". When done the channel is closed.
  88      */
  89     static class Source {
  90         private final AsynchronousByteChannel channel;
  91         private final ByteBuffer sentBuffer;
  92         private volatile long bytesSent;
  93         private volatile boolean finished;
  94 
  95         Source(AsynchronousByteChannel channel) {
  96             this.channel = channel;
  97             int size = 1024 + rand.nextInt(10000);
  98             this.sentBuffer = (rand.nextBoolean()) ?
  99                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
 100         }
 101 
 102         void start() {
 103             sentBuffer.position(0);
 104             sentBuffer.limit(sentBuffer.capacity());
 105             channel.write(sentBuffer, (Void)null, new CompletionHandler<Integer,Void> () {
 106                 public void completed(Integer nwrote, Void att) {
 107                     bytesSent += nwrote;
 108                     if (finished) {
 109                         closeUnchecked(channel);
 110                     } else {
 111                         sentBuffer.position(0);
 112                         sentBuffer.limit(sentBuffer.capacity());
 113                         channel.write(sentBuffer, (Void)null, this);
 114                     }
 115                 }
 116                 public void failed(Throwable exc, Void att) {
 117                     exc.printStackTrace();
 118                     closeUnchecked(channel);
 119                 }
 120             });
 121         }
 122 
 123         long finish() {
 124             finished = true;
 125             waitUntilClosed(channel);
 126             return bytesSent;
 127         }
 128     }
 129 
 130     /**
 131      * Read bytes from a channel until EOF is received.
 132      */
 133     static class Sink {
 134         private final AsynchronousByteChannel channel;
 135         private final ByteBuffer readBuffer;
 136         private volatile long bytesRead;
 137 
 138         Sink(AsynchronousByteChannel channel) {
 139             this.channel = channel;
 140             int size = 1024 + rand.nextInt(10000);
 141             this.readBuffer = (rand.nextBoolean()) ?
 142                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
 143         }
 144 
 145         void start() {
 146             channel.read(readBuffer, (Void)null, new CompletionHandler<Integer,Void> () {
 147                 public void completed(Integer nread, Void att) {
 148                     if (nread < 0) {
 149                         closeUnchecked(channel);
 150                     } else {
 151                         bytesRead += nread;
 152                         readBuffer.clear();
 153                         channel.read(readBuffer, (Void)null, this);
 154                     }
 155                 }
 156                 public void failed(Throwable exc, Void att) {
 157                     exc.printStackTrace();
 158                     closeUnchecked(channel);
 159                 }
 160             });
 161         }
 162 
 163         long finish() {
 164             waitUntilClosed(channel);
 165             return bytesRead;
 166         }
 167     }
 168 
 169     static void waitUntilClosed(Channel c) {
 170         while (c.isOpen()) {
 171             try {
 172                 Thread.sleep(100);
 173             } catch (InterruptedException ignore) { }
 174         }
 175     }
 176 
 177     static void closeUnchecked(Channel c) {
 178         try {
 179             c.close();
 180         } catch (IOException ignore) { }
 181     }
 182 }