1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.ByteArrayOutputStream; 25 import java.io.IOException; 26 import java.io.InputStream; 27 import java.io.OutputStream; 28 import java.io.UncheckedIOException; 29 import java.net.InetSocketAddress; 30 import java.net.URI; 31 import java.nio.ByteBuffer; 32 import java.nio.MappedByteBuffer; 33 import java.util.Collection; 34 import java.util.List; 35 import java.util.concurrent.Flow; 36 import java.util.function.Function; 37 import java.util.function.Supplier; 38 import com.sun.net.httpserver.HttpExchange; 39 import com.sun.net.httpserver.HttpHandler; 40 import com.sun.net.httpserver.HttpServer; 41 import com.sun.net.httpserver.HttpsConfigurator; 42 import com.sun.net.httpserver.HttpsServer; 43 import jdk.incubator.http.HttpClient; 44 import jdk.incubator.http.HttpRequest; 45 import jdk.incubator.http.HttpResponse; 46 import jdk.incubator.http.HttpResponse.BodyHandler; 47 import jdk.incubator.http.HttpResponse.BodySubscriber; 48 import jdk.testlibrary.SimpleSSLContext; 49 import org.testng.annotations.AfterTest; 50 import org.testng.annotations.BeforeTest; 51 import org.testng.annotations.DataProvider; 52 import org.testng.annotations.Test; 53 import javax.net.ssl.SSLContext; 54 import static java.nio.charset.StandardCharsets.UTF_8; 55 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString; 56 import static org.testng.Assert.assertEquals; 57 import static org.testng.Assert.assertThrows; 58 import static org.testng.Assert.assertTrue; 59 60 /* 61 * @test 62 * @summary Basic tests for Flow adapter Subscribers 63 * @modules java.base/sun.net.www.http 64 * jdk.incubator.httpclient/jdk.incubator.http.internal.common 65 * jdk.incubator.httpclient/jdk.incubator.http.internal.frame 66 * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack 67 * java.logging 68 * jdk.httpserver 69 * @library /lib/testlibrary http2/server 70 * @build Http2TestServer 71 * @build jdk.testlibrary.SimpleSSLContext 72 * @run testng/othervm FlowAdapterSubscriberTest 73 */ 74 75 public class FlowAdapterSubscriberTest { 76 77 SSLContext sslContext; 78 HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] 79 HttpsServer httpsTestServer; // HTTPS/1.1 80 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) 81 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) 82 String httpURI; 83 String httpsURI; 84 String http2URI; 85 String https2URI; 86 87 @DataProvider(name = "uris") 88 public Object[][] variants() { 89 return new Object[][]{ 90 { httpURI }, 91 { httpsURI }, 92 { http2URI }, 93 { https2URI }, 94 }; 95 } 96 97 static final Class<NullPointerException> NPE = NullPointerException.class; 98 99 @Test 100 public void testNull() { 101 assertThrows(NPE, () -> BodyHandler.fromSubscriber(null)); 102 assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, Function.identity())); 103 assertThrows(NPE, () -> BodyHandler.fromSubscriber(new ListSubscriber(), null)); 104 assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, null)); 105 106 assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null)); 107 assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, Function.identity())); 108 assertThrows(NPE, () -> BodySubscriber.fromSubscriber(new ListSubscriber(), null)); 109 assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, null)); 110 } 111 112 // List<ByteBuffer> 113 114 @Test(dataProvider = "uris") 115 void testListWithFinisher(String url) { 116 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 117 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 118 .POST(fromString("May the luck of the Irish be with you!")).build(); 119 120 ListSubscriber subscriber = new ListSubscriber(); 121 HttpResponse<String> response = client.sendAsync(request, 122 BodyHandler.fromSubscriber(subscriber, Supplier::get)).join(); 123 String text = response.body(); 124 System.out.println(text); 125 assertEquals(response.statusCode(), 200); 126 assertEquals(text, "May the luck of the Irish be with you!"); 127 } 128 129 @Test(dataProvider = "uris") 130 void testListWithoutFinisher(String url) { 131 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 132 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 133 .POST(fromString("May the luck of the Irish be with you!")).build(); 134 135 ListSubscriber subscriber = new ListSubscriber(); 136 HttpResponse<Void> response = client.sendAsync(request, 137 BodyHandler.fromSubscriber(subscriber)).join(); 138 String text = subscriber.get(); 139 System.out.println(text); 140 assertEquals(response.statusCode(), 200); 141 assertEquals(text, "May the luck of the Irish be with you!"); 142 } 143 144 @Test(dataProvider = "uris") 145 void testListWithFinisherBlocking(String url) throws Exception { 146 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 147 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 148 .POST(fromString("May the luck of the Irish be with you!")).build(); 149 150 ListSubscriber subscriber = new ListSubscriber(); 151 HttpResponse<String> response = client.send(request, 152 BodyHandler.fromSubscriber(subscriber, Supplier::get)); 153 String text = response.body(); 154 System.out.println(text); 155 assertEquals(response.statusCode(), 200); 156 assertEquals(text, "May the luck of the Irish be with you!"); 157 } 158 159 @Test(dataProvider = "uris") 160 void testListWithoutFinisherBlocking(String url) throws Exception { 161 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 162 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 163 .POST(fromString("May the luck of the Irish be with you!")).build(); 164 165 ListSubscriber subscriber = new ListSubscriber(); 166 HttpResponse<Void> response = client.send(request, 167 BodyHandler.fromSubscriber(subscriber)); 168 String text = subscriber.get(); 169 System.out.println(text); 170 assertEquals(response.statusCode(), 200); 171 assertEquals(text, "May the luck of the Irish be with you!"); 172 } 173 174 // Collection<ByteBuffer> 175 176 @Test(dataProvider = "uris") 177 void testCollectionWithFinisher(String url) { 178 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 179 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 180 .POST(fromString("What's the craic?")).build(); 181 182 CollectionSubscriber subscriber = new CollectionSubscriber(); 183 HttpResponse<String> response = client.sendAsync(request, 184 BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get)).join(); 185 String text = response.body(); 186 System.out.println(text); 187 assertEquals(response.statusCode(), 200); 188 assertEquals(text, "What's the craic?"); 189 } 190 191 @Test(dataProvider = "uris") 192 void testCollectionWithoutFinisher(String url) { 193 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 194 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 195 .POST(fromString("What's the craic?")).build(); 196 197 CollectionSubscriber subscriber = new CollectionSubscriber(); 198 HttpResponse<Void> response = client.sendAsync(request, 199 BodyHandler.fromSubscriber(subscriber)).join(); 200 String text = subscriber.get(); 201 System.out.println(text); 202 assertEquals(response.statusCode(), 200); 203 assertEquals(text, "What's the craic?"); 204 } 205 206 @Test(dataProvider = "uris") 207 void testCollectionWithFinisherBlocking(String url) throws Exception { 208 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 209 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 210 .POST(fromString("What's the craic?")).build(); 211 212 CollectionSubscriber subscriber = new CollectionSubscriber(); 213 HttpResponse<String> response = client.send(request, 214 BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get)); 215 String text = response.body(); 216 System.out.println(text); 217 assertEquals(response.statusCode(), 200); 218 assertEquals(text, "What's the craic?"); 219 } 220 221 @Test(dataProvider = "uris") 222 void testCollectionWithoutFinisheBlocking(String url) throws Exception { 223 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 224 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 225 .POST(fromString("What's the craic?")).build(); 226 227 CollectionSubscriber subscriber = new CollectionSubscriber(); 228 HttpResponse<Void> response = client.send(request, 229 BodyHandler.fromSubscriber(subscriber)); 230 String text = subscriber.get(); 231 System.out.println(text); 232 assertEquals(response.statusCode(), 200); 233 assertEquals(text, "What's the craic?"); 234 } 235 236 // Iterable<ByteBuffer> 237 238 @Test(dataProvider = "uris") 239 void testIterableWithFinisher(String url) { 240 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 241 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 242 .POST(fromString("We're sucking diesel now!")).build(); 243 244 IterableSubscriber subscriber = new IterableSubscriber(); 245 HttpResponse<String> response = client.sendAsync(request, 246 BodyHandler.fromSubscriber(subscriber, Supplier::get)).join(); 247 String text = response.body(); 248 System.out.println(text); 249 assertEquals(response.statusCode(), 200); 250 assertEquals(text, "We're sucking diesel now!"); 251 } 252 253 @Test(dataProvider = "uris") 254 void testIterableWithoutFinisher(String url) { 255 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 256 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 257 .POST(fromString("We're sucking diesel now!")).build(); 258 259 IterableSubscriber subscriber = new IterableSubscriber(); 260 HttpResponse<Void> response = client.sendAsync(request, 261 BodyHandler.fromSubscriber(subscriber)).join(); 262 String text = subscriber.get(); 263 System.out.println(text); 264 assertEquals(response.statusCode(), 200); 265 assertEquals(text, "We're sucking diesel now!"); 266 } 267 268 @Test(dataProvider = "uris") 269 void testIterableWithFinisherBlocking(String url) throws Exception { 270 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 271 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 272 .POST(fromString("We're sucking diesel now!")).build(); 273 274 IterableSubscriber subscriber = new IterableSubscriber(); 275 HttpResponse<String> response = client.send(request, 276 BodyHandler.fromSubscriber(subscriber, Supplier::get)); 277 String text = response.body(); 278 System.out.println(text); 279 assertEquals(response.statusCode(), 200); 280 assertEquals(text, "We're sucking diesel now!"); 281 } 282 283 @Test(dataProvider = "uris") 284 void testIterableWithoutFinisherBlocking(String url) throws Exception{ 285 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 286 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 287 .POST(fromString("We're sucking diesel now!")).build(); 288 289 IterableSubscriber subscriber = new IterableSubscriber(); 290 HttpResponse<Void> response = client.send(request, 291 BodyHandler.fromSubscriber(subscriber)); 292 String text = subscriber.get(); 293 System.out.println(text); 294 assertEquals(response.statusCode(), 200); 295 assertEquals(text, "We're sucking diesel now!"); 296 } 297 298 // Subscriber<Object> 299 300 @Test(dataProvider = "uris") 301 void testObjectWithFinisher(String url) { 302 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 303 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 304 .POST(fromString("May the wind always be at your back.")).build(); 305 306 ObjectSubscriber subscriber = new ObjectSubscriber(); 307 HttpResponse<String> response = client.sendAsync(request, 308 BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get)).join(); 309 String text = response.body(); 310 System.out.println(text); 311 assertEquals(response.statusCode(), 200); 312 assertTrue(text.length() != 0); // what else can be asserted! 313 } 314 315 @Test(dataProvider = "uris") 316 void testObjectWithoutFinisher(String url) { 317 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 318 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 319 .POST(fromString("May the wind always be at your back.")).build(); 320 321 ObjectSubscriber subscriber = new ObjectSubscriber(); 322 HttpResponse<Void> response = client.sendAsync(request, 323 BodyHandler.fromSubscriber(subscriber)).join(); 324 String text = subscriber.get(); 325 System.out.println(text); 326 assertEquals(response.statusCode(), 200); 327 assertTrue(text.length() != 0); // what else can be asserted! 328 } 329 330 @Test(dataProvider = "uris") 331 void testObjectWithFinisherBlocking(String url) throws Exception { 332 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 333 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 334 .POST(fromString("May the wind always be at your back.")).build(); 335 336 ObjectSubscriber subscriber = new ObjectSubscriber(); 337 HttpResponse<String> response = client.send(request, 338 BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get)); 339 String text = response.body(); 340 System.out.println(text); 341 assertEquals(response.statusCode(), 200); 342 assertTrue(text.length() != 0); // what else can be asserted! 343 } 344 345 @Test(dataProvider = "uris") 346 void testObjectWithoutFinisherBlocking(String url) throws Exception { 347 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 348 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 349 .POST(fromString("May the wind always be at your back.")).build(); 350 351 ObjectSubscriber subscriber = new ObjectSubscriber(); 352 HttpResponse<Void> response = client.send(request, 353 BodyHandler.fromSubscriber(subscriber)); 354 String text = subscriber.get(); 355 System.out.println(text); 356 assertEquals(response.statusCode(), 200); 357 assertTrue(text.length() != 0); // what else can be asserted! 358 } 359 360 /** An abstract Subscriber that converts all received data into a String. */ 361 static abstract class AbstractSubscriber implements Supplier<String> { 362 protected volatile Flow.Subscription subscription; 363 protected volatile ByteArrayOutputStream baos = new ByteArrayOutputStream(); 364 protected volatile String text; 365 366 public void onSubscribe(Flow.Subscription subscription) { 367 this.subscription = subscription; 368 subscription.request(Long.MAX_VALUE); 369 } 370 public void onError(Throwable throwable) { 371 throw new RuntimeException(throwable); 372 } 373 public void onComplete() { 374 text = new String(baos.toByteArray(), UTF_8); 375 } 376 @Override public String get() { return text; } 377 } 378 379 static class ListSubscriber extends AbstractSubscriber 380 implements Flow.Subscriber<List<ByteBuffer>>, Supplier<String> 381 { 382 @Override public void onNext(List<ByteBuffer> item) { 383 for (ByteBuffer bb : item) { 384 byte[] ba = new byte[bb.remaining()]; 385 bb.get(ba); 386 uncheckedWrite(baos, ba); 387 } 388 } 389 } 390 391 static class CollectionSubscriber extends AbstractSubscriber 392 implements Flow.Subscriber<Collection<ByteBuffer>>, Supplier<String> 393 { 394 @Override public void onNext(Collection<ByteBuffer> item) { 395 for (ByteBuffer bb : item) { 396 byte[] ba = new byte[bb.remaining()]; 397 bb.get(ba); 398 uncheckedWrite(baos, ba); 399 } 400 } 401 } 402 403 static class IterableSubscriber extends AbstractSubscriber 404 implements Flow.Subscriber<Iterable<ByteBuffer>>, Supplier<String> 405 { 406 @Override public void onNext(Iterable<ByteBuffer> item) { 407 for (ByteBuffer bb : item) { 408 byte[] ba = new byte[bb.remaining()]; 409 bb.get(ba); 410 uncheckedWrite(baos, ba); 411 } 412 } 413 } 414 415 static class ObjectSubscriber extends AbstractSubscriber 416 implements Flow.Subscriber<Object>, Supplier<String> 417 { 418 @Override public void onNext(Object item) { 419 // What can anyone do with Object, cast or toString it ? 420 uncheckedWrite(baos, item.toString().getBytes(UTF_8)); 421 } 422 } 423 424 static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) { 425 try { 426 baos.write(ba); 427 } catch (IOException e) { 428 throw new UncheckedIOException(e); 429 } 430 } 431 432 @BeforeTest 433 public void setup() throws Exception { 434 sslContext = new SimpleSSLContext().get(); 435 if (sslContext == null) 436 throw new AssertionError("Unexpected null sslContext"); 437 438 InetSocketAddress sa = new InetSocketAddress("localhost", 0); 439 httpTestServer = HttpServer.create(sa, 0); 440 httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); 441 httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; 442 443 httpsTestServer = HttpsServer.create(sa, 0); 444 httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 445 httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); 446 httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; 447 448 http2TestServer = new Http2TestServer("127.0.0.1", false, 0); 449 http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); 450 int port = http2TestServer.getAddress().getPort(); 451 http2URI = "http://127.0.0.1:" + port + "/http2/echo"; 452 453 https2TestServer = new Http2TestServer("127.0.0.1", true, 0); 454 https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); 455 port = https2TestServer.getAddress().getPort(); 456 https2URI = "https://127.0.0.1:" + port + "/https2/echo"; 457 458 httpTestServer.start(); 459 httpsTestServer.start(); 460 http2TestServer.start(); 461 https2TestServer.start(); 462 } 463 464 @AfterTest 465 public void teardown() throws Exception { 466 httpTestServer.stop(0); 467 httpsTestServer.stop(0); 468 http2TestServer.stop(); 469 https2TestServer.stop(); 470 } 471 472 static class Http1EchoHandler implements HttpHandler { 473 @Override 474 public void handle(HttpExchange t) throws IOException { 475 try (InputStream is = t.getRequestBody(); 476 OutputStream os = t.getResponseBody()) { 477 byte[] bytes = is.readAllBytes(); 478 t.sendResponseHeaders(200, bytes.length); 479 os.write(bytes); 480 } 481 } 482 } 483 484 static class Http2EchoHandler implements Http2Handler { 485 @Override 486 public void handle(Http2TestExchange t) throws IOException { 487 try (InputStream is = t.getRequestBody(); 488 OutputStream os = t.getResponseBody()) { 489 byte[] bytes = is.readAllBytes(); 490 t.sendResponseHeaders(200, bytes.length); 491 os.write(bytes); 492 } 493 } 494 } 495 }