< prev index next >

test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java

Print this page


   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.nio.ByteBuffer;
  37 import java.nio.charset.StandardCharsets;
  38 import java.util.*;

  39 import java.util.concurrent.ExecutorService;

  40 import java.util.function.Consumer;
  41 
  42 import jdk.incubator.http.internal.common.ByteBufferReference;
  43 import jdk.incubator.http.internal.frame.FramesDecoder;
  44 
  45 import jdk.incubator.http.internal.common.BufferHandler;
  46 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  47 import jdk.incubator.http.internal.common.Queue;
  48 import jdk.incubator.http.internal.frame.*;
  49 import jdk.incubator.http.internal.hpack.Decoder;
  50 import jdk.incubator.http.internal.hpack.DecodingCallback;
  51 import jdk.incubator.http.internal.hpack.Encoder;


  52 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
  53 
  54 /**
  55  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  56  * or HTTPS opened using "h2" ALPN.
  57  */
  58 public class Http2TestServerConnection {
  59     final Http2TestServer server;
  60     @SuppressWarnings({"rawtypes","unchecked"})
  61     final Map<Integer, Queue> streams; // input q per stream

  62     final HashSet<Integer> pushStreams;
  63     final Queue<Http2Frame> outputQ;
  64     volatile int nextstream;
  65     final Socket socket;

  66     final InputStream is;
  67     final OutputStream os;
  68     volatile Encoder hpackOut;
  69     volatile Decoder hpackIn;
  70     volatile SettingsFrame clientSettings;
  71     final SettingsFrame serverSettings;
  72     final ExecutorService exec;
  73     final boolean secure;
  74     volatile boolean stopping;
  75     volatile int nextPushStreamId = 2;

  76 
  77     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  78     final static byte[] EMPTY_BARRAY = new byte[0];

  79 
  80     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  81 
  82     Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {







































  83         if (socket instanceof SSLSocket) {
  84             handshake(server.serverName(), (SSLSocket)socket);
  85         }
  86         System.err.println("TestServer: New connection from " + socket);
  87         this.server = server;

  88         this.streams = Collections.synchronizedMap(new HashMap<>());
  89         this.outputQ = new Queue<>();


  90         this.socket = socket;

  91         this.serverSettings = SettingsFrame.getDefaultSettings();
  92         this.exec = server.exec;
  93         this.secure = server.secure;
  94         this.pushStreams = new HashSet<>();
  95         is = new BufferedInputStream(socket.getInputStream());
  96         os = new BufferedOutputStream(socket.getOutputStream());
  97     }
  98 






















































  99     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 100         try {
 101             InetAddress addr2 = InetAddress.getByName(host);
 102             return addr1.equals(addr2);
 103         } catch (IOException e) {
 104             throw new UncheckedIOException(e);
 105         }
 106     }
 107 
 108     private static void handshake(String name, SSLSocket sock) throws IOException {
 109         if (name == null) {
 110             // no name set. No need to check
 111             return;
 112         } else if (name.equals("127.0.0.1")) {
 113             name = "localhost";
 114         }
 115         final String fname = name;
 116         final InetAddress addr1 = InetAddress.getByName(name);
 117         SSLParameters params = sock.getSSLParameters();
 118         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {


 169 
 170         String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
 171         clientSettings = getSettingsFromString(clientSettingsString);
 172 
 173         return upgrade;
 174     }
 175 
 176     /**
 177      * Decodes the given, Client, settings payload provided in base64 HTTP1
 178      * header value.
 179      */
 180     private SettingsFrame getSettingsFromString(String s) throws IOException {
 181         Base64.Decoder decoder = Base64.getUrlDecoder();
 182         byte[] payload = decoder.decode(s);
 183         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 184         // simulate header of Settings Frame
 185         ByteBuffer bb0 = ByteBuffer.wrap(
 186                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 187         List<Http2Frame> frames = new ArrayList<>();
 188         FramesDecoder reader = new FramesDecoder(frames::add);
 189         reader.decode(ByteBufferReference.of(bb0));
 190         reader.decode(ByteBufferReference.of(bb1));
 191         if (frames.size()!=1)
 192             throw new IOException("Expected 1 frame got "+frames.size()) ;
 193         Http2Frame frame = frames.get(0);
 194         if (!(frame instanceof SettingsFrame))
 195             throw new IOException("Expected SettingsFrame");
 196         return (SettingsFrame)frame;
 197     }
 198 
 199     void run() throws Exception {
 200         String upgrade = null;
 201         if (!secure) {
 202             upgrade = doUpgrade();
 203         } else {
 204             readPreface();
 205             sendSettingsFrame(true);
 206             clientSettings = (SettingsFrame) readFrame();
 207             if (clientSettings.getFlag(SettingsFrame.ACK)) {
 208                 // we received the ack to our frame first
 209                 clientSettings = (SettingsFrame) readFrame();
 210             }
 211             nextstream = 1;
 212         }
 213 
 214         System.out.println("ServerSettings: " + serverSettings);
 215         System.out.println("ClientSettings: " + clientSettings);
 216 
 217         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 218         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 219 
 220         if (!secure) {
 221             createPrimordialStream(upgrade);
 222             nextstream = 3;
 223         }
 224 
 225         exec.submit(this::readLoop);
 226         exec.submit(this::writeLoop);
 227     }
 228 
 229     static class BufferPool implements BufferHandler {
 230 
 231         public void setMinBufferSize(int size) {
 232         }
 233 
 234         @Override
 235         public ByteBuffer getBuffer() {
 236             int size = 32 * 1024;
 237             return ByteBuffer.allocate(size);
 238         }
 239 
 240         @Override
 241         public void returnBuffer(ByteBuffer buffer) {
 242         }
 243     }
 244 
 245     private void writeFrame(Http2Frame frame) throws IOException {
 246         ByteBufferReference[] refs = new FramesEncoder().encodeFrame(frame);
 247         //System.err.println("TestServer: Writing frame " + frame.toString());
 248         int c = 0;
 249         for (ByteBufferReference ref : refs) {
 250             ByteBuffer buf = ref.get();
 251             byte[] ba = buf.array();
 252             int start = buf.arrayOffset() + buf.position();
 253             c += buf.remaining();
 254             os.write(ba, start, buf.remaining());














 255         }
 256         os.flush();
 257         //System.err.printf("TestServer: wrote %d bytes\n", c);
 258     }
 259 
 260     void handleStreamReset(ResetFrame resetFrame) throws IOException {
 261         // TODO: cleanup
 262         throw new IOException("Stream reset");
 263     }
 264 
 265     private void handleCommonFrame(Http2Frame f) throws IOException {
 266         if (f instanceof SettingsFrame) {
 267             SettingsFrame sf = (SettingsFrame) f;
 268             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 269             {
 270                 return;
 271             }
 272             // otherwise acknowledge it
 273             clientSettings = sf;
 274             SettingsFrame frame = new SettingsFrame();
 275             frame.setFlag(SettingsFrame.ACK);
 276             frame.streamid(0);
 277             outputQ.put(frame);
 278             return;
 279         }
 280         //System.err.println("TestServer: Received ---> " + f.toString());
 281         throw new UnsupportedOperationException("Not supported yet.");




 282     }
 283 
 284     void sendWindowUpdates(int len, int streamid) throws IOException {
 285         if (len == 0)
 286             return;
 287         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 288         outputQ.put(wup);
 289         wup = new WindowUpdateFrame(0 , len);
 290         outputQ.put(wup);
 291     }
 292 
 293     HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
 294         HttpHeadersImpl headers = new HttpHeadersImpl();
 295 
 296         DecodingCallback cb = (name, value) -> {
 297             headers.addHeader(name.toString(), value.toString());
 298         };
 299 
 300         for (HeaderFrame frame : frames) {
 301             ByteBufferReference[] buffers = frame.getHeaderBlock();
 302             for (ByteBufferReference buffer : buffers) {
 303                 hpackIn.decode(buffer.get(), false, cb);
 304             }
 305         }
 306         hpackIn.decode(EMPTY_BUFFER, true, cb);
 307         return headers;
 308     }
 309 
 310     String getRequestLine(String request) {
 311         int eol = request.indexOf(CRLF);
 312         return request.substring(0, eol);
 313     }
 314 
 315     String getHeaders(String request) {
 316         int start = request.indexOf(CRLF);
 317         int end = request.indexOf(CRLFCRLF);
 318         if (start == -1 || end == -1) {
 319             throw new RuntimeException("Malformed request");
 320         }
 321         return request.substring(start,end);
 322     }
 323 


 342         String requestLine = getRequestLine(request);
 343         String[] tokens = requestLine.split(" ");
 344         if (!tokens[2].equals("HTTP/1.1")) {
 345             throw new IOException("bad request line");
 346         }
 347         URI uri = null;
 348         try {
 349             uri = new URI(tokens[1]);
 350         } catch (URISyntaxException e) {
 351             throw new IOException(e);
 352         }
 353         String host = getHeader(request, "Host");
 354         if (host == null) {
 355             throw new IOException("missing Host");
 356         }
 357 
 358         headers.setHeader(":method", tokens[0]);
 359         headers.setHeader(":scheme", "http"); // always in this case
 360         headers.setHeader(":authority", host);
 361         headers.setHeader(":path", uri.getPath());
 362         Queue q = new Queue();
 363         String body = getRequestBody(request);
 364         addHeaders(getHeaders(request), headers);
 365         headers.setHeader("Content-length", Integer.toString(body.length()));
 366 
 367         addRequestBodyToQueue(body, q);
 368         streams.put(1, q);
 369         exec.submit(() -> {
 370             handleRequest(headers, q, 1, true /*complete request has been read*/);
 371         });
 372     }
 373 
 374     // all other streams created here
 375     @SuppressWarnings({"rawtypes","unchecked"})
 376     void createStream(HeaderFrame frame) throws IOException {
 377         List<HeaderFrame> frames = new LinkedList<>();
 378         frames.add(frame);
 379         int streamid = frame.streamid();
 380         if (streamid != nextstream) {
 381             throw new IOException("unexpected stream id");
 382         }


 384 
 385         boolean endStream = false;
 386         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 387             endStream = true;
 388         }
 389 
 390         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 391             Http2Frame f = readFrame();
 392             if (!(f instanceof HeaderFrame)) {
 393                 handleCommonFrame(f); // should only be error frames
 394             } else {
 395                 frame = (HeaderFrame) f;
 396                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 397                     endStream = true;
 398                 }
 399                 frames.add(frame);
 400             }
 401         }
 402         boolean endStreamReceived = endStream;
 403         HttpHeadersImpl headers = decodeHeaders(frames);
 404         Queue q = new Queue();
 405         streams.put(streamid, q);
 406         exec.submit(() -> {
 407             handleRequest(headers, q, streamid, endStreamReceived);
 408         });
 409     }
 410 
 411     // runs in own thread. Handles request from start to finish. Incoming frames
 412     // for this stream/request delivered on Q
 413 
 414     @SuppressWarnings({"rawtypes","unchecked"})
 415     void handleRequest(HttpHeadersImpl headers,
 416                        Queue queue,
 417                        int streamid,
 418                        boolean endStreamReceived)
 419     {
 420         String method = headers.firstValue(":method").orElse("");
 421         //System.out.println("method = " + method);
 422         String path = headers.firstValue(":path").orElse("");
 423         //System.out.println("path = " + path);
 424         String scheme = headers.firstValue(":scheme").orElse("");
 425         //System.out.println("scheme = " + scheme);
 426         String authority = headers.firstValue(":authority").orElse("");
 427         //System.out.println("authority = " + authority);
 428         System.err.printf("TestServer: %s %s\n", method, path);
 429         HttpHeadersImpl rspheaders = new HttpHeadersImpl();
 430         int winsize = clientSettings.getParameter(
 431                         SettingsFrame.INITIAL_WINDOW_SIZE);
 432         //System.err.println ("Stream window size = " + winsize);
 433 
 434         final InputStream bis;
 435         if (endStreamReceived && queue.size() == 0) {
 436             System.err.println("Server: got END_STREAM for stream " + streamid);
 437             bis = NullInputStream.INSTANCE;
 438         } else {
 439             System.err.println("Server: creating input stream for stream " + streamid);
 440             bis = new BodyInputStream(queue, streamid, this);
 441         }
 442         try (bis;
 443              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 444         {

 445             String us = scheme + "://" + authority + path;
 446             URI uri = new URI(us);
 447             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 448             Http2TestExchange exchange = new Http2TestExchange(streamid, method,
 449                     headers, rspheaders, uri, bis, getSSLSession(),
 450                     bos, this, pushAllowed);
 451 
 452             // give to user
 453             Http2Handler handler = server.getHandlerFor(uri.getPath());
 454             handler.handle(exchange);
 455 
 456             // everything happens in the exchange from here. Hopefully will
 457             // return though.
 458         } catch (Throwable e) {
 459             System.err.println("TestServer: handleRequest exception: " + e);
 460             e.printStackTrace();
 461         }
 462     }
 463 
 464     private SSLSession getSSLSession() {
 465         if (! (socket instanceof SSLSocket))
 466             return null;
 467         SSLSocket ssl = (SSLSocket)socket;
 468         return ssl.getSession();


 490                         if (q != null) {
 491                             System.err.println("HEADERS frame for existing stream! Error.");
 492                             // TODO: close connection
 493                             continue;
 494                         } else {
 495                             createStream((HeadersFrame) frame);
 496                         }
 497                     } else {
 498                         if (q == null && !pushStreams.contains(stream)) {
 499                             System.err.printf("Non Headers frame received with"+
 500                                 " non existing stream (%d) ", frame.streamid());
 501                             System.err.println(frame);
 502                             continue;
 503                         }
 504                         if (frame.type() == WindowUpdateFrame.TYPE) {
 505                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 506                             synchronized (updaters) {
 507                                 Consumer<Integer> r = updaters.get(stream);
 508                                 r.accept(wup.getUpdate());
 509                             }











 510                         } else {
 511                             q.put(frame);
 512                         }
 513                     }
 514                 }
 515             }
 516         } catch (Throwable e) {
 517             if (!stopping) {
 518                 System.err.println("Http server reader thread shutdown");
 519                 e.printStackTrace();
 520             }
 521             close();
 522         }
 523     }
 524 
 525     ByteBufferReference[] encodeHeaders(HttpHeadersImpl headers) {
 526         List<ByteBuffer> buffers = new LinkedList<>();
 527 
 528         ByteBuffer buf = getBuffer();
 529         boolean encoded;
 530         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 531             List<String> values = entry.getValue();
 532             String key = entry.getKey().toLowerCase();
 533             for (String value : values) {
 534                 do {
 535                     hpackOut.header(key, value);
 536                     encoded = hpackOut.encode(buf);
 537                     if (!encoded) {
 538                         buf.flip();
 539                         buffers.add(buf);
 540                         buf = getBuffer();
 541                     }
 542                 } while (!encoded);
 543             }
 544         }
 545         buf.flip();
 546         buffers.add(buf);
 547         return ByteBufferReference.toReferences(buffers.toArray(bbarray));
 548     }
 549 
 550     static void closeIgnore(Closeable c) {
 551         try {
 552             c.close();
 553         } catch (IOException e) {}
 554     }
 555 
 556     // Runs in own thread
 557     void writeLoop() {
 558         try {
 559             while (!stopping) {
 560                 Http2Frame frame;
 561                 try {
 562                     frame = outputQ.take();
 563                 } catch(IOException x) {
 564                     if (stopping && x.getCause() instanceof InterruptedException) {
 565                         break;
 566                     } else throw x;
 567                 }


 570                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 571                     writeFrame(hf);
 572                 } else if (frame instanceof OutgoingPushPromise) {
 573                     handlePush((OutgoingPushPromise)frame);
 574                 } else
 575                     writeFrame(frame);
 576             }
 577             System.err.println("TestServer: Connection writer stopping");
 578         } catch (Throwable e) {
 579             e.printStackTrace();
 580             /*close();
 581             if (!stopping) {
 582                 e.printStackTrace();
 583                 System.err.println("TestServer: writeLoop exception: " + e);
 584             }*/
 585         }
 586     }
 587 
 588     private void handlePush(OutgoingPushPromise op) throws IOException {
 589         int promisedStreamid = nextPushStreamId;
 590         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream, HeaderFrame.END_HEADERS, promisedStreamid, encodeHeaders(op.headers), 0);




 591         pushStreams.add(promisedStreamid);
 592         nextPushStreamId += 2;
 593         pp.streamid(op.parentStream);
 594         writeFrame(pp);
 595         final InputStream ii = op.is;
 596         final BodyOutputStream oo = new BodyOutputStream(
 597                 promisedStreamid,
 598                 clientSettings.getParameter(
 599                         SettingsFrame.INITIAL_WINDOW_SIZE), this);

 600         oo.goodToGo();
 601         exec.submit(() -> {
 602             try {
 603                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 604                 outputQ.put(oh);
 605                 ii.transferTo(oo);
 606             } catch (Throwable ex) {
 607                 System.err.printf("TestServer: pushing response error: %s\n",
 608                         ex.toString());
 609             } finally {
 610                 closeIgnore(ii);
 611                 closeIgnore(oo);
 612             }
 613         });
 614 
 615     }
 616 
 617     // returns a minimal response with status 200
 618     // that is the response to the push promise just sent
 619     private ResponseHeaders getPushResponse(int streamid) {


 628     private ByteBuffer getBuffer() {
 629         return ByteBuffer.allocate(8 * 1024);
 630     }
 631 
 632     private Http2Frame readFrame() throws IOException {
 633         byte[] buf = new byte[9];
 634         if (is.readNBytes(buf, 0, 9) != 9)
 635             throw new IOException("readFrame: connection closed");
 636         int len = 0;
 637         for (int i = 0; i < 3; i++) {
 638             int n = buf[i] & 0xff;
 639             //System.err.println("n = " + n);
 640             len = (len << 8) + n;
 641         }
 642         byte[] rest = new byte[len];
 643         int n = is.readNBytes(rest, 0, len);
 644         if (n != len)
 645             throw new IOException("Error reading frame");
 646         List<Http2Frame> frames = new ArrayList<>();
 647         FramesDecoder reader = new FramesDecoder(frames::add);
 648         reader.decode(ByteBufferReference.of(ByteBuffer.wrap(buf)));
 649         reader.decode(ByteBufferReference.of(ByteBuffer.wrap(rest)));
 650         if (frames.size()!=1)
 651             throw new IOException("Expected 1 frame got "+frames.size()) ;
 652 
 653         return frames.get(0);
 654     }
 655 
 656     void sendSettingsFrame() throws IOException {
 657         sendSettingsFrame(false);
 658     }
 659 
 660     void sendSettingsFrame(boolean now) throws IOException {
 661         if (now) {
 662             writeFrame(serverSettings);
 663         } else {
 664             outputQ.put(serverSettings);
 665         }
 666     }
 667 
 668     String readUntil(String end) throws IOException {
 669         int number = end.length();


 704         int start = headers1.indexOf(name);
 705         if (start == -1) {
 706             return null;
 707         }
 708         start += 2;
 709         int end = headers1.indexOf(CRLF, start);
 710         String line = headers.substring(start, end);
 711         start = line.indexOf(':');
 712         if (start == -1) {
 713             return null;
 714         }
 715         return line.substring(start + 1).trim();
 716     }
 717 
 718     final static String CRLF = "\r\n";
 719     final static String CRLFCRLF = "\r\n\r\n";
 720 
 721     String readHttp1Request() throws IOException {
 722         String headers = readUntil(CRLF + CRLF);
 723         int clen = getContentLength(headers);
 724         // read the content.
 725         byte[] buf = new byte[clen];


 726         is.readNBytes(buf, 0, clen);




 727         String body = new String(buf, StandardCharsets.US_ASCII);
 728         return headers + body;
 729     }
 730 






 731     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
 732         StringBuilder sb = new StringBuilder();
 733         sb.append("HTTP/1.1 ")
 734                 .append(code)
 735                 .append(' ')
 736                 .append(msg)
 737                 .append(CRLF);
 738         int numheaders = headers.length;
 739         for (int i = 0; i < numheaders; i += 2) {
 740             sb.append(headers[i])
 741                     .append(": ")
 742                     .append(headers[i + 1])
 743                     .append(CRLF);
 744         }
 745         sb.append(CRLF);
 746         String s = sb.toString();
 747         os.write(s.getBytes("US-ASCII"));
 748         os.flush();
 749     }
 750 


 754     }
 755 
 756     final static ByteBuffer[] bbarray = new ByteBuffer[0];
 757 
 758     // wrapper around a BlockingQueue that throws an exception when it's closed
 759     // Each stream has one of these
 760 
 761     String getRequestBody(String request) {
 762         int bodystart = request.indexOf(CRLF+CRLF);
 763         String body;
 764         if (bodystart == -1)
 765             body = "";
 766         else
 767             body = request.substring(bodystart+4);
 768         return body;
 769     }
 770 
 771     @SuppressWarnings({"rawtypes","unchecked"})
 772     void addRequestBodyToQueue(String body, Queue q) throws IOException {
 773         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
 774         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, ByteBufferReference.of(buf));
 775         // only used for primordial stream
 776         q.put(df);
 777     }
 778 
 779     // window updates done in main reader thread because they may
 780     // be used to unblock BodyOutputStreams waiting for WUPs
 781 
 782     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
 783 
 784     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
 785         synchronized(updaters) {
 786             updaters.put(streamid, r);
 787         }
 788     }
 789 
 790     int sendWindow = 64 * 1024 - 1; // connection level send window
 791 
 792     /**
 793      * BodyOutputStreams call this to get the connection window first.
 794      *


   1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.BufferedInputStream;
  25 import java.io.BufferedOutputStream;
  26 import java.io.Closeable;
  27 import java.io.IOException;
  28 import java.io.UncheckedIOException;
  29 import java.io.InputStream;
  30 import java.io.OutputStream;
  31 import java.net.Socket;
  32 import java.net.URI;
  33 import java.net.InetAddress;
  34 import javax.net.ssl.*;
  35 import java.net.URISyntaxException;
  36 import java.nio.ByteBuffer;
  37 import java.nio.charset.StandardCharsets;
  38 import java.util.*;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.ExecutorService;
  41 import java.util.concurrent.ConcurrentLinkedQueue;
  42 import java.util.function.Consumer;





  43 import jdk.incubator.http.internal.common.HttpHeadersImpl;

  44 import jdk.incubator.http.internal.frame.*;
  45 import jdk.incubator.http.internal.hpack.Decoder;
  46 import jdk.incubator.http.internal.hpack.DecodingCallback;
  47 import jdk.incubator.http.internal.hpack.Encoder;
  48 import sun.net.www.http.ChunkedInputStream;
  49 import sun.net.www.http.HttpClient;
  50 import static jdk.incubator.http.internal.frame.SettingsFrame.HEADER_TABLE_SIZE;
  51 
  52 /**
  53  * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
  54  * or HTTPS opened using "h2" ALPN.
  55  */
  56 public class Http2TestServerConnection {
  57     final Http2TestServer server;
  58     @SuppressWarnings({"rawtypes","unchecked"})
  59     final Map<Integer, Queue> streams; // input q per stream
  60     final Map<Integer, BodyOutputStream> outStreams; // output q per stream
  61     final HashSet<Integer> pushStreams;
  62     final Queue<Http2Frame> outputQ;
  63     volatile int nextstream;
  64     final Socket socket;
  65     final Http2TestExchangeSupplier exchangeSupplier;
  66     final InputStream is;
  67     final OutputStream os;
  68     volatile Encoder hpackOut;
  69     volatile Decoder hpackIn;
  70     volatile SettingsFrame clientSettings;
  71     final SettingsFrame serverSettings;
  72     final ExecutorService exec;
  73     final boolean secure;
  74     volatile boolean stopping;
  75     volatile int nextPushStreamId = 2;
  76     ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
  77 
  78     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
  79     final static byte[] EMPTY_BARRAY = new byte[0];
  80     final Random random;
  81 
  82     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
  83 
  84     static class Sentinel extends Http2Frame {
  85         Sentinel() { super(-1,-1);}
  86     }
  87 
  88     class PingRequest {
  89         final byte[] pingData;
  90         final long pingStamp;
  91         final CompletableFuture<Long> response;
  92 
  93         PingRequest() {
  94             pingData = new byte[8];
  95             random.nextBytes(pingData);
  96             pingStamp = System.currentTimeMillis();
  97             response = new CompletableFuture<>();
  98         }
  99 
 100         PingFrame frame() {
 101             return new PingFrame(0, pingData);
 102         }
 103 
 104         CompletableFuture<Long> response() {
 105             return response;
 106         }
 107 
 108         void success() {
 109             response.complete(System.currentTimeMillis() - pingStamp);
 110         }
 111 
 112         void fail(Throwable t) {
 113             response.completeExceptionally(t);
 114         }
 115     }
 116 
 117     static Sentinel sentinel;
 118 
 119     Http2TestServerConnection(Http2TestServer server,
 120                               Socket socket,
 121                               Http2TestExchangeSupplier exchangeSupplier)
 122         throws IOException
 123     {
 124         if (socket instanceof SSLSocket) {
 125             handshake(server.serverName(), (SSLSocket)socket);
 126         }
 127         System.err.println("TestServer: New connection from " + socket);
 128         this.server = server;
 129         this.exchangeSupplier = exchangeSupplier;
 130         this.streams = Collections.synchronizedMap(new HashMap<>());
 131         this.outStreams = Collections.synchronizedMap(new HashMap<>());
 132         this.outputQ = new Queue<>(sentinel);
 133         this.random = new Random();
 134         this.socket = socket;
 135         this.socket.setTcpNoDelay(true);
 136         this.serverSettings = SettingsFrame.getDefaultSettings();
 137         this.exec = server.exec;
 138         this.secure = server.secure;
 139         this.pushStreams = new HashSet<>();
 140         is = new BufferedInputStream(socket.getInputStream());
 141         os = new BufferedOutputStream(socket.getOutputStream());
 142     }
 143 
 144     /**
 145      * Sends a PING frame on this connection, and completes the returned
 146      * CF when the PING ack is received. The CF is given
 147      * an integer, whose value is the number of milliseconds
 148      * between PING and ACK.
 149      */
 150     CompletableFuture<Long> sendPing() {
 151         PingRequest ping = null;
 152         try {
 153             ping = new PingRequest();
 154             pings.add(ping);
 155             outputQ.put(ping.frame());
 156         } catch (Throwable t) {
 157             ping.fail(t);
 158         }
 159         return ping.response();
 160     }
 161 
 162     /**
 163      * Returns the first PingRequest from Queue
 164      */
 165     private PingRequest getNextRequest() {
 166         return pings.poll();
 167     }
 168 
 169     /**
 170      * Handles incoming Ping, which could be an ack
 171      * or a client originated Ping
 172      */
 173     void handlePing(PingFrame ping) throws IOException {
 174         if (ping.streamid() != 0) {
 175             System.err.println("Invalid ping received");
 176             close();
 177             return;
 178         }
 179         if (ping.getFlag(PingFrame.ACK)) {
 180             // did we send a Ping?
 181             PingRequest request = getNextRequest();
 182             if (request == null) {
 183                 System.err.println("Invalid ping ACK received");
 184                 close();
 185                 return;
 186             } else if (!Arrays.equals(request.pingData, ping.getData())) {
 187                 request.fail(new RuntimeException("Wrong ping data in ACK"));
 188             } else {
 189                 request.success();
 190             }
 191         } else {
 192             // client originated PING. Just send it back with ACK set
 193             ping.setFlag(PingFrame.ACK);
 194             outputQ.put(ping);
 195         }
 196     }
 197 
 198     private static boolean compareIPAddrs(InetAddress addr1, String host) {
 199         try {
 200             InetAddress addr2 = InetAddress.getByName(host);
 201             return addr1.equals(addr2);
 202         } catch (IOException e) {
 203             throw new UncheckedIOException(e);
 204         }
 205     }
 206 
 207     private static void handshake(String name, SSLSocket sock) throws IOException {
 208         if (name == null) {
 209             // no name set. No need to check
 210             return;
 211         } else if (name.equals("127.0.0.1")) {
 212             name = "localhost";
 213         }
 214         final String fname = name;
 215         final InetAddress addr1 = InetAddress.getByName(name);
 216         SSLParameters params = sock.getSSLParameters();
 217         SNIMatcher matcher = new SNIMatcher(StandardConstants.SNI_HOST_NAME) {


 268 
 269         String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
 270         clientSettings = getSettingsFromString(clientSettingsString);
 271 
 272         return upgrade;
 273     }
 274 
 275     /**
 276      * Decodes the given, Client, settings payload provided in base64 HTTP1
 277      * header value.
 278      */
 279     private SettingsFrame getSettingsFromString(String s) throws IOException {
 280         Base64.Decoder decoder = Base64.getUrlDecoder();
 281         byte[] payload = decoder.decode(s);
 282         ByteBuffer bb1 = ByteBuffer.wrap(payload);
 283         // simulate header of Settings Frame
 284         ByteBuffer bb0 = ByteBuffer.wrap(
 285                 new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
 286         List<Http2Frame> frames = new ArrayList<>();
 287         FramesDecoder reader = new FramesDecoder(frames::add);
 288         reader.decode(bb0);
 289         reader.decode(bb1);
 290         if (frames.size()!=1)
 291             throw new IOException("Expected 1 frame got "+frames.size()) ;
 292         Http2Frame frame = frames.get(0);
 293         if (!(frame instanceof SettingsFrame))
 294             throw new IOException("Expected SettingsFrame");
 295         return (SettingsFrame)frame;
 296     }
 297 
 298     void run() throws Exception {
 299         String upgrade = null;
 300         if (!secure) {
 301             upgrade = doUpgrade();
 302         } else {
 303             readPreface();
 304             sendSettingsFrame(true);
 305             clientSettings = (SettingsFrame) readFrame();
 306             if (clientSettings.getFlag(SettingsFrame.ACK)) {
 307                 // we received the ack to our frame first
 308                 clientSettings = (SettingsFrame) readFrame();
 309             }
 310             nextstream = 1;
 311         }
 312 
 313         System.out.println("ServerSettings: " + serverSettings);
 314         System.out.println("ClientSettings: " + clientSettings);
 315 
 316         hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
 317         hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
 318 
 319         if (!secure) {
 320             createPrimordialStream(upgrade);
 321             nextstream = 3;
 322         }
 323 
 324         exec.submit(this::readLoop);
 325         exec.submit(this::writeLoop);
 326     }
 327 
















 328     private void writeFrame(Http2Frame frame) throws IOException {
 329         List<ByteBuffer> bufs = new FramesEncoder().encodeFrame(frame);
 330         //System.err.println("TestServer: Writing frame " + frame.toString());
 331         int c = 0;
 332         for (ByteBuffer buf : bufs) {

 333             byte[] ba = buf.array();
 334             int start = buf.arrayOffset() + buf.position();
 335             c += buf.remaining();
 336             os.write(ba, start, buf.remaining());
 337 
 338 //            System.out.println("writing byte at a time");
 339 //            while (buf.hasRemaining()) {
 340 //                byte b = buf.get();
 341 //                os.write(b);
 342 //                os.flush();
 343 //                try {
 344 //                    Thread.sleep(1);
 345 //                } catch(InterruptedException e) {
 346 //                    UncheckedIOException uie = new UncheckedIOException(new IOException(""));
 347 //                    uie.addSuppressed(e);
 348 //                    throw uie;
 349 //                }
 350 //            }
 351         }
 352         os.flush();
 353         //System.err.printf("TestServer: wrote %d bytes\n", c);
 354     }
 355 





 356     private void handleCommonFrame(Http2Frame f) throws IOException {
 357         if (f instanceof SettingsFrame) {
 358             SettingsFrame sf = (SettingsFrame) f;
 359             if (sf.getFlag(SettingsFrame.ACK)) // ignore
 360             {
 361                 return;
 362             }
 363             // otherwise acknowledge it
 364             clientSettings = sf;
 365             SettingsFrame frame = new SettingsFrame();
 366             frame.setFlag(SettingsFrame.ACK);
 367             frame.streamid(0);
 368             outputQ.put(frame);
 369             return;
 370         } else if (f instanceof GoAwayFrame) {
 371             System.err.println("Closing: "+ f.toString());
 372             close();
 373         } else if (f instanceof PingFrame) {
 374             handlePing((PingFrame)f);
 375         } else
 376             throw new UnsupportedOperationException("Not supported yet: " + f.toString());
 377     }
 378 
 379     void sendWindowUpdates(int len, int streamid) throws IOException {
 380         if (len == 0)
 381             return;
 382         WindowUpdateFrame wup = new WindowUpdateFrame(streamid, len);
 383         outputQ.put(wup);
 384         wup = new WindowUpdateFrame(0 , len);
 385         outputQ.put(wup);
 386     }
 387 
 388     HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) throws IOException {
 389         HttpHeadersImpl headers = new HttpHeadersImpl();
 390 
 391         DecodingCallback cb = (name, value) -> {
 392             headers.addHeader(name.toString(), value.toString());
 393         };
 394 
 395         for (HeaderFrame frame : frames) {
 396             List<ByteBuffer> buffers = frame.getHeaderBlock();
 397             for (ByteBuffer buffer : buffers) {
 398                 hpackIn.decode(buffer, false, cb);
 399             }
 400         }
 401         hpackIn.decode(EMPTY_BUFFER, true, cb);
 402         return headers;
 403     }
 404 
 405     String getRequestLine(String request) {
 406         int eol = request.indexOf(CRLF);
 407         return request.substring(0, eol);
 408     }
 409 
 410     String getHeaders(String request) {
 411         int start = request.indexOf(CRLF);
 412         int end = request.indexOf(CRLFCRLF);
 413         if (start == -1 || end == -1) {
 414             throw new RuntimeException("Malformed request");
 415         }
 416         return request.substring(start,end);
 417     }
 418 


 437         String requestLine = getRequestLine(request);
 438         String[] tokens = requestLine.split(" ");
 439         if (!tokens[2].equals("HTTP/1.1")) {
 440             throw new IOException("bad request line");
 441         }
 442         URI uri = null;
 443         try {
 444             uri = new URI(tokens[1]);
 445         } catch (URISyntaxException e) {
 446             throw new IOException(e);
 447         }
 448         String host = getHeader(request, "Host");
 449         if (host == null) {
 450             throw new IOException("missing Host");
 451         }
 452 
 453         headers.setHeader(":method", tokens[0]);
 454         headers.setHeader(":scheme", "http"); // always in this case
 455         headers.setHeader(":authority", host);
 456         headers.setHeader(":path", uri.getPath());
 457         Queue q = new Queue(sentinel);
 458         String body = getRequestBody(request);
 459         addHeaders(getHeaders(request), headers);
 460         headers.setHeader("Content-length", Integer.toString(body.length()));
 461 
 462         addRequestBodyToQueue(body, q);
 463         streams.put(1, q);
 464         exec.submit(() -> {
 465             handleRequest(headers, q, 1, true /*complete request has been read*/);
 466         });
 467     }
 468 
 469     // all other streams created here
 470     @SuppressWarnings({"rawtypes","unchecked"})
 471     void createStream(HeaderFrame frame) throws IOException {
 472         List<HeaderFrame> frames = new LinkedList<>();
 473         frames.add(frame);
 474         int streamid = frame.streamid();
 475         if (streamid != nextstream) {
 476             throw new IOException("unexpected stream id");
 477         }


 479 
 480         boolean endStream = false;
 481         if (frame.getFlag(HeaderFrame.END_STREAM)) {
 482             endStream = true;
 483         }
 484 
 485         while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
 486             Http2Frame f = readFrame();
 487             if (!(f instanceof HeaderFrame)) {
 488                 handleCommonFrame(f); // should only be error frames
 489             } else {
 490                 frame = (HeaderFrame) f;
 491                 if (frame.getFlag(HeaderFrame.END_STREAM)) {
 492                     endStream = true;
 493                 }
 494                 frames.add(frame);
 495             }
 496         }
 497         boolean endStreamReceived = endStream;
 498         HttpHeadersImpl headers = decodeHeaders(frames);
 499         Queue q = new Queue(sentinel);
 500         streams.put(streamid, q);
 501         exec.submit(() -> {
 502             handleRequest(headers, q, streamid, endStreamReceived);
 503         });
 504     }
 505 
 506     // runs in own thread. Handles request from start to finish. Incoming frames
 507     // for this stream/request delivered on Q
 508 
 509     @SuppressWarnings({"rawtypes","unchecked"})
 510     void handleRequest(HttpHeadersImpl headers,
 511                        Queue queue,
 512                        int streamid,
 513                        boolean endStreamReceived)
 514     {
 515         String method = headers.firstValue(":method").orElse("");
 516         //System.out.println("method = " + method);
 517         String path = headers.firstValue(":path").orElse("");
 518         //System.out.println("path = " + path);
 519         String scheme = headers.firstValue(":scheme").orElse("");
 520         //System.out.println("scheme = " + scheme);
 521         String authority = headers.firstValue(":authority").orElse("");
 522         //System.out.println("authority = " + authority);
 523         System.err.printf("TestServer: %s %s\n", method, path);
 524         HttpHeadersImpl rspheaders = new HttpHeadersImpl();
 525         int winsize = clientSettings.getParameter(
 526                 SettingsFrame.INITIAL_WINDOW_SIZE);
 527         //System.err.println ("Stream window size = " + winsize);
 528 
 529         final InputStream bis;
 530         if (endStreamReceived && queue.size() == 0) {
 531             System.err.println("Server: got END_STREAM for stream " + streamid);
 532             bis = NullInputStream.INSTANCE;
 533         } else {
 534             System.err.println("Server: creating input stream for stream " + streamid);
 535             bis = new BodyInputStream(queue, streamid, this);
 536         }
 537         try (bis;
 538              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
 539         {
 540             outStreams.put(streamid, bos);
 541             String us = scheme + "://" + authority + path;
 542             URI uri = new URI(us);
 543             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
 544             Http2TestExchange exchange = exchangeSupplier.get(streamid, method,
 545                     headers, rspheaders, uri, bis, getSSLSession(),
 546                     bos, this, pushAllowed);
 547 
 548             // give to user
 549             Http2Handler handler = server.getHandlerFor(uri.getPath());
 550             handler.handle(exchange);
 551 
 552             // everything happens in the exchange from here. Hopefully will
 553             // return though.
 554         } catch (Throwable e) {
 555             System.err.println("TestServer: handleRequest exception: " + e);
 556             e.printStackTrace();
 557         }
 558     }
 559 
 560     private SSLSession getSSLSession() {
 561         if (! (socket instanceof SSLSocket))
 562             return null;
 563         SSLSocket ssl = (SSLSocket)socket;
 564         return ssl.getSession();


 586                         if (q != null) {
 587                             System.err.println("HEADERS frame for existing stream! Error.");
 588                             // TODO: close connection
 589                             continue;
 590                         } else {
 591                             createStream((HeadersFrame) frame);
 592                         }
 593                     } else {
 594                         if (q == null && !pushStreams.contains(stream)) {
 595                             System.err.printf("Non Headers frame received with"+
 596                                     " non existing stream (%d) ", frame.streamid());
 597                             System.err.println(frame);
 598                             continue;
 599                         }
 600                         if (frame.type() == WindowUpdateFrame.TYPE) {
 601                             WindowUpdateFrame wup = (WindowUpdateFrame) frame;
 602                             synchronized (updaters) {
 603                                 Consumer<Integer> r = updaters.get(stream);
 604                                 r.accept(wup.getUpdate());
 605                             }
 606                         } else if (frame.type() == ResetFrame.TYPE) {
 607                             // do orderly close on input q
 608                             // and close the output q immediately
 609                             // This should mean depending on what the
 610                             // handler is doing: either an EOF on read
 611                             // or an IOException if writing the response.
 612                             q.orderlyClose();
 613                             BodyOutputStream oq = outStreams.get(stream);
 614                             if (oq != null)
 615                                 oq.closeInternal();
 616 
 617                         } else {
 618                             q.put(frame);
 619                         }
 620                     }
 621                 }
 622             }
 623         } catch (Throwable e) {
 624             if (!stopping) {
 625                 System.err.println("Http server reader thread shutdown");
 626                 e.printStackTrace();
 627             }
 628             close();
 629         }
 630     }
 631 
 632     List<ByteBuffer> encodeHeaders(HttpHeadersImpl headers) {
 633         List<ByteBuffer> buffers = new LinkedList<>();
 634 
 635         ByteBuffer buf = getBuffer();
 636         boolean encoded;
 637         for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
 638             List<String> values = entry.getValue();
 639             String key = entry.getKey().toLowerCase();
 640             for (String value : values) {
 641                 do {
 642                     hpackOut.header(key, value);
 643                     encoded = hpackOut.encode(buf);
 644                     if (!encoded) {
 645                         buf.flip();
 646                         buffers.add(buf);
 647                         buf = getBuffer();
 648                     }
 649                 } while (!encoded);
 650             }
 651         }
 652         buf.flip();
 653         buffers.add(buf);
 654         return buffers;
 655     }
 656 
 657     static void closeIgnore(Closeable c) {
 658         try {
 659             c.close();
 660         } catch (IOException e) {}
 661     }
 662 
 663     // Runs in own thread
 664     void writeLoop() {
 665         try {
 666             while (!stopping) {
 667                 Http2Frame frame;
 668                 try {
 669                     frame = outputQ.take();
 670                 } catch(IOException x) {
 671                     if (stopping && x.getCause() instanceof InterruptedException) {
 672                         break;
 673                     } else throw x;
 674                 }


 677                     HeadersFrame hf = new HeadersFrame(rh.streamid(), rh.getFlags(), encodeHeaders(rh.headers));
 678                     writeFrame(hf);
 679                 } else if (frame instanceof OutgoingPushPromise) {
 680                     handlePush((OutgoingPushPromise)frame);
 681                 } else
 682                     writeFrame(frame);
 683             }
 684             System.err.println("TestServer: Connection writer stopping");
 685         } catch (Throwable e) {
 686             e.printStackTrace();
 687             /*close();
 688             if (!stopping) {
 689                 e.printStackTrace();
 690                 System.err.println("TestServer: writeLoop exception: " + e);
 691             }*/
 692         }
 693     }
 694 
 695     private void handlePush(OutgoingPushPromise op) throws IOException {
 696         int promisedStreamid = nextPushStreamId;
 697         PushPromiseFrame pp = new PushPromiseFrame(op.parentStream,
 698                                                    HeaderFrame.END_HEADERS,
 699                                                    promisedStreamid,
 700                                                    encodeHeaders(op.headers),
 701                                                    0);
 702         pushStreams.add(promisedStreamid);
 703         nextPushStreamId += 2;
 704         pp.streamid(op.parentStream);
 705         writeFrame(pp);
 706         final InputStream ii = op.is;
 707         final BodyOutputStream oo = new BodyOutputStream(
 708                 promisedStreamid,
 709                 clientSettings.getParameter(
 710                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
 711         outStreams.put(promisedStreamid, oo);
 712         oo.goodToGo();
 713         exec.submit(() -> {
 714             try {
 715                 ResponseHeaders oh = getPushResponse(promisedStreamid);
 716                 outputQ.put(oh);
 717                 ii.transferTo(oo);
 718             } catch (Throwable ex) {
 719                 System.err.printf("TestServer: pushing response error: %s\n",
 720                         ex.toString());
 721             } finally {
 722                 closeIgnore(ii);
 723                 closeIgnore(oo);
 724             }
 725         });
 726 
 727     }
 728 
 729     // returns a minimal response with status 200
 730     // that is the response to the push promise just sent
 731     private ResponseHeaders getPushResponse(int streamid) {


 740     private ByteBuffer getBuffer() {
 741         return ByteBuffer.allocate(8 * 1024);
 742     }
 743 
 744     private Http2Frame readFrame() throws IOException {
 745         byte[] buf = new byte[9];
 746         if (is.readNBytes(buf, 0, 9) != 9)
 747             throw new IOException("readFrame: connection closed");
 748         int len = 0;
 749         for (int i = 0; i < 3; i++) {
 750             int n = buf[i] & 0xff;
 751             //System.err.println("n = " + n);
 752             len = (len << 8) + n;
 753         }
 754         byte[] rest = new byte[len];
 755         int n = is.readNBytes(rest, 0, len);
 756         if (n != len)
 757             throw new IOException("Error reading frame");
 758         List<Http2Frame> frames = new ArrayList<>();
 759         FramesDecoder reader = new FramesDecoder(frames::add);
 760         reader.decode(ByteBuffer.wrap(buf));
 761         reader.decode(ByteBuffer.wrap(rest));
 762         if (frames.size()!=1)
 763             throw new IOException("Expected 1 frame got "+frames.size()) ;
 764 
 765         return frames.get(0);
 766     }
 767 
 768     void sendSettingsFrame() throws IOException {
 769         sendSettingsFrame(false);
 770     }
 771 
 772     void sendSettingsFrame(boolean now) throws IOException {
 773         if (now) {
 774             writeFrame(serverSettings);
 775         } else {
 776             outputQ.put(serverSettings);
 777         }
 778     }
 779 
 780     String readUntil(String end) throws IOException {
 781         int number = end.length();


 816         int start = headers1.indexOf(name);
 817         if (start == -1) {
 818             return null;
 819         }
 820         start += 2;
 821         int end = headers1.indexOf(CRLF, start);
 822         String line = headers.substring(start, end);
 823         start = line.indexOf(':');
 824         if (start == -1) {
 825             return null;
 826         }
 827         return line.substring(start + 1).trim();
 828     }
 829 
 830     final static String CRLF = "\r\n";
 831     final static String CRLFCRLF = "\r\n\r\n";
 832 
 833     String readHttp1Request() throws IOException {
 834         String headers = readUntil(CRLF + CRLF);
 835         int clen = getContentLength(headers);
 836         byte[] buf;
 837         if (clen >= 0) {
 838             // HTTP/1.1 fixed length content ( may be 0 ), read it
 839             buf = new byte[clen];
 840             is.readNBytes(buf, 0, clen);
 841         } else {
 842             //  HTTP/1.1 chunked data, read it
 843             buf = readChunkedInputStream(is);
 844         }
 845         String body = new String(buf, StandardCharsets.US_ASCII);
 846         return headers + body;
 847     }
 848 
 849     // This is a quick hack to get a chunked input stream reader.
 850     private static byte[] readChunkedInputStream(InputStream is) throws IOException {
 851         ChunkedInputStream cis = new ChunkedInputStream(is, new HttpClient() {}, null);
 852         return cis.readAllBytes();
 853     }
 854 
 855     void sendHttp1Response(int code, String msg, String... headers) throws IOException {
 856         StringBuilder sb = new StringBuilder();
 857         sb.append("HTTP/1.1 ")
 858                 .append(code)
 859                 .append(' ')
 860                 .append(msg)
 861                 .append(CRLF);
 862         int numheaders = headers.length;
 863         for (int i = 0; i < numheaders; i += 2) {
 864             sb.append(headers[i])
 865                     .append(": ")
 866                     .append(headers[i + 1])
 867                     .append(CRLF);
 868         }
 869         sb.append(CRLF);
 870         String s = sb.toString();
 871         os.write(s.getBytes("US-ASCII"));
 872         os.flush();
 873     }
 874 


 878     }
 879 
 880     final static ByteBuffer[] bbarray = new ByteBuffer[0];
 881 
 882     // wrapper around a BlockingQueue that throws an exception when it's closed
 883     // Each stream has one of these
 884 
 885     String getRequestBody(String request) {
 886         int bodystart = request.indexOf(CRLF+CRLF);
 887         String body;
 888         if (bodystart == -1)
 889             body = "";
 890         else
 891             body = request.substring(bodystart+4);
 892         return body;
 893     }
 894 
 895     @SuppressWarnings({"rawtypes","unchecked"})
 896     void addRequestBodyToQueue(String body, Queue q) throws IOException {
 897         ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
 898         DataFrame df = new DataFrame(1, DataFrame.END_STREAM, buf);
 899         // only used for primordial stream
 900         q.put(df);
 901     }
 902 
 903     // window updates done in main reader thread because they may
 904     // be used to unblock BodyOutputStreams waiting for WUPs
 905 
 906     HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
 907 
 908     void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
 909         synchronized(updaters) {
 910             updaters.put(streamid, r);
 911         }
 912     }
 913 
 914     int sendWindow = 64 * 1024 - 1; // connection level send window
 915 
 916     /**
 917      * BodyOutputStreams call this to get the connection window first.
 918      *


< prev index next >