1 /* 2 * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @summary Test selectors and socketchannels 26 * @library .. 27 * @key randomness 28 */ 29 30 import java.io.*; 31 import java.net.*; 32 import java.nio.*; 33 import java.nio.channels.*; 34 import java.nio.channels.spi.SelectorProvider; 35 import java.util.*; 36 37 38 public class SelectorTest { 39 private static List clientList = new LinkedList(); 40 private static Random rnd = new Random(); 41 public static int NUM_CLIENTS = 30; 42 public static int TEST_PORT = 31452; 43 static PrintStream log = System.err; 44 private static int FINISH_TIME = 30000; 45 46 /* 47 * Usage note 48 * 49 * java SelectorTest [server] [client <host>] [<port>] 50 * 51 * No arguments runs both client and server in separate threads 52 * using the default port of 31452. 53 * 54 * client runs the client on this machine and connects to server 55 * at the given IP address. 56 * 57 * server runs the server on localhost. 58 */ 59 public static void main(String[] args) throws Exception { 60 if (args.length == 0) { 61 Server server = new Server(0); 62 server.start(); 63 try { 64 Thread.sleep(1000); 65 } catch (InterruptedException e) { } 66 InetSocketAddress isa 67 = new InetSocketAddress(InetAddress.getLocalHost(), server.port()); 68 Client client = new Client(isa); 69 client.start(); 70 if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0) 71 throw new Exception("Failure"); 72 log.println(); 73 74 } else if (args[0].equals("server")) { 75 76 if (args.length > 1) 77 TEST_PORT = Integer.parseInt(args[1]); 78 Server server = new Server(TEST_PORT); 79 server.start(); 80 if (server.finish(FINISH_TIME) == 0) 81 throw new Exception("Failure"); 82 log.println(); 83 84 } else if (args[0].equals("client")) { 85 86 if (args.length < 2) { 87 log.println("No host specified: terminating."); 88 return; 89 } 90 String ip = args[1]; 91 if (args.length > 2) 92 TEST_PORT = Integer.parseInt(args[2]); 93 InetAddress ia = InetAddress.getByName(ip); 94 InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT); 95 Client client = new Client(isa); 96 client.start(); 97 if (client.finish(FINISH_TIME) == 0) 98 throw new Exception("Failure"); 99 log.println(); 100 101 } else { 102 System.out.println("Usage note:"); 103 System.out.println("java SelectorTest [server] [client <host>] [<port>]"); 104 System.out.println("No arguments runs both client and server in separate threads using the default port of 31452."); 105 System.out.println("client runs the client on this machine and connects to the server specified."); 106 System.out.println("server runs the server on localhost."); 107 } 108 } 109 110 static class Client extends TestThread { 111 InetSocketAddress isa; 112 Client(InetSocketAddress isa) { 113 super("Client", SelectorTest.log); 114 this.isa = isa; 115 } 116 117 public void go() throws Exception { 118 log.println("starting client..."); 119 for (int i=0; i<NUM_CLIENTS; i++) 120 clientList.add(new RemoteEntity(i, isa, log)); 121 122 Collections.shuffle(clientList); 123 124 log.println("created "+NUM_CLIENTS+" clients"); 125 do { 126 for (Iterator i = clientList.iterator(); i.hasNext(); ) { 127 RemoteEntity re = (RemoteEntity) i.next(); 128 if (re.cycle()) { 129 i.remove(); 130 } 131 } 132 Collections.shuffle(clientList); 133 } while (clientList.size() > 0); 134 } 135 } 136 137 static class Server extends TestThread { 138 private final ServerSocketChannel ssc; 139 private List socketList = new ArrayList(); 140 private ServerSocket ss; 141 private int connectionsAccepted = 0; 142 private Selector pollSelector; 143 private Selector acceptSelector; 144 private Set pkeys; 145 private Set pskeys; 146 147 Server(int port) throws IOException { 148 super("Server", SelectorTest.log); 149 this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port)); 150 } 151 152 int port() { 153 return ssc.socket().getLocalPort(); 154 } 155 156 public void go() throws Exception { 157 log.println("starting server..."); 158 acceptSelector = SelectorProvider.provider().openSelector(); 159 pollSelector = SelectorProvider.provider().openSelector(); 160 pkeys = pollSelector.keys(); 161 pskeys = pollSelector.selectedKeys(); 162 Set readyKeys = acceptSelector.selectedKeys(); 163 RequestHandler rh = new RequestHandler(pollSelector, log); 164 Thread requestThread = new Thread(rh); 165 166 requestThread.start(); 167 168 ssc.configureBlocking(false); 169 SelectionKey acceptKey = ssc.register(acceptSelector, 170 SelectionKey.OP_ACCEPT); 171 while(connectionsAccepted < SelectorTest.NUM_CLIENTS) { 172 int keysAdded = acceptSelector.select(100); 173 if (keysAdded > 0) { 174 Iterator i = readyKeys.iterator(); 175 while(i.hasNext()) { 176 SelectionKey sk = (SelectionKey)i.next(); 177 i.remove(); 178 ServerSocketChannel nextReady = 179 (ServerSocketChannel)sk.channel(); 180 SocketChannel sc = nextReady.accept(); 181 connectionsAccepted++; 182 if (sc != null) { 183 sc.configureBlocking(false); 184 synchronized (pkeys) { 185 sc.register(pollSelector, SelectionKey.OP_READ); 186 } 187 } else { 188 throw new RuntimeException( 189 "Socket does not support Channels"); 190 } 191 } 192 } 193 } 194 acceptKey.cancel(); 195 requestThread.join(); 196 acceptSelector.close(); 197 pollSelector.close(); 198 } 199 } 200 } 201 202 class RemoteEntity { 203 private static Random rnd = new Random(); 204 int id; 205 ByteBuffer data; 206 int dataWrittenIndex; 207 int totalDataLength; 208 boolean initiated = false; 209 boolean connected = false; 210 boolean written = false; 211 boolean acked = false; 212 boolean closed = false; 213 private SocketChannel sc; 214 ByteBuffer ackBuffer; 215 PrintStream log; 216 InetSocketAddress server; 217 218 RemoteEntity(int id, InetSocketAddress server, PrintStream log) 219 throws Exception 220 { 221 int connectFailures = 0; 222 this.id = id; 223 this.log = log; 224 this.server = server; 225 226 sc = SocketChannel.open(); 227 sc.configureBlocking(false); 228 229 // Prepare the data buffer to write out from this entity 230 // Let's use both slow and fast buffers 231 if (rnd.nextBoolean()) 232 data = ByteBuffer.allocateDirect(100); 233 else 234 data = ByteBuffer.allocate(100); 235 String number = Integer.toString(id); 236 if (number.length() == 1) 237 number = "0"+number; 238 String source = "Testing from " + number; 239 data.put(source.getBytes("8859_1")); 240 data.flip(); 241 totalDataLength = source.length(); 242 243 // Allocate an ack buffer 244 ackBuffer = ByteBuffer.allocateDirect(10); 245 } 246 247 private void reset() throws Exception { 248 sc.close(); 249 sc = SocketChannel.open(); 250 sc.configureBlocking(false); 251 } 252 253 private void connect() throws Exception { 254 try { 255 connected = sc.connect(server); 256 initiated = true; 257 } catch (ConnectException e) { 258 initiated = false; 259 reset(); 260 } 261 } 262 263 private void finishConnect() throws Exception { 264 try { 265 connected = sc.finishConnect(); 266 } catch (IOException e) { 267 initiated = false; 268 reset(); 269 } 270 } 271 272 int id() { 273 return id; 274 } 275 276 boolean cycle() throws Exception { 277 if (!initiated) 278 connect(); 279 else if (!connected) 280 finishConnect(); 281 else if (!written) 282 writeCycle(); 283 else if (!acked) 284 ackCycle(); 285 else if (!closed) 286 close(); 287 return closed; 288 } 289 290 private void ackCycle() throws Exception { 291 //log.println("acking from "+id); 292 int bytesRead = sc.read(ackBuffer); 293 if (bytesRead > 0) { 294 acked = true; 295 } 296 } 297 298 private void close() throws Exception { 299 sc.close(); 300 closed = true; 301 } 302 303 private void writeCycle() throws Exception { 304 log.println("writing from "+id); 305 int numBytesToWrite = rnd.nextInt(10)+1; 306 int newWriteTarget = dataWrittenIndex + numBytesToWrite; 307 if (newWriteTarget > totalDataLength) 308 newWriteTarget = totalDataLength; 309 data.limit(newWriteTarget); 310 int bytesWritten = sc.write(data); 311 if (bytesWritten > 0) 312 dataWrittenIndex += bytesWritten; 313 if (dataWrittenIndex == totalDataLength) { 314 written = true; 315 sc.socket().shutdownOutput(); 316 } 317 } 318 319 } 320 321 322 class RequestHandler implements Runnable { 323 private static Random rnd = new Random(); 324 private Selector selector; 325 private int connectionsHandled = 0; 326 private HashMap dataBin = new HashMap(); 327 PrintStream log; 328 329 public RequestHandler(Selector selector, PrintStream log) { 330 this.selector = selector; 331 this.log = log; 332 } 333 334 public void run() { 335 log.println("starting request handler..."); 336 int connectionsAccepted = 0; 337 338 Set nKeys = selector.keys(); 339 Set readyKeys = selector.selectedKeys(); 340 341 try { 342 while(connectionsHandled < SelectorTest.NUM_CLIENTS) { 343 int numKeys = selector.select(100); 344 345 // Process channels with data 346 synchronized (nKeys) { 347 if (readyKeys.size() > 0) { 348 Iterator i = readyKeys.iterator(); 349 while(i.hasNext()) { 350 SelectionKey sk = (SelectionKey)i.next(); 351 i.remove(); 352 SocketChannel sc = (SocketChannel)sk.channel(); 353 if (sc.isOpen()) 354 read(sk, sc); 355 } 356 } 357 } 358 359 // Give other threads a chance to run 360 if (numKeys == 0) { 361 try { 362 Thread.sleep(1); 363 } catch (Exception x) {} 364 } 365 } 366 } catch (Exception e) { 367 log.println("Unexpected error 1: "+e); 368 e.printStackTrace(); 369 } 370 } 371 372 private void read(SelectionKey sk, SocketChannel sc) throws Exception { 373 ByteBuffer bin = (ByteBuffer)dataBin.get(sc); 374 if (bin == null) { 375 if (rnd.nextBoolean()) 376 bin = ByteBuffer.allocateDirect(100); 377 else 378 bin = ByteBuffer.allocate(100); 379 dataBin.put(sc, bin); 380 } 381 382 int bytesRead = 0; 383 do { 384 bytesRead = sc.read(bin); 385 } while(bytesRead > 0); 386 387 if (bytesRead == -1) { 388 sk.interestOps(0); 389 bin.flip(); 390 int size = bin.limit(); 391 byte[] data = new byte[size]; 392 for(int j=0; j<size; j++) 393 data[j] = bin.get(); 394 String message = new String(data, "8859_1"); 395 connectionsHandled++; 396 acknowledge(sc); 397 log.println("Received >>>"+message + "<<<"); 398 log.println("Handled: "+connectionsHandled); 399 } 400 } 401 402 private void acknowledge(SocketChannel sc) throws Exception { 403 ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10); 404 String s = "ack"; 405 ackBuffer.put(s.getBytes("8859_1")); 406 ackBuffer.flip(); 407 int bytesWritten = 0; 408 while(bytesWritten == 0) { 409 bytesWritten += sc.write(ackBuffer); 410 } 411 sc.close(); 412 } 413 }