1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import jdk.incubator.http.HttpClient; 25 import jdk.incubator.http.HttpClient.Version; 26 import jdk.incubator.http.HttpHeaders; 27 import jdk.incubator.http.HttpRequest; 28 import jdk.incubator.http.HttpResponse; 29 import jdk.jshell.spi.ExecutionControl; 30 import jdk.testlibrary.SimpleSSLContext; 31 32 import javax.net.ServerSocketFactory; 33 import javax.net.ssl.SSLContext; 34 import javax.net.ssl.SSLException; 35 import javax.net.ssl.SSLServerSocketFactory; 36 import java.io.IOException; 37 import java.net.SocketException; 38 import java.net.URI; 39 import java.nio.ByteBuffer; 40 import java.util.List; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionException; 43 import java.util.concurrent.CompletionStage; 44 import java.util.concurrent.ExecutionException; 45 import java.util.concurrent.Flow; 46 import java.util.concurrent.atomic.AtomicBoolean; 47 import java.util.concurrent.atomic.AtomicInteger; 48 49 import jdk.incubator.http.HttpResponse.BodyHandler; 50 import jdk.incubator.http.HttpResponse.BodySubscriber; 51 52 import static java.lang.String.format; 53 import static java.lang.System.out; 54 55 /** 56 * @test 57 * @bug 8087112 58 * @library /lib/testlibrary 59 * @build jdk.testlibrary.SimpleSSLContext 60 * @build MockServer 61 * @run main/othervm CancelledResponse 62 * @run main/othervm CancelledResponse SSL 63 */ 64 65 /** 66 * Similar test to SplitResponse except that the client will cancel the response 67 * before receiving it fully. 68 */ 69 public class CancelledResponse { 70 71 static String response(String body, boolean serverKeepalive) { 72 StringBuilder sb = new StringBuilder(); 73 sb.append("HTTP/1.1 200 OK\r\n"); 74 if (!serverKeepalive) 75 sb.append("Connection: Close\r\n"); 76 77 sb.append("Content-length: ").append(body.length()).append("\r\n"); 78 sb.append("\r\n"); 79 sb.append(body); 80 return sb.toString(); 81 } 82 83 static final String responses[] = { 84 "Lorem ipsum dolor sit amet consectetur adipiscing elit,", 85 "sed do eiusmod tempor quis nostrud exercitation ullamco laboris nisi ut", 86 "aliquip ex ea commodo consequat." 87 }; 88 89 final ServerSocketFactory factory; 90 final SSLContext context; 91 final boolean useSSL; 92 CancelledResponse(boolean useSSL) throws IOException { 93 this.useSSL = useSSL; 94 context = new SimpleSSLContext().get(); 95 SSLContext.setDefault(context); 96 factory = useSSL ? SSLServerSocketFactory.getDefault() 97 : ServerSocketFactory.getDefault(); 98 } 99 100 public HttpClient newHttpClient() { 101 HttpClient client; 102 if (useSSL) { 103 client = HttpClient.newBuilder() 104 .sslContext(context) 105 .build(); 106 } else { 107 client = HttpClient.newHttpClient(); 108 } 109 return client; 110 } 111 112 public static void main(String[] args) throws Exception { 113 boolean useSSL = false; 114 if (args != null && args.length == 1) { 115 useSSL = "SSL".equals(args[0]); 116 } 117 CancelledResponse sp = new CancelledResponse(useSSL); 118 119 for (Version version : Version.values()) { 120 for (boolean serverKeepalive : new boolean[]{ true, false }) { 121 // Note: the mock server doesn't support Keep-Alive, but 122 // pretending that it might exercises code paths in and out of 123 // the connection pool, and retry logic 124 for (boolean async : new boolean[]{ true, false }) { 125 sp.test(version, serverKeepalive, async); 126 } 127 } 128 } 129 } 130 131 static class CancelException extends IOException { 132 } 133 134 // @Test 135 void test(Version version, boolean serverKeepalive, boolean async) 136 throws Exception 137 { 138 out.println(format("*** version %s, serverKeepAlive: %s, async: %s ***", 139 version, serverKeepalive, async)); 140 MockServer server = new MockServer(0, factory); 141 URI uri = new URI(server.getURL()); 142 out.println("server is: " + uri); 143 server.start(); 144 145 HttpClient client = newHttpClient(); 146 HttpRequest request = HttpRequest.newBuilder(uri).version(version).build(); 147 try { 148 for (int i = 0; i < responses.length; i++) { 149 HttpResponse<String> r = null; 150 CompletableFuture<HttpResponse<String>> cf1; 151 CancelException expected = null; 152 AtomicBoolean cancelled = new AtomicBoolean(); 153 154 out.println("----- iteration " + i + " -----"); 155 String body = responses[i]; 156 Thread t = sendSplitResponse(response(body, serverKeepalive), server, cancelled); 157 158 try { 159 if (async) { 160 out.println("send async: " + request); 161 cf1 = client.sendAsync(request, asString(body, cancelled)); 162 r = cf1.get(); 163 } else { // sync 164 out.println("send sync: " + request); 165 r = client.send(request, asString(body, cancelled)); 166 } 167 } catch (CancelException c1) { 168 System.out.println("Got expected exception: " + c1); 169 expected = c1; 170 } catch (IOException | ExecutionException | CompletionException c2) { 171 Throwable c = c2; 172 while (c != null && !(c instanceof CancelException)) { 173 c = c.getCause(); 174 } 175 if (c instanceof CancelException) { 176 System.out.println("Got expected exception: " + c); 177 expected = (CancelException)c; 178 } else throw c2; 179 } 180 if (r != null) { 181 if (r.statusCode() != 200) 182 throw new RuntimeException("Failed"); 183 184 String rxbody = r.body(); 185 out.println("received " + rxbody); 186 if (!rxbody.equals(body)) 187 throw new RuntimeException(format("Expected:%s, got:%s", body, rxbody)); 188 } 189 t.join(); 190 conn.close(); 191 if (expected == null) { 192 throw new RuntimeException("Expected exception not raised for " 193 + i + " cancelled=" + cancelled.get()); 194 } 195 } 196 } finally { 197 server.close(); 198 } 199 System.out.println("OK"); 200 } 201 202 static class CancellingSubscriber implements BodySubscriber<String> { 203 private final String expected; 204 private final CompletableFuture<String> result; 205 private Flow.Subscription subscription; 206 final AtomicInteger index = new AtomicInteger(); 207 final AtomicBoolean cancelled; 208 CancellingSubscriber(String expected, AtomicBoolean cancelled) { 209 this.cancelled = cancelled; 210 this.expected = expected; 211 result = new CompletableFuture<>(); 212 } 213 214 @Override 215 public CompletionStage<String> getBody() { 216 return result; 217 } 218 219 @Override 220 public void onSubscribe(Flow.Subscription subscription) { 221 this.subscription = subscription; 222 subscription.request(1); 223 } 224 225 @Override 226 public void onNext(List<ByteBuffer> item) { 227 //if (result.isDone()) 228 for (ByteBuffer b : item) { 229 while (b.hasRemaining() && !result.isDone()) { 230 int i = index.getAndIncrement(); 231 char at = expected.charAt(i); 232 byte[] data = new byte[b.remaining()]; 233 b.get(data); // we know that the server writes 1 char 234 String s = new String(data); 235 char c = s.charAt(0); 236 if (c != at) { 237 Throwable x = new IllegalStateException("char at " 238 + i + " is '" + c + "' expected '" 239 + at + "' for \"" + expected +"\""); 240 out.println("unexpected char received, cancelling"); 241 subscription.cancel(); 242 result.completeExceptionally(x); 243 return; 244 } 245 } 246 } 247 if (index.get() > 0 && !result.isDone()) { 248 // we should complete the result here, but let's 249 // see if we get something back... 250 out.println("Cancelling subscription after reading " + index.get()); 251 cancelled.set(true); 252 subscription.cancel(); 253 result.completeExceptionally(new CancelException()); 254 return; 255 } 256 if (!result.isDone()) { 257 out.println("requesting 1 more"); 258 subscription.request(1); 259 } 260 } 261 262 @Override 263 public void onError(Throwable throwable) { 264 result.completeExceptionally(throwable); 265 } 266 267 @Override 268 public void onComplete() { 269 int len = index.get(); 270 if (len == expected.length()) { 271 result.complete(expected); 272 } else { 273 Throwable x = new IllegalStateException("received only " 274 + len + " chars, expected " + expected.length() 275 + " for \"" + expected +"\""); 276 result.completeExceptionally(x); 277 } 278 } 279 } 280 281 static class CancellingHandler implements BodyHandler<String> { 282 final String expected; 283 final AtomicBoolean cancelled; 284 CancellingHandler(String expected, AtomicBoolean cancelled) { 285 this.expected = expected; 286 this.cancelled = cancelled; 287 } 288 @Override 289 public BodySubscriber<String> apply(int statusCode, HttpHeaders responseHeaders) { 290 assert !cancelled.get(); 291 return new CancellingSubscriber(expected, cancelled); 292 } 293 } 294 295 BodyHandler<String> asString(String expected, AtomicBoolean cancelled) { 296 return new CancellingHandler(expected, cancelled); 297 } 298 299 // required for cleanup 300 volatile MockServer.Connection conn; 301 302 // Sends the response, mostly, one byte at a time with a small delay 303 // between bytes, to encourage that each byte is read in a separate read 304 Thread sendSplitResponse(String s, MockServer server, AtomicBoolean cancelled) { 305 System.out.println("Sending: "); 306 Thread t = new Thread(() -> { 307 System.out.println("Waiting for server to receive headers"); 308 conn = server.activity(); 309 System.out.println("Start sending response"); 310 int sent = 0; 311 try { 312 int len = s.length(); 313 out.println("sending " + s); 314 for (int i = 0; i < len; i++) { 315 String onechar = s.substring(i, i + 1); 316 conn.send(onechar); 317 sent++; 318 Thread.sleep(10); 319 } 320 out.println("sent " + s); 321 } catch (SSLException | SocketException x) { 322 // if SSL then we might get a "Broken Pipe", otherwise 323 // a "Socket closed". 324 boolean expected = cancelled.get(); 325 if (sent > 0 && expected) { 326 System.out.println("Connection closed by peer as expected: " + x); 327 return; 328 } else { 329 System.out.println("Unexpected exception (sent=" 330 + sent + ", cancelled=" + expected + "): " + x); 331 throw new RuntimeException(x); 332 } 333 } catch (IOException | InterruptedException e) { 334 throw new RuntimeException(e); 335 } 336 }); 337 t.setDaemon(true); 338 t.start(); 339 return t; 340 } 341 }