< prev index next >
1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24 package java.net.http;
25
26 import java.io.IOException;
27 import java.net.InetSocketAddress;
28 import java.net.URI;
29 import java.nio.ByteBuffer;
30 import java.nio.charset.StandardCharsets;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CompletableFuture;
38 import sun.net.httpclient.hpack.Encoder;
39 import sun.net.httpclient.hpack.Decoder;
40 import static java.net.http.SettingsFrame.*;
41 import static java.net.http.Utils.BUFSIZE;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.Formatter;
45 import java.util.stream.Collectors;
46 import sun.net.httpclient.hpack.DecodingCallback;
47
48 /**
49 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
50 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
51 *
52 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
53 * to a HttpClientImpl.
54 *
55 * Creation cases:
56 * 1) upgraded HTTP/1.1 plain tcp connection
57 * 2) prior knowledge directly created plain tcp connection
58 * 3) directly created HTTP/2 SSL connection which uses ALPN.
59 *
60 * Each Http2Connection uses a thread for reading the socket and another
61 * for writing to the socket.
62 *
63 * Sending is done in the sendLoop() method where it reads frames off a blocking
64 * queue (Queue class) and writes them to the socket. Hpack header compression
65 * and outgoing stream creation is also done here, because these operations
66 * must be synchronized at the socket level. Stream objects send frames simply
67 * by placing them on the connection's output Queue.
68 *
69 * Another thread runs readLoop() which assembles incoming Http2Frames, and
70 * directs them to the appropriate Stream.incoming() or handles them directly
71 * itself. This thread performs hpack decompression and incoming stream creation
72 * (Server push). Incoming frames destined for a stream are provided by calling
73 * Stream.incoming().
74 */
75 class Http2Connection implements BufferHandler {
76
/** Outgoing frames are placed here and taken off by sendLoop(). */
final Queue outputQ;
/** Set by shutdown(); the read and send loops stop once this is true. */
volatile boolean closed;

//-------------------------------------
final HttpConnection connection;
HttpClientImpl client;
final Http2ClientImpl client2;
/** Streams known to this connection, keyed by stream id (a synchronized map). */
Map<Integer,Stream> streams;
int nextstreamid = 3; // stream 1 is registered separately
int nextPushStream = 2; // stream id the next PUSH_PROMISE is expected to carry
Encoder hpackOut;
Decoder hpackIn;
SettingsFrame clientSettings, serverSettings;
ByteBufferConsumer bbc;
final LinkedList<ByteBuffer> freeList;
final String key; // for HttpClientImpl.connections map
FrameReader reader;

// Connection level flow control windows
int sendWindow = INITIAL_WINDOW_SIZE;

final static int DEFAULT_FRAME_SIZE = 16 * 1024;
// Type token for List.toArray(); final so the shared reference cannot be reassigned.
private static final ByteBuffer[] empty = new ByteBuffer[0];

final ExecutorWrapper executor;

/**
 * This is established by the protocol spec and the peer will update it with
 * WINDOW_UPDATEs, which affects the sendWindow.
 */
final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
108
109 // TODO: need list of control frames from other threads
110 // that need to be sent
111
112 /**
113 * Case 1) Create from upgraded HTTP/1.1 connection.
114 * Is ready to use. Will not be SSL. exchange is the Exchange
115 * that initiated the connection, whose response will be delivered
116 * on a Stream.
117 */
118 Http2Connection(HttpConnection connection, Http2ClientImpl client2,
119 Exchange exchange) throws IOException, InterruptedException {
120 this.outputQ = new Queue();
121
122 //this.initialExchange = exchange;
123 assert !(connection instanceof SSLConnection);
124 this.connection = connection;
125 this.client = client2.client;
126 this.client2 = client2;
127 this.executor = client.executorWrapper();
128 this.freeList = new LinkedList<>();
129 this.key = keyFor(connection);
130 streams = Collections.synchronizedMap(new HashMap<>());
131 initCommon();
132 sendConnectionPreface();
133 Stream initialStream = createStream(exchange);
134 initialStream.registerStream(1);
135 initialStream.requestSent();
136 connection.configureBlocking(true);
137 // start reader and writer
138 executor.execute(() -> {
139 readLoop(connection.getRemaining());
140 }, null);
141
142 executor.execute(() -> {
143 sendLoop();
144 }, null);
145 }
146
147 // async style but completes immediately
148 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
149 Http2ClientImpl client2, Exchange exchange) {
150 CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
151 try {
152 Http2Connection c = new Http2Connection(connection, client2, exchange);
153 cf.complete(c);
154 } catch (IOException | InterruptedException e) {
155 cf.completeExceptionally(e);
156 }
157 return cf;
158 }
159
160 /**
161 * Cases 2) 3)
162 *
163 * request is request to be sent.
164 */
165 Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
166 InetSocketAddress proxy = request.proxy();
167 URI uri = request.uri();
168 String host = uri.getHost();
169 int port = uri.getPort();
170 InetSocketAddress addr = (proxy == null)
171 ? new InetSocketAddress(host, port)
172 : proxy;
173
174 this.key = keyFor(uri, proxy);
175 this.connection = HttpConnection.getConnection(addr, request);
176
177 streams = Collections.synchronizedMap(new HashMap<>());
178 this.client = request.client();
179 this.client2 = client.client2();
180 this.executor = client.executorWrapper();
181 this.freeList = new LinkedList<>();
182 this.outputQ = new Queue();
183 nextstreamid = 1;
184 initCommon();
185 connection.connect();
186 connection.configureBlocking(true);
187 sendConnectionPreface();
188 // start reader and writer
189 executor.execute(() -> {
190 readLoop(connection.getRemaining());
191 }, null);
192
193 executor.execute(() -> {
194 sendLoop();
195 }, null);
196 }
197
198 // NEW
199 synchronized void obtainSendWindow(int amount) throws InterruptedException {
200 while (amount > 0) {
201 int n = Math.min(amount, sendWindow);
202 sendWindow -= n;
203 amount -= n;
204 if (amount > 0)
205 wait();
206 }
207 }
208
209 synchronized void updateSendWindow(int amount) {
210 if (sendWindow == 0) {
211 sendWindow += amount;
212 notify();
213 } else
214 sendWindow += amount;
215 }
216
217 synchronized int sendWindow() {
218 return sendWindow;
219 }
220
221 static String keyFor(HttpConnection connection) {
222 boolean isProxy = connection.isProxied();
223 boolean isSecure = connection.isSecure();
224 InetSocketAddress addr = connection.address();
225
226 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
227 }
228
229 static String keyFor(URI uri, InetSocketAddress proxy) {
230 boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
231 boolean isProxy = proxy != null;
232
233 String host;
234 int port;
235
236 if (isProxy) {
237 host = proxy.getHostString();
238 port = proxy.getPort();
239 } else {
240 host = uri.getHost();
241 port = uri.getPort();
242 }
243 return keyString(isSecure, isProxy, host, port);
244 }
245
246 // {C,S}:{H:P}:host:port
247 // C indicates clear text connection "http"
248 // S indicates secure "https"
249 // H indicates host (direct) connection
250 // P indicates proxy
251 // Eg: "S:H:foo.com:80"
252 static String keyString(boolean secure, boolean proxy, String host, int port) {
253 char c1 = secure ? 'S' : 'C';
254 char c2 = proxy ? 'P' : 'H';
255
256 StringBuilder sb = new StringBuilder(128);
257 sb.append(c1).append(':').append(c2).append(':')
258 .append(host).append(':').append(port);
259 return sb.toString();
260 }
261
262 String key() {
263 return this.key;
264 }
265
266 void putConnection() {
267 client2.putConnection(this);
268 }
269
270 private static String toHexdump1(ByteBuffer bb) {
271 bb.mark();
272 StringBuilder sb = new StringBuilder(512);
273 Formatter f = new Formatter(sb);
274
275 while (bb.hasRemaining()) {
276 int i = Byte.toUnsignedInt(bb.get());
277 f.format("%02x:", i);
278 }
279 sb.deleteCharAt(sb.length()-1);
280 bb.reset();
281 return sb.toString();
282 }
283
284 private static String toHexdump(ByteBuffer bb) {
285 List<String> words = new ArrayList<>();
286 int i = 0;
287 bb.mark();
288 while (bb.hasRemaining()) {
289 if (i % 2 == 0) {
290 words.add("");
291 }
292 byte b = bb.get();
293 String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
294 words.set(i / 2, words.get(i / 2) + hex);
295 i++;
296 }
297 bb.reset();
298 return words.stream().collect(Collectors.joining(" "));
299 }
300
301 private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
302 boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
303
304 ByteBuffer[] buffers = frame.getHeaderBlock();
305 for (int i = 0; i < buffers.length; i++) {
306 hpackIn.decode(buffers[i], decoder, endOfHeaders);
307 }
308 }
309
310 int getInitialSendWindowSize() {
311 return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
312 }
313
314
315 @Override
316 public void returnBuffer(ByteBuffer buf) {
317 client.returnBuffer(buf);
318 }
319
320 void close() {
321 GoAwayFrame f = new GoAwayFrame();
322 f.setDebugData("Requested by user".getBytes());
323 // TODO: set last stream. For now zero ok.
324 try {
325 sendFrame(f);
326 } catch (IOException | InterruptedException e) {}
327 // Shouldn't throw InterruptedException as it can only happen
328 // if the socket send blocks which it shouldn't.
329 // connection.close(); <-- need to wait for ack and confirmation
330 // that requests completed.
331 }
332
333 @Override
334 public ByteBuffer getBuffer() {
335 return client.getBuffer();
336 }
337
338 /**
339 * Runs in one thread continuously reading off connection in blocking mode.
340 *
341 * @param initial
342 */
343 private void readLoop(ByteBuffer initial) {
344 try {
345 int n;
346 // initialize frame reader if necessary
347 if (reader == null) {
348 reader = new FrameReader(initial);
349 } else if (initial != null) {
350 reader.input(initial);
351 }
352
353 while (!closed) {
354 while (!reader.haveFrame()) {
355 ByteBuffer b = getBuffer();
356 n = connection.read(b);
357 if (n == -1) {
358 throw new IOException("Connection closed");
359 }
360 reader.input(b);
361 }
362 List<ByteBuffer> buffers = reader.frame();
363
364 ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
365 processFrame(bbc);
366 if (bbc.consumed()) {
367 reader = new FrameReader();
368 } else {
369 // buffers will only contain the left-over
370 reader = new FrameReader(buffers);
371 }
372 }
373 } catch (IOException e) {
374 String msg = Utils.stackTrace(e);
375 Log.logTrace(msg);
376 shutdown(e);
377 } catch (Throwable t) {
378 String msg = Utils.stackTrace(t);
379 Log.logTrace(msg);
380 shutdown(t);
381 }
382 }
383
384 void shutdown(Throwable t) {
385 System.err.println("Shutdown: " + t);
386 closed = true;
387 client2.deleteConnection(this);
388 Collection<Stream> c = streams.values();
389 for (Stream s : c) {
390 s.cancelImpl(t);
391 }
392 connection.close();
393 }
394
395 /**
396 * Handles stream 0 (common) frames that apply to whole connection and passes
397 * other stream specific frames to that Stream object.
398 *
399 * Invokes Stream.incoming() which is expected to process frame without
400 * blocking.
401 *
402 * @param bbc
403 * @throws IOException
404 * @throws InterruptedException
405 */
406 void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
407 Http2Frame frame = Http2Frame.readIncoming(bbc);
408 Log.logFrames(frame, "IN");
409 int streamid = frame.streamid();
410 if (streamid == 0) {
411 handleCommonFrame(frame);
412 } else {
413 Stream stream = getStream(streamid);
414 if (stream == null) {
415 // should never receive a frame with unknown stream id
416 resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
417 }
418 if (frame instanceof PushPromiseFrame) {
419 PushPromiseFrame pp = (PushPromiseFrame)frame;
420 handlePushPromise(stream, pp);
421 } else if (frame instanceof HeaderFrame) {
422 // decode headers (or continuation)
423 decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
424 stream.incoming(frame);
425 } else
426 stream.incoming(frame);
427 }
428 }
429
430 private void handlePushPromise(Stream parent, PushPromiseFrame pp) throws IOException, InterruptedException {
431 HttpRequestImpl parentReq = parent.request;
432 int promisedStreamid = pp.getPromisedStream();
433 if (promisedStreamid != nextPushStream) {
434 resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
435 return;
436 } else {
437 nextPushStream += 2;
438 }
439 HeaderDecoder decoder = new HeaderDecoder();
440 decodeHeaders(pp, decoder);
441 HttpHeadersImpl headers = decoder.headers();
442 HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
443
444 Stream.PushedStream pushStream = createPushStream(parent, pushReq);
445 pushStream.registerStream(promisedStreamid);
446 parent.incoming_pushPromise(pushReq, pushStream);
447 }
448
449 private void handleCommonFrame(Http2Frame frame) throws IOException, InterruptedException {
450 switch (frame.type()) {
451 case SettingsFrame.TYPE:
452 { SettingsFrame f = (SettingsFrame)frame;
453 handleSettings(f);}
454 break;
455 case PingFrame.TYPE:
456 { PingFrame f = (PingFrame)frame;
457 handlePing(f);}
458 break;
459 case GoAwayFrame.TYPE:
460 { GoAwayFrame f = (GoAwayFrame)frame;
461 handleGoAway(f);}
462 break;
463 case WindowUpdateFrame.TYPE:
464 { WindowUpdateFrame f = (WindowUpdateFrame)frame;
465 handleWindowUpdate(f);}
466 break;
467 default:
468 protocolError(ErrorFrame.PROTOCOL_ERROR);
469 }
470 }
471
472 void resetStream(int streamid, int code) throws IOException, InterruptedException {
473 Log.logError("Resetting stream %d with error code %d", streamid, code);
474 ResetFrame frame = new ResetFrame();
475 frame.streamid(streamid);
476 frame.setErrorCode(code);
477 sendFrame(frame);
478 streams.remove(streamid);
479 }
480
481 private void handleWindowUpdate(WindowUpdateFrame f) throws IOException, InterruptedException {
482 updateSendWindow(f.getUpdate());
483 }
484
485 private void protocolError(int errorCode) throws IOException, InterruptedException {
486 GoAwayFrame frame = new GoAwayFrame();
487 frame.setErrorCode(errorCode);
488 sendFrame(frame);
489 String msg = "Error code: " + errorCode;
490 shutdown(new IOException("protocol error"));
491 }
492
493 private void handleSettings(SettingsFrame frame) throws IOException, InterruptedException {
494 if (frame.getFlag(SettingsFrame.ACK)) {
495 // ignore ack frames for now.
496 return;
497 }
498 serverSettings = frame;
499 SettingsFrame ack = getAckFrame(frame.streamid());
500 sendFrame(ack);
501 }
502
503 private void handlePing(PingFrame frame) throws IOException, InterruptedException {
504 frame.setFlag(PingFrame.ACK);
505 sendFrame(frame);
506 }
507
508 private void handleGoAway(GoAwayFrame frame) throws IOException, InterruptedException {
509 System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
510 shutdown(new IOException("GOAWAY received"));
511 }
512
513 private void initCommon() {
514 clientSettings = client2.getClientSettings();
515
516 // serverSettings will be updated by server
517 serverSettings = SettingsFrame.getDefaultSettings();
518 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
519 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
520 }
521
522 /**
523 * Max frame size we are allowed to send
524 */
525 public int getMaxSendFrameSize() {
526 int param = serverSettings.getParameter(MAX_FRAME_SIZE);
527 if (param == -1) {
528 param = DEFAULT_FRAME_SIZE;
529 }
530 return param;
531 }
532
533 /**
534 * Max frame size we will receive
535 */
536 public int getMaxReceiveFrameSize() {
537 return clientSettings.getParameter(MAX_FRAME_SIZE);
538 }
539
540 // Not sure how useful this is.
541 public int getMaxHeadersSize() {
542 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
543 }
544
545 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
546
547 private static final byte[] PREFACE_BYTES =
548 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
549
550 /**
551 * Sends Connection preface and Settings frame with current preferred
552 * values
553 */
554 private void sendConnectionPreface() throws IOException {
555 SettingsFrame sf = client2.getClientSettings();
556 ByteBufferGenerator bg = new ByteBufferGenerator(this);
557 bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
558 sf.writeOutgoing(bg);
559 Log.logFrames(sf, "OUT");
560 WindowUpdateFrame wup = new WindowUpdateFrame();
561 wup.streamid(0);
562 // send a Window update for the receive buffer we are using
563 // minus the initial 64 K specified in protocol
564 wup.setUpdate(client2.client.getReceiveBufferSize() - (64 * 1024 - 1));
565 wup.setLength();
566 wup.writeOutgoing(bg);
567 Log.logFrames(wup, "OUT");
568 ByteBuffer[] ba = bg.getBufferArray();
569 connection.write(ba, 0, ba.length);
570 }
571
572 /**
573 * Returns an existing Stream with given id, or null if doesn't exist
574 */
575 Stream getStream(int streamid) {
576 return streams.get(streamid);
577 }
578
579 /**
580 * Creates Stream with given id.
581 */
582 Stream createStream(Exchange exchange) {
583 Stream stream = new Stream(client, this, exchange);
584 return stream;
585 }
586
587 Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
588 Stream.PushGroup<?> pg = parent.request.pushGroup();
589 return new Stream.PushedStream(pg, client, this, parent, pushReq);
590 }
591
592 void putStream(Stream stream, int streamid) {
593 streams.put(streamid, stream);
594 }
595
596 void deleteStream(Stream stream) {
597 streams.remove(stream.streamid);
598 }
599
600 static final int MAX_STREAM = Integer.MAX_VALUE - 2;
601
602 // Number of header bytes in a Headers Frame
603 final static int HEADERS_HEADER_SIZE = 15;
604
605 // Number of header bytes in a Continuation frame
606 final static int CONTIN_HEADER_SIZE = 9;
607
608 /**
609 * Encode the headers into a List<ByteBuffer> and then create HEADERS
610 * and CONTINUATION frames from the list and return the List<Http2Frame>.
611 *
612 * @param frame
613 * @return
614 */
615 private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
616 LinkedList<ByteBuffer> buffers = new LinkedList<>();
617 ByteBuffer buf = getBuffer();
618 buffers.add(buf);
619 encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
620 encodeHeadersImpl(frame.getUserHeaders(), buffers);
621 encodeHeadersImpl(frame.getSystemHeaders(), buffers);
622
623 for (ByteBuffer b : buffers) {
624 b.flip();
625 }
626
627 LinkedList<Http2Frame> frames = new LinkedList<>();
628 int maxframesize = getMaxSendFrameSize();
629
630 HeadersFrame oframe = new HeadersFrame();
631 oframe.setFlags(frame.getFlags());
632 oframe.streamid(frame.streamid());
633
634 oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
635 frames.add(oframe);
636 // Any buffers left?
637 boolean done = buffers.isEmpty();
638 if (done) {
639 oframe.setFlag(HeaderFrame.END_HEADERS);
640 } else {
641 ContinuationFrame cf = null;
642 while (!done) {
643 cf = new ContinuationFrame();
644 cf.streamid(frame.streamid());
645 cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
646 frames.add(cf);
647 done = buffers.isEmpty();
648 }
649 cf.setFlag(HeaderFrame.END_HEADERS);
650 }
651 return frames;
652 }
653
654 // should always return at least one buffer
655 private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
656 assert maxsize >= BUFSIZE;
657 LinkedList<ByteBuffer> newlist = new LinkedList<>();
658 int size = list.size();
659 int nbytes = 0;
660 for (int i=0; i<size; i++) {
661 ByteBuffer buf = list.getFirst();
662 if (nbytes + buf.remaining() <= maxsize) {
663 nbytes += buf.remaining();
664 newlist.add(buf);
665 list.remove();
666 } else {
667 break;
668 }
669 }
670 return newlist.toArray(empty);
671 }
672
673 /**
674 * Encode all the headers from the given HttpHeadersImpl into the given List.
675 */
676 private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
677 ByteBuffer buffer;
678 if (!(buffer=buffers.getLast()).hasRemaining()) {
679 buffer = getBuffer();
680 buffers.add(buffer);
681 }
682 Map<String,List<String>> map = hdrs.map();
683 Set<String> keys = map.keySet();
684 for (String key : keys) {
685 String lkey = key.toLowerCase();
686 List<String> values = map.get(key);
687 for (String value : values) {
688 boolean encoded = false;
689 do {
690 encoded = hpackOut.header(lkey, value).encode(buffer);
691 if (!encoded) {
692 buffer = getBuffer();
693 buffers.add(buffer);
694 }
695 } while(!encoded);
696 }
697 }
698 }
699
700 public void sendFrame(Http2Frame frame) throws IOException, InterruptedException {
701 outputQ.put(frame);
702 }
703
704 public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
705 synchronized (outputQ) {
706 for (Http2Frame frame : frames)
707 outputQ.put(frame);
708 }
709 }
710
711 static Throwable getExceptionFrom(CompletableFuture<?> cf) {
712 try {
713 cf.get();
714 return null;
715 } catch (Throwable e) {
716 if (e.getCause() != null)
717 return e.getCause();
718 else
719 return e;
720 }
721 }
722
723
724 void execute(Runnable r) {
725 executor.execute(r, null);
726 }
727
728
729 /**
730 * Take frames off outputQ and send them. Runs
731 */
732 private void sendLoop() {
733 try {
734 while (!closed) {
735 Http2Frame frame = outputQ.take();
736 if (frame instanceof OutgoingHeaders) {
737 OutgoingHeaders oh = (OutgoingHeaders)frame;
738 Stream stream = oh.getStream();
739 stream.registerStream(nextstreamid);
740 oh.streamid(nextstreamid);
741 nextstreamid += 2;
742 // set outgoing window here. This allows thread sending
743 // body to proceed.
744 stream.updateOutgoingWindow(getInitialSendWindowSize());
745 LinkedList<Http2Frame> frames = encodeHeaders(oh);
746 for (Http2Frame f : frames) {
747 sendOneFrame(f);
748 }
749 } else {
750 sendOneFrame(frame);
751 }
752 }
753 } catch (IOException e) {
754 if (!closed) {
755 Log.logError(e);
756 shutdown(e);
757 }
758 }
759 }
760
761 private void sendOneFrame(Http2Frame frame) throws IOException {
762 ByteBufferGenerator bbg = new ByteBufferGenerator(this);
763 frame.setLength();
764 Log.logFrames(frame, "OUT");
765 frame.writeOutgoing(bbg);
766 ByteBuffer[] bufs = bbg.getBufferArray();
767 connection.write(bufs, 0, bufs.length);
768 }
769
770 private SettingsFrame getAckFrame(int streamid) {
771 SettingsFrame frame = new SettingsFrame();
772 frame.setFlag(SettingsFrame.ACK);
773 frame.streamid(streamid);
774 return frame;
775 }
776
777 static class HeaderDecoder implements DecodingCallback {
778 HttpHeadersImpl headers;
779
780 HeaderDecoder() {
781 this.headers = new HttpHeadersImpl();
782 }
783
784 @Override
785 public void onDecoded(CharSequence name, CharSequence value) {
786 headers.addHeader(name.toString(), value.toString());
787 }
788
789 HttpHeadersImpl headers() {
790 return headers;
791 }
792 }
793 }
< prev index next >