1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.nio.ByteBuffer;
  37 import java.nio.charset.StandardCharsets;
  38 import java.util.*;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.ExecutorService;
  41 import java.util.concurrent.ConcurrentLinkedQueue;
  42 import java.util.function.Consumer;
  43 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  44 import jdk.incubator.http.internal.frame.*;
  45 import jdk.incubator.http.internal.hpack.Decoder;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 import jdk.incubator.http.internal.hpack.Encoder;
  48 import sun.net.www.http.ChunkedInputStream;
  49 import sun.net.www.http.HttpClient;
  50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
  51 
  52 /**
  53  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  54  * or HTTPS opened using "h2" ALPN.
  55  */
  56 public class Http2TestServerConnection {
  57     final Http2TestServer server;
  58     @SuppressWarnings({"rawtypes","unchecked"})
  59     final Map<Integer, Queue> streams; // input q per stream
  60     final Map<Integer, BodyOutputStream> outStreams; // output q per stream
  61     final HashSet<Integer> pushStreams;
  62     final Queue<Http2Frame> outputQ;
  63     volatile int nextstream;
  64     final Socket socket;
  65     final Http2TestExchangeSupplier exchangeSupplier;
  66     final InputStream is;
  67     final OutputStream os;
  68     volatile Encoder hpackOut;
  69     volatile Decoder hpackIn;
  70     volatile SettingsFrame clientSettings;
  71     final SettingsFrame serverSettings;
  72     final ExecutorService exec;
  73     final boolean secure;
  74     volatile boolean stopping;
  75     volatile int nextPushStreamId = 2;
  76     ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
  77 
  78     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  79     final static byte[] EMPTY_BARRAY = new byte[0];
  80     final Random random;
  81 
  82     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  83 
  84     static class Sentinel extends Http2Frame {
  85         Sentinel() { super(-1,-1);}
  86     }
  87 
  88     static final Sentinel sentinel = new Sentinel();
  89 
  90     class PingRequest {
  91         final byte[] pingData;
  92         final long pingStamp;
  93         final CompletableFuture<Long> response;
  94 
  95         PingRequest() {
  96             pingData = new byte[8];
  97             random.nextBytes(pingData);
  98             pingStamp = System.currentTimeMillis();
  99             response = new CompletableFuture<>();
 100         }
 101 
 102         PingFrame frame() {
 103             return new PingFrame(0, pingData);
 104         }
 105 
 106         CompletableFuture<Long> response() {
 107             return response;
 108         }
 109 
 110         void success() {
 111             response.complete(System.currentTimeMillis() - pingStamp);
 112         }
 113 
 114         void fail(Throwable t) {
 115             response.completeExceptionally(t);
 116         }
 117     }
 118 
 119     Http2TestServerConnection(Http2TestServer server,
 120                               Socket socket,
 121                               Http2TestExchangeSupplier exchangeSupplier)
 122         throws IOException
 123     {
 124         if (socket instanceof SSLSocket) {
 125             handshake(server.serverName(), (SSLSocket)socket);
 126         }
 127         System.err.println("TestServer: New connection from " + socket);
 128         this.server = server;
 129         this.exchangeSupplier = exchangeSupplier;
 130         this.streams = Collections.synchronizedMap(new HashMap<>());
 131         this.outStreams = Collections.synchronizedMap(new HashMap<>());
 132         this.outputQ = new Queue<>(sentinel);
 133         this.random = new Random();
 134         this.socket = socket;
 135         this.socket.setTcpNoDelay(true);
 136         this.serverSettings = SettingsFrame.getDefaultSettings();
 137         this.exec = server.exec;
 138         this.secure = server.secure;
 139         this.pushStreams = new HashSet<>();
 140         is = new BufferedInputStream(socket.getInputStream());
 141         os = new BufferedOutputStream(socket.getOutputStream());
 142     }
 143 
 144     /**
 145      * Sends a PING frame on this connection, and completes the returned
 146      * CF when the PING ack is received. The CF is given
 147      * an integer, whose value is the number of milliseconds
 148      * between PING and ACK.
 149      */
 150     CompletableFuture<Long> sendPing() {
 151         PingRequest ping = null;
 152         try {
 153             ping = new PingRequest();
 154             pings.add(ping);
 155             outputQ.put(ping.frame());
 156         } catch (Throwable t) {
 157             ping.fail(t);
 158         }
 159         return ping.response();
 160     }
 161 
 162     void goAway(int error) throws IOException {
 163         int laststream = nextstream >= 3 ? nextstream - 2 : 1;
 164 
 165         GoAwayFrame go = new GoAwayFrame(laststream, error);
 166         outputQ.put(go);
 167     }
 168 
 169     /**
 170      * Returns the first PingRequest from Queue
 171      */
 172     private PingRequest getNextRequest() {
 173         return pings.poll();
 174     }
 175 
 176     /**
 177      * Handles incoming Ping, which could be an ack
 178      * or a client originated Ping
 179      */
 180     void handlePing(PingFrame ping) throws IOException {
 181         if (ping.streamid() != 0) {
 182             System.err.println("Invalid ping received");
 183             close(ErrorFrame.PROTOCOL_ERROR);
 184             return;
 185         }
 186         if (ping.getFlag(PingFrame.ACK)) {
 187             // did we send a Ping?
 188             PingRequest request = getNextRequest();
 189             if (request == null) {
 190                 System.err.println("Invalid ping ACK received");
 191                 close(ErrorFrame.PROTOCOL_ERROR);
 192                 return;
 193             } else if (!Arrays.equals(request.pingData, ping.getData())) {
 194                 request.fail(new RuntimeException("Wrong ping data in ACK"));
 195             } else {
 196                 request.success();
 197             }
 198         } else {
 199             // client originated PING. Just send it back with ACK set
 200             ping.setFlag(PingFrame.ACK);
 201             outputQ.put(ping);
 202         }
 203     }
 204 
 205     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 206         try {
 207             InetAddress addr2 = InetAddress.getByName(host);
 208             return addr1.equals(addr2);
 209         } catch (IOException e) {
 210             throw new UncheckedIOException(e);
 211         }
 212     }
 213 
 214     private static void handshake(String name, SSLSocket sock) throws IOException {
 215         if (name == null) {
 216             // no name set. No need to check
 217             return;
 218         } else if (name.equals("127.0.0.1")) {
 219             name = "localhost";
 220         }
 221         final String fname = name;
 222         final InetAddress addr1 = InetAddress.getByName(name);
 223         SSLParameters params = sock.getSSLParameters();
 224         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
 225             public boolean matches (SNIServerName n) {
 226                 String host = ((SNIHostName)n).getAsciiName();
 227                 if (host.equals("127.0.0.1"))
 228                     host = "localhost";
 229                 boolean cmp = host.equalsIgnoreCase(fname);
 230                 if (cmp)
 231                     return true;
 232                 return compareIPAddrs(addr1, host);
 233             }
 234         };
 235         List<SNIMatcher> list = List.of(matcher);
 236         params.setSNIMatchers(list);
 237         sock.setSSLParameters(params);
 238         sock.getSession(); // blocks until handshake done
 239     }
 240 
 241     void closeIncoming() {
 242         close(-1);
 243     }
 244 
 245     void close(int error) {
 246         if (stopping)
 247             return;
 248         stopping = true;
 249         System.err.printf("Server connection to %s stopping. %d streams\n",
 250             socket.getRemoteSocketAddress().toString(), streams.size());
 251         streams.forEach((i, q) -> {
 252             q.orderlyClose();
 253         });
 254         try {
 255             if (error != -1)
 256                 goAway(error);
 257             outputQ.orderlyClose();
 258             socket.close();
 259         } catch (Exception e) {
 260         }
 261     }
 262 
 263     private void readPreface() throws IOException {
 264         int len = clientPreface.length;
 265         byte[] bytes = new byte[len];
 266         is.readNBytes(bytes, 0, len);
 267         if (Arrays.compare(clientPreface, bytes) != 0) {
 268             throw new IOException("Invalid preface: " + new String(bytes, 0, len));
 269         }
 270     }
 271 
 272     String doUpgrade() throws IOException {
 273         String upgrade = readHttp1Request();
 274         String h2c = getHeader(upgrade, "Upgrade");
 275         if (h2c == null || !h2c.equals("h2c")) {
 276             System.err.println("Server:HEADERS: " + upgrade);
 277             throw new IOException("Bad upgrade 1 " + h2c);
 278         }
 279 
 280         sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade",
 281                 "Upgrade", "h2c");
 282 
 283         sendSettingsFrame();
 284         readPreface();
 285 
 286         String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
 287         clientSettings = getSettingsFromString(clientSettingsString);
 288 
 289         return upgrade;
 290     }
 291 
 292     /**
 293      * Decodes the given, Client, settings payload provided in base64 HTTP1
 294      * header value.
 295      */
 296     private SettingsFrame getSettingsFromString(String s) throws IOException {
 297         Base64.Decoder decoder = Base64.getUrlDecoder();
 298         byte[] payload = decoder.decode(s);
 299         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 300         // simulate header of Settings Frame
 301         ByteBuffer bb0 = ByteBuffer.wrap(
 302                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 303         List<Http2Frame> frames = new ArrayList<>();
 304         FramesDecoder reader = new FramesDecoder(frames::add);
 305         reader.decode(bb0);
 306         reader.decode(bb1);
 307         if (frames.size()!=1)
 308             throw new IOException("Expected 1 frame got "+frames.size()) ;
 309         Http2Frame frame = frames.get(0);
 310         if (!(frame instanceof SettingsFrame))
 311             throw new IOException("Expected SettingsFrame");
 312         return (SettingsFrame)frame;
 313     }
 314 
 315     void run() throws Exception {
 316         String upgrade = null;
 317         if (!secure) {
 318             upgrade = doUpgrade();
 319         } else {
 320             readPreface();
 321             sendSettingsFrame(true);
 322             clientSettings = (SettingsFrame) readFrame();
 323             if (clientSettings.getFlag(SettingsFrame.ACK)) {
 324                 // we received the ack to our frame first
 325                 clientSettings = (SettingsFrame) readFrame();
 326             }
 327             nextstream = 1;
 328         }
 329 
 330         System.out.println("ServerSettings: " + serverSettings);
 331         System.out.println("ClientSettings: " + clientSettings);
 332 
 333         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 334         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 335 
 336         if (!secure) {
 337             createPrimordialStream(upgrade);
 338             nextstream = 3;
 339         }
 340 
 341         (new ConnectionThread("readLoop", this::readLoop)).start();
 342         (new ConnectionThread("writeLoop", this::writeLoop)).start();
 343     }
 344 
 345     class ConnectionThread extends Thread {
 346         final Runnable r;
 347         ConnectionThread(String name, Runnable r) {
 348             setName(name);
 349             setDaemon(true);
 350             this.r = r;
 351         }
 352 
 353         public void run() {
 354             r.run();
 355         }
 356     }
 357 
 358     private void writeFrame(Http2Frame frame) throws IOException {
 359         List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
 360         //System.err.println("TestServer: Writing frame " + frame.toString());
 361         int c = 0;
 362         for (ByteBuffer buf : bufs) {
 363             byte[] ba = buf.array();
 364             int start = buf.arrayOffset() + buf.position();
 365             c += buf.remaining();
 366             os.write(ba, start, buf.remaining());
 367 
 368 //            System.out.println("writing byte at a time");
 369 //            while (buf.hasRemaining()) {
 370 //                byte b = buf.get();
 371 //                os.write(b);
 372 //                os.flush();
 373 //                try {
 374 //                    Thread.sleep(1);
 375 //                } catch(InterruptedException e) {
 376 //                    UncheckedIOException uie = new UncheckedIOException(new IOException(""));
 377 //                    uie.addSuppressed(e);
 378 //                    throw uie;
 379 //                }
 380 //            }
 381         }
 382         os.flush();
 383         //System.err.printf("TestServer: wrote %d bytes\n", c);
 384     }
 385 
 386     private void handleCommonFrame(Http2Frame f) throws IOException {
 387         if (f instanceof SettingsFrame) {
 388             SettingsFrame sf = (SettingsFrame) f;
 389             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 390             {
 391                 return;
 392             }
 393             // otherwise acknowledge it
 394             clientSettings = sf;
 395             SettingsFrame frame = new SettingsFrame();
 396             frame.setFlag(SettingsFrame.ACK);
 397             frame.streamid(0);
 398             outputQ.put(frame);
 399             return;
 400         } else if (f instanceof GoAwayFrame) {
 401             System.err.println("Closing: "+ f.toString());
 402             close(ErrorFrame.NO_ERROR);
 403         } else if (f instanceof PingFrame) {
 404             handlePing((PingFrame)f);
 405         } else
 406             throw new UnsupportedOperationException("Not supported yet: " + f.toString());
 407     }
 408 
 409     void sendWindowUpdates(int len, int streamid) throws IOException {
 410         if (len == 0)
 411             return;
 412         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 413         outputQ.put(wup);
 414         wup = new WindowUpdateFrame(0 , len);
 415         outputQ.put(wup);
 416     }
 417 
 418     HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
 419         HttpHeadersImpl headers = new HttpHeadersImpl();
 420 
 421         DecodingCallback cb = (name, value) -> {
 422             headers.addHeader(name.toString(), value.toString());
 423         };
 424 
 425         for (HeaderFrame frame : frames) {
 426             List<ByteBuffer> buffers = frame.getHeaderBlock();
 427             for (ByteBuffer buffer : buffers) {
 428                 hpackIn.decode(buffer, false, cb);
 429             }
 430         }
 431         hpackIn.decode(EMPTY_BUFFER, true, cb);
 432         return headers;
 433     }
 434 
 435     String getRequestLine(String request) {
 436         int eol = request.indexOf(CRLF);
 437         return request.substring(0, eol);
 438     }
 439 
 440     String getHeaders(String request) {
 441         int start = request.indexOf(CRLF);
 442         int end = request.indexOf(CRLFCRLF);
 443         if (start == -1 || end == -1) {
 444             throw new RuntimeException("Malformed request");
 445         }
 446         return request.substring(start,end);
 447     }
 448 
 449     void addHeaders(String headers, HttpHeadersImpl hdrs) {
 450         String[] hh = headers.split(CRLF);
 451         for (String header : hh) {
 452             int colon = header.indexOf(':');
 453             if (colon == -1)
 454                 continue;
 455             String name = header.substring(0, colon);
 456             String value = header.substring(colon+1);
 457             while (value.startsWith(" "))
 458                 value = value.substring(1);
 459             hdrs.addHeader(name, value);
 460         }
 461     }
 462 
 463     // First stream (1) comes from a plaintext HTTP/1.1 request
 464     @SuppressWarnings({"rawtypes","unchecked"})
 465     void createPrimordialStream(String request) throws IOException {
 466         HttpHeadersImpl headers = new HttpHeadersImpl();
 467         String requestLine = getRequestLine(request);
 468         String[] tokens = requestLine.split(" ");
 469         if (!tokens[2].equals("HTTP/1.1")) {
 470             throw new IOException("bad request line");
 471         }
 472         URI uri = null;
 473         try {
 474             uri = new URI(tokens[1]);
 475         } catch (URISyntaxException e) {
 476             throw new IOException(e);
 477         }
 478         String host = getHeader(request, "Host");
 479         if (host == null) {
 480             throw new IOException("missing Host");
 481         }
 482 
 483         headers.setHeader(":method", tokens[0]);
 484         headers.setHeader(":scheme", "http"); // always in this case
 485         headers.setHeader(":authority", host);
 486         headers.setHeader(":path", uri.getPath());
 487         Queue q = new Queue(sentinel);
 488         String body = getRequestBody(request);
 489         addHeaders(getHeaders(request), headers);
 490         headers.setHeader("Content-length", Integer.toString(body.length()));
 491 
 492         addRequestBodyToQueue(body, q);
 493         streams.put(1, q);
 494         exec.submit(() -> {
 495             handleRequest(headers, q, 1, true /*complete request has been read*/);
 496         });
 497     }
 498 
 499     // all other streams created here
 500     @SuppressWarnings({"rawtypes","unchecked"})
 501     void createStream(HeaderFrame frame) throws IOException {
 502         List<HeaderFrame> frames = new LinkedList<>();
 503         frames.add(frame);
 504         int streamid = frame.streamid();
 505         if (streamid != nextstream) {
 506             throw new IOException("unexpected stream id");
 507         }
 508         nextstream += 2;
 509 
 510         boolean endStream = false;
 511         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 512             endStream = true;
 513         }
 514 
 515         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 516             Http2Frame f = readFrame();
 517             if (!(f instanceof HeaderFrame)) {
 518                 handleCommonFrame(f); // should only be error frames
 519             } else {
 520                 frame = (HeaderFrame) f;
 521                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 522                     endStream = true;
 523                 }
 524                 frames.add(frame);
 525             }
 526         }
 527         boolean endStreamReceived = endStream;
 528         HttpHeadersImpl headers = decodeHeaders(frames);
 529         Queue q = new Queue(sentinel);
 530         streams.put(streamid, q);
 531         exec.submit(() -> {
 532             handleRequest(headers, q, streamid, endStreamReceived);
 533         });
 534     }
 535 
 536     // runs in own thread. Handles request from start to finish. Incoming frames
 537     // for this stream/request delivered on Q
 538 
 539     @SuppressWarnings({"rawtypes","unchecked"})
 540     void handleRequest(HttpHeadersImpl headers,
 541                        Queue queue,
 542                        int streamid,
 543                        boolean endStreamReceived)
 544     {
 545         String method = headers.firstValue(":method").orElse("");
 546         //System.out.println("method = " + method);
 547         String path = headers.firstValue(":path").orElse("");
 548         //System.out.println("path = " + path);
 549         String scheme = headers.firstValue(":scheme").orElse("");
 550         //System.out.println("scheme = " + scheme);
 551         String authority = headers.firstValue(":authority").orElse("");
 552         //System.out.println("authority = " + authority);
 553         System.err.printf("TestServer: %s %s\n", method, path);
 554         HttpHeadersImpl rspheaders = new HttpHeadersImpl();
 555         int winsize = clientSettings.getParameter(
 556                 SettingsFrame.INITIAL_WINDOW_SIZE);
 557         //System.err.println ("Stream window size = " + winsize);
 558 
 559         final InputStream bis;
 560         if (endStreamReceived && queue.size() == 0) {
 561             System.err.println("Server: got END_STREAM for stream " + streamid);
 562             bis = NullInputStream.INSTANCE;
 563         } else {
 564             System.err.println("Server: creating input stream for stream " + streamid);
 565             bis = new BodyInputStream(queue, streamid, this);
 566         }
 567         try (bis;
 568              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 569         {
 570             outStreams.put(streamid, bos);
 571             String us = scheme + "://" + authority + path;
 572             URI uri = new URI(us);
 573             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 574             Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
 575                     headers, rspheaders, uri, bis, getSSLSession(),
 576                     bos, this, pushAllowed);
 577 
 578             // give to user
 579             Http2Handler handler = server.getHandlerFor(uri.getPath());
 580             handler.handle(exchange);
 581 
 582             // everything happens in the exchange from here. Hopefully will
 583             // return though.
 584         } catch (Throwable e) {
 585             System.err.println("TestServer: handleRequest exception: " + e);
 586             e.printStackTrace();
 587         }
 588     }
 589 
 590     private SSLSession getSSLSession() {
 591         if (! (socket instanceof SSLSocket))
 592             return null;
 593         SSLSocket ssl = (SSLSocket)socket;
 594         return ssl.getSession();
 595     }
 596     // Runs in own thread
 597 
 598     @SuppressWarnings({"rawtypes","unchecked"})
 599     void readLoop() {
 600         try {
 601             while (!stopping) {
 602                 Http2Frame frame = readFrameImpl();
 603                 if (frame == null) {
 604                     closeIncoming();
 605                     return;
 606                 }
 607                 //System.err.printf("TestServer: received frame %s\n", frame);
 608                 int stream = frame.streamid();
 609                 if (stream == 0) {
 610                     if (frame.type() == WindowUpdateFrame.TYPE) {
 611                         WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 612                         updateConnectionWindow(wup.getUpdate());
 613                     } else {
 614                         // other common frame types
 615                         handleCommonFrame(frame);
 616                     }
 617                 } else {
 618                     Queue q = streams.get(stream);
 619                     if (frame.type() == HeadersFrame.TYPE) {
 620                         if (q != null) {
 621                             System.err.println("HEADERS frame for existing stream! Error.");
 622                             // TODO: close connection
 623                             continue;
 624                         } else {
 625                             createStream((HeadersFrame) frame);
 626                         }
 627                     } else {
 628                         if (q == null && !pushStreams.contains(stream)) {
 629                             System.err.printf("Non Headers frame received with"+
 630                                     " non existing stream (%d) ", frame.streamid());
 631                             System.err.println(frame);
 632                             continue;
 633                         }
 634                         if (frame.type() == WindowUpdateFrame.TYPE) {
 635                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 636                             synchronized (updaters) {
 637                                 Consumer<Integer> r = updaters.get(stream);
 638                                 r.accept(wup.getUpdate());
 639                             }
 640                         } else if (frame.type() == ResetFrame.TYPE) {
 641                             // do orderly close on input q
 642                             // and close the output q immediately
 643                             // This should mean depending on what the
 644                             // handler is doing: either an EOF on read
 645                             // or an IOException if writing the response.
 646                             if (q != null) {
 647                                 q.orderlyClose();
 648                                 BodyOutputStream oq = outStreams.get(stream);
 649                                 if (oq != null)
 650                                     oq.closeInternal();
 651                             } else if (pushStreams.contains(stream)) {
 652                                 // we could interrupt the pushStream's output
 653                                 // but the continuation, even after a reset
 654                                 // should be handle gracefully by the client
 655                                 // anyway.
 656                             } else {
 657                                 System.err.println("TestServer: Unexpected frame on: " + stream);
 658                                 System.err.println(frame);
 659                                 throw new IOException("Unexpected frame");
 660                             }
 661                         } else {
 662                             q.put(frame);
 663                         }
 664                     }
 665                 }
 666             }
 667         } catch (Throwable e) {
 668             if (!stopping) {
 669                 System.err.println("Http server reader thread shutdown");
 670                 e.printStackTrace();
 671             }
 672             close(ErrorFrame.PROTOCOL_ERROR);
 673         }
 674     }
 675 
 676     List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
 677         List<ByteBuffer> buffers = new LinkedList<>();
 678 
 679         ByteBuffer buf = getBuffer();
 680         boolean encoded;
 681         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 682             List<String> values = entry.getValue();
 683             String key = entry.getKey().toLowerCase();
 684             for (String value : values) {
 685                 do {
 686                     hpackOut.header(key, value);
 687                     encoded = hpackOut.encode(buf);
 688                     if (!encoded) {
 689                         buf.flip();
 690                         buffers.add(buf);
 691                         buf = getBuffer();
 692                     }
 693                 } while (!encoded);
 694             }
 695         }
 696         buf.flip();
 697         buffers.add(buf);
 698         return buffers;
 699     }
 700 
 701     static void closeIgnore(Closeable c) {
 702         try {
 703             c.close();
 704         } catch (IOException e) {}
 705     }
 706 
 707     // Runs in own thread
 708     void writeLoop() {
 709         try {
 710             while (!stopping) {
 711                 Http2Frame frame;
 712                 try {
 713                     frame = outputQ.take();
 714                     if (stopping)
 715                         break;
 716                 } catch(IOException x) {
 717                     if (stopping && x.getCause() instanceof InterruptedException) {
 718                         break;
 719                     } else throw x;
 720                 }
 721                 if (frame instanceof ResponseHeaders) {
 722                     ResponseHeaders rh = (ResponseHeaders)frame;
 723                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 724                     writeFrame(hf);
 725                 } else if (frame instanceof OutgoingPushPromise) {
 726                     handlePush((OutgoingPushPromise)frame);
 727                 } else
 728                     writeFrame(frame);
 729             }
 730             System.err.println("TestServer: Connection writer stopping");
 731         } catch (Throwable e) {
 732             e.printStackTrace();
 733             /*close();
 734             if (!stopping) {
 735                 e.printStackTrace();
 736                 System.err.println("TestServer: writeLoop exception: " + e);
 737             }*/
 738         }
 739     }
 740 
 741     private void handlePush(OutgoingPushPromise op) throws IOException {
 742         int promisedStreamid = nextPushStreamId;
 743         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
 744                                                    HeaderFrame.END_HEADERS,
 745                                                    promisedStreamid,
 746                                                    encodeHeaders(op.headers),
 747                                                    0);
 748         pushStreams.add(promisedStreamid);
 749         nextPushStreamId += 2;
 750         pp.streamid(op.parentStream);
 751         writeFrame(pp);
 752         final InputStream ii = op.is;
 753         final BodyOutputStream oo = new BodyOutputStream(
 754                 promisedStreamid,
 755                 clientSettings.getParameter(
 756                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
 757         outStreams.put(promisedStreamid, oo);
 758         oo.goodToGo();
 759         exec.submit(() -> {
 760             try {
 761                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 762                 outputQ.put(oh);
 763                 ii.transferTo(oo);
 764             } catch (Throwable ex) {
 765                 System.err.printf("TestServer: pushing response error: %s\n",
 766                         ex.toString());
 767             } finally {
 768                 closeIgnore(ii);
 769                 closeIgnore(oo);
 770             }
 771         });
 772 
 773     }
 774 
 775     // returns a minimal response with status 200
 776     // that is the response to the push promise just sent
 777     private ResponseHeaders getPushResponse(int streamid) {
 778         HttpHeadersImpl h = new HttpHeadersImpl();
 779         h.addHeader(":status", "200");
 780         ResponseHeaders oh = new ResponseHeaders(h);
 781         oh.streamid(streamid);
 782         oh.setFlag(HeaderFrame.END_HEADERS);
 783         return oh;
 784     }
 785 
 786     private ByteBuffer getBuffer() {
 787         return ByteBuffer.allocate(8 * 1024);
 788     }
 789 
 790     private Http2Frame readFrame() throws IOException {
 791         Http2Frame f = readFrameImpl();
 792         if (f == null)
 793             throw new IOException("connection closed");
 794         return f;
 795     }
 796 
 797     // does not throw an exception for EOF
 798     private Http2Frame readFrameImpl() throws IOException {
 799         try {
 800             byte[] buf = new byte[9];
 801             int ret;
 802             ret=is.readNBytes(buf, 0, 9);
 803             if (ret == 0) {
 804                 return null;
 805             } else if (ret != 9) {
 806                 throw new IOException("readFrame: connection closed");
 807             }
 808             int len = 0;
 809             for (int i = 0; i < 3; i++) {
 810                 int n = buf[i] & 0xff;
 811                 //System.err.println("n = " + n);
 812                 len = (len << 8) + n;
 813             }
 814             byte[] rest = new byte[len];
 815             int n = is.readNBytes(rest, 0, len);
 816             if (n != len)
 817                 throw new IOException("Error reading frame");
 818             List<Http2Frame> frames = new ArrayList<>();
 819             FramesDecoder reader = new FramesDecoder(frames::add);
 820             reader.decode(ByteBuffer.wrap(buf));
 821             reader.decode(ByteBuffer.wrap(rest));
 822             if (frames.size()!=1)
 823                 throw new IOException("Expected 1 frame got "+frames.size()) ;
 824 
 825             return frames.get(0);
 826         } catch (IOException ee) {
 827             if (stopping)
 828                 return null;
 829             throw ee;
 830         }
 831     }
 832 
 833     void sendSettingsFrame() throws IOException {
 834         sendSettingsFrame(false);
 835     }
 836 
 837     void sendSettingsFrame(boolean now) throws IOException {
 838         if (now) {
 839             writeFrame(serverSettings);
 840         } else {
 841             outputQ.put(serverSettings);
 842         }
 843     }
 844 
 845     String readUntil(String end) throws IOException {
 846         int number = end.length();
 847         int found = 0;
 848         StringBuilder sb = new StringBuilder();
 849         while (found < number) {
 850             char expected = end.charAt(found);
 851             int c = is.read();
 852             if (c == -1) {
 853                 throw new IOException("Connection closed");
 854             }
 855             char c0 = (char) c;
 856             sb.append(c0);
 857             if (c0 != expected) {
 858                 found = 0;
 859                 continue;
 860             }
 861             found++;
 862         }
 863         return sb.toString();
 864     }
 865 
 866     private int getContentLength(String headers) {
 867         return getIntHeader(headers, "Content-length");
 868     }
 869 
 870     private int getIntHeader(String headers, String name) {
 871         String val = getHeader(headers, name);
 872         if (val == null) {
 873             return -1;
 874         }
 875         return Integer.parseInt(val);
 876     }
 877 
 878     private String getHeader(String headers, String name) {
 879         String headers1 = headers.toLowerCase(); // not efficient
 880         name = CRLF + name.toLowerCase();
 881         int start = headers1.indexOf(name);
 882         if (start == -1) {
 883             return null;
 884         }
 885         start += 2;
 886         int end = headers1.indexOf(CRLF, start);
 887         String line = headers.substring(start, end);
 888         start = line.indexOf(':');
 889         if (start == -1) {
 890             return null;
 891         }
 892         return line.substring(start + 1).trim();
 893     }
 894 
 895     final static String CRLF = "\r\n";
 896     final static String CRLFCRLF = "\r\n\r\n";
 897 
 898     String readHttp1Request() throws IOException {
 899         String headers = readUntil(CRLF + CRLF);
 900         int clen = getContentLength(headers);
 901         String te = getHeader(headers, "Transfer-encoding");
 902         byte[] buf = new byte[0];
 903         try {
 904             if (clen >= 0) {
 905                 // HTTP/1.1 fixed length content ( may be 0 ), read it
 906                 buf = new byte[clen];
 907                 is.readNBytes(buf, 0, clen);
 908             } else if ("chunked".equalsIgnoreCase(te)) {
 909                 //  HTTP/1.1 chunked data, read it
 910                 buf = readChunkedInputStream(is);
 911             }
 912             String body = new String(buf, StandardCharsets.US_ASCII);
 913             return headers + body;
 914         } catch (IOException e) {
 915             System.err.println("TestServer: headers read: [ " + headers + " ]");
 916             throw e;
 917         }
 918     }
 919 
 920     // This is a quick hack to get a chunked input stream reader.
 921     private static byte[] readChunkedInputStream(InputStream is) throws IOException {
 922         ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
 923         return cis.readAllBytes();
 924     }
 925 
 926     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
 927         StringBuilder sb = new StringBuilder();
 928         sb.append("HTTP/1.1 ")
 929                 .append(code)
 930                 .append(' ')
 931                 .append(msg)
 932                 .append(CRLF);
 933         int numheaders = headers.length;
 934         for (int i = 0; i < numheaders; i += 2) {
 935             sb.append(headers[i])
 936                     .append(": ")
 937                     .append(headers[i + 1])
 938                     .append(CRLF);
 939         }
 940         sb.append(CRLF);
 941         String s = sb.toString();
 942         os.write(s.getBytes("US-ASCII"));
 943         os.flush();
 944     }
 945 
 946     private void unexpectedFrame(Http2Frame frame) {
 947         System.err.println("OOPS. Unexpected");
 948         assert false;
 949     }
 950 
 951     final static ByteBuffer[] bbarray = new ByteBuffer[0];
 952 
 953     // wrapper around a BlockingQueue that throws an exception when it's closed
 954     // Each stream has one of these
 955 
 956     String getRequestBody(String request) {
 957         int bodystart = request.indexOf(CRLF+CRLF);
 958         String body;
 959         if (bodystart == -1)
 960             body = "";
 961         else
 962             body = request.substring(bodystart+4);
 963         return body;
 964     }
 965 
 966     @SuppressWarnings({"rawtypes","unchecked"})
 967     void addRequestBodyToQueue(String body, Queue q) throws IOException {
 968         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
 969         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
 970         // only used for primordial stream
 971         q.put(df);
 972     }
 973 
 974     // window updates done in main reader thread because they may
 975     // be used to unblock BodyOutputStreams waiting for WUPs
 976 
 977     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
 978 
 979     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
 980         synchronized(updaters) {
 981             updaters.put(streamid, r);
 982         }
 983     }
 984 
 985     int sendWindow = 64 * 1024 - 1; // connection level send window
 986 
 987     /**
 988      * BodyOutputStreams call this to get the connection window first.
 989      *
 990      * @param amount
 991      */
 992     synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
 993         while (amount > 0) {
 994             int n = Math.min(amount, sendWindow);
 995             amount -= n;
 996             sendWindow -= n;
 997             if (amount > 0)
 998                 wait();
 999         }
1000     }
1001 
1002     synchronized void updateConnectionWindow(int amount) {
1003         sendWindow += amount;
1004         notifyAll();
1005     }
1006 
1007     // simplified output headers class. really just a type safe container
1008     // for the hashmap.
1009 
1010     static class ResponseHeaders extends Http2Frame {
1011         HttpHeadersImpl headers;
1012 
1013         ResponseHeaders(HttpHeadersImpl headers) {
1014             super(0, 0);
1015             this.headers = headers;
1016         }
1017 
1018     }
1019 
1020     static class NullInputStream extends InputStream {
1021         static final NullInputStream INSTANCE = new NullInputStream();
1022         private NullInputStream() {}
1023         public int read()      { return -1; }
1024         public int available() { return 0;  }
1025     }
1026 }