--- old/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java 2017-11-30 04:05:22.749374789 -0800 +++ new/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java 2017-11-30 04:05:22.569359056 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -36,19 +36,17 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; - -import jdk.incubator.http.internal.common.ByteBufferReference; -import jdk.incubator.http.internal.frame.FramesDecoder; - -import jdk.incubator.http.internal.common.BufferHandler; import jdk.incubator.http.internal.common.HttpHeadersImpl; -import jdk.incubator.http.internal.common.Queue; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import jdk.incubator.http.internal.hpack.Encoder; +import sun.net.www.http.ChunkedInputStream; +import sun.net.www.http.HttpClient; import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; /** @@ -59,10 +57,12 @@ final Http2TestServer server; @SuppressWarnings({"rawtypes","unchecked"}) final Map streams; // input q per stream + final Map outStreams; // output q per stream final HashSet pushStreams; final Queue outputQ; volatile int nextstream; final Socket socket; + final Http2TestExchangeSupplier exchangeSupplier; final InputStream is; final OutputStream os; volatile Encoder hpackOut; @@ -73,21 +73,66 @@ final boolean secure; volatile boolean stopping; volatile int nextPushStreamId = 2; + ConcurrentLinkedQueue pings = new ConcurrentLinkedQueue<>(); final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); final static byte[] EMPTY_BARRAY = new byte[0]; + final Random random; final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); - Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException { + static class Sentinel extends Http2Frame { + Sentinel() { super(-1,-1);} + } + + class PingRequest { + final byte[] pingData; + final long pingStamp; + final CompletableFuture response; + + PingRequest() { + pingData = new byte[8]; + random.nextBytes(pingData); + pingStamp = System.currentTimeMillis(); + response = new CompletableFuture<>(); + } + + PingFrame frame() { + return new PingFrame(0, pingData); + } + + CompletableFuture response() { + return response; + } + + void success() { + response.complete(System.currentTimeMillis() - pingStamp); + } + + void fail(Throwable t) { + response.completeExceptionally(t); + } + } + + static Sentinel sentinel; + + Http2TestServerConnection(Http2TestServer server, + Socket socket, + Http2TestExchangeSupplier exchangeSupplier) + throws IOException + { if (socket instanceof SSLSocket) { handshake(server.serverName(), (SSLSocket)socket); } System.err.println("TestServer: New connection from " + socket); this.server = server; + this.exchangeSupplier = exchangeSupplier; this.streams = Collections.synchronizedMap(new HashMap<>()); - this.outputQ = new Queue<>(); + this.outStreams = Collections.synchronizedMap(new HashMap<>()); + this.outputQ = new Queue<>(sentinel); + this.random = new Random(); this.socket = socket; + this.socket.setTcpNoDelay(true); this.serverSettings = SettingsFrame.getDefaultSettings(); this.exec = server.exec; this.secure = server.secure; @@ -96,6 +141,60 @@ os = new BufferedOutputStream(socket.getOutputStream()); } + /** + * Sends a PING frame on this connection, and completes the returned + * CF when the PING ack is received. The CF is given + * an integer, whose value is the number of milliseconds + * between PING and ACK. + */ + CompletableFuture sendPing() { + PingRequest ping = null; + try { + ping = new PingRequest(); + pings.add(ping); + outputQ.put(ping.frame()); + } catch (Throwable t) { + ping.fail(t); + } + return ping.response(); + } + + /** + * Returns the first PingRequest from Queue + */ + private PingRequest getNextRequest() { + return pings.poll(); + } + + /** + * Handles incoming Ping, which could be an ack + * or a client originated Ping + */ + void handlePing(PingFrame ping) throws IOException { + if (ping.streamid() != 0) { + System.err.println("Invalid ping received"); + close(); + return; + } + if (ping.getFlag(PingFrame.ACK)) { + // did we send a Ping? + PingRequest request = getNextRequest(); + if (request == null) { + System.err.println("Invalid ping ACK received"); + close(); + return; + } else if (!Arrays.equals(request.pingData, ping.getData())) { + request.fail(new RuntimeException("Wrong ping data in ACK")); + } else { + request.success(); + } + } else { + // client originated PING. Just send it back with ACK set + ping.setFlag(PingFrame.ACK); + outputQ.put(ping); + } + } + private static boolean compareIPAddrs(InetAddress addr1, String host) { try { InetAddress addr2 = InetAddress.getByName(host); @@ -186,8 +285,8 @@ new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); List frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); - reader.decode(ByteBufferReference.of(bb0)); - reader.decode(ByteBufferReference.of(bb1)); + reader.decode(bb0); + reader.decode(bb1); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; Http2Frame frame = frames.get(0); @@ -226,42 +325,34 @@ exec.submit(this::writeLoop); } - static class BufferPool implements BufferHandler { - - public void setMinBufferSize(int size) { - } - - @Override - public ByteBuffer getBuffer() { - int size = 32 * 1024; - return ByteBuffer.allocate(size); - } - - @Override - public void returnBuffer(ByteBuffer buffer) { - } - } - private void writeFrame(Http2Frame frame) throws IOException { - ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame); + List bufs = new FramesEncoder().encodeFrame(frame); //System.err.println("TestServer: Writing frame " + frame.toString()); int c = 0; - for (ByteBufferReference ref : refs) { - ByteBuffer buf = ref.get(); + for (ByteBuffer buf : bufs) { byte[] ba = buf.array(); int start = buf.arrayOffset() + buf.position(); c += buf.remaining(); os.write(ba, start, buf.remaining()); + +// System.out.println("writing byte at a time"); +// while (buf.hasRemaining()) { +// byte b = buf.get(); +// os.write(b); +// os.flush(); +// try { +// Thread.sleep(1); +// } catch(InterruptedException e) { +// UncheckedIOException uie = new UncheckedIOException(new IOException("")); +// uie.addSuppressed(e); +// throw uie; +// } +// } } os.flush(); //System.err.printf("TestServer: wrote %d bytes\n", c); } - void handleStreamReset(ResetFrame resetFrame) throws IOException { - // TODO: cleanup - throw new IOException("Stream reset"); - } - private void handleCommonFrame(Http2Frame f) throws IOException { if (f instanceof SettingsFrame) { SettingsFrame sf = (SettingsFrame) f; @@ -276,9 +367,13 @@ frame.streamid(0); outputQ.put(frame); return; - } - //System.err.println("TestServer: Received ---> " + f.toString()); - throw new UnsupportedOperationException("Not supported yet."); + } else if (f instanceof GoAwayFrame) { + System.err.println("Closing: "+ f.toString()); + close(); + } else if (f instanceof PingFrame) { + handlePing((PingFrame)f); + } else + throw new UnsupportedOperationException("Not supported yet: " + f.toString()); } void sendWindowUpdates(int len, int streamid) throws IOException { @@ -290,7 +385,7 @@ outputQ.put(wup); } - HttpHeadersImpl decodeHeaders(List frames) { + HttpHeadersImpl decodeHeaders(List frames) throws IOException { HttpHeadersImpl headers = new HttpHeadersImpl(); DecodingCallback cb = (name, value) -> { @@ -298,9 +393,9 @@ }; for (HeaderFrame frame : frames) { - ByteBufferReference[] buffers = frame.getHeaderBlock(); - for (ByteBufferReference buffer : buffers) { - hpackIn.decode(buffer.get(), false, cb); + List buffers = frame.getHeaderBlock(); + for (ByteBuffer buffer : buffers) { + hpackIn.decode(buffer, false, cb); } } hpackIn.decode(EMPTY_BUFFER, true, cb); @@ -359,7 +454,7 @@ headers.setHeader(":scheme", "http"); // always in this case headers.setHeader(":authority", host); headers.setHeader(":path", uri.getPath()); - Queue q = new Queue(); + Queue q = new Queue(sentinel); String body = getRequestBody(request); addHeaders(getHeaders(request), headers); headers.setHeader("Content-length", Integer.toString(body.length())); @@ -401,7 +496,7 @@ } boolean endStreamReceived = endStream; HttpHeadersImpl headers = decodeHeaders(frames); - Queue q = new Queue(); + Queue q = new Queue(sentinel); streams.put(streamid, q); exec.submit(() -> { handleRequest(headers, q, streamid, endStreamReceived); @@ -428,7 +523,7 @@ System.err.printf("TestServer: %s %s\n", method, path); HttpHeadersImpl rspheaders = new HttpHeadersImpl(); int winsize = clientSettings.getParameter( - SettingsFrame.INITIAL_WINDOW_SIZE); + SettingsFrame.INITIAL_WINDOW_SIZE); //System.err.println ("Stream window size = " + winsize); final InputStream bis; @@ -442,10 +537,11 @@ try (bis; BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) { + outStreams.put(streamid, bos); String us = scheme + "://" + authority + path; URI uri = new URI(us); boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; - Http2TestExchange exchange = new Http2TestExchange(streamid, method, + Http2TestExchange exchange = exchangeSupplier.get(streamid, method, headers, rspheaders, uri, bis, getSSLSession(), bos, this, pushAllowed); @@ -497,7 +593,7 @@ } else { if (q == null && !pushStreams.contains(stream)) { System.err.printf("Non Headers frame received with"+ - " non existing stream (%d) ", frame.streamid()); + " non existing stream (%d) ", frame.streamid()); System.err.println(frame); continue; } @@ -507,6 +603,17 @@ Consumer r = updaters.get(stream); r.accept(wup.getUpdate()); } + } else if (frame.type() == ResetFrame.TYPE) { + // do orderly close on input q + // and close the output q immediately + // This should mean depending on what the + // handler is doing: either an EOF on read + // or an IOException if writing the response. + q.orderlyClose(); + BodyOutputStream oq = outStreams.get(stream); + if (oq != null) + oq.closeInternal(); + } else { q.put(frame); } @@ -522,7 +629,7 @@ } } - ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) { + List encodeHeaders(HttpHeadersImpl headers) { List buffers = new LinkedList<>(); ByteBuffer buf = getBuffer(); @@ -544,7 +651,7 @@ } buf.flip(); buffers.add(buf); - return ByteBufferReference.toReferences(buffers.toArray(bbarray)); + return buffers; } static void closeIgnore(Closeable c) { @@ -587,7 +694,11 @@ private void handlePush(OutgoingPushPromise op) throws IOException { int promisedStreamid = nextPushStreamId; - PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0); + PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, + HeaderFrame.END_HEADERS, + promisedStreamid, + encodeHeaders(op.headers), + 0); pushStreams.add(promisedStreamid); nextPushStreamId += 2; pp.streamid(op.parentStream); @@ -597,6 +708,7 @@ promisedStreamid, clientSettings.getParameter( SettingsFrame.INITIAL_WINDOW_SIZE), this); + outStreams.put(promisedStreamid, oo); oo.goodToGo(); exec.submit(() -> { try { @@ -645,8 +757,8 @@ throw new IOException("Error reading frame"); List frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); - reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf))); - reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest))); + reader.decode(ByteBuffer.wrap(buf)); + reader.decode(ByteBuffer.wrap(rest)); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; @@ -721,13 +833,25 @@ String readHttp1Request() throws IOException { String headers = readUntil(CRLF + CRLF); int clen = getContentLength(headers); - // read the content. - byte[] buf = new byte[clen]; - is.readNBytes(buf, 0, clen); + byte[] buf; + if (clen >= 0) { + // HTTP/1.1 fixed length content ( may be 0 ), read it + buf = new byte[clen]; + is.readNBytes(buf, 0, clen); + } else { + // HTTP/1.1 chunked data, read it + buf = readChunkedInputStream(is); + } String body = new String(buf, StandardCharsets.US_ASCII); return headers + body; } + // This is a quick hack to get a chunked input stream reader. + private static byte[] readChunkedInputStream(InputStream is) throws IOException { + ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null); + return cis.readAllBytes(); + } + void sendHttp1Response(int code, String msg, String... headers) throws IOException { StringBuilder sb = new StringBuilder(); sb.append("HTTP/1.1 ") @@ -771,7 +895,7 @@ @SuppressWarnings({"rawtypes","unchecked"}) void addRequestBodyToQueue(String body, Queue q) throws IOException { ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); - DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf)); + DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); // only used for primordial stream q.put(df); } @@ -795,13 +919,13 @@ * @param amount */ synchronized void obtainConnectionWindow(int amount) throws InterruptedException { - while (amount > 0) { - int n = Math.min(amount, sendWindow); - amount -= n; - sendWindow -= n; - if (amount > 0) - wait(); - } + while (amount > 0) { + int n = Math.min(amount, sendWindow); + amount -= n; + sendWindow -= n; + if (amount > 0) + wait(); + } } synchronized void updateConnectionWindow(int amount) { @@ -823,9 +947,9 @@ } static class NullInputStream extends InputStream { - static final NullInputStream INSTANCE = new NullInputStream(); - private NullInputStream() {} - public int read() { return -1; } - public int available() { return 0; } - } + static final NullInputStream INSTANCE = new NullInputStream(); + private NullInputStream() {} + public int read() { return -1; } + public int available() { return 0; } + } }