1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @summary Checks correct handling of Publishers that call onComplete without demand 27 * @modules java.base/sun.net.www.http 28 * jdk.incubator.httpclient/jdk.incubator.http.internal.common 29 * jdk.incubator.httpclient/jdk.incubator.http.internal.frame 30 * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack 31 * java.logging 32 * jdk.httpserver 33 * @library /lib/testlibrary http2/server 34 * @build Http2TestServer 35 * @build jdk.testlibrary.SimpleSSLContext 36 * @run testng/othervm CustomRequestPublisher 37 */ 38 39 import com.sun.net.httpserver.HttpExchange; 40 import com.sun.net.httpserver.HttpHandler; 41 import com.sun.net.httpserver.HttpServer; 42 import com.sun.net.httpserver.HttpsConfigurator; 43 import com.sun.net.httpserver.HttpsServer; 44 import java.io.IOException; 45 import java.io.InputStream; 46 import java.io.OutputStream; 47 import java.net.InetSocketAddress; 48 import java.net.URI; 49 import java.nio.ByteBuffer; 50 import java.util.Arrays; 51 import java.util.concurrent.CompletableFuture; 52 import java.util.concurrent.Flow; 53 import java.util.concurrent.atomic.AtomicBoolean; 54 import java.util.concurrent.atomic.AtomicInteger; 55 import java.util.concurrent.atomic.AtomicLong; 56 import java.util.function.Supplier; 57 import java.util.stream.Collectors; 58 import javax.net.ssl.SSLContext; 59 import jdk.incubator.http.HttpClient; 60 import jdk.incubator.http.HttpRequest; 61 import jdk.incubator.http.HttpResponse; 62 import jdk.testlibrary.SimpleSSLContext; 63 import org.testng.annotations.AfterTest; 64 import org.testng.annotations.BeforeTest; 65 import org.testng.annotations.DataProvider; 66 import org.testng.annotations.Test; 67 import static java.lang.System.out; 68 import static java.nio.charset.StandardCharsets.US_ASCII; 69 import static jdk.incubator.http.HttpResponse.BodyHandler.asString; 70 import static org.testng.Assert.assertEquals; 71 import static org.testng.Assert.assertTrue; 72 73 public class CustomRequestPublisher { 74 75 SSLContext sslContext; 76 HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] 77 HttpsServer httpsTestServer; // HTTPS/1.1 78 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) 79 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) 80 String httpURI; 81 String httpsURI; 82 String http2URI; 83 String https2URI; 84 85 @DataProvider(name = "variants") 86 public Object[][] variants() { 87 Supplier<BodyPublisher> fixedSupplier = () -> new FixedLengthBodyPublisher(); 88 Supplier<BodyPublisher> unknownSupplier = () -> new UnknownLengthBodyPublisher(); 89 90 return new Object[][]{ 91 { httpURI, fixedSupplier, false }, 92 { httpURI, unknownSupplier, false }, 93 { httpsURI, fixedSupplier, false }, 94 { httpsURI, unknownSupplier, false }, 95 { http2URI, fixedSupplier, false }, 96 { http2URI, unknownSupplier, false }, 97 { https2URI, fixedSupplier, false,}, 98 { https2URI, unknownSupplier, false }, 99 100 { httpURI, fixedSupplier, true }, 101 { httpURI, unknownSupplier, true }, 102 { httpsURI, fixedSupplier, true }, 103 { httpsURI, unknownSupplier, true }, 104 { http2URI, fixedSupplier, true }, 105 { http2URI, unknownSupplier, true }, 106 { https2URI, fixedSupplier, true,}, 107 { https2URI, unknownSupplier, true }, 108 }; 109 } 110 111 static final int ITERATION_COUNT = 10; 112 113 @Test(dataProvider = "variants") 114 void test(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient) 115 throws Exception 116 { 117 HttpClient client = null; 118 for (int i=0; i< ITERATION_COUNT; i++) { 119 if (!sameClient || client == null) 120 client = HttpClient.newBuilder().sslContext(sslContext).build(); 121 122 BodyPublisher bodyPublisher = bpSupplier.get(); 123 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) 124 .POST(bodyPublisher) 125 .build(); 126 127 HttpResponse<String> resp = client.send(request, asString()); 128 129 out.println("Got response: " + resp); 130 out.println("Got body: " + resp.body()); 131 assertTrue(resp.statusCode() == 200, 132 "Expected 200, got:" + resp.statusCode()); 133 assertEquals(resp.body(), bodyPublisher.bodyAsString()); 134 } 135 } 136 137 @Test(dataProvider = "variants") 138 void testAsync(String uri, Supplier<BodyPublisher> bpSupplier, boolean sameClient) 139 throws Exception 140 { 141 HttpClient client = null; 142 for (int i=0; i< ITERATION_COUNT; i++) { 143 if (!sameClient || client == null) 144 client = HttpClient.newBuilder().sslContext(sslContext).build(); 145 146 BodyPublisher bodyPublisher = bpSupplier.get(); 147 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) 148 .POST(bodyPublisher) 149 .build(); 150 151 CompletableFuture<HttpResponse<String>> cf = client.sendAsync(request, asString()); 152 HttpResponse<String> resp = cf.get(); 153 154 out.println("Got response: " + resp); 155 out.println("Got body: " + resp.body()); 156 assertTrue(resp.statusCode() == 200, 157 "Expected 200, got:" + resp.statusCode()); 158 assertEquals(resp.body(), bodyPublisher.bodyAsString()); 159 } 160 } 161 162 /** A Publisher that returns an UNKNOWN content length. */ 163 static class UnknownLengthBodyPublisher extends BodyPublisher { 164 @Override 165 public long contentLength() { 166 return -1; // unknown 167 } 168 } 169 170 /** A Publisher that returns a FIXED content length. */ 171 static class FixedLengthBodyPublisher extends BodyPublisher { 172 final int LENGTH = Arrays.stream(BODY) 173 .mapToInt(s-> s.getBytes(US_ASCII).length) 174 .sum(); 175 @Override 176 public long contentLength() { 177 return LENGTH; 178 } 179 } 180 181 /** 182 * A Publisher that ( quite correctly ) invokes onComplete, after the last 183 * item has been published, even without any outstanding demand. 184 */ 185 static abstract class BodyPublisher implements HttpRequest.BodyPublisher { 186 187 String[] BODY = new String[] 188 { "Say ", "Hello ", "To ", "My ", "Little ", "Friend" }; 189 190 protected volatile Flow.Subscriber subscriber; 191 192 @Override 193 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { 194 this.subscriber = subscriber; 195 subscriber.onSubscribe(new InternalSubscription()); 196 } 197 198 @Override 199 public abstract long contentLength(); 200 201 String bodyAsString() { 202 return Arrays.stream(BODY).collect(Collectors.joining()); 203 } 204 205 class InternalSubscription implements Flow.Subscription { 206 207 private final AtomicLong demand = new AtomicLong(); 208 private final AtomicBoolean cancelled = new AtomicBoolean(); 209 private volatile int position; 210 211 private static final int IDLE = 1; 212 private static final int PUSHING = 2; 213 private static final int AGAIN = 4; 214 private final AtomicInteger state = new AtomicInteger(IDLE); 215 216 @Override 217 public void request(long n) { 218 if (n <= 0L) { 219 subscriber.onError(new IllegalArgumentException( 220 "non-positive subscription request")); 221 return; 222 } 223 if (cancelled.get()) { 224 return; 225 } 226 227 while (true) { 228 long prev = demand.get(), d; 229 if ((d = prev + n) < prev) // saturate 230 d = Long.MAX_VALUE; 231 if (demand.compareAndSet(prev, d)) 232 break; 233 } 234 235 while (true) { 236 int s = state.get(); 237 if (s == IDLE) { 238 if (state.compareAndSet(IDLE, PUSHING)) { 239 while (true) { 240 push(); 241 if (state.compareAndSet(PUSHING, IDLE)) 242 return; 243 else if (state.compareAndSet(AGAIN, PUSHING)) 244 continue; 245 } 246 } 247 } else if (s == PUSHING) { 248 if (state.compareAndSet(PUSHING, AGAIN)) 249 return; 250 } else if (s == AGAIN){ 251 // do nothing, the pusher will already rerun 252 return; 253 } else { 254 throw new AssertionError("Unknown state:" + s); 255 } 256 } 257 } 258 259 private void push() { 260 long prev; 261 while ((prev = demand.get()) > 0) { 262 if (!demand.compareAndSet(prev, prev -1)) 263 continue; 264 265 int index = position; 266 if (index < BODY.length) { 267 position++; 268 subscriber.onNext(ByteBuffer.wrap(BODY[index].getBytes(US_ASCII))); 269 } 270 } 271 272 if (position == BODY.length && !cancelled.get()) { 273 cancelled.set(true); 274 subscriber.onComplete(); // NOTE: onComplete without demand 275 } 276 } 277 278 @Override 279 public void cancel() { 280 if (cancelled.compareAndExchange(false, true)) 281 return; // already cancelled 282 } 283 } 284 } 285 286 @BeforeTest 287 public void setup() throws Exception { 288 sslContext = new SimpleSSLContext().get(); 289 if (sslContext == null) 290 throw new AssertionError("Unexpected null sslContext"); 291 292 InetSocketAddress sa = new InetSocketAddress("localhost", 0); 293 httpTestServer = HttpServer.create(sa, 0); 294 httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); 295 httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; 296 297 httpsTestServer = HttpsServer.create(sa, 0); 298 httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 299 httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); 300 httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; 301 302 http2TestServer = new Http2TestServer("127.0.0.1", false, 0); 303 http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); 304 int port = http2TestServer.getAddress().getPort(); 305 http2URI = "http://127.0.0.1:" + port + "/http2/echo"; 306 307 https2TestServer = new Http2TestServer("127.0.0.1", true, 0); 308 https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); 309 port = https2TestServer.getAddress().getPort(); 310 https2URI = "https://127.0.0.1:" + port + "/https2/echo"; 311 312 httpTestServer.start(); 313 httpsTestServer.start(); 314 http2TestServer.start(); 315 https2TestServer.start(); 316 } 317 318 @AfterTest 319 public void teardown() throws Exception { 320 httpTestServer.stop(0); 321 httpsTestServer.stop(0); 322 http2TestServer.stop(); 323 https2TestServer.stop(); 324 } 325 326 static class Http1EchoHandler implements HttpHandler { 327 @Override 328 public void handle(HttpExchange t) throws IOException { 329 try (InputStream is = t.getRequestBody(); 330 OutputStream os = t.getResponseBody()) { 331 byte[] bytes = is.readAllBytes(); 332 t.sendResponseHeaders(200, bytes.length); 333 os.write(bytes); 334 } 335 } 336 } 337 338 static class Http2EchoHandler implements Http2Handler { 339 @Override 340 public void handle(Http2TestExchange t) throws IOException { 341 try (InputStream is = t.getRequestBody(); 342 OutputStream os = t.getResponseBody()) { 343 byte[] bytes = is.readAllBytes(); 344 t.sendResponseHeaders(200, bytes.length); 345 os.write(bytes); 346 } 347 } 348 } 349 }