1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.IOException;
  25 import java.io.InputStream;
  26 import java.io.OutputStream;
  27 import java.net.InetSocketAddress;
  28 import java.net.URI;
  29 import java.nio.ByteBuffer;
  30 import java.nio.MappedByteBuffer;
  31 import java.util.Arrays;
  32 import java.util.concurrent.Flow;
  33 import java.util.concurrent.atomic.AtomicBoolean;
  34 import java.util.concurrent.atomic.AtomicInteger;
  35 import java.util.concurrent.atomic.AtomicLong;
  36 import com.sun.net.httpserver.HttpExchange;
  37 import com.sun.net.httpserver.HttpHandler;
  38 import com.sun.net.httpserver.HttpServer;
  39 import com.sun.net.httpserver.HttpsConfigurator;
  40 import com.sun.net.httpserver.HttpsServer;
  41 import jdk.incubator.http.HttpClient;
  42 import jdk.incubator.http.HttpRequest;
  43 import jdk.incubator.http.HttpResponse;
  44 import jdk.testlibrary.SimpleSSLContext;
  45 import org.testng.annotations.AfterTest;
  46 import org.testng.annotations.BeforeTest;
  47 import org.testng.annotations.DataProvider;
  48 import org.testng.annotations.Test;
  49 import javax.net.ssl.SSLContext;
  50 import static java.util.stream.Collectors.joining;
  51 import static java.nio.charset.StandardCharsets.UTF_8;
  52 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromPublisher;
  53 import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
  54 import static org.testng.Assert.assertEquals;
  55 import static org.testng.Assert.assertThrows;
  56 
  57 /*
  58  * @test
  59  * @summary Basic tests for Flow adapter Publishers
  60  * @modules java.base/sun.net.www.http
  61  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
  62  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
  63  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
  64  *          java.logging
  65  *          jdk.httpserver
  66  * @library /lib/testlibrary http2/server
  67  * @build Http2TestServer
  68  * @build jdk.testlibrary.SimpleSSLContext
  69  * @run testng/othervm FlowAdapterPublisherTest
  70  */
  71 
  72 public class FlowAdapterPublisherTest {
  73 
  74     SSLContext sslContext;
  75     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
  76     HttpsServer httpsTestServer;       // HTTPS/1.1
  77     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
  78     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
  79     String httpURI;
  80     String httpsURI;
  81     String http2URI;
  82     String https2URI;
  83 
  84     @DataProvider(name = "uris")
  85     public Object[][] variants() {
  86         return new Object[][]{
  87                 { httpURI   },
  88                 { httpsURI  },
  89                 { http2URI  },
  90                 { https2URI },
  91         };
  92     }
  93 
  94     static final Class<NullPointerException> NPE = NullPointerException.class;
  95     static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
  96 
  97     @Test
  98     public void testAPIExceptions() {
  99         assertThrows(NPE, () -> fromPublisher(null));
 100         assertThrows(NPE, () -> fromPublisher(null, 1));
 101         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0));
 102         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1));
 103         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE));
 104     }
 105 
 106     //  Flow.Publisher<ByteBuffer>
 107 
 108     @Test(dataProvider = "uris")
 109     void testByteBufferPublisherUnknownLength(String url) {
 110         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
 111                 "when the ", "rain gets ", "warmer." };
 112         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 113         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 114                 .POST(fromPublisher(new BBPublisher(body))).build();
 115 
 116         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
 117         String text = response.body();
 118         System.out.println(text);
 119         assertEquals(response.statusCode(), 200);
 120         assertEquals(text, Arrays.stream(body).collect(joining()));
 121     }
 122 
 123     @Test(dataProvider = "uris")
 124     void testByteBufferPublisherFixedLength(String url) {
 125         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
 126                 "when the ", "rain gets ", "warmer." };
 127         int cl = Arrays.stream(body).mapToInt(String::length).sum();
 128         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 129         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 130                 .POST(fromPublisher(new BBPublisher(body), cl)).build();
 131 
 132         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
 133         String text = response.body();
 134         System.out.println(text);
 135         assertEquals(response.statusCode(), 200);
 136         assertEquals(text, Arrays.stream(body).collect(joining()));
 137     }
 138 
 139     // Flow.Publisher<MappedByteBuffer>
 140 
 141     @Test(dataProvider = "uris")
 142     void testMappedByteBufferPublisherUnknownLength(String url) {
 143         String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
 144                 "Irish from ", "ruling the ", "world." };
 145         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 146         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 147                 .POST(fromPublisher(new MBBPublisher(body))).build();
 148 
 149         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
 150         String text = response.body();
 151         System.out.println(text);
 152         assertEquals(response.statusCode(), 200);
 153         assertEquals(text, Arrays.stream(body).collect(joining()));
 154     }
 155 
 156     @Test(dataProvider = "uris")
 157     void testMappedByteBufferPublisherFixedLength(String url) {
 158         String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
 159                 "Irish from ", "ruling the ", "world." };
 160         int cl = Arrays.stream(body).mapToInt(String::length).sum();
 161         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
 162         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
 163                 .POST(fromPublisher(new MBBPublisher(body), cl)).build();
 164 
 165         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
 166         String text = response.body();
 167         System.out.println(text);
 168         assertEquals(response.statusCode(), 200);
 169         assertEquals(text, Arrays.stream(body).collect(joining()));
 170     }
 171 
 172     static class BBPublisher extends AbstractPublisher
 173         implements Flow.Publisher<ByteBuffer>
 174     {
 175         BBPublisher(String... bodyParts) { super(bodyParts); }
 176 
 177         @Override
 178         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 179             this.subscriber = subscriber;
 180             subscriber.onSubscribe(new InternalSubscription());
 181         }
 182     }
 183 
 184     static class MBBPublisher extends AbstractPublisher
 185         implements Flow.Publisher<MappedByteBuffer>
 186     {
 187         MBBPublisher(String... bodyParts) { super(bodyParts); }
 188 
 189         @Override
 190         public void subscribe(Flow.Subscriber<? super MappedByteBuffer> subscriber) {
 191             this.subscriber = subscriber;
 192             subscriber.onSubscribe(new InternalSubscription());
 193         }
 194     }
 195 
 196     static abstract class AbstractPublisher {
 197         private final String[] bodyParts;
 198         protected volatile Flow.Subscriber subscriber;
 199 
 200         AbstractPublisher(String... bodyParts) {
 201             this.bodyParts = bodyParts;
 202         }
 203 
 204         class InternalSubscription implements Flow.Subscription {
 205 
 206             private final AtomicLong demand = new AtomicLong();
 207             private final AtomicBoolean cancelled = new AtomicBoolean();
 208             private volatile int position;
 209 
 210             private static final int IDLE    =  1;
 211             private static final int PUSHING =  2;
 212             private static final int AGAIN   =  4;
 213             private final AtomicInteger state = new AtomicInteger(IDLE);
 214 
 215             @Override
 216             public void request(long n) {
 217                 if (n <= 0L) {
 218                     subscriber.onError(new IllegalArgumentException(
 219                             "non-positive subscription request"));
 220                     return;
 221                 }
 222                 if (cancelled.get()) {
 223                     return;
 224                 }
 225 
 226                 while (true) {
 227                     long prev = demand.get(), d;
 228                     if ((d = prev + n) < prev) // saturate
 229                         d = Long.MAX_VALUE;
 230                     if (demand.compareAndSet(prev, d))
 231                         break;
 232                 }
 233 
 234                 while (true) {
 235                     int s = state.get();
 236                     if (s == IDLE) {
 237                         if (state.compareAndSet(IDLE, PUSHING)) {
 238                             while (true) {
 239                                 push();
 240                                 if (state.compareAndSet(PUSHING, IDLE))
 241                                     return;
 242                                 else if (state.compareAndSet(AGAIN, PUSHING))
 243                                     continue;
 244                             }
 245                         }
 246                     } else if (s == PUSHING) {
 247                         if (state.compareAndSet(PUSHING, AGAIN))
 248                             return;
 249                     } else if (s == AGAIN){
 250                         // do nothing, the pusher will already rerun
 251                         return;
 252                     } else {
 253                         throw new AssertionError("Unknown state:" + s);
 254                     }
 255                 }
 256             }
 257 
 258             private void push() {
 259                 long prev;
 260                 while ((prev = demand.get()) > 0) {
 261                     if (!demand.compareAndSet(prev, prev -1))
 262                         continue;
 263 
 264                     int index = position;
 265                     if (index < bodyParts.length) {
 266                         position++;
 267                         subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8)));
 268                     }
 269                 }
 270 
 271                 if (position == bodyParts.length && !cancelled.get()) {
 272                     cancelled.set(true);
 273                     subscriber.onComplete();
 274                 }
 275             }
 276 
 277             @Override
 278             public void cancel() {
 279                 if (cancelled.compareAndExchange(false, true))
 280                     return;  // already cancelled
 281             }
 282         }
 283     }
 284 
 285     @BeforeTest
 286     public void setup() throws Exception {
 287         sslContext = new SimpleSSLContext().get();
 288         if (sslContext == null)
 289             throw new AssertionError("Unexpected null sslContext");
 290 
 291         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
 292         httpTestServer = HttpServer.create(sa, 0);
 293         httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
 294         httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
 295 
 296         httpsTestServer = HttpsServer.create(sa, 0);
 297         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
 298         httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
 299         httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
 300 
 301         http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
 302         http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
 303         int port = http2TestServer.getAddress().getPort();
 304         http2URI = "http://127.0.0.1:" + port + "/http2/echo";
 305 
 306         https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
 307         https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
 308         port = https2TestServer.getAddress().getPort();
 309         https2URI = "https://127.0.0.1:" + port + "/https2/echo";
 310 
 311         httpTestServer.start();
 312         httpsTestServer.start();
 313         http2TestServer.start();
 314         https2TestServer.start();
 315     }
 316 
 317     @AfterTest
 318     public void teardown() throws Exception {
 319         httpTestServer.stop(0);
 320         httpsTestServer.stop(0);
 321         http2TestServer.stop();
 322         https2TestServer.stop();
 323     }
 324 
 325     static class Http1EchoHandler implements HttpHandler {
 326         @Override
 327         public void handle(HttpExchange t) throws IOException {
 328             try (InputStream is = t.getRequestBody();
 329                  OutputStream os = t.getResponseBody()) {
 330                 byte[] bytes = is.readAllBytes();
 331                 t.sendResponseHeaders(200, bytes.length);
 332                 os.write(bytes);
 333             }
 334         }
 335     }
 336 
 337     static class Http2EchoHandler implements Http2Handler {
 338         @Override
 339         public void handle(Http2TestExchange t) throws IOException {
 340             try (InputStream is = t.getRequestBody();
 341                  OutputStream os = t.getResponseBody()) {
 342                 byte[] bytes = is.readAllBytes();
 343                 t.sendResponseHeaders(200, bytes.length);
 344                 os.write(bytes);
 345             }
 346         }
 347     }
 348 }