14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package java.net.http;
27
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.URI;
31 import java.net.http.HttpConnection.Mode;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.StandardCharsets;
34 import java.util.Collection;
35 import java.util.HashMap;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.CompletableFuture;
40 import sun.net.httpclient.hpack.Encoder;
41 import sun.net.httpclient.hpack.Decoder;
42 import static java.net.http.SettingsFrame.*;
43 import static java.net.http.Utils.BUFSIZE;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.Formatter;
47 import java.util.stream.Collectors;
48 import sun.net.httpclient.hpack.DecodingCallback;
49
50 /**
51 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
52 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
53 *
54 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
61 *
62 * Sending is done by writing directly to underlying HttpConnection object which
63 * is operating in async mode. No flow control applies on output at this level
64 * and all writes are just executed as puts to an output Q belonging to HttpConnection
65 * Flow control is implemented by HTTP/2 protocol itself.
66 *
67 * Hpack header compression
68 * and outgoing stream creation is also done here, because these operations
69 * must be synchronized at the socket level. Stream objects send frames simply
70 * by placing them on the connection's output Queue. sendFrame() is called
71 * from a higher level (Stream) thread.
72 *
73 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
74 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
75 * or handles them directly itself. This thread performs hpack decompression
76 * and incoming stream creation (Server push). Incoming frames destined for a
77 * stream are provided by calling Stream.incoming().
78 */
79 class Http2Connection implements BufferHandler {
80
81 final Queue<Http2Frame> outputQ;
82 volatile boolean closed;
83
84 //-------------------------------------
85 final HttpConnection connection;
86 HttpClientImpl client;
87 final Http2ClientImpl client2;
88 Map<Integer,Stream> streams;
89 int nextstreamid = 3; // stream 1 is registered separately
90 int nextPushStream = 2;
91 Encoder hpackOut;
92 Decoder hpackIn;
93 SettingsFrame clientSettings, serverSettings;
94 ByteBufferConsumer bbc;
95 final LinkedList<ByteBuffer> freeList;
96 final String key; // for HttpClientImpl.connections map
97 FrameReader reader;
98
99 // Connection level flow control windows
100 int sendWindow = INITIAL_WINDOW_SIZE;
101
102 final static int DEFAULT_FRAME_SIZE = 16 * 1024;
103 private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
104
105 final ExecutorWrapper executor;
106
107 /**
108 * This is established by the protocol spec and the peer will update it with
109 * WINDOW_UPDATEs, which affects the sendWindow.
110 */
111 final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
112
113 // TODO: need list of control frames from other threads
114 // that need to be sent
115
116 /**
117 * Case 1) Create from upgraded HTTP/1.1 connection.
118 * Is ready to use. Will not be SSL. exchange is the Exchange
119 * that initiated the connection, whose response will be delivered
120 * on a Stream.
121 */
122 Http2Connection(HttpConnection connection, Http2ClientImpl client2,
123 Exchange exchange) throws IOException, InterruptedException {
124 this.outputQ = new Queue<>();
125 String msg = "Connection send window size " + Integer.toString(sendWindow);
126 Log.logTrace(msg);
127
128 //this.initialExchange = exchange;
129 assert !(connection instanceof SSLConnection);
130 this.connection = connection;
131 this.client = client2.client();
132 this.client2 = client2;
133 this.executor = client.executorWrapper();
134 this.freeList = new LinkedList<>();
135 this.key = keyFor(connection);
136 streams = Collections.synchronizedMap(new HashMap<>());
137 initCommon();
138 //sendConnectionPreface();
139 Stream initialStream = createStream(exchange);
140 initialStream.registerStream(1);
141 initialStream.requestSent();
142 sendConnectionPreface();
143 connection.configureMode(Mode.ASYNC);
144 // start reading and writing
145 // start reading
146 AsyncConnection asyncConn = (AsyncConnection)connection;
147 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
148 asyncReceive(connection.getRemaining());
149 asyncConn.startReading();
150 }
151
152 // async style but completes immediately
153 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
154 Http2ClientImpl client2, Exchange exchange) {
155 CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
156 try {
157 Http2Connection c = new Http2Connection(connection, client2, exchange);
158 cf.complete(c);
159 } catch (IOException | InterruptedException e) {
160 cf.completeExceptionally(e);
161 }
162 return cf;
163 }
164
165 /**
166 * Cases 2) 3)
167 *
168 * request is request to be sent.
169 */
170 Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
171 InetSocketAddress proxy = request.proxy();
172 URI uri = request.uri();
173 InetSocketAddress addr = Utils.getAddress(request);
174 String msg = "Connection send window size " + Integer.toString(sendWindow);
175 Log.logTrace(msg);
176 this.key = keyFor(uri, proxy);
177 this.connection = HttpConnection.getConnection(addr, request, this);
178 streams = Collections.synchronizedMap(new HashMap<>());
179 this.client = request.client();
180 this.client2 = client.client2();
181 this.executor = client.executorWrapper();
182 this.freeList = new LinkedList<>();
183 this.outputQ = new Queue<>();
184 nextstreamid = 1;
185 initCommon();
186 connection.connect();
187 connection.configureMode(Mode.ASYNC);
188 // start reading
189 AsyncConnection asyncConn = (AsyncConnection)connection;
190 asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
191 sendConnectionPreface();
192 asyncConn.startReading();
193 }
194
195 // NEW
196 synchronized void obtainSendWindow(int amount) throws InterruptedException {
197 while (amount > 0) {
198 int n = Math.min(amount, sendWindow);
199 sendWindow -= n;
200 amount -= n;
201 if (amount > 0)
202 wait();
203 }
204 }
205
206 synchronized void updateSendWindow(int amount) {
207 if (sendWindow == 0) {
208 sendWindow += amount;
209 notifyAll();
210 } else
211 sendWindow += amount;
212 }
213
214 synchronized int sendWindow() {
215 return sendWindow;
216 }
217
218 static String keyFor(HttpConnection connection) {
219 boolean isProxy = connection.isProxied();
220 boolean isSecure = connection.isSecure();
221 InetSocketAddress addr = connection.address();
222
223 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
224 }
225
226 static String keyFor(URI uri, InetSocketAddress proxy) {
227 boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
228 boolean isProxy = proxy != null;
229
230 String host;
231 int port;
232
233 if (isProxy) {
234 host = proxy.getHostString();
235 port = proxy.getPort();
449 handleWindowUpdate(f);}
450 break;
451 default:
452 protocolError(ErrorFrame.PROTOCOL_ERROR);
453 }
454 }
455
456 void resetStream(int streamid, int code) throws IOException, InterruptedException {
457 Log.logError(
458 "Resetting stream {0,number,integer} with error code {1,number,integer}",
459 streamid, code);
460 ResetFrame frame = new ResetFrame();
461 frame.streamid(streamid);
462 frame.setErrorCode(code);
463 sendFrame(frame);
464 streams.remove(streamid);
465 }
466
467 private void handleWindowUpdate(WindowUpdateFrame f)
468 throws IOException, InterruptedException {
469 updateSendWindow(f.getUpdate());
470 }
471
472 private void protocolError(int errorCode)
473 throws IOException, InterruptedException {
474 GoAwayFrame frame = new GoAwayFrame();
475 frame.setErrorCode(errorCode);
476 sendFrame(frame);
477 String msg = "Error code: " + errorCode;
478 shutdown(new IOException("protocol error"));
479 }
480
481 private void handleSettings(SettingsFrame frame)
482 throws IOException, InterruptedException {
483 if (frame.getFlag(SettingsFrame.ACK)) {
484 // ignore ack frames for now.
485 return;
486 }
487 serverSettings = frame;
488 SettingsFrame ack = getAckFrame(frame.streamid());
489 sendFrame(ack);
490 }
491
492 private void handlePing(PingFrame frame)
493 throws IOException, InterruptedException {
494 frame.setFlag(PingFrame.ACK);
495 sendFrame(frame);
496 }
497
498 private void handleGoAway(GoAwayFrame frame)
499 throws IOException, InterruptedException {
500 //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
501 shutdown(new IOException("GOAWAY received"));
502 }
503
504 private void initCommon() {
505 clientSettings = client2.getClientSettings();
506
507 // serverSettings will be updated by server
508 serverSettings = SettingsFrame.getDefaultSettings();
509 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
510 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
511 }
512
513 /**
514 * Max frame size we are allowed to send
515 */
516 public int getMaxSendFrameSize() {
517 int param = serverSettings.getParameter(MAX_FRAME_SIZE);
518 if (param == -1) {
519 param = DEFAULT_FRAME_SIZE;
520 }
521 return param;
522 }
523
524 /**
525 * Max frame size we will receive
526 */
527 public int getMaxReceiveFrameSize() {
528 return clientSettings.getParameter(MAX_FRAME_SIZE);
529 }
530
531 // Not sure how useful this is.
532 public int getMaxHeadersSize() {
533 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
534 }
535
536 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
537
538 private static final byte[] PREFACE_BYTES =
539 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
540
541 /**
542 * Sends Connection preface and Settings frame with current preferred
543 * values
544 */
545 private void sendConnectionPreface() throws IOException {
546 ByteBufferGenerator bg = new ByteBufferGenerator(this);
547 bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
548 ByteBuffer[] ba = bg.getBufferArray();
549 connection.write(ba, 0, ba.length);
550
551 bg = new ByteBufferGenerator(this);
552 SettingsFrame sf = client2.getClientSettings();
553 Log.logFrames(sf, "OUT");
554 sf.writeOutgoing(bg);
555 WindowUpdateFrame wup = new WindowUpdateFrame();
556 wup.streamid(0);
557 // send a Window update for the receive buffer we are using
558 // minus the initial 64 K specified in protocol
559 wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
560 wup.computeLength();
561 wup.writeOutgoing(bg);
562 Log.logFrames(wup, "OUT");
563 ba = bg.getBufferArray();
564 connection.write(ba, 0, ba.length);
565 }
566
567 /**
568 * Returns an existing Stream with given id, or null if doesn't exist
569 */
570 Stream getStream(int streamid) {
571 return streams.get(streamid);
572 }
573
574 /**
575 * Creates Stream with given id.
576 */
577 Stream createStream(Exchange exchange) {
578 Stream stream = new Stream(client, this, exchange);
579 return stream;
580 }
581
582 Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
583 Stream.PushGroup<?> pg = parent.request.pushGroup();
584 return new Stream.PushedStream(pg, client, this, parent, pushReq);
675 buffers.add(buffer);
676 }
677 for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
678 String key = e.getKey();
679 String lkey = key.toLowerCase();
680 List<String> values = e.getValue();
681 for (String value : values) {
682 hpackOut.header(lkey, value);
683 boolean encoded = false;
684 do {
685 encoded = hpackOut.encode(buffer);
686 if (!encoded) {
687 buffer = getBuffer();
688 buffers.add(buffer);
689 }
690 } while (!encoded);
691 }
692 }
693 }
694
695 public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
696 for (Http2Frame frame : frames) {
697 sendFrame(frame);
698 }
699 }
700
701 static Throwable getExceptionFrom(CompletableFuture<?> cf) {
702 try {
703 cf.get();
704 return null;
705 } catch (Throwable e) {
706 if (e.getCause() != null)
707 return e.getCause();
708 else
709 return e;
710 }
711 }
712
713
714 void execute(Runnable r) {
715 executor.execute(r, null);
716 }
717
718 private final Object sendlock = new Object();
719
720 /**
721 *
722 */
723 void sendFrame(Http2Frame frame) {
724 synchronized (sendlock) {
725 try {
726 if (frame instanceof OutgoingHeaders) {
727 OutgoingHeaders oh = (OutgoingHeaders) frame;
728 Stream stream = oh.getStream();
729 stream.registerStream(nextstreamid);
730 oh.streamid(nextstreamid);
731 nextstreamid += 2;
732 // set outgoing window here. This allows thread sending
733 // body to proceed.
734 stream.updateOutgoingWindow(getInitialSendWindowSize());
735 LinkedList<Http2Frame> frames = encodeHeaders(oh);
736 for (Http2Frame f : frames) {
737 sendOneFrame(f);
738 }
739 } else {
740 sendOneFrame(frame);
741 }
742
743 } catch (IOException e) {
744 if (!closed) {
745 Log.logError(e);
746 shutdown(e);
747 }
748 }
749 }
750 }
751
752 /**
753 * Send a frame.
754 *
755 * @param frame
756 * @throws IOException
757 */
758 private void sendOneFrame(Http2Frame frame) throws IOException {
759 ByteBufferGenerator bbg = new ByteBufferGenerator(this);
760 frame.computeLength();
761 Log.logFrames(frame, "OUT");
762 frame.writeOutgoing(bbg);
763 ByteBuffer[] currentBufs = bbg.getBufferArray();
764 connection.write(currentBufs, 0, currentBufs.length);
765 }
766
767
768 private SettingsFrame getAckFrame(int streamid) {
769 SettingsFrame frame = new SettingsFrame();
770 frame.setFlag(SettingsFrame.ACK);
771 frame.streamid(streamid);
772 return frame;
773 }
774
775 static class HeaderDecoder implements DecodingCallback {
776 HttpHeadersImpl headers;
777
778 HeaderDecoder() {
779 this.headers = new HttpHeadersImpl();
780 }
781
782 @Override
783 public void onDecoded(CharSequence name, CharSequence value) {
784 headers.addHeader(name.toString(), value.toString());
785 }
786
|
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package java.net.http;
27
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.URI;
31 import java.net.http.HttpConnection.Mode;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.StandardCharsets;
34 import java.util.HashMap;
35 import java.util.LinkedList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CompletableFuture;
39 import sun.net.httpclient.hpack.Encoder;
40 import sun.net.httpclient.hpack.Decoder;
41 import static java.net.http.SettingsFrame.*;
42 import static java.net.http.Utils.BUFSIZE;
43 import java.util.ArrayList;
44 import java.util.Collections;
45 import java.util.Formatter;
46 import java.util.stream.Collectors;
47 import sun.net.httpclient.hpack.DecodingCallback;
48
49 /**
50 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
51 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
52 *
53 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
60 *
61 * Sending is done by writing directly to underlying HttpConnection object which
62 * is operating in async mode. No flow control applies on output at this level
63 * and all writes are just executed as puts to an output Q belonging to HttpConnection
64 * Flow control is implemented by HTTP/2 protocol itself.
65 *
66 * Hpack header compression
67 * and outgoing stream creation is also done here, because these operations
68 * must be synchronized at the socket level. Stream objects send frames simply
69 * by placing them on the connection's output Queue. sendFrame() is called
70 * from a higher level (Stream) thread.
71 *
72 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
73 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
74 * or handles them directly itself. This thread performs hpack decompression
75 * and incoming stream creation (Server push). Incoming frames destined for a
76 * stream are provided by calling Stream.incoming().
77 */
78 class Http2Connection implements BufferHandler {
79
80 volatile boolean closed;
81
82 //-------------------------------------
83 final HttpConnection connection;
84 final AsyncConnection connectionAsync;
85 HttpClientImpl client;
86 final Http2ClientImpl client2;
87 Map<Integer,Stream> streams;
88 int nextstreamid = 3; // stream 1 is registered separately
89 int nextPushStream = 2;
90 Encoder hpackOut;
91 Decoder hpackIn;
92 SettingsFrame clientSettings, serverSettings;
93 final LinkedList<ByteBuffer> freeList;
94 final String key; // for HttpClientImpl.connections map
95 FrameReader reader;
96
97 // Connection level flow control windows
98 final WindowControl connectionSendWindow = new WindowControl(INITIAL_WINDOW_SIZE);
99
100 final static int DEFAULT_FRAME_SIZE = 16 * 1024;
101 private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
102
103 final ExecutorWrapper executor;
104
105 WindowUpdateSender windowUpdater;
106 /**
107 * This is established by the protocol spec and the peer will update it with
108 * WINDOW_UPDATEs, which affects the connectionSendWindow.
109 */
110 final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
111
112 // TODO: need list of control frames from other threads
113 // that need to be sent
114
115 /**
116 * Case 1) Create from upgraded HTTP/1.1 connection.
117 * Is ready to use. Will not be SSL. exchange is the Exchange
118 * that initiated the connection, whose response will be delivered
119 * on a Stream.
120 */
121 Http2Connection(HttpConnection connection, Http2ClientImpl client2,
122 Exchange exchange) throws IOException, InterruptedException {
123 String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
124 Log.logTrace(msg);
125
126 //this.initialExchange = exchange;
127 assert !(connection instanceof SSLConnection);
128 this.connection = connection;
129 this.connectionAsync = (AsyncConnection)connection;
130 this.client = client2.client();
131 this.client2 = client2;
132 this.executor = client.executorWrapper();
133 this.freeList = new LinkedList<>();
134 this.key = keyFor(connection);
135 streams = Collections.synchronizedMap(new HashMap<>());
136 initCommon();
137 //sendConnectionPreface();
138 Stream initialStream = createStream(exchange);
139 initialStream.registerStream(1);
140 initialStream.requestSent();
141 sendConnectionPreface();
142 // start reading and writing
143 // start reading
144 connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
145 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
146 asyncReceive(connection.getRemaining());
147 connectionAsync.startReading();
148 }
149
150 // async style but completes immediately
151 static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
152 Http2ClientImpl client2, Exchange exchange) {
153 CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
154 try {
155 Http2Connection c = new Http2Connection(connection, client2, exchange);
156 cf.complete(c);
157 } catch (IOException | InterruptedException e) {
158 cf.completeExceptionally(e);
159 }
160 return cf;
161 }
162
163 /**
164 * Cases 2) 3)
165 *
166 * request is request to be sent.
167 */
168 Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
169 InetSocketAddress proxy = request.proxy();
170 URI uri = request.uri();
171 InetSocketAddress addr = Utils.getAddress(request);
172 String msg = "Connection send window size " + Integer.toString(connectionSendWindow.available());
173 Log.logTrace(msg);
174 this.key = keyFor(uri, proxy);
175 this.connection = HttpConnection.getConnection(addr, request, this);
176 this.connectionAsync = (AsyncConnection)connection;
177 streams = Collections.synchronizedMap(new HashMap<>());
178 this.client = request.client();
179 this.client2 = client.client2();
180 this.executor = client.executorWrapper();
181 this.freeList = new LinkedList<>();
182 nextstreamid = 1;
183 initCommon();
184 connection.connect();
185 sendConnectionPreface();
186 // start reading
187 connectionAsync.setAsyncCallbacks(this::asyncReceive, this::shutdown);
188 connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
189
190 connectionAsync.startReading();
191 }
192
193 static String keyFor(HttpConnection connection) {
194 boolean isProxy = connection.isProxied();
195 boolean isSecure = connection.isSecure();
196 InetSocketAddress addr = connection.address();
197
198 return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
199 }
200
201 static String keyFor(URI uri, InetSocketAddress proxy) {
202 boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
203 boolean isProxy = proxy != null;
204
205 String host;
206 int port;
207
208 if (isProxy) {
209 host = proxy.getHostString();
210 port = proxy.getPort();
424 handleWindowUpdate(f);}
425 break;
426 default:
427 protocolError(ErrorFrame.PROTOCOL_ERROR);
428 }
429 }
430
431 void resetStream(int streamid, int code) throws IOException, InterruptedException {
432 Log.logError(
433 "Resetting stream {0,number,integer} with error code {1,number,integer}",
434 streamid, code);
435 ResetFrame frame = new ResetFrame();
436 frame.streamid(streamid);
437 frame.setErrorCode(code);
438 sendFrame(frame);
439 streams.remove(streamid);
440 }
441
442 private void handleWindowUpdate(WindowUpdateFrame f)
443 throws IOException, InterruptedException {
444 connectionSendWindow.update(f.getUpdate());
445 }
446
447 private void protocolError(int errorCode)
448 throws IOException, InterruptedException {
449 GoAwayFrame frame = new GoAwayFrame();
450 frame.setErrorCode(errorCode);
451 sendFrame(frame);
452 String msg = "Error code: " + errorCode;
453 shutdown(new IOException("protocol error"));
454 }
455
456 private void handleSettings(SettingsFrame frame)
457 throws IOException, InterruptedException {
458 if (frame.getFlag(SettingsFrame.ACK)) {
459 // ignore ack frames for now.
460 return;
461 }
462 serverSettings = frame;
463 SettingsFrame ack = getAckFrame(frame.streamid());
464 sendFrame(ack);
465 }
466
467 private void handlePing(PingFrame frame)
468 throws IOException, InterruptedException {
469 frame.setFlag(PingFrame.ACK);
470 sendUnorderedFrame(frame);
471 }
472
473 private void handleGoAway(GoAwayFrame frame)
474 throws IOException, InterruptedException {
475 //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
476 shutdown(new IOException("GOAWAY received"));
477 }
478
479 private void initCommon() {
480 clientSettings = client2.getClientSettings();
481
482 // serverSettings will be updated by server
483 serverSettings = SettingsFrame.getDefaultSettings();
484 hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
485 hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
486
487 windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) {
488 @Override
489 int getStreamId() {
490 return 0;
491 }
492 };
493 }
494
495 /**
496 * Max frame size we are allowed to send
497 */
498 public int getMaxSendFrameSize() {
499 int param = serverSettings.getParameter(MAX_FRAME_SIZE);
500 if (param == -1) {
501 param = DEFAULT_FRAME_SIZE;
502 }
503 return param;
504 }
505
506 /**
507 * Max frame size we will receive
508 */
509 public int getMaxReceiveFrameSize() {
510 return clientSettings.getParameter(MAX_FRAME_SIZE);
511 }
512
513 // Not sure how useful this is.
514 public int getMaxHeadersSize() {
515 return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
516 }
517
518 private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
519
520 private static final byte[] PREFACE_BYTES =
521 CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
522
523 /**
524 * Sends Connection preface and Settings frame with current preferred
525 * values
526 */
527 private void sendConnectionPreface() throws IOException {
528 ByteBufferGenerator bg = new ByteBufferGenerator(this);
529 bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
530 SettingsFrame sf = client2.getClientSettings();
531 Log.logFrames(sf, "OUT");
532 sf.writeOutgoing(bg);
533 ByteBuffer[] ba = bg.getBufferArray();
534 connection.write(ba, 0, ba.length); // write is performed before switch to async mode
535 // send a Window update for the receive buffer we are using
536 // minus the initial 64 K specified in protocol
537 windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
538 }
539
540 /**
541 * Returns an existing Stream with given id, or null if doesn't exist
542 */
543 Stream getStream(int streamid) {
544 return streams.get(streamid);
545 }
546
547 /**
548 * Creates Stream with given id.
549 */
550 Stream createStream(Exchange exchange) {
551 Stream stream = new Stream(client, this, exchange);
552 return stream;
553 }
554
555 Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
556 Stream.PushGroup<?> pg = parent.request.pushGroup();
557 return new Stream.PushedStream(pg, client, this, parent, pushReq);
648 buffers.add(buffer);
649 }
650 for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
651 String key = e.getKey();
652 String lkey = key.toLowerCase();
653 List<String> values = e.getValue();
654 for (String value : values) {
655 hpackOut.header(lkey, value);
656 boolean encoded = false;
657 do {
658 encoded = hpackOut.encode(buffer);
659 if (!encoded) {
660 buffer = getBuffer();
661 buffers.add(buffer);
662 }
663 } while (!encoded);
664 }
665 }
666 }
667
668 static Throwable getExceptionFrom(CompletableFuture<?> cf) {
669 try {
670 cf.get();
671 return null;
672 } catch (Throwable e) {
673 if (e.getCause() != null)
674 return e.getCause();
675 else
676 return e;
677 }
678 }
679
680
681 void execute(Runnable r) {
682 executor.execute(r, null);
683 }
684
685 private final Object sendlock = new Object();
686
687 /**
688 *
689 */
690 void sendFrame(Http2Frame frame) {
691 synchronized (sendlock) {
692 try {
693 ByteBuffer[] bufs;
694 if (frame instanceof OutgoingHeaders) {
695 OutgoingHeaders oh = (OutgoingHeaders) frame;
696 Stream stream = oh.getStream();
697 stream.registerStream(nextstreamid);
698 oh.streamid(nextstreamid);
699 nextstreamid += 2;
700 // set outgoing window here. This allows thread sending
701 // body to proceed.
702 stream.updateOutgoingWindow(getInitialSendWindowSize());
703 bufs = encodeFrames(encodeHeaders(oh));
704 } else {
705 bufs = encodeFrame(frame);
706 }
707 connectionAsync.writeAsync(bufs);
708
709 } catch (IOException e) {
710 if (!closed) {
711 Log.logError(e);
712 shutdown(e);
713 }
714 return;
715 }
716 }
717 connectionAsync.flushAsync();
718 }
719
720 private ByteBuffer[] encodeFrame(Http2Frame frame) throws IOException {
721 ByteBufferGenerator bbg = new ByteBufferGenerator(this);
722 frame.computeLength();
723 Log.logFrames(frame, "OUT");
724 frame.writeOutgoing(bbg);
725 return bbg.getBufferList().toArray(new ByteBuffer[0]);
726 }
727
728 private ByteBuffer[] encodeFrames(List<Http2Frame> frames) throws IOException {
729 List<ByteBuffer> bufs = new ArrayList<>();
730 for(Http2Frame frame : frames) {
731 ByteBufferGenerator bbg = new ByteBufferGenerator(this);
732 frame.computeLength();
733 Log.logFrames(frame, "OUT");
734 frame.writeOutgoing(bbg);
735 bufs.addAll(bbg.getBufferList());
736 }
737 return bufs.toArray(new ByteBuffer[0]);
738 }
739
740 void sendDataFrame(DataFrame frame) {
741 try {
742 connectionAsync.writeAsync(encodeFrame(frame));
743 connectionAsync.flushAsync();
744 } catch (IOException e) {
745 if (!closed) {
746 Log.logError(e);
747 shutdown(e);
748 }
749 }
750 }
751
752 /*
753 * Direct call of the method bypasses synchronization on "sendlock" and
754 * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
755 * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
756 */
757 void sendUnorderedFrame(Http2Frame frame){
758 try {
759 connectionAsync.writeAsyncUnordered(encodeFrame(frame));
760 connectionAsync.flushAsync();
761 } catch (IOException e) {
762 if (!closed) {
763 Log.logError(e);
764 shutdown(e);
765 }
766 }
767 }
768
769 private SettingsFrame getAckFrame(int streamid) {
770 SettingsFrame frame = new SettingsFrame();
771 frame.setFlag(SettingsFrame.ACK);
772 frame.streamid(streamid);
773 return frame;
774 }
775
776 static class HeaderDecoder implements DecodingCallback {
777 HttpHeadersImpl headers;
778
779 HeaderDecoder() {
780 this.headers = new HttpHeadersImpl();
781 }
782
783 @Override
784 public void onDecoded(CharSequence name, CharSequence value) {
785 headers.addHeader(name.toString(), value.toString());
786 }
787
|