1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.BufferedInputStream; 25 import java.io.BufferedOutputStream; 26 import java.io.Closeable; 27 import java.io.IOException; 28 import java.io.UncheckedIOException; 29 import java.io.InputStream; 30 import java.io.OutputStream; 31 import java.net.Socket; 32 import java.net.URI; 33 import java.net.InetAddress; 34 import javax.net.ssl.*; 35 import java.net.URISyntaxException; 36 import java.nio.ByteBuffer; 37 import java.nio.charset.StandardCharsets; 38 import java.util.*; 39 import java.util.concurrent.ExecutorService; 40 import java.util.function.Consumer; 41 42 import jdk.incubator.http.internal.common.ByteBufferReference; 43 import jdk.incubator.http.internal.frame.FramesDecoder; 44 45 import jdk.incubator.http.internal.common.BufferHandler; 46 import jdk.incubator.http.internal.common.HttpHeadersImpl; 47 import jdk.incubator.http.internal.common.Queue; 48 import jdk.incubator.http.internal.frame.*; 49 import jdk.incubator.http.internal.hpack.Decoder; 50 import jdk.incubator.http.internal.hpack.DecodingCallback; 51 import jdk.incubator.http.internal.hpack.Encoder; 52 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE; 53 54 /** 55 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1 56 * or HTTPS opened using "h2" ALPN. 57 */ 58 public class Http2TestServerConnection { 59 final Http2TestServer server; 60 @SuppressWarnings({"rawtypes","unchecked"}) 61 final Map<Integer, Queue> streams; // input q per stream 62 final HashSet<Integer> pushStreams; 63 final Queue<Http2Frame> outputQ; 64 volatile int nextstream; 65 final Socket socket; 66 final InputStream is; 67 final OutputStream os; 68 volatile Encoder hpackOut; 69 volatile Decoder hpackIn; 70 volatile SettingsFrame clientSettings; 71 final SettingsFrame serverSettings; 72 final ExecutorService exec; 73 final boolean secure; 74 volatile boolean stopping; 75 volatile int nextPushStreamId = 2; 76 77 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); 78 final static byte[] EMPTY_BARRAY = new byte[0]; 79 80 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); 81 82 Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException { 83 if (socket instanceof SSLSocket) { 84 handshake(server.serverName(), (SSLSocket)socket); 85 } 86 System.err.println("TestServer: New connection from " + socket); 87 this.server = server; 88 this.streams = Collections.synchronizedMap(new HashMap<>()); 89 this.outputQ = new Queue<>(); 90 this.socket = socket; 91 this.serverSettings = SettingsFrame.getDefaultSettings(); 92 this.exec = server.exec; 93 this.secure = server.secure; 94 this.pushStreams = new HashSet<>(); 95 is = new BufferedInputStream(socket.getInputStream()); 96 os = new BufferedOutputStream(socket.getOutputStream()); 97 } 98 99 private static boolean compareIPAddrs(InetAddress addr1, String host) { 100 try { 101 InetAddress addr2 = InetAddress.getByName(host); 102 return addr1.equals(addr2); 103 } catch (IOException e) { 104 throw new UncheckedIOException(e); 105 } 106 } 107 108 private static void handshake(String name, SSLSocket sock) throws IOException { 109 if (name == null) { 110 // no name set. No need to check 111 return; 112 } else if (name.equals("127.0.0.1")) { 113 name = "localhost"; 114 } 115 final String fname = name; 116 final InetAddress addr1 = InetAddress.getByName(name); 117 SSLParameters params = sock.getSSLParameters(); 118 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) { 119 public boolean matches (SNIServerName n) { 120 String host = ((SNIHostName)n).getAsciiName(); 121 if (host.equals("127.0.0.1")) 122 host = "localhost"; 123 boolean cmp = host.equalsIgnoreCase(fname); 124 if (cmp) 125 return true; 126 return compareIPAddrs(addr1, host); 127 } 128 }; 129 List<SNIMatcher> list = List.of(matcher); 130 params.setSNIMatchers(list); 131 sock.setSSLParameters(params); 132 sock.getSession(); // blocks until handshake done 133 } 134 135 void close() { 136 stopping = true; 137 streams.forEach((i, q) -> { 138 q.close(); 139 }); 140 try { 141 socket.close(); 142 // TODO: put a reset on each stream 143 } catch (IOException e) { 144 } 145 } 146 147 private void readPreface() throws IOException { 148 int len = clientPreface.length; 149 byte[] bytes = new byte[len]; 150 is.readNBytes(bytes, 0, len); 151 if (Arrays.compare(clientPreface, bytes) != 0) { 152 throw new IOException("Invalid preface: " + new String(bytes, 0, len)); 153 } 154 } 155 156 String doUpgrade() throws IOException { 157 String upgrade = readHttp1Request(); 158 String h2c = getHeader(upgrade, "Upgrade"); 159 if (h2c == null || !h2c.equals("h2c")) { 160 System.err.println("Server:HEADERS: " + upgrade); 161 throw new IOException("Bad upgrade 1 " + h2c); 162 } 163 164 sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade", 165 "Upgrade", "h2c"); 166 167 sendSettingsFrame(); 168 readPreface(); 169 170 String clientSettingsString = getHeader(upgrade, "HTTP2-Settings"); 171 clientSettings = getSettingsFromString(clientSettingsString); 172 173 return upgrade; 174 } 175 176 /** 177 * Decodes the given, Client, settings payload provided in base64 HTTP1 178 * header value. 179 */ 180 private SettingsFrame getSettingsFromString(String s) throws IOException { 181 Base64.Decoder decoder = Base64.getUrlDecoder(); 182 byte[] payload = decoder.decode(s); 183 ByteBuffer bb1 = ByteBuffer.wrap(payload); 184 // simulate header of Settings Frame 185 ByteBuffer bb0 = ByteBuffer.wrap( 186 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0}); 187 List<Http2Frame> frames = new ArrayList<>(); 188 FramesDecoder reader = new FramesDecoder(frames::add); 189 reader.decode(ByteBufferReference.of(bb0)); 190 reader.decode(ByteBufferReference.of(bb1)); 191 if (frames.size()!=1) 192 throw new IOException("Expected 1 frame got "+frames.size()) ; 193 Http2Frame frame = frames.get(0); 194 if (!(frame instanceof SettingsFrame)) 195 throw new IOException("Expected SettingsFrame"); 196 return (SettingsFrame)frame; 197 } 198 199 void run() throws Exception { 200 String upgrade = null; 201 if (!secure) { 202 upgrade = doUpgrade(); 203 } else { 204 readPreface(); 205 sendSettingsFrame(true); 206 clientSettings = (SettingsFrame) readFrame(); 207 if (clientSettings.getFlag(SettingsFrame.ACK)) { 208 // we received the ack to our frame first 209 clientSettings = (SettingsFrame) readFrame(); 210 } 211 nextstream = 1; 212 } 213 214 System.out.println("ServerSettings: " + serverSettings); 215 System.out.println("ClientSettings: " + clientSettings); 216 217 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); 218 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); 219 220 if (!secure) { 221 createPrimordialStream(upgrade); 222 nextstream = 3; 223 } 224 225 exec.submit(this::readLoop); 226 exec.submit(this::writeLoop); 227 } 228 229 static class BufferPool implements BufferHandler { 230 231 public void setMinBufferSize(int size) { 232 } 233 234 @Override 235 public ByteBuffer getBuffer() { 236 int size = 32 * 1024; 237 return ByteBuffer.allocate(size); 238 } 239 240 @Override 241 public void returnBuffer(ByteBuffer buffer) { 242 } 243 } 244 245 private void writeFrame(Http2Frame frame) throws IOException { 246 ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame); 247 //System.err.println("TestServer: Writing frame " + frame.toString()); 248 int c = 0; 249 for (ByteBufferReference ref : refs) { 250 ByteBuffer buf = ref.get(); 251 byte[] ba = buf.array(); 252 int start = buf.arrayOffset() + buf.position(); 253 c += buf.remaining(); 254 os.write(ba, start, buf.remaining()); 255 } 256 os.flush(); 257 //System.err.printf("TestServer: wrote %d bytes\n", c); 258 } 259 260 void handleStreamReset(ResetFrame resetFrame) throws IOException { 261 // TODO: cleanup 262 throw new IOException("Stream reset"); 263 } 264 265 private void handleCommonFrame(Http2Frame f) throws IOException { 266 if (f instanceof SettingsFrame) { 267 SettingsFrame sf = (SettingsFrame) f; 268 if (sf.getFlag(SettingsFrame.ACK)) // ignore 269 { 270 return; 271 } 272 // otherwise acknowledge it 273 clientSettings = sf; 274 SettingsFrame frame = new SettingsFrame(); 275 frame.setFlag(SettingsFrame.ACK); 276 frame.streamid(0); 277 outputQ.put(frame); 278 return; 279 } 280 //System.err.println("TestServer: Received ---> " + f.toString()); 281 throw new UnsupportedOperationException("Not supported yet."); 282 } 283 284 void sendWindowUpdates(int len, int streamid) throws IOException { 285 if (len == 0) 286 return; 287 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len); 288 outputQ.put(wup); 289 wup = new WindowUpdateFrame(0 , len); 290 outputQ.put(wup); 291 } 292 293 HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) { 294 HttpHeadersImpl headers = new HttpHeadersImpl(); 295 296 DecodingCallback cb = (name, value) -> { 297 headers.addHeader(name.toString(), value.toString()); 298 }; 299 300 for (HeaderFrame frame : frames) { 301 ByteBufferReference[] buffers = frame.getHeaderBlock(); 302 for (ByteBufferReference buffer : buffers) { 303 hpackIn.decode(buffer.get(), false, cb); 304 } 305 } 306 hpackIn.decode(EMPTY_BUFFER, true, cb); 307 return headers; 308 } 309 310 String getRequestLine(String request) { 311 int eol = request.indexOf(CRLF); 312 return request.substring(0, eol); 313 } 314 315 String getHeaders(String request) { 316 int start = request.indexOf(CRLF); 317 int end = request.indexOf(CRLFCRLF); 318 if (start == -1 || end == -1) { 319 throw new RuntimeException("Malformed request"); 320 } 321 return request.substring(start,end); 322 } 323 324 void addHeaders(String headers, HttpHeadersImpl hdrs) { 325 String[] hh = headers.split(CRLF); 326 for (String header : hh) { 327 int colon = header.indexOf(':'); 328 if (colon == -1) 329 continue; 330 String name = header.substring(0, colon); 331 String value = header.substring(colon+1); 332 while (value.startsWith(" ")) 333 value = value.substring(1); 334 hdrs.addHeader(name, value); 335 } 336 } 337 338 // First stream (1) comes from a plaintext HTTP/1.1 request 339 @SuppressWarnings({"rawtypes","unchecked"}) 340 void createPrimordialStream(String request) throws IOException { 341 HttpHeadersImpl headers = new HttpHeadersImpl(); 342 String requestLine = getRequestLine(request); 343 String[] tokens = requestLine.split(" "); 344 if (!tokens[2].equals("HTTP/1.1")) { 345 throw new IOException("bad request line"); 346 } 347 URI uri = null; 348 try { 349 uri = new URI(tokens[1]); 350 } catch (URISyntaxException e) { 351 throw new IOException(e); 352 } 353 String host = getHeader(request, "Host"); 354 if (host == null) { 355 throw new IOException("missing Host"); 356 } 357 358 headers.setHeader(":method", tokens[0]); 359 headers.setHeader(":scheme", "http"); // always in this case 360 headers.setHeader(":authority", host); 361 headers.setHeader(":path", uri.getPath()); 362 Queue q = new Queue(); 363 String body = getRequestBody(request); 364 addHeaders(getHeaders(request), headers); 365 headers.setHeader("Content-length", Integer.toString(body.length())); 366 367 addRequestBodyToQueue(body, q); 368 streams.put(1, q); 369 exec.submit(() -> { 370 handleRequest(headers, q, 1, true /*complete request has been read*/); 371 }); 372 } 373 374 // all other streams created here 375 @SuppressWarnings({"rawtypes","unchecked"}) 376 void createStream(HeaderFrame frame) throws IOException { 377 List<HeaderFrame> frames = new LinkedList<>(); 378 frames.add(frame); 379 int streamid = frame.streamid(); 380 if (streamid != nextstream) { 381 throw new IOException("unexpected stream id"); 382 } 383 nextstream += 2; 384 385 boolean endStream = false; 386 if (frame.getFlag(HeaderFrame.END_STREAM)) { 387 endStream = true; 388 } 389 390 while (!frame.getFlag(HeaderFrame.END_HEADERS)) { 391 Http2Frame f = readFrame(); 392 if (!(f instanceof HeaderFrame)) { 393 handleCommonFrame(f); // should only be error frames 394 } else { 395 frame = (HeaderFrame) f; 396 if (frame.getFlag(HeaderFrame.END_STREAM)) { 397 endStream = true; 398 } 399 frames.add(frame); 400 } 401 } 402 boolean endStreamReceived = endStream; 403 HttpHeadersImpl headers = decodeHeaders(frames); 404 Queue q = new Queue(); 405 streams.put(streamid, q); 406 exec.submit(() -> { 407 handleRequest(headers, q, streamid, endStreamReceived); 408 }); 409 } 410 411 // runs in own thread. Handles request from start to finish. Incoming frames 412 // for this stream/request delivered on Q 413 414 @SuppressWarnings({"rawtypes","unchecked"}) 415 void handleRequest(HttpHeadersImpl headers, 416 Queue queue, 417 int streamid, 418 boolean endStreamReceived) 419 { 420 String method = headers.firstValue(":method").orElse(""); 421 //System.out.println("method = " + method); 422 String path = headers.firstValue(":path").orElse(""); 423 //System.out.println("path = " + path); 424 String scheme = headers.firstValue(":scheme").orElse(""); 425 //System.out.println("scheme = " + scheme); 426 String authority = headers.firstValue(":authority").orElse(""); 427 //System.out.println("authority = " + authority); 428 System.err.printf("TestServer: %s %s\n", method, path); 429 HttpHeadersImpl rspheaders = new HttpHeadersImpl(); 430 int winsize = clientSettings.getParameter( 431 SettingsFrame.INITIAL_WINDOW_SIZE); 432 //System.err.println ("Stream window size = " + winsize); 433 434 final InputStream bis; 435 if (endStreamReceived && queue.size() == 0) { 436 System.err.println("Server: got END_STREAM for stream " + streamid); 437 bis = NullInputStream.INSTANCE; 438 } else { 439 System.err.println("Server: creating input stream for stream " + streamid); 440 bis = new BodyInputStream(queue, streamid, this); 441 } 442 try (bis; 443 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) 444 { 445 String us = scheme + "://" + authority + path; 446 URI uri = new URI(us); 447 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; 448 Http2TestExchange exchange = new Http2TestExchange(streamid, method, 449 headers, rspheaders, uri, bis, getSSLSession(), 450 bos, this, pushAllowed); 451 452 // give to user 453 Http2Handler handler = server.getHandlerFor(uri.getPath()); 454 handler.handle(exchange); 455 456 // everything happens in the exchange from here. Hopefully will 457 // return though. 458 } catch (Throwable e) { 459 System.err.println("TestServer: handleRequest exception: " + e); 460 e.printStackTrace(); 461 } 462 } 463 464 private SSLSession getSSLSession() { 465 if (! (socket instanceof SSLSocket)) 466 return null; 467 SSLSocket ssl = (SSLSocket)socket; 468 return ssl.getSession(); 469 } 470 // Runs in own thread 471 472 @SuppressWarnings({"rawtypes","unchecked"}) 473 void readLoop() { 474 try { 475 while (!stopping) { 476 Http2Frame frame = readFrame(); 477 //System.err.printf("TestServer: received frame %s\n", frame); 478 int stream = frame.streamid(); 479 if (stream == 0) { 480 if (frame.type() == WindowUpdateFrame.TYPE) { 481 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 482 updateConnectionWindow(wup.getUpdate()); 483 } else { 484 // other common frame types 485 handleCommonFrame(frame); 486 } 487 } else { 488 Queue q = streams.get(stream); 489 if (frame.type() == HeadersFrame.TYPE) { 490 if (q != null) { 491 System.err.println("HEADERS frame for existing stream! Error."); 492 // TODO: close connection 493 continue; 494 } else { 495 createStream((HeadersFrame) frame); 496 } 497 } else { 498 if (q == null && !pushStreams.contains(stream)) { 499 System.err.printf("Non Headers frame received with"+ 500 " non existing stream (%d) ", frame.streamid()); 501 System.err.println(frame); 502 continue; 503 } 504 if (frame.type() == WindowUpdateFrame.TYPE) { 505 WindowUpdateFrame wup = (WindowUpdateFrame) frame; 506 synchronized (updaters) { 507 Consumer<Integer> r = updaters.get(stream); 508 r.accept(wup.getUpdate()); 509 } 510 } else { 511 q.put(frame); 512 } 513 } 514 } 515 } 516 } catch (Throwable e) { 517 if (!stopping) { 518 System.err.println("Http server reader thread shutdown"); 519 e.printStackTrace(); 520 } 521 close(); 522 } 523 } 524 525 ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) { 526 List<ByteBuffer> buffers = new LinkedList<>(); 527 528 ByteBuffer buf = getBuffer(); 529 boolean encoded; 530 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) { 531 List<String> values = entry.getValue(); 532 String key = entry.getKey().toLowerCase(); 533 for (String value : values) { 534 do { 535 hpackOut.header(key, value); 536 encoded = hpackOut.encode(buf); 537 if (!encoded) { 538 buf.flip(); 539 buffers.add(buf); 540 buf = getBuffer(); 541 } 542 } while (!encoded); 543 } 544 } 545 buf.flip(); 546 buffers.add(buf); 547 return ByteBufferReference.toReferences(buffers.toArray(bbarray)); 548 } 549 550 static void closeIgnore(Closeable c) { 551 try { 552 c.close(); 553 } catch (IOException e) {} 554 } 555 556 // Runs in own thread 557 void writeLoop() { 558 try { 559 while (!stopping) { 560 Http2Frame frame; 561 try { 562 frame = outputQ.take(); 563 } catch(IOException x) { 564 if (stopping && x.getCause() instanceof InterruptedException) { 565 break; 566 } else throw x; 567 } 568 if (frame instanceof ResponseHeaders) { 569 ResponseHeaders rh = (ResponseHeaders)frame; 570 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers)); 571 writeFrame(hf); 572 } else if (frame instanceof OutgoingPushPromise) { 573 handlePush((OutgoingPushPromise)frame); 574 } else 575 writeFrame(frame); 576 } 577 System.err.println("TestServer: Connection writer stopping"); 578 } catch (Throwable e) { 579 e.printStackTrace(); 580 /*close(); 581 if (!stopping) { 582 e.printStackTrace(); 583 System.err.println("TestServer: writeLoop exception: " + e); 584 }*/ 585 } 586 } 587 588 private void handlePush(OutgoingPushPromise op) throws IOException { 589 int promisedStreamid = nextPushStreamId; 590 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0); 591 pushStreams.add(promisedStreamid); 592 nextPushStreamId += 2; 593 pp.streamid(op.parentStream); 594 writeFrame(pp); 595 final InputStream ii = op.is; 596 final BodyOutputStream oo = new BodyOutputStream( 597 promisedStreamid, 598 clientSettings.getParameter( 599 SettingsFrame.INITIAL_WINDOW_SIZE), this); 600 oo.goodToGo(); 601 exec.submit(() -> { 602 try { 603 ResponseHeaders oh = getPushResponse(promisedStreamid); 604 outputQ.put(oh); 605 ii.transferTo(oo); 606 } catch (Throwable ex) { 607 System.err.printf("TestServer: pushing response error: %s\n", 608 ex.toString()); 609 } finally { 610 closeIgnore(ii); 611 closeIgnore(oo); 612 } 613 }); 614 615 } 616 617 // returns a minimal response with status 200 618 // that is the response to the push promise just sent 619 private ResponseHeaders getPushResponse(int streamid) { 620 HttpHeadersImpl h = new HttpHeadersImpl(); 621 h.addHeader(":status", "200"); 622 ResponseHeaders oh = new ResponseHeaders(h); 623 oh.streamid(streamid); 624 oh.setFlag(HeaderFrame.END_HEADERS); 625 return oh; 626 } 627 628 private ByteBuffer getBuffer() { 629 return ByteBuffer.allocate(8 * 1024); 630 } 631 632 private Http2Frame readFrame() throws IOException { 633 byte[] buf = new byte[9]; 634 if (is.readNBytes(buf, 0, 9) != 9) 635 throw new IOException("readFrame: connection closed"); 636 int len = 0; 637 for (int i = 0; i < 3; i++) { 638 int n = buf[i] & 0xff; 639 //System.err.println("n = " + n); 640 len = (len << 8) + n; 641 } 642 byte[] rest = new byte[len]; 643 int n = is.readNBytes(rest, 0, len); 644 if (n != len) 645 throw new IOException("Error reading frame"); 646 List<Http2Frame> frames = new ArrayList<>(); 647 FramesDecoder reader = new FramesDecoder(frames::add); 648 reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf))); 649 reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest))); 650 if (frames.size()!=1) 651 throw new IOException("Expected 1 frame got "+frames.size()) ; 652 653 return frames.get(0); 654 } 655 656 void sendSettingsFrame() throws IOException { 657 sendSettingsFrame(false); 658 } 659 660 void sendSettingsFrame(boolean now) throws IOException { 661 if (now) { 662 writeFrame(serverSettings); 663 } else { 664 outputQ.put(serverSettings); 665 } 666 } 667 668 String readUntil(String end) throws IOException { 669 int number = end.length(); 670 int found = 0; 671 StringBuilder sb = new StringBuilder(); 672 while (found < number) { 673 char expected = end.charAt(found); 674 int c = is.read(); 675 if (c == -1) { 676 throw new IOException("Connection closed"); 677 } 678 char c0 = (char) c; 679 sb.append(c0); 680 if (c0 != expected) { 681 found = 0; 682 continue; 683 } 684 found++; 685 } 686 return sb.toString(); 687 } 688 689 private int getContentLength(String headers) { 690 return getIntHeader(headers, "Content-length"); 691 } 692 693 private int getIntHeader(String headers, String name) { 694 String val = getHeader(headers, name); 695 if (val == null) { 696 return -1; 697 } 698 return Integer.parseInt(val); 699 } 700 701 private String getHeader(String headers, String name) { 702 String headers1 = headers.toLowerCase(); // not efficient 703 name = CRLF + name.toLowerCase(); 704 int start = headers1.indexOf(name); 705 if (start == -1) { 706 return null; 707 } 708 start += 2; 709 int end = headers1.indexOf(CRLF, start); 710 String line = headers.substring(start, end); 711 start = line.indexOf(':'); 712 if (start == -1) { 713 return null; 714 } 715 return line.substring(start + 1).trim(); 716 } 717 718 final static String CRLF = "\r\n"; 719 final static String CRLFCRLF = "\r\n\r\n"; 720 721 String readHttp1Request() throws IOException { 722 String headers = readUntil(CRLF + CRLF); 723 int clen = getContentLength(headers); 724 // read the content. 725 byte[] buf = new byte[clen]; 726 is.readNBytes(buf, 0, clen); 727 String body = new String(buf, StandardCharsets.US_ASCII); 728 return headers + body; 729 } 730 731 void sendHttp1Response(int code, String msg, String... headers) throws IOException { 732 StringBuilder sb = new StringBuilder(); 733 sb.append("HTTP/1.1 ") 734 .append(code) 735 .append(' ') 736 .append(msg) 737 .append(CRLF); 738 int numheaders = headers.length; 739 for (int i = 0; i < numheaders; i += 2) { 740 sb.append(headers[i]) 741 .append(": ") 742 .append(headers[i + 1]) 743 .append(CRLF); 744 } 745 sb.append(CRLF); 746 String s = sb.toString(); 747 os.write(s.getBytes("US-ASCII")); 748 os.flush(); 749 } 750 751 private void unexpectedFrame(Http2Frame frame) { 752 System.err.println("OOPS. Unexpected"); 753 assert false; 754 } 755 756 final static ByteBuffer[] bbarray = new ByteBuffer[0]; 757 758 // wrapper around a BlockingQueue that throws an exception when it's closed 759 // Each stream has one of these 760 761 String getRequestBody(String request) { 762 int bodystart = request.indexOf(CRLF+CRLF); 763 String body; 764 if (bodystart == -1) 765 body = ""; 766 else 767 body = request.substring(bodystart+4); 768 return body; 769 } 770 771 @SuppressWarnings({"rawtypes","unchecked"}) 772 void addRequestBodyToQueue(String body, Queue q) throws IOException { 773 ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII)); 774 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf)); 775 // only used for primordial stream 776 q.put(df); 777 } 778 779 // window updates done in main reader thread because they may 780 // be used to unblock BodyOutputStreams waiting for WUPs 781 782 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>(); 783 784 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) { 785 synchronized(updaters) { 786 updaters.put(streamid, r); 787 } 788 } 789 790 int sendWindow = 64 * 1024 - 1; // connection level send window 791 792 /** 793 * BodyOutputStreams call this to get the connection window first. 794 * 795 * @param amount 796 */ 797 synchronized void obtainConnectionWindow(int amount) throws InterruptedException { 798 while (amount > 0) { 799 int n = Math.min(amount, sendWindow); 800 amount -= n; 801 sendWindow -= n; 802 if (amount > 0) 803 wait(); 804 } 805 } 806 807 synchronized void updateConnectionWindow(int amount) { 808 sendWindow += amount; 809 notifyAll(); 810 } 811 812 // simplified output headers class. really just a type safe container 813 // for the hashmap. 814 815 static class ResponseHeaders extends Http2Frame { 816 HttpHeadersImpl headers; 817 818 ResponseHeaders(HttpHeadersImpl headers) { 819 super(0, 0); 820 this.headers = headers; 821 } 822 823 } 824 825 static class NullInputStream extends InputStream { 826 static final NullInputStream INSTANCE = new NullInputStream(); 827 private NullInputStream() {} 828 public int read() { return -1; } 829 public int available() { return 0; } 830 } 831 }