1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.BufferedInputStream; 25 import java.io.BufferedOutputStream; 26 import java.io.Closeable; 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.io.InputStream; 30 import java.io.OutputStream; 31 import java.net.Socket; 32 import java.net.URI; 33 import java.net.InetAddress; 34 import javax.net.ssl.*; 35 import java.net.URISyntaxException; 36 import java.net.http.HttpHeaders; 37 import java.nio.ByteBuffer; 38 import java.util.*; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.ConcurrentLinkedQueue; 42 import java.util.function.Consumer; 43 import jdk.internal.net.http.common.HttpHeadersBuilder; 44 import jdk.internal.net.http.frame.*; 45 import jdk.internal.net.http.hpack.Decoder; 46 import jdk.internal.net.http.hpack.DecodingCallback; 47 import jdk.internal.net.http.hpack.Encoder; 48 import sun.net.www.http.ChunkedInputStream; 49 import sun.net.www.http.HttpClient; 50 import static java.nio.charset.StandardCharsets.UTF_8; 51 import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE; 52 53 /** 54 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 55 * or HTTPS opened using "h2" ALPN. 56 */ 57 public class Http2TestServerConnection { 58 final Http2TestServer server; 59 @SuppressWarnings({"rawtypes","unchecked"}) 60 final Map<Integer, Queue> streams; // input q per stream 61 final Map<Integer, BodyOutputStream> outStreams; // output q per stream 62 final HashSet<Integer> pushStreams; 63 final Queue<Http2Frame> outputQ; 64 volatile int nextstream; 65 final Socket socket; 66 final Http2TestExchangeSupplier exchangeSupplier; 67 final InputStream is; 68 final OutputStream os; 69 volatile Encoder hpackOut; 70 volatile Decoder hpackIn; 71 volatile SettingsFrame clientSettings; 72 final SettingsFrame serverSettings; 73 final ExecutorService exec; 74 final boolean secure; 75 final Properties properties; 76 volatile boolean stopping; 77 volatile int nextPushStreamId = 2; 78 ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>(); 79 80 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); 81 final static byte[] EMPTY_BARRAY = new byte[0]; 82 final Random random; 83 84 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); 85 86 static class Sentinel extends Http2Frame { 87 Sentinel() { super(-1,-1);} 88 } 89 90 static final Sentinel sentinel = new Sentinel(); 91 92 class PingRequest { 93 final byte[] pingData; 94 final long pingStamp; 95 final CompletableFuture<Long> response; 96 97 PingRequest() { 98 pingData = new byte[8]; 99 random.nextBytes(pingData); 100 pingStamp = System.currentTimeMillis(); 101 response = new CompletableFuture<>(); 102 } 103 104 PingFrame frame() { 105 return new PingFrame(0, pingData); 106 } 107 108 CompletableFuture<Long> response() { 109 return response; 110 } 111 112 void success() { 113 response.complete(System.currentTimeMillis() - pingStamp); 114 } 115 116 void fail(Throwable t) { 117 response.completeExceptionally(t); 118 } 119 } 120 121 Http2TestServerConnection(Http2TestServer server, 122 Socket socket, 123 Http2TestExchangeSupplier exchangeSupplier, 124 Properties properties) 125 throws IOException 126 { 127 System.err.println("TestServer: New connection from " + socket); 128 129 if (socket instanceof SSLSocket) { 130 SSLSocket sslSocket = (SSLSocket)socket; 131 handshake(server.serverName(), sslSocket); 132 if (!server.supportsHTTP11 && !"h2".equals(sslSocket.getApplicationProtocol())) { 133 throw new IOException("Unexpected ALPN: [" + sslSocket.getApplicationProtocol() + "]"); 134 } 135 } 136 this.server = server; 137 this.exchangeSupplier = exchangeSupplier; 138 this.streams = Collections.synchronizedMap(new HashMap<>()); 139 this.outStreams = Collections.synchronizedMap(new HashMap<>()); 140 this.outputQ = new Queue<>(sentinel); 141 this.random = new Random(); 142 this.socket = socket; 143 this.properties = properties; 144 this.socket.setTcpNoDelay(true); 145 this.serverSettings = getServerSettingProperties(); 146 this.exec = server.exec; 147 this.secure = server.secure; 148 this.pushStreams = new HashSet<>(); 149 is = new BufferedInputStream(socket.getInputStream()); 150 os = new BufferedOutputStream(socket.getOutputStream()); 151 } 152 153 static final String propPrefix = "http2server.settings."; 154 155 static final String[][] propIDs = { 156 {"header_table_size", Integer.toString(SettingsFrame.HEADER_TABLE_SIZE)}, 157 {"enable_push", Integer.toString(SettingsFrame.ENABLE_PUSH)}, 158 {"max_concurrent_streams", Integer.toString(SettingsFrame.MAX_CONCURRENT_STREAMS)}, 159 {"initial_window_size", Integer.toString(SettingsFrame.INITIAL_WINDOW_SIZE)}, 160 {"max_frame_size", Integer.toString(SettingsFrame.MAX_FRAME_SIZE)}, 161 {"max_header_list_size", Integer.toString(SettingsFrame.MAX_HEADER_LIST_SIZE)} 162 }; 163 164 private SettingsFrame getServerSettingProperties() { 165 SettingsFrame s = SettingsFrame.defaultRFCSettings(); 166 if (properties == null) 167 return s; 168 for (int i=0; i<propIDs.length; i++) { 169 String key = propIDs[i][0]; 170 String numS = propIDs[i][1]; 171 String prop = properties.getProperty(propPrefix + key); 172 if (prop != null) { 173 try { 174 System.err.println("TestServer: setting " + key + " property to: " + 175 prop); 176 int num = Integer.parseInt(numS); 177 System.err.println("TestServer: num = " + num); 178 s.setParameter(num, Integer.parseInt(prop)); 179 } catch (NumberFormatException e) {/* ignore errors */} 180 } 181 } 182 return s; 183 } 184 185 /** 186 * Sends a PING frame on this connection, and completes the returned 187 * CF when the PING ack is received. The CF is given 188 * an integer, whose value is the number of milliseconds 189 * between PING and ACK. 190 */ 191 CompletableFuture<Long> sendPing() { 192 PingRequest ping = null; 193 try { 194 ping = new PingRequest(); 195 pings.add(ping); 196 outputQ.put(ping.frame()); 197 } catch (Throwable t) { 198 ping.fail(t); 199 } 200 return ping.response(); 201 } 202 203 void goAway(int error) throws IOException { 204 int laststream = nextstream >= 3 ? nextstream - 2 : 1; 205 206 GoAwayFrame go = new GoAwayFrame(laststream, error); 207 outputQ.put(go); 208 } 209 210 /** 211 * Returns the first PingRequest from Queue 212 */ 213 private PingRequest getNextRequest() { 214 return pings.poll(); 215 } 216 217 /** 218 * Handles incoming Ping, which could be an ack 219 * or a client originated Ping 220 */ 221 void handlePing(PingFrame ping) throws IOException { 222 if (ping.streamid() != 0) { 223 System.err.println("Invalid ping received"); 224 close(ErrorFrame.PROTOCOL_ERROR); 225 return; 226 } 227 if (ping.getFlag(PingFrame.ACK)) { 228 // did we send a Ping? 229 PingRequest request = getNextRequest(); 230 if (request == null) { 231 System.err.println("Invalid ping ACK received"); 232 close(ErrorFrame.PROTOCOL_ERROR); 233 return; 234 } else if (!Arrays.equals(request.pingData, ping.getData())) { 235 request.fail(new RuntimeException("Wrong ping data in ACK")); 236 } else { 237 request.success(); 238 } 239 } else { 240 // client originated PING. Just send it back with ACK set 241 ping.setFlag(PingFrame.ACK); 242 outputQ.put(ping); 243 } 244 } 245 246 private static boolean compareIPAddrs(InetAddress addr1, String host) { 247 try { 248 InetAddress addr2 = InetAddress.getByName(host); 249 return addr1.equals(addr2); 250 } catch (IOException e) { 251 throw new UncheckedIOException(e); 252 } 253 } 254 255 private static void handshake(String name, SSLSocket sock) throws IOException { 256 if (name == null) { 257 sock.getSession(); // blocks until handshake done 258 return; 259 } else if (name.equals("localhost")) { 260 name = "localhost"; 261 } 262 final String fname = name; 263 final InetAddress addr1 = InetAddress.getByName(name); 264 SSLParameters params = sock.getSSLParameters(); 265 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) { 266 public boolean matches (SNIServerName n) { 267 String host = ((SNIHostName)n).getAsciiName(); 268 if (host.equals("localhost")) 269 host = "localhost"; 270 boolean cmp = host.equalsIgnoreCase(fname); 271 if (cmp) 272 return true; 273 return compareIPAddrs(addr1, host); 274 } 275 }; 276 List<SNIMatcher> list = List.of(matcher); 277 params.setSNIMatchers(list); 278 sock.setSSLParameters(params); 279 sock.getSession(); // blocks until handshake done 280 } 281 282 void closeIncoming() { 283 close(-1); 284 } 285 286 void close(int error) { 287 if (stopping) 288 return; 289 stopping = true; 290 System.err.printf("Server connection to %s stopping. %d streams\n", 291 socket.getRemoteSocketAddress().toString(), streams.size()); 292 streams.forEach((i, q) -> { 293 q.orderlyClose(); 294 }); 295 try { 296 if (error != -1) 297 goAway(error); 298 outputQ.orderlyClose(); 299 socket.close(); 300 } catch (Exception e) { 301 } 302 } 303 304 private void readPreface() throws IOException { 305 int len = clientPreface.length; 306 byte[] bytes = new byte[len]; 307 is.readNBytes(bytes, 0, len); 308 if (Arrays.compare(clientPreface, bytes) != 0) { 309 throw new IOException("Invalid preface: " + new String(bytes, 0, len)); 310 } 311 } 312 313 Http1InitialRequest doUpgrade(Http1InitialRequest upgrade) throws IOException { 314 String h2c = getHeader(upgrade.headers, "Upgrade"); 315 if (h2c == null || !h2c.equals("h2c")) { 316 System.err.println("Server:HEADERS: " + upgrade); 317 throw new IOException("Bad upgrade 1 " + h2c); 318 } 319 320 sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade", 321 "Upgrade", "h2c"); 322 323 sendSettingsFrame(); 324 readPreface(); 325 326 String clientSettingsString = getHeader(upgrade.headers, "HTTP2-Settings"); 327 clientSettings = getSettingsFromString(clientSettingsString); 328 329 return upgrade; 330 } 331 332 /** 333 * Decodes the given, Client, settings payload provided in base64 HTTP1 334 * header value. 335 */ 336 private SettingsFrame getSettingsFromString(String s) throws IOException { 337 Base64.Decoder decoder = Base64.getUrlDecoder(); 338 byte[] payload = decoder.decode(s); 339 ByteBuffer bb1 = ByteBuffer.wrap(payload); 340 // simulate header of Settings Frame 341 ByteBuffer bb0 = ByteBuffer.wrap( 342 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); 343 List<Http2Frame> frames = new ArrayList<>(); 344 FramesDecoder reader = new FramesDecoder(frames::add); 345 reader.decode(bb0); 346 reader.decode(bb1); 347 if (frames.size()!=1) 348 throw new IOException("Expected 1 frame got "+frames.size()) ; 349 Http2Frame frame = frames.get(0); 350 if (!(frame instanceof SettingsFrame)) 351 throw new IOException("Expected SettingsFrame"); 352 return (SettingsFrame)frame; 353 } 354 355 public int getMaxFrameSize() { 356 return clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE); 357 } 358 359 private void standardHTTP11Response(Http1InitialRequest request) 360 throws IOException 361 { 362 String upgradeHeader = getHeader(request.headers, "Upgrade"); 363 if (upgradeHeader != null) { 364 throw new IOException("Unexpected Upgrade header:" + upgradeHeader); 365 } 366 367 sendHttp1Response(200, "OK", 368 "Connection", "close", 369 "X-Magic", "HTTP/1.1 request received by HTTP/2 server", 370 "X-Received-Body", new String(request.body, UTF_8)); 371 372 } 373 374 void run() throws Exception { 375 Http1InitialRequest upgrade = null; 376 if (!secure) { 377 Http1InitialRequest request = readHttp1Request(); 378 String h2c = getHeader(request.headers, "Upgrade"); 379 if (h2c == null || !h2c.equals("h2c")) { 380 if (server.supportsHTTP11) { 381 standardHTTP11Response(request); 382 socket.close(); 383 return; 384 } else { 385 System.err.println("Server:HEADERS: " + upgrade); 386 throw new IOException("Bad upgrade 1 " + h2c); 387 } 388 } 389 upgrade = doUpgrade(request); 390 } else { // secure 391 SSLSocket sslSocket = (SSLSocket)socket; 392 if (sslSocket.getApplicationProtocol().equals("h2")) { 393 readPreface(); 394 sendSettingsFrame(true); 395 clientSettings = (SettingsFrame) readFrame(); 396 if (clientSettings.getFlag(SettingsFrame.ACK)) { 397 // we received the ack to our frame first 398 clientSettings = (SettingsFrame) readFrame(); 399 } 400 nextstream = 1; 401 } else if (sslSocket.getApplicationProtocol().equals("http/1.1") || 402 sslSocket.getApplicationProtocol().equals("")) { 403 standardHTTP11Response(readHttp1Request()); 404 socket.shutdownOutput(); 405 socket.close(); 406 return; 407 } else { 408 throw new IOException("Unexpected ALPN:" + sslSocket.getApplicationProtocol()); 409 } 410 } 411 412 // Uncomment if needed, but very noisy 413 //System.out.println("ServerSettings: " + serverSettings); 414 //System.out.println("ClientSettings: " + clientSettings); 415 416 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 417 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 418 419 if (!secure) { 420 createPrimordialStream(upgrade); 421 nextstream = 3; 422 } 423 424 (new ConnectionThread("readLoop", this::readLoop)).start(); 425 (new ConnectionThread("writeLoop", this::writeLoop)).start(); 426 } 427 428 class ConnectionThread extends Thread { 429 final Runnable r; 430 ConnectionThread(String name, Runnable r) { 431 setName(name); 432 setDaemon(true); 433 this.r = r; 434 } 435 436 public void run() { 437 r.run(); 438 } 439 } 440 441 private void writeFrame(Http2Frame frame) throws IOException { 442 List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame); 443 //System.err.println("TestServer: Writing frame " + frame.toString()); 444 int c = 0; 445 for (ByteBuffer buf : bufs) { 446 byte[] ba = buf.array(); 447 int start = buf.arrayOffset() + buf.position(); 448 c += buf.remaining(); 449 os.write(ba, start, buf.remaining()); 450 451 // System.out.println("writing byte at a time"); 452 // while (buf.hasRemaining()) { 453 // byte b = buf.get(); 454 // os.write(b); 455 // os.flush(); 456 // try { 457 // Thread.sleep(1); 458 // } catch(InterruptedException e) { 459 // UncheckedIOException uie = new UncheckedIOException(new IOException("")); 460 // uie.addSuppressed(e); 461 // throw uie; 462 // } 463 // } 464 } 465 os.flush(); 466 //System.err.printf("TestServer: wrote %d bytes\n", c); 467 } 468 469 private void handleCommonFrame(Http2Frame f) throws IOException { 470 if (f instanceof SettingsFrame) { 471 SettingsFrame sf = (SettingsFrame) f; 472 if (sf.getFlag(SettingsFrame.ACK)) // ignore 473 { 474 return; 475 } 476 // otherwise acknowledge it 477 clientSettings = sf; 478 SettingsFrame frame = new SettingsFrame(); 479 frame.setFlag(SettingsFrame.ACK); 480 frame.streamid(0); 481 outputQ.put(frame); 482 return; 483 } else if (f instanceof GoAwayFrame) { 484 System.err.println("Closing: "+ f.toString()); 485 close(ErrorFrame.NO_ERROR); 486 } else if (f instanceof PingFrame) { 487 handlePing((PingFrame)f); 488 } else 489 throw new UnsupportedOperationException("Not supported yet: " + f.toString()); 490 } 491 492 void sendWindowUpdates(int len, int streamid) throws IOException { 493 if (len == 0) 494 return; 495 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len); 496 outputQ.put(wup); 497 wup = new WindowUpdateFrame(0 , len); 498 outputQ.put(wup); 499 } 500 501 HttpHeaders decodeHeaders(List<HeaderFrame> frames) throws IOException { 502 HttpHeadersBuilder headersBuilder = createNewHeadersBuilder(); 503 504 DecodingCallback cb = (name, value) -> { 505 headersBuilder.addHeader(name.toString(), value.toString()); 506 }; 507 508 for (HeaderFrame frame : frames) { 509 List<ByteBuffer> buffers = frame.getHeaderBlock(); 510 for (ByteBuffer buffer : buffers) { 511 hpackIn.decode(buffer, false, cb); 512 } 513 } 514 hpackIn.decode(EMPTY_BUFFER, true, cb); 515 return headersBuilder.build(); 516 } 517 518 String getRequestLine(String request) { 519 int eol = request.indexOf(CRLF); 520 return request.substring(0, eol); 521 } 522 523 String getHeaders(String request) { 524 int start = request.indexOf(CRLF); 525 int end = request.indexOf(CRLFCRLF); 526 if (start == -1 || end == -1) { 527 throw new RuntimeException("Malformed request"); 528 } 529 return request.substring(start,end); 530 } 531 532 static void addHeaders(String headersString, HttpHeadersBuilder headersBuilder) { 533 String[] hh = headersString.split(CRLF); 534 for (String header : hh) { 535 int colon = header.indexOf(':'); 536 if (colon == -1) 537 continue; 538 String name = header.substring(0, colon); 539 String value = header.substring(colon+1); 540 while (value.startsWith(" ")) 541 value = value.substring(1); 542 headersBuilder.addHeader(name, value); 543 } 544 } 545 546 // First stream (1) comes from a plaintext HTTP/1.1 request 547 @SuppressWarnings({"rawtypes","unchecked"}) 548 void createPrimordialStream(Http1InitialRequest request) throws IOException { 549 HttpHeadersBuilder headersBuilder = createNewHeadersBuilder(); 550 String requestLine = getRequestLine(request.headers); 551 String[] tokens = requestLine.split(" "); 552 if (!tokens[2].equals("HTTP/1.1")) { 553 throw new IOException("bad request line"); 554 } 555 URI uri; 556 try { 557 uri = new URI(tokens[1]); 558 } catch (URISyntaxException e) { 559 throw new IOException(e); 560 } 561 String host = getHeader(request.headers, "Host"); 562 if (host == null) { 563 throw new IOException("missing Host"); 564 } 565 566 headersBuilder.setHeader(":method", tokens[0]); 567 headersBuilder.setHeader(":scheme", "http"); // always in this case 568 headersBuilder.setHeader(":authority", host); 569 String path = uri.getRawPath(); 570 if (uri.getRawQuery() != null) 571 path = path + "?" + uri.getRawQuery(); 572 headersBuilder.setHeader(":path", path); 573 574 Queue q = new Queue(sentinel); 575 byte[] body = getRequestBody(request); 576 addHeaders(getHeaders(request.headers), headersBuilder); 577 headersBuilder.setHeader("Content-length", Integer.toString(body.length)); 578 HttpHeaders headers = headersBuilder.build(); 579 580 addRequestBodyToQueue(body, q); 581 streams.put(1, q); 582 exec.submit(() -> { 583 handleRequest(headers, q, 1, true /*complete request has been read*/); 584 }); 585 } 586 587 // all other streams created here 588 @SuppressWarnings({"rawtypes","unchecked"}) 589 void createStream(HeaderFrame frame) throws IOException { 590 List<HeaderFrame> frames = new LinkedList<>(); 591 frames.add(frame); 592 int streamid = frame.streamid(); 593 if (streamid != nextstream) { 594 throw new IOException("unexpected stream id"); 595 } 596 nextstream += 2; 597 598 boolean endStream = false; 599 if (frame.getFlag(HeaderFrame.END_STREAM)) { 600 endStream = true; 601 } 602 603 while (!frame.getFlag(HeaderFrame.END_HEADERS)) { 604 Http2Frame f = readFrame(); 605 if (!(f instanceof HeaderFrame)) { 606 handleCommonFrame(f); // should only be error frames 607 } else { 608 frame = (HeaderFrame) f; 609 if (frame.getFlag(HeaderFrame.END_STREAM)) { 610 endStream = true; 611 } 612 frames.add(frame); 613 } 614 } 615 boolean endStreamReceived = endStream; 616 HttpHeaders headers = decodeHeaders(frames); 617 618 // Strict to assert Client correctness. Not all servers are as strict, 619 // but some are known to be. 620 Optional<?> disallowedHeader = headers.firstValue("Upgrade"); 621 if (disallowedHeader.isPresent()) { 622 throw new IOException("Unexpected Upgrade in headers:" + headers); 623 } 624 disallowedHeader = headers.firstValue("HTTP2-Settings"); 625 if (disallowedHeader.isPresent()) 626 throw new IOException("Unexpected HTTP2-Settings in headers:" + headers); 627 628 629 Queue q = new Queue(sentinel); 630 streams.put(streamid, q); 631 exec.submit(() -> { 632 handleRequest(headers, q, streamid, endStreamReceived); 633 }); 634 } 635 636 // runs in own thread. Handles request from start to finish. Incoming frames 637 // for this stream/request delivered on Q 638 639 @SuppressWarnings({"rawtypes","unchecked"}) 640 void handleRequest(HttpHeaders headers, 641 Queue queue, 642 int streamid, 643 boolean endStreamReceived) 644 { 645 String method = headers.firstValue(":method").orElse(""); 646 //System.out.println("method = " + method); 647 String path = headers.firstValue(":path").orElse(""); 648 //System.out.println("path = " + path); 649 String scheme = headers.firstValue(":scheme").orElse(""); 650 //System.out.println("scheme = " + scheme); 651 String authority = headers.firstValue(":authority").orElse(""); 652 //System.out.println("authority = " + authority); 653 System.err.printf("TestServer: %s %s\n", method, path); 654 int winsize = clientSettings.getParameter( 655 SettingsFrame.INITIAL_WINDOW_SIZE); 656 //System.err.println ("Stream window size = " + winsize); 657 658 final InputStream bis; 659 if (endStreamReceived && queue.size() == 0) { 660 System.err.println("Server: got END_STREAM for stream " + streamid); 661 bis = NullInputStream.INSTANCE; 662 } else { 663 System.err.println("Server: creating input stream for stream " + streamid); 664 bis = new BodyInputStream(queue, streamid, this); 665 } 666 try (bis; 667 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) 668 { 669 outStreams.put(streamid, bos); 670 String us = scheme + "://" + authority + path; 671 URI uri = new URI(us); 672 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; 673 HttpHeadersBuilder rspheadersBuilder = createNewHeadersBuilder(); 674 Http2TestExchange exchange = exchangeSupplier.get(streamid, method, 675 headers, rspheadersBuilder, uri, bis, getSSLSession(), 676 bos, this, pushAllowed); 677 678 // give to user 679 Http2Handler handler = server.getHandlerFor(uri.getPath()); 680 try { 681 handler.handle(exchange); 682 } catch (IOException closed) { 683 if (bos.closed) { 684 Queue q = streams.get(streamid); 685 if (q != null && (q.isClosed() || q.isClosing())) { 686 System.err.println("TestServer: Stream " + streamid + " closed: " + closed); 687 return; 688 } 689 } 690 throw closed; 691 } 692 693 // everything happens in the exchange from here. Hopefully will 694 // return though. 695 } catch (Throwable e) { 696 System.err.println("TestServer: handleRequest exception: " + e); 697 e.printStackTrace(); 698 close(-1); 699 } 700 } 701 702 protected HttpHeadersBuilder createNewHeadersBuilder() { 703 return new HttpHeadersBuilder(); 704 } 705 706 private SSLSession getSSLSession() { 707 if (! (socket instanceof SSLSocket)) 708 return null; 709 SSLSocket ssl = (SSLSocket)socket; 710 return ssl.getSession(); 711 } 712 // Runs in own thread 713 714 @SuppressWarnings({"rawtypes","unchecked"}) 715 void readLoop() { 716 try { 717 while (!stopping) { 718 Http2Frame frame = readFrameImpl(); 719 if (frame == null) { 720 closeIncoming(); 721 return; 722 } 723 //System.err.printf("TestServer: received frame %s\n", frame); 724 int stream = frame.streamid(); 725 if (stream == 0) { 726 if (frame.type() == WindowUpdateFrame.TYPE) { 727 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 728 updateConnectionWindow(wup.getUpdate()); 729 } else { 730 // other common frame types 731 handleCommonFrame(frame); 732 } 733 } else { 734 Queue q = streams.get(stream); 735 if (frame.type() == HeadersFrame.TYPE) { 736 if (q != null) { 737 System.err.println("HEADERS frame for existing stream! Error."); 738 // TODO: close connection 739 continue; 740 } else { 741 createStream((HeadersFrame) frame); 742 } 743 } else { 744 if (q == null && !pushStreams.contains(stream)) { 745 System.err.printf("Non Headers frame received with"+ 746 " non existing stream (%d) ", frame.streamid()); 747 System.err.println(frame); 748 continue; 749 } 750 if (frame.type() == WindowUpdateFrame.TYPE) { 751 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 752 synchronized (updaters) { 753 Consumer<Integer> r = updaters.get(stream); 754 r.accept(wup.getUpdate()); 755 } 756 } else if (frame.type() == ResetFrame.TYPE) { 757 // do orderly close on input q 758 // and close the output q immediately 759 // This should mean depending on what the 760 // handler is doing: either an EOF on read 761 // or an IOException if writing the response. 762 if (q != null) { 763 q.orderlyClose(); 764 BodyOutputStream oq = outStreams.get(stream); 765 if (oq != null) 766 oq.closeInternal(); 767 } else if (pushStreams.contains(stream)) { 768 // we could interrupt the pushStream's output 769 // but the continuation, even after a reset 770 // should be handle gracefully by the client 771 // anyway. 772 } else { 773 System.err.println("TestServer: Unexpected frame on: " + stream); 774 System.err.println(frame); 775 throw new IOException("Unexpected frame"); 776 } 777 } else { 778 q.put(frame); 779 } 780 } 781 } 782 } 783 } catch (Throwable e) { 784 if (!stopping) { 785 System.err.println("Http server reader thread shutdown"); 786 e.printStackTrace(); 787 } 788 close(ErrorFrame.PROTOCOL_ERROR); 789 } 790 } 791 792 /** Encodes an group of headers, without any ordering guarantees. */ 793 List<ByteBuffer> encodeHeaders(HttpHeaders headers) { 794 List<ByteBuffer> buffers = new LinkedList<>(); 795 796 ByteBuffer buf = getBuffer(); 797 boolean encoded; 798 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) { 799 List<String> values = entry.getValue(); 800 String key = entry.getKey().toLowerCase(); 801 for (String value : values) { 802 do { 803 hpackOut.header(key, value); 804 encoded = hpackOut.encode(buf); 805 if (!encoded) { 806 buf.flip(); 807 buffers.add(buf); 808 buf = getBuffer(); 809 } 810 } while (!encoded); 811 } 812 } 813 buf.flip(); 814 buffers.add(buf); 815 return buffers; 816 } 817 818 /** Encodes an ordered list of headers. */ 819 List<ByteBuffer> encodeHeadersOrdered(List<Map.Entry<String,String>> headers) { 820 List<ByteBuffer> buffers = new LinkedList<>(); 821 822 ByteBuffer buf = getBuffer(); 823 boolean encoded; 824 for (Map.Entry<String, String> entry : headers) { 825 String value = entry.getValue(); 826 String key = entry.getKey().toLowerCase(); 827 do { 828 hpackOut.header(key, value); 829 encoded = hpackOut.encode(buf); 830 if (!encoded) { 831 buf.flip(); 832 buffers.add(buf); 833 buf = getBuffer(); 834 } 835 } while (!encoded); 836 } 837 buf.flip(); 838 buffers.add(buf); 839 return buffers; 840 } 841 842 static void closeIgnore(Closeable c) { 843 try { 844 c.close(); 845 } catch (IOException e) {} 846 } 847 848 // Runs in own thread 849 void writeLoop() { 850 try { 851 while (!stopping) { 852 Http2Frame frame; 853 try { 854 frame = outputQ.take(); 855 if (stopping) 856 break; 857 } catch(IOException x) { 858 if (stopping && x.getCause() instanceof InterruptedException) { 859 break; 860 } else throw x; 861 } 862 if (frame instanceof ResponseHeaders) { 863 ResponseHeaders rh = (ResponseHeaders)frame; 864 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers)); 865 writeFrame(hf); 866 } else if (frame instanceof OutgoingPushPromise) { 867 handlePush((OutgoingPushPromise)frame); 868 } else 869 writeFrame(frame); 870 } 871 System.err.println("TestServer: Connection writer stopping"); 872 } catch (Throwable e) { 873 e.printStackTrace(); 874 /*close(); 875 if (!stopping) { 876 e.printStackTrace(); 877 System.err.println("TestServer: writeLoop exception: " + e); 878 }*/ 879 } 880 } 881 882 private void handlePush(OutgoingPushPromise op) throws IOException { 883 int promisedStreamid = nextPushStreamId; 884 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, 885 HeaderFrame.END_HEADERS, 886 promisedStreamid, 887 encodeHeaders(op.headers), 888 0); 889 pushStreams.add(promisedStreamid); 890 nextPushStreamId += 2; 891 pp.streamid(op.parentStream); 892 writeFrame(pp); 893 final InputStream ii = op.is; 894 final BodyOutputStream oo = new BodyOutputStream( 895 promisedStreamid, 896 clientSettings.getParameter( 897 SettingsFrame.INITIAL_WINDOW_SIZE), this); 898 outStreams.put(promisedStreamid, oo); 899 oo.goodToGo(); 900 exec.submit(() -> { 901 try { 902 ResponseHeaders oh = getPushResponse(promisedStreamid); 903 outputQ.put(oh); 904 ii.transferTo(oo); 905 } catch (Throwable ex) { 906 System.err.printf("TestServer: pushing response error: %s\n", 907 ex.toString()); 908 } finally { 909 closeIgnore(ii); 910 closeIgnore(oo); 911 } 912 }); 913 914 } 915 916 // returns a minimal response with status 200 917 // that is the response to the push promise just sent 918 private ResponseHeaders getPushResponse(int streamid) { 919 HttpHeadersBuilder hb = createNewHeadersBuilder(); 920 hb.addHeader(":status", "200"); 921 ResponseHeaders oh = new ResponseHeaders(hb.build()); 922 oh.streamid(streamid); 923 oh.setFlag(HeaderFrame.END_HEADERS); 924 return oh; 925 } 926 927 private ByteBuffer getBuffer() { 928 return ByteBuffer.allocate(8 * 1024); 929 } 930 931 private Http2Frame readFrame() throws IOException { 932 Http2Frame f = readFrameImpl(); 933 if (f == null) 934 throw new IOException("connection closed"); 935 return f; 936 } 937 938 // does not throw an exception for EOF 939 private Http2Frame readFrameImpl() throws IOException { 940 try { 941 byte[] buf = new byte[9]; 942 int ret; 943 ret=is.readNBytes(buf, 0, 9); 944 if (ret == 0) { 945 return null; 946 } else if (ret != 9) { 947 throw new IOException("readFrame: connection closed"); 948 } 949 int len = 0; 950 for (int i = 0; i < 3; i++) { 951 int n = buf[i] & 0xff; 952 //System.err.println("n = " + n); 953 len = (len << 8) + n; 954 } 955 byte[] rest = new byte[len]; 956 int n = is.readNBytes(rest, 0, len); 957 if (n != len) 958 throw new IOException("Error reading frame"); 959 List<Http2Frame> frames = new ArrayList<>(); 960 FramesDecoder reader = new FramesDecoder(frames::add); 961 reader.decode(ByteBuffer.wrap(buf)); 962 reader.decode(ByteBuffer.wrap(rest)); 963 if (frames.size()!=1) 964 throw new IOException("Expected 1 frame got "+frames.size()) ; 965 966 return frames.get(0); 967 } catch (IOException ee) { 968 if (stopping) 969 return null; 970 throw ee; 971 } 972 } 973 974 void sendSettingsFrame() throws IOException { 975 sendSettingsFrame(false); 976 } 977 978 void sendSettingsFrame(boolean now) throws IOException { 979 if (now) { 980 writeFrame(serverSettings); 981 } else { 982 outputQ.put(serverSettings); 983 } 984 } 985 986 String readUntil(String end) throws IOException { 987 int number = end.length(); 988 int found = 0; 989 StringBuilder sb = new StringBuilder(); 990 while (found < number) { 991 char expected = end.charAt(found); 992 int c = is.read(); 993 if (c == -1) { 994 throw new IOException("Connection closed"); 995 } 996 char c0 = (char) c; 997 sb.append(c0); 998 if (c0 != expected) { 999 found = 0; 1000 continue; 1001 } 1002 found++; 1003 } 1004 return sb.toString(); 1005 } 1006 1007 private int getContentLength(String headers) { 1008 return getIntHeader(headers, "Content-length"); 1009 } 1010 1011 private int getIntHeader(String headers, String name) { 1012 String val = getHeader(headers, name); 1013 if (val == null) { 1014 return -1; 1015 } 1016 return Integer.parseInt(val); 1017 } 1018 1019 private String getHeader(String headers, String name) { 1020 String headers1 = headers.toLowerCase(); // not efficient 1021 name = CRLF + name.toLowerCase(); 1022 int start = headers1.indexOf(name); 1023 if (start == -1) { 1024 return null; 1025 } 1026 start += 2; 1027 int end = headers1.indexOf(CRLF, start); 1028 String line = headers.substring(start, end); 1029 start = line.indexOf(':'); 1030 if (start == -1) { 1031 return null; 1032 } 1033 return line.substring(start + 1).trim(); 1034 } 1035 1036 final static String CRLF = "\r\n"; 1037 final static String CRLFCRLF = "\r\n\r\n"; 1038 1039 static class Http1InitialRequest { 1040 final String headers; 1041 final byte[] body; 1042 Http1InitialRequest(String headers, byte[] body) { 1043 this.headers = headers; 1044 this.body = body.clone(); 1045 } 1046 } 1047 1048 Http1InitialRequest readHttp1Request() throws IOException { 1049 String headers = readUntil(CRLF + CRLF); 1050 int clen = getContentLength(headers); 1051 String te = getHeader(headers, "Transfer-encoding"); 1052 byte[] buf = new byte[0]; 1053 try { 1054 if (clen >= 0) { 1055 // HTTP/1.1 fixed length content ( may be 0 ), read it 1056 buf = new byte[clen]; 1057 is.readNBytes(buf, 0, clen); 1058 } else if ("chunked".equalsIgnoreCase(te)) { 1059 // HTTP/1.1 chunked data, read it 1060 buf = readChunkedInputStream(is); 1061 } 1062 return new Http1InitialRequest(headers, buf); 1063 } catch (IOException e) { 1064 System.err.println("TestServer: headers read: [ " + headers + " ]"); 1065 throw e; 1066 } 1067 } 1068 1069 // This is a quick hack to get a chunked input stream reader. 1070 private static byte[] readChunkedInputStream(InputStream is) throws IOException { 1071 ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null); 1072 return cis.readAllBytes(); 1073 } 1074 1075 void sendHttp1Response(int code, String msg, String... headers) throws IOException { 1076 StringBuilder sb = new StringBuilder(); 1077 sb.append("HTTP/1.1 ") 1078 .append(code) 1079 .append(' ') 1080 .append(msg) 1081 .append(CRLF); 1082 int numheaders = headers.length; 1083 for (int i = 0; i < numheaders; i += 2) { 1084 sb.append(headers[i]) 1085 .append(": ") 1086 .append(headers[i + 1]) 1087 .append(CRLF); 1088 } 1089 sb.append(CRLF); 1090 String s = sb.toString(); 1091 os.write(s.getBytes("US-ASCII")); 1092 os.flush(); 1093 } 1094 1095 private void unexpectedFrame(Http2Frame frame) { 1096 System.err.println("OOPS. Unexpected"); 1097 assert false; 1098 } 1099 1100 final static ByteBuffer[] bbarray = new ByteBuffer[0]; 1101 1102 // wrapper around a BlockingQueue that throws an exception when it's closed 1103 // Each stream has one of these 1104 1105 byte[] getRequestBody(Http1InitialRequest request) { 1106 return request.body; 1107 } 1108 1109 @SuppressWarnings({"rawtypes","unchecked"}) 1110 void addRequestBodyToQueue(byte[] body, Queue q) throws IOException { 1111 ByteBuffer buf = ByteBuffer.wrap(body); 1112 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); 1113 // only used for primordial stream 1114 q.put(df); 1115 } 1116 1117 // window updates done in main reader thread because they may 1118 // be used to unblock BodyOutputStreams waiting for WUPs 1119 1120 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>(); 1121 1122 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) { 1123 synchronized(updaters) { 1124 updaters.put(streamid, r); 1125 } 1126 } 1127 1128 int sendWindow = 64 * 1024 - 1; // connection level send window 1129 1130 /** 1131 * BodyOutputStreams call this to get the connection window first. 1132 * 1133 * @param amount 1134 */ 1135 synchronized void obtainConnectionWindow(int amount) throws InterruptedException { 1136 while (amount > 0) { 1137 int n = Math.min(amount, sendWindow); 1138 amount -= n; 1139 sendWindow -= n; 1140 if (amount > 0) 1141 wait(); 1142 } 1143 } 1144 1145 synchronized void updateConnectionWindow(int amount) { 1146 sendWindow += amount; 1147 notifyAll(); 1148 } 1149 1150 // simplified output headers class. really just a type safe container 1151 // for the hashmap. 1152 1153 static class ResponseHeaders extends Http2Frame { 1154 HttpHeaders headers; 1155 1156 ResponseHeaders(HttpHeaders headers) { 1157 super(0, 0); 1158 this.headers = headers; 1159 } 1160 1161 } 1162 1163 static class NullInputStream extends InputStream { 1164 static final NullInputStream INSTANCE = new NullInputStream(); 1165 private NullInputStream() {} 1166 public int read() { return -1; } 1167 public int available() { return 0; } 1168 } 1169 }