/* * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ import jdk.incubator.http.HttpClient; import jdk.incubator.http.HttpClient.Version; import jdk.incubator.http.HttpHeaders; import jdk.incubator.http.HttpRequest; import jdk.incubator.http.HttpResponse; import jdk.jshell.spi.ExecutionControl; import jdk.testlibrary.SimpleSSLContext; import javax.net.ServerSocketFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; import javax.net.ssl.SSLServerSocketFactory; import java.io.IOException; import java.net.SocketException; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import jdk.incubator.http.HttpResponse.BodyHandler; import jdk.incubator.http.HttpResponse.BodySubscriber; import static java.lang.String.format; import static java.lang.System.out; /** * @test * @bug 8087112 * @library /lib/testlibrary * @build jdk.testlibrary.SimpleSSLContext * @build MockServer * @run main/othervm CancelledResponse * @run main/othervm CancelledResponse SSL */ /** * Similar test to SplitResponse except that the client will cancel the response * before receiving it fully. */ public class CancelledResponse { static String response(String body, boolean serverKeepalive) { StringBuilder sb = new StringBuilder(); sb.append("HTTP/1.1 200 OK\r\n"); if (!serverKeepalive) sb.append("Connection: Close\r\n"); sb.append("Content-length: ").append(body.length()).append("\r\n"); sb.append("\r\n"); sb.append(body); return sb.toString(); } static final String responses[] = { "Lorem ipsum dolor sit amet consectetur adipiscing elit,", "sed do eiusmod tempor quis nostrud exercitation ullamco laboris nisi ut", "aliquip ex ea commodo consequat." }; final ServerSocketFactory factory; final SSLContext context; final boolean useSSL; CancelledResponse(boolean useSSL) throws IOException { this.useSSL = useSSL; context = new SimpleSSLContext().get(); SSLContext.setDefault(context); factory = useSSL ? SSLServerSocketFactory.getDefault() : ServerSocketFactory.getDefault(); } public HttpClient newHttpClient() { HttpClient client; if (useSSL) { client = HttpClient.newBuilder() .sslContext(context) .build(); } else { client = HttpClient.newHttpClient(); } return client; } public static void main(String[] args) throws Exception { boolean useSSL = false; if (args != null && args.length == 1) { useSSL = "SSL".equals(args[0]); } CancelledResponse sp = new CancelledResponse(useSSL); for (Version version : Version.values()) { for (boolean serverKeepalive : new boolean[]{ true, false }) { // Note: the mock server doesn't support Keep-Alive, but // pretending that it might exercises code paths in and out of // the connection pool, and retry logic for (boolean async : new boolean[]{ true, false }) { sp.test(version, serverKeepalive, async); } } } } static class CancelException extends IOException { } // @Test void test(Version version, boolean serverKeepalive, boolean async) throws Exception { out.println(format("*** version %s, serverKeepAlive: %s, async: %s ***", version, serverKeepalive, async)); MockServer server = new MockServer(0, factory); URI uri = new URI(server.getURL()); out.println("server is: " + uri); server.start(); HttpClient client = newHttpClient(); HttpRequest request = HttpRequest.newBuilder(uri).version(version).build(); try { for (int i = 0; i < responses.length; i++) { HttpResponse r = null; CompletableFuture> cf1; CancelException expected = null; AtomicBoolean cancelled = new AtomicBoolean(); out.println("----- iteration " + i + " -----"); String body = responses[i]; Thread t = sendSplitResponse(response(body, serverKeepalive), server, cancelled); try { if (async) { out.println("send async: " + request); cf1 = client.sendAsync(request, asString(body, cancelled)); r = cf1.get(); } else { // sync out.println("send sync: " + request); r = client.send(request, asString(body, cancelled)); } } catch (CancelException c1) { System.out.println("Got expected exception: " + c1); expected = c1; } catch (IOException | ExecutionException | CompletionException c2) { Throwable c = c2; while (c != null && !(c instanceof CancelException)) { c = c.getCause(); } if (c instanceof CancelException) { System.out.println("Got expected exception: " + c); expected = (CancelException)c; } else throw c2; } if (r != null) { if (r.statusCode() != 200) throw new RuntimeException("Failed"); String rxbody = r.body(); out.println("received " + rxbody); if (!rxbody.equals(body)) throw new RuntimeException(format("Expected:%s, got:%s", body, rxbody)); } t.join(); conn.close(); if (expected == null) { throw new RuntimeException("Expected exception not raised for " + i + " cancelled=" + cancelled.get()); } } } finally { server.close(); } System.out.println("OK"); } static class CancellingSubscriber implements BodySubscriber { private final String expected; private final CompletableFuture result; private Flow.Subscription subscription; final AtomicInteger index = new AtomicInteger(); final AtomicBoolean cancelled; CancellingSubscriber(String expected, AtomicBoolean cancelled) { this.cancelled = cancelled; this.expected = expected; result = new CompletableFuture<>(); } @Override public CompletionStage getBody() { return result; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(List item) { //if (result.isDone()) for (ByteBuffer b : item) { while (b.hasRemaining() && !result.isDone()) { int i = index.getAndIncrement(); char at = expected.charAt(i); byte[] data = new byte[b.remaining()]; b.get(data); // we know that the server writes 1 char String s = new String(data); char c = s.charAt(0); if (c != at) { Throwable x = new IllegalStateException("char at " + i + " is '" + c + "' expected '" + at + "' for \"" + expected +"\""); out.println("unexpected char received, cancelling"); subscription.cancel(); result.completeExceptionally(x); return; } } } if (index.get() > 0 && !result.isDone()) { // we should complete the result here, but let's // see if we get something back... out.println("Cancelling subscription after reading " + index.get()); cancelled.set(true); subscription.cancel(); result.completeExceptionally(new CancelException()); return; } if (!result.isDone()) { out.println("requesting 1 more"); subscription.request(1); } } @Override public void onError(Throwable throwable) { result.completeExceptionally(throwable); } @Override public void onComplete() { int len = index.get(); if (len == expected.length()) { result.complete(expected); } else { Throwable x = new IllegalStateException("received only " + len + " chars, expected " + expected.length() + " for \"" + expected +"\""); result.completeExceptionally(x); } } } static class CancellingHandler implements BodyHandler { final String expected; final AtomicBoolean cancelled; CancellingHandler(String expected, AtomicBoolean cancelled) { this.expected = expected; this.cancelled = cancelled; } @Override public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders) { assert !cancelled.get(); return new CancellingSubscriber(expected, cancelled); } } BodyHandler asString(String expected, AtomicBoolean cancelled) { return new CancellingHandler(expected, cancelled); } // required for cleanup volatile MockServer.Connection conn; // Sends the response, mostly, one byte at a time with a small delay // between bytes, to encourage that each byte is read in a separate read Thread sendSplitResponse(String s, MockServer server, AtomicBoolean cancelled) { System.out.println("Sending: "); Thread t = new Thread(() -> { System.out.println("Waiting for server to receive headers"); conn = server.activity(); System.out.println("Start sending response"); int sent = 0; try { int len = s.length(); out.println("sending " + s); for (int i = 0; i < len; i++) { String onechar = s.substring(i, i + 1); conn.send(onechar); sent++; Thread.sleep(10); } out.println("sent " + s); } catch (SSLException | SocketException x) { // if SSL then we might get a "Broken Pipe", otherwise // a "Socket closed". boolean expected = cancelled.get(); if (sent > 0 && expected) { System.out.println("Connection closed by peer as expected: " + x); return; } else { System.out.println("Unexpected exception (sent=" + sent + ", cancelled=" + expected + "): " + x); throw new RuntimeException(x); } } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } }); t.setDaemon(true); t.start(); return t; } }