1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.nio.ByteBuffer;
  37 import java.nio.charset.StandardCharsets;
  38 import java.util.*;
  39 import java.util.concurrent.ExecutorService;
  40 import java.util.function.Consumer;
  41 
  42 import jdk.incubator.http.internal.common.ByteBufferReference;
  43 import jdk.incubator.http.internal.frame.FramesDecoder;
  44 
  45 import jdk.incubator.http.internal.common.BufferHandler;
  46 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  47 import jdk.incubator.http.internal.common.Queue;
  48 import jdk.incubator.http.internal.frame.*;
  49 import jdk.incubator.http.internal.hpack.Decoder;
  50 import jdk.incubator.http.internal.hpack.DecodingCallback;
  51 import jdk.incubator.http.internal.hpack.Encoder;
  52 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
  53 
  54 /**
  55  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  56  * or HTTPS opened using "h2" ALPN.
  57  */
  58 public class Http2TestServerConnection {
  59     final Http2TestServer server;
  60     @SuppressWarnings({"rawtypes","unchecked"})
  61     final Map<Integer, Queue> streams; // input q per stream
  62     final HashSet<Integer> pushStreams;
  63     final Queue<Http2Frame> outputQ;
  64     volatile int nextstream;
  65     final Socket socket;
  66     final InputStream is;
  67     final OutputStream os;
  68     volatile Encoder hpackOut;
  69     volatile Decoder hpackIn;
  70     volatile SettingsFrame clientSettings;
  71     final SettingsFrame serverSettings;
  72     final ExecutorService exec;
  73     final boolean secure;
  74     volatile boolean stopping;
  75     volatile int nextPushStreamId = 2;
  76 
  77     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  78     final static byte[] EMPTY_BARRAY = new byte[0];
  79 
  80     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  81 
  82     Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {
  83         if (socket instanceof SSLSocket) {
  84             handshake(server.serverName(), (SSLSocket)socket);
  85         }
  86         System.err.println("TestServer: New connection from " + socket);
  87         this.server = server;
  88         this.streams = Collections.synchronizedMap(new HashMap<>());
  89         this.outputQ = new Queue<>();
  90         this.socket = socket;
  91         this.serverSettings = SettingsFrame.getDefaultSettings();
  92         this.exec = server.exec;
  93         this.secure = server.secure;
  94         this.pushStreams = new HashSet<>();
  95         is = new BufferedInputStream(socket.getInputStream());
  96         os = new BufferedOutputStream(socket.getOutputStream());
  97     }
  98 
  99     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 100         try {
 101             InetAddress addr2 = InetAddress.getByName(host);
 102             return addr1.equals(addr2);
 103         } catch (IOException e) {
 104             throw new UncheckedIOException(e);
 105         }
 106     }
 107 
 108     private static void handshake(String name, SSLSocket sock) throws IOException {
 109         if (name == null) {
 110             // no name set. No need to check
 111             return;
 112         } else if (name.equals("127.0.0.1")) {
 113             name = "localhost";
 114         }
 115         final String fname = name;
 116         final InetAddress addr1 = InetAddress.getByName(name);
 117         SSLParameters params = sock.getSSLParameters();
 118         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
 119             public boolean matches (SNIServerName n) {
 120                 String host = ((SNIHostName)n).getAsciiName();
 121                 if (host.equals("127.0.0.1"))
 122                     host = "localhost";
 123                 boolean cmp = host.equalsIgnoreCase(fname);
 124                 if (cmp)
 125                     return true;
 126                 return compareIPAddrs(addr1, host);
 127             }
 128         };
 129         List<SNIMatcher> list = List.of(matcher);
 130         params.setSNIMatchers(list);
 131         sock.setSSLParameters(params);
 132         sock.getSession(); // blocks until handshake done
 133     }
 134 
 135     void close() {
 136         stopping = true;
 137         streams.forEach((i, q) -> {
 138             q.close();
 139         });
 140         try {
 141             socket.close();
 142             // TODO: put a reset on each stream
 143         } catch (IOException e) {
 144         }
 145     }
 146 
 147     private void readPreface() throws IOException {
 148         int len = clientPreface.length;
 149         byte[] bytes = new byte[len];
 150         is.readNBytes(bytes, 0, len);
 151         if (Arrays.compare(clientPreface, bytes) != 0) {
 152             throw new IOException("Invalid preface: " + new String(bytes, 0, len));
 153         }
 154     }
 155 
 156     String doUpgrade() throws IOException {
 157         String upgrade = readHttp1Request();
 158         String h2c = getHeader(upgrade, "Upgrade");
 159         if (h2c == null || !h2c.equals("h2c")) {
 160             System.err.println("Server:HEADERS: " + upgrade);
 161             throw new IOException("Bad upgrade 1 " + h2c);
 162         }
 163 
 164         sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade",
 165                 "Upgrade", "h2c");
 166 
 167         sendSettingsFrame();
 168         readPreface();
 169 
 170         String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
 171         clientSettings = getSettingsFromString(clientSettingsString);
 172 
 173         return upgrade;
 174     }
 175 
 176     /**
 177      * Decodes the given, Client, settings payload provided in base64 HTTP1
 178      * header value.
 179      */
 180     private SettingsFrame getSettingsFromString(String s) throws IOException {
 181         Base64.Decoder decoder = Base64.getUrlDecoder();
 182         byte[] payload = decoder.decode(s);
 183         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 184         // simulate header of Settings Frame
 185         ByteBuffer bb0 = ByteBuffer.wrap(
 186                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 187         List<Http2Frame> frames = new ArrayList<>();
 188         FramesDecoder reader = new FramesDecoder(frames::add);
 189         reader.decode(ByteBufferReference.of(bb0));
 190         reader.decode(ByteBufferReference.of(bb1));
 191         if (frames.size()!=1)
 192             throw new IOException("Expected 1 frame got "+frames.size()) ;
 193         Http2Frame frame = frames.get(0);
 194         if (!(frame instanceof SettingsFrame))
 195             throw new IOException("Expected SettingsFrame");
 196         return (SettingsFrame)frame;
 197     }
 198 
 199     void run() throws Exception {
 200         String upgrade = null;
 201         if (!secure) {
 202             upgrade = doUpgrade();
 203         } else {
 204             readPreface();
 205             sendSettingsFrame(true);
 206             clientSettings = (SettingsFrame) readFrame();
 207             if (clientSettings.getFlag(SettingsFrame.ACK)) {
 208                 // we received the ack to our frame first
 209                 clientSettings = (SettingsFrame) readFrame();
 210             }
 211             nextstream = 1;
 212         }
 213 
 214         System.out.println("ServerSettings: " + serverSettings);
 215         System.out.println("ClientSettings: " + clientSettings);
 216 
 217         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 218         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 219 
 220         if (!secure) {
 221             createPrimordialStream(upgrade);
 222             nextstream = 3;
 223         }
 224 
 225         exec.submit(this::readLoop);
 226         exec.submit(this::writeLoop);
 227     }
 228 
 229     static class BufferPool implements BufferHandler {
 230 
 231         public void setMinBufferSize(int size) {
 232         }
 233 
 234         @Override
 235         public ByteBuffer getBuffer() {
 236             int size = 32 * 1024;
 237             return ByteBuffer.allocate(size);
 238         }
 239 
 240         @Override
 241         public void returnBuffer(ByteBuffer buffer) {
 242         }
 243     }
 244 
 245     private void writeFrame(Http2Frame frame) throws IOException {
 246         ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame);
 247         //System.err.println("TestServer: Writing frame " + frame.toString());
 248         int c = 0;
 249         for (ByteBufferReference ref : refs) {
 250             ByteBuffer buf = ref.get();
 251             byte[] ba = buf.array();
 252             int start = buf.arrayOffset() + buf.position();
 253             c += buf.remaining();
 254             os.write(ba, start, buf.remaining());
 255         }
 256         os.flush();
 257         //System.err.printf("TestServer: wrote %d bytes\n", c);
 258     }
 259 
 260     void handleStreamReset(ResetFrame resetFrame) throws IOException {
 261         // TODO: cleanup
 262         throw new IOException("Stream reset");
 263     }
 264 
 265     private void handleCommonFrame(Http2Frame f) throws IOException {
 266         if (f instanceof SettingsFrame) {
 267             SettingsFrame sf = (SettingsFrame) f;
 268             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 269             {
 270                 return;
 271             }
 272             // otherwise acknowledge it
 273             clientSettings = sf;
 274             SettingsFrame frame = new SettingsFrame();
 275             frame.setFlag(SettingsFrame.ACK);
 276             frame.streamid(0);
 277             outputQ.put(frame);
 278             return;
 279         }
 280         //System.err.println("TestServer: Received ---> " + f.toString());
 281         throw new UnsupportedOperationException("Not supported yet.");
 282     }
 283 
 284     void sendWindowUpdates(int len, int streamid) throws IOException {
 285         if (len == 0)
 286             return;
 287         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 288         outputQ.put(wup);
 289         wup = new WindowUpdateFrame(0 , len);
 290         outputQ.put(wup);
 291     }
 292 
 293     HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
 294         HttpHeadersImpl headers = new HttpHeadersImpl();
 295 
 296         DecodingCallback cb = (name, value) -> {
 297             headers.addHeader(name.toString(), value.toString());
 298         };
 299 
 300         for (HeaderFrame frame : frames) {
 301             ByteBufferReference[] buffers = frame.getHeaderBlock();
 302             for (ByteBufferReference buffer : buffers) {
 303                 hpackIn.decode(buffer.get(), false, cb);
 304             }
 305         }
 306         hpackIn.decode(EMPTY_BUFFER, true, cb);
 307         return headers;
 308     }
 309 
 310     String getRequestLine(String request) {
 311         int eol = request.indexOf(CRLF);
 312         return request.substring(0, eol);
 313     }
 314 
 315     String getHeaders(String request) {
 316         int start = request.indexOf(CRLF);
 317         int end = request.indexOf(CRLFCRLF);
 318         if (start == -1 || end == -1) {
 319             throw new RuntimeException("Malformed request");
 320         }
 321         return request.substring(start,end);
 322     }
 323 
 324     void addHeaders(String headers, HttpHeadersImpl hdrs) {
 325         String[] hh = headers.split(CRLF);
 326         for (String header : hh) {
 327             int colon = header.indexOf(':');
 328             if (colon == -1)
 329                 continue;
 330             String name = header.substring(0, colon);
 331             String value = header.substring(colon+1);
 332             while (value.startsWith(" "))
 333                 value = value.substring(1);
 334             hdrs.addHeader(name, value);
 335         }
 336     }
 337 
 338     // First stream (1) comes from a plaintext HTTP/1.1 request
 339     @SuppressWarnings({"rawtypes","unchecked"})
 340     void createPrimordialStream(String request) throws IOException {
 341         HttpHeadersImpl headers = new HttpHeadersImpl();
 342         String requestLine = getRequestLine(request);
 343         String[] tokens = requestLine.split(" ");
 344         if (!tokens[2].equals("HTTP/1.1")) {
 345             throw new IOException("bad request line");
 346         }
 347         URI uri = null;
 348         try {
 349             uri = new URI(tokens[1]);
 350         } catch (URISyntaxException e) {
 351             throw new IOException(e);
 352         }
 353         String host = getHeader(request, "Host");
 354         if (host == null) {
 355             throw new IOException("missing Host");
 356         }
 357 
 358         headers.setHeader(":method", tokens[0]);
 359         headers.setHeader(":scheme", "http"); // always in this case
 360         headers.setHeader(":authority", host);
 361         headers.setHeader(":path", uri.getPath());
 362         Queue q = new Queue();
 363         String body = getRequestBody(request);
 364         addHeaders(getHeaders(request), headers);
 365         headers.setHeader("Content-length", Integer.toString(body.length()));
 366 
 367         addRequestBodyToQueue(body, q);
 368         streams.put(1, q);
 369         exec.submit(() -> {
 370             handleRequest(headers, q, 1, true /*complete request has been read*/);
 371         });
 372     }
 373 
 374     // all other streams created here
 375     @SuppressWarnings({"rawtypes","unchecked"})
 376     void createStream(HeaderFrame frame) throws IOException {
 377         List<HeaderFrame> frames = new LinkedList<>();
 378         frames.add(frame);
 379         int streamid = frame.streamid();
 380         if (streamid != nextstream) {
 381             throw new IOException("unexpected stream id");
 382         }
 383         nextstream += 2;
 384 
 385         boolean endStream = false;
 386         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 387             endStream = true;
 388         }
 389 
 390         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 391             Http2Frame f = readFrame();
 392             if (!(f instanceof HeaderFrame)) {
 393                 handleCommonFrame(f); // should only be error frames
 394             } else {
 395                 frame = (HeaderFrame) f;
 396                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 397                     endStream = true;
 398                 }
 399                 frames.add(frame);
 400             }
 401         }
 402         boolean endStreamReceived = endStream;
 403         HttpHeadersImpl headers = decodeHeaders(frames);
 404         Queue q = new Queue();
 405         streams.put(streamid, q);
 406         exec.submit(() -> {
 407             handleRequest(headers, q, streamid, endStreamReceived);
 408         });
 409     }
 410 
 411     // runs in own thread. Handles request from start to finish. Incoming frames
 412     // for this stream/request delivered on Q
 413 
 414     @SuppressWarnings({"rawtypes","unchecked"})
 415     void handleRequest(HttpHeadersImpl headers,
 416                        Queue queue,
 417                        int streamid,
 418                        boolean endStreamReceived)
 419     {
 420         String method = headers.firstValue(":method").orElse("");
 421         //System.out.println("method = " + method);
 422         String path = headers.firstValue(":path").orElse("");
 423         //System.out.println("path = " + path);
 424         String scheme = headers.firstValue(":scheme").orElse("");
 425         //System.out.println("scheme = " + scheme);
 426         String authority = headers.firstValue(":authority").orElse("");
 427         //System.out.println("authority = " + authority);
 428         System.err.printf("TestServer: %s %s\n", method, path);
 429         HttpHeadersImpl rspheaders = new HttpHeadersImpl();
 430         int winsize = clientSettings.getParameter(
 431                         SettingsFrame.INITIAL_WINDOW_SIZE);
 432         //System.err.println ("Stream window size = " + winsize);
 433 
 434         final InputStream bis;
 435         if (endStreamReceived && queue.size() == 0) {
 436             System.err.println("Server: got END_STREAM for stream " + streamid);
 437             bis = NullInputStream.INSTANCE;
 438         } else {
 439             System.err.println("Server: creating input stream for stream " + streamid);
 440             bis = new BodyInputStream(queue, streamid, this);
 441         }
 442         try (bis;
 443              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 444         {
 445             String us = scheme + "://" + authority + path;
 446             URI uri = new URI(us);
 447             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 448             Http2TestExchange exchange = new Http2TestExchange(streamid, method,
 449                     headers, rspheaders, uri, bis, getSSLSession(),
 450                     bos, this, pushAllowed);
 451 
 452             // give to user
 453             Http2Handler handler = server.getHandlerFor(uri.getPath());
 454             handler.handle(exchange);
 455 
 456             // everything happens in the exchange from here. Hopefully will
 457             // return though.
 458         } catch (Throwable e) {
 459             System.err.println("TestServer: handleRequest exception: " + e);
 460             e.printStackTrace();
 461         }
 462     }
 463 
 464     private SSLSession getSSLSession() {
 465         if (! (socket instanceof SSLSocket))
 466             return null;
 467         SSLSocket ssl = (SSLSocket)socket;
 468         return ssl.getSession();
 469     }
 470     // Runs in own thread
 471 
 472     @SuppressWarnings({"rawtypes","unchecked"})
 473     void readLoop() {
 474         try {
 475             while (!stopping) {
 476                 Http2Frame frame = readFrame();
 477                 //System.err.printf("TestServer: received frame %s\n", frame);
 478                 int stream = frame.streamid();
 479                 if (stream == 0) {
 480                     if (frame.type() == WindowUpdateFrame.TYPE) {
 481                         WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 482                         updateConnectionWindow(wup.getUpdate());
 483                     } else {
 484                         // other common frame types
 485                         handleCommonFrame(frame);
 486                     }
 487                 } else {
 488                     Queue q = streams.get(stream);
 489                     if (frame.type() == HeadersFrame.TYPE) {
 490                         if (q != null) {
 491                             System.err.println("HEADERS frame for existing stream! Error.");
 492                             // TODO: close connection
 493                             continue;
 494                         } else {
 495                             createStream((HeadersFrame) frame);
 496                         }
 497                     } else {
 498                         if (q == null && !pushStreams.contains(stream)) {
 499                             System.err.printf("Non Headers frame received with"+
 500                                 " non existing stream (%d) ", frame.streamid());
 501                             System.err.println(frame);
 502                             continue;
 503                         }
 504                         if (frame.type() == WindowUpdateFrame.TYPE) {
 505                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 506                             synchronized (updaters) {
 507                                 Consumer<Integer> r = updaters.get(stream);
 508                                 r.accept(wup.getUpdate());
 509                             }
 510                         } else {
 511                             q.put(frame);
 512                         }
 513                     }
 514                 }
 515             }
 516         } catch (Throwable e) {
 517             if (!stopping) {
 518                 System.err.println("Http server reader thread shutdown");
 519                 e.printStackTrace();
 520             }
 521             close();
 522         }
 523     }
 524 
 525     ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) {
 526         List<ByteBuffer> buffers = new LinkedList<>();
 527 
 528         ByteBuffer buf = getBuffer();
 529         boolean encoded;
 530         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 531             List<String> values = entry.getValue();
 532             String key = entry.getKey().toLowerCase();
 533             for (String value : values) {
 534                 do {
 535                     hpackOut.header(key, value);
 536                     encoded = hpackOut.encode(buf);
 537                     if (!encoded) {
 538                         buf.flip();
 539                         buffers.add(buf);
 540                         buf = getBuffer();
 541                     }
 542                 } while (!encoded);
 543             }
 544         }
 545         buf.flip();
 546         buffers.add(buf);
 547         return ByteBufferReference.toReferences(buffers.toArray(bbarray));
 548     }
 549 
 550     static void closeIgnore(Closeable c) {
 551         try {
 552             c.close();
 553         } catch (IOException e) {}
 554     }
 555 
 556     // Runs in own thread
 557     void writeLoop() {
 558         try {
 559             while (!stopping) {
 560                 Http2Frame frame;
 561                 try {
 562                     frame = outputQ.take();
 563                 } catch(IOException x) {
 564                     if (stopping && x.getCause() instanceof InterruptedException) {
 565                         break;
 566                     } else throw x;
 567                 }
 568                 if (frame instanceof ResponseHeaders) {
 569                     ResponseHeaders rh = (ResponseHeaders)frame;
 570                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 571                     writeFrame(hf);
 572                 } else if (frame instanceof OutgoingPushPromise) {
 573                     handlePush((OutgoingPushPromise)frame);
 574                 } else
 575                     writeFrame(frame);
 576             }
 577             System.err.println("TestServer: Connection writer stopping");
 578         } catch (Throwable e) {
 579             e.printStackTrace();
 580             /*close();
 581             if (!stopping) {
 582                 e.printStackTrace();
 583                 System.err.println("TestServer: writeLoop exception: " + e);
 584             }*/
 585         }
 586     }
 587 
 588     private void handlePush(OutgoingPushPromise op) throws IOException {
 589         int promisedStreamid = nextPushStreamId;
 590         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0);
 591         pushStreams.add(promisedStreamid);
 592         nextPushStreamId += 2;
 593         pp.streamid(op.parentStream);
 594         writeFrame(pp);
 595         final InputStream ii = op.is;
 596         final BodyOutputStream oo = new BodyOutputStream(
 597                 promisedStreamid,
 598                 clientSettings.getParameter(
 599                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
 600         oo.goodToGo();
 601         exec.submit(() -> {
 602             try {
 603                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 604                 outputQ.put(oh);
 605                 ii.transferTo(oo);
 606             } catch (Throwable ex) {
 607                 System.err.printf("TestServer: pushing response error: %s\n",
 608                         ex.toString());
 609             } finally {
 610                 closeIgnore(ii);
 611                 closeIgnore(oo);
 612             }
 613         });
 614 
 615     }
 616 
 617     // returns a minimal response with status 200
 618     // that is the response to the push promise just sent
 619     private ResponseHeaders getPushResponse(int streamid) {
 620         HttpHeadersImpl h = new HttpHeadersImpl();
 621         h.addHeader(":status", "200");
 622         ResponseHeaders oh = new ResponseHeaders(h);
 623         oh.streamid(streamid);
 624         oh.setFlag(HeaderFrame.END_HEADERS);
 625         return oh;
 626     }
 627 
 628     private ByteBuffer getBuffer() {
 629         return ByteBuffer.allocate(8 * 1024);
 630     }
 631 
 632     private Http2Frame readFrame() throws IOException {
 633         byte[] buf = new byte[9];
 634         if (is.readNBytes(buf, 0, 9) != 9)
 635             throw new IOException("readFrame: connection closed");
 636         int len = 0;
 637         for (int i = 0; i < 3; i++) {
 638             int n = buf[i] & 0xff;
 639             //System.err.println("n = " + n);
 640             len = (len << 8) + n;
 641         }
 642         byte[] rest = new byte[len];
 643         int n = is.readNBytes(rest, 0, len);
 644         if (n != len)
 645             throw new IOException("Error reading frame");
 646         List<Http2Frame> frames = new ArrayList<>();
 647         FramesDecoder reader = new FramesDecoder(frames::add);
 648         reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf)));
 649         reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest)));
 650         if (frames.size()!=1)
 651             throw new IOException("Expected 1 frame got "+frames.size()) ;
 652 
 653         return frames.get(0);
 654     }
 655 
 656     void sendSettingsFrame() throws IOException {
 657         sendSettingsFrame(false);
 658     }
 659 
 660     void sendSettingsFrame(boolean now) throws IOException {
 661         if (now) {
 662             writeFrame(serverSettings);
 663         } else {
 664             outputQ.put(serverSettings);
 665         }
 666     }
 667 
 668     String readUntil(String end) throws IOException {
 669         int number = end.length();
 670         int found = 0;
 671         StringBuilder sb = new StringBuilder();
 672         while (found < number) {
 673             char expected = end.charAt(found);
 674             int c = is.read();
 675             if (c == -1) {
 676                 throw new IOException("Connection closed");
 677             }
 678             char c0 = (char) c;
 679             sb.append(c0);
 680             if (c0 != expected) {
 681                 found = 0;
 682                 continue;
 683             }
 684             found++;
 685         }
 686         return sb.toString();
 687     }
 688 
 689     private int getContentLength(String headers) {
 690         return getIntHeader(headers, "Content-length");
 691     }
 692 
 693     private int getIntHeader(String headers, String name) {
 694         String val = getHeader(headers, name);
 695         if (val == null) {
 696             return -1;
 697         }
 698         return Integer.parseInt(val);
 699     }
 700 
 701     private String getHeader(String headers, String name) {
 702         String headers1 = headers.toLowerCase(); // not efficient
 703         name = CRLF + name.toLowerCase();
 704         int start = headers1.indexOf(name);
 705         if (start == -1) {
 706             return null;
 707         }
 708         start += 2;
 709         int end = headers1.indexOf(CRLF, start);
 710         String line = headers.substring(start, end);
 711         start = line.indexOf(':');
 712         if (start == -1) {
 713             return null;
 714         }
 715         return line.substring(start + 1).trim();
 716     }
 717 
 718     final static String CRLF = "\r\n";
 719     final static String CRLFCRLF = "\r\n\r\n";
 720 
 721     String readHttp1Request() throws IOException {
 722         String headers = readUntil(CRLF + CRLF);
 723         int clen = getContentLength(headers);
 724         // read the content.
 725         byte[] buf = new byte[clen];
 726         is.readNBytes(buf, 0, clen);
 727         String body = new String(buf, StandardCharsets.US_ASCII);
 728         return headers + body;
 729     }
 730 
 731     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
 732         StringBuilder sb = new StringBuilder();
 733         sb.append("HTTP/1.1 ")
 734                 .append(code)
 735                 .append(' ')
 736                 .append(msg)
 737                 .append(CRLF);
 738         int numheaders = headers.length;
 739         for (int i = 0; i < numheaders; i += 2) {
 740             sb.append(headers[i])
 741                     .append(": ")
 742                     .append(headers[i + 1])
 743                     .append(CRLF);
 744         }
 745         sb.append(CRLF);
 746         String s = sb.toString();
 747         os.write(s.getBytes("US-ASCII"));
 748         os.flush();
 749     }
 750 
 751     private void unexpectedFrame(Http2Frame frame) {
 752         System.err.println("OOPS. Unexpected");
 753         assert false;
 754     }
 755 
 756     final static ByteBuffer[] bbarray = new ByteBuffer[0];
 757 
    // NOTE: each stream's input queue is an instance of the imported Queue
    // class: a wrapper around a BlockingQueue that throws an exception once
    // it has been closed. Each stream has one of these.
 760 
 761     String getRequestBody(String request) {
 762         int bodystart = request.indexOf(CRLF+CRLF);
 763         String body;
 764         if (bodystart == -1)
 765             body = "";
 766         else
 767             body = request.substring(bodystart+4);
 768         return body;
 769     }
 770 
 771     @SuppressWarnings({"rawtypes","unchecked"})
 772     void addRequestBodyToQueue(String body, Queue q) throws IOException {
 773         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
 774         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf));
 775         // only used for primordial stream
 776         q.put(df);
 777     }
 778 
 779     // window updates done in main reader thread because they may
 780     // be used to unblock BodyOutputStreams waiting for WUPs
 781 
 782     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
 783 
 784     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
 785         synchronized(updaters) {
 786             updaters.put(streamid, r);
 787         }
 788     }
 789 
 790     int sendWindow = 64 * 1024 - 1; // connection level send window
 791 
 792     /**
 793      * BodyOutputStreams call this to get the connection window first.
 794      *
 795      * @param amount
 796      */
 797     synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
 798        while (amount > 0) {
 799            int n = Math.min(amount, sendWindow);
 800            amount -= n;
 801            sendWindow -= n;
 802            if (amount > 0)
 803                wait();
 804        }
 805     }
 806 
 807     synchronized void updateConnectionWindow(int amount) {
 808         sendWindow += amount;
 809         notifyAll();
 810     }
 811 
 812     // simplified output headers class. really just a type safe container
 813     // for the hashmap.
 814 
 815     static class ResponseHeaders extends Http2Frame {
 816         HttpHeadersImpl headers;
 817 
 818         ResponseHeaders(HttpHeadersImpl headers) {
 819             super(0, 0);
 820             this.headers = headers;
 821         }
 822 
 823     }
 824 
 825     static class NullInputStream extends InputStream {
 826        static final NullInputStream INSTANCE = new NullInputStream();
 827        private NullInputStream() {}
 828        public int read()      { return -1; }
 829        public int available() { return 0;  }
 830    }
 831 }