< prev index next >

test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.

@@ -34,70 +34,169 @@
 import javax.net.ssl.*;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Consumer;
-
-import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.frame.FramesDecoder;
-
-import jdk.incubator.http.internal.common.BufferHandler;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
-import jdk.incubator.http.internal.common.Queue;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.Decoder;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
 import jdk.incubator.http.internal.hpack.Encoder;
+import sun.net.www.http.ChunkedInputStream;
+import sun.net.www.http.HttpClient;
 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
 
 /**
  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  * or HTTPS opened using "h2" ALPN.
  */
 public class Http2TestServerConnection {
     final Http2TestServer server;
     @SuppressWarnings({"rawtypes","unchecked"})
     final Map<Integer, Queue> streams; // input q per stream
+    final Map<Integer, BodyOutputStream> outStreams; // output q per stream
     final HashSet<Integer> pushStreams;
     final Queue<Http2Frame> outputQ;
     volatile int nextstream;
     final Socket socket;
+    final Http2TestExchangeSupplier exchangeSupplier;
     final InputStream is;
     final OutputStream os;
     volatile Encoder hpackOut;
     volatile Decoder hpackIn;
     volatile SettingsFrame clientSettings;
     final SettingsFrame serverSettings;
     final ExecutorService exec;
     final boolean secure;
     volatile boolean stopping;
+    // stream id used for the next push promise; handlePush advances it by 2
     volatile int nextPushStreamId = 2;
+    // PINGs sent by sendPing() whose ACK has not been handled yet, oldest first
+    ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
 
     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
     final static byte[] EMPTY_BARRAY = new byte[0];
+    // source of the 8 payload bytes generated for each PingRequest
+    final Random random;
 
     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
 
-    Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {
+    /**
+     * Frame subclass used only as a marker: a reference of this type is
+     * what gets passed to the Queue constructors in this class.
+     */
+    static class Sentinel extends Http2Frame {
+        Sentinel() {
+            super(-1, -1);
+        }
+    }
+
+    /**
+     * One PING sent by this server that is waiting for its ACK. Holds the
+     * random payload, the creation timestamp and the future handed back to
+     * the caller. This is an inner (non-static) class because the payload
+     * is drawn from the enclosing connection's Random.
+     */
+    class PingRequest {
+        final byte[] pingData = new byte[8];
+        final long pingStamp;
+        final CompletableFuture<Long> response = new CompletableFuture<>();
+
+        PingRequest() {
+            random.nextBytes(pingData);
+            // taken when the request is created, not when the frame is written
+            pingStamp = System.currentTimeMillis();
+        }
+
+        /** Builds the PING frame carrying this request's payload. */
+        PingFrame frame() {
+            return new PingFrame(0, pingData);
+        }
+
+        /** Returns the future that success() or fail() completes. */
+        CompletableFuture<Long> response() {
+            return response;
+        }
+
+        /** Completes the future with the milliseconds elapsed since creation. */
+        void success() {
+            long elapsedMillis = System.currentTimeMillis() - pingStamp;
+            response.complete(elapsedMillis);
+        }
+
+        /** Completes the future exceptionally with the given cause. */
+        void fail(Throwable t) {
+            response.completeExceptionally(t);
+        }
+    }
+
+    // NOTE(review): no assignment to this field is visible in this change,
+    // yet it is passed to every Queue constructor in this class - confirm
+    // that it is initialised somewhere before the first connection is made.
+    static Sentinel sentinel;
+
+    /**
+     * Creates the server side of one connection on the given socket.
+     * If the socket is an SSLSocket, handshake() is invoked first with the
+     * server's name. TCP_NODELAY is then enabled on the socket and its
+     * input and output streams are wrapped in buffered streams.
+     *
+     * @param server the owning test server; supplies the executor and the
+     *        secure flag
+     * @param socket the socket for this connection
+     * @param exchangeSupplier used to create the exchange object for each
+     *        request handled on this connection
+     * @throws IOException if the handshake, the socket option or the stream
+     *         setup fails
+     */
+    Http2TestServerConnection(Http2TestServer server,
+                              Socket socket,
+                              Http2TestExchangeSupplier exchangeSupplier)
+        throws IOException
+    {
         if (socket instanceof SSLSocket) {
             handshake(server.serverName(), (SSLSocket)socket);
         }
         System.err.println("TestServer: New connection from " + socket);
         this.server = server;
+        this.exchangeSupplier = exchangeSupplier;
         this.streams = Collections.synchronizedMap(new HashMap<>());
-        this.outputQ = new Queue<>();
+        this.outStreams = Collections.synchronizedMap(new HashMap<>());
+        this.outputQ = new Queue<>(sentinel);
+        this.random = new Random();
         this.socket = socket;
+        this.socket.setTcpNoDelay(true);
         this.serverSettings = SettingsFrame.getDefaultSettings();
         this.exec = server.exec;
         this.secure = server.secure;
         this.pushStreams = new HashSet<>();
         is = new BufferedInputStream(socket.getInputStream());
         os = new BufferedOutputStream(socket.getOutputStream());
     }
 
+    /**
+     * Sends a PING frame on this connection, and completes the returned
+     * CF when the PING ack is received. The CF is completed with a Long
+     * whose value is the number of milliseconds between PING and ACK.
+     * If the frame cannot be queued for sending, the CF is completed
+     * exceptionally with the failure instead.
+     */
+    CompletableFuture<Long> sendPing() {
+        // Create the request outside the try block so that neither the
+        // catch clause nor the return statement can dereference null.
+        PingRequest ping = new PingRequest();
+        try {
+            pings.add(ping);
+            outputQ.put(ping.frame());
+        } catch (Throwable t) {
+            // The frame never went out, so no ACK will arrive for it:
+            // drop it so that it cannot be matched against a later ACK.
+            pings.remove(ping);
+            ping.fail(t);
+        }
+        return ping.response();
+    }
+
+    /**
+     * Removes and returns the oldest PingRequest still waiting for an ACK,
+     * or null when none is outstanding.
+     */
+    private PingRequest getNextRequest() {
+        PingRequest oldest = pings.poll();
+        return oldest;
+    }
+
+    /**
+     * Processes a PING frame read from the client. A frame whose stream id
+     * is not 0, or an ACK that matches no outstanding request, closes the
+     * connection. A valid ACK completes the oldest outstanding request,
+     * exceptionally if the payload differs. Any other PING is sent back
+     * with the ACK flag set.
+     */
+    void handlePing(PingFrame ping) throws IOException {
+        if (ping.streamid() != 0) {
+            System.err.println("Invalid ping received");
+            close();
+            return;
+        }
+        if (!ping.getFlag(PingFrame.ACK)) {
+            // client originated PING: echo the same frame back as an ACK
+            ping.setFlag(PingFrame.ACK);
+            outputQ.put(ping);
+            return;
+        }
+        // an ACK is matched against the oldest PING we are still waiting on
+        PingRequest pending = getNextRequest();
+        if (pending == null) {
+            System.err.println("Invalid ping ACK received");
+            close();
+            return;
+        }
+        if (Arrays.equals(pending.pingData, ping.getData())) {
+            pending.success();
+        } else {
+            pending.fail(new RuntimeException("Wrong ping data in ACK"));
+        }
+    }
+
     private static boolean compareIPAddrs(InetAddress addr1, String host) {
         try {
             InetAddress addr2 = InetAddress.getByName(host);
             return addr1.equals(addr2);
         } catch (IOException e) {

@@ -184,12 +283,12 @@
         // simulate header of Settings Frame
         ByteBuffer bb0 = ByteBuffer.wrap(
                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
         List<Http2Frame> frames = new ArrayList<>();
         FramesDecoder reader = new FramesDecoder(frames::add);
-        reader.decode(ByteBufferReference.of(bb0));
-        reader.decode(ByteBufferReference.of(bb1));
+        reader.decode(bb0);
+        reader.decode(bb1);
         if (frames.size()!=1)
             throw new IOException("Expected 1 frame got "+frames.size()) ;
         Http2Frame frame = frames.get(0);
         if (!(frame instanceof SettingsFrame))
             throw new IOException("Expected SettingsFrame");

@@ -224,46 +323,38 @@
 
         exec.submit(this::readLoop);
         exec.submit(this::writeLoop);
     }
 
-    static class BufferPool implements BufferHandler {
-
-        public void setMinBufferSize(int size) {
-        }
-
-        @Override
-        public ByteBuffer getBuffer() {
-            int size = 32 * 1024;
-            return ByteBuffer.allocate(size);
-        }
-
-        @Override
-        public void returnBuffer(ByteBuffer buffer) {
-        }
-    }
-
+    /**
+     * Encodes the given frame and writes all resulting buffers to the
+     * connection's output stream, flushing once at the end. Each buffer
+     * is written through its backing array, so the encoder must produce
+     * array-backed buffers.
+     *
+     * @throws IOException if writing or flushing the stream fails
+     */
     private void writeFrame(Http2Frame frame) throws IOException {
-        ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame);
+        List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
         //System.err.println("TestServer: Writing frame " + frame.toString());
+        // c counts the bytes written; only the disabled trace below reads it
         int c = 0;
-        for (ByteBufferReference ref : refs) {
-            ByteBuffer buf = ref.get();
+        for (ByteBuffer buf : bufs) {
             byte[] ba = buf.array();
             int start = buf.arrayOffset() + buf.position();
             c += buf.remaining();
             os.write(ba, start, buf.remaining());
+
+            // Disabled debugging aid: writes and flushes one byte at a time
+            // with a 1ms pause between bytes.
+//            System.out.println("writing byte at a time");
+//            while (buf.hasRemaining()) {
+//                byte b = buf.get();
+//                os.write(b);
+//                os.flush();
+//                try {
+//                    Thread.sleep(1);
+//                } catch(InterruptedException e) {
+//                    UncheckedIOException uie = new UncheckedIOException(new IOException(""));
+//                    uie.addSuppressed(e);
+//                    throw uie;
+//                }
+//            }
         }
         os.flush();
         //System.err.printf("TestServer: wrote %d bytes\n", c);
     }
 
-    void handleStreamReset(ResetFrame resetFrame) throws IOException {
-        // TODO: cleanup
-        throw new IOException("Stream reset");
-    }
-
     private void handleCommonFrame(Http2Frame f) throws IOException {
         if (f instanceof SettingsFrame) {
             SettingsFrame sf = (SettingsFrame) f;
             if (sf.getFlag(SettingsFrame.ACK)) // ignore
             {

@@ -274,13 +365,17 @@
             SettingsFrame frame = new SettingsFrame();
             frame.setFlag(SettingsFrame.ACK);
             frame.streamid(0);
             outputQ.put(frame);
             return;
-        }
-        //System.err.println("TestServer: Received ---> " + f.toString());
-        throw new UnsupportedOperationException("Not supported yet.");
+        } else if (f instanceof GoAwayFrame) {
+            System.err.println("Closing: "+ f.toString());
+            close();
+        } else if (f instanceof PingFrame) {
+            handlePing((PingFrame)f);
+        } else
+            throw new UnsupportedOperationException("Not supported yet: " + f.toString());
     }
 
     void sendWindowUpdates(int len, int streamid) throws IOException {
         if (len == 0)
             return;

@@ -288,21 +383,21 @@
         outputQ.put(wup);
         wup = new WindowUpdateFrame(0 , len);
         outputQ.put(wup);
     }
 
+    /**
+     * Decodes the header blocks of the given frames, in order, with this
+     * connection's HPACK decoder and collects every name/value pair into
+     * a new HttpHeadersImpl.
+     *
+     * @param frames the header frames that together make up one header block
+     * @return the decoded headers
+     */
-    HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
+    HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
         HttpHeadersImpl headers = new HttpHeadersImpl();
 
         DecodingCallback cb = (name, value) -> {
             headers.addHeader(name.toString(), value.toString());
         };
 
         for (HeaderFrame frame : frames) {
-            ByteBufferReference[] buffers = frame.getHeaderBlock();
-            for (ByteBufferReference buffer : buffers) {
-                hpackIn.decode(buffer.get(), false, cb);
+            List<ByteBuffer> buffers = frame.getHeaderBlock();
+            for (ByteBuffer buffer : buffers) {
+                hpackIn.decode(buffer, false, cb);
             }
         }
+        // final call with an empty buffer and the flag set to true;
+        // presumably this marks the end of the header block - confirm
+        // against the Decoder documentation
         hpackIn.decode(EMPTY_BUFFER, true, cb);
         return headers;
     }

@@ -357,11 +452,11 @@
 
         headers.setHeader(":method", tokens[0]);
         headers.setHeader(":scheme", "http"); // always in this case
         headers.setHeader(":authority", host);
         headers.setHeader(":path", uri.getPath());
-        Queue q = new Queue();
+        Queue q = new Queue(sentinel);
         String body = getRequestBody(request);
         addHeaders(getHeaders(request), headers);
         headers.setHeader("Content-length", Integer.toString(body.length()));
 
         addRequestBodyToQueue(body, q);

@@ -399,11 +494,11 @@
                 frames.add(frame);
             }
         }
         boolean endStreamReceived = endStream;
         HttpHeadersImpl headers = decodeHeaders(frames);
-        Queue q = new Queue();
+        Queue q = new Queue(sentinel);
         streams.put(streamid, q);
         exec.submit(() -> {
             handleRequest(headers, q, streamid, endStreamReceived);
         });
     }

@@ -440,14 +535,15 @@
             bis = new BodyInputStream(queue, streamid, this);
         }
         try (bis;
              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
         {
+            outStreams.put(streamid, bos);
             String us = scheme + "://" + authority + path;
             URI uri = new URI(us);
             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
-            Http2TestExchange exchange = new Http2TestExchange(streamid, method,
+            Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
                     headers, rspheaders, uri, bis, getSSLSession(),
                     bos, this, pushAllowed);
 
             // give to user
             Http2Handler handler = server.getHandlerFor(uri.getPath());

@@ -505,10 +601,21 @@
                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
                             synchronized (updaters) {
                                 Consumer<Integer> r = updaters.get(stream);
                                 r.accept(wup.getUpdate());
                             }
+                        } else if (frame.type() == ResetFrame.TYPE) {
+                            // Do an orderly close on the input q
+                            // and close the output q immediately.
+                            // Depending on what the handler is doing,
+                            // this should result in either an EOF on read
+                            // or an IOException when writing the response.
+                            q.orderlyClose();
+                            BodyOutputStream oq = outStreams.get(stream);
+                            if (oq != null)
+                                oq.closeInternal();
+
                         } else {
                             q.put(frame);
                         }
                     }
                 }

@@ -520,11 +627,11 @@
             }
             close();
         }
     }
 
-    ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) {
+    List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
         List<ByteBuffer> buffers = new LinkedList<>();
 
         ByteBuffer buf = getBuffer();
         boolean encoded;
         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {

@@ -542,11 +649,11 @@
                 } while (!encoded);
             }
         }
         buf.flip();
         buffers.add(buf);
-        return ByteBufferReference.toReferences(buffers.toArray(bbarray));
+        return buffers;
     }
 
     static void closeIgnore(Closeable c) {
         try {
             c.close();

@@ -585,20 +692,25 @@
         }
     }
 
     private void handlePush(OutgoingPushPromise op) throws IOException {
         int promisedStreamid = nextPushStreamId;
-        PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0);
+        PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
+                                                   HeaderFrame.END_HEADERS,
+                                                   promisedStreamid,
+                                                   encodeHeaders(op.headers),
+                                                   0);
         pushStreams.add(promisedStreamid);
         nextPushStreamId += 2;
         pp.streamid(op.parentStream);
         writeFrame(pp);
         final InputStream ii = op.is;
         final BodyOutputStream oo = new BodyOutputStream(
                 promisedStreamid,
                 clientSettings.getParameter(
                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
+        outStreams.put(promisedStreamid, oo);
         oo.goodToGo();
         exec.submit(() -> {
             try {
                 ResponseHeaders oh = getPushResponse(promisedStreamid);
                 outputQ.put(oh);

@@ -643,12 +755,12 @@
         int n = is.readNBytes(rest, 0, len);
         if (n != len)
             throw new IOException("Error reading frame");
         List<Http2Frame> frames = new ArrayList<>();
         FramesDecoder reader = new FramesDecoder(frames::add);
-        reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf)));
-        reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest)));
+        reader.decode(ByteBuffer.wrap(buf));
+        reader.decode(ByteBuffer.wrap(rest));
         if (frames.size()!=1)
             throw new IOException("Expected 1 frame got "+frames.size()) ;
 
         return frames.get(0);
     }

@@ -719,17 +831,29 @@
     final static String CRLFCRLF = "\r\n\r\n";
 
     String readHttp1Request() throws IOException {
         String headers = readUntil(CRLF + CRLF);
         int clen = getContentLength(headers);
-        // read the content.
-        byte[] buf = new byte[clen];
+        byte[] buf;
+        if (clen >= 0) {
+            // HTTP/1.1 fixed length content ( may be 0 ), read it
+            buf = new byte[clen];
         is.readNBytes(buf, 0, clen);
+        } else {
+            //  HTTP/1.1 chunked data, read it
+            buf = readChunkedInputStream(is);
+        }
         String body = new String(buf, StandardCharsets.US_ASCII);
         return headers + body;
     }
 
+    // Quick hack to decode a chunked body: wraps the stream in the JDK's
+    // internal ChunkedInputStream, passing a throwaway anonymous HttpClient
+    // and null as the remaining constructor arguments, and reads it all.
+    private static byte[] readChunkedInputStream(InputStream is) throws IOException {
+        HttpClient placeholderClient = new HttpClient() {};
+        ChunkedInputStream chunked = new ChunkedInputStream(is, placeholderClient, null);
+        return chunked.readAllBytes();
+    }
+
     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
         StringBuilder sb = new StringBuilder();
         sb.append("HTTP/1.1 ")
                 .append(code)
                 .append(' ')

@@ -769,11 +893,11 @@
     }
 
+    /**
+     * Wraps the US-ASCII bytes of the given body in a single DataFrame
+     * with END_STREAM set and puts that frame on the given queue.
+     *
+     * @param body the request body text
+     * @param q the queue that receives the frame
+     * @throws IOException if the frame cannot be put on the queue
+     */
     @SuppressWarnings({"rawtypes","unchecked"})
     void addRequestBodyToQueue(String body, Queue q) throws IOException {
         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
-        DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf));
+        DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
         // only used for primordial stream
         q.put(df);
     }
 
     // window updates done in main reader thread because they may
< prev index next >