/*
 * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package org.openjdk.bench.java.net;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

/**
 * Micro benchmark for streaming data over a Socket.
 *
 * <p>A helper thread ({@link WriterThread}) accepts one connection from the
 * benchmark thread and, each time it is asked to, writes {@link #dataLength}
 * bytes to it. The benchmark method issues one such request and then reads
 * until {@link #dataLength} bytes have arrived.
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class SocketStreaming {

    /** The bytes to write/read. */
    public static final int dataLength = 16383;
    /** setTcpNoDelay(noNagle) */
    public static final boolean noNagle = false;

    /** Helper thread producing the data that the benchmark reads. */
    private WriterThread writerThread;
    /** Client side of the connection; the benchmark reads from it. */
    private Socket readSocket;
    /** Destination buffer for the reads, {@link #dataLength} bytes long. */
    private byte[] bytes;

    /**
     * Allocates the read buffer, starts the writer thread and connects the
     * read socket to it.
     *
     * @throws Exception if waiting for the connection is interrupted
     */
    @Setup
    public void prepare() throws Exception {
        bytes = new byte[dataLength];

        // Setup the writer thread
        writerThread = new WriterThread(dataLength, noNagle);
        writerThread.start();

        // Wait for a read socket
        readSocket = writerThread.waitForReadSocket();
    }

    /**
     * Asks the writer thread to shut down and closes the read socket.
     *
     * @throws IOException if closing the read socket fails
     */
    @TearDown
    public void cleanup() throws IOException {
        // Take down the writer thread and the reader socket
        writerThread.finish();
        // A single close() suffices: it either marks the socket closed or throws.
        readSocket.close();
        readSocket = null;
    }

    /**
     * Requests one batch of {@link #dataLength} bytes from the writer thread
     * and reads from the socket until that many bytes have been received.
     *
     * @throws InterruptedException declared for callers; not thrown by the
     *         current body
     * @throws IOException if reading from the socket fails
     */
    @Benchmark
    public void testSocketInputStreamRead() throws InterruptedException, IOException {
        InputStream in = readSocket.getInputStream();

        // Notify the writer thread to add elements to stream
        writerThread.requestSendBytes();

        // Read these from the stream
        int bytesRead = 0;
        while (bytesRead < dataLength) {
            int lastRead = in.read(bytes);
            if (lastRead < 0) {
                // End of stream before the full batch arrived
                throw new InternalError("Unexpectedly got " + lastRead + " bytes from the socket");
            }
            bytesRead += lastRead;
        }
    }

    /**
     * Thread used to write bytes to a socket.
     *
     * <p>Declared {@code static} because it never touches the enclosing
     * benchmark instance; everything it needs is passed to the constructor.
     */
    private static class WriterThread extends Thread {

        /** The number of bytes to write. */
        private final int dataLength;
        /** setTcpNoDelay(noNagle) */
        private final boolean noNagle;
        /**
         * Payload written for every request. Allocated once so that serving a
         * request does not allocate a new array each time.
         */
        private final byte[] outBytes;
        /** Lock needed to send sendBytes requests. */
        private final Object sendBytesLock = new Object();
        /** Indicates that a sendBytes has been requested. Guarded by sendBytesLock. */
        private boolean sendBytesRequested;
        /** Indicates that no more sendBytes will be requested. Time to shutdown. Guarded by sendBytesLock. */
        private boolean sendBytesDone;
        /** Lock needed to protect the connectPort variable. */
        private final Object connectLock = new Object();
        /** The port the read socket should connect to; -1 until it is known. */
        private int connectPort = -1;

        /**
         * Constructor.
         *
         * @param dataLength The number of bytes to write
         * @param noNagle setTcpNoDelay(noNagle)
         */
        public WriterThread(int dataLength, boolean noNagle) {
            super("Load producer");
            this.dataLength = dataLength;
            this.noNagle = noNagle;
            this.outBytes = new byte[dataLength];
        }

        /**
         * Entry point for data sending helper thread.
         *
         * <p>Accepts one connection, then serves sendBytes requests until
         * {@link #finish()} is called. Any failure is reported on standard
         * error and terminates the JVM with exit status 1.
         */
        @Override
        public void run() {
            try {
                Socket writeSocket = acceptReader();
                try {
                    writeSocket.setTcpNoDelay(noNagle);
                    OutputStream out = writeSocket.getOutputStream();

                    // Iterate as long as sendBytes are issued
                    while (waitForSendBytesRequest()) {
                        sendBytes(out);
                    }
                } finally {
                    // Time to shutdown
                    writeSocket.close();
                }
            } catch (Exception e) {
                // Say why before taking the JVM down; a silent exit is hard to diagnose.
                e.printStackTrace();
                System.exit(1);
            }
        }

        /**
         * Opens a server socket on an automatically allocated port, publishes
         * that port to the thread waiting in {@link #waitForReadSocket()} and
         * accepts the single incoming connection.
         *
         * @return the accepted socket
         * @throws IOException if the server socket cannot be opened or the
         *         accept fails
         */
        private Socket acceptReader() throws IOException {
            ServerSocket serverSocket = new ServerSocket(0);
            try {
                /* Tell the other thread that we now know the port number.
                 * The other thread will now start to connect until the following accept() call succeeds.
                 */
                synchronized (connectLock) {
                    connectPort = serverSocket.getLocalPort();
                    connectLock.notify();
                }

                // Wait for the other thread to connect
                return serverSocket.accept();
            } finally {
                // No more connects so this can be closed
                serverSocket.close();
            }
        }

        /**
         * Sends bytes to the output stream
         *
         * @param out The output stream
         * @throws IOException if the write fails
         */
        private void sendBytes(OutputStream out) throws IOException {
            // The buffer is exactly dataLength bytes, so one write sends the whole batch.
            out.write(outBytes);
        }

        /**
         * Waits for the readSocket and returns it when it is ready.
         *
         * <p>Connection attempts are retried once per second for as long as
         * they fail with an {@link IOException}.
         *
         * @return The socket to read from
         * @throws InterruptedException if interrupted while waiting for the
         *         port or between connection attempts
         */
        public Socket waitForReadSocket() throws InterruptedException {
            int theConnectPort = waitForConnectPort();

            while (true) {
                try {
                    // A null host name resolves to a loopback address (see InetAddress.getByName).
                    return new Socket(InetAddress.getByName(null), theConnectPort);
                } catch (IOException e) {
                    // Wait some more for the server thread to get going
                    Thread.sleep(1000);
                }
            }
        }

        /**
         * Waits for next sendBytes request
         *
         * @return <code>true</code> if it is time to sendBytes, <code>false</code> if it is time to shutdown
         * @throws InterruptedException if interrupted while waiting
         */
        public boolean waitForSendBytesRequest() throws InterruptedException {
            synchronized (sendBytesLock) {
                // Loop so that a wakeup without a state change goes back to waiting.
                while (!sendBytesRequested && !sendBytesDone) {
                    sendBytesLock.wait();
                }

                // Clear the flag
                sendBytesRequested = false;

                return !sendBytesDone;
            }
        }

        /** Requests a sendBytes. */
        public void requestSendBytes() {
            synchronized (sendBytesLock) {
                sendBytesRequested = true;
                sendBytesLock.notify();
            }
        }

        /** Tells the writerThread that it is time to shutdown. */
        public void finish() {
            synchronized (sendBytesLock) {
                sendBytesDone = true;
                sendBytesLock.notify();
            }
        }

        /**
         * Blocks until {@link #run()} has published the server port.
         *
         * @return the port the server socket is listening on
         * @throws InterruptedException if interrupted while waiting
         */
        private int waitForConnectPort() throws InterruptedException {
            synchronized (connectLock) {
                while (connectPort == -1) {
                    connectLock.wait();
                }
                return connectPort;
            }
        }
    }
}