1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test
  26  * @bug 8195823
  27  * @summary Buffers given to response body subscribers should not contain
  28  *          unprocessed HTTP data
  29  * @modules java.base/sun.net.www.http
  30  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
  31  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
  32  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
  33  *          java.logging
  34  *          jdk.httpserver
  35  * @library /lib/testlibrary http2/server
  36  * @build Http2TestServer
  37  * @build jdk.testlibrary.SimpleSSLContext
  38  * @run testng/othervm -Djdk.internal.httpclient.debug=true ConcurrentResponses
  39  */
  40 
  41 import java.io.IOException;
  42 import java.io.InputStream;
  43 import java.io.OutputStream;
  44 import java.net.InetSocketAddress;
  45 import java.net.URI;
  46 import java.nio.ByteBuffer;
  47 import java.util.HashMap;
  48 import java.util.List;
  49 import java.util.Map;
  50 import java.util.concurrent.CompletableFuture;
  51 import java.util.concurrent.CompletionStage;
  52 import java.util.concurrent.Flow;
  53 import java.util.stream.IntStream;
  54 import javax.net.ssl.SSLContext;
  55 import com.sun.net.httpserver.HttpExchange;
  56 import com.sun.net.httpserver.HttpHandler;
  57 import com.sun.net.httpserver.HttpServer;
  58 import com.sun.net.httpserver.HttpsConfigurator;
  59 import com.sun.net.httpserver.HttpsServer;
  60 import jdk.incubator.http.HttpClient;
  61 import jdk.incubator.http.HttpRequest;
  62 import jdk.incubator.http.HttpResponse;
  63 import jdk.incubator.http.HttpResponse.BodyHandler;
  64 import jdk.incubator.http.HttpResponse.BodySubscriber;
  65 import jdk.testlibrary.SimpleSSLContext;
  66 import org.testng.annotations.AfterTest;
  67 import org.testng.annotations.BeforeTest;
  68 import org.testng.annotations.DataProvider;
  69 import org.testng.annotations.Test;
  70 import static java.nio.charset.StandardCharsets.UTF_8;
  71 import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
  72 import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
  73 import static org.testng.Assert.assertEquals;
  74 import static org.testng.Assert.assertFalse;
  75 import static org.testng.Assert.fail;
  76 
  77 public class ConcurrentResponses {
  78 
  79     SSLContext sslContext;
  80     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
  81     HttpsServer httpsTestServer;       // HTTPS/1.1
  82     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
  83     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
  84     String httpFixedURI, httpsFixedURI, httpChunkedURI, httpsChunkedURI;
  85     String http2FixedURI, https2FixedURI, http2VariableURI, https2VariableURI;
  86 
  87     static final int CONCURRENT_REQUESTS = 13;
  88 
  89     static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
  90     static final int ALPHABET_LENGTH = ALPHABET.length();
  91 
  92     static final String stringOfLength(int requiredLength) {
  93         StringBuilder sb = new StringBuilder(requiredLength);
  94         IntStream.range(0, requiredLength)
  95                  .mapToObj(i -> ALPHABET.charAt(i % ALPHABET_LENGTH))
  96                  .forEach(c -> sb.append(c));
  97         return sb.toString();
  98     }
  99 
 100     /** An array of different Strings, to be used as bodies. */
 101     static final String[] BODIES = bodies();
 102 
 103     static String[] bodies() {
 104         String[] bodies = new String[CONCURRENT_REQUESTS];
 105         for (int i=0;i<CONCURRENT_REQUESTS; i++) {
 106             // slightly, but still, different bodies
 107             bodies[i] = "Request-" + i + "-body-" + stringOfLength((1024) + i);
 108         }
 109         return bodies;
 110     }
 111 
 112     /**
 113      * Asserts the given response's status code is 200.
 114      * Returns a CF that completes with the given response.
 115      */
 116     static final <T> CompletionStage<HttpResponse<T>>
 117     assert200ResponseCode(HttpResponse<T> response) {
 118         assertEquals(response.statusCode(), 200);
 119         return CompletableFuture.completedFuture(response);
 120     }
 121 
 122     /**
 123      * Asserts that the given response's body is equal to the given body.
 124      * Returns a CF that completes with the given response.
 125      */
 126     static final <T> CompletionStage<HttpResponse<T>>
 127     assertbody(HttpResponse<T> response, T body) {
 128         assertEquals(response.body(), body);
 129         return CompletableFuture.completedFuture(response);
 130     }
 131 
 132     @DataProvider(name = "uris")
 133     public Object[][] variants() {
 134         return new Object[][]{
 135                 { httpFixedURI },
 136                 { httpsFixedURI },
 137                 { httpChunkedURI },
 138                 { httpsChunkedURI },
 139                 { http2FixedURI },
 140                 { https2FixedURI },
 141                 { http2VariableURI },
 142                 { https2VariableURI }
 143         };
 144     }
 145 
 146 
 147     // The asString implementation accumulates data, below a certain threshold
 148     // into the byte buffers it is given.
 149     @Test(dataProvider = "uris")
 150     void testAsString(String uri) throws Exception {
 151         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 152 
 153         Map<HttpRequest, String> requests = new HashMap<>();
 154         for (int i=0;i<CONCURRENT_REQUESTS; i++) {
 155             HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
 156                                              .build();
 157             requests.put(request, BODIES[i]);
 158         }
 159 
 160         // initial connection to seed the cache so next parallel connections reuse it
 161         client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
 162 
 163         // will reuse connection cached from the previous request ( when HTTP/2 )
 164         CompletableFuture.allOf(requests.keySet().parallelStream()
 165                 .map(request -> client.sendAsync(request, asString()))
 166                 .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
 167                 .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
 168                 .toArray(CompletableFuture<?>[]::new))
 169                 .join();
 170     }
 171 
 172     // The custom subscriber aggressively attacks any area, between the limit
 173     // and the capacity, in the byte buffers it is given, by writing 'X' into it.
 174     @Test(dataProvider = "uris")
 175     void testWithCustomSubscriber(String uri) throws Exception {
 176         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 177 
 178         Map<HttpRequest, String> requests = new HashMap<>();
 179         for (int i=0;i<CONCURRENT_REQUESTS; i++) {
 180             HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i))
 181                     .build();
 182             requests.put(request, BODIES[i]);
 183         }
 184 
 185         // initial connection to seed the cache so next parallel connections reuse it
 186         client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
 187 
 188         // will reuse connection cached from the previous request ( when HTTP/2 )
 189         CompletableFuture.allOf(requests.keySet().parallelStream()
 190                 .map(request -> client.sendAsync(request, CustomSubscriber.handler))
 191                 .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode))
 192                 .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request()))))
 193                 .toArray(CompletableFuture<?>[]::new))
 194                 .join();
 195     }
 196 
 197     /**
 198      * A subscriber that wraps asString, but mucks with any data between limit
 199      * and capacity, if the client mistakenly passes it any that is should not.
 200      */
 201     static class CustomSubscriber implements BodySubscriber<String> {
 202         static final BodyHandler<String> handler = (r,h) -> new CustomSubscriber();
 203         private final BodySubscriber<String> asString = BodySubscriber.asString(UTF_8);
 204 
 205         @Override
 206         public CompletionStage<String> getBody() {
 207             return asString.getBody();
 208         }
 209 
 210         @Override
 211         public void onSubscribe(Flow.Subscription subscription) {
 212             asString.onSubscribe(subscription);
 213         }
 214 
 215         @Override
 216         public void onNext(List<ByteBuffer> buffers) {
 217             // Muck any data beyond the give limit, since there shouldn't
 218             // be any of interest to the HTTP Client.
 219             for (ByteBuffer buffer : buffers) {
 220                 if (buffer.limit() != buffer.capacity()) {
 221                     final int limit = buffer.limit();
 222                     final int position = buffer.position();
 223                     buffer.position(buffer.limit());
 224                     buffer.limit(buffer.capacity());
 225                     while (buffer.hasRemaining())
 226                         buffer.put((byte)'X');
 227                     buffer.position(position); // restore original position
 228                     buffer.limit(limit);       // restore original limit
 229                 }
 230             }
 231             asString.onNext(buffers);
 232         }
 233 
 234         @Override
 235         public void onError(Throwable throwable) {
 236             asString.onError(throwable);
 237             throwable.printStackTrace();
 238             fail("UNEXPECTED:" + throwable);
 239         }
 240 
 241         @Override
 242         public void onComplete() {
 243             asString.onComplete();
 244         }
 245     }
 246 
 247 
 248     @BeforeTest
 249     public void setup() throws Exception {
 250         sslContext = new SimpleSSLContext().get();
 251         if (sslContext == null)
 252             throw new AssertionError("Unexpected null sslContext");
 253 
 254         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
 255         httpTestServer = HttpServer.create(sa, 0);
 256         httpTestServer.createContext("/http1/fixed", new Http1FixedHandler());
 257         httpFixedURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed";
 258         httpTestServer.createContext("/http1/chunked", new Http1ChunkedHandler());
 259         httpChunkedURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunked";
 260 
 261         httpsTestServer = HttpsServer.create(sa, 0);
 262         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
 263         httpsTestServer.createContext("/https1/fixed", new Http1FixedHandler());
 264         httpsFixedURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed";
 265         httpsTestServer.createContext("/https1/chunked", new Http1ChunkedHandler());
 266         httpsChunkedURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunked";
 267 
 268         http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
 269         http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed");
 270         http2FixedURI = "http://127.0.0.1:" + http2TestServer.getAddress().getPort() + "/http2/fixed";
 271         http2TestServer.addHandler(new Http2VariableHandler(), "/http2/variable");
 272         http2VariableURI = "http://127.0.0.1:" + http2TestServer.getAddress().getPort() + "/http2/variable";
 273 
 274         https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
 275         https2TestServer.addHandler(new Http2FixedHandler(), "/https2/fixed");
 276         https2FixedURI = "https://127.0.0.1:" + https2TestServer.getAddress().getPort() + "/https2/fixed";
 277         https2TestServer.addHandler(new Http2VariableHandler(), "/https2/variable");
 278         https2VariableURI = "https://127.0.0.1:" + https2TestServer.getAddress().getPort() + "/https2/variable";
 279 
 280         httpTestServer.start();
 281         httpsTestServer.start();
 282         http2TestServer.start();
 283         https2TestServer.start();
 284     }
 285 
 286     @AfterTest
 287     public void teardown() throws Exception {
 288         httpTestServer.stop(0);
 289         httpsTestServer.stop(0);
 290         http2TestServer.stop();
 291         https2TestServer.stop();
 292     }
 293 
 294     interface SendResponseHeadersFunction {
 295         void apply(int responseCode, long responseLength) throws IOException;
 296     }
 297 
 298     // A handler implementation that replies with 200 OK. If the exchange's uri
 299     // has a query, then it must be an integer, which is used as an index to
 300     // select the particular response body, e.g. /http2/x?5 -> BODIES[5]
 301     static void serverHandlerImpl(InputStream inputStream,
 302                                   OutputStream outputStream,
 303                                   URI uri,
 304                                   SendResponseHeadersFunction sendResponseHeadersFunction)
 305         throws IOException
 306     {
 307         try (InputStream is = inputStream;
 308              OutputStream os = outputStream) {
 309             is.readAllBytes();
 310 
 311             String magicQuery = uri.getQuery();
 312             if (magicQuery != null) {
 313                 int bodyIndex = Integer.valueOf(magicQuery);
 314                 String body = BODIES[bodyIndex];
 315                 byte[] bytes = body.getBytes(UTF_8);
 316                 sendResponseHeadersFunction.apply(200, bytes.length);
 317                 int offset = 0;
 318                 // Deliberately attempt to reply with several relatively
 319                 // small data frames ( each write corresponds to its own
 320                 // data frame ). Additionally, yield, to encourage other
 321                 // handlers to execute, therefore increasing the likelihood
 322                 // of multiple different-stream related frames in the
 323                 // client's read buffer.
 324                 while (offset < bytes.length) {
 325                     int length = Math.min(bytes.length - offset, 64);
 326                     os.write(bytes, offset, length);
 327                     os.flush();
 328                     offset += length;
 329                     Thread.yield();
 330                 }
 331             } else {
 332                 sendResponseHeadersFunction.apply(200, 1);
 333                 os.write('A');
 334             }
 335         }
 336     }
 337 
 338     static class Http1FixedHandler implements HttpHandler {
 339         @Override
 340         public void handle(HttpExchange t) throws IOException {
 341             serverHandlerImpl(t.getRequestBody(),
 342                               t.getResponseBody(),
 343                               t.getRequestURI(),
 344                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
 345         }
 346     }
 347 
 348     static class Http1ChunkedHandler implements HttpHandler {
 349         @Override
 350         public void handle(HttpExchange t) throws IOException {
 351             serverHandlerImpl(t.getRequestBody(),
 352                               t.getResponseBody(),
 353                               t.getRequestURI(),
 354                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /*chunked*/));
 355         }
 356     }
 357 
 358     static class Http2FixedHandler implements Http2Handler {
 359         @Override
 360         public void handle(Http2TestExchange t) throws IOException {
 361             serverHandlerImpl(t.getRequestBody(),
 362                               t.getResponseBody(),
 363                               t.getRequestURI(),
 364                               (rcode, length) -> t.sendResponseHeaders(rcode, length));
 365         }
 366     }
 367 
 368     static class Http2VariableHandler implements Http2Handler {
 369         @Override
 370         public void handle(Http2TestExchange t) throws IOException {
 371             serverHandlerImpl(t.getRequestBody(),
 372                               t.getResponseBody(),
 373                               t.getRequestURI(),
 374                               (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /* no Content-Length */));
 375         }
 376     }
 377 }