--- old/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java 2016-03-09 10:17:30.000000000 +0000 +++ new/test/java/net/httpclient/whitebox/java/net/http/SelectorTest.java 2016-03-09 10:17:29.000000000 +0000 @@ -24,15 +24,21 @@ /** * @test * @bug 8151299 + * @summary Http client SelectorManager overwriting read and write events */ package java.net.http; -import org.testng.annotations.Test; import java.net.*; import java.io.*; import java.nio.channels.*; import java.nio.ByteBuffer; -import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import static java.lang.System.out; +import static java.nio.charset.StandardCharsets.US_ASCII; +import static java.util.concurrent.TimeUnit.SECONDS; + +import org.testng.annotations.Test; /** * Whitebox test of selector mechanics. Currently only a simple test @@ -42,183 +48,161 @@ */ @Test public class SelectorTest { - int counter = 0; - volatile boolean error = false; - - static Object lock = new Object(); - synchronized int getCounter() { - return counter; - } - - synchronized void incrementCounter() { - counter++; - } + AtomicInteger counter = new AtomicInteger(); + volatile boolean error; + static final CountDownLatch finishingGate = new CountDownLatch(1); - String readAll(RawChannel chan) { + String readSomeBytes(RawChannel chan) { try { ByteBuffer buf = ByteBuffer.allocate(1024); int t = chan.read(buf); - if (t <=0) { - System.err.printf("chan read returned %d\n", t); + if (t <= 0) { + out.printf("chan read returned %d\n", t); return null; } byte[] bb = new byte[t]; buf.get(bb); - return new String(bb); - } catch (IOException e) { - throw new RuntimeException(e); + return new String(bb, US_ASCII); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); } } @Test(timeOut = 10000) public void test() throws Exception { - ServerSocket server = new ServerSocket(0); - int port = server.getLocalPort(); + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); - System.err.println("Listening on port " + server.getLocalPort()); + out.println("Listening on port " + server.getLocalPort()); - TestServer t = new TestServer(server); - t.start(); - System.err.println("Started server thread"); - - final RawChannel chan = getARawChannel(port); - - chan.registerEvent(new RawChannel.NonBlockingEvent() { - @Override - public int interestOps() { - return SelectionKey.OP_READ; - } + TestServer t = new TestServer(server); + t.start(); + out.println("Started server thread"); - @Override - public void handle() { - readAll(chan); - System.err.printf("OP_READ\n"); - if (getCounter() != 1) { - System.err.printf("OP_READ error counter = %d\n", counter); - error = true; + final RawChannel chan = getARawChannel(port); + + chan.registerEvent(new RawChannel.NonBlockingEvent() { + @Override + public int interestOps() { + return SelectionKey.OP_READ; } -/* - synchronized (SelectorTest.lock) { - SelectorTest.lock.notifyAll(); + + @Override + public void handle() { + readSomeBytes(chan); + out.printf("OP_READ\n"); + if (counter.get() != 1) { + out.printf("OP_READ error counter = %d\n", counter); + error = true; + } } -*/ - } - }); + }); - chan.registerEvent(new RawChannel.NonBlockingEvent() { - @Override - public int interestOps() { - return SelectionKey.OP_WRITE; - } + chan.registerEvent(new RawChannel.NonBlockingEvent() { + @Override + public int interestOps() { + return SelectionKey.OP_WRITE; + } - @Override - public void handle() { - System.err.printf("OP_WRITE\n"); - if (getCounter() != 0) { - System.err.printf("OP_WRITE error counter = %d\n", counter); - error = true; - } else { - ByteBuffer bb = ByteBuffer.wrap(TestServer.input); - incrementCounter(); - try { - chan.write(bb); - } catch (IOException e) {throw new RuntimeException(e);} + @Override + public void handle() { + out.printf("OP_WRITE\n"); + if (counter.get() != 0) { + out.printf("OP_WRITE error counter = %d\n", counter); + error = true; + } else { + ByteBuffer bb = ByteBuffer.wrap(TestServer.INPUT); + counter.incrementAndGet(); + try { + chan.write(bb); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } - } - - }); - System.err.println("Events registered. Waiting"); - synchronized (lock) { - lock. wait(); - } - if (error) - throw new RuntimeException("Error"); - else - System.err.println("No error"); + + }); + out.println("Events registered. Waiting"); + finishingGate.await(30, SECONDS); + if (error) + throw new RuntimeException("Error"); + else + out.println("No error"); + } } - - private static RawChannel getARawChannel(int port) throws Exception { - URI uri = URI.create("http://127.0.0.1:"+port+"/"); - System.err.println("client connecting to " + uri.toString()); - HttpRequest req = HttpRequest.create(uri) - .GET(); - HttpResponseImpl r = (HttpResponseImpl) req.response(); + + static RawChannel getARawChannel(int port) throws Exception { + URI uri = URI.create("http://127.0.0.1:" + port + "/"); + out.println("client connecting to " + uri.toString()); + HttpRequest req = HttpRequest.create(uri).GET(); + HttpResponse r = req.response(); r.body(HttpResponse.ignoreBody()); - return r.rawChannel(); + return ((HttpResponseImpl) r).rawChannel(); } -} - -class TestServer extends Thread { - final static byte[] input = "Hello world".getBytes(); - final static byte[] output = "Goodbye world".getBytes(); - final ServerSocket server; - volatile Socket s; - volatile InputStream is; - volatile OutputStream os; - final String firstResponse = - "HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n"; + static class TestServer extends Thread { + static final byte[] INPUT = "Hello world".getBytes(US_ASCII); + static final byte[] OUTPUT = "Goodbye world".getBytes(US_ASCII); + static final String FIRST_RESPONSE = "HTTP/1.1 200 OK\r\nContent-length: 0\r\n\r\n"; + final ServerSocket server; + TestServer(ServerSocket server) throws IOException { + this.server = server; + } - TestServer(ServerSocket server) throws IOException { - this.server = server; - } - - public void run() { - try { - this.s = server.accept(); - this.is = s.getInputStream(); - this.os = s.getOutputStream(); - System.err.println("Got connection"); - readRequest(); - this.s.getOutputStream().write(firstResponse.getBytes()); - read(); - write(); - Thread.sleep(1000); - // send some more data, and make sure WRITE op does not get called - write(); - System.err.println("TestServer exiting"); - synchronized (SelectorTest.lock) { - SelectorTest.lock.notifyAll(); + public void run() { + try (Socket s = server.accept(); + InputStream is = s.getInputStream(); + OutputStream os = s.getOutputStream()) { + + out.println("Got connection"); + readRequest(is); + os.write(FIRST_RESPONSE.getBytes()); + read(is); + write(os); + Thread.sleep(1000); + // send some more data, and make sure WRITE op does not get called + write(os); + out.println("TestServer exiting"); + SelectorTest.finishingGate.countDown(); + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - e.printStackTrace(); } - } - // consume the Http request - void readRequest() throws Exception { - System.err.println("starting readRequest"); - byte[] buf = new byte[1024]; - int pos = 0; - String s = ""; - while (true) { - int n = is.read(buf); - if (n <= 0) - throw new IOException("Error"); - s = s + new String (buf, 0, n); - if (s.indexOf("\r\n\r\n") != -1) - break; + // consumes the HTTP request + static void readRequest(InputStream is) throws IOException { + out.println("starting readRequest"); + byte[] buf = new byte[1024]; + String s = ""; + while (true) { + int n = is.read(buf); + if (n <= 0) + throw new IOException("Error"); + s = s + new String(buf, 0, n); + if (s.indexOf("\r\n\r\n") != -1) + break; + } + out.println("returning from readRequest"); } - System.err.println("returning from readRequest"); - } - void read() throws Exception { - System.err.println("starting read"); - for (int i=0; i