1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test
  26  * @summary Checks correct handling of Publishers that call onComplete without demand
  27  * @modules java.base/sun.net.www.http
  28  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
  29  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
  30  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
  31  *          java.logging
  32  *          jdk.httpserver
  33  * @library /lib/testlibrary http2/server
  34  * @build Http2TestServer
  35  * @build jdk.testlibrary.SimpleSSLContext
  36  * @run testng/othervm CustomRequestPublisher
  37  */
  38 
  39 import com.sun.net.httpserver.HttpExchange;
  40 import com.sun.net.httpserver.HttpHandler;
  41 import com.sun.net.httpserver.HttpServer;
  42 import com.sun.net.httpserver.HttpsConfigurator;
  43 import com.sun.net.httpserver.HttpsServer;
  44 import java.io.IOException;
  45 import java.io.InputStream;
  46 import java.io.OutputStream;
  47 import java.net.InetSocketAddress;
  48 import java.net.URI;
  49 import java.nio.ByteBuffer;
  50 import java.util.Arrays;
  51 import java.util.concurrent.CompletableFuture;
  52 import java.util.concurrent.Flow;
  53 import java.util.concurrent.atomic.AtomicBoolean;
  54 import java.util.concurrent.atomic.AtomicInteger;
  55 import java.util.concurrent.atomic.AtomicLong;
  56 import java.util.function.Supplier;
  57 import java.util.stream.Collectors;
  58 import javax.net.ssl.SSLContext;
  59 import jdk.incubator.http.HttpClient;
  60 import jdk.incubator.http.HttpRequest;
  61 import jdk.incubator.http.HttpResponse;
  62 import jdk.testlibrary.SimpleSSLContext;
  63 import org.testng.annotations.AfterTest;
  64 import org.testng.annotations.BeforeTest;
  65 import org.testng.annotations.DataProvider;
  66 import org.testng.annotations.Test;
  67 import static java.lang.System.out;
  68 import static java.nio.charset.StandardCharsets.US_ASCII;
  69 import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
  70 import static org.testng.Assert.assertEquals;
  71 import static org.testng.Assert.assertTrue;
  72 
  73 public class CustomRequestPublisher {
  74 
  75     SSLContext sslContext;
  76     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
  77     HttpsServer httpsTestServer;       // HTTPS/1.1
  78     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
  79     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
  80     String httpURI;
  81     String httpsURI;
  82     String http2URI;
  83     String https2URI;
  84 
  85     @DataProvider(name = "variants")
  86     public Object[][] variants() {
  87         Supplier<BodyPublisher> fixedSupplier   = () -> new FixedLengthBodyPublisher();
  88         Supplier<BodyPublisher> unknownSupplier = () -> new UnknownLengthBodyPublisher();
  89 
  90         return new Object[][]{
  91                 { httpURI,   fixedSupplier,   false },
  92                 { httpURI,   unknownSupplier, false },
  93                 { httpsURI,  fixedSupplier,   false },
  94                 { httpsURI,  unknownSupplier, false },
  95                 { http2URI,  fixedSupplier,   false },
  96                 { http2URI,  unknownSupplier, false },
  97                 { https2URI, fixedSupplier,   false,},
  98                 { https2URI, unknownSupplier, false },
  99 
 100                 { httpURI,   fixedSupplier,   true },
 101                 { httpURI,   unknownSupplier, true },
 102                 { httpsURI,  fixedSupplier,   true },
 103                 { httpsURI,  unknownSupplier, true },
 104                 { http2URI,  fixedSupplier,   true },
 105                 { http2URI,  unknownSupplier, true },
 106                 { https2URI, fixedSupplier,   true,},
 107                 { https2URI, unknownSupplier, true },
 108         };
 109     }
 110 
 111     static final int ITERATION_COUNT = 10;
 112 
 113     @Test(dataProvider = "variants")
 114     void test(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient)
 115             throws Exception
 116     {
 117         HttpClient client = null;
 118         for (int i=0; i< ITERATION_COUNT; i++) {
 119             if (!sameClient || client == null)
 120                 client = HttpClient.newBuilder().sslContext(sslContext).build();
 121 
 122             BodyPublisher bodyPublisher = bpSupplier.get();
 123             HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
 124                     .POST(bodyPublisher)
 125                     .build();
 126 
 127             HttpResponse<String> resp = client.send(request, asString());
 128 
 129             out.println("Got response: " + resp);
 130             out.println("Got body: " + resp.body());
 131             assertTrue(resp.statusCode() == 200,
 132                     "Expected 200, got:" + resp.statusCode());
 133             assertEquals(resp.body(), bodyPublisher.bodyAsString());
 134         }
 135     }
 136 
 137     @Test(dataProvider = "variants")
 138     void testAsync(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient)
 139             throws Exception
 140     {
 141         HttpClient client = null;
 142         for (int i=0; i< ITERATION_COUNT; i++) {
 143             if (!sameClient || client == null)
 144                 client = HttpClient.newBuilder().sslContext(sslContext).build();
 145 
 146             BodyPublisher bodyPublisher = bpSupplier.get();
 147             HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
 148                     .POST(bodyPublisher)
 149                     .build();
 150 
 151             CompletableFuture<HttpResponse<String>> cf = client.sendAsync(request, asString());
 152             HttpResponse<String> resp = cf.get();
 153 
 154             out.println("Got response: " + resp);
 155             out.println("Got body: " + resp.body());
 156             assertTrue(resp.statusCode() == 200,
 157                     "Expected 200, got:" + resp.statusCode());
 158             assertEquals(resp.body(), bodyPublisher.bodyAsString());
 159         }
 160     }
 161 
 162     /** A Publisher that returns an UNKNOWN content length. */
 163     static class UnknownLengthBodyPublisher extends BodyPublisher {
 164         @Override
 165         public long contentLength() {
 166             return -1;  // unknown
 167         }
 168     }
 169 
 170     /** A Publisher that returns a FIXED content length. */
 171     static class FixedLengthBodyPublisher extends BodyPublisher {
 172         final int LENGTH = Arrays.stream(BODY)
 173                 .mapToInt(s-> s.getBytes(US_ASCII).length)
 174                 .sum();
 175         @Override
 176         public long contentLength() {
 177             return LENGTH;
 178         }
 179     }
 180 
 181     /**
 182      * A Publisher that ( quite correctly ) invokes onComplete, after the last
 183      * item has been published, even without any outstanding demand.
 184      */
 185     static abstract class BodyPublisher implements HttpRequest.BodyPublisher {
 186 
 187         String[] BODY = new String[]
 188                 { "Say ", "Hello ", "To ", "My ", "Little ", "Friend" };
 189 
 190         protected volatile Flow.Subscriber subscriber;
 191 
 192         @Override
 193         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
 194             this.subscriber = subscriber;
 195             subscriber.onSubscribe(new InternalSubscription());
 196         }
 197 
 198         @Override
 199         public abstract long contentLength();
 200 
 201         String bodyAsString() {
 202             return Arrays.stream(BODY).collect(Collectors.joining());
 203         }
 204 
 205         class InternalSubscription implements Flow.Subscription {
 206 
 207             private final AtomicLong demand = new AtomicLong();
 208             private final AtomicBoolean cancelled = new AtomicBoolean();
 209             private volatile int position;
 210 
 211             private static final int IDLE    =  1;
 212             private static final int PUSHING =  2;
 213             private static final int AGAIN   =  4;
 214             private final AtomicInteger state = new AtomicInteger(IDLE);
 215 
 216             @Override
 217             public void request(long n) {
 218                 if (n <= 0L) {
 219                     subscriber.onError(new IllegalArgumentException(
 220                             "non-positive subscription request"));
 221                     return;
 222                 }
 223                 if (cancelled.get()) {
 224                     return;
 225                 }
 226 
 227                 while (true) {
 228                     long prev = demand.get(), d;
 229                     if ((d = prev + n) < prev) // saturate
 230                         d = Long.MAX_VALUE;
 231                     if (demand.compareAndSet(prev, d))
 232                         break;
 233                 }
 234 
 235                 while (true) {
 236                     int s = state.get();
 237                     if (s == IDLE) {
 238                         if (state.compareAndSet(IDLE, PUSHING)) {
 239                             while (true) {
 240                                 push();
 241                                 if (state.compareAndSet(PUSHING, IDLE))
 242                                     return;
 243                                 else if (state.compareAndSet(AGAIN, PUSHING))
 244                                     continue;
 245                             }
 246                         }
 247                     } else if (s == PUSHING) {
 248                         if (state.compareAndSet(PUSHING, AGAIN))
 249                             return;
 250                     } else if (s == AGAIN){
 251                         // do nothing, the pusher will already rerun
 252                         return;
 253                     } else {
 254                         throw new AssertionError("Unknown state:" + s);
 255                     }
 256                 }
 257             }
 258 
 259             private void push() {
 260                 long prev;
 261                 while ((prev = demand.get()) > 0) {
 262                     if (!demand.compareAndSet(prev, prev -1))
 263                         continue;
 264 
 265                     int index = position;
 266                     if (index < BODY.length) {
 267                         position++;
 268                         subscriber.onNext(ByteBuffer.wrap(BODY[index].getBytes(US_ASCII)));
 269                     }
 270                 }
 271 
 272                 if (position == BODY.length && !cancelled.get()) {
 273                     cancelled.set(true);
 274                     subscriber.onComplete();  // NOTE: onComplete without demand
 275                 }
 276             }
 277 
 278             @Override
 279             public void cancel() {
 280                 if (cancelled.compareAndExchange(false, true))
 281                     return;  // already cancelled
 282             }
 283         }
 284     }
 285 
 286     @BeforeTest
 287     public void setup() throws Exception {
 288         sslContext = new SimpleSSLContext().get();
 289         if (sslContext == null)
 290             throw new AssertionError("Unexpected null sslContext");
 291 
 292         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
 293         httpTestServer = HttpServer.create(sa, 0);
 294         httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
 295         httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
 296 
 297         httpsTestServer = HttpsServer.create(sa, 0);
 298         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
 299         httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
 300         httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
 301 
 302         http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
 303         http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
 304         int port = http2TestServer.getAddress().getPort();
 305         http2URI = "http://127.0.0.1:" + port + "/http2/echo";
 306 
 307         https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
 308         https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
 309         port = https2TestServer.getAddress().getPort();
 310         https2URI = "https://127.0.0.1:" + port + "/https2/echo";
 311 
 312         httpTestServer.start();
 313         httpsTestServer.start();
 314         http2TestServer.start();
 315         https2TestServer.start();
 316     }
 317 
 318     @AfterTest
 319     public void teardown() throws Exception {
 320         httpTestServer.stop(0);
 321         httpsTestServer.stop(0);
 322         http2TestServer.stop();
 323         https2TestServer.stop();
 324     }
 325 
 326     static class Http1EchoHandler implements HttpHandler {
 327         @Override
 328         public void handle(HttpExchange t) throws IOException {
 329             try (InputStream is = t.getRequestBody();
 330                  OutputStream os = t.getResponseBody()) {
 331                 byte[] bytes = is.readAllBytes();
 332                 t.sendResponseHeaders(200, bytes.length);
 333                 os.write(bytes);
 334             }
 335         }
 336     }
 337 
 338     static class Http2EchoHandler implements Http2Handler {
 339         @Override
 340         public void handle(Http2TestExchange t) throws IOException {
 341             try (InputStream is = t.getRequestBody();
 342                  OutputStream os = t.getResponseBody()) {
 343                 byte[] bytes = is.readAllBytes();
 344                 t.sendResponseHeaders(200, bytes.length);
 345                 os.write(bytes);
 346             }
 347         }
 348     }
 349 }