1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.nio.ByteBuffer;
  37 import java.nio.charset.StandardCharsets;
  38 import java.util.*;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.ExecutorService;
  41 import java.util.concurrent.ConcurrentLinkedQueue;
  42 import java.util.function.Consumer;
  43 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  44 import jdk.incubator.http.internal.frame.*;
  45 import jdk.incubator.http.internal.hpack.Decoder;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 import jdk.incubator.http.internal.hpack.Encoder;
  48 import sun.net.www.http.ChunkedInputStream;
  49 import sun.net.www.http.HttpClient;
  50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
  51 
  52 /**
  53  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  54  * or HTTPS opened using "h2" ALPN.
  55  */
  56 public class Http2TestServerConnection {
  57     final Http2TestServer server;
  58     @SuppressWarnings({"rawtypes","unchecked"})
  59     final Map<Integer, Queue> streams; // input q per stream
  60     final Map<Integer, BodyOutputStream> outStreams; // output q per stream
  61     final HashSet<Integer> pushStreams;
  62     final Queue<Http2Frame> outputQ;
  63     volatile int nextstream;
  64     final Socket socket;
  65     final Http2TestExchangeSupplier exchangeSupplier;
  66     final InputStream is;
  67     final OutputStream os;
  68     volatile Encoder hpackOut;
  69     volatile Decoder hpackIn;
  70     volatile SettingsFrame clientSettings;
  71     final SettingsFrame serverSettings;
  72     final ExecutorService exec;
  73     final boolean secure;
  74     volatile boolean stopping;
  75     volatile int nextPushStreamId = 2;
  76     ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
  77 
  78     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  79     final static byte[] EMPTY_BARRAY = new byte[0];
  80     final Random random;
  81 
  82     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  83 
  84     static class Sentinel extends Http2Frame {
  85         Sentinel() { super(-1,-1);}
  86     }
  87 
  88     class PingRequest {
  89         final byte[] pingData;
  90         final long pingStamp;
  91         final CompletableFuture<Long> response;
  92 
  93         PingRequest() {
  94             pingData = new byte[8];
  95             random.nextBytes(pingData);
  96             pingStamp = System.currentTimeMillis();
  97             response = new CompletableFuture<>();
  98         }
  99 
 100         PingFrame frame() {
 101             return new PingFrame(0, pingData);
 102         }
 103 
 104         CompletableFuture<Long> response() {
 105             return response;
 106         }
 107 
 108         void success() {
 109             response.complete(System.currentTimeMillis() - pingStamp);
 110         }
 111 
 112         void fail(Throwable t) {
 113             response.completeExceptionally(t);
 114         }
 115     }
 116 
 117     static Sentinel sentinel;
 118 
 119     Http2TestServerConnection(Http2TestServer server,
 120                               Socket socket,
 121                               Http2TestExchangeSupplier exchangeSupplier)
 122         throws IOException
 123     {
 124         if (socket instanceof SSLSocket) {
 125             handshake(server.serverName(), (SSLSocket)socket);
 126         }
 127         System.err.println("TestServer: New connection from " + socket);
 128         this.server = server;
 129         this.exchangeSupplier = exchangeSupplier;
 130         this.streams = Collections.synchronizedMap(new HashMap<>());
 131         this.outStreams = Collections.synchronizedMap(new HashMap<>());
 132         this.outputQ = new Queue<>(sentinel);
 133         this.random = new Random();
 134         this.socket = socket;
 135         this.socket.setTcpNoDelay(true);
 136         this.serverSettings = SettingsFrame.getDefaultSettings();
 137         this.exec = server.exec;
 138         this.secure = server.secure;
 139         this.pushStreams = new HashSet<>();
 140         is = new BufferedInputStream(socket.getInputStream());
 141         os = new BufferedOutputStream(socket.getOutputStream());
 142     }
 143 
 144     /**
 145      * Sends a PING frame on this connection, and completes the returned
 146      * CF when the PING ack is received. The CF is given
 147      * an integer, whose value is the number of milliseconds
 148      * between PING and ACK.
 149      */
 150     CompletableFuture<Long> sendPing() {
 151         PingRequest ping = null;
 152         try {
 153             ping = new PingRequest();
 154             pings.add(ping);
 155             outputQ.put(ping.frame());
 156         } catch (Throwable t) {
 157             ping.fail(t);
 158         }
 159         return ping.response();
 160     }
 161 
 162     /**
 163      * Returns the first PingRequest from Queue
 164      */
 165     private PingRequest getNextRequest() {
 166         return pings.poll();
 167     }
 168 
 169     /**
 170      * Handles incoming Ping, which could be an ack
 171      * or a client originated Ping
 172      */
 173     void handlePing(PingFrame ping) throws IOException {
 174         if (ping.streamid() != 0) {
 175             System.err.println("Invalid ping received");
 176             close();
 177             return;
 178         }
 179         if (ping.getFlag(PingFrame.ACK)) {
 180             // did we send a Ping?
 181             PingRequest request = getNextRequest();
 182             if (request == null) {
 183                 System.err.println("Invalid ping ACK received");
 184                 close();
 185                 return;
 186             } else if (!Arrays.equals(request.pingData, ping.getData())) {
 187                 request.fail(new RuntimeException("Wrong ping data in ACK"));
 188             } else {
 189                 request.success();
 190             }
 191         } else {
 192             // client originated PING. Just send it back with ACK set
 193             ping.setFlag(PingFrame.ACK);
 194             outputQ.put(ping);
 195         }
 196     }
 197 
 198     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 199         try {
 200             InetAddress addr2 = InetAddress.getByName(host);
 201             return addr1.equals(addr2);
 202         } catch (IOException e) {
 203             throw new UncheckedIOException(e);
 204         }
 205     }
 206 
 207     private static void handshake(String name, SSLSocket sock) throws IOException {
 208         if (name == null) {
 209             // no name set. No need to check
 210             return;
 211         } else if (name.equals("127.0.0.1")) {
 212             name = "localhost";
 213         }
 214         final String fname = name;
 215         final InetAddress addr1 = InetAddress.getByName(name);
 216         SSLParameters params = sock.getSSLParameters();
 217         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
 218             public boolean matches (SNIServerName n) {
 219                 String host = ((SNIHostName)n).getAsciiName();
 220                 if (host.equals("127.0.0.1"))
 221                     host = "localhost";
 222                 boolean cmp = host.equalsIgnoreCase(fname);
 223                 if (cmp)
 224                     return true;
 225                 return compareIPAddrs(addr1, host);
 226             }
 227         };
 228         List<SNIMatcher> list = List.of(matcher);
 229         params.setSNIMatchers(list);
 230         sock.setSSLParameters(params);
 231         sock.getSession(); // blocks until handshake done
 232     }
 233 
 234     void close() {
 235         stopping = true;
 236         streams.forEach((i, q) -> {
 237             q.close();
 238         });
 239         try {
 240             socket.close();
 241             // TODO: put a reset on each stream
 242         } catch (IOException e) {
 243         }
 244     }
 245 
 246     private void readPreface() throws IOException {
 247         int len = clientPreface.length;
 248         byte[] bytes = new byte[len];
 249         is.readNBytes(bytes, 0, len);
 250         if (Arrays.compare(clientPreface, bytes) != 0) {
 251             throw new IOException("Invalid preface: " + new String(bytes, 0, len));
 252         }
 253     }
 254 
 255     String doUpgrade() throws IOException {
 256         String upgrade = readHttp1Request();
 257         String h2c = getHeader(upgrade, "Upgrade");
 258         if (h2c == null || !h2c.equals("h2c")) {
 259             System.err.println("Server:HEADERS: " + upgrade);
 260             throw new IOException("Bad upgrade 1 " + h2c);
 261         }
 262 
 263         sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade",
 264                 "Upgrade", "h2c");
 265 
 266         sendSettingsFrame();
 267         readPreface();
 268 
 269         String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
 270         clientSettings = getSettingsFromString(clientSettingsString);
 271 
 272         return upgrade;
 273     }
 274 
 275     /**
 276      * Decodes the given, Client, settings payload provided in base64 HTTP1
 277      * header value.
 278      */
 279     private SettingsFrame getSettingsFromString(String s) throws IOException {
 280         Base64.Decoder decoder = Base64.getUrlDecoder();
 281         byte[] payload = decoder.decode(s);
 282         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 283         // simulate header of Settings Frame
 284         ByteBuffer bb0 = ByteBuffer.wrap(
 285                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 286         List<Http2Frame> frames = new ArrayList<>();
 287         FramesDecoder reader = new FramesDecoder(frames::add);
 288         reader.decode(bb0);
 289         reader.decode(bb1);
 290         if (frames.size()!=1)
 291             throw new IOException("Expected 1 frame got "+frames.size()) ;
 292         Http2Frame frame = frames.get(0);
 293         if (!(frame instanceof SettingsFrame))
 294             throw new IOException("Expected SettingsFrame");
 295         return (SettingsFrame)frame;
 296     }
 297 
 298     void run() throws Exception {
 299         String upgrade = null;
 300         if (!secure) {
 301             upgrade = doUpgrade();
 302         } else {
 303             readPreface();
 304             sendSettingsFrame(true);
 305             clientSettings = (SettingsFrame) readFrame();
 306             if (clientSettings.getFlag(SettingsFrame.ACK)) {
 307                 // we received the ack to our frame first
 308                 clientSettings = (SettingsFrame) readFrame();
 309             }
 310             nextstream = 1;
 311         }
 312 
 313         System.out.println("ServerSettings: " + serverSettings);
 314         System.out.println("ClientSettings: " + clientSettings);
 315 
 316         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 317         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 318 
 319         if (!secure) {
 320             createPrimordialStream(upgrade);
 321             nextstream = 3;
 322         }
 323 
 324         exec.submit(this::readLoop);
 325         exec.submit(this::writeLoop);
 326     }
 327 
 328     private void writeFrame(Http2Frame frame) throws IOException {
 329         List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
 330         //System.err.println("TestServer: Writing frame " + frame.toString());
 331         int c = 0;
 332         for (ByteBuffer buf : bufs) {
 333             byte[] ba = buf.array();
 334             int start = buf.arrayOffset() + buf.position();
 335             c += buf.remaining();
 336             os.write(ba, start, buf.remaining());
 337 
 338 //            System.out.println("writing byte at a time");
 339 //            while (buf.hasRemaining()) {
 340 //                byte b = buf.get();
 341 //                os.write(b);
 342 //                os.flush();
 343 //                try {
 344 //                    Thread.sleep(1);
 345 //                } catch(InterruptedException e) {
 346 //                    UncheckedIOException uie = new UncheckedIOException(new IOException(""));
 347 //                    uie.addSuppressed(e);
 348 //                    throw uie;
 349 //                }
 350 //            }
 351         }
 352         os.flush();
 353         //System.err.printf("TestServer: wrote %d bytes\n", c);
 354     }
 355 
 356     private void handleCommonFrame(Http2Frame f) throws IOException {
 357         if (f instanceof SettingsFrame) {
 358             SettingsFrame sf = (SettingsFrame) f;
 359             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 360             {
 361                 return;
 362             }
 363             // otherwise acknowledge it
 364             clientSettings = sf;
 365             SettingsFrame frame = new SettingsFrame();
 366             frame.setFlag(SettingsFrame.ACK);
 367             frame.streamid(0);
 368             outputQ.put(frame);
 369             return;
 370         } else if (f instanceof GoAwayFrame) {
 371             System.err.println("Closing: "+ f.toString());
 372             close();
 373         } else if (f instanceof PingFrame) {
 374             handlePing((PingFrame)f);
 375         } else
 376             throw new UnsupportedOperationException("Not supported yet: " + f.toString());
 377     }
 378 
 379     void sendWindowUpdates(int len, int streamid) throws IOException {
 380         if (len == 0)
 381             return;
 382         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 383         outputQ.put(wup);
 384         wup = new WindowUpdateFrame(0 , len);
 385         outputQ.put(wup);
 386     }
 387 
 388     HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
 389         HttpHeadersImpl headers = new HttpHeadersImpl();
 390 
 391         DecodingCallback cb = (name, value) -> {
 392             headers.addHeader(name.toString(), value.toString());
 393         };
 394 
 395         for (HeaderFrame frame : frames) {
 396             List<ByteBuffer> buffers = frame.getHeaderBlock();
 397             for (ByteBuffer buffer : buffers) {
 398                 hpackIn.decode(buffer, false, cb);
 399             }
 400         }
 401         hpackIn.decode(EMPTY_BUFFER, true, cb);
 402         return headers;
 403     }
 404 
 405     String getRequestLine(String request) {
 406         int eol = request.indexOf(CRLF);
 407         return request.substring(0, eol);
 408     }
 409 
 410     String getHeaders(String request) {
 411         int start = request.indexOf(CRLF);
 412         int end = request.indexOf(CRLFCRLF);
 413         if (start == -1 || end == -1) {
 414             throw new RuntimeException("Malformed request");
 415         }
 416         return request.substring(start,end);
 417     }
 418 
 419     void addHeaders(String headers, HttpHeadersImpl hdrs) {
 420         String[] hh = headers.split(CRLF);
 421         for (String header : hh) {
 422             int colon = header.indexOf(':');
 423             if (colon == -1)
 424                 continue;
 425             String name = header.substring(0, colon);
 426             String value = header.substring(colon+1);
 427             while (value.startsWith(" "))
 428                 value = value.substring(1);
 429             hdrs.addHeader(name, value);
 430         }
 431     }
 432 
 433     // First stream (1) comes from a plaintext HTTP/1.1 request
 434     @SuppressWarnings({"rawtypes","unchecked"})
 435     void createPrimordialStream(String request) throws IOException {
 436         HttpHeadersImpl headers = new HttpHeadersImpl();
 437         String requestLine = getRequestLine(request);
 438         String[] tokens = requestLine.split(" ");
 439         if (!tokens[2].equals("HTTP/1.1")) {
 440             throw new IOException("bad request line");
 441         }
 442         URI uri = null;
 443         try {
 444             uri = new URI(tokens[1]);
 445         } catch (URISyntaxException e) {
 446             throw new IOException(e);
 447         }
 448         String host = getHeader(request, "Host");
 449         if (host == null) {
 450             throw new IOException("missing Host");
 451         }
 452 
 453         headers.setHeader(":method", tokens[0]);
 454         headers.setHeader(":scheme", "http"); // always in this case
 455         headers.setHeader(":authority", host);
 456         headers.setHeader(":path", uri.getPath());
 457         Queue q = new Queue(sentinel);
 458         String body = getRequestBody(request);
 459         addHeaders(getHeaders(request), headers);
 460         headers.setHeader("Content-length", Integer.toString(body.length()));
 461 
 462         addRequestBodyToQueue(body, q);
 463         streams.put(1, q);
 464         exec.submit(() -> {
 465             handleRequest(headers, q, 1, true /*complete request has been read*/);
 466         });
 467     }
 468 
 469     // all other streams created here
 470     @SuppressWarnings({"rawtypes","unchecked"})
 471     void createStream(HeaderFrame frame) throws IOException {
 472         List<HeaderFrame> frames = new LinkedList<>();
 473         frames.add(frame);
 474         int streamid = frame.streamid();
 475         if (streamid != nextstream) {
 476             throw new IOException("unexpected stream id");
 477         }
 478         nextstream += 2;
 479 
 480         boolean endStream = false;
 481         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 482             endStream = true;
 483         }
 484 
 485         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 486             Http2Frame f = readFrame();
 487             if (!(f instanceof HeaderFrame)) {
 488                 handleCommonFrame(f); // should only be error frames
 489             } else {
 490                 frame = (HeaderFrame) f;
 491                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 492                     endStream = true;
 493                 }
 494                 frames.add(frame);
 495             }
 496         }
 497         boolean endStreamReceived = endStream;
 498         HttpHeadersImpl headers = decodeHeaders(frames);
 499         Queue q = new Queue(sentinel);
 500         streams.put(streamid, q);
 501         exec.submit(() -> {
 502             handleRequest(headers, q, streamid, endStreamReceived);
 503         });
 504     }
 505 
 506     // runs in own thread. Handles request from start to finish. Incoming frames
 507     // for this stream/request delivered on Q
 508 
 509     @SuppressWarnings({"rawtypes","unchecked"})
 510     void handleRequest(HttpHeadersImpl headers,
 511                        Queue queue,
 512                        int streamid,
 513                        boolean endStreamReceived)
 514     {
 515         String method = headers.firstValue(":method").orElse("");
 516         //System.out.println("method = " + method);
 517         String path = headers.firstValue(":path").orElse("");
 518         //System.out.println("path = " + path);
 519         String scheme = headers.firstValue(":scheme").orElse("");
 520         //System.out.println("scheme = " + scheme);
 521         String authority = headers.firstValue(":authority").orElse("");
 522         //System.out.println("authority = " + authority);
 523         System.err.printf("TestServer: %s %s\n", method, path);
 524         HttpHeadersImpl rspheaders = new HttpHeadersImpl();
 525         int winsize = clientSettings.getParameter(
 526                 SettingsFrame.INITIAL_WINDOW_SIZE);
 527         //System.err.println ("Stream window size = " + winsize);
 528 
 529         final InputStream bis;
 530         if (endStreamReceived && queue.size() == 0) {
 531             System.err.println("Server: got END_STREAM for stream " + streamid);
 532             bis = NullInputStream.INSTANCE;
 533         } else {
 534             System.err.println("Server: creating input stream for stream " + streamid);
 535             bis = new BodyInputStream(queue, streamid, this);
 536         }
 537         try (bis;
 538              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 539         {
 540             outStreams.put(streamid, bos);
 541             String us = scheme + "://" + authority + path;
 542             URI uri = new URI(us);
 543             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 544             Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
 545                     headers, rspheaders, uri, bis, getSSLSession(),
 546                     bos, this, pushAllowed);
 547 
 548             // give to user
 549             Http2Handler handler = server.getHandlerFor(uri.getPath());
 550             handler.handle(exchange);
 551 
 552             // everything happens in the exchange from here. Hopefully will
 553             // return though.
 554         } catch (Throwable e) {
 555             System.err.println("TestServer: handleRequest exception: " + e);
 556             e.printStackTrace();
 557         }
 558     }
 559 
 560     private SSLSession getSSLSession() {
 561         if (! (socket instanceof SSLSocket))
 562             return null;
 563         SSLSocket ssl = (SSLSocket)socket;
 564         return ssl.getSession();
 565     }
 566     // Runs in own thread
 567 
 568     @SuppressWarnings({"rawtypes","unchecked"})
 569     void readLoop() {
 570         try {
 571             while (!stopping) {
 572                 Http2Frame frame = readFrame();
 573                 //System.err.printf("TestServer: received frame %s\n", frame);
 574                 int stream = frame.streamid();
 575                 if (stream == 0) {
 576                     if (frame.type() == WindowUpdateFrame.TYPE) {
 577                         WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 578                         updateConnectionWindow(wup.getUpdate());
 579                     } else {
 580                         // other common frame types
 581                         handleCommonFrame(frame);
 582                     }
 583                 } else {
 584                     Queue q = streams.get(stream);
 585                     if (frame.type() == HeadersFrame.TYPE) {
 586                         if (q != null) {
 587                             System.err.println("HEADERS frame for existing stream! Error.");
 588                             // TODO: close connection
 589                             continue;
 590                         } else {
 591                             createStream((HeadersFrame) frame);
 592                         }
 593                     } else {
 594                         if (q == null && !pushStreams.contains(stream)) {
 595                             System.err.printf("Non Headers frame received with"+
 596                                     " non existing stream (%d) ", frame.streamid());
 597                             System.err.println(frame);
 598                             continue;
 599                         }
 600                         if (frame.type() == WindowUpdateFrame.TYPE) {
 601                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 602                             synchronized (updaters) {
 603                                 Consumer<Integer> r = updaters.get(stream);
 604                                 r.accept(wup.getUpdate());
 605                             }
 606                         } else if (frame.type() == ResetFrame.TYPE) {
 607                             // do orderly close on input q
 608                             // and close the output q immediately
 609                             // This should mean depending on what the
 610                             // handler is doing: either an EOF on read
 611                             // or an IOException if writing the response.
 612                             q.orderlyClose();
 613                             BodyOutputStream oq = outStreams.get(stream);
 614                             if (oq != null)
 615                                 oq.closeInternal();
 616 
 617                         } else {
 618                             q.put(frame);
 619                         }
 620                     }
 621                 }
 622             }
 623         } catch (Throwable e) {
 624             if (!stopping) {
 625                 System.err.println("Http server reader thread shutdown");
 626                 e.printStackTrace();
 627             }
 628             close();
 629         }
 630     }
 631 
 632     List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
 633         List<ByteBuffer> buffers = new LinkedList<>();
 634 
 635         ByteBuffer buf = getBuffer();
 636         boolean encoded;
 637         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 638             List<String> values = entry.getValue();
 639             String key = entry.getKey().toLowerCase();
 640             for (String value : values) {
 641                 do {
 642                     hpackOut.header(key, value);
 643                     encoded = hpackOut.encode(buf);
 644                     if (!encoded) {
 645                         buf.flip();
 646                         buffers.add(buf);
 647                         buf = getBuffer();
 648                     }
 649                 } while (!encoded);
 650             }
 651         }
 652         buf.flip();
 653         buffers.add(buf);
 654         return buffers;
 655     }
 656 
 657     static void closeIgnore(Closeable c) {
 658         try {
 659             c.close();
 660         } catch (IOException e) {}
 661     }
 662 
 663     // Runs in own thread
 664     void writeLoop() {
 665         try {
 666             while (!stopping) {
 667                 Http2Frame frame;
 668                 try {
 669                     frame = outputQ.take();
 670                 } catch(IOException x) {
 671                     if (stopping && x.getCause() instanceof InterruptedException) {
 672                         break;
 673                     } else throw x;
 674                 }
 675                 if (frame instanceof ResponseHeaders) {
 676                     ResponseHeaders rh = (ResponseHeaders)frame;
 677                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 678                     writeFrame(hf);
 679                 } else if (frame instanceof OutgoingPushPromise) {
 680                     handlePush((OutgoingPushPromise)frame);
 681                 } else
 682                     writeFrame(frame);
 683             }
 684             System.err.println("TestServer: Connection writer stopping");
 685         } catch (Throwable e) {
 686             e.printStackTrace();
 687             /*close();
 688             if (!stopping) {
 689                 e.printStackTrace();
 690                 System.err.println("TestServer: writeLoop exception: " + e);
 691             }*/
 692         }
 693     }
 694 
 695     private void handlePush(OutgoingPushPromise op) throws IOException {
 696         int promisedStreamid = nextPushStreamId;
 697         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
 698                                                    HeaderFrame.END_HEADERS,
 699                                                    promisedStreamid,
 700                                                    encodeHeaders(op.headers),
 701                                                    0);
 702         pushStreams.add(promisedStreamid);
 703         nextPushStreamId += 2;
 704         pp.streamid(op.parentStream);
 705         writeFrame(pp);
 706         final InputStream ii = op.is;
 707         final BodyOutputStream oo = new BodyOutputStream(
 708                 promisedStreamid,
 709                 clientSettings.getParameter(
 710                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
 711         outStreams.put(promisedStreamid, oo);
 712         oo.goodToGo();
 713         exec.submit(() -> {
 714             try {
 715                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 716                 outputQ.put(oh);
 717                 ii.transferTo(oo);
 718             } catch (Throwable ex) {
 719                 System.err.printf("TestServer: pushing response error: %s\n",
 720                         ex.toString());
 721             } finally {
 722                 closeIgnore(ii);
 723                 closeIgnore(oo);
 724             }
 725         });
 726 
 727     }
 728 
 729     // returns a minimal response with status 200
 730     // that is the response to the push promise just sent
 731     private ResponseHeaders getPushResponse(int streamid) {
 732         HttpHeadersImpl h = new HttpHeadersImpl();
 733         h.addHeader(":status", "200");
 734         ResponseHeaders oh = new ResponseHeaders(h);
 735         oh.streamid(streamid);
 736         oh.setFlag(HeaderFrame.END_HEADERS);
 737         return oh;
 738     }
 739 
 740     private ByteBuffer getBuffer() {
 741         return ByteBuffer.allocate(8 * 1024);
 742     }
 743 
 744     private Http2Frame readFrame() throws IOException {
 745         byte[] buf = new byte[9];
 746         if (is.readNBytes(buf, 0, 9) != 9)
 747             throw new IOException("readFrame: connection closed");
 748         int len = 0;
 749         for (int i = 0; i < 3; i++) {
 750             int n = buf[i] & 0xff;
 751             //System.err.println("n = " + n);
 752             len = (len << 8) + n;
 753         }
 754         byte[] rest = new byte[len];
 755         int n = is.readNBytes(rest, 0, len);
 756         if (n != len)
 757             throw new IOException("Error reading frame");
 758         List<Http2Frame> frames = new ArrayList<>();
 759         FramesDecoder reader = new FramesDecoder(frames::add);
 760         reader.decode(ByteBuffer.wrap(buf));
 761         reader.decode(ByteBuffer.wrap(rest));
 762         if (frames.size()!=1)
 763             throw new IOException("Expected 1 frame got "+frames.size()) ;
 764 
 765         return frames.get(0);
 766     }
 767 
 768     void sendSettingsFrame() throws IOException {
 769         sendSettingsFrame(false);
 770     }
 771 
 772     void sendSettingsFrame(boolean now) throws IOException {
 773         if (now) {
 774             writeFrame(serverSettings);
 775         } else {
 776             outputQ.put(serverSettings);
 777         }
 778     }
 779 
 780     String readUntil(String end) throws IOException {
 781         int number = end.length();
 782         int found = 0;
 783         StringBuilder sb = new StringBuilder();
 784         while (found < number) {
 785             char expected = end.charAt(found);
 786             int c = is.read();
 787             if (c == -1) {
 788                 throw new IOException("Connection closed");
 789             }
 790             char c0 = (char) c;
 791             sb.append(c0);
 792             if (c0 != expected) {
 793                 found = 0;
 794                 continue;
 795             }
 796             found++;
 797         }
 798         return sb.toString();
 799     }
 800 
 801     private int getContentLength(String headers) {
 802         return getIntHeader(headers, "Content-length");
 803     }
 804 
 805     private int getIntHeader(String headers, String name) {
 806         String val = getHeader(headers, name);
 807         if (val == null) {
 808             return -1;
 809         }
 810         return Integer.parseInt(val);
 811     }
 812 
 813     private String getHeader(String headers, String name) {
 814         String headers1 = headers.toLowerCase(); // not efficient
 815         name = CRLF + name.toLowerCase();
 816         int start = headers1.indexOf(name);
 817         if (start == -1) {
 818             return null;
 819         }
 820         start += 2;
 821         int end = headers1.indexOf(CRLF, start);
 822         String line = headers.substring(start, end);
 823         start = line.indexOf(':');
 824         if (start == -1) {
 825             return null;
 826         }
 827         return line.substring(start + 1).trim();
 828     }
 829 
 830     final static String CRLF = "\r\n";
 831     final static String CRLFCRLF = "\r\n\r\n";
 832 
 833     String readHttp1Request() throws IOException {
 834         String headers = readUntil(CRLF + CRLF);
 835         int clen = getContentLength(headers);
 836         byte[] buf;
 837         if (clen >= 0) {
 838             // HTTP/1.1 fixed length content ( may be 0 ), read it
 839             buf = new byte[clen];
 840             is.readNBytes(buf, 0, clen);
 841         } else {
 842             //  HTTP/1.1 chunked data, read it
 843             buf = readChunkedInputStream(is);
 844         }
 845         String body = new String(buf, StandardCharsets.US_ASCII);
 846         return headers + body;
 847     }
 848 
 849     // This is a quick hack to get a chunked input stream reader.
 850     private static byte[] readChunkedInputStream(InputStream is) throws IOException {
 851         ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
 852         return cis.readAllBytes();
 853     }
 854 
 855     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
 856         StringBuilder sb = new StringBuilder();
 857         sb.append("HTTP/1.1 ")
 858                 .append(code)
 859                 .append(' ')
 860                 .append(msg)
 861                 .append(CRLF);
 862         int numheaders = headers.length;
 863         for (int i = 0; i < numheaders; i += 2) {
 864             sb.append(headers[i])
 865                     .append(": ")
 866                     .append(headers[i + 1])
 867                     .append(CRLF);
 868         }
 869         sb.append(CRLF);
 870         String s = sb.toString();
 871         os.write(s.getBytes("US-ASCII"));
 872         os.flush();
 873     }
 874 
 875     private void unexpectedFrame(Http2Frame frame) {
 876         System.err.println("OOPS. Unexpected");
 877         assert false;
 878     }
 879 
 880     final static ByteBuffer[] bbarray = new ByteBuffer[0];
 881 
 882     // wrapper around a BlockingQueue that throws an exception when it's closed
 883     // Each stream has one of these
 884 
 885     String getRequestBody(String request) {
 886         int bodystart = request.indexOf(CRLF+CRLF);
 887         String body;
 888         if (bodystart == -1)
 889             body = "";
 890         else
 891             body = request.substring(bodystart+4);
 892         return body;
 893     }
 894 
 895     @SuppressWarnings({"rawtypes","unchecked"})
 896     void addRequestBodyToQueue(String body, Queue q) throws IOException {
 897         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
 898         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
 899         // only used for primordial stream
 900         q.put(df);
 901     }
 902 
 903     // window updates done in main reader thread because they may
 904     // be used to unblock BodyOutputStreams waiting for WUPs
 905 
 906     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
 907 
 908     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
 909         synchronized(updaters) {
 910             updaters.put(streamid, r);
 911         }
 912     }
 913 
 914     int sendWindow = 64 * 1024 - 1; // connection level send window
 915 
 916     /**
 917      * BodyOutputStreams call this to get the connection window first.
 918      *
 919      * @param amount
 920      */
 921     synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
 922         while (amount > 0) {
 923             int n = Math.min(amount, sendWindow);
 924             amount -= n;
 925             sendWindow -= n;
 926             if (amount > 0)
 927                 wait();
 928         }
 929     }
 930 
 931     synchronized void updateConnectionWindow(int amount) {
 932         sendWindow += amount;
 933         notifyAll();
 934     }
 935 
 936     // simplified output headers class. really just a type safe container
 937     // for the hashmap.
 938 
 939     static class ResponseHeaders extends Http2Frame {
 940         HttpHeadersImpl headers;
 941 
 942         ResponseHeaders(HttpHeadersImpl headers) {
 943             super(0, 0);
 944             this.headers = headers;
 945         }
 946 
 947     }
 948 
 949     static class NullInputStream extends InputStream {
 950         static final NullInputStream INSTANCE = new NullInputStream();
 951         private NullInputStream() {}
 952         public int read()      { return -1; }
 953         public int available() { return 0;  }
 954     }
 955 }