1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import jdk.incubator.http.HttpClient;
  25 import jdk.incubator.http.HttpClient.Version;
  26 import jdk.incubator.http.HttpHeaders;
  27 import jdk.incubator.http.HttpRequest;
  28 import jdk.incubator.http.HttpResponse;
  29 import jdk.jshell.spi.ExecutionControl;
  30 import jdk.testlibrary.SimpleSSLContext;
  31 
  32 import javax.net.ServerSocketFactory;
  33 import javax.net.ssl.SSLContext;
  34 import javax.net.ssl.SSLException;
  35 import javax.net.ssl.SSLServerSocketFactory;
  36 import java.io.IOException;
  37 import java.net.SocketException;
  38 import java.net.URI;
  39 import java.nio.ByteBuffer;
  40 import java.util.List;
  41 import java.util.concurrent.CompletableFuture;
  42 import java.util.concurrent.CompletionException;
  43 import java.util.concurrent.CompletionStage;
  44 import java.util.concurrent.ExecutionException;
  45 import java.util.concurrent.Flow;
  46 import java.util.concurrent.atomic.AtomicBoolean;
  47 import java.util.concurrent.atomic.AtomicInteger;
  48 
  49 import jdk.incubator.http.HttpResponse.BodyHandler;
  50 import jdk.incubator.http.HttpResponse.BodySubscriber;
  51 
  52 import static java.lang.String.format;
  53 import static java.lang.System.out;
  54 
  55 /**
  56  * @test
  57  * @bug 8087112
  58  * @library /lib/testlibrary
  59  * @build jdk.testlibrary.SimpleSSLContext
  60  * @build MockServer
  61  * @run main/othervm  CancelledResponse
  62  * @run main/othervm  CancelledResponse SSL
  63  */
  64 
  65 /**
  66  * Similar test to SplitResponse except that the client will cancel the response
  67  * before receiving it fully.
  68  */
  69 public class CancelledResponse {
  70 
  71     static String response(String body, boolean serverKeepalive) {
  72         StringBuilder sb = new StringBuilder();
  73         sb.append("HTTP/1.1 200 OK\r\n");
  74         if (!serverKeepalive)
  75             sb.append("Connection: Close\r\n");
  76 
  77         sb.append("Content-length: ").append(body.length()).append("\r\n");
  78         sb.append("\r\n");
  79         sb.append(body);
  80         return sb.toString();
  81     }
  82 
  83     static final String responses[] = {
  84         "Lorem ipsum dolor sit amet consectetur adipiscing elit,",
  85         "sed do eiusmod tempor quis nostrud exercitation ullamco laboris nisi ut",
  86         "aliquip ex ea commodo consequat."
  87     };
  88 
  89     final ServerSocketFactory factory;
  90     final SSLContext context;
  91     final boolean useSSL;
  92     CancelledResponse(boolean useSSL) throws IOException {
  93         this.useSSL = useSSL;
  94         context = new SimpleSSLContext().get();
  95         SSLContext.setDefault(context);
  96         factory = useSSL ? SSLServerSocketFactory.getDefault()
  97                          : ServerSocketFactory.getDefault();
  98     }
  99 
 100     public HttpClient newHttpClient() {
 101         HttpClient client;
 102         if (useSSL) {
 103             client = HttpClient.newBuilder()
 104                                .sslContext(context)
 105                                .build();
 106         } else {
 107             client = HttpClient.newHttpClient();
 108         }
 109         return client;
 110     }
 111 
 112     public static void main(String[] args) throws Exception {
 113         boolean useSSL = false;
 114         if (args != null && args.length == 1) {
 115             useSSL = "SSL".equals(args[0]);
 116         }
 117         CancelledResponse sp = new CancelledResponse(useSSL);
 118 
 119         for (Version version : Version.values()) {
 120             for (boolean serverKeepalive : new boolean[]{ true, false }) {
 121                 // Note: the mock server doesn't support Keep-Alive, but
 122                 // pretending that it might exercises code paths in and out of
 123                 // the connection pool, and retry logic
 124                 for (boolean async : new boolean[]{ true, false }) {
 125                     sp.test(version, serverKeepalive, async);
 126                 }
 127             }
 128         }
 129     }
 130 
 131     static class CancelException extends IOException {
 132     }
 133 
 134     // @Test
 135     void test(Version version, boolean serverKeepalive, boolean async)
 136         throws Exception
 137     {
 138         out.println(format("*** version %s, serverKeepAlive: %s, async: %s ***",
 139                            version, serverKeepalive, async));
 140         MockServer server = new MockServer(0, factory);
 141         URI uri = new URI(server.getURL());
 142         out.println("server is: " + uri);
 143         server.start();
 144 
 145         HttpClient client = newHttpClient();
 146         HttpRequest request = HttpRequest.newBuilder(uri).version(version).build();
 147         try {
 148             for (int i = 0; i < responses.length; i++) {
 149                 HttpResponse<String> r = null;
 150                 CompletableFuture<HttpResponse<String>> cf1;
 151                 CancelException expected = null;
 152                 AtomicBoolean cancelled = new AtomicBoolean();
 153 
 154                 out.println("----- iteration " + i + " -----");
 155                 String body = responses[i];
 156                 Thread t = sendSplitResponse(response(body, serverKeepalive), server, cancelled);
 157 
 158                 try {
 159                     if (async) {
 160                         out.println("send async: " + request);
 161                         cf1 = client.sendAsync(request, asString(body, cancelled));
 162                         r = cf1.get();
 163                     } else { // sync
 164                         out.println("send sync: " + request);
 165                         r = client.send(request, asString(body, cancelled));
 166                     }
 167                 } catch (CancelException c1) {
 168                     System.out.println("Got expected exception: " + c1);
 169                     expected = c1;
 170                 } catch (IOException | ExecutionException | CompletionException c2) {
 171                     Throwable c = c2;
 172                     while (c != null && !(c instanceof CancelException)) {
 173                         c = c.getCause();
 174                     }
 175                     if (c instanceof CancelException) {
 176                         System.out.println("Got expected exception: " + c);
 177                         expected = (CancelException)c;
 178                     } else throw c2;
 179                 }
 180                 if (r != null) {
 181                     if (r.statusCode() != 200)
 182                         throw new RuntimeException("Failed");
 183 
 184                     String rxbody = r.body();
 185                     out.println("received " + rxbody);
 186                     if (!rxbody.equals(body))
 187                         throw new RuntimeException(format("Expected:%s, got:%s", body, rxbody));
 188                 }
 189                 t.join();
 190                 conn.close();
 191                 if (expected == null) {
 192                     throw new RuntimeException("Expected exception not raised for "
 193                             + i + " cancelled=" + cancelled.get());
 194                 }
 195             }
 196         } finally {
 197             server.close();
 198         }
 199         System.out.println("OK");
 200     }
 201 
 202     static class CancellingSubscriber implements BodySubscriber<String> {
 203         private final String expected;
 204         private final CompletableFuture<String> result;
 205         private Flow.Subscription subscription;
 206         final AtomicInteger index = new AtomicInteger();
 207         final AtomicBoolean cancelled;
 208         CancellingSubscriber(String expected, AtomicBoolean cancelled) {
 209             this.cancelled = cancelled;
 210             this.expected = expected;
 211             result = new CompletableFuture<>();
 212         }
 213 
 214         @Override
 215         public CompletionStage<String> getBody() {
 216             return result;
 217         }
 218 
 219         @Override
 220         public void onSubscribe(Flow.Subscription subscription) {
 221             this.subscription = subscription;
 222             subscription.request(1);
 223         }
 224 
 225         @Override
 226         public void onNext(List<ByteBuffer> item) {
 227             //if (result.isDone())
 228             for (ByteBuffer b : item) {
 229                 while (b.hasRemaining() && !result.isDone()) {
 230                     int i = index.getAndIncrement();
 231                     char at = expected.charAt(i);
 232                     byte[] data = new byte[b.remaining()];
 233                     b.get(data); // we know that the server writes 1 char
 234                     String s = new String(data);
 235                     char c = s.charAt(0);
 236                     if (c != at) {
 237                         Throwable x = new IllegalStateException("char at "
 238                                 + i + " is '" + c + "' expected '"
 239                                 + at + "' for \"" + expected +"\"");
 240                         out.println("unexpected char received, cancelling");
 241                         subscription.cancel();
 242                         result.completeExceptionally(x);
 243                         return;
 244                     }
 245                 }
 246             }
 247             if (index.get() > 0 && !result.isDone()) {
 248                 // we should complete the result here, but let's
 249                 // see if we get something back...
 250                 out.println("Cancelling subscription after reading " + index.get());
 251                 cancelled.set(true);
 252                 subscription.cancel();
 253                 result.completeExceptionally(new CancelException());
 254                 return;
 255             }
 256             if (!result.isDone()) {
 257                 out.println("requesting 1 more");
 258                 subscription.request(1);
 259             }
 260         }
 261 
 262         @Override
 263         public void onError(Throwable throwable) {
 264             result.completeExceptionally(throwable);
 265         }
 266 
 267         @Override
 268         public void onComplete() {
 269             int len = index.get();
 270             if (len == expected.length()) {
 271                 result.complete(expected);
 272             } else {
 273                 Throwable x = new IllegalStateException("received only "
 274                         + len + " chars, expected " + expected.length()
 275                         + " for \"" + expected +"\"");
 276                 result.completeExceptionally(x);
 277             }
 278         }
 279     }
 280 
 281     static class CancellingHandler implements BodyHandler<String> {
 282         final String expected;
 283         final AtomicBoolean cancelled;
 284         CancellingHandler(String expected, AtomicBoolean cancelled) {
 285             this.expected = expected;
 286             this.cancelled = cancelled;
 287         }
 288         @Override
 289         public BodySubscriber<String> apply(int statusCode, HttpHeaders responseHeaders) {
 290             assert !cancelled.get();
 291             return new CancellingSubscriber(expected, cancelled);
 292         }
 293     }
 294 
 295     BodyHandler<String> asString(String expected, AtomicBoolean cancelled) {
 296         return new CancellingHandler(expected, cancelled);
 297     }
 298 
 299     // required for cleanup
 300     volatile MockServer.Connection conn;
 301 
 302     // Sends the response, mostly, one byte at a time with a small delay
 303     // between bytes, to encourage that each byte is read in a separate read
 304     Thread sendSplitResponse(String s, MockServer server, AtomicBoolean cancelled) {
 305         System.out.println("Sending: ");
 306         Thread t = new Thread(() -> {
 307             System.out.println("Waiting for server to receive headers");
 308             conn = server.activity();
 309             System.out.println("Start sending response");
 310             int sent = 0;
 311             try {
 312                 int len = s.length();
 313                 out.println("sending " + s);
 314                 for (int i = 0; i < len; i++) {
 315                     String onechar = s.substring(i, i + 1);
 316                     conn.send(onechar);
 317                     sent++;
 318                     Thread.sleep(10);
 319                 }
 320                 out.println("sent " + s);
 321             } catch (SSLException | SocketException x) {
 322                 // if SSL then we might get a "Broken Pipe", otherwise
 323                 // a "Socket closed".
 324                 boolean expected = cancelled.get();
 325                 if (sent > 0 && expected) {
 326                     System.out.println("Connection closed by peer as expected: " + x);
 327                     return;
 328                 } else {
 329                     System.out.println("Unexpected exception (sent="
 330                             + sent + ", cancelled=" + expected + "): " + x);
 331                     throw new RuntimeException(x);
 332                 }
 333             } catch (IOException | InterruptedException e) {
 334                 throw new RuntimeException(e);
 335             }
 336         });
 337         t.setDaemon(true);
 338         t.start();
 339         return t;
 340     }
 341 }