1 /*
   2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.net.http.HttpHeaders;
  37 import java.nio.ByteBuffer;
  38 import java.util.*;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.ExecutorService;
  41 import java.util.concurrent.ConcurrentLinkedQueue;
  42 import java.util.function.Consumer;
  43 import jdk.internal.net.http.common.HttpHeadersBuilder;
  44 import jdk.internal.net.http.frame.*;
  45 import jdk.internal.net.http.hpack.Decoder;
  46 import jdk.internal.net.http.hpack.DecodingCallback;
  47 import jdk.internal.net.http.hpack.Encoder;
  48 import sun.net.www.http.ChunkedInputStream;
  49 import sun.net.www.http.HttpClient;
  50 import static java.nio.charset.StandardCharsets.UTF_8;
  51 import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
  52 
  53 /**
  54  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  55  * or HTTPS opened using "h2" ALPN.
  56  */
  57 public class Http2TestServerConnection {
  58     final Http2TestServer server;
  59     @SuppressWarnings({"rawtypes","unchecked"})
  60     final Map<Integer, Queue> streams; // input q per stream
  61     final Map<Integer, BodyOutputStream> outStreams; // output q per stream
  62     final HashSet<Integer> pushStreams;
  63     final Queue<Http2Frame> outputQ;
  64     volatile int nextstream;
  65     final Socket socket;
  66     final Http2TestExchangeSupplier exchangeSupplier;
  67     final InputStream is;
  68     final OutputStream os;
  69     volatile Encoder hpackOut;
  70     volatile Decoder hpackIn;
  71     volatile SettingsFrame clientSettings;
  72     final SettingsFrame serverSettings;
  73     final ExecutorService exec;
  74     final boolean secure;
  75     final Properties properties;
  76     volatile boolean stopping;
  77     volatile int nextPushStreamId = 2;
  78     ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
  79 
  80     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  81     final static byte[] EMPTY_BARRAY = new byte[0];
  82     final Random random;
  83 
  84     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  85 
  86     static class Sentinel extends Http2Frame {
  87         Sentinel() { super(-1,-1);}
  88     }
  89 
  90     static final Sentinel sentinel = new Sentinel();
  91 
  92     class PingRequest {
  93         final byte[] pingData;
  94         final long pingStamp;
  95         final CompletableFuture<Long> response;
  96 
  97         PingRequest() {
  98             pingData = new byte[8];
  99             random.nextBytes(pingData);
 100             pingStamp = System.currentTimeMillis();
 101             response = new CompletableFuture<>();
 102         }
 103 
 104         PingFrame frame() {
 105             return new PingFrame(0, pingData);
 106         }
 107 
 108         CompletableFuture<Long> response() {
 109             return response;
 110         }
 111 
 112         void success() {
 113             response.complete(System.currentTimeMillis() - pingStamp);
 114         }
 115 
 116         void fail(Throwable t) {
 117             response.completeExceptionally(t);
 118         }
 119     }
 120 
 121     Http2TestServerConnection(Http2TestServer server,
 122                               Socket socket,
 123                               Http2TestExchangeSupplier exchangeSupplier,
 124                               Properties properties)
 125         throws IOException
 126     {
 127         System.err.println("TestServer: New connection from " + socket);
 128 
 129         if (socket instanceof SSLSocket) {
 130             SSLSocket sslSocket = (SSLSocket)socket;
 131             handshake(server.serverName(), sslSocket);
 132             if (!server.supportsHTTP11 && !"h2".equals(sslSocket.getApplicationProtocol())) {
 133                 throw new IOException("Unexpected ALPN: [" + sslSocket.getApplicationProtocol() + "]");
 134             }
 135         }
 136         this.server = server;
 137         this.exchangeSupplier = exchangeSupplier;
 138         this.streams = Collections.synchronizedMap(new HashMap<>());
 139         this.outStreams = Collections.synchronizedMap(new HashMap<>());
 140         this.outputQ = new Queue<>(sentinel);
 141         this.random = new Random();
 142         this.socket = socket;
 143         this.properties = properties;
 144         this.socket.setTcpNoDelay(true);
 145         this.serverSettings = getServerSettingProperties();
 146         this.exec = server.exec;
 147         this.secure = server.secure;
 148         this.pushStreams = new HashSet<>();
 149         is = new BufferedInputStream(socket.getInputStream());
 150         os = new BufferedOutputStream(socket.getOutputStream());
 151     }
 152 
 153     static final String propPrefix = "http2server.settings.";
 154 
 155     static final String[][] propIDs = {
 156         {"header_table_size", Integer.toString(SettingsFrame.HEADER_TABLE_SIZE)},
 157         {"enable_push", Integer.toString(SettingsFrame.ENABLE_PUSH)},
 158         {"max_concurrent_streams", Integer.toString(SettingsFrame.MAX_CONCURRENT_STREAMS)},
 159         {"initial_window_size", Integer.toString(SettingsFrame.INITIAL_WINDOW_SIZE)},
 160         {"max_frame_size", Integer.toString(SettingsFrame.MAX_FRAME_SIZE)},
 161         {"max_header_list_size", Integer.toString(SettingsFrame.MAX_HEADER_LIST_SIZE)}
 162     };
 163 
 164     private SettingsFrame getServerSettingProperties() {
 165         SettingsFrame s = SettingsFrame.defaultRFCSettings();
 166         if (properties == null)
 167             return s;
 168         for (int i=0; i<propIDs.length; i++) {
 169             String key = propIDs[i][0];
 170             String numS = propIDs[i][1];
 171             String prop = properties.getProperty(propPrefix + key);
 172             if (prop != null) {
 173                 try {
 174                     System.err.println("TestServer: setting " + key + " property to: " +
 175                         prop);
 176                     int num = Integer.parseInt(numS);
 177                     System.err.println("TestServer: num = " + num);
 178                     s.setParameter(num, Integer.parseInt(prop));
 179                 } catch (NumberFormatException e) {/* ignore errors */}
 180             }
 181         }
 182         return s;
 183     }
 184 
 185     /**
 186      * Sends a PING frame on this connection, and completes the returned
 187      * CF when the PING ack is received. The CF is given
 188      * an integer, whose value is the number of milliseconds
 189      * between PING and ACK.
 190      */
 191     CompletableFuture<Long> sendPing() {
 192         PingRequest ping = null;
 193         try {
 194             ping = new PingRequest();
 195             pings.add(ping);
 196             outputQ.put(ping.frame());
 197         } catch (Throwable t) {
 198             ping.fail(t);
 199         }
 200         return ping.response();
 201     }
 202 
 203     void goAway(int error) throws IOException {
 204         int laststream = nextstream >= 3 ? nextstream - 2 : 1;
 205 
 206         GoAwayFrame go = new GoAwayFrame(laststream, error);
 207         outputQ.put(go);
 208     }
 209 
 210     /**
 211      * Returns the first PingRequest from Queue
 212      */
 213     private PingRequest getNextRequest() {
 214         return pings.poll();
 215     }
 216 
 217     /**
 218      * Handles incoming Ping, which could be an ack
 219      * or a client originated Ping
 220      */
 221     void handlePing(PingFrame ping) throws IOException {
 222         if (ping.streamid() != 0) {
 223             System.err.println("Invalid ping received");
 224             close(ErrorFrame.PROTOCOL_ERROR);
 225             return;
 226         }
 227         if (ping.getFlag(PingFrame.ACK)) {
 228             // did we send a Ping?
 229             PingRequest request = getNextRequest();
 230             if (request == null) {
 231                 System.err.println("Invalid ping ACK received");
 232                 close(ErrorFrame.PROTOCOL_ERROR);
 233                 return;
 234             } else if (!Arrays.equals(request.pingData, ping.getData())) {
 235                 request.fail(new RuntimeException("Wrong ping data in ACK"));
 236             } else {
 237                 request.success();
 238             }
 239         } else {
 240             // client originated PING. Just send it back with ACK set
 241             ping.setFlag(PingFrame.ACK);
 242             outputQ.put(ping);
 243         }
 244     }
 245 
 246     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 247         try {
 248             InetAddress addr2 = InetAddress.getByName(host);
 249             return addr1.equals(addr2);
 250         } catch (IOException e) {
 251             throw new UncheckedIOException(e);
 252         }
 253     }
 254 
 255     private static void handshake(String name, SSLSocket sock) throws IOException {
 256         if (name == null) {
 257             sock.getSession(); // blocks until handshake done
 258             return;
 259         } else if (name.equals("localhost")) {
 260             name = "localhost";
 261         }
 262         final String fname = name;
 263         final InetAddress addr1 = InetAddress.getByName(name);
 264         SSLParameters params = sock.getSSLParameters();
 265         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
 266             public boolean matches (SNIServerName n) {
 267                 String host = ((SNIHostName)n).getAsciiName();
 268                 if (host.equals("localhost"))
 269                     host = "localhost";
 270                 boolean cmp = host.equalsIgnoreCase(fname);
 271                 if (cmp)
 272                     return true;
 273                 return compareIPAddrs(addr1, host);
 274             }
 275         };
 276         List<SNIMatcher> list = List.of(matcher);
 277         params.setSNIMatchers(list);
 278         sock.setSSLParameters(params);
 279         sock.getSession(); // blocks until handshake done
 280     }
 281 
 282     void closeIncoming() {
 283         close(-1);
 284     }
 285 
 286     void close(int error) {
 287         if (stopping)
 288             return;
 289         stopping = true;
 290         System.err.printf("Server connection to %s stopping. %d streams\n",
 291             socket.getRemoteSocketAddress().toString(), streams.size());
 292         streams.forEach((i, q) -> {
 293             q.orderlyClose();
 294         });
 295         try {
 296             if (error != -1)
 297                 goAway(error);
 298             outputQ.orderlyClose();
 299             socket.close();
 300         } catch (Exception e) {
 301         }
 302     }
 303 
 304     private void readPreface() throws IOException {
 305         int len = clientPreface.length;
 306         byte[] bytes = new byte[len];
 307         is.readNBytes(bytes, 0, len);
 308         if (Arrays.compare(clientPreface, bytes) != 0) {
 309             throw new IOException("Invalid preface: " + new String(bytes, 0, len));
 310         }
 311     }
 312 
 313     Http1InitialRequest doUpgrade(Http1InitialRequest upgrade) throws IOException {
 314         String h2c = getHeader(upgrade.headers, "Upgrade");
 315         if (h2c == null || !h2c.equals("h2c")) {
 316             System.err.println("Server:HEADERS: " + upgrade);
 317             throw new IOException("Bad upgrade 1 " + h2c);
 318         }
 319 
 320         sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade",
 321                 "Upgrade", "h2c");
 322 
 323         sendSettingsFrame();
 324         readPreface();
 325 
 326         String clientSettingsString = getHeader(upgrade.headers, "HTTP2-Settings");
 327         clientSettings = getSettingsFromString(clientSettingsString);
 328 
 329         return upgrade;
 330     }
 331 
 332     /**
 333      * Decodes the given, Client, settings payload provided in base64 HTTP1
 334      * header value.
 335      */
 336     private SettingsFrame getSettingsFromString(String s) throws IOException {
 337         Base64.Decoder decoder = Base64.getUrlDecoder();
 338         byte[] payload = decoder.decode(s);
 339         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 340         // simulate header of Settings Frame
 341         ByteBuffer bb0 = ByteBuffer.wrap(
 342                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 343         List<Http2Frame> frames = new ArrayList<>();
 344         FramesDecoder reader = new FramesDecoder(frames::add);
 345         reader.decode(bb0);
 346         reader.decode(bb1);
 347         if (frames.size()!=1)
 348             throw new IOException("Expected 1 frame got "+frames.size()) ;
 349         Http2Frame frame = frames.get(0);
 350         if (!(frame instanceof SettingsFrame))
 351             throw new IOException("Expected SettingsFrame");
 352         return (SettingsFrame)frame;
 353     }
 354 
 355     public int getMaxFrameSize() {
 356         return clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE);
 357     }
 358 
 359     private void standardHTTP11Response(Http1InitialRequest request)
 360         throws IOException
 361     {
 362         String upgradeHeader = getHeader(request.headers, "Upgrade");
 363         if (upgradeHeader != null) {
 364             throw new IOException("Unexpected Upgrade header:" + upgradeHeader);
 365         }
 366 
 367         sendHttp1Response(200, "OK",
 368                           "Connection", "close",
 369                           "X-Magic", "HTTP/1.1 request received by HTTP/2 server",
 370                           "X-Received-Body", new String(request.body, UTF_8));
 371 
 372     }
 373 
 374     void run() throws Exception {
 375         Http1InitialRequest upgrade = null;
 376         if (!secure) {
 377             Http1InitialRequest request = readHttp1Request();
 378             String h2c = getHeader(request.headers, "Upgrade");
 379             if (h2c == null || !h2c.equals("h2c")) {
 380                 if (server.supportsHTTP11) {
 381                     standardHTTP11Response(request);
 382                     socket.close();
 383                     return;
 384                 } else {
 385                     System.err.println("Server:HEADERS: " + upgrade);
 386                     throw new IOException("Bad upgrade 1 " + h2c);
 387                 }
 388             }
 389             upgrade = doUpgrade(request);
 390         } else { // secure
 391             SSLSocket sslSocket = (SSLSocket)socket;
 392             if (sslSocket.getApplicationProtocol().equals("h2")) {
 393                 readPreface();
 394                 sendSettingsFrame(true);
 395                 clientSettings = (SettingsFrame) readFrame();
 396                 if (clientSettings.getFlag(SettingsFrame.ACK)) {
 397                     // we received the ack to our frame first
 398                     clientSettings = (SettingsFrame) readFrame();
 399                 }
 400                 nextstream = 1;
 401             } else if (sslSocket.getApplicationProtocol().equals("http/1.1") ||
 402                        sslSocket.getApplicationProtocol().equals("")) {
 403                 standardHTTP11Response(readHttp1Request());
 404                 socket.shutdownOutput();
 405                 socket.close();
 406                 return;
 407             } else {
 408                 throw new IOException("Unexpected ALPN:" + sslSocket.getApplicationProtocol());
 409             }
 410         }
 411 
 412         // Uncomment if needed, but very noisy
 413         //System.out.println("ServerSettings: " + serverSettings);
 414         //System.out.println("ClientSettings: " + clientSettings);
 415 
 416         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 417         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 418 
 419         if (!secure) {
 420             createPrimordialStream(upgrade);
 421             nextstream = 3;
 422         }
 423 
 424         (new ConnectionThread("readLoop", this::readLoop)).start();
 425         (new ConnectionThread("writeLoop", this::writeLoop)).start();
 426     }
 427 
 428     class ConnectionThread extends Thread {
 429         final Runnable r;
 430         ConnectionThread(String name, Runnable r) {
 431             setName(name);
 432             setDaemon(true);
 433             this.r = r;
 434         }
 435 
 436         public void run() {
 437             r.run();
 438         }
 439     }
 440 
 441     private void writeFrame(Http2Frame frame) throws IOException {
 442         List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
 443         //System.err.println("TestServer: Writing frame " + frame.toString());
 444         int c = 0;
 445         for (ByteBuffer buf : bufs) {
 446             byte[] ba = buf.array();
 447             int start = buf.arrayOffset() + buf.position();
 448             c += buf.remaining();
 449             os.write(ba, start, buf.remaining());
 450 
 451 //            System.out.println("writing byte at a time");
 452 //            while (buf.hasRemaining()) {
 453 //                byte b = buf.get();
 454 //                os.write(b);
 455 //                os.flush();
 456 //                try {
 457 //                    Thread.sleep(1);
 458 //                } catch(InterruptedException e) {
 459 //                    UncheckedIOException uie = new UncheckedIOException(new IOException(""));
 460 //                    uie.addSuppressed(e);
 461 //                    throw uie;
 462 //                }
 463 //            }
 464         }
 465         os.flush();
 466         //System.err.printf("TestServer: wrote %d bytes\n", c);
 467     }
 468 
 469     private void handleCommonFrame(Http2Frame f) throws IOException {
 470         if (f instanceof SettingsFrame) {
 471             SettingsFrame sf = (SettingsFrame) f;
 472             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 473             {
 474                 return;
 475             }
 476             // otherwise acknowledge it
 477             clientSettings = sf;
 478             SettingsFrame frame = new SettingsFrame();
 479             frame.setFlag(SettingsFrame.ACK);
 480             frame.streamid(0);
 481             outputQ.put(frame);
 482             return;
 483         } else if (f instanceof GoAwayFrame) {
 484             System.err.println("Closing: "+ f.toString());
 485             close(ErrorFrame.NO_ERROR);
 486         } else if (f instanceof PingFrame) {
 487             handlePing((PingFrame)f);
 488         } else
 489             throw new UnsupportedOperationException("Not supported yet: " + f.toString());
 490     }
 491 
 492     void sendWindowUpdates(int len, int streamid) throws IOException {
 493         if (len == 0)
 494             return;
 495         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 496         outputQ.put(wup);
 497         wup = new WindowUpdateFrame(0 , len);
 498         outputQ.put(wup);
 499     }
 500 
 501     HttpHeaders decodeHeaders(List<HeaderFrame> frames) throws IOException {
 502         HttpHeadersBuilder headersBuilder = createNewHeadersBuilder();
 503 
 504         DecodingCallback cb = (name, value) -> {
 505             headersBuilder.addHeader(name.toString(), value.toString());
 506         };
 507 
 508         for (HeaderFrame frame : frames) {
 509             List<ByteBuffer> buffers = frame.getHeaderBlock();
 510             for (ByteBuffer buffer : buffers) {
 511                 hpackIn.decode(buffer, false, cb);
 512             }
 513         }
 514         hpackIn.decode(EMPTY_BUFFER, true, cb);
 515         return headersBuilder.build();
 516     }
 517 
 518     String getRequestLine(String request) {
 519         int eol = request.indexOf(CRLF);
 520         return request.substring(0, eol);
 521     }
 522 
 523     String getHeaders(String request) {
 524         int start = request.indexOf(CRLF);
 525         int end = request.indexOf(CRLFCRLF);
 526         if (start == -1 || end == -1) {
 527             throw new RuntimeException("Malformed request");
 528         }
 529         return request.substring(start,end);
 530     }
 531 
 532     static void addHeaders(String headersString, HttpHeadersBuilder headersBuilder) {
 533         String[] hh = headersString.split(CRLF);
 534         for (String header : hh) {
 535             int colon = header.indexOf(':');
 536             if (colon == -1)
 537                 continue;
 538             String name = header.substring(0, colon);
 539             String value = header.substring(colon+1);
 540             while (value.startsWith(" "))
 541                 value = value.substring(1);
 542             headersBuilder.addHeader(name, value);
 543         }
 544     }
 545 
 546     // First stream (1) comes from a plaintext HTTP/1.1 request
 547     @SuppressWarnings({"rawtypes","unchecked"})
 548     void createPrimordialStream(Http1InitialRequest request) throws IOException {
 549         HttpHeadersBuilder headersBuilder = createNewHeadersBuilder();
 550         String requestLine = getRequestLine(request.headers);
 551         String[] tokens = requestLine.split(" ");
 552         if (!tokens[2].equals("HTTP/1.1")) {
 553             throw new IOException("bad request line");
 554         }
 555         URI uri;
 556         try {
 557             uri = new URI(tokens[1]);
 558         } catch (URISyntaxException e) {
 559             throw new IOException(e);
 560         }
 561         String host = getHeader(request.headers, "Host");
 562         if (host == null) {
 563             throw new IOException("missing Host");
 564         }
 565 
 566         headersBuilder.setHeader(":method", tokens[0]);
 567         headersBuilder.setHeader(":scheme", "http"); // always in this case
 568         headersBuilder.setHeader(":authority", host);
 569         String path = uri.getRawPath();
 570         if (uri.getRawQuery() != null)
 571             path = path + "?" + uri.getRawQuery();
 572         headersBuilder.setHeader(":path", path);
 573 
 574         Queue q = new Queue(sentinel);
 575         byte[] body = getRequestBody(request);
 576         addHeaders(getHeaders(request.headers), headersBuilder);
 577         headersBuilder.setHeader("Content-length", Integer.toString(body.length));
 578         HttpHeaders headers = headersBuilder.build();
 579 
 580         addRequestBodyToQueue(body, q);
 581         streams.put(1, q);
 582         exec.submit(() -> {
 583             handleRequest(headers, q, 1, true /*complete request has been read*/);
 584         });
 585     }
 586 
 587     // all other streams created here
 588     @SuppressWarnings({"rawtypes","unchecked"})
 589     void createStream(HeaderFrame frame) throws IOException {
 590         List<HeaderFrame> frames = new LinkedList<>();
 591         frames.add(frame);
 592         int streamid = frame.streamid();
 593         if (streamid != nextstream) {
 594             throw new IOException("unexpected stream id");
 595         }
 596         nextstream += 2;
 597 
 598         boolean endStream = false;
 599         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 600             endStream = true;
 601         }
 602 
 603         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 604             Http2Frame f = readFrame();
 605             if (!(f instanceof HeaderFrame)) {
 606                 handleCommonFrame(f); // should only be error frames
 607             } else {
 608                 frame = (HeaderFrame) f;
 609                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 610                     endStream = true;
 611                 }
 612                 frames.add(frame);
 613             }
 614         }
 615         boolean endStreamReceived = endStream;
 616         HttpHeaders headers = decodeHeaders(frames);
 617 
 618         // Strict to assert Client correctness. Not all servers are as strict,
 619         // but some are known to be.
 620         Optional<?> disallowedHeader = headers.firstValue("Upgrade");
 621         if (disallowedHeader.isPresent()) {
 622             throw new IOException("Unexpected Upgrade in headers:" + headers);
 623         }
 624         disallowedHeader = headers.firstValue("HTTP2-Settings");
 625         if (disallowedHeader.isPresent())
 626             throw new IOException("Unexpected HTTP2-Settings in headers:" + headers);
 627 
 628 
 629         Queue q = new Queue(sentinel);
 630         streams.put(streamid, q);
 631         exec.submit(() -> {
 632             handleRequest(headers, q, streamid, endStreamReceived);
 633         });
 634     }
 635 
 636     // runs in own thread. Handles request from start to finish. Incoming frames
 637     // for this stream/request delivered on Q
 638 
 639     @SuppressWarnings({"rawtypes","unchecked"})
 640     void handleRequest(HttpHeaders headers,
 641                        Queue queue,
 642                        int streamid,
 643                        boolean endStreamReceived)
 644     {
 645         String method = headers.firstValue(":method").orElse("");
 646         //System.out.println("method = " + method);
 647         String path = headers.firstValue(":path").orElse("");
 648         //System.out.println("path = " + path);
 649         String scheme = headers.firstValue(":scheme").orElse("");
 650         //System.out.println("scheme = " + scheme);
 651         String authority = headers.firstValue(":authority").orElse("");
 652         //System.out.println("authority = " + authority);
 653         System.err.printf("TestServer: %s %s\n", method, path);
 654         int winsize = clientSettings.getParameter(
 655                 SettingsFrame.INITIAL_WINDOW_SIZE);
 656         //System.err.println ("Stream window size = " + winsize);
 657 
 658         final InputStream bis;
 659         if (endStreamReceived && queue.size() == 0) {
 660             System.err.println("Server: got END_STREAM for stream " + streamid);
 661             bis = NullInputStream.INSTANCE;
 662         } else {
 663             System.err.println("Server: creating input stream for stream " + streamid);
 664             bis = new BodyInputStream(queue, streamid, this);
 665         }
 666         try (bis;
 667              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 668         {
 669             outStreams.put(streamid, bos);
 670             String us = scheme + "://" + authority + path;
 671             URI uri = new URI(us);
 672             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 673             HttpHeadersBuilder rspheadersBuilder = createNewHeadersBuilder();
 674             Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
 675                     headers, rspheadersBuilder, uri, bis, getSSLSession(),
 676                     bos, this, pushAllowed);
 677 
 678             // give to user
 679             Http2Handler handler = server.getHandlerFor(uri.getPath());
 680             try {
 681                 handler.handle(exchange);
 682             } catch (IOException closed) {
 683                 if (bos.closed) {
 684                     Queue q = streams.get(streamid);
 685                     if (q != null && (q.isClosed() || q.isClosing())) {
 686                         System.err.println("TestServer: Stream " + streamid + " closed: " + closed);
 687                         return;
 688                     }
 689                 }
 690                 throw closed;
 691             }
 692 
 693             // everything happens in the exchange from here. Hopefully will
 694             // return though.
 695         } catch (Throwable e) {
 696             System.err.println("TestServer: handleRequest exception: " + e);
 697             e.printStackTrace();
 698             close(-1);
 699         }
 700     }
 701 
 702     protected HttpHeadersBuilder createNewHeadersBuilder() {
 703         return new HttpHeadersBuilder();
 704     }
 705 
 706     private SSLSession getSSLSession() {
 707         if (! (socket instanceof SSLSocket))
 708             return null;
 709         SSLSocket ssl = (SSLSocket)socket;
 710         return ssl.getSession();
 711     }
 712     // Runs in own thread
 713 
 714     @SuppressWarnings({"rawtypes","unchecked"})
 715     void readLoop() {
 716         try {
 717             while (!stopping) {
 718                 Http2Frame frame = readFrameImpl();
 719                 if (frame == null) {
 720                     closeIncoming();
 721                     return;
 722                 }
 723                 //System.err.printf("TestServer: received frame %s\n", frame);
 724                 int stream = frame.streamid();
 725                 if (stream == 0) {
 726                     if (frame.type() == WindowUpdateFrame.TYPE) {
 727                         WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 728                         updateConnectionWindow(wup.getUpdate());
 729                     } else {
 730                         // other common frame types
 731                         handleCommonFrame(frame);
 732                     }
 733                 } else {
 734                     Queue q = streams.get(stream);
 735                     if (frame.type() == HeadersFrame.TYPE) {
 736                         if (q != null) {
 737                             System.err.println("HEADERS frame for existing stream! Error.");
 738                             // TODO: close connection
 739                             continue;
 740                         } else {
 741                             createStream((HeadersFrame) frame);
 742                         }
 743                     } else {
 744                         if (q == null && !pushStreams.contains(stream)) {
 745                             System.err.printf("Non Headers frame received with"+
 746                                     " non existing stream (%d) ", frame.streamid());
 747                             System.err.println(frame);
 748                             continue;
 749                         }
 750                         if (frame.type() == WindowUpdateFrame.TYPE) {
 751                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 752                             synchronized (updaters) {
 753                                 Consumer<Integer> r = updaters.get(stream);
 754                                 r.accept(wup.getUpdate());
 755                             }
 756                         } else if (frame.type() == ResetFrame.TYPE) {
 757                             // do orderly close on input q
 758                             // and close the output q immediately
 759                             // This should mean depending on what the
 760                             // handler is doing: either an EOF on read
 761                             // or an IOException if writing the response.
 762                             if (q != null) {
 763                                 q.orderlyClose();
 764                                 BodyOutputStream oq = outStreams.get(stream);
 765                                 if (oq != null)
 766                                     oq.closeInternal();
 767                             } else if (pushStreams.contains(stream)) {
 768                                 // we could interrupt the pushStream's output
 769                                 // but the continuation, even after a reset
 770                                 // should be handle gracefully by the client
 771                                 // anyway.
 772                             } else {
 773                                 System.err.println("TestServer: Unexpected frame on: " + stream);
 774                                 System.err.println(frame);
 775                                 throw new IOException("Unexpected frame");
 776                             }
 777                         } else {
 778                             q.put(frame);
 779                         }
 780                     }
 781                 }
 782             }
 783         } catch (Throwable e) {
 784             if (!stopping) {
 785                 System.err.println("Http server reader thread shutdown");
 786                 e.printStackTrace();
 787             }
 788             close(ErrorFrame.PROTOCOL_ERROR);
 789         }
 790     }
 791 
 792     /** Encodes an group of headers, without any ordering guarantees. */
 793     List<ByteBuffer> encodeHeaders(HttpHeaders headers) {
 794         List<ByteBuffer> buffers = new LinkedList<>();
 795 
 796         ByteBuffer buf = getBuffer();
 797         boolean encoded;
 798         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 799             List<String> values = entry.getValue();
 800             String key = entry.getKey().toLowerCase();
 801             for (String value : values) {
 802                 do {
 803                     hpackOut.header(key, value);
 804                     encoded = hpackOut.encode(buf);
 805                     if (!encoded) {
 806                         buf.flip();
 807                         buffers.add(buf);
 808                         buf = getBuffer();
 809                     }
 810                 } while (!encoded);
 811             }
 812         }
 813         buf.flip();
 814         buffers.add(buf);
 815         return buffers;
 816     }
 817 
 818     /** Encodes an ordered list of headers. */
 819     List<ByteBuffer> encodeHeadersOrdered(List<Map.Entry<String,String>> headers) {
 820         List<ByteBuffer> buffers = new LinkedList<>();
 821 
 822         ByteBuffer buf = getBuffer();
 823         boolean encoded;
 824         for (Map.Entry<String, String> entry : headers) {
 825             String value = entry.getValue();
 826             String key = entry.getKey().toLowerCase();
 827             do {
 828                 hpackOut.header(key, value);
 829                 encoded = hpackOut.encode(buf);
 830                 if (!encoded) {
 831                     buf.flip();
 832                     buffers.add(buf);
 833                     buf = getBuffer();
 834                 }
 835             } while (!encoded);
 836         }
 837         buf.flip();
 838         buffers.add(buf);
 839         return buffers;
 840     }
 841 
 842     static void closeIgnore(Closeable c) {
 843         try {
 844             c.close();
 845         } catch (IOException e) {}
 846     }
 847 
 848     // Runs in own thread
 849     void writeLoop() {
 850         try {
 851             while (!stopping) {
 852                 Http2Frame frame;
 853                 try {
 854                     frame = outputQ.take();
 855                     if (stopping)
 856                         break;
 857                 } catch(IOException x) {
 858                     if (stopping && x.getCause() instanceof InterruptedException) {
 859                         break;
 860                     } else throw x;
 861                 }
 862                 if (frame instanceof ResponseHeaders) {
 863                     ResponseHeaders rh = (ResponseHeaders)frame;
 864                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 865                     writeFrame(hf);
 866                 } else if (frame instanceof OutgoingPushPromise) {
 867                     handlePush((OutgoingPushPromise)frame);
 868                 } else
 869                     writeFrame(frame);
 870             }
 871             System.err.println("TestServer: Connection writer stopping");
 872         } catch (Throwable e) {
 873             e.printStackTrace();
 874             /*close();
 875             if (!stopping) {
 876                 e.printStackTrace();
 877                 System.err.println("TestServer: writeLoop exception: " + e);
 878             }*/
 879         }
 880     }
 881 
 882     private void handlePush(OutgoingPushPromise op) throws IOException {
 883         int promisedStreamid = nextPushStreamId;
 884         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
 885                                                    HeaderFrame.END_HEADERS,
 886                                                    promisedStreamid,
 887                                                    encodeHeaders(op.headers),
 888                                                    0);
 889         pushStreams.add(promisedStreamid);
 890         nextPushStreamId += 2;
 891         pp.streamid(op.parentStream);
 892         writeFrame(pp);
 893         final InputStream ii = op.is;
 894         final BodyOutputStream oo = new BodyOutputStream(
 895                 promisedStreamid,
 896                 clientSettings.getParameter(
 897                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
 898         outStreams.put(promisedStreamid, oo);
 899         oo.goodToGo();
 900         exec.submit(() -> {
 901             try {
 902                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 903                 outputQ.put(oh);
 904                 ii.transferTo(oo);
 905             } catch (Throwable ex) {
 906                 System.err.printf("TestServer: pushing response error: %s\n",
 907                         ex.toString());
 908             } finally {
 909                 closeIgnore(ii);
 910                 closeIgnore(oo);
 911             }
 912         });
 913 
 914     }
 915 
 916     // returns a minimal response with status 200
 917     // that is the response to the push promise just sent
 918     private ResponseHeaders getPushResponse(int streamid) {
 919         HttpHeadersBuilder hb = createNewHeadersBuilder();
 920         hb.addHeader(":status", "200");
 921         ResponseHeaders oh = new ResponseHeaders(hb.build());
 922         oh.streamid(streamid);
 923         oh.setFlag(HeaderFrame.END_HEADERS);
 924         return oh;
 925     }
 926 
 927     private ByteBuffer getBuffer() {
 928         return ByteBuffer.allocate(8 * 1024);
 929     }
 930 
 931     private Http2Frame readFrame() throws IOException {
 932         Http2Frame f = readFrameImpl();
 933         if (f == null)
 934             throw new IOException("connection closed");
 935         return f;
 936     }
 937 
 938     // does not throw an exception for EOF
 939     private Http2Frame readFrameImpl() throws IOException {
 940         try {
 941             byte[] buf = new byte[9];
 942             int ret;
 943             ret=is.readNBytes(buf, 0, 9);
 944             if (ret == 0) {
 945                 return null;
 946             } else if (ret != 9) {
 947                 throw new IOException("readFrame: connection closed");
 948             }
 949             int len = 0;
 950             for (int i = 0; i < 3; i++) {
 951                 int n = buf[i] & 0xff;
 952                 //System.err.println("n = " + n);
 953                 len = (len << 8) + n;
 954             }
 955             byte[] rest = new byte[len];
 956             int n = is.readNBytes(rest, 0, len);
 957             if (n != len)
 958                 throw new IOException("Error reading frame");
 959             List<Http2Frame> frames = new ArrayList<>();
 960             FramesDecoder reader = new FramesDecoder(frames::add);
 961             reader.decode(ByteBuffer.wrap(buf));
 962             reader.decode(ByteBuffer.wrap(rest));
 963             if (frames.size()!=1)
 964                 throw new IOException("Expected 1 frame got "+frames.size()) ;
 965 
 966             return frames.get(0);
 967         } catch (IOException ee) {
 968             if (stopping)
 969                 return null;
 970             throw ee;
 971         }
 972     }
 973 
 974     void sendSettingsFrame() throws IOException {
 975         sendSettingsFrame(false);
 976     }
 977 
 978     void sendSettingsFrame(boolean now) throws IOException {
 979         if (now) {
 980             writeFrame(serverSettings);
 981         } else {
 982             outputQ.put(serverSettings);
 983         }
 984     }
 985 
 986     String readUntil(String end) throws IOException {
 987         int number = end.length();
 988         int found = 0;
 989         StringBuilder sb = new StringBuilder();
 990         while (found < number) {
 991             char expected = end.charAt(found);
 992             int c = is.read();
 993             if (c == -1) {
 994                 throw new IOException("Connection closed");
 995             }
 996             char c0 = (char) c;
 997             sb.append(c0);
 998             if (c0 != expected) {
 999                 found = 0;
1000                 continue;
1001             }
1002             found++;
1003         }
1004         return sb.toString();
1005     }
1006 
1007     private int getContentLength(String headers) {
1008         return getIntHeader(headers, "Content-length");
1009     }
1010 
1011     private int getIntHeader(String headers, String name) {
1012         String val = getHeader(headers, name);
1013         if (val == null) {
1014             return -1;
1015         }
1016         return Integer.parseInt(val);
1017     }
1018 
1019     private String getHeader(String headers, String name) {
1020         String headers1 = headers.toLowerCase(); // not efficient
1021         name = CRLF + name.toLowerCase();
1022         int start = headers1.indexOf(name);
1023         if (start == -1) {
1024             return null;
1025         }
1026         start += 2;
1027         int end = headers1.indexOf(CRLF, start);
1028         String line = headers.substring(start, end);
1029         start = line.indexOf(':');
1030         if (start == -1) {
1031             return null;
1032         }
1033         return line.substring(start + 1).trim();
1034     }
1035 
1036     final static String CRLF = "\r\n";
1037     final static String CRLFCRLF = "\r\n\r\n";
1038 
1039     static class Http1InitialRequest {
1040         final String headers;
1041         final byte[] body;
1042         Http1InitialRequest(String headers, byte[] body) {
1043             this.headers = headers;
1044             this.body = body.clone();
1045         }
1046     }
1047 
1048     Http1InitialRequest readHttp1Request() throws IOException {
1049         String headers = readUntil(CRLF + CRLF);
1050         int clen = getContentLength(headers);
1051         String te = getHeader(headers, "Transfer-encoding");
1052         byte[] buf = new byte[0];
1053         try {
1054             if (clen >= 0) {
1055                 // HTTP/1.1 fixed length content ( may be 0 ), read it
1056                 buf = new byte[clen];
1057                 is.readNBytes(buf, 0, clen);
1058             } else if ("chunked".equalsIgnoreCase(te)) {
1059                 //  HTTP/1.1 chunked data, read it
1060                 buf = readChunkedInputStream(is);
1061             }
1062             return new Http1InitialRequest(headers, buf);
1063         } catch (IOException e) {
1064             System.err.println("TestServer: headers read: [ " + headers + " ]");
1065             throw e;
1066         }
1067     }
1068 
1069     // This is a quick hack to get a chunked input stream reader.
1070     private static byte[] readChunkedInputStream(InputStream is) throws IOException {
1071         ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
1072         return cis.readAllBytes();
1073     }
1074 
1075     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
1076         StringBuilder sb = new StringBuilder();
1077         sb.append("HTTP/1.1 ")
1078                 .append(code)
1079                 .append(' ')
1080                 .append(msg)
1081                 .append(CRLF);
1082         int numheaders = headers.length;
1083         for (int i = 0; i < numheaders; i += 2) {
1084             sb.append(headers[i])
1085                     .append(": ")
1086                     .append(headers[i + 1])
1087                     .append(CRLF);
1088         }
1089         sb.append(CRLF);
1090         String s = sb.toString();
1091         os.write(s.getBytes("US-ASCII"));
1092         os.flush();
1093     }
1094 
1095     private void unexpectedFrame(Http2Frame frame) {
1096         System.err.println("OOPS. Unexpected");
1097         assert false;
1098     }
1099 
1100     final static ByteBuffer[] bbarray = new ByteBuffer[0];
1101 
1102     // wrapper around a BlockingQueue that throws an exception when it's closed
1103     // Each stream has one of these
1104 
1105     byte[] getRequestBody(Http1InitialRequest request) {
1106         return request.body;
1107     }
1108 
1109     @SuppressWarnings({"rawtypes","unchecked"})
1110     void addRequestBodyToQueue(byte[] body, Queue q) throws IOException {
1111         ByteBuffer buf = ByteBuffer.wrap(body);
1112         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
1113         // only used for primordial stream
1114         q.put(df);
1115     }
1116 
1117     // window updates done in main reader thread because they may
1118     // be used to unblock BodyOutputStreams waiting for WUPs
1119 
1120     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
1121 
1122     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
1123         synchronized(updaters) {
1124             updaters.put(streamid, r);
1125         }
1126     }
1127 
1128     int sendWindow = 64 * 1024 - 1; // connection level send window
1129 
1130     /**
1131      * BodyOutputStreams call this to get the connection window first.
1132      *
1133      * @param amount
1134      */
1135     synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
1136         while (amount > 0) {
1137             int n = Math.min(amount, sendWindow);
1138             amount -= n;
1139             sendWindow -= n;
1140             if (amount > 0)
1141                 wait();
1142         }
1143     }
1144 
1145     synchronized void updateConnectionWindow(int amount) {
1146         sendWindow += amount;
1147         notifyAll();
1148     }
1149 
1150     // simplified output headers class. really just a type safe container
1151     // for the hashmap.
1152 
1153     static class ResponseHeaders extends Http2Frame {
1154         HttpHeaders headers;
1155 
1156         ResponseHeaders(HttpHeaders headers) {
1157             super(0, 0);
1158             this.headers = headers;
1159         }
1160 
1161     }
1162 
1163     static class NullInputStream extends InputStream {
1164         static final NullInputStream INSTANCE = new NullInputStream();
1165         private NullInputStream() {}
1166         public int read()      { return -1; }
1167         public int available() { return 0;  }
1168     }
1169 }