1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.BufferedInputStream; 25 import java.io.BufferedOutputStream; 26 import java.io.Closeable; 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.io.InputStream; 30 import java.io.OutputStream; 31 import java.net.Socket; 32 import java.net.URI; 33 import java.net.InetAddress; 34 import javax.net.ssl.*; 35 import java.net.URISyntaxException; 36 import java.nio.ByteBuffer; 37 import java.nio.charset.StandardCharsets; 38 import java.util.*; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.ConcurrentLinkedQueue; 42 import java.util.function.Consumer; 43 import jdk.incubator.http.internal.common.HttpHeadersImpl; 44 import jdk.incubator.http.internal.frame.*; 45 import jdk.incubator.http.internal.hpack.Decoder; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 import jdk.incubator.http.internal.hpack.Encoder; 48 import sun.net.www.http.ChunkedInputStream; 49 import sun.net.www.http.HttpClient; 50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; 51 52 /** 53 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 54 * or HTTPS opened using "h2" ALPN. 55 */ 56 public class Http2TestServerConnection { 57 final Http2TestServer server; 58 @SuppressWarnings({"rawtypes","unchecked"}) 59 final Map<Integer, Queue> streams; // input q per stream 60 final Map<Integer, BodyOutputStream> outStreams; // output q per stream 61 final HashSet<Integer> pushStreams; 62 final Queue<Http2Frame> outputQ; 63 volatile int nextstream; 64 final Socket socket; 65 final Http2TestExchangeSupplier exchangeSupplier; 66 final InputStream is; 67 final OutputStream os; 68 volatile Encoder hpackOut; 69 volatile Decoder hpackIn; 70 volatile SettingsFrame clientSettings; 71 final SettingsFrame serverSettings; 72 final ExecutorService exec; 73 final boolean secure; 74 volatile boolean stopping; 75 volatile int nextPushStreamId = 2; 76 ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>(); 77 78 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); 79 final static byte[] EMPTY_BARRAY = new byte[0]; 80 final Random random; 81 82 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); 83 84 static class Sentinel extends Http2Frame { 85 Sentinel() { super(-1,-1);} 86 } 87 88 class PingRequest { 89 final byte[] pingData; 90 final long pingStamp; 91 final CompletableFuture<Long> response; 92 93 PingRequest() { 94 pingData = new byte[8]; 95 random.nextBytes(pingData); 96 pingStamp = System.currentTimeMillis(); 97 response = new CompletableFuture<>(); 98 } 99 100 PingFrame frame() { 101 return new PingFrame(0, pingData); 102 } 103 104 CompletableFuture<Long> response() { 105 return response; 106 } 107 108 void success() { 109 response.complete(System.currentTimeMillis() - pingStamp); 110 } 111 112 void fail(Throwable t) { 113 response.completeExceptionally(t); 114 } 115 } 116 117 static Sentinel sentinel; 118 119 Http2TestServerConnection(Http2TestServer server, 120 Socket socket, 121 Http2TestExchangeSupplier exchangeSupplier) 122 throws IOException 123 { 124 if (socket instanceof SSLSocket) { 125 handshake(server.serverName(), (SSLSocket)socket); 126 } 127 System.err.println("TestServer: New connection from " + socket); 128 this.server = server; 129 this.exchangeSupplier = exchangeSupplier; 130 this.streams = Collections.synchronizedMap(new HashMap<>()); 131 this.outStreams = Collections.synchronizedMap(new HashMap<>()); 132 this.outputQ = new Queue<>(sentinel); 133 this.random = new Random(); 134 this.socket = socket; 135 this.socket.setTcpNoDelay(true); 136 this.serverSettings = SettingsFrame.getDefaultSettings(); 137 this.exec = server.exec; 138 this.secure = server.secure; 139 this.pushStreams = new HashSet<>(); 140 is = new BufferedInputStream(socket.getInputStream()); 141 os = new BufferedOutputStream(socket.getOutputStream()); 142 } 143 144 /** 145 * Sends a PING frame on this connection, and completes the returned 146 * CF when the PING ack is received. The CF is given 147 * an integer, whose value is the number of milliseconds 148 * between PING and ACK. 149 */ 150 CompletableFuture<Long> sendPing() { 151 PingRequest ping = null; 152 try { 153 ping = new PingRequest(); 154 pings.add(ping); 155 outputQ.put(ping.frame()); 156 } catch (Throwable t) { 157 ping.fail(t); 158 } 159 return ping.response(); 160 } 161 162 /** 163 * Returns the first PingRequest from Queue 164 */ 165 private PingRequest getNextRequest() { 166 return pings.poll(); 167 } 168 169 /** 170 * Handles incoming Ping, which could be an ack 171 * or a client originated Ping 172 */ 173 void handlePing(PingFrame ping) throws IOException { 174 if (ping.streamid() != 0) { 175 System.err.println("Invalid ping received"); 176 close(); 177 return; 178 } 179 if (ping.getFlag(PingFrame.ACK)) { 180 // did we send a Ping? 181 PingRequest request = getNextRequest(); 182 if (request == null) { 183 System.err.println("Invalid ping ACK received"); 184 close(); 185 return; 186 } else if (!Arrays.equals(request.pingData, ping.getData())) { 187 request.fail(new RuntimeException("Wrong ping data in ACK")); 188 } else { 189 request.success(); 190 } 191 } else { 192 // client originated PING. Just send it back with ACK set 193 ping.setFlag(PingFrame.ACK); 194 outputQ.put(ping); 195 } 196 } 197 198 private static boolean compareIPAddrs(InetAddress addr1, String host) { 199 try { 200 InetAddress addr2 = InetAddress.getByName(host); 201 return addr1.equals(addr2); 202 } catch (IOException e) { 203 throw new UncheckedIOException(e); 204 } 205 } 206 207 private static void handshake(String name, SSLSocket sock) throws IOException { 208 if (name == null) { 209 // no name set. No need to check 210 return; 211 } else if (name.equals("127.0.0.1")) { 212 name = "localhost"; 213 } 214 final String fname = name; 215 final InetAddress addr1 = InetAddress.getByName(name); 216 SSLParameters params = sock.getSSLParameters(); 217 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) { 218 public boolean matches (SNIServerName n) { 219 String host = ((SNIHostName)n).getAsciiName(); 220 if (host.equals("127.0.0.1")) 221 host = "localhost"; 222 boolean cmp = host.equalsIgnoreCase(fname); 223 if (cmp) 224 return true; 225 return compareIPAddrs(addr1, host); 226 } 227 }; 228 List<SNIMatcher> list = List.of(matcher); 229 params.setSNIMatchers(list); 230 sock.setSSLParameters(params); 231 sock.getSession(); // blocks until handshake done 232 } 233 234 void close() { 235 stopping = true; 236 streams.forEach((i, q) -> { 237 q.close(); 238 }); 239 try { 240 socket.close(); 241 // TODO: put a reset on each stream 242 } catch (IOException e) { 243 } 244 } 245 246 private void readPreface() throws IOException { 247 int len = clientPreface.length; 248 byte[] bytes = new byte[len]; 249 is.readNBytes(bytes, 0, len); 250 if (Arrays.compare(clientPreface, bytes) != 0) { 251 throw new IOException("Invalid preface: " + new String(bytes, 0, len)); 252 } 253 } 254 255 String doUpgrade() throws IOException { 256 String upgrade = readHttp1Request(); 257 String h2c = getHeader(upgrade, "Upgrade"); 258 if (h2c == null || !h2c.equals("h2c")) { 259 System.err.println("Server:HEADERS: " + upgrade); 260 throw new IOException("Bad upgrade 1 " + h2c); 261 } 262 263 sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade", 264 "Upgrade", "h2c"); 265 266 sendSettingsFrame(); 267 readPreface(); 268 269 String clientSettingsString = getHeader(upgrade, "HTTP2-Settings"); 270 clientSettings = getSettingsFromString(clientSettingsString); 271 272 return upgrade; 273 } 274 275 /** 276 * Decodes the given, Client, settings payload provided in base64 HTTP1 277 * header value. 278 */ 279 private SettingsFrame getSettingsFromString(String s) throws IOException { 280 Base64.Decoder decoder = Base64.getUrlDecoder(); 281 byte[] payload = decoder.decode(s); 282 ByteBuffer bb1 = ByteBuffer.wrap(payload); 283 // simulate header of Settings Frame 284 ByteBuffer bb0 = ByteBuffer.wrap( 285 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); 286 List<Http2Frame> frames = new ArrayList<>(); 287 FramesDecoder reader = new FramesDecoder(frames::add); 288 reader.decode(bb0); 289 reader.decode(bb1); 290 if (frames.size()!=1) 291 throw new IOException("Expected 1 frame got "+frames.size()) ; 292 Http2Frame frame = frames.get(0); 293 if (!(frame instanceof SettingsFrame)) 294 throw new IOException("Expected SettingsFrame"); 295 return (SettingsFrame)frame; 296 } 297 298 void run() throws Exception { 299 String upgrade = null; 300 if (!secure) { 301 upgrade = doUpgrade(); 302 } else { 303 readPreface(); 304 sendSettingsFrame(true); 305 clientSettings = (SettingsFrame) readFrame(); 306 if (clientSettings.getFlag(SettingsFrame.ACK)) { 307 // we received the ack to our frame first 308 clientSettings = (SettingsFrame) readFrame(); 309 } 310 nextstream = 1; 311 } 312 313 System.out.println("ServerSettings: " + serverSettings); 314 System.out.println("ClientSettings: " + clientSettings); 315 316 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 317 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 318 319 if (!secure) { 320 createPrimordialStream(upgrade); 321 nextstream = 3; 322 } 323 324 exec.submit(this::readLoop); 325 exec.submit(this::writeLoop); 326 } 327 328 private void writeFrame(Http2Frame frame) throws IOException { 329 List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame); 330 //System.err.println("TestServer: Writing frame " + frame.toString()); 331 int c = 0; 332 for (ByteBuffer buf : bufs) { 333 byte[] ba = buf.array(); 334 int start = buf.arrayOffset() + buf.position(); 335 c += buf.remaining(); 336 os.write(ba, start, buf.remaining()); 337 338 // System.out.println("writing byte at a time"); 339 // while (buf.hasRemaining()) { 340 // byte b = buf.get(); 341 // os.write(b); 342 // os.flush(); 343 // try { 344 // Thread.sleep(1); 345 // } catch(InterruptedException e) { 346 // UncheckedIOException uie = new UncheckedIOException(new IOException("")); 347 // uie.addSuppressed(e); 348 // throw uie; 349 // } 350 // } 351 } 352 os.flush(); 353 //System.err.printf("TestServer: wrote %d bytes\n", c); 354 } 355 356 private void handleCommonFrame(Http2Frame f) throws IOException { 357 if (f instanceof SettingsFrame) { 358 SettingsFrame sf = (SettingsFrame) f; 359 if (sf.getFlag(SettingsFrame.ACK)) // ignore 360 { 361 return; 362 } 363 // otherwise acknowledge it 364 clientSettings = sf; 365 SettingsFrame frame = new SettingsFrame(); 366 frame.setFlag(SettingsFrame.ACK); 367 frame.streamid(0); 368 outputQ.put(frame); 369 return; 370 } else if (f instanceof GoAwayFrame) { 371 System.err.println("Closing: "+ f.toString()); 372 close(); 373 } else if (f instanceof PingFrame) { 374 handlePing((PingFrame)f); 375 } else 376 throw new UnsupportedOperationException("Not supported yet: " + f.toString()); 377 } 378 379 void sendWindowUpdates(int len, int streamid) throws IOException { 380 if (len == 0) 381 return; 382 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len); 383 outputQ.put(wup); 384 wup = new WindowUpdateFrame(0 , len); 385 outputQ.put(wup); 386 } 387 388 HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException { 389 HttpHeadersImpl headers = new HttpHeadersImpl(); 390 391 DecodingCallback cb = (name, value) -> { 392 headers.addHeader(name.toString(), value.toString()); 393 }; 394 395 for (HeaderFrame frame : frames) { 396 List<ByteBuffer> buffers = frame.getHeaderBlock(); 397 for (ByteBuffer buffer : buffers) { 398 hpackIn.decode(buffer, false, cb); 399 } 400 } 401 hpackIn.decode(EMPTY_BUFFER, true, cb); 402 return headers; 403 } 404 405 String getRequestLine(String request) { 406 int eol = request.indexOf(CRLF); 407 return request.substring(0, eol); 408 } 409 410 String getHeaders(String request) { 411 int start = request.indexOf(CRLF); 412 int end = request.indexOf(CRLFCRLF); 413 if (start == -1 || end == -1) { 414 throw new RuntimeException("Malformed request"); 415 } 416 return request.substring(start,end); 417 } 418 419 void addHeaders(String headers, HttpHeadersImpl hdrs) { 420 String[] hh = headers.split(CRLF); 421 for (String header : hh) { 422 int colon = header.indexOf(':'); 423 if (colon == -1) 424 continue; 425 String name = header.substring(0, colon); 426 String value = header.substring(colon+1); 427 while (value.startsWith(" ")) 428 value = value.substring(1); 429 hdrs.addHeader(name, value); 430 } 431 } 432 433 // First stream (1) comes from a plaintext HTTP/1.1 request 434 @SuppressWarnings({"rawtypes","unchecked"}) 435 void createPrimordialStream(String request) throws IOException { 436 HttpHeadersImpl headers = new HttpHeadersImpl(); 437 String requestLine = getRequestLine(request); 438 String[] tokens = requestLine.split(" "); 439 if (!tokens[2].equals("HTTP/1.1")) { 440 throw new IOException("bad request line"); 441 } 442 URI uri = null; 443 try { 444 uri = new URI(tokens[1]); 445 } catch (URISyntaxException e) { 446 throw new IOException(e); 447 } 448 String host = getHeader(request, "Host"); 449 if (host == null) { 450 throw new IOException("missing Host"); 451 } 452 453 headers.setHeader(":method", tokens[0]); 454 headers.setHeader(":scheme", "http"); // always in this case 455 headers.setHeader(":authority", host); 456 headers.setHeader(":path", uri.getPath()); 457 Queue q = new Queue(sentinel); 458 String body = getRequestBody(request); 459 addHeaders(getHeaders(request), headers); 460 headers.setHeader("Content-length", Integer.toString(body.length())); 461 462 addRequestBodyToQueue(body, q); 463 streams.put(1, q); 464 exec.submit(() -> { 465 handleRequest(headers, q, 1, true /*complete request has been read*/); 466 }); 467 } 468 469 // all other streams created here 470 @SuppressWarnings({"rawtypes","unchecked"}) 471 void createStream(HeaderFrame frame) throws IOException { 472 List<HeaderFrame> frames = new LinkedList<>(); 473 frames.add(frame); 474 int streamid = frame.streamid(); 475 if (streamid != nextstream) { 476 throw new IOException("unexpected stream id"); 477 } 478 nextstream += 2; 479 480 boolean endStream = false; 481 if (frame.getFlag(HeaderFrame.END_STREAM)) { 482 endStream = true; 483 } 484 485 while (!frame.getFlag(HeaderFrame.END_HEADERS)) { 486 Http2Frame f = readFrame(); 487 if (!(f instanceof HeaderFrame)) { 488 handleCommonFrame(f); // should only be error frames 489 } else { 490 frame = (HeaderFrame) f; 491 if (frame.getFlag(HeaderFrame.END_STREAM)) { 492 endStream = true; 493 } 494 frames.add(frame); 495 } 496 } 497 boolean endStreamReceived = endStream; 498 HttpHeadersImpl headers = decodeHeaders(frames); 499 Queue q = new Queue(sentinel); 500 streams.put(streamid, q); 501 exec.submit(() -> { 502 handleRequest(headers, q, streamid, endStreamReceived); 503 }); 504 } 505 506 // runs in own thread. Handles request from start to finish. Incoming frames 507 // for this stream/request delivered on Q 508 509 @SuppressWarnings({"rawtypes","unchecked"}) 510 void handleRequest(HttpHeadersImpl headers, 511 Queue queue, 512 int streamid, 513 boolean endStreamReceived) 514 { 515 String method = headers.firstValue(":method").orElse(""); 516 //System.out.println("method = " + method); 517 String path = headers.firstValue(":path").orElse(""); 518 //System.out.println("path = " + path); 519 String scheme = headers.firstValue(":scheme").orElse(""); 520 //System.out.println("scheme = " + scheme); 521 String authority = headers.firstValue(":authority").orElse(""); 522 //System.out.println("authority = " + authority); 523 System.err.printf("TestServer: %s %s\n", method, path); 524 HttpHeadersImpl rspheaders = new HttpHeadersImpl(); 525 int winsize = clientSettings.getParameter( 526 SettingsFrame.INITIAL_WINDOW_SIZE); 527 //System.err.println ("Stream window size = " + winsize); 528 529 final InputStream bis; 530 if (endStreamReceived && queue.size() == 0) { 531 System.err.println("Server: got END_STREAM for stream " + streamid); 532 bis = NullInputStream.INSTANCE; 533 } else { 534 System.err.println("Server: creating input stream for stream " + streamid); 535 bis = new BodyInputStream(queue, streamid, this); 536 } 537 try (bis; 538 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) 539 { 540 outStreams.put(streamid, bos); 541 String us = scheme + "://" + authority + path; 542 URI uri = new URI(us); 543 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; 544 Http2TestExchange exchange = exchangeSupplier.get(streamid, method, 545 headers, rspheaders, uri, bis, getSSLSession(), 546 bos, this, pushAllowed); 547 548 // give to user 549 Http2Handler handler = server.getHandlerFor(uri.getPath()); 550 handler.handle(exchange); 551 552 // everything happens in the exchange from here. Hopefully will 553 // return though. 554 } catch (Throwable e) { 555 System.err.println("TestServer: handleRequest exception: " + e); 556 e.printStackTrace(); 557 } 558 } 559 560 private SSLSession getSSLSession() { 561 if (! (socket instanceof SSLSocket)) 562 return null; 563 SSLSocket ssl = (SSLSocket)socket; 564 return ssl.getSession(); 565 } 566 // Runs in own thread 567 568 @SuppressWarnings({"rawtypes","unchecked"}) 569 void readLoop() { 570 try { 571 while (!stopping) { 572 Http2Frame frame = readFrame(); 573 //System.err.printf("TestServer: received frame %s\n", frame); 574 int stream = frame.streamid(); 575 if (stream == 0) { 576 if (frame.type() == WindowUpdateFrame.TYPE) { 577 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 578 updateConnectionWindow(wup.getUpdate()); 579 } else { 580 // other common frame types 581 handleCommonFrame(frame); 582 } 583 } else { 584 Queue q = streams.get(stream); 585 if (frame.type() == HeadersFrame.TYPE) { 586 if (q != null) { 587 System.err.println("HEADERS frame for existing stream! Error."); 588 // TODO: close connection 589 continue; 590 } else { 591 createStream((HeadersFrame) frame); 592 } 593 } else { 594 if (q == null && !pushStreams.contains(stream)) { 595 System.err.printf("Non Headers frame received with"+ 596 " non existing stream (%d) ", frame.streamid()); 597 System.err.println(frame); 598 continue; 599 } 600 if (frame.type() == WindowUpdateFrame.TYPE) { 601 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 602 synchronized (updaters) { 603 Consumer<Integer> r = updaters.get(stream); 604 r.accept(wup.getUpdate()); 605 } 606 } else if (frame.type() == ResetFrame.TYPE) { 607 // do orderly close on input q 608 // and close the output q immediately 609 // This should mean depending on what the 610 // handler is doing: either an EOF on read 611 // or an IOException if writing the response. 612 q.orderlyClose(); 613 BodyOutputStream oq = outStreams.get(stream); 614 if (oq != null) 615 oq.closeInternal(); 616 617 } else { 618 q.put(frame); 619 } 620 } 621 } 622 } 623 } catch (Throwable e) { 624 if (!stopping) { 625 System.err.println("Http server reader thread shutdown"); 626 e.printStackTrace(); 627 } 628 close(); 629 } 630 } 631 632 List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) { 633 List<ByteBuffer> buffers = new LinkedList<>(); 634 635 ByteBuffer buf = getBuffer(); 636 boolean encoded; 637 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) { 638 List<String> values = entry.getValue(); 639 String key = entry.getKey().toLowerCase(); 640 for (String value : values) { 641 do { 642 hpackOut.header(key, value); 643 encoded = hpackOut.encode(buf); 644 if (!encoded) { 645 buf.flip(); 646 buffers.add(buf); 647 buf = getBuffer(); 648 } 649 } while (!encoded); 650 } 651 } 652 buf.flip(); 653 buffers.add(buf); 654 return buffers; 655 } 656 657 static void closeIgnore(Closeable c) { 658 try { 659 c.close(); 660 } catch (IOException e) {} 661 } 662 663 // Runs in own thread 664 void writeLoop() { 665 try { 666 while (!stopping) { 667 Http2Frame frame; 668 try { 669 frame = outputQ.take(); 670 } catch(IOException x) { 671 if (stopping && x.getCause() instanceof InterruptedException) { 672 break; 673 } else throw x; 674 } 675 if (frame instanceof ResponseHeaders) { 676 ResponseHeaders rh = (ResponseHeaders)frame; 677 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers)); 678 writeFrame(hf); 679 } else if (frame instanceof OutgoingPushPromise) { 680 handlePush((OutgoingPushPromise)frame); 681 } else 682 writeFrame(frame); 683 } 684 System.err.println("TestServer: Connection writer stopping"); 685 } catch (Throwable e) { 686 e.printStackTrace(); 687 /*close(); 688 if (!stopping) { 689 e.printStackTrace(); 690 System.err.println("TestServer: writeLoop exception: " + e); 691 }*/ 692 } 693 } 694 695 private void handlePush(OutgoingPushPromise op) throws IOException { 696 int promisedStreamid = nextPushStreamId; 697 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, 698 HeaderFrame.END_HEADERS, 699 promisedStreamid, 700 encodeHeaders(op.headers), 701 0); 702 pushStreams.add(promisedStreamid); 703 nextPushStreamId += 2; 704 pp.streamid(op.parentStream); 705 writeFrame(pp); 706 final InputStream ii = op.is; 707 final BodyOutputStream oo = new BodyOutputStream( 708 promisedStreamid, 709 clientSettings.getParameter( 710 SettingsFrame.INITIAL_WINDOW_SIZE), this); 711 outStreams.put(promisedStreamid, oo); 712 oo.goodToGo(); 713 exec.submit(() -> { 714 try { 715 ResponseHeaders oh = getPushResponse(promisedStreamid); 716 outputQ.put(oh); 717 ii.transferTo(oo); 718 } catch (Throwable ex) { 719 System.err.printf("TestServer: pushing response error: %s\n", 720 ex.toString()); 721 } finally { 722 closeIgnore(ii); 723 closeIgnore(oo); 724 } 725 }); 726 727 } 728 729 // returns a minimal response with status 200 730 // that is the response to the push promise just sent 731 private ResponseHeaders getPushResponse(int streamid) { 732 HttpHeadersImpl h = new HttpHeadersImpl(); 733 h.addHeader(":status", "200"); 734 ResponseHeaders oh = new ResponseHeaders(h); 735 oh.streamid(streamid); 736 oh.setFlag(HeaderFrame.END_HEADERS); 737 return oh; 738 } 739 740 private ByteBuffer getBuffer() { 741 return ByteBuffer.allocate(8 * 1024); 742 } 743 744 private Http2Frame readFrame() throws IOException { 745 byte[] buf = new byte[9]; 746 if (is.readNBytes(buf, 0, 9) != 9) 747 throw new IOException("readFrame: connection closed"); 748 int len = 0; 749 for (int i = 0; i < 3; i++) { 750 int n = buf[i] & 0xff; 751 //System.err.println("n = " + n); 752 len = (len << 8) + n; 753 } 754 byte[] rest = new byte[len]; 755 int n = is.readNBytes(rest, 0, len); 756 if (n != len) 757 throw new IOException("Error reading frame"); 758 List<Http2Frame> frames = new ArrayList<>(); 759 FramesDecoder reader = new FramesDecoder(frames::add); 760 reader.decode(ByteBuffer.wrap(buf)); 761 reader.decode(ByteBuffer.wrap(rest)); 762 if (frames.size()!=1) 763 throw new IOException("Expected 1 frame got "+frames.size()) ; 764 765 return frames.get(0); 766 } 767 768 void sendSettingsFrame() throws IOException { 769 sendSettingsFrame(false); 770 } 771 772 void sendSettingsFrame(boolean now) throws IOException { 773 if (now) { 774 writeFrame(serverSettings); 775 } else { 776 outputQ.put(serverSettings); 777 } 778 } 779 780 String readUntil(String end) throws IOException { 781 int number = end.length(); 782 int found = 0; 783 StringBuilder sb = new StringBuilder(); 784 while (found < number) { 785 char expected = end.charAt(found); 786 int c = is.read(); 787 if (c == -1) { 788 throw new IOException("Connection closed"); 789 } 790 char c0 = (char) c; 791 sb.append(c0); 792 if (c0 != expected) { 793 found = 0; 794 continue; 795 } 796 found++; 797 } 798 return sb.toString(); 799 } 800 801 private int getContentLength(String headers) { 802 return getIntHeader(headers, "Content-length"); 803 } 804 805 private int getIntHeader(String headers, String name) { 806 String val = getHeader(headers, name); 807 if (val == null) { 808 return -1; 809 } 810 return Integer.parseInt(val); 811 } 812 813 private String getHeader(String headers, String name) { 814 String headers1 = headers.toLowerCase(); // not efficient 815 name = CRLF + name.toLowerCase(); 816 int start = headers1.indexOf(name); 817 if (start == -1) { 818 return null; 819 } 820 start += 2; 821 int end = headers1.indexOf(CRLF, start); 822 String line = headers.substring(start, end); 823 start = line.indexOf(':'); 824 if (start == -1) { 825 return null; 826 } 827 return line.substring(start + 1).trim(); 828 } 829 830 final static String CRLF = "\r\n"; 831 final static String CRLFCRLF = "\r\n\r\n"; 832 833 String readHttp1Request() throws IOException { 834 String headers = readUntil(CRLF + CRLF); 835 int clen = getContentLength(headers); 836 byte[] buf; 837 if (clen >= 0) { 838 // HTTP/1.1 fixed length content ( may be 0 ), read it 839 buf = new byte[clen]; 840 is.readNBytes(buf, 0, clen); 841 } else { 842 // HTTP/1.1 chunked data, read it 843 buf = readChunkedInputStream(is); 844 } 845 String body = new String(buf, StandardCharsets.US_ASCII); 846 return headers + body; 847 } 848 849 // This is a quick hack to get a chunked input stream reader. 850 private static byte[] readChunkedInputStream(InputStream is) throws IOException { 851 ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null); 852 return cis.readAllBytes(); 853 } 854 855 void sendHttp1Response(int code, String msg, String... headers) throws IOException { 856 StringBuilder sb = new StringBuilder(); 857 sb.append("HTTP/1.1 ") 858 .append(code) 859 .append(' ') 860 .append(msg) 861 .append(CRLF); 862 int numheaders = headers.length; 863 for (int i = 0; i < numheaders; i += 2) { 864 sb.append(headers[i]) 865 .append(": ") 866 .append(headers[i + 1]) 867 .append(CRLF); 868 } 869 sb.append(CRLF); 870 String s = sb.toString(); 871 os.write(s.getBytes("US-ASCII")); 872 os.flush(); 873 } 874 875 private void unexpectedFrame(Http2Frame frame) { 876 System.err.println("OOPS. Unexpected"); 877 assert false; 878 } 879 880 final static ByteBuffer[] bbarray = new ByteBuffer[0]; 881 882 // wrapper around a BlockingQueue that throws an exception when it's closed 883 // Each stream has one of these 884 885 String getRequestBody(String request) { 886 int bodystart = request.indexOf(CRLF+CRLF); 887 String body; 888 if (bodystart == -1) 889 body = ""; 890 else 891 body = request.substring(bodystart+4); 892 return body; 893 } 894 895 @SuppressWarnings({"rawtypes","unchecked"}) 896 void addRequestBodyToQueue(String body, Queue q) throws IOException { 897 ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); 898 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); 899 // only used for primordial stream 900 q.put(df); 901 } 902 903 // window updates done in main reader thread because they may 904 // be used to unblock BodyOutputStreams waiting for WUPs 905 906 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>(); 907 908 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) { 909 synchronized(updaters) { 910 updaters.put(streamid, r); 911 } 912 } 913 914 int sendWindow = 64 * 1024 - 1; // connection level send window 915 916 /** 917 * BodyOutputStreams call this to get the connection window first. 918 * 919 * @param amount 920 */ 921 synchronized void obtainConnectionWindow(int amount) throws InterruptedException { 922 while (amount > 0) { 923 int n = Math.min(amount, sendWindow); 924 amount -= n; 925 sendWindow -= n; 926 if (amount > 0) 927 wait(); 928 } 929 } 930 931 synchronized void updateConnectionWindow(int amount) { 932 sendWindow += amount; 933 notifyAll(); 934 } 935 936 // simplified output headers class. really just a type safe container 937 // for the hashmap. 938 939 static class ResponseHeaders extends Http2Frame { 940 HttpHeadersImpl headers; 941 942 ResponseHeaders(HttpHeadersImpl headers) { 943 super(0, 0); 944 this.headers = headers; 945 } 946 947 } 948 949 static class NullInputStream extends InputStream { 950 static final NullInputStream INSTANCE = new NullInputStream(); 951 private NullInputStream() {} 952 public int read() { return -1; } 953 public int available() { return 0; } 954 } 955 }