1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.ByteArrayOutputStream;
  25 import java.io.IOException;
  26 import java.io.InputStream;
  27 import java.io.OutputStream;
  28 import java.io.UncheckedIOException;
  29 import java.net.InetSocketAddress;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.nio.MappedByteBuffer;
  33 import java.util.Collection;
  34 import java.util.List;
  35 import java.util.concurrent.Flow;
  36 import java.util.function.Function;
  37 import java.util.function.Supplier;
  38 import com.sun.net.httpserver.HttpExchange;
  39 import com.sun.net.httpserver.HttpHandler;
  40 import com.sun.net.httpserver.HttpServer;
  41 import com.sun.net.httpserver.HttpsConfigurator;
  42 import com.sun.net.httpserver.HttpsServer;
  43 import jdk.incubator.http.HttpClient;
  44 import jdk.incubator.http.HttpRequest;
  45 import jdk.incubator.http.HttpResponse;
  46 import jdk.incubator.http.HttpResponse.BodyHandler;
  47 import jdk.incubator.http.HttpResponse.BodySubscriber;
  48 import jdk.testlibrary.SimpleSSLContext;
  49 import org.testng.annotations.AfterTest;
  50 import org.testng.annotations.BeforeTest;
  51 import org.testng.annotations.DataProvider;
  52 import org.testng.annotations.Test;
  53 import javax.net.ssl.SSLContext;
  54 import static java.nio.charset.StandardCharsets.UTF_8;
  55 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
  56 import static org.testng.Assert.assertEquals;
  57 import static org.testng.Assert.assertThrows;
  58 import static org.testng.Assert.assertTrue;
  59 
  60 /*
  61  * @test
  62  * @summary Basic tests for Flow adapter Subscribers
  63  * @modules java.base/sun.net.www.http
  64  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
  65  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
  66  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
  67  *          java.logging
  68  *          jdk.httpserver
  69  * @library /lib/testlibrary http2/server
  70  * @build Http2TestServer
  71  * @build jdk.testlibrary.SimpleSSLContext
  72  * @run testng/othervm FlowAdapterSubscriberTest
  73  */
  74 
  75 public class FlowAdapterSubscriberTest {
  76 
  77     SSLContext sslContext;
  78     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
  79     HttpsServer httpsTestServer;       // HTTPS/1.1
  80     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
  81     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
  82     String httpURI;
  83     String httpsURI;
  84     String http2URI;
  85     String https2URI;
  86 
  87     @DataProvider(name = "uris")
  88     public Object[][] variants() {
  89         return new Object[][]{
  90                 { httpURI   },
  91                 { httpsURI  },
  92                 { http2URI  },
  93                 { https2URI },
  94         };
  95     }
  96 
  97     static final Class<NullPointerException> NPE = NullPointerException.class;
  98 
  99     @Test
 100     public void testNull() {
 101         assertThrows(NPE, () -> BodyHandler.fromSubscriber(null));
 102         assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, Function.identity()));
 103         assertThrows(NPE, () -> BodyHandler.fromSubscriber(new ListSubscriber(), null));
 104         assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, null));
 105 
 106         assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null));
 107         assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, Function.identity()));
 108         assertThrows(NPE, () -> BodySubscriber.fromSubscriber(new ListSubscriber(), null));
 109         assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, null));
 110     }
 111 
 112     // List<ByteBuffer>
 113 
 114     @Test(dataProvider = "uris")
 115     void testListWithFinisher(String url) {
 116         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 117         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 118                 .POST(fromString("May the luck of the Irish be with you!")).build();
 119 
 120         ListSubscriber subscriber = new ListSubscriber();
 121         HttpResponse<String> response = client.sendAsync(request,
 122                 BodyHandler.fromSubscriber(subscriber, Supplier::get)).join();
 123         String text = response.body();
 124         System.out.println(text);
 125         assertEquals(response.statusCode(), 200);
 126         assertEquals(text, "May the luck of the Irish be with you!");
 127     }
 128 
 129     @Test(dataProvider = "uris")
 130     void testListWithoutFinisher(String url) {
 131         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 132         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 133                 .POST(fromString("May the luck of the Irish be with you!")).build();
 134 
 135         ListSubscriber subscriber = new ListSubscriber();
 136         HttpResponse<Void> response = client.sendAsync(request,
 137                 BodyHandler.fromSubscriber(subscriber)).join();
 138         String text = subscriber.get();
 139         System.out.println(text);
 140         assertEquals(response.statusCode(), 200);
 141         assertEquals(text, "May the luck of the Irish be with you!");
 142     }
 143 
 144     @Test(dataProvider = "uris")
 145     void testListWithFinisherBlocking(String url) throws Exception {
 146         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 147         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 148                 .POST(fromString("May the luck of the Irish be with you!")).build();
 149 
 150         ListSubscriber subscriber = new ListSubscriber();
 151         HttpResponse<String> response = client.send(request,
 152                 BodyHandler.fromSubscriber(subscriber, Supplier::get));
 153         String text = response.body();
 154         System.out.println(text);
 155         assertEquals(response.statusCode(), 200);
 156         assertEquals(text, "May the luck of the Irish be with you!");
 157     }
 158 
 159     @Test(dataProvider = "uris")
 160     void testListWithoutFinisherBlocking(String url) throws Exception {
 161         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 162         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 163                 .POST(fromString("May the luck of the Irish be with you!")).build();
 164 
 165         ListSubscriber subscriber = new ListSubscriber();
 166         HttpResponse<Void> response = client.send(request,
 167                 BodyHandler.fromSubscriber(subscriber));
 168         String text = subscriber.get();
 169         System.out.println(text);
 170         assertEquals(response.statusCode(), 200);
 171         assertEquals(text, "May the luck of the Irish be with you!");
 172     }
 173 
 174     // Collection<ByteBuffer>
 175 
 176     @Test(dataProvider = "uris")
 177     void testCollectionWithFinisher(String url) {
 178         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 179         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 180                 .POST(fromString("What's the craic?")).build();
 181 
 182         CollectionSubscriber subscriber = new CollectionSubscriber();
 183         HttpResponse<String> response = client.sendAsync(request,
 184                 BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get)).join();
 185         String text = response.body();
 186         System.out.println(text);
 187         assertEquals(response.statusCode(), 200);
 188         assertEquals(text, "What's the craic?");
 189     }
 190 
 191     @Test(dataProvider = "uris")
 192     void testCollectionWithoutFinisher(String url) {
 193         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 194         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 195                 .POST(fromString("What's the craic?")).build();
 196 
 197         CollectionSubscriber subscriber = new CollectionSubscriber();
 198         HttpResponse<Void> response = client.sendAsync(request,
 199                 BodyHandler.fromSubscriber(subscriber)).join();
 200         String text = subscriber.get();
 201         System.out.println(text);
 202         assertEquals(response.statusCode(), 200);
 203         assertEquals(text, "What's the craic?");
 204     }
 205 
 206     @Test(dataProvider = "uris")
 207     void testCollectionWithFinisherBlocking(String url) throws Exception {
 208         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 209         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 210                 .POST(fromString("What's the craic?")).build();
 211 
 212         CollectionSubscriber subscriber = new CollectionSubscriber();
 213         HttpResponse<String> response = client.send(request,
 214                 BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get));
 215         String text = response.body();
 216         System.out.println(text);
 217         assertEquals(response.statusCode(), 200);
 218         assertEquals(text, "What's the craic?");
 219     }
 220 
 221     @Test(dataProvider = "uris")
 222     void testCollectionWithoutFinisheBlocking(String url) throws Exception {
 223         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 224         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 225                 .POST(fromString("What's the craic?")).build();
 226 
 227         CollectionSubscriber subscriber = new CollectionSubscriber();
 228         HttpResponse<Void> response = client.send(request,
 229                 BodyHandler.fromSubscriber(subscriber));
 230         String text = subscriber.get();
 231         System.out.println(text);
 232         assertEquals(response.statusCode(), 200);
 233         assertEquals(text, "What's the craic?");
 234     }
 235 
 236     // Iterable<ByteBuffer>
 237 
 238     @Test(dataProvider = "uris")
 239     void testIterableWithFinisher(String url) {
 240         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 241         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 242                 .POST(fromString("We're sucking diesel now!")).build();
 243 
 244         IterableSubscriber subscriber = new IterableSubscriber();
 245         HttpResponse<String> response = client.sendAsync(request,
 246                 BodyHandler.fromSubscriber(subscriber, Supplier::get)).join();
 247         String text = response.body();
 248         System.out.println(text);
 249         assertEquals(response.statusCode(), 200);
 250         assertEquals(text, "We're sucking diesel now!");
 251     }
 252 
 253     @Test(dataProvider = "uris")
 254     void testIterableWithoutFinisher(String url) {
 255         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 256         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 257                 .POST(fromString("We're sucking diesel now!")).build();
 258 
 259         IterableSubscriber subscriber = new IterableSubscriber();
 260         HttpResponse<Void> response = client.sendAsync(request,
 261                 BodyHandler.fromSubscriber(subscriber)).join();
 262         String text = subscriber.get();
 263         System.out.println(text);
 264         assertEquals(response.statusCode(), 200);
 265         assertEquals(text, "We're sucking diesel now!");
 266     }
 267 
 268     @Test(dataProvider = "uris")
 269     void testIterableWithFinisherBlocking(String url) throws Exception {
 270         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 271         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 272                 .POST(fromString("We're sucking diesel now!")).build();
 273 
 274         IterableSubscriber subscriber = new IterableSubscriber();
 275         HttpResponse<String> response = client.send(request,
 276                 BodyHandler.fromSubscriber(subscriber, Supplier::get));
 277         String text = response.body();
 278         System.out.println(text);
 279         assertEquals(response.statusCode(), 200);
 280         assertEquals(text, "We're sucking diesel now!");
 281     }
 282 
 283     @Test(dataProvider = "uris")
 284     void testIterableWithoutFinisherBlocking(String url) throws Exception{
 285         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 286         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 287                 .POST(fromString("We're sucking diesel now!")).build();
 288 
 289         IterableSubscriber subscriber = new IterableSubscriber();
 290         HttpResponse<Void> response = client.send(request,
 291                 BodyHandler.fromSubscriber(subscriber));
 292         String text = subscriber.get();
 293         System.out.println(text);
 294         assertEquals(response.statusCode(), 200);
 295         assertEquals(text, "We're sucking diesel now!");
 296     }
 297 
 298     // Subscriber<Object>
 299 
 300     @Test(dataProvider = "uris")
 301     void testObjectWithFinisher(String url) {
 302         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 303         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 304                 .POST(fromString("May the wind always be at your back.")).build();
 305 
 306         ObjectSubscriber subscriber = new ObjectSubscriber();
 307         HttpResponse<String> response = client.sendAsync(request,
 308                 BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get)).join();
 309         String text = response.body();
 310         System.out.println(text);
 311         assertEquals(response.statusCode(), 200);
 312         assertTrue(text.length() != 0);  // what else can be asserted!
 313     }
 314 
 315     @Test(dataProvider = "uris")
 316     void testObjectWithoutFinisher(String url) {
 317         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 318         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 319                 .POST(fromString("May the wind always be at your back.")).build();
 320 
 321         ObjectSubscriber subscriber = new ObjectSubscriber();
 322         HttpResponse<Void> response = client.sendAsync(request,
 323                 BodyHandler.fromSubscriber(subscriber)).join();
 324         String text = subscriber.get();
 325         System.out.println(text);
 326         assertEquals(response.statusCode(), 200);
 327         assertTrue(text.length() != 0);  // what else can be asserted!
 328     }
 329 
 330     @Test(dataProvider = "uris")
 331     void testObjectWithFinisherBlocking(String url) throws Exception {
 332         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 333         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 334                 .POST(fromString("May the wind always be at your back.")).build();
 335 
 336         ObjectSubscriber subscriber = new ObjectSubscriber();
 337         HttpResponse<String> response = client.send(request,
 338                 BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get));
 339         String text = response.body();
 340         System.out.println(text);
 341         assertEquals(response.statusCode(), 200);
 342         assertTrue(text.length() != 0);  // what else can be asserted!
 343     }
 344 
 345     @Test(dataProvider = "uris")
 346     void testObjectWithoutFinisherBlocking(String url) throws Exception {
 347         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 348         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 349                 .POST(fromString("May the wind always be at your back.")).build();
 350 
 351         ObjectSubscriber subscriber = new ObjectSubscriber();
 352         HttpResponse<Void> response = client.send(request,
 353                 BodyHandler.fromSubscriber(subscriber));
 354         String text = subscriber.get();
 355         System.out.println(text);
 356         assertEquals(response.statusCode(), 200);
 357         assertTrue(text.length() != 0);  // what else can be asserted!
 358     }
 359 
 360     /** An abstract Subscriber that converts all received data into a String. */
 361     static abstract class AbstractSubscriber implements Supplier<String> {
 362         protected volatile Flow.Subscription subscription;
 363         protected volatile ByteArrayOutputStream baos = new ByteArrayOutputStream();
 364         protected volatile String text;
 365 
 366         public void onSubscribe(Flow.Subscription subscription) {
 367             this.subscription = subscription;
 368             subscription.request(Long.MAX_VALUE);
 369         }
 370         public void onError(Throwable throwable) {
 371             throw new RuntimeException(throwable);
 372         }
 373         public void onComplete() {
 374             text = new String(baos.toByteArray(), UTF_8);
 375         }
 376         @Override public String get() { return text; }
 377     }
 378 
 379     static class ListSubscriber extends AbstractSubscriber
 380         implements Flow.Subscriber<List<ByteBuffer>>, Supplier<String>
 381     {
 382         @Override public void onNext(List<ByteBuffer> item) {
 383             for (ByteBuffer bb : item) {
 384                 byte[] ba = new byte[bb.remaining()];
 385                 bb.get(ba);
 386                 uncheckedWrite(baos, ba);
 387             }
 388         }
 389     }
 390 
 391     static class CollectionSubscriber extends AbstractSubscriber
 392         implements Flow.Subscriber<Collection<ByteBuffer>>, Supplier<String>
 393     {
 394         @Override public void onNext(Collection<ByteBuffer> item) {
 395             for (ByteBuffer bb : item) {
 396                 byte[] ba = new byte[bb.remaining()];
 397                 bb.get(ba);
 398                 uncheckedWrite(baos, ba);
 399             }
 400         }
 401     }
 402 
 403     static class IterableSubscriber extends AbstractSubscriber
 404         implements Flow.Subscriber<Iterable<ByteBuffer>>, Supplier<String>
 405     {
 406         @Override public void onNext(Iterable<ByteBuffer> item) {
 407             for (ByteBuffer bb : item) {
 408                 byte[] ba = new byte[bb.remaining()];
 409                 bb.get(ba);
 410                 uncheckedWrite(baos, ba);
 411             }
 412         }
 413     }
 414 
 415     static class ObjectSubscriber extends AbstractSubscriber
 416         implements Flow.Subscriber<Object>, Supplier<String>
 417     {
 418         @Override public void onNext(Object item) {
 419             // What can anyone do with Object, cast or toString it ?
 420             uncheckedWrite(baos, item.toString().getBytes(UTF_8));
 421         }
 422     }
 423 
 424     static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
 425         try {
 426             baos.write(ba);
 427         } catch (IOException e) {
 428             throw new UncheckedIOException(e);
 429         }
 430     }
 431 
 432     @BeforeTest
 433     public void setup() throws Exception {
 434         sslContext = new SimpleSSLContext().get();
 435         if (sslContext == null)
 436             throw new AssertionError("Unexpected null sslContext");
 437 
 438         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
 439         httpTestServer = HttpServer.create(sa, 0);
 440         httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
 441         httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
 442 
 443         httpsTestServer = HttpsServer.create(sa, 0);
 444         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
 445         httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
 446         httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
 447 
 448         http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
 449         http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
 450         int port = http2TestServer.getAddress().getPort();
 451         http2URI = "http://127.0.0.1:" + port + "/http2/echo";
 452 
 453         https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
 454         https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
 455         port = https2TestServer.getAddress().getPort();
 456         https2URI = "https://127.0.0.1:" + port + "/https2/echo";
 457 
 458         httpTestServer.start();
 459         httpsTestServer.start();
 460         http2TestServer.start();
 461         https2TestServer.start();
 462     }
 463 
 464     @AfterTest
 465     public void teardown() throws Exception {
 466         httpTestServer.stop(0);
 467         httpsTestServer.stop(0);
 468         http2TestServer.stop();
 469         https2TestServer.stop();
 470     }
 471 
 472     static class Http1EchoHandler implements HttpHandler {
 473         @Override
 474         public void handle(HttpExchange t) throws IOException {
 475             try (InputStream is = t.getRequestBody();
 476                  OutputStream os = t.getResponseBody()) {
 477                 byte[] bytes = is.readAllBytes();
 478                 t.sendResponseHeaders(200, bytes.length);
 479                 os.write(bytes);
 480             }
 481         }
 482     }
 483 
 484     static class Http2EchoHandler implements Http2Handler {
 485         @Override
 486         public void handle(Http2TestExchange t) throws IOException {
 487             try (InputStream is = t.getRequestBody();
 488                  OutputStream os = t.getResponseBody()) {
 489                 byte[] bytes = is.readAllBytes();
 490                 t.sendResponseHeaders(200, bytes.length);
 491                 os.write(bytes);
 492             }
 493         }
 494     }
 495 }