< prev index next >

test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. --- 1,7 ---- /* ! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation.
*** 34,103 **** import javax.net.ssl.*; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; - - import jdk.incubator.http.internal.common.ByteBufferReference; - import jdk.incubator.http.internal.frame.FramesDecoder; - - import jdk.incubator.http.internal.common.BufferHandler; import jdk.incubator.http.internal.common.HttpHeadersImpl; - import jdk.incubator.http.internal.common.Queue; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import jdk.incubator.http.internal.hpack.Encoder; import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; /** * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 * or HTTPS opened using "h2" ALPN. */ public class Http2TestServerConnection { final Http2TestServer server; @SuppressWarnings({"rawtypes","unchecked"}) final Map<Integer, Queue> streams; // input q per stream final HashSet<Integer> pushStreams; final Queue<Http2Frame> outputQ; volatile int nextstream; final Socket socket; final InputStream is; final OutputStream os; volatile Encoder hpackOut; volatile Decoder hpackIn; volatile SettingsFrame clientSettings; final SettingsFrame serverSettings; final ExecutorService exec; final boolean secure; volatile boolean stopping; volatile int nextPushStreamId = 2; final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); final static byte[] EMPTY_BARRAY = new byte[0]; final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); ! Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException { if (socket instanceof SSLSocket) { handshake(server.serverName(), (SSLSocket)socket); } System.err.println("TestServer: New connection from " + socket); this.server = server; this.streams = Collections.synchronizedMap(new HashMap<>()); ! this.outputQ = new Queue<>(); this.socket = socket; this.serverSettings = SettingsFrame.getDefaultSettings(); this.exec = server.exec; this.secure = server.secure; this.pushStreams = new HashSet<>(); is = new BufferedInputStream(socket.getInputStream()); os = new BufferedOutputStream(socket.getOutputStream()); } private static boolean compareIPAddrs(InetAddress addr1, String host) { try { InetAddress addr2 = InetAddress.getByName(host); return addr1.equals(addr2); } catch (IOException e) { --- 34,202 ---- import javax.net.ssl.*; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import jdk.incubator.http.internal.common.HttpHeadersImpl; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import jdk.incubator.http.internal.hpack.Encoder; + import sun.net.www.http.ChunkedInputStream; + import sun.net.www.http.HttpClient; import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; /** * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 * or HTTPS opened using "h2" ALPN. */ public class Http2TestServerConnection { final Http2TestServer server; @SuppressWarnings({"rawtypes","unchecked"}) final Map<Integer, Queue> streams; // input q per stream + final Map<Integer, BodyOutputStream> outStreams; // output q per stream final HashSet<Integer> pushStreams; final Queue<Http2Frame> outputQ; volatile int nextstream; final Socket socket; + final Http2TestExchangeSupplier exchangeSupplier; final InputStream is; final OutputStream os; volatile Encoder hpackOut; volatile Decoder hpackIn; volatile SettingsFrame clientSettings; final SettingsFrame serverSettings; final ExecutorService exec; final boolean secure; volatile boolean stopping; volatile int nextPushStreamId = 2; + ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>(); final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); final static byte[] EMPTY_BARRAY = new byte[0]; + final Random random; final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); ! static class Sentinel extends Http2Frame { ! Sentinel() { super(-1,-1);} ! } ! ! class PingRequest { ! final byte[] pingData; ! final long pingStamp; ! final CompletableFuture<Long> response; ! ! PingRequest() { ! pingData = new byte[8]; ! random.nextBytes(pingData); ! pingStamp = System.currentTimeMillis(); ! response = new CompletableFuture<>(); ! } ! ! PingFrame frame() { ! return new PingFrame(0, pingData); ! } ! ! CompletableFuture<Long> response() { ! return response; ! } ! ! void success() { ! response.complete(System.currentTimeMillis() - pingStamp); ! } ! ! void fail(Throwable t) { ! response.completeExceptionally(t); ! } ! } ! ! static Sentinel sentinel; ! ! Http2TestServerConnection(Http2TestServer server, ! Socket socket, ! Http2TestExchangeSupplier exchangeSupplier) ! throws IOException ! { if (socket instanceof SSLSocket) { handshake(server.serverName(), (SSLSocket)socket); } System.err.println("TestServer: New connection from " + socket); this.server = server; + this.exchangeSupplier = exchangeSupplier; this.streams = Collections.synchronizedMap(new HashMap<>()); ! this.outStreams = Collections.synchronizedMap(new HashMap<>()); ! this.outputQ = new Queue<>(sentinel); ! this.random = new Random(); this.socket = socket; + this.socket.setTcpNoDelay(true); this.serverSettings = SettingsFrame.getDefaultSettings(); this.exec = server.exec; this.secure = server.secure; this.pushStreams = new HashSet<>(); is = new BufferedInputStream(socket.getInputStream()); os = new BufferedOutputStream(socket.getOutputStream()); } + /** + * Sends a PING frame on this connection, and completes the returned + * CF when the PING ack is received. The CF is given + * an integer, whose value is the number of milliseconds + * between PING and ACK. + */ + CompletableFuture<Long> sendPing() { + PingRequest ping = null; + try { + ping = new PingRequest(); + pings.add(ping); + outputQ.put(ping.frame()); + } catch (Throwable t) { + ping.fail(t); + } + return ping.response(); + } + + /** + * Returns the first PingRequest from Queue + */ + private PingRequest getNextRequest() { + return pings.poll(); + } + + /** + * Handles incoming Ping, which could be an ack + * or a client originated Ping + */ + void handlePing(PingFrame ping) throws IOException { + if (ping.streamid() != 0) { + System.err.println("Invalid ping received"); + close(); + return; + } + if (ping.getFlag(PingFrame.ACK)) { + // did we send a Ping? + PingRequest request = getNextRequest(); + if (request == null) { + System.err.println("Invalid ping ACK received"); + close(); + return; + } else if (!Arrays.equals(request.pingData, ping.getData())) { + request.fail(new RuntimeException("Wrong ping data in ACK")); + } else { + request.success(); + } + } else { + // client originated PING. Just send it back with ACK set + ping.setFlag(PingFrame.ACK); + outputQ.put(ping); + } + } + private static boolean compareIPAddrs(InetAddress addr1, String host) { try { InetAddress addr2 = InetAddress.getByName(host); return addr1.equals(addr2); } catch (IOException e) {
*** 184,195 **** // simulate header of Settings Frame ByteBuffer bb0 = ByteBuffer.wrap( new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); List<Http2Frame> frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); ! reader.decode(ByteBufferReference.of(bb0)); ! reader.decode(ByteBufferReference.of(bb1)); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; Http2Frame frame = frames.get(0); if (!(frame instanceof SettingsFrame)) throw new IOException("Expected SettingsFrame"); --- 283,294 ---- // simulate header of Settings Frame ByteBuffer bb0 = ByteBuffer.wrap( new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); List<Http2Frame> frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); ! reader.decode(bb0); ! reader.decode(bb1); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; Http2Frame frame = frames.get(0); if (!(frame instanceof SettingsFrame)) throw new IOException("Expected SettingsFrame");
*** 224,269 **** exec.submit(this::readLoop); exec.submit(this::writeLoop); } - static class BufferPool implements BufferHandler { - - public void setMinBufferSize(int size) { - } - - @Override - public ByteBuffer getBuffer() { - int size = 32 * 1024; - return ByteBuffer.allocate(size); - } - - @Override - public void returnBuffer(ByteBuffer buffer) { - } - } - private void writeFrame(Http2Frame frame) throws IOException { ! ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame); //System.err.println("TestServer: Writing frame " + frame.toString()); int c = 0; ! for (ByteBufferReference ref : refs) { ! ByteBuffer buf = ref.get(); byte[] ba = buf.array(); int start = buf.arrayOffset() + buf.position(); c += buf.remaining(); os.write(ba, start, buf.remaining()); } os.flush(); //System.err.printf("TestServer: wrote %d bytes\n", c); } - void handleStreamReset(ResetFrame resetFrame) throws IOException { - // TODO: cleanup - throw new IOException("Stream reset"); - } - private void handleCommonFrame(Http2Frame f) throws IOException { if (f instanceof SettingsFrame) { SettingsFrame sf = (SettingsFrame) f; if (sf.getFlag(SettingsFrame.ACK)) // ignore { --- 323,360 ---- exec.submit(this::readLoop); exec.submit(this::writeLoop); } private void writeFrame(Http2Frame frame) throws IOException { ! List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame); //System.err.println("TestServer: Writing frame " + frame.toString()); int c = 0; ! for (ByteBuffer buf : bufs) { byte[] ba = buf.array(); int start = buf.arrayOffset() + buf.position(); c += buf.remaining(); os.write(ba, start, buf.remaining()); + + // System.out.println("writing byte at a time"); + // while (buf.hasRemaining()) { + // byte b = buf.get(); + // os.write(b); + // os.flush(); + // try { + // Thread.sleep(1); + // } catch(InterruptedException e) { + // UncheckedIOException uie = new UncheckedIOException(new IOException("")); + // uie.addSuppressed(e); + // throw uie; + // } + // } } os.flush(); //System.err.printf("TestServer: wrote %d bytes\n", c); } private void handleCommonFrame(Http2Frame f) throws IOException { if (f instanceof SettingsFrame) { SettingsFrame sf = (SettingsFrame) f; if (sf.getFlag(SettingsFrame.ACK)) // ignore {
*** 274,286 **** SettingsFrame frame = new SettingsFrame(); frame.setFlag(SettingsFrame.ACK); frame.streamid(0); outputQ.put(frame); return; ! } ! //System.err.println("TestServer: Received ---> " + f.toString()); ! throw new UnsupportedOperationException("Not supported yet."); } void sendWindowUpdates(int len, int streamid) throws IOException { if (len == 0) return; --- 365,381 ---- SettingsFrame frame = new SettingsFrame(); frame.setFlag(SettingsFrame.ACK); frame.streamid(0); outputQ.put(frame); return; ! } else if (f instanceof GoAwayFrame) { ! System.err.println("Closing: "+ f.toString()); ! close(); ! } else if (f instanceof PingFrame) { ! handlePing((PingFrame)f); ! } else ! throw new UnsupportedOperationException("Not supported yet: " + f.toString()); } void sendWindowUpdates(int len, int streamid) throws IOException { if (len == 0) return;
*** 288,308 **** outputQ.put(wup); wup = new WindowUpdateFrame(0 , len); outputQ.put(wup); } ! HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) { HttpHeadersImpl headers = new HttpHeadersImpl(); DecodingCallback cb = (name, value) -> { headers.addHeader(name.toString(), value.toString()); }; for (HeaderFrame frame : frames) { ! ByteBufferReference[] buffers = frame.getHeaderBlock(); ! for (ByteBufferReference buffer : buffers) { ! hpackIn.decode(buffer.get(), false, cb); } } hpackIn.decode(EMPTY_BUFFER, true, cb); return headers; } --- 383,403 ---- outputQ.put(wup); wup = new WindowUpdateFrame(0 , len); outputQ.put(wup); } ! HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException { HttpHeadersImpl headers = new HttpHeadersImpl(); DecodingCallback cb = (name, value) -> { headers.addHeader(name.toString(), value.toString()); }; for (HeaderFrame frame : frames) { ! List<ByteBuffer> buffers = frame.getHeaderBlock(); ! for (ByteBuffer buffer : buffers) { ! hpackIn.decode(buffer, false, cb); } } hpackIn.decode(EMPTY_BUFFER, true, cb); return headers; }
*** 357,367 **** headers.setHeader(":method", tokens[0]); headers.setHeader(":scheme", "http"); // always in this case headers.setHeader(":authority", host); headers.setHeader(":path", uri.getPath()); ! Queue q = new Queue(); String body = getRequestBody(request); addHeaders(getHeaders(request), headers); headers.setHeader("Content-length", Integer.toString(body.length())); addRequestBodyToQueue(body, q); --- 452,462 ---- headers.setHeader(":method", tokens[0]); headers.setHeader(":scheme", "http"); // always in this case headers.setHeader(":authority", host); headers.setHeader(":path", uri.getPath()); ! Queue q = new Queue(sentinel); String body = getRequestBody(request); addHeaders(getHeaders(request), headers); headers.setHeader("Content-length", Integer.toString(body.length())); addRequestBodyToQueue(body, q);
*** 399,409 **** frames.add(frame); } } boolean endStreamReceived = endStream; HttpHeadersImpl headers = decodeHeaders(frames); ! Queue q = new Queue(); streams.put(streamid, q); exec.submit(() -> { handleRequest(headers, q, streamid, endStreamReceived); }); } --- 494,504 ---- frames.add(frame); } } boolean endStreamReceived = endStream; HttpHeadersImpl headers = decodeHeaders(frames); ! Queue q = new Queue(sentinel); streams.put(streamid, q); exec.submit(() -> { handleRequest(headers, q, streamid, endStreamReceived); }); }
*** 440,453 **** bis = new BodyInputStream(queue, streamid, this); } try (bis; BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) { String us = scheme + "://" + authority + path; URI uri = new URI(us); boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; ! Http2TestExchange exchange = new Http2TestExchange(streamid, method, headers, rspheaders, uri, bis, getSSLSession(), bos, this, pushAllowed); // give to user Http2Handler handler = server.getHandlerFor(uri.getPath()); --- 535,549 ---- bis = new BodyInputStream(queue, streamid, this); } try (bis; BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) { + outStreams.put(streamid, bos); String us = scheme + "://" + authority + path; URI uri = new URI(us); boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; ! Http2TestExchange exchange = exchangeSupplier.get(streamid, method, headers, rspheaders, uri, bis, getSSLSession(), bos, this, pushAllowed); // give to user Http2Handler handler = server.getHandlerFor(uri.getPath());
*** 505,514 **** --- 601,621 ---- WindowUpdateFrame wup = (WindowUpdateFrame) frame; synchronized (updaters) { Consumer<Integer> r = updaters.get(stream); r.accept(wup.getUpdate()); } + } else if (frame.type() == ResetFrame.TYPE) { + // do orderly close on input q + // and close the output q immediately + // This should mean depending on what the + // handler is doing: either an EOF on read + // or an IOException if writing the response. + q.orderlyClose(); + BodyOutputStream oq = outStreams.get(stream); + if (oq != null) + oq.closeInternal(); + } else { q.put(frame); } } }
*** 520,530 **** } close(); } } ! ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) { List<ByteBuffer> buffers = new LinkedList<>(); ByteBuffer buf = getBuffer(); boolean encoded; for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) { --- 627,637 ---- } close(); } } ! List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) { List<ByteBuffer> buffers = new LinkedList<>(); ByteBuffer buf = getBuffer(); boolean encoded; for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
*** 542,552 **** } while (!encoded); } } buf.flip(); buffers.add(buf); ! return ByteBufferReference.toReferences(buffers.toArray(bbarray)); } static void closeIgnore(Closeable c) { try { c.close(); --- 649,659 ---- } while (!encoded); } } buf.flip(); buffers.add(buf); ! return buffers; } static void closeIgnore(Closeable c) { try { c.close();
*** 585,604 **** } } private void handlePush(OutgoingPushPromise op) throws IOException { int promisedStreamid = nextPushStreamId; ! PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0); pushStreams.add(promisedStreamid); nextPushStreamId += 2; pp.streamid(op.parentStream); writeFrame(pp); final InputStream ii = op.is; final BodyOutputStream oo = new BodyOutputStream( promisedStreamid, clientSettings.getParameter( SettingsFrame.INITIAL_WINDOW_SIZE), this); oo.goodToGo(); exec.submit(() -> { try { ResponseHeaders oh = getPushResponse(promisedStreamid); outputQ.put(oh); --- 692,716 ---- } } private void handlePush(OutgoingPushPromise op) throws IOException { int promisedStreamid = nextPushStreamId; ! PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, ! HeaderFrame.END_HEADERS, ! promisedStreamid, ! encodeHeaders(op.headers), ! 0); pushStreams.add(promisedStreamid); nextPushStreamId += 2; pp.streamid(op.parentStream); writeFrame(pp); final InputStream ii = op.is; final BodyOutputStream oo = new BodyOutputStream( promisedStreamid, clientSettings.getParameter( SettingsFrame.INITIAL_WINDOW_SIZE), this); + outStreams.put(promisedStreamid, oo); oo.goodToGo(); exec.submit(() -> { try { ResponseHeaders oh = getPushResponse(promisedStreamid); outputQ.put(oh);
*** 643,654 **** int n = is.readNBytes(rest, 0, len); if (n != len) throw new IOException("Error reading frame"); List<Http2Frame> frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); ! reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf))); ! reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest))); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; return frames.get(0); } --- 755,766 ---- int n = is.readNBytes(rest, 0, len); if (n != len) throw new IOException("Error reading frame"); List<Http2Frame> frames = new ArrayList<>(); FramesDecoder reader = new FramesDecoder(frames::add); ! reader.decode(ByteBuffer.wrap(buf)); ! reader.decode(ByteBuffer.wrap(rest)); if (frames.size()!=1) throw new IOException("Expected 1 frame got "+frames.size()) ; return frames.get(0); }
*** 719,735 **** final static String CRLFCRLF = "\r\n\r\n"; String readHttp1Request() throws IOException { String headers = readUntil(CRLF + CRLF); int clen = getContentLength(headers); ! // read the content. ! byte[] buf = new byte[clen]; is.readNBytes(buf, 0, clen); String body = new String(buf, StandardCharsets.US_ASCII); return headers + body; } void sendHttp1Response(int code, String msg, String... headers) throws IOException { StringBuilder sb = new StringBuilder(); sb.append("HTTP/1.1 ") .append(code) .append(' ') --- 831,859 ---- final static String CRLFCRLF = "\r\n\r\n"; String readHttp1Request() throws IOException { String headers = readUntil(CRLF + CRLF); int clen = getContentLength(headers); ! byte[] buf; ! if (clen >= 0) { ! // HTTP/1.1 fixed length content ( may be 0 ), read it ! buf = new byte[clen]; is.readNBytes(buf, 0, clen); + } else { + // HTTP/1.1 chunked data, read it + buf = readChunkedInputStream(is); + } String body = new String(buf, StandardCharsets.US_ASCII); return headers + body; } + // This is a quick hack to get a chunked input stream reader. + private static byte[] readChunkedInputStream(InputStream is) throws IOException { + ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null); + return cis.readAllBytes(); + } + void sendHttp1Response(int code, String msg, String... headers) throws IOException { StringBuilder sb = new StringBuilder(); sb.append("HTTP/1.1 ") .append(code) .append(' ')
*** 769,779 **** } @SuppressWarnings({"rawtypes","unchecked"}) void addRequestBodyToQueue(String body, Queue q) throws IOException { ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); ! DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf)); // only used for primordial stream q.put(df); } // window updates done in main reader thread because they may --- 893,903 ---- } @SuppressWarnings({"rawtypes","unchecked"}) void addRequestBodyToQueue(String body, Queue q) throws IOException { ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); ! DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); // only used for primordial stream q.put(df); } // window updates done in main reader thread because they may
< prev index next >