1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.io.InputStream;
30 import java.io.OutputStream;
31 import java.net.Socket;
32 import java.net.URI;
33 import java.net.InetAddress;
34 import javax.net.ssl.*;
35 import java.net.URISyntaxException;
36 import java.nio.ByteBuffer;
37 import java.nio.charset.StandardCharsets;
38 import java.util.*;
39 import java.util.concurrent.ExecutorService;
40 import java.util.function.Consumer;
41
42 import jdk.incubator.http.internal.common.ByteBufferReference;
43 import jdk.incubator.http.internal.frame.FramesDecoder;
44
45 import jdk.incubator.http.internal.common.BufferHandler;
46 import jdk.incubator.http.internal.common.HttpHeadersImpl;
47 import jdk.incubator.http.internal.common.Queue;
48 import jdk.incubator.http.internal.frame.*;
49 import jdk.incubator.http.internal.hpack.Decoder;
50 import jdk.incubator.http.internal.hpack.DecodingCallback;
51 import jdk.incubator.http.internal.hpack.Encoder;
52 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
53
54 /**
55 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
56 * or HTTPS opened using "h2" ALPN.
57 */
58 public class Http2TestServerConnection {
59 final Http2TestServer server;
60 @SuppressWarnings({"rawtypes","unchecked"})
61 final Map<Integer, Queue> streams; // input q per stream
62 final HashSet<Integer> pushStreams;
63 final Queue<Http2Frame> outputQ;
64 volatile int nextstream;
65 final Socket socket;
66 final InputStream is;
67 final OutputStream os;
68 volatile Encoder hpackOut;
69 volatile Decoder hpackIn;
70 volatile SettingsFrame clientSettings;
71 final SettingsFrame serverSettings;
72 final ExecutorService exec;
73 final boolean secure;
74 volatile boolean stopping;
75 volatile int nextPushStreamId = 2;
76
77 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
78 final static byte[] EMPTY_BARRAY = new byte[0];
79
80 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
81
/**
 * Creates the server side of a connection on the given socket.
 * When the socket is an SSLSocket, handshake(...) is invoked with the
 * server's name before anything else is set up.
 *
 * @param server the owning test server; its executor and secure flag are reused
 * @param socket the accepted socket
 * @throws IOException if the handshake fails or the socket streams cannot be obtained
 */
Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {
    if (socket instanceof SSLSocket) {
        SSLSocket sslSocket = (SSLSocket)socket;
        handshake(server.serverName(), sslSocket);
    }
    System.err.println("TestServer: New connection from " + socket);

    // owner and the state shared with it
    this.server = server;
    this.exec = server.exec;
    this.secure = server.secure;

    // per-connection bookkeeping
    this.streams = Collections.synchronizedMap(new HashMap<>());
    this.pushStreams = new HashSet<>();
    this.outputQ = new Queue<>();
    this.serverSettings = SettingsFrame.getDefaultSettings();

    // transport
    this.socket = socket;
    this.is = new BufferedInputStream(socket.getInputStream());
    this.os = new BufferedOutputStream(socket.getOutputStream());
}
98
99 private static boolean compareIPAddrs(InetAddress addr1, String host) {
100 try {
101 InetAddress addr2 = InetAddress.getByName(host);
102 return addr1.equals(addr2);
103 } catch (IOException e) {
104 throw new UncheckedIOException(e);
105 }
106 }
107
108 private static void handshake(String name, SSLSocket sock) throws IOException {
109 if (name == null) {
110 // no name set. No need to check
111 return;
112 } else if (name.equals("127.0.0.1")) {
113 name = "localhost";
114 }
115 final String fname = name;
116 final InetAddress addr1 = InetAddress.getByName(name);
117 SSLParameters params = sock.getSSLParameters();
118 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
169
170 String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
171 clientSettings = getSettingsFromString(clientSettingsString);
172
173 return upgrade;
174 }
175
176 /**
177 * Decodes the given, Client, settings payload provided in base64 HTTP1
178 * header value.
179 */
180 private SettingsFrame getSettingsFromString(String s) throws IOException {
181 Base64.Decoder decoder = Base64.getUrlDecoder();
182 byte[] payload = decoder.decode(s);
183 ByteBuffer bb1 = ByteBuffer.wrap(payload);
184 // simulate header of Settings Frame
185 ByteBuffer bb0 = ByteBuffer.wrap(
186 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
187 List<Http2Frame> frames = new ArrayList<>();
188 FramesDecoder reader = new FramesDecoder(frames::add);
189 reader.decode(ByteBufferReference.of(bb0));
190 reader.decode(ByteBufferReference.of(bb1));
191 if (frames.size()!=1)
192 throw new IOException("Expected 1 frame got "+frames.size()) ;
193 Http2Frame frame = frames.get(0);
194 if (!(frame instanceof SettingsFrame))
195 throw new IOException("Expected SettingsFrame");
196 return (SettingsFrame)frame;
197 }
198
199 void run() throws Exception {
200 String upgrade = null;
201 if (!secure) {
202 upgrade = doUpgrade();
203 } else {
204 readPreface();
205 sendSettingsFrame(true);
206 clientSettings = (SettingsFrame) readFrame();
207 if (clientSettings.getFlag(SettingsFrame.ACK)) {
208 // we received the ack to our frame first
209 clientSettings = (SettingsFrame) readFrame();
210 }
211 nextstream = 1;
212 }
213
214 System.out.println("ServerSettings: " + serverSettings);
215 System.out.println("ClientSettings: " + clientSettings);
216
217 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
218 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
219
220 if (!secure) {
221 createPrimordialStream(upgrade);
222 nextstream = 3;
223 }
224
225 exec.submit(this::readLoop);
226 exec.submit(this::writeLoop);
227 }
228
229 static class BufferPool implements BufferHandler {
230
231 public void setMinBufferSize(int size) {
232 }
233
234 @Override
235 public ByteBuffer getBuffer() {
236 int size = 32 * 1024;
237 return ByteBuffer.allocate(size);
238 }
239
240 @Override
241 public void returnBuffer(ByteBuffer buffer) {
242 }
243 }
244
245 private void writeFrame(Http2Frame frame) throws IOException {
246 ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame);
247 //System.err.println("TestServer: Writing frame " + frame.toString());
248 int c = 0;
249 for (ByteBufferReference ref : refs) {
250 ByteBuffer buf = ref.get();
251 byte[] ba = buf.array();
252 int start = buf.arrayOffset() + buf.position();
253 c += buf.remaining();
254 os.write(ba, start, buf.remaining());
255 }
256 os.flush();
257 //System.err.printf("TestServer: wrote %d bytes\n", c);
258 }
259
260 void handleStreamReset(ResetFrame resetFrame) throws IOException {
261 // TODO: cleanup
262 throw new IOException("Stream reset");
263 }
264
265 private void handleCommonFrame(Http2Frame f) throws IOException {
266 if (f instanceof SettingsFrame) {
267 SettingsFrame sf = (SettingsFrame) f;
268 if (sf.getFlag(SettingsFrame.ACK)) // ignore
269 {
270 return;
271 }
272 // otherwise acknowledge it
273 clientSettings = sf;
274 SettingsFrame frame = new SettingsFrame();
275 frame.setFlag(SettingsFrame.ACK);
276 frame.streamid(0);
277 outputQ.put(frame);
278 return;
279 }
280 //System.err.println("TestServer: Received ---> " + f.toString());
281 throw new UnsupportedOperationException("Not supported yet.");
282 }
283
284 void sendWindowUpdates(int len, int streamid) throws IOException {
285 if (len == 0)
286 return;
287 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
288 outputQ.put(wup);
289 wup = new WindowUpdateFrame(0 , len);
290 outputQ.put(wup);
291 }
292
293 HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
294 HttpHeadersImpl headers = new HttpHeadersImpl();
295
296 DecodingCallback cb = (name, value) -> {
297 headers.addHeader(name.toString(), value.toString());
298 };
299
300 for (HeaderFrame frame : frames) {
301 ByteBufferReference[] buffers = frame.getHeaderBlock();
302 for (ByteBufferReference buffer : buffers) {
303 hpackIn.decode(buffer.get(), false, cb);
304 }
305 }
306 hpackIn.decode(EMPTY_BUFFER, true, cb);
307 return headers;
308 }
309
310 String getRequestLine(String request) {
311 int eol = request.indexOf(CRLF);
312 return request.substring(0, eol);
313 }
314
315 String getHeaders(String request) {
316 int start = request.indexOf(CRLF);
317 int end = request.indexOf(CRLFCRLF);
318 if (start == -1 || end == -1) {
319 throw new RuntimeException("Malformed request");
320 }
321 return request.substring(start,end);
322 }
323
342 String requestLine = getRequestLine(request);
343 String[] tokens = requestLine.split(" ");
344 if (!tokens[2].equals("HTTP/1.1")) {
345 throw new IOException("bad request line");
346 }
347 URI uri = null;
348 try {
349 uri = new URI(tokens[1]);
350 } catch (URISyntaxException e) {
351 throw new IOException(e);
352 }
353 String host = getHeader(request, "Host");
354 if (host == null) {
355 throw new IOException("missing Host");
356 }
357
358 headers.setHeader(":method", tokens[0]);
359 headers.setHeader(":scheme", "http"); // always in this case
360 headers.setHeader(":authority", host);
361 headers.setHeader(":path", uri.getPath());
362 Queue q = new Queue();
363 String body = getRequestBody(request);
364 addHeaders(getHeaders(request), headers);
365 headers.setHeader("Content-length", Integer.toString(body.length()));
366
367 addRequestBodyToQueue(body, q);
368 streams.put(1, q);
369 exec.submit(() -> {
370 handleRequest(headers, q, 1, true /*complete request has been read*/);
371 });
372 }
373
374 // all other streams created here
375 @SuppressWarnings({"rawtypes","unchecked"})
376 void createStream(HeaderFrame frame) throws IOException {
377 List<HeaderFrame> frames = new LinkedList<>();
378 frames.add(frame);
379 int streamid = frame.streamid();
380 if (streamid != nextstream) {
381 throw new IOException("unexpected stream id");
382 }
384
385 boolean endStream = false;
386 if (frame.getFlag(HeaderFrame.END_STREAM)) {
387 endStream = true;
388 }
389
390 while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
391 Http2Frame f = readFrame();
392 if (!(f instanceof HeaderFrame)) {
393 handleCommonFrame(f); // should only be error frames
394 } else {
395 frame = (HeaderFrame) f;
396 if (frame.getFlag(HeaderFrame.END_STREAM)) {
397 endStream = true;
398 }
399 frames.add(frame);
400 }
401 }
402 boolean endStreamReceived = endStream;
403 HttpHeadersImpl headers = decodeHeaders(frames);
404 Queue q = new Queue();
405 streams.put(streamid, q);
406 exec.submit(() -> {
407 handleRequest(headers, q, streamid, endStreamReceived);
408 });
409 }
410
411 // runs in own thread. Handles request from start to finish. Incoming frames
412 // for this stream/request delivered on Q
413
414 @SuppressWarnings({"rawtypes","unchecked"})
415 void handleRequest(HttpHeadersImpl headers,
416 Queue queue,
417 int streamid,
418 boolean endStreamReceived)
419 {
420 String method = headers.firstValue(":method").orElse("");
421 //System.out.println("method = " + method);
422 String path = headers.firstValue(":path").orElse("");
423 //System.out.println("path = " + path);
424 String scheme = headers.firstValue(":scheme").orElse("");
425 //System.out.println("scheme = " + scheme);
426 String authority = headers.firstValue(":authority").orElse("");
427 //System.out.println("authority = " + authority);
428 System.err.printf("TestServer: %s %s\n", method, path);
429 HttpHeadersImpl rspheaders = new HttpHeadersImpl();
430 int winsize = clientSettings.getParameter(
431 SettingsFrame.INITIAL_WINDOW_SIZE);
432 //System.err.println ("Stream window size = " + winsize);
433
434 final InputStream bis;
435 if (endStreamReceived && queue.size() == 0) {
436 System.err.println("Server: got END_STREAM for stream " + streamid);
437 bis = NullInputStream.INSTANCE;
438 } else {
439 System.err.println("Server: creating input stream for stream " + streamid);
440 bis = new BodyInputStream(queue, streamid, this);
441 }
442 try (bis;
443 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
444 {
445 String us = scheme + "://" + authority + path;
446 URI uri = new URI(us);
447 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
448 Http2TestExchange exchange = new Http2TestExchange(streamid, method,
449 headers, rspheaders, uri, bis, getSSLSession(),
450 bos, this, pushAllowed);
451
452 // give to user
453 Http2Handler handler = server.getHandlerFor(uri.getPath());
454 handler.handle(exchange);
455
456 // everything happens in the exchange from here. Hopefully will
457 // return though.
458 } catch (Throwable e) {
459 System.err.println("TestServer: handleRequest exception: " + e);
460 e.printStackTrace();
461 }
462 }
463
464 private SSLSession getSSLSession() {
465 if (! (socket instanceof SSLSocket))
466 return null;
467 SSLSocket ssl = (SSLSocket)socket;
468 return ssl.getSession();
490 if (q != null) {
491 System.err.println("HEADERS frame for existing stream! Error.");
492 // TODO: close connection
493 continue;
494 } else {
495 createStream((HeadersFrame) frame);
496 }
497 } else {
498 if (q == null && !pushStreams.contains(stream)) {
499 System.err.printf("Non Headers frame received with"+
500 " non existing stream (%d) ", frame.streamid());
501 System.err.println(frame);
502 continue;
503 }
504 if (frame.type() == WindowUpdateFrame.TYPE) {
505 WindowUpdateFrame wup = (WindowUpdateFrame) frame;
506 synchronized (updaters) {
507 Consumer<Integer> r = updaters.get(stream);
508 r.accept(wup.getUpdate());
509 }
510 } else {
511 q.put(frame);
512 }
513 }
514 }
515 }
516 } catch (Throwable e) {
517 if (!stopping) {
518 System.err.println("Http server reader thread shutdown");
519 e.printStackTrace();
520 }
521 close();
522 }
523 }
524
525 ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) {
526 List<ByteBuffer> buffers = new LinkedList<>();
527
528 ByteBuffer buf = getBuffer();
529 boolean encoded;
530 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
531 List<String> values = entry.getValue();
532 String key = entry.getKey().toLowerCase();
533 for (String value : values) {
534 do {
535 hpackOut.header(key, value);
536 encoded = hpackOut.encode(buf);
537 if (!encoded) {
538 buf.flip();
539 buffers.add(buf);
540 buf = getBuffer();
541 }
542 } while (!encoded);
543 }
544 }
545 buf.flip();
546 buffers.add(buf);
547 return ByteBufferReference.toReferences(buffers.toArray(bbarray));
548 }
549
550 static void closeIgnore(Closeable c) {
551 try {
552 c.close();
553 } catch (IOException e) {}
554 }
555
556 // Runs in own thread
557 void writeLoop() {
558 try {
559 while (!stopping) {
560 Http2Frame frame;
561 try {
562 frame = outputQ.take();
563 } catch(IOException x) {
564 if (stopping && x.getCause() instanceof InterruptedException) {
565 break;
566 } else throw x;
567 }
570 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
571 writeFrame(hf);
572 } else if (frame instanceof OutgoingPushPromise) {
573 handlePush((OutgoingPushPromise)frame);
574 } else
575 writeFrame(frame);
576 }
577 System.err.println("TestServer: Connection writer stopping");
578 } catch (Throwable e) {
579 e.printStackTrace();
580 /*close();
581 if (!stopping) {
582 e.printStackTrace();
583 System.err.println("TestServer: writeLoop exception: " + e);
584 }*/
585 }
586 }
587
588 private void handlePush(OutgoingPushPromise op) throws IOException {
589 int promisedStreamid = nextPushStreamId;
590 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0);
591 pushStreams.add(promisedStreamid);
592 nextPushStreamId += 2;
593 pp.streamid(op.parentStream);
594 writeFrame(pp);
595 final InputStream ii = op.is;
596 final BodyOutputStream oo = new BodyOutputStream(
597 promisedStreamid,
598 clientSettings.getParameter(
599 SettingsFrame.INITIAL_WINDOW_SIZE), this);
600 oo.goodToGo();
601 exec.submit(() -> {
602 try {
603 ResponseHeaders oh = getPushResponse(promisedStreamid);
604 outputQ.put(oh);
605 ii.transferTo(oo);
606 } catch (Throwable ex) {
607 System.err.printf("TestServer: pushing response error: %s\n",
608 ex.toString());
609 } finally {
610 closeIgnore(ii);
611 closeIgnore(oo);
612 }
613 });
614
615 }
616
617 // returns a minimal response with status 200
618 // that is the response to the push promise just sent
619 private ResponseHeaders getPushResponse(int streamid) {
628 private ByteBuffer getBuffer() {
629 return ByteBuffer.allocate(8 * 1024);
630 }
631
632 private Http2Frame readFrame() throws IOException {
633 byte[] buf = new byte[9];
634 if (is.readNBytes(buf, 0, 9) != 9)
635 throw new IOException("readFrame: connection closed");
636 int len = 0;
637 for (int i = 0; i < 3; i++) {
638 int n = buf[i] & 0xff;
639 //System.err.println("n = " + n);
640 len = (len << 8) + n;
641 }
642 byte[] rest = new byte[len];
643 int n = is.readNBytes(rest, 0, len);
644 if (n != len)
645 throw new IOException("Error reading frame");
646 List<Http2Frame> frames = new ArrayList<>();
647 FramesDecoder reader = new FramesDecoder(frames::add);
648 reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf)));
649 reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest)));
650 if (frames.size()!=1)
651 throw new IOException("Expected 1 frame got "+frames.size()) ;
652
653 return frames.get(0);
654 }
655
656 void sendSettingsFrame() throws IOException {
657 sendSettingsFrame(false);
658 }
659
660 void sendSettingsFrame(boolean now) throws IOException {
661 if (now) {
662 writeFrame(serverSettings);
663 } else {
664 outputQ.put(serverSettings);
665 }
666 }
667
668 String readUntil(String end) throws IOException {
669 int number = end.length();
704 int start = headers1.indexOf(name);
705 if (start == -1) {
706 return null;
707 }
708 start += 2;
709 int end = headers1.indexOf(CRLF, start);
710 String line = headers.substring(start, end);
711 start = line.indexOf(':');
712 if (start == -1) {
713 return null;
714 }
715 return line.substring(start + 1).trim();
716 }
717
718 final static String CRLF = "\r\n";
719 final static String CRLFCRLF = "\r\n\r\n";
720
721 String readHttp1Request() throws IOException {
722 String headers = readUntil(CRLF + CRLF);
723 int clen = getContentLength(headers);
724 // read the content.
725 byte[] buf = new byte[clen];
726 is.readNBytes(buf, 0, clen);
727 String body = new String(buf, StandardCharsets.US_ASCII);
728 return headers + body;
729 }
730
731 void sendHttp1Response(int code, String msg, String... headers) throws IOException {
732 StringBuilder sb = new StringBuilder();
733 sb.append("HTTP/1.1 ")
734 .append(code)
735 .append(' ')
736 .append(msg)
737 .append(CRLF);
738 int numheaders = headers.length;
739 for (int i = 0; i < numheaders; i += 2) {
740 sb.append(headers[i])
741 .append(": ")
742 .append(headers[i + 1])
743 .append(CRLF);
744 }
745 sb.append(CRLF);
746 String s = sb.toString();
747 os.write(s.getBytes("US-ASCII"));
748 os.flush();
749 }
750
754 }
755
756 final static ByteBuffer[] bbarray = new ByteBuffer[0];
757
758 // wrapper around a BlockingQueue that throws an exception when it's closed
759 // Each stream has one of these
760
761 String getRequestBody(String request) {
762 int bodystart = request.indexOf(CRLF+CRLF);
763 String body;
764 if (bodystart == -1)
765 body = "";
766 else
767 body = request.substring(bodystart+4);
768 return body;
769 }
770
771 @SuppressWarnings({"rawtypes","unchecked"})
772 void addRequestBodyToQueue(String body, Queue q) throws IOException {
773 ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
774 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf));
775 // only used for primordial stream
776 q.put(df);
777 }
778
779 // window updates done in main reader thread because they may
780 // be used to unblock BodyOutputStreams waiting for WUPs
781
782 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
783
784 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
785 synchronized(updaters) {
786 updaters.put(streamid, r);
787 }
788 }
789
790 int sendWindow = 64 * 1024 - 1; // connection level send window
791
792 /**
793 * BodyOutputStreams call this to get the connection window first.
794 *
|
1 /*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24 import java.io.BufferedInputStream;
25 import java.io.BufferedOutputStream;
26 import java.io.Closeable;
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.io.InputStream;
30 import java.io.OutputStream;
31 import java.net.Socket;
32 import java.net.URI;
33 import java.net.InetAddress;
34 import javax.net.ssl.*;
35 import java.net.URISyntaxException;
36 import java.nio.ByteBuffer;
37 import java.nio.charset.StandardCharsets;
38 import java.util.*;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.function.Consumer;
43 import jdk.incubator.http.internal.common.HttpHeadersImpl;
44 import jdk.incubator.http.internal.frame.*;
45 import jdk.incubator.http.internal.hpack.Decoder;
46 import jdk.incubator.http.internal.hpack.DecodingCallback;
47 import jdk.incubator.http.internal.hpack.Encoder;
48 import sun.net.www.http.ChunkedInputStream;
49 import sun.net.www.http.HttpClient;
50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
51
52 /**
53 * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
54 * or HTTPS opened using "h2" ALPN.
55 */
56 public class Http2TestServerConnection {
57 final Http2TestServer server;
58 @SuppressWarnings({"rawtypes","unchecked"})
59 final Map<Integer, Queue> streams; // input q per stream
60 final Map<Integer, BodyOutputStream> outStreams; // output q per stream
61 final HashSet<Integer> pushStreams;
62 final Queue<Http2Frame> outputQ;
63 volatile int nextstream;
64 final Socket socket;
65 final Http2TestExchangeSupplier exchangeSupplier;
66 final InputStream is;
67 final OutputStream os;
68 volatile Encoder hpackOut;
69 volatile Decoder hpackIn;
70 volatile SettingsFrame clientSettings;
71 final SettingsFrame serverSettings;
72 final ExecutorService exec;
73 final boolean secure;
74 volatile boolean stopping;
75 volatile int nextPushStreamId = 2;
76 ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
77
78 final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
79 final static byte[] EMPTY_BARRAY = new byte[0];
80 final Random random;
81
82 final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
83
84 static class Sentinel extends Http2Frame {
85 Sentinel() { super(-1,-1);}
86 }
87
88 class PingRequest {
89 final byte[] pingData;
90 final long pingStamp;
91 final CompletableFuture<Long> response;
92
93 PingRequest() {
94 pingData = new byte[8];
95 random.nextBytes(pingData);
96 pingStamp = System.currentTimeMillis();
97 response = new CompletableFuture<>();
98 }
99
100 PingFrame frame() {
101 return new PingFrame(0, pingData);
102 }
103
104 CompletableFuture<Long> response() {
105 return response;
106 }
107
108 void success() {
109 response.complete(System.currentTimeMillis() - pingStamp);
110 }
111
112 void fail(Throwable t) {
113 response.completeExceptionally(t);
114 }
115 }
116
117 static Sentinel sentinel;
118
/**
 * Creates the server side of a connection on the given socket.
 * When the socket is an SSLSocket, handshake(...) is invoked with the
 * server's name first. TCP_NODELAY is enabled on the socket.
 *
 * @param server           the owning test server; its executor and secure flag are reused
 * @param socket           the accepted socket
 * @param exchangeSupplier stored for later use by this connection
 * @throws IOException if the handshake, the socket option or the socket
 *         streams fail
 */
Http2TestServerConnection(Http2TestServer server,
                          Socket socket,
                          Http2TestExchangeSupplier exchangeSupplier)
    throws IOException
{
    if (socket instanceof SSLSocket) {
        SSLSocket sslSocket = (SSLSocket)socket;
        handshake(server.serverName(), sslSocket);
    }
    System.err.println("TestServer: New connection from " + socket);

    // owner and the state shared with it
    this.server = server;
    this.exec = server.exec;
    this.secure = server.secure;
    this.exchangeSupplier = exchangeSupplier;

    // per-connection bookkeeping
    this.streams = Collections.synchronizedMap(new HashMap<>());
    this.outStreams = Collections.synchronizedMap(new HashMap<>());
    this.pushStreams = new HashSet<>();
    this.outputQ = new Queue<>(sentinel);
    this.random = new Random();
    this.serverSettings = SettingsFrame.getDefaultSettings();

    // transport
    this.socket = socket;
    this.socket.setTcpNoDelay(true);
    this.is = new BufferedInputStream(socket.getInputStream());
    this.os = new BufferedOutputStream(socket.getOutputStream());
}
143
144 /**
145 * Sends a PING frame on this connection, and completes the returned
146 * CF when the PING ack is received. The CF is given
147 * an integer, whose value is the number of milliseconds
148 * between PING and ACK.
149 */
150 CompletableFuture<Long> sendPing() {
151 PingRequest ping = null;
152 try {
153 ping = new PingRequest();
154 pings.add(ping);
155 outputQ.put(ping.frame());
156 } catch (Throwable t) {
157 ping.fail(t);
158 }
159 return ping.response();
160 }
161
162 /**
163 * Returns the first PingRequest from Queue
164 */
165 private PingRequest getNextRequest() {
166 return pings.poll();
167 }
168
169 /**
170 * Handles incoming Ping, which could be an ack
171 * or a client originated Ping
172 */
173 void handlePing(PingFrame ping) throws IOException {
174 if (ping.streamid() != 0) {
175 System.err.println("Invalid ping received");
176 close();
177 return;
178 }
179 if (ping.getFlag(PingFrame.ACK)) {
180 // did we send a Ping?
181 PingRequest request = getNextRequest();
182 if (request == null) {
183 System.err.println("Invalid ping ACK received");
184 close();
185 return;
186 } else if (!Arrays.equals(request.pingData, ping.getData())) {
187 request.fail(new RuntimeException("Wrong ping data in ACK"));
188 } else {
189 request.success();
190 }
191 } else {
192 // client originated PING. Just send it back with ACK set
193 ping.setFlag(PingFrame.ACK);
194 outputQ.put(ping);
195 }
196 }
197
198 private static boolean compareIPAddrs(InetAddress addr1, String host) {
199 try {
200 InetAddress addr2 = InetAddress.getByName(host);
201 return addr1.equals(addr2);
202 } catch (IOException e) {
203 throw new UncheckedIOException(e);
204 }
205 }
206
207 private static void handshake(String name, SSLSocket sock) throws IOException {
208 if (name == null) {
209 // no name set. No need to check
210 return;
211 } else if (name.equals("127.0.0.1")) {
212 name = "localhost";
213 }
214 final String fname = name;
215 final InetAddress addr1 = InetAddress.getByName(name);
216 SSLParameters params = sock.getSSLParameters();
217 SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {
268
269 String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
270 clientSettings = getSettingsFromString(clientSettingsString);
271
272 return upgrade;
273 }
274
275 /**
276 * Decodes the given, Client, settings payload provided in base64 HTTP1
277 * header value.
278 */
279 private SettingsFrame getSettingsFromString(String s) throws IOException {
280 Base64.Decoder decoder = Base64.getUrlDecoder();
281 byte[] payload = decoder.decode(s);
282 ByteBuffer bb1 = ByteBuffer.wrap(payload);
283 // simulate header of Settings Frame
284 ByteBuffer bb0 = ByteBuffer.wrap(
285 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
286 List<Http2Frame> frames = new ArrayList<>();
287 FramesDecoder reader = new FramesDecoder(frames::add);
288 reader.decode(bb0);
289 reader.decode(bb1);
290 if (frames.size()!=1)
291 throw new IOException("Expected 1 frame got "+frames.size()) ;
292 Http2Frame frame = frames.get(0);
293 if (!(frame instanceof SettingsFrame))
294 throw new IOException("Expected SettingsFrame");
295 return (SettingsFrame)frame;
296 }
297
298 void run() throws Exception {
299 String upgrade = null;
300 if (!secure) {
301 upgrade = doUpgrade();
302 } else {
303 readPreface();
304 sendSettingsFrame(true);
305 clientSettings = (SettingsFrame) readFrame();
306 if (clientSettings.getFlag(SettingsFrame.ACK)) {
307 // we received the ack to our frame first
308 clientSettings = (SettingsFrame) readFrame();
309 }
310 nextstream = 1;
311 }
312
313 System.out.println("ServerSettings: " + serverSettings);
314 System.out.println("ClientSettings: " + clientSettings);
315
316 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
317 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
318
319 if (!secure) {
320 createPrimordialStream(upgrade);
321 nextstream = 3;
322 }
323
324 exec.submit(this::readLoop);
325 exec.submit(this::writeLoop);
326 }
327
328 private void writeFrame(Http2Frame frame) throws IOException {
329 List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
330 //System.err.println("TestServer: Writing frame " + frame.toString());
331 int c = 0;
332 for (ByteBuffer buf : bufs) {
333 byte[] ba = buf.array();
334 int start = buf.arrayOffset() + buf.position();
335 c += buf.remaining();
336 os.write(ba, start, buf.remaining());
337
338 // System.out.println("writing byte at a time");
339 // while (buf.hasRemaining()) {
340 // byte b = buf.get();
341 // os.write(b);
342 // os.flush();
343 // try {
344 // Thread.sleep(1);
345 // } catch(InterruptedException e) {
346 // UncheckedIOException uie = new UncheckedIOException(new IOException(""));
347 // uie.addSuppressed(e);
348 // throw uie;
349 // }
350 // }
351 }
352 os.flush();
353 //System.err.printf("TestServer: wrote %d bytes\n", c);
354 }
355
356 private void handleCommonFrame(Http2Frame f) throws IOException {
357 if (f instanceof SettingsFrame) {
358 SettingsFrame sf = (SettingsFrame) f;
359 if (sf.getFlag(SettingsFrame.ACK)) // ignore
360 {
361 return;
362 }
363 // otherwise acknowledge it
364 clientSettings = sf;
365 SettingsFrame frame = new SettingsFrame();
366 frame.setFlag(SettingsFrame.ACK);
367 frame.streamid(0);
368 outputQ.put(frame);
369 return;
370 } else if (f instanceof GoAwayFrame) {
371 System.err.println("Closing: "+ f.toString());
372 close();
373 } else if (f instanceof PingFrame) {
374 handlePing((PingFrame)f);
375 } else
376 throw new UnsupportedOperationException("Not supported yet: " + f.toString());
377 }
378
379 void sendWindowUpdates(int len, int streamid) throws IOException {
380 if (len == 0)
381 return;
382 WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
383 outputQ.put(wup);
384 wup = new WindowUpdateFrame(0 , len);
385 outputQ.put(wup);
386 }
387
388 HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
389 HttpHeadersImpl headers = new HttpHeadersImpl();
390
391 DecodingCallback cb = (name, value) -> {
392 headers.addHeader(name.toString(), value.toString());
393 };
394
395 for (HeaderFrame frame : frames) {
396 List<ByteBuffer> buffers = frame.getHeaderBlock();
397 for (ByteBuffer buffer : buffers) {
398 hpackIn.decode(buffer, false, cb);
399 }
400 }
401 hpackIn.decode(EMPTY_BUFFER, true, cb);
402 return headers;
403 }
404
405 String getRequestLine(String request) {
406 int eol = request.indexOf(CRLF);
407 return request.substring(0, eol);
408 }
409
410 String getHeaders(String request) {
411 int start = request.indexOf(CRLF);
412 int end = request.indexOf(CRLFCRLF);
413 if (start == -1 || end == -1) {
414 throw new RuntimeException("Malformed request");
415 }
416 return request.substring(start,end);
417 }
418
437 String requestLine = getRequestLine(request);
438 String[] tokens = requestLine.split(" ");
439 if (!tokens[2].equals("HTTP/1.1")) {
440 throw new IOException("bad request line");
441 }
442 URI uri = null;
443 try {
444 uri = new URI(tokens[1]);
445 } catch (URISyntaxException e) {
446 throw new IOException(e);
447 }
448 String host = getHeader(request, "Host");
449 if (host == null) {
450 throw new IOException("missing Host");
451 }
452
453 headers.setHeader(":method", tokens[0]);
454 headers.setHeader(":scheme", "http"); // always in this case
455 headers.setHeader(":authority", host);
456 headers.setHeader(":path", uri.getPath());
457 Queue q = new Queue(sentinel);
458 String body = getRequestBody(request);
459 addHeaders(getHeaders(request), headers);
460 headers.setHeader("Content-length", Integer.toString(body.length()));
461
462 addRequestBodyToQueue(body, q);
463 streams.put(1, q);
464 exec.submit(() -> {
465 handleRequest(headers, q, 1, true /*complete request has been read*/);
466 });
467 }
468
469 // all other streams created here
470 @SuppressWarnings({"rawtypes","unchecked"})
471 void createStream(HeaderFrame frame) throws IOException {
472 List<HeaderFrame> frames = new LinkedList<>();
473 frames.add(frame);
474 int streamid = frame.streamid();
475 if (streamid != nextstream) {
476 throw new IOException("unexpected stream id");
477 }
479
480 boolean endStream = false;
481 if (frame.getFlag(HeaderFrame.END_STREAM)) {
482 endStream = true;
483 }
484
485 while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
486 Http2Frame f = readFrame();
487 if (!(f instanceof HeaderFrame)) {
488 handleCommonFrame(f); // should only be error frames
489 } else {
490 frame = (HeaderFrame) f;
491 if (frame.getFlag(HeaderFrame.END_STREAM)) {
492 endStream = true;
493 }
494 frames.add(frame);
495 }
496 }
497 boolean endStreamReceived = endStream;
498 HttpHeadersImpl headers = decodeHeaders(frames);
499 Queue q = new Queue(sentinel);
500 streams.put(streamid, q);
501 exec.submit(() -> {
502 handleRequest(headers, q, streamid, endStreamReceived);
503 });
504 }
505
506 // runs in own thread. Handles request from start to finish. Incoming frames
507 // for this stream/request delivered on Q
508
509 @SuppressWarnings({"rawtypes","unchecked"})
510 void handleRequest(HttpHeadersImpl headers,
511 Queue queue,
512 int streamid,
513 boolean endStreamReceived)
514 {
515 String method = headers.firstValue(":method").orElse("");
516 //System.out.println("method = " + method);
517 String path = headers.firstValue(":path").orElse("");
518 //System.out.println("path = " + path);
519 String scheme = headers.firstValue(":scheme").orElse("");
520 //System.out.println("scheme = " + scheme);
521 String authority = headers.firstValue(":authority").orElse("");
522 //System.out.println("authority = " + authority);
523 System.err.printf("TestServer: %s %s\n", method, path);
524 HttpHeadersImpl rspheaders = new HttpHeadersImpl();
525 int winsize = clientSettings.getParameter(
526 SettingsFrame.INITIAL_WINDOW_SIZE);
527 //System.err.println ("Stream window size = " + winsize);
528
529 final InputStream bis;
530 if (endStreamReceived && queue.size() == 0) {
531 System.err.println("Server: got END_STREAM for stream " + streamid);
532 bis = NullInputStream.INSTANCE;
533 } else {
534 System.err.println("Server: creating input stream for stream " + streamid);
535 bis = new BodyInputStream(queue, streamid, this);
536 }
537 try (bis;
538 BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
539 {
540 outStreams.put(streamid, bos);
541 String us = scheme + "://" + authority + path;
542 URI uri = new URI(us);
543 boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
544 Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
545 headers, rspheaders, uri, bis, getSSLSession(),
546 bos, this, pushAllowed);
547
548 // give to user
549 Http2Handler handler = server.getHandlerFor(uri.getPath());
550 handler.handle(exchange);
551
552 // everything happens in the exchange from here. Hopefully will
553 // return though.
554 } catch (Throwable e) {
555 System.err.println("TestServer: handleRequest exception: " + e);
556 e.printStackTrace();
557 }
558 }
559
560 private SSLSession getSSLSession() {
561 if (! (socket instanceof SSLSocket))
562 return null;
563 SSLSocket ssl = (SSLSocket)socket;
564 return ssl.getSession();
586 if (q != null) {
587 System.err.println("HEADERS frame for existing stream! Error.");
588 // TODO: close connection
589 continue;
590 } else {
591 createStream((HeadersFrame) frame);
592 }
593 } else {
594 if (q == null && !pushStreams.contains(stream)) {
595 System.err.printf("Non Headers frame received with"+
596 " non existing stream (%d) ", frame.streamid());
597 System.err.println(frame);
598 continue;
599 }
600 if (frame.type() == WindowUpdateFrame.TYPE) {
601 WindowUpdateFrame wup = (WindowUpdateFrame) frame;
602 synchronized (updaters) {
603 Consumer<Integer> r = updaters.get(stream);
604 r.accept(wup.getUpdate());
605 }
606 } else if (frame.type() == ResetFrame.TYPE) {
607 // do orderly close on input q
608 // and close the output q immediately
609 // This should mean depending on what the
610 // handler is doing: either an EOF on read
611 // or an IOException if writing the response.
612 q.orderlyClose();
613 BodyOutputStream oq = outStreams.get(stream);
614 if (oq != null)
615 oq.closeInternal();
616
617 } else {
618 q.put(frame);
619 }
620 }
621 }
622 }
623 } catch (Throwable e) {
624 if (!stopping) {
625 System.err.println("Http server reader thread shutdown");
626 e.printStackTrace();
627 }
628 close();
629 }
630 }
631
632 List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
633 List<ByteBuffer> buffers = new LinkedList<>();
634
635 ByteBuffer buf = getBuffer();
636 boolean encoded;
637 for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
638 List<String> values = entry.getValue();
639 String key = entry.getKey().toLowerCase();
640 for (String value : values) {
641 do {
642 hpackOut.header(key, value);
643 encoded = hpackOut.encode(buf);
644 if (!encoded) {
645 buf.flip();
646 buffers.add(buf);
647 buf = getBuffer();
648 }
649 } while (!encoded);
650 }
651 }
652 buf.flip();
653 buffers.add(buf);
654 return buffers;
655 }
656
657 static void closeIgnore(Closeable c) {
658 try {
659 c.close();
660 } catch (IOException e) {}
661 }
662
663 // Runs in own thread
664 void writeLoop() {
665 try {
666 while (!stopping) {
667 Http2Frame frame;
668 try {
669 frame = outputQ.take();
670 } catch(IOException x) {
671 if (stopping && x.getCause() instanceof InterruptedException) {
672 break;
673 } else throw x;
674 }
677 HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
678 writeFrame(hf);
679 } else if (frame instanceof OutgoingPushPromise) {
680 handlePush((OutgoingPushPromise)frame);
681 } else
682 writeFrame(frame);
683 }
684 System.err.println("TestServer: Connection writer stopping");
685 } catch (Throwable e) {
686 e.printStackTrace();
687 /*close();
688 if (!stopping) {
689 e.printStackTrace();
690 System.err.println("TestServer: writeLoop exception: " + e);
691 }*/
692 }
693 }
694
695 private void handlePush(OutgoingPushPromise op) throws IOException {
696 int promisedStreamid = nextPushStreamId;
697 PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
698 HeaderFrame.END_HEADERS,
699 promisedStreamid,
700 encodeHeaders(op.headers),
701 0);
702 pushStreams.add(promisedStreamid);
703 nextPushStreamId += 2;
704 pp.streamid(op.parentStream);
705 writeFrame(pp);
706 final InputStream ii = op.is;
707 final BodyOutputStream oo = new BodyOutputStream(
708 promisedStreamid,
709 clientSettings.getParameter(
710 SettingsFrame.INITIAL_WINDOW_SIZE), this);
711 outStreams.put(promisedStreamid, oo);
712 oo.goodToGo();
713 exec.submit(() -> {
714 try {
715 ResponseHeaders oh = getPushResponse(promisedStreamid);
716 outputQ.put(oh);
717 ii.transferTo(oo);
718 } catch (Throwable ex) {
719 System.err.printf("TestServer: pushing response error: %s\n",
720 ex.toString());
721 } finally {
722 closeIgnore(ii);
723 closeIgnore(oo);
724 }
725 });
726
727 }
728
729 // returns a minimal response with status 200
730 // that is the response to the push promise just sent
731 private ResponseHeaders getPushResponse(int streamid) {
740 private ByteBuffer getBuffer() {
741 return ByteBuffer.allocate(8 * 1024);
742 }
743
744 private Http2Frame readFrame() throws IOException {
745 byte[] buf = new byte[9];
746 if (is.readNBytes(buf, 0, 9) != 9)
747 throw new IOException("readFrame: connection closed");
748 int len = 0;
749 for (int i = 0; i < 3; i++) {
750 int n = buf[i] & 0xff;
751 //System.err.println("n = " + n);
752 len = (len << 8) + n;
753 }
754 byte[] rest = new byte[len];
755 int n = is.readNBytes(rest, 0, len);
756 if (n != len)
757 throw new IOException("Error reading frame");
758 List<Http2Frame> frames = new ArrayList<>();
759 FramesDecoder reader = new FramesDecoder(frames::add);
760 reader.decode(ByteBuffer.wrap(buf));
761 reader.decode(ByteBuffer.wrap(rest));
762 if (frames.size()!=1)
763 throw new IOException("Expected 1 frame got "+frames.size()) ;
764
765 return frames.get(0);
766 }
767
768 void sendSettingsFrame() throws IOException {
769 sendSettingsFrame(false);
770 }
771
772 void sendSettingsFrame(boolean now) throws IOException {
773 if (now) {
774 writeFrame(serverSettings);
775 } else {
776 outputQ.put(serverSettings);
777 }
778 }
779
780 String readUntil(String end) throws IOException {
781 int number = end.length();
816 int start = headers1.indexOf(name);
817 if (start == -1) {
818 return null;
819 }
820 start += 2;
821 int end = headers1.indexOf(CRLF, start);
822 String line = headers.substring(start, end);
823 start = line.indexOf(':');
824 if (start == -1) {
825 return null;
826 }
827 return line.substring(start + 1).trim();
828 }
829
830 final static String CRLF = "\r\n";
831 final static String CRLFCRLF = "\r\n\r\n";
832
833 String readHttp1Request() throws IOException {
834 String headers = readUntil(CRLF + CRLF);
835 int clen = getContentLength(headers);
836 byte[] buf;
837 if (clen >= 0) {
838 // HTTP/1.1 fixed length content ( may be 0 ), read it
839 buf = new byte[clen];
840 is.readNBytes(buf, 0, clen);
841 } else {
842 // HTTP/1.1 chunked data, read it
843 buf = readChunkedInputStream(is);
844 }
845 String body = new String(buf, StandardCharsets.US_ASCII);
846 return headers + body;
847 }
848
849 // This is a quick hack to get a chunked input stream reader.
850 private static byte[] readChunkedInputStream(InputStream is) throws IOException {
851 ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
852 return cis.readAllBytes();
853 }
854
855 void sendHttp1Response(int code, String msg, String... headers) throws IOException {
856 StringBuilder sb = new StringBuilder();
857 sb.append("HTTP/1.1 ")
858 .append(code)
859 .append(' ')
860 .append(msg)
861 .append(CRLF);
862 int numheaders = headers.length;
863 for (int i = 0; i < numheaders; i += 2) {
864 sb.append(headers[i])
865 .append(": ")
866 .append(headers[i + 1])
867 .append(CRLF);
868 }
869 sb.append(CRLF);
870 String s = sb.toString();
871 os.write(s.getBytes("US-ASCII"));
872 os.flush();
873 }
874
878 }
879
880 final static ByteBuffer[] bbarray = new ByteBuffer[0];
881
882 // wrapper around a BlockingQueue that throws an exception when it's closed
883 // Each stream has one of these
884
885 String getRequestBody(String request) {
886 int bodystart = request.indexOf(CRLF+CRLF);
887 String body;
888 if (bodystart == -1)
889 body = "";
890 else
891 body = request.substring(bodystart+4);
892 return body;
893 }
894
895 @SuppressWarnings({"rawtypes","unchecked"})
896 void addRequestBodyToQueue(String body, Queue q) throws IOException {
897 ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
898 DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
899 // only used for primordial stream
900 q.put(df);
901 }
902
903 // window updates done in main reader thread because they may
904 // be used to unblock BodyOutputStreams waiting for WUPs
905
906 HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
907
908 void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
909 synchronized(updaters) {
910 updaters.put(streamid, r);
911 }
912 }
913
914 int sendWindow = 64 * 1024 - 1; // connection level send window
915
916 /**
917 * BodyOutputStreams call this to get the connection window first.
918 *
|