1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.OutputStream; 27 import java.net.InetSocketAddress; 28 import java.net.URI; 29 import java.nio.ByteBuffer; 30 import java.nio.MappedByteBuffer; 31 import java.util.Arrays; 32 import java.util.concurrent.Flow; 33 import java.util.concurrent.atomic.AtomicBoolean; 34 import java.util.concurrent.atomic.AtomicInteger; 35 import java.util.concurrent.atomic.AtomicLong; 36 import com.sun.net.httpserver.HttpExchange; 37 import com.sun.net.httpserver.HttpHandler; 38 import com.sun.net.httpserver.HttpServer; 39 import com.sun.net.httpserver.HttpsConfigurator; 40 import com.sun.net.httpserver.HttpsServer; 41 import jdk.incubator.http.HttpClient; 42 import jdk.incubator.http.HttpRequest; 43 import jdk.incubator.http.HttpResponse; 44 import jdk.testlibrary.SimpleSSLContext; 45 import org.testng.annotations.AfterTest; 46 import org.testng.annotations.BeforeTest; 47 import org.testng.annotations.DataProvider; 48 import org.testng.annotations.Test; 49 import javax.net.ssl.SSLContext; 50 import static java.util.stream.Collectors.joining; 51 import static java.nio.charset.StandardCharsets.UTF_8; 52 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromPublisher; 53 import static jdk.incubator.http.HttpResponse.BodyHandler.asString; 54 import static org.testng.Assert.assertEquals; 55 import static org.testng.Assert.assertThrows; 56 57 /* 58 * @test 59 * @summary Basic tests for Flow adapter Publishers 60 * @modules java.base/sun.net.www.http 61 * jdk.incubator.httpclient/jdk.incubator.http.internal.common 62 * jdk.incubator.httpclient/jdk.incubator.http.internal.frame 63 * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack 64 * java.logging 65 * jdk.httpserver 66 * @library /lib/testlibrary http2/server 67 * @build Http2TestServer 68 * @build jdk.testlibrary.SimpleSSLContext 69 * @run testng/othervm FlowAdapterPublisherTest 70 */ 71 72 public class FlowAdapterPublisherTest { 73 74 SSLContext sslContext; 75 HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] 76 HttpsServer httpsTestServer; // HTTPS/1.1 77 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) 78 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) 79 String httpURI; 80 String httpsURI; 81 String http2URI; 82 String https2URI; 83 84 @DataProvider(name = "uris") 85 public Object[][] variants() { 86 return new Object[][]{ 87 { httpURI }, 88 { httpsURI }, 89 { http2URI }, 90 { https2URI }, 91 }; 92 } 93 94 static final Class<NullPointerException> NPE = NullPointerException.class; 95 static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class; 96 97 @Test 98 public void testAPIExceptions() { 99 assertThrows(NPE, () -> fromPublisher(null)); 100 assertThrows(NPE, () -> fromPublisher(null, 1)); 101 assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0)); 102 assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1)); 103 assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE)); 104 } 105 106 // Flow.Publisher<ByteBuffer> 107 108 @Test(dataProvider = "uris") 109 void testByteBufferPublisherUnknownLength(String url) { 110 String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", 111 "when the ", "rain gets ", "warmer." }; 112 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 113 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 114 .POST(fromPublisher(new BBPublisher(body))).build(); 115 116 HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join(); 117 String text = response.body(); 118 System.out.println(text); 119 assertEquals(response.statusCode(), 200); 120 assertEquals(text, Arrays.stream(body).collect(joining())); 121 } 122 123 @Test(dataProvider = "uris") 124 void testByteBufferPublisherFixedLength(String url) { 125 String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", 126 "when the ", "rain gets ", "warmer." }; 127 int cl = Arrays.stream(body).mapToInt(String::length).sum(); 128 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 129 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 130 .POST(fromPublisher(new BBPublisher(body), cl)).build(); 131 132 HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join(); 133 String text = response.body(); 134 System.out.println(text); 135 assertEquals(response.statusCode(), 200); 136 assertEquals(text, Arrays.stream(body).collect(joining())); 137 } 138 139 // Flow.Publisher<MappedByteBuffer> 140 141 @Test(dataProvider = "uris") 142 void testMappedByteBufferPublisherUnknownLength(String url) { 143 String[] body = new String[] { "God invented ", "whiskey to ", "keep the ", 144 "Irish from ", "ruling the ", "world." }; 145 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 146 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 147 .POST(fromPublisher(new MBBPublisher(body))).build(); 148 149 HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join(); 150 String text = response.body(); 151 System.out.println(text); 152 assertEquals(response.statusCode(), 200); 153 assertEquals(text, Arrays.stream(body).collect(joining())); 154 } 155 156 @Test(dataProvider = "uris") 157 void testMappedByteBufferPublisherFixedLength(String url) { 158 String[] body = new String[] { "God invented ", "whiskey to ", "keep the ", 159 "Irish from ", "ruling the ", "world." }; 160 int cl = Arrays.stream(body).mapToInt(String::length).sum(); 161 HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); 162 HttpRequest request = HttpRequest.newBuilder(URI.create(url)) 163 .POST(fromPublisher(new MBBPublisher(body), cl)).build(); 164 165 HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join(); 166 String text = response.body(); 167 System.out.println(text); 168 assertEquals(response.statusCode(), 200); 169 assertEquals(text, Arrays.stream(body).collect(joining())); 170 } 171 172 static class BBPublisher extends AbstractPublisher 173 implements Flow.Publisher<ByteBuffer> 174 { 175 BBPublisher(String... bodyParts) { super(bodyParts); } 176 177 @Override 178 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { 179 this.subscriber = subscriber; 180 subscriber.onSubscribe(new InternalSubscription()); 181 } 182 } 183 184 static class MBBPublisher extends AbstractPublisher 185 implements Flow.Publisher<MappedByteBuffer> 186 { 187 MBBPublisher(String... bodyParts) { super(bodyParts); } 188 189 @Override 190 public void subscribe(Flow.Subscriber<? super MappedByteBuffer> subscriber) { 191 this.subscriber = subscriber; 192 subscriber.onSubscribe(new InternalSubscription()); 193 } 194 } 195 196 static abstract class AbstractPublisher { 197 private final String[] bodyParts; 198 protected volatile Flow.Subscriber subscriber; 199 200 AbstractPublisher(String... bodyParts) { 201 this.bodyParts = bodyParts; 202 } 203 204 class InternalSubscription implements Flow.Subscription { 205 206 private final AtomicLong demand = new AtomicLong(); 207 private final AtomicBoolean cancelled = new AtomicBoolean(); 208 private volatile int position; 209 210 private static final int IDLE = 1; 211 private static final int PUSHING = 2; 212 private static final int AGAIN = 4; 213 private final AtomicInteger state = new AtomicInteger(IDLE); 214 215 @Override 216 public void request(long n) { 217 if (n <= 0L) { 218 subscriber.onError(new IllegalArgumentException( 219 "non-positive subscription request")); 220 return; 221 } 222 if (cancelled.get()) { 223 return; 224 } 225 226 while (true) { 227 long prev = demand.get(), d; 228 if ((d = prev + n) < prev) // saturate 229 d = Long.MAX_VALUE; 230 if (demand.compareAndSet(prev, d)) 231 break; 232 } 233 234 while (true) { 235 int s = state.get(); 236 if (s == IDLE) { 237 if (state.compareAndSet(IDLE, PUSHING)) { 238 while (true) { 239 push(); 240 if (state.compareAndSet(PUSHING, IDLE)) 241 return; 242 else if (state.compareAndSet(AGAIN, PUSHING)) 243 continue; 244 } 245 } 246 } else if (s == PUSHING) { 247 if (state.compareAndSet(PUSHING, AGAIN)) 248 return; 249 } else if (s == AGAIN){ 250 // do nothing, the pusher will already rerun 251 return; 252 } else { 253 throw new AssertionError("Unknown state:" + s); 254 } 255 } 256 } 257 258 private void push() { 259 long prev; 260 while ((prev = demand.get()) > 0) { 261 if (!demand.compareAndSet(prev, prev -1)) 262 continue; 263 264 int index = position; 265 if (index < bodyParts.length) { 266 position++; 267 subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8))); 268 } 269 } 270 271 if (position == bodyParts.length && !cancelled.get()) { 272 cancelled.set(true); 273 subscriber.onComplete(); 274 } 275 } 276 277 @Override 278 public void cancel() { 279 if (cancelled.compareAndExchange(false, true)) 280 return; // already cancelled 281 } 282 } 283 } 284 285 @BeforeTest 286 public void setup() throws Exception { 287 sslContext = new SimpleSSLContext().get(); 288 if (sslContext == null) 289 throw new AssertionError("Unexpected null sslContext"); 290 291 InetSocketAddress sa = new InetSocketAddress("localhost", 0); 292 httpTestServer = HttpServer.create(sa, 0); 293 httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); 294 httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; 295 296 httpsTestServer = HttpsServer.create(sa, 0); 297 httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 298 httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); 299 httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; 300 301 http2TestServer = new Http2TestServer("127.0.0.1", false, 0); 302 http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); 303 int port = http2TestServer.getAddress().getPort(); 304 http2URI = "http://127.0.0.1:" + port + "/http2/echo"; 305 306 https2TestServer = new Http2TestServer("127.0.0.1", true, 0); 307 https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); 308 port = https2TestServer.getAddress().getPort(); 309 https2URI = "https://127.0.0.1:" + port + "/https2/echo"; 310 311 httpTestServer.start(); 312 httpsTestServer.start(); 313 http2TestServer.start(); 314 https2TestServer.start(); 315 } 316 317 @AfterTest 318 public void teardown() throws Exception { 319 httpTestServer.stop(0); 320 httpsTestServer.stop(0); 321 http2TestServer.stop(); 322 https2TestServer.stop(); 323 } 324 325 static class Http1EchoHandler implements HttpHandler { 326 @Override 327 public void handle(HttpExchange t) throws IOException { 328 try (InputStream is = t.getRequestBody(); 329 OutputStream os = t.getResponseBody()) { 330 byte[] bytes = is.readAllBytes(); 331 t.sendResponseHeaders(200, bytes.length); 332 os.write(bytes); 333 } 334 } 335 } 336 337 static class Http2EchoHandler implements Http2Handler { 338 @Override 339 public void handle(Http2TestExchange t) throws IOException { 340 try (InputStream is = t.getRequestBody(); 341 OutputStream os = t.getResponseBody()) { 342 byte[] bytes = is.readAllBytes(); 343 t.sendResponseHeaders(200, bytes.length); 344 os.write(bytes); 345 } 346 } 347 } 348 }