1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 8195823 27 * @summary Buffers given to response body subscribers should not contain 28 * unprocessed HTTP data 29 * @modules java.base/sun.net.www.http 30 * jdk.incubator.httpclient/jdk.incubator.http.internal.common 31 * jdk.incubator.httpclient/jdk.incubator.http.internal.frame 32 * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack 33 * java.logging 34 * jdk.httpserver 35 * @library /lib/testlibrary http2/server 36 * @build Http2TestServer 37 * @build jdk.testlibrary.SimpleSSLContext 38 * @run testng/othervm -Djdk.internal.httpclient.debug=true ConcurrentResponses 39 */ 40 41 import java.io.IOException; 42 import java.io.InputStream; 43 import java.io.OutputStream; 44 import java.net.InetSocketAddress; 45 import java.net.URI; 46 import java.nio.ByteBuffer; 47 import java.util.HashMap; 48 import java.util.List; 49 import java.util.Map; 50 import java.util.concurrent.CompletableFuture; 51 import java.util.concurrent.CompletionStage; 52 import java.util.concurrent.Flow; 53 import java.util.stream.IntStream; 54 import javax.net.ssl.SSLContext; 55 import com.sun.net.httpserver.HttpExchange; 56 import com.sun.net.httpserver.HttpHandler; 57 import com.sun.net.httpserver.HttpServer; 58 import com.sun.net.httpserver.HttpsConfigurator; 59 import com.sun.net.httpserver.HttpsServer; 60 import jdk.incubator.http.HttpClient; 61 import jdk.incubator.http.HttpRequest; 62 import jdk.incubator.http.HttpResponse; 63 import jdk.incubator.http.HttpResponse.BodyHandler; 64 import jdk.incubator.http.HttpResponse.BodySubscriber; 65 import jdk.testlibrary.SimpleSSLContext; 66 import org.testng.annotations.AfterTest; 67 import org.testng.annotations.BeforeTest; 68 import org.testng.annotations.DataProvider; 69 import org.testng.annotations.Test; 70 import static java.nio.charset.StandardCharsets.UTF_8; 71 import static jdk.incubator.http.HttpResponse.BodyHandler.asString; 72 import static jdk.incubator.http.HttpResponse.BodyHandler.discard; 73 import static org.testng.Assert.assertEquals; 74 import static org.testng.Assert.assertFalse; 75 import static org.testng.Assert.fail; 76 77 public class ConcurrentResponses { 78 79 SSLContext sslContext; 80 HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] 81 HttpsServer httpsTestServer; // HTTPS/1.1 82 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) 83 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) 84 String httpFixedURI, httpsFixedURI, httpChunkedURI, httpsChunkedURI; 85 String http2FixedURI, https2FixedURI, http2VariableURI, https2VariableURI; 86 87 static final int CONCURRENT_REQUESTS = 13; 88 89 static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; 90 static final int ALPHABET_LENGTH = ALPHABET.length(); 91 92 static final String stringOfLength(int requiredLength) { 93 StringBuilder sb = new StringBuilder(requiredLength); 94 IntStream.range(0, requiredLength) 95 .mapToObj(i -> ALPHABET.charAt(i % ALPHABET_LENGTH)) 96 .forEach(c -> sb.append(c)); 97 return sb.toString(); 98 } 99 100 /** An array of different Strings, to be used as bodies. */ 101 static final String[] BODIES = bodies(); 102 103 static String[] bodies() { 104 String[] bodies = new String[CONCURRENT_REQUESTS]; 105 for (int i=0;i<CONCURRENT_REQUESTS; i++) { 106 // slightly, but still, different bodies 107 bodies[i] = "Request-" + i + "-body-" + stringOfLength((1024) + i); 108 } 109 return bodies; 110 } 111 112 /** 113 * Asserts the given response's status code is 200. 114 * Returns a CF that completes with the given response. 115 */ 116 static final <T> CompletionStage<HttpResponse<T>> 117 assert200ResponseCode(HttpResponse<T> response) { 118 assertEquals(response.statusCode(), 200); 119 return CompletableFuture.completedFuture(response); 120 } 121 122 /** 123 * Asserts that the given response's body is equal to the given body. 124 * Returns a CF that completes with the given response. 125 */ 126 static final <T> CompletionStage<HttpResponse<T>> 127 assertbody(HttpResponse<T> response, T body) { 128 assertEquals(response.body(), body); 129 return CompletableFuture.completedFuture(response); 130 } 131 132 @DataProvider(name = "uris") 133 public Object[][] variants() { 134 return new Object[][]{ 135 { httpFixedURI }, 136 { httpsFixedURI }, 137 { httpChunkedURI }, 138 { httpsChunkedURI }, 139 { http2FixedURI }, 140 { https2FixedURI }, 141 { http2VariableURI }, 142 { https2VariableURI } 143 }; 144 } 145 146 147 // The asString implementation accumulates data, below a certain threshold 148 // into the byte buffers it is given. 149 @Test(dataProvider = "uris") 150 void testAsString(String uri) throws Exception { 151 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 152 153 Map<HttpRequest, String> requests = new HashMap<>(); 154 for (int i=0;i<CONCURRENT_REQUESTS; i++) { 155 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i)) 156 .build(); 157 requests.put(request, BODIES[i]); 158 } 159 160 // initial connection to seed the cache so next parallel connections reuse it 161 client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join(); 162 163 // will reuse connection cached from the previous request ( when HTTP/2 ) 164 CompletableFuture.allOf(requests.keySet().parallelStream() 165 .map(request -> client.sendAsync(request, asString())) 166 .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode)) 167 .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request())))) 168 .toArray(CompletableFuture<?>[]::new)) 169 .join(); 170 } 171 172 // The custom subscriber aggressively attacks any area, between the limit 173 // and the capacity, in the byte buffers it is given, by writing 'X' into it. 174 @Test(dataProvider = "uris") 175 void testWithCustomSubscriber(String uri) throws Exception { 176 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 177 178 Map<HttpRequest, String> requests = new HashMap<>(); 179 for (int i=0;i<CONCURRENT_REQUESTS; i++) { 180 HttpRequest request = HttpRequest.newBuilder(URI.create(uri + "?" + i)) 181 .build(); 182 requests.put(request, BODIES[i]); 183 } 184 185 // initial connection to seed the cache so next parallel connections reuse it 186 client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join(); 187 188 // will reuse connection cached from the previous request ( when HTTP/2 ) 189 CompletableFuture.allOf(requests.keySet().parallelStream() 190 .map(request -> client.sendAsync(request, CustomSubscriber.handler)) 191 .map(cf -> cf.thenCompose(ConcurrentResponses::assert200ResponseCode)) 192 .map(cf -> cf.thenCompose(response -> assertbody(response, requests.get(response.request())))) 193 .toArray(CompletableFuture<?>[]::new)) 194 .join(); 195 } 196 197 /** 198 * A subscriber that wraps asString, but mucks with any data between limit 199 * and capacity, if the client mistakenly passes it any that is should not. 200 */ 201 static class CustomSubscriber implements BodySubscriber<String> { 202 static final BodyHandler<String> handler = (r,h) -> new CustomSubscriber(); 203 private final BodySubscriber<String> asString = BodySubscriber.asString(UTF_8); 204 205 @Override 206 public CompletionStage<String> getBody() { 207 return asString.getBody(); 208 } 209 210 @Override 211 public void onSubscribe(Flow.Subscription subscription) { 212 asString.onSubscribe(subscription); 213 } 214 215 @Override 216 public void onNext(List<ByteBuffer> buffers) { 217 // Muck any data beyond the give limit, since there shouldn't 218 // be any of interest to the HTTP Client. 219 for (ByteBuffer buffer : buffers) { 220 if (buffer.limit() != buffer.capacity()) { 221 final int limit = buffer.limit(); 222 final int position = buffer.position(); 223 buffer.position(buffer.limit()); 224 buffer.limit(buffer.capacity()); 225 while (buffer.hasRemaining()) 226 buffer.put((byte)'X'); 227 buffer.position(position); // restore original position 228 buffer.limit(limit); // restore original limit 229 } 230 } 231 asString.onNext(buffers); 232 } 233 234 @Override 235 public void onError(Throwable throwable) { 236 asString.onError(throwable); 237 throwable.printStackTrace(); 238 fail("UNEXPECTED:" + throwable); 239 } 240 241 @Override 242 public void onComplete() { 243 asString.onComplete(); 244 } 245 } 246 247 248 @BeforeTest 249 public void setup() throws Exception { 250 sslContext = new SimpleSSLContext().get(); 251 if (sslContext == null) 252 throw new AssertionError("Unexpected null sslContext"); 253 254 InetSocketAddress sa = new InetSocketAddress("localhost", 0); 255 httpTestServer = HttpServer.create(sa, 0); 256 httpTestServer.createContext("/http1/fixed", new Http1FixedHandler()); 257 httpFixedURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed"; 258 httpTestServer.createContext("/http1/chunked", new Http1ChunkedHandler()); 259 httpChunkedURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunked"; 260 261 httpsTestServer = HttpsServer.create(sa, 0); 262 httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 263 httpsTestServer.createContext("/https1/fixed", new Http1FixedHandler()); 264 httpsFixedURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed"; 265 httpsTestServer.createContext("/https1/chunked", new Http1ChunkedHandler()); 266 httpsChunkedURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunked"; 267 268 http2TestServer = new Http2TestServer("127.0.0.1", false, 0); 269 http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed"); 270 http2FixedURI = "http://127.0.0.1:" + http2TestServer.getAddress().getPort() + "/http2/fixed"; 271 http2TestServer.addHandler(new Http2VariableHandler(), "/http2/variable"); 272 http2VariableURI = "http://127.0.0.1:" + http2TestServer.getAddress().getPort() + "/http2/variable"; 273 274 https2TestServer = new Http2TestServer("127.0.0.1", true, 0); 275 https2TestServer.addHandler(new Http2FixedHandler(), "/https2/fixed"); 276 https2FixedURI = "https://127.0.0.1:" + https2TestServer.getAddress().getPort() + "/https2/fixed"; 277 https2TestServer.addHandler(new Http2VariableHandler(), "/https2/variable"); 278 https2VariableURI = "https://127.0.0.1:" + https2TestServer.getAddress().getPort() + "/https2/variable"; 279 280 httpTestServer.start(); 281 httpsTestServer.start(); 282 http2TestServer.start(); 283 https2TestServer.start(); 284 } 285 286 @AfterTest 287 public void teardown() throws Exception { 288 httpTestServer.stop(0); 289 httpsTestServer.stop(0); 290 http2TestServer.stop(); 291 https2TestServer.stop(); 292 } 293 294 interface SendResponseHeadersFunction { 295 void apply(int responseCode, long responseLength) throws IOException; 296 } 297 298 // A handler implementation that replies with 200 OK. If the exchange's uri 299 // has a query, then it must be an integer, which is used as an index to 300 // select the particular response body, e.g. /http2/x?5 -> BODIES[5] 301 static void serverHandlerImpl(InputStream inputStream, 302 OutputStream outputStream, 303 URI uri, 304 SendResponseHeadersFunction sendResponseHeadersFunction) 305 throws IOException 306 { 307 try (InputStream is = inputStream; 308 OutputStream os = outputStream) { 309 is.readAllBytes(); 310 311 String magicQuery = uri.getQuery(); 312 if (magicQuery != null) { 313 int bodyIndex = Integer.valueOf(magicQuery); 314 String body = BODIES[bodyIndex]; 315 byte[] bytes = body.getBytes(UTF_8); 316 sendResponseHeadersFunction.apply(200, bytes.length); 317 int offset = 0; 318 // Deliberately attempt to reply with several relatively 319 // small data frames ( each write corresponds to its own 320 // data frame ). Additionally, yield, to encourage other 321 // handlers to execute, therefore increasing the likelihood 322 // of multiple different-stream related frames in the 323 // client's read buffer. 324 while (offset < bytes.length) { 325 int length = Math.min(bytes.length - offset, 64); 326 os.write(bytes, offset, length); 327 os.flush(); 328 offset += length; 329 Thread.yield(); 330 } 331 } else { 332 sendResponseHeadersFunction.apply(200, 1); 333 os.write('A'); 334 } 335 } 336 } 337 338 static class Http1FixedHandler implements HttpHandler { 339 @Override 340 public void handle(HttpExchange t) throws IOException { 341 serverHandlerImpl(t.getRequestBody(), 342 t.getResponseBody(), 343 t.getRequestURI(), 344 (rcode, length) -> t.sendResponseHeaders(rcode, length)); 345 } 346 } 347 348 static class Http1ChunkedHandler implements HttpHandler { 349 @Override 350 public void handle(HttpExchange t) throws IOException { 351 serverHandlerImpl(t.getRequestBody(), 352 t.getResponseBody(), 353 t.getRequestURI(), 354 (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /*chunked*/)); 355 } 356 } 357 358 static class Http2FixedHandler implements Http2Handler { 359 @Override 360 public void handle(Http2TestExchange t) throws IOException { 361 serverHandlerImpl(t.getRequestBody(), 362 t.getResponseBody(), 363 t.getRequestURI(), 364 (rcode, length) -> t.sendResponseHeaders(rcode, length)); 365 } 366 } 367 368 static class Http2VariableHandler implements Http2Handler { 369 @Override 370 public void handle(Http2TestExchange t) throws IOException { 371 serverHandlerImpl(t.getRequestBody(), 372 t.getResponseBody(), 373 t.getRequestURI(), 374 (rcode, ignored) -> t.sendResponseHeaders(rcode, 0 /* no Content-Length */)); 375 } 376 } 377 }