1 /*
   2  * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.bench.java.net;
  24 
  25 import org.openjdk.jmh.annotations.Benchmark;
  26 import org.openjdk.jmh.annotations.BenchmarkMode;
  27 import org.openjdk.jmh.annotations.Mode;
  28 import org.openjdk.jmh.annotations.OutputTimeUnit;
  29 import org.openjdk.jmh.annotations.Scope;
  30 import org.openjdk.jmh.annotations.Setup;
  31 import org.openjdk.jmh.annotations.State;
  32 import org.openjdk.jmh.annotations.TearDown;
  33 
  34 import java.io.IOException;
  35 import java.io.InputStream;
  36 import java.io.OutputStream;
  37 import java.net.InetAddress;
  38 import java.net.ServerSocket;
  39 import java.net.Socket;
  40 import java.util.concurrent.TimeUnit;
  41 
  42 /**
  43  * Micro benchmark for streaming data over a Socket.
  44  */
  45 @BenchmarkMode(Mode.AverageTime)
  46 @OutputTimeUnit(TimeUnit.MILLISECONDS)
  47 @State(Scope.Thread)
  48 public class SocketStreaming {
  49 
  50     /** The bytes to write/read. */
  51     public static final int dataLength = 16383;
  52     /** setTcpNoDelay(noNagle) */
  53     public static final boolean noNagle = false;
  54 
  55     private WriterThread writerThread;
  56     private Socket readSocket;
  57     private byte[] bytes;
  58 
  59     @Setup
  60     public void prepare() throws Exception {
  61         bytes = new byte[dataLength];
  62 
  63         // Setup the writer thread
  64         writerThread = new WriterThread(dataLength, noNagle);
  65         writerThread.start();
  66 
  67         // Wait for a read socket
  68         readSocket = writerThread.waitForReadSocket();
  69     }
  70 
  71     @TearDown
  72     public void cleanup() throws IOException {
  73         // Take down the writer thread and the reader socket
  74         writerThread.finish();
  75         while (!readSocket.isClosed()) {
  76             readSocket.close();
  77         }
  78         readSocket = null;
  79     }
  80 
  81     @Benchmark
  82     public void testSocketInputStreamRead() throws InterruptedException, IOException {
  83         InputStream in = readSocket.getInputStream();
  84 
  85         // Notify the writer thread to add elements to stream
  86         writerThread.requestSendBytes();
  87 
  88         // Read these from the stream
  89         int bytesRead = 0;
  90         while (bytesRead < dataLength) {
  91             int lastRead = in.read(bytes);
  92             if (lastRead < 0) {
  93                 throw new InternalError("Unexpectedly got " + lastRead + " bytes from the socket");
  94             }
  95             bytesRead += lastRead;
  96         }
  97     }
  98 
  99     /**
 100      * Thread used to write bytes to a socket.
 101      */
 102     private class WriterThread extends Thread {
 103 
 104         /** The number of bytes to write. */
 105         private int dataLength;
 106         /** setTcpNoDelay(noNagle) */
 107         private boolean noNagle;
 108         /** Lock needed to send sendBytes requests. */
 109         private final Object sendBytesLock = new Object();
 110         /** Indicates that a sendBytes has been requested. */
 111         private boolean sendBytesRequested;
 112         /** Indicates that no more sendBytes will be requested. Time to shutdown. */
 113         private boolean sendBytesDone;
 114         /** Lock needed to protect the connectPort variable. */
 115         private final Object connectLock = new Object();
 116         /** The port the read socket should connect to. */
 117         private int connectPort = -1;
 118 
 119         /**
 120          * Constructor.
 121          *
 122          * @param dataLength The number of bytes to write
 123          * @param noNagle    setTcpNoDelay(noNagle)
 124          */
 125         public WriterThread(int dataLength, boolean noNagle) {
 126             super("Load producer");
 127             this.dataLength = dataLength;
 128             this.noNagle = noNagle;
 129         }
 130 
 131         /** Entry point for data sending helper thread. */
 132         @Override
 133         public void run() {
 134             try {
 135                 Socket writeSocket;
 136                 ServerSocket serverSocket = new ServerSocket(0);
 137 
 138                 /* Tell the other thread that we now know the port number.
 139                  * The other thread will now start to connect until the following accept() call succeeds.
 140                  */
 141                 synchronized (connectLock) {
 142                     connectPort = serverSocket.getLocalPort();
 143                     connectLock.notify();
 144                 }
 145 
 146                 // Wait for the other thread to connect
 147                 writeSocket = serverSocket.accept();
 148                 writeSocket.setTcpNoDelay(noNagle);
 149 
 150                 // No more connects so this can be closed
 151                 serverSocket.close();
 152                 serverSocket = null;
 153 
 154                 OutputStream out = writeSocket.getOutputStream();
 155 
 156                 // Iterate as long as sendBytes are issued
 157                 while (waitForSendBytesRequest()) {
 158                     sendBytes(out);
 159                 }
 160 
 161                 // Time to shutdown
 162                 while (!writeSocket.isClosed()) {
 163                     writeSocket.close();
 164                 }
 165                 writeSocket = null;
 166             } catch (Exception e) {
 167                 System.exit(1);
 168             }
 169         }
 170 
 171         /**
 172          * Sends bytes to the output stream
 173          *
 174          * @param out The output stream
 175          * @throws IOException
 176          */
 177         private void sendBytes(OutputStream out) throws IOException {
 178             byte outBytes[] = new byte[dataLength];
 179 
 180             int bytesToSend = dataLength;
 181             int bytesSent = 0;
 182             while (bytesSent < bytesToSend) {
 183                 out.write(outBytes);
 184                 bytesSent += outBytes.length;
 185             }
 186         }
 187 
 188         /**
 189          * Waits for the readSocket and returns it when it is ready.
 190          *
 191          * @return The socket to read from
 192          * @throws InterruptedException
 193          */
 194         @SuppressWarnings("SleepWhileHoldingLock")
 195         public Socket waitForReadSocket() throws InterruptedException {
 196             int theConnectPort = waitForConnectPort();
 197 
 198             while (true) {
 199                 try {
 200                     return new Socket(InetAddress.getByName(null), theConnectPort);
 201                 } catch (IOException e) {
 202                     // Wait some more for the server thread to get going
 203                     Thread.sleep(1000);
 204                 }
 205             }
 206 
 207         }
 208 
 209         /**
 210          * Waits for next sendBytes request
 211          *
 212          * @return <code>true</code> if it is time to sendBytes, <code>false</code> if it is time to shutdown
 213          * @throws InterruptedException
 214          */
 215         public boolean waitForSendBytesRequest() throws InterruptedException {
 216             synchronized (sendBytesLock) {
 217                 while (!sendBytesRequested && !sendBytesDone) {
 218                     sendBytesLock.wait();
 219                 }
 220 
 221                 // Clear the flag
 222                 sendBytesRequested = false;
 223 
 224                 return !sendBytesDone;
 225             }
 226         }
 227 
 228         /** Requests a sendBytes. */
 229         public void requestSendBytes() {
 230             synchronized (sendBytesLock) {
 231                 sendBytesRequested = true;
 232                 sendBytesLock.notify();
 233             }
 234         }
 235 
 236         /** Tells the writerThread that it is time to shutdown. */
 237         public void finish() {
 238             synchronized (sendBytesLock) {
 239                 sendBytesDone = true;
 240                 sendBytesLock.notify();
 241             }
 242         }
 243 
 244         private int waitForConnectPort() throws InterruptedException {
 245             synchronized (connectLock) {
 246                 while (connectPort == -1) {
 247                     connectLock.wait();
 248                 }
 249                 return connectPort;
 250             }
 251         }
 252     }
 253 }