1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 8195160 26 * @summary Test RdmaSelector with RdmaServerSocketChannels 27 * @requires (os.family == "linux") 28 * @library .. /test/lib /test/jdk/java/nio/channels 29 * @build RsocketTest 30 * @run main/othervm BasicAccept 31 */ 32 33 import java.io.IOException; 34 import java.net.InetAddress; 35 import java.net.InetSocketAddress; 36 import java.net.StandardProtocolFamily; 37 import java.nio.ByteBuffer; 38 import java.nio.channels.Selector; 39 import java.nio.channels.SelectionKey; 40 import java.nio.channels.ServerSocketChannel; 41 import java.nio.channels.SocketChannel; 42 import java.nio.channels.spi.SelectorProvider; 43 import java.util.Iterator; 44 import java.util.Set; 45 import jdk.net.RdmaSockets; 46 47 import jtreg.SkippedException; 48 49 public class SelectorTest { 50 private static List clientList = new LinkedList(); 51 private static Random rnd = new Random(); 52 public static int NUM_CLIENTS = 5; 53 public static int TEST_PORT = 31452; 54 static PrintStream log = System.err; 55 private static int FINISH_TIME = 30000; 56 57 /* 58 * Usage note 59 * 60 * java SelectorTest [server] [client <host>] [<port>] 61 * 62 * No arguments runs both client and server in separate threads 63 * using the default port of 31452. 64 * 65 * client runs the client on this machine and connects to server 66 * at the given IP address. 67 * 68 * server runs the server on localhost. 69 */ 70 public static void main(String[] args) throws Exception { 71 if (!RsocketTest.isRsocketAvailable()) 72 throw new SkippedException("rsocket is not available"); 73 74 if (args.length == 0) { 75 Server server = new Server(0); 76 server.start(); 77 try { 78 Thread.sleep(1000); 79 } catch (InterruptedException e) { } 80 InetSocketAddress isa 81 = new InetSocketAddress(InetAddress.getLocalHost(), server.port()); 82 Client client = new Client(isa); 83 client.start(); 84 if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0) 85 throw new Exception("Failure"); 86 log.println(); 87 88 } else if (args[0].equals("server")) { 89 90 if (args.length > 1) 91 TEST_PORT = Integer.parseInt(args[1]); 92 Server server = new Server(TEST_PORT); 93 server.start(); 94 if (server.finish(FINISH_TIME) == 0) 95 throw new Exception("Failure"); 96 log.println(); 97 98 } else if (args[0].equals("client")) { 99 100 if (args.length < 2) { 101 log.println("No host specified: terminating."); 102 return; 103 } 104 String ip = args[1]; 105 if (args.length > 2) 106 TEST_PORT = Integer.parseInt(args[2]); 107 InetAddress ia = InetAddress.getByName(ip); 108 InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT); 109 Client client = new Client(isa); 110 client.start(); 111 if (client.finish(FINISH_TIME) == 0) 112 throw new Exception("Failure"); 113 log.println(); 114 115 } else { 116 System.out.println("Usage note:"); 117 System.out.println("java SelectorTest [server] [client <host>] [<port>]"); 118 System.out.println("No arguments runs both client and server in separate threads using the default port of 31452."); 119 System.out.println("client runs the client on this machine and connects to the server specified."); 120 System.out.println("server runs the server on localhost."); 121 } 122 } 123 124 static class Client extends TestThread { 125 InetSocketAddress isa; 126 Client(InetSocketAddress isa) { 127 super("Client", SelectorTest.log); 128 this.isa = isa; 129 } 130 131 public void go() throws Exception { 132 log.println("starting client..."); 133 for (int i=0; i<NUM_CLIENTS; i++) 134 clientList.add(new RemoteEntity(i, isa, log)); 135 136 Collections.shuffle(clientList); 137 138 log.println("created "+NUM_CLIENTS+" clients"); 139 do { 140 for (Iterator i = clientList.iterator(); i.hasNext(); ) { 141 RemoteEntity re = (RemoteEntity) i.next(); 142 if (re.cycle()) { 143 i.remove(); 144 } 145 } 146 Collections.shuffle(clientList); 147 } while (clientList.size() > 0); 148 } 149 } 150 151 static class Server extends TestThread { 152 private final ServerSocketChannel ssc; 153 private List socketList = new ArrayList(); 154 private ServerSocket ss; 155 private int connectionsAccepted = 0; 156 private Selector pollSelector; 157 private Selector acceptSelector; 158 private Set pkeys; 159 private Set pskeys; 160 161 Server(int port) throws IOException { 162 super("Server", SelectorTest.log); 163 this.ssc = RdmaSockets.openServerSocketChannel( 164 StandardProtocolFamily.INET); 165 ssc.bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); 166 } 167 168 int port() { 169 return ssc.socket().getLocalPort(); 170 } 171 172 public void go() throws Exception { 173 log.println("starting server..."); 174 acceptSelector = RdmaSockets.openSelector(); 175 pollSelector = RdmaSockets.openSelector(); 176 pkeys = pollSelector.keys(); 177 pskeys = pollSelector.selectedKeys(); 178 Set readyKeys = acceptSelector.selectedKeys(); 179 RequestHandler rh = new RequestHandler(pollSelector, log); 180 Thread requestThread = new Thread(rh); 181 182 requestThread.start(); 183 184 ssc.configureBlocking(false); 185 SelectionKey acceptKey = ssc.register(acceptSelector, 186 SelectionKey.OP_ACCEPT); 187 while(connectionsAccepted < SelectorTest.NUM_CLIENTS) { 188 int keysAdded = acceptSelector.select(100); 189 if (keysAdded > 0) { 190 Iterator i = readyKeys.iterator(); 191 while(i.hasNext()) { 192 SelectionKey sk = (SelectionKey)i.next(); 193 i.remove(); 194 ServerSocketChannel nextReady = 195 (ServerSocketChannel)sk.channel(); 196 SocketChannel sc = nextReady.accept(); 197 connectionsAccepted++; 198 if (sc != null) { 199 sc.configureBlocking(false); 200 synchronized (pkeys) { 201 sc.register(pollSelector, SelectionKey.OP_READ); 202 } 203 } else { 204 throw new RuntimeException( 205 "Socket does not support Channels"); 206 } 207 } 208 } 209 } 210 acceptKey.cancel(); 211 requestThread.join(); 212 acceptSelector.close(); 213 pollSelector.close(); 214 } 215 } 216 } 217 218 class RemoteEntity { 219 private static Random rnd = new Random(); 220 int id; 221 ByteBuffer data; 222 int dataWrittenIndex; 223 int totalDataLength; 224 boolean initiated = false; 225 boolean connected = false; 226 boolean written = false; 227 boolean acked = false; 228 boolean closed = false; 229 private SocketChannel sc; 230 ByteBuffer ackBuffer; 231 PrintStream log; 232 InetSocketAddress server; 233 234 RemoteEntity(int id, InetSocketAddress server, PrintStream log) 235 throws Exception 236 { 237 int connectFailures = 0; 238 this.id = id; 239 this.log = log; 240 this.server = server; 241 242 sc = RdmaSockets.openSocketChannel(StandardProtocolFamily.INET); 243 sc.configureBlocking(false); 244 245 // Prepare the data buffer to write out from this entity 246 // Let's use both slow and fast buffers 247 if (rnd.nextBoolean()) 248 data = ByteBuffer.allocateDirect(100); 249 else 250 data = ByteBuffer.allocate(100); 251 String number = Integer.toString(id); 252 if (number.length() == 1) 253 number = "0"+number; 254 String source = "Testing from " + number; 255 data.put(source.getBytes("8859_1")); 256 data.flip(); 257 totalDataLength = source.length(); 258 259 // Allocate an ack buffer 260 ackBuffer = ByteBuffer.allocateDirect(10); 261 } 262 263 private void reset() throws Exception { 264 sc.close(); 265 sc = RdmaSockets.openSocketChannel(StandardProtocolFamily.INET); 266 sc.configureBlocking(false); 267 } 268 269 private void connect() throws Exception { 270 try { 271 connected = sc.connect(server); 272 initiated = true; 273 } catch (ConnectException e) { 274 initiated = false; 275 reset(); 276 } 277 } 278 279 private void finishConnect() throws Exception { 280 try { 281 connected = sc.finishConnect(); 282 } catch (IOException e) { 283 initiated = false; 284 reset(); 285 } 286 } 287 288 int id() { 289 return id; 290 } 291 292 boolean cycle() throws Exception { 293 if (!initiated) 294 connect(); 295 else if (!connected) 296 finishConnect(); 297 else if (!written) 298 writeCycle(); 299 else if (!acked) 300 ackCycle(); 301 else if (!closed) 302 close(); 303 return closed; 304 } 305 306 private void ackCycle() throws Exception { 307 //log.println("acking from "+id); 308 int bytesRead = sc.read(ackBuffer); 309 if (bytesRead > 0) { 310 acked = true; 311 } 312 } 313 314 private void close() throws Exception { 315 sc.close(); 316 closed = true; 317 } 318 319 private void writeCycle() throws Exception { 320 log.println("writing from "+id); 321 int numBytesToWrite = rnd.nextInt(10)+1; 322 int newWriteTarget = dataWrittenIndex + numBytesToWrite; 323 if (newWriteTarget > totalDataLength) 324 newWriteTarget = totalDataLength; 325 data.limit(newWriteTarget); 326 int bytesWritten = sc.write(data); 327 if (bytesWritten > 0) 328 dataWrittenIndex += bytesWritten; 329 if (dataWrittenIndex == totalDataLength) { 330 written = true; 331 sc.socket().shutdownOutput(); 332 } 333 } 334 335 } 336 337 338 class RequestHandler implements Runnable { 339 private static Random rnd = new Random(); 340 private Selector selector; 341 private int connectionsHandled = 0; 342 private HashMap dataBin = new HashMap(); 343 PrintStream log; 344 345 public RequestHandler(Selector selector, PrintStream log) { 346 this.selector = selector; 347 this.log = log; 348 } 349 350 public void run() { 351 log.println("starting request handler..."); 352 int connectionsAccepted = 0; 353 354 Set nKeys = selector.keys(); 355 Set readyKeys = selector.selectedKeys(); 356 357 try { 358 while(connectionsHandled < SelectorTest.NUM_CLIENTS) { 359 int numKeys = selector.select(100); 360 361 // Process channels with data 362 synchronized (nKeys) { 363 if (readyKeys.size() > 0) { 364 Iterator i = readyKeys.iterator(); 365 while(i.hasNext()) { 366 SelectionKey sk = (SelectionKey)i.next(); 367 i.remove(); 368 SocketChannel sc = (SocketChannel)sk.channel(); 369 if (sc.isOpen()) 370 read(sk, sc); 371 } 372 } 373 } 374 375 // Give other threads a chance to run 376 if (numKeys == 0) { 377 try { 378 Thread.sleep(1); 379 } catch (Exception x) {} 380 } 381 } 382 } catch (Exception e) { 383 log.println("Unexpected error 1: "+e); 384 e.printStackTrace(); 385 } 386 } 387 388 private void read(SelectionKey sk, SocketChannel sc) throws Exception { 389 ByteBuffer bin = (ByteBuffer)dataBin.get(sc); 390 if (bin == null) { 391 if (rnd.nextBoolean()) 392 bin = ByteBuffer.allocateDirect(100); 393 else 394 bin = ByteBuffer.allocate(100); 395 dataBin.put(sc, bin); 396 } 397 398 int bytesRead = 0; 399 do { 400 bytesRead = sc.read(bin); 401 } while(bytesRead > 0); 402 403 if (bytesRead == -1) { 404 sk.interestOps(0); 405 bin.flip(); 406 int size = bin.limit(); 407 byte[] data = new byte[size]; 408 for(int j=0; j<size; j++) 409 data[j] = bin.get(); 410 String message = new String(data, "8859_1"); 411 connectionsHandled++; 412 acknowledge(sc); 413 log.println("Received >>>"+message + "<<<"); 414 log.println("Handled: "+connectionsHandled); 415 } 416 } 417 418 private void acknowledge(SocketChannel sc) throws Exception { 419 ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10); 420 String s = "ack"; 421 ackBuffer.put(s.getBytes("8859_1")); 422 ackBuffer.flip(); 423 int bytesWritten = 0; 424 while(bytesWritten == 0) { 425 bytesWritten += sc.write(ackBuffer); 426 } 427 sc.close(); 428 } 429 }