1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.BufferedInputStream; 25 import java.io.BufferedOutputStream; 26 import java.io.Closeable; 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.io.InputStream; 30 import java.io.OutputStream; 31 import java.net.Socket; 32 import java.net.URI; 33 import java.net.InetAddress; 34 import javax.net.ssl.*; 35 import java.net.URISyntaxException; 36 import java.nio.ByteBuffer; 37 import java.nio.charset.StandardCharsets; 38 import java.util.*; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.ConcurrentLinkedQueue; 42 import java.util.function.Consumer; 43 import jdk.incubator.http.internal.common.HttpHeadersImpl; 44 import jdk.incubator.http.internal.frame.*; 45 import jdk.incubator.http.internal.hpack.Decoder; 46 import jdk.incubator.http.internal.hpack.DecodingCallback; 47 import jdk.incubator.http.internal.hpack.Encoder; 48 import sun.net.www.http.ChunkedInputStream; 49 import sun.net.www.http.HttpClient; 50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; 51 52 /** 53 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 54 * or HTTPS opened using "h2" ALPN. 55 */ 56 public class Http2TestServerConnection { 57 final Http2TestServer server; 58 @SuppressWarnings({"rawtypes","unchecked"}) 59 final Map<Integer, Queue> streams; // input q per stream 60 final Map<Integer, BodyOutputStream> outStreams; // output q per stream 61 final HashSet<Integer> pushStreams; 62 final Queue<Http2Frame> outputQ; 63 volatile int nextstream; 64 final Socket socket; 65 final Http2TestExchangeSupplier exchangeSupplier; 66 final InputStream is; 67 final OutputStream os; 68 volatile Encoder hpackOut; 69 volatile Decoder hpackIn; 70 volatile SettingsFrame clientSettings; 71 final SettingsFrame serverSettings; 72 final ExecutorService exec; 73 final boolean secure; 74 volatile boolean stopping; 75 volatile int nextPushStreamId = 2; 76 ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>(); 77 78 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); 79 final static byte[] EMPTY_BARRAY = new byte[0]; 80 final Random random; 81 82 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); 83 84 static class Sentinel extends Http2Frame { 85 Sentinel() { super(-1,-1);} 86 } 87 88 static final Sentinel sentinel = new Sentinel(); 89 90 class PingRequest { 91 final byte[] pingData; 92 final long pingStamp; 93 final CompletableFuture<Long> response; 94 95 PingRequest() { 96 pingData = new byte[8]; 97 random.nextBytes(pingData); 98 pingStamp = System.currentTimeMillis(); 99 response = new CompletableFuture<>(); 100 } 101 102 PingFrame frame() { 103 return new PingFrame(0, pingData); 104 } 105 106 CompletableFuture<Long> response() { 107 return response; 108 } 109 110 void success() { 111 response.complete(System.currentTimeMillis() - pingStamp); 112 } 113 114 void fail(Throwable t) { 115 response.completeExceptionally(t); 116 } 117 } 118 119 Http2TestServerConnection(Http2TestServer server, 120 Socket socket, 121 Http2TestExchangeSupplier exchangeSupplier) 122 throws IOException 123 { 124 if (socket instanceof SSLSocket) { 125 handshake(server.serverName(), (SSLSocket)socket); 126 } 127 System.err.println("TestServer: New connection from " + socket); 128 this.server = server; 129 this.exchangeSupplier = exchangeSupplier; 130 this.streams = Collections.synchronizedMap(new HashMap<>()); 131 this.outStreams = Collections.synchronizedMap(new HashMap<>()); 132 this.outputQ = new Queue<>(sentinel); 133 this.random = new Random(); 134 this.socket = socket; 135 this.socket.setTcpNoDelay(true); 136 this.serverSettings = SettingsFrame.getDefaultSettings(); 137 this.exec = server.exec; 138 this.secure = server.secure; 139 this.pushStreams = new HashSet<>(); 140 is = new BufferedInputStream(socket.getInputStream()); 141 os = new BufferedOutputStream(socket.getOutputStream()); 142 } 143 144 /** 145 * Sends a PING frame on this connection, and completes the returned 146 * CF when the PING ack is received. The CF is given 147 * an integer, whose value is the number of milliseconds 148 * between PING and ACK. 149 */ 150 CompletableFuture<Long> sendPing() { 151 PingRequest ping = null; 152 try { 153 ping = new PingRequest(); 154 pings.add(ping); 155 outputQ.put(ping.frame()); 156 } catch (Throwable t) { 157 ping.fail(t); 158 } 159 return ping.response(); 160 } 161 162 void goAway(int error) throws IOException { 163 int laststream = nextstream >= 3 ? nextstream - 2 : 1; 164 165 GoAwayFrame go = new GoAwayFrame(laststream, error); 166 outputQ.put(go); 167 } 168 169 /** 170 * Returns the first PingRequest from Queue 171 */ 172 private PingRequest getNextRequest() { 173 return pings.poll(); 174 } 175 176 /** 177 * Handles incoming Ping, which could be an ack 178 * or a client originated Ping 179 */ 180 void handlePing(PingFrame ping) throws IOException { 181 if (ping.streamid() != 0) { 182 System.err.println("Invalid ping received"); 183 close(ErrorFrame.PROTOCOL_ERROR); 184 return; 185 } 186 if (ping.getFlag(PingFrame.ACK)) { 187 // did we send a Ping? 188 PingRequest request = getNextRequest(); 189 if (request == null) { 190 System.err.println("Invalid ping ACK received"); 191 close(ErrorFrame.PROTOCOL_ERROR); 192 return; 193 } else if (!Arrays.equals(request.pingData, ping.getData())) { 194 request.fail(new RuntimeException("Wrong ping data in ACK")); 195 } else { 196 request.success(); 197 } 198 } else { 199 // client originated PING. Just send it back with ACK set 200 ping.setFlag(PingFrame.ACK); 201 outputQ.put(ping); 202 } 203 } 204 205 private static boolean compareIPAddrs(InetAddress addr1, String host) { 206 try { 207 InetAddress addr2 = InetAddress.getByName(host); 208 return addr1.equals(addr2); 209 } catch (IOException e) { 210 throw new UncheckedIOException(e); 211 } 212 } 213 214 private static void handshake(String name, SSLSocket sock) throws IOException { 215 if (name == null) { 216 // no name set. No need to check 217 return; 218 } else if (name.equals("127.0.0.1")) { 219 name = "localhost"; 220 } 221 final String fname = name; 222 final InetAddress addr1 = InetAddress.getByName(name); 223 SSLParameters params = sock.getSSLParameters(); 224 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) { 225 public boolean matches (SNIServerName n) { 226 String host = ((SNIHostName)n).getAsciiName(); 227 if (host.equals("127.0.0.1")) 228 host = "localhost"; 229 boolean cmp = host.equalsIgnoreCase(fname); 230 if (cmp) 231 return true; 232 return compareIPAddrs(addr1, host); 233 } 234 }; 235 List<SNIMatcher> list = List.of(matcher); 236 params.setSNIMatchers(list); 237 sock.setSSLParameters(params); 238 sock.getSession(); // blocks until handshake done 239 } 240 241 void closeIncoming() { 242 close(-1); 243 } 244 245 void close(int error) { 246 if (stopping) 247 return; 248 stopping = true; 249 System.err.printf("Server connection to %s stopping. %d streams\n", 250 socket.getRemoteSocketAddress().toString(), streams.size()); 251 streams.forEach((i, q) -> { 252 q.orderlyClose(); 253 }); 254 try { 255 if (error != -1) 256 goAway(error); 257 outputQ.orderlyClose(); 258 socket.close(); 259 } catch (Exception e) { 260 } 261 } 262 263 private void readPreface() throws IOException { 264 int len = clientPreface.length; 265 byte[] bytes = new byte[len]; 266 is.readNBytes(bytes, 0, len); 267 if (Arrays.compare(clientPreface, bytes) != 0) { 268 throw new IOException("Invalid preface: " + new String(bytes, 0, len)); 269 } 270 } 271 272 String doUpgrade() throws IOException { 273 String upgrade = readHttp1Request(); 274 String h2c = getHeader(upgrade, "Upgrade"); 275 if (h2c == null || !h2c.equals("h2c")) { 276 System.err.println("Server:HEADERS: " + upgrade); 277 throw new IOException("Bad upgrade 1 " + h2c); 278 } 279 280 sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade", 281 "Upgrade", "h2c"); 282 283 sendSettingsFrame(); 284 readPreface(); 285 286 String clientSettingsString = getHeader(upgrade, "HTTP2-Settings"); 287 clientSettings = getSettingsFromString(clientSettingsString); 288 289 return upgrade; 290 } 291 292 /** 293 * Decodes the given, Client, settings payload provided in base64 HTTP1 294 * header value. 295 */ 296 private SettingsFrame getSettingsFromString(String s) throws IOException { 297 Base64.Decoder decoder = Base64.getUrlDecoder(); 298 byte[] payload = decoder.decode(s); 299 ByteBuffer bb1 = ByteBuffer.wrap(payload); 300 // simulate header of Settings Frame 301 ByteBuffer bb0 = ByteBuffer.wrap( 302 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); 303 List<Http2Frame> frames = new ArrayList<>(); 304 FramesDecoder reader = new FramesDecoder(frames::add); 305 reader.decode(bb0); 306 reader.decode(bb1); 307 if (frames.size()!=1) 308 throw new IOException("Expected 1 frame got "+frames.size()) ; 309 Http2Frame frame = frames.get(0); 310 if (!(frame instanceof SettingsFrame)) 311 throw new IOException("Expected SettingsFrame"); 312 return (SettingsFrame)frame; 313 } 314 315 void run() throws Exception { 316 String upgrade = null; 317 if (!secure) { 318 upgrade = doUpgrade(); 319 } else { 320 readPreface(); 321 sendSettingsFrame(true); 322 clientSettings = (SettingsFrame) readFrame(); 323 if (clientSettings.getFlag(SettingsFrame.ACK)) { 324 // we received the ack to our frame first 325 clientSettings = (SettingsFrame) readFrame(); 326 } 327 nextstream = 1; 328 } 329 330 System.out.println("ServerSettings: " + serverSettings); 331 System.out.println("ClientSettings: " + clientSettings); 332 333 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 334 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 335 336 if (!secure) { 337 createPrimordialStream(upgrade); 338 nextstream = 3; 339 } 340 341 (new ConnectionThread("readLoop", this::readLoop)).start(); 342 (new ConnectionThread("writeLoop", this::writeLoop)).start(); 343 } 344 345 class ConnectionThread extends Thread { 346 final Runnable r; 347 ConnectionThread(String name, Runnable r) { 348 setName(name); 349 setDaemon(true); 350 this.r = r; 351 } 352 353 public void run() { 354 r.run(); 355 } 356 } 357 358 private void writeFrame(Http2Frame frame) throws IOException { 359 List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame); 360 //System.err.println("TestServer: Writing frame " + frame.toString()); 361 int c = 0; 362 for (ByteBuffer buf : bufs) { 363 byte[] ba = buf.array(); 364 int start = buf.arrayOffset() + buf.position(); 365 c += buf.remaining(); 366 os.write(ba, start, buf.remaining()); 367 368 // System.out.println("writing byte at a time"); 369 // while (buf.hasRemaining()) { 370 // byte b = buf.get(); 371 // os.write(b); 372 // os.flush(); 373 // try { 374 // Thread.sleep(1); 375 // } catch(InterruptedException e) { 376 // UncheckedIOException uie = new UncheckedIOException(new IOException("")); 377 // uie.addSuppressed(e); 378 // throw uie; 379 // } 380 // } 381 } 382 os.flush(); 383 //System.err.printf("TestServer: wrote %d bytes\n", c); 384 } 385 386 private void handleCommonFrame(Http2Frame f) throws IOException { 387 if (f instanceof SettingsFrame) { 388 SettingsFrame sf = (SettingsFrame) f; 389 if (sf.getFlag(SettingsFrame.ACK)) // ignore 390 { 391 return; 392 } 393 // otherwise acknowledge it 394 clientSettings = sf; 395 SettingsFrame frame = new SettingsFrame(); 396 frame.setFlag(SettingsFrame.ACK); 397 frame.streamid(0); 398 outputQ.put(frame); 399 return; 400 } else if (f instanceof GoAwayFrame) { 401 System.err.println("Closing: "+ f.toString()); 402 close(ErrorFrame.NO_ERROR); 403 } else if (f instanceof PingFrame) { 404 handlePing((PingFrame)f); 405 } else 406 throw new UnsupportedOperationException("Not supported yet: " + f.toString()); 407 } 408 409 void sendWindowUpdates(int len, int streamid) throws IOException { 410 if (len == 0) 411 return; 412 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len); 413 outputQ.put(wup); 414 wup = new WindowUpdateFrame(0 , len); 415 outputQ.put(wup); 416 } 417 418 HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException { 419 HttpHeadersImpl headers = new HttpHeadersImpl(); 420 421 DecodingCallback cb = (name, value) -> { 422 headers.addHeader(name.toString(), value.toString()); 423 }; 424 425 for (HeaderFrame frame : frames) { 426 List<ByteBuffer> buffers = frame.getHeaderBlock(); 427 for (ByteBuffer buffer : buffers) { 428 hpackIn.decode(buffer, false, cb); 429 } 430 } 431 hpackIn.decode(EMPTY_BUFFER, true, cb); 432 return headers; 433 } 434 435 String getRequestLine(String request) { 436 int eol = request.indexOf(CRLF); 437 return request.substring(0, eol); 438 } 439 440 String getHeaders(String request) { 441 int start = request.indexOf(CRLF); 442 int end = request.indexOf(CRLFCRLF); 443 if (start == -1 || end == -1) { 444 throw new RuntimeException("Malformed request"); 445 } 446 return request.substring(start,end); 447 } 448 449 void addHeaders(String headers, HttpHeadersImpl hdrs) { 450 String[] hh = headers.split(CRLF); 451 for (String header : hh) { 452 int colon = header.indexOf(':'); 453 if (colon == -1) 454 continue; 455 String name = header.substring(0, colon); 456 String value = header.substring(colon+1); 457 while (value.startsWith(" ")) 458 value = value.substring(1); 459 hdrs.addHeader(name, value); 460 } 461 } 462 463 // First stream (1) comes from a plaintext HTTP/1.1 request 464 @SuppressWarnings({"rawtypes","unchecked"}) 465 void createPrimordialStream(String request) throws IOException { 466 HttpHeadersImpl headers = new HttpHeadersImpl(); 467 String requestLine = getRequestLine(request); 468 String[] tokens = requestLine.split(" "); 469 if (!tokens[2].equals("HTTP/1.1")) { 470 throw new IOException("bad request line"); 471 } 472 URI uri = null; 473 try { 474 uri = new URI(tokens[1]); 475 } catch (URISyntaxException e) { 476 throw new IOException(e); 477 } 478 String host = getHeader(request, "Host"); 479 if (host == null) { 480 throw new IOException("missing Host"); 481 } 482 483 headers.setHeader(":method", tokens[0]); 484 headers.setHeader(":scheme", "http"); // always in this case 485 headers.setHeader(":authority", host); 486 headers.setHeader(":path", uri.getPath()); 487 Queue q = new Queue(sentinel); 488 String body = getRequestBody(request); 489 addHeaders(getHeaders(request), headers); 490 headers.setHeader("Content-length", Integer.toString(body.length())); 491 492 addRequestBodyToQueue(body, q); 493 streams.put(1, q); 494 exec.submit(() -> { 495 handleRequest(headers, q, 1, true /*complete request has been read*/); 496 }); 497 } 498 499 // all other streams created here 500 @SuppressWarnings({"rawtypes","unchecked"}) 501 void createStream(HeaderFrame frame) throws IOException { 502 List<HeaderFrame> frames = new LinkedList<>(); 503 frames.add(frame); 504 int streamid = frame.streamid(); 505 if (streamid != nextstream) { 506 throw new IOException("unexpected stream id"); 507 } 508 nextstream += 2; 509 510 boolean endStream = false; 511 if (frame.getFlag(HeaderFrame.END_STREAM)) { 512 endStream = true; 513 } 514 515 while (!frame.getFlag(HeaderFrame.END_HEADERS)) { 516 Http2Frame f = readFrame(); 517 if (!(f instanceof HeaderFrame)) { 518 handleCommonFrame(f); // should only be error frames 519 } else { 520 frame = (HeaderFrame) f; 521 if (frame.getFlag(HeaderFrame.END_STREAM)) { 522 endStream = true; 523 } 524 frames.add(frame); 525 } 526 } 527 boolean endStreamReceived = endStream; 528 HttpHeadersImpl headers = decodeHeaders(frames); 529 Queue q = new Queue(sentinel); 530 streams.put(streamid, q); 531 exec.submit(() -> { 532 handleRequest(headers, q, streamid, endStreamReceived); 533 }); 534 } 535 536 // runs in own thread. Handles request from start to finish. Incoming frames 537 // for this stream/request delivered on Q 538 539 @SuppressWarnings({"rawtypes","unchecked"}) 540 void handleRequest(HttpHeadersImpl headers, 541 Queue queue, 542 int streamid, 543 boolean endStreamReceived) 544 { 545 String method = headers.firstValue(":method").orElse(""); 546 //System.out.println("method = " + method); 547 String path = headers.firstValue(":path").orElse(""); 548 //System.out.println("path = " + path); 549 String scheme = headers.firstValue(":scheme").orElse(""); 550 //System.out.println("scheme = " + scheme); 551 String authority = headers.firstValue(":authority").orElse(""); 552 //System.out.println("authority = " + authority); 553 System.err.printf("TestServer: %s %s\n", method, path); 554 HttpHeadersImpl rspheaders = new HttpHeadersImpl(); 555 int winsize = clientSettings.getParameter( 556 SettingsFrame.INITIAL_WINDOW_SIZE); 557 //System.err.println ("Stream window size = " + winsize); 558 559 final InputStream bis; 560 if (endStreamReceived && queue.size() == 0) { 561 System.err.println("Server: got END_STREAM for stream " + streamid); 562 bis = NullInputStream.INSTANCE; 563 } else { 564 System.err.println("Server: creating input stream for stream " + streamid); 565 bis = new BodyInputStream(queue, streamid, this); 566 } 567 try (bis; 568 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) 569 { 570 outStreams.put(streamid, bos); 571 String us = scheme + "://" + authority + path; 572 URI uri = new URI(us); 573 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; 574 Http2TestExchange exchange = exchangeSupplier.get(streamid, method, 575 headers, rspheaders, uri, bis, getSSLSession(), 576 bos, this, pushAllowed); 577 578 // give to user 579 Http2Handler handler = server.getHandlerFor(uri.getPath()); 580 handler.handle(exchange); 581 582 // everything happens in the exchange from here. Hopefully will 583 // return though. 584 } catch (Throwable e) { 585 System.err.println("TestServer: handleRequest exception: " + e); 586 e.printStackTrace(); 587 } 588 } 589 590 private SSLSession getSSLSession() { 591 if (! (socket instanceof SSLSocket)) 592 return null; 593 SSLSocket ssl = (SSLSocket)socket; 594 return ssl.getSession(); 595 } 596 // Runs in own thread 597 598 @SuppressWarnings({"rawtypes","unchecked"}) 599 void readLoop() { 600 try { 601 while (!stopping) { 602 Http2Frame frame = readFrameImpl(); 603 if (frame == null) { 604 closeIncoming(); 605 return; 606 } 607 //System.err.printf("TestServer: received frame %s\n", frame); 608 int stream = frame.streamid(); 609 if (stream == 0) { 610 if (frame.type() == WindowUpdateFrame.TYPE) { 611 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 612 updateConnectionWindow(wup.getUpdate()); 613 } else { 614 // other common frame types 615 handleCommonFrame(frame); 616 } 617 } else { 618 Queue q = streams.get(stream); 619 if (frame.type() == HeadersFrame.TYPE) { 620 if (q != null) { 621 System.err.println("HEADERS frame for existing stream! Error."); 622 // TODO: close connection 623 continue; 624 } else { 625 createStream((HeadersFrame) frame); 626 } 627 } else { 628 if (q == null && !pushStreams.contains(stream)) { 629 System.err.printf("Non Headers frame received with"+ 630 " non existing stream (%d) ", frame.streamid()); 631 System.err.println(frame); 632 continue; 633 } 634 if (frame.type() == WindowUpdateFrame.TYPE) { 635 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 636 synchronized (updaters) { 637 Consumer<Integer> r = updaters.get(stream); 638 r.accept(wup.getUpdate()); 639 } 640 } else if (frame.type() == ResetFrame.TYPE) { 641 // do orderly close on input q 642 // and close the output q immediately 643 // This should mean depending on what the 644 // handler is doing: either an EOF on read 645 // or an IOException if writing the response. 646 if (q != null) { 647 q.orderlyClose(); 648 BodyOutputStream oq = outStreams.get(stream); 649 if (oq != null) 650 oq.closeInternal(); 651 } else if (pushStreams.contains(stream)) { 652 // we could interrupt the pushStream's output 653 // but the continuation, even after a reset 654 // should be handle gracefully by the client 655 // anyway. 656 } else { 657 System.err.println("TestServer: Unexpected frame on: " + stream); 658 System.err.println(frame); 659 throw new IOException("Unexpected frame"); 660 } 661 } else { 662 q.put(frame); 663 } 664 } 665 } 666 } 667 } catch (Throwable e) { 668 if (!stopping) { 669 System.err.println("Http server reader thread shutdown"); 670 e.printStackTrace(); 671 } 672 close(ErrorFrame.PROTOCOL_ERROR); 673 } 674 } 675 676 List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) { 677 List<ByteBuffer> buffers = new LinkedList<>(); 678 679 ByteBuffer buf = getBuffer(); 680 boolean encoded; 681 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) { 682 List<String> values = entry.getValue(); 683 String key = entry.getKey().toLowerCase(); 684 for (String value : values) { 685 do { 686 hpackOut.header(key, value); 687 encoded = hpackOut.encode(buf); 688 if (!encoded) { 689 buf.flip(); 690 buffers.add(buf); 691 buf = getBuffer(); 692 } 693 } while (!encoded); 694 } 695 } 696 buf.flip(); 697 buffers.add(buf); 698 return buffers; 699 } 700 701 static void closeIgnore(Closeable c) { 702 try { 703 c.close(); 704 } catch (IOException e) {} 705 } 706 707 // Runs in own thread 708 void writeLoop() { 709 try { 710 while (!stopping) { 711 Http2Frame frame; 712 try { 713 frame = outputQ.take(); 714 if (stopping) 715 break; 716 } catch(IOException x) { 717 if (stopping && x.getCause() instanceof InterruptedException) { 718 break; 719 } else throw x; 720 } 721 if (frame instanceof ResponseHeaders) { 722 ResponseHeaders rh = (ResponseHeaders)frame; 723 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers)); 724 writeFrame(hf); 725 } else if (frame instanceof OutgoingPushPromise) { 726 handlePush((OutgoingPushPromise)frame); 727 } else 728 writeFrame(frame); 729 } 730 System.err.println("TestServer: Connection writer stopping"); 731 } catch (Throwable e) { 732 e.printStackTrace(); 733 /*close(); 734 if (!stopping) { 735 e.printStackTrace(); 736 System.err.println("TestServer: writeLoop exception: " + e); 737 }*/ 738 } 739 } 740 741 private void handlePush(OutgoingPushPromise op) throws IOException { 742 int promisedStreamid = nextPushStreamId; 743 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, 744 HeaderFrame.END_HEADERS, 745 promisedStreamid, 746 encodeHeaders(op.headers), 747 0); 748 pushStreams.add(promisedStreamid); 749 nextPushStreamId += 2; 750 pp.streamid(op.parentStream); 751 writeFrame(pp); 752 final InputStream ii = op.is; 753 final BodyOutputStream oo = new BodyOutputStream( 754 promisedStreamid, 755 clientSettings.getParameter( 756 SettingsFrame.INITIAL_WINDOW_SIZE), this); 757 outStreams.put(promisedStreamid, oo); 758 oo.goodToGo(); 759 exec.submit(() -> { 760 try { 761 ResponseHeaders oh = getPushResponse(promisedStreamid); 762 outputQ.put(oh); 763 ii.transferTo(oo); 764 } catch (Throwable ex) { 765 System.err.printf("TestServer: pushing response error: %s\n", 766 ex.toString()); 767 } finally { 768 closeIgnore(ii); 769 closeIgnore(oo); 770 } 771 }); 772 773 } 774 775 // returns a minimal response with status 200 776 // that is the response to the push promise just sent 777 private ResponseHeaders getPushResponse(int streamid) { 778 HttpHeadersImpl h = new HttpHeadersImpl(); 779 h.addHeader(":status", "200"); 780 ResponseHeaders oh = new ResponseHeaders(h); 781 oh.streamid(streamid); 782 oh.setFlag(HeaderFrame.END_HEADERS); 783 return oh; 784 } 785 786 private ByteBuffer getBuffer() { 787 return ByteBuffer.allocate(8 * 1024); 788 } 789 790 private Http2Frame readFrame() throws IOException { 791 Http2Frame f = readFrameImpl(); 792 if (f == null) 793 throw new IOException("connection closed"); 794 return f; 795 } 796 797 // does not throw an exception for EOF 798 private Http2Frame readFrameImpl() throws IOException { 799 try { 800 byte[] buf = new byte[9]; 801 int ret; 802 ret=is.readNBytes(buf, 0, 9); 803 if (ret == 0) { 804 return null; 805 } else if (ret != 9) { 806 throw new IOException("readFrame: connection closed"); 807 } 808 int len = 0; 809 for (int i = 0; i < 3; i++) { 810 int n = buf[i] & 0xff; 811 //System.err.println("n = " + n); 812 len = (len << 8) + n; 813 } 814 byte[] rest = new byte[len]; 815 int n = is.readNBytes(rest, 0, len); 816 if (n != len) 817 throw new IOException("Error reading frame"); 818 List<Http2Frame> frames = new ArrayList<>(); 819 FramesDecoder reader = new FramesDecoder(frames::add); 820 reader.decode(ByteBuffer.wrap(buf)); 821 reader.decode(ByteBuffer.wrap(rest)); 822 if (frames.size()!=1) 823 throw new IOException("Expected 1 frame got "+frames.size()) ; 824 825 return frames.get(0); 826 } catch (IOException ee) { 827 if (stopping) 828 return null; 829 throw ee; 830 } 831 } 832 833 void sendSettingsFrame() throws IOException { 834 sendSettingsFrame(false); 835 } 836 837 void sendSettingsFrame(boolean now) throws IOException { 838 if (now) { 839 writeFrame(serverSettings); 840 } else { 841 outputQ.put(serverSettings); 842 } 843 } 844 845 String readUntil(String end) throws IOException { 846 int number = end.length(); 847 int found = 0; 848 StringBuilder sb = new StringBuilder(); 849 while (found < number) { 850 char expected = end.charAt(found); 851 int c = is.read(); 852 if (c == -1) { 853 throw new IOException("Connection closed"); 854 } 855 char c0 = (char) c; 856 sb.append(c0); 857 if (c0 != expected) { 858 found = 0; 859 continue; 860 } 861 found++; 862 } 863 return sb.toString(); 864 } 865 866 private int getContentLength(String headers) { 867 return getIntHeader(headers, "Content-length"); 868 } 869 870 private int getIntHeader(String headers, String name) { 871 String val = getHeader(headers, name); 872 if (val == null) { 873 return -1; 874 } 875 return Integer.parseInt(val); 876 } 877 878 private String getHeader(String headers, String name) { 879 String headers1 = headers.toLowerCase(); // not efficient 880 name = CRLF + name.toLowerCase(); 881 int start = headers1.indexOf(name); 882 if (start == -1) { 883 return null; 884 } 885 start += 2; 886 int end = headers1.indexOf(CRLF, start); 887 String line = headers.substring(start, end); 888 start = line.indexOf(':'); 889 if (start == -1) { 890 return null; 891 } 892 return line.substring(start + 1).trim(); 893 } 894 895 final static String CRLF = "\r\n"; 896 final static String CRLFCRLF = "\r\n\r\n"; 897 898 String readHttp1Request() throws IOException { 899 String headers = readUntil(CRLF + CRLF); 900 int clen = getContentLength(headers); 901 String te = getHeader(headers, "Transfer-encoding"); 902 byte[] buf = new byte[0]; 903 try { 904 if (clen >= 0) { 905 // HTTP/1.1 fixed length content ( may be 0 ), read it 906 buf = new byte[clen]; 907 is.readNBytes(buf, 0, clen); 908 } else if ("chunked".equalsIgnoreCase(te)) { 909 // HTTP/1.1 chunked data, read it 910 buf = readChunkedInputStream(is); 911 } 912 String body = new String(buf, StandardCharsets.US_ASCII); 913 return headers + body; 914 } catch (IOException e) { 915 System.err.println("TestServer: headers read: [ " + headers + " ]"); 916 throw e; 917 } 918 } 919 920 // This is a quick hack to get a chunked input stream reader. 921 private static byte[] readChunkedInputStream(InputStream is) throws IOException { 922 ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null); 923 return cis.readAllBytes(); 924 } 925 926 void sendHttp1Response(int code, String msg, String... headers) throws IOException { 927 StringBuilder sb = new StringBuilder(); 928 sb.append("HTTP/1.1 ") 929 .append(code) 930 .append(' ') 931 .append(msg) 932 .append(CRLF); 933 int numheaders = headers.length; 934 for (int i = 0; i < numheaders; i += 2) { 935 sb.append(headers[i]) 936 .append(": ") 937 .append(headers[i + 1]) 938 .append(CRLF); 939 } 940 sb.append(CRLF); 941 String s = sb.toString(); 942 os.write(s.getBytes("US-ASCII")); 943 os.flush(); 944 } 945 946 private void unexpectedFrame(Http2Frame frame) { 947 System.err.println("OOPS. Unexpected"); 948 assert false; 949 } 950 951 final static ByteBuffer[] bbarray = new ByteBuffer[0]; 952 953 // wrapper around a BlockingQueue that throws an exception when it's closed 954 // Each stream has one of these 955 956 String getRequestBody(String request) { 957 int bodystart = request.indexOf(CRLF+CRLF); 958 String body; 959 if (bodystart == -1) 960 body = ""; 961 else 962 body = request.substring(bodystart+4); 963 return body; 964 } 965 966 @SuppressWarnings({"rawtypes","unchecked"}) 967 void addRequestBodyToQueue(String body, Queue q) throws IOException { 968 ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); 969 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf); 970 // only used for primordial stream 971 q.put(df); 972 } 973 974 // window updates done in main reader thread because they may 975 // be used to unblock BodyOutputStreams waiting for WUPs 976 977 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>(); 978 979 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) { 980 synchronized(updaters) { 981 updaters.put(streamid, r); 982 } 983 } 984 985 int sendWindow = 64 * 1024 - 1; // connection level send window 986 987 /** 988 * BodyOutputStreams call this to get the connection window first. 989 * 990 * @param amount 991 */ 992 synchronized void obtainConnectionWindow(int amount) throws InterruptedException { 993 while (amount > 0) { 994 int n = Math.min(amount, sendWindow); 995 amount -= n; 996 sendWindow -= n; 997 if (amount > 0) 998 wait(); 999 } 1000 } 1001 1002 synchronized void updateConnectionWindow(int amount) { 1003 sendWindow += amount; 1004 notifyAll(); 1005 } 1006 1007 // simplified output headers class. really just a type safe container 1008 // for the hashmap. 1009 1010 static class ResponseHeaders extends Http2Frame { 1011 HttpHeadersImpl headers; 1012 1013 ResponseHeaders(HttpHeadersImpl headers) { 1014 super(0, 0); 1015 this.headers = headers; 1016 } 1017 1018 } 1019 1020 static class NullInputStream extends InputStream { 1021 static final NullInputStream INSTANCE = new NullInputStream(); 1022 private NullInputStream() {} 1023 public int read() { return -1; } 1024 public int available() { return 0; } 1025 } 1026 }