1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /*
  25  * @test
  26  * @bug 8087112 8180044
  27  * @modules jdk.incubator.httpclient
  28  *          java.logging
  29  *          jdk.httpserver
  30  * @library /lib/testlibrary/ /
  31  * @build jdk.testlibrary.SimpleSSLContext
  32  * @compile ../../../com/sun/net/httpserver/LogFilter.java
  33  * @compile ../../../com/sun/net/httpserver/EchoHandler.java
  34  * @compile ../../../com/sun/net/httpserver/FileServerHandler.java
  35  * @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=ssl ManyRequests
  36  * @run main/othervm/timeout=40 -Dtest.insertDelay=true ManyRequests
  37  * @run main/othervm/timeout=40 -Dtest.chunkSize=64 ManyRequests
  38  * @run main/othervm/timeout=40 -Dtest.insertDelay=true -Dtest.chunkSize=64 ManyRequests
  39  * @summary Send a large number of requests asynchronously
  40  */
  41  // * @run main/othervm/timeout=40 -Djdk.httpclient.HttpClient.log=ssl ManyRequests
  42 
  43 import com.sun.net.httpserver.HttpsConfigurator;
  44 import com.sun.net.httpserver.HttpsParameters;
  45 import com.sun.net.httpserver.HttpsServer;
  46 import com.sun.net.httpserver.HttpExchange;
  47 import java.io.IOException;
  48 import java.io.InputStream;
  49 import java.io.OutputStream;
  50 import jdk.incubator.http.HttpClient;
  51 import jdk.incubator.http.HttpRequest;
  52 import java.net.InetSocketAddress;
  53 import java.net.URI;
  54 import java.util.Arrays;
  55 import java.util.Formatter;
  56 import java.util.HashMap;
  57 import java.util.LinkedList;
  58 import java.util.Random;
  59 import java.util.logging.Logger;
  60 import java.util.logging.Level;
  61 import java.util.concurrent.CompletableFuture;
  62 import javax.net.ssl.SSLContext;
  63 import jdk.testlibrary.SimpleSSLContext;
  64 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromByteArray;
  65 import static jdk.incubator.http.HttpResponse.BodyHandler.asByteArray;
  66 
  67 public class ManyRequests {
  68 
  69     volatile static int counter = 0;
  70 
  71     public static void main(String[] args) throws Exception {
  72         Logger logger = Logger.getLogger("com.sun.net.httpserver");
  73         logger.setLevel(Level.ALL);
  74         logger.info("TEST");
  75         System.out.println("Sending " + REQUESTS
  76                          + " requests; delay=" + INSERT_DELAY
  77                          + ", chunks=" + CHUNK_SIZE
  78                          + ", XFixed=" + XFIXED);
  79         SSLContext ctx = new SimpleSSLContext().get();
  80 
  81         InetSocketAddress addr = new InetSocketAddress(0);
  82         HttpsServer server = HttpsServer.create(addr, 0);
  83         server.setHttpsConfigurator(new Configurator(ctx));
  84 
  85         HttpClient client = HttpClient.newBuilder()
  86                                       .sslContext(ctx)
  87                                       .build();
  88         try {
  89             test(server, client);
  90             System.out.println("OK");
  91         } finally {
  92             server.stop(0);
  93         }
  94     }
  95 
  96     //static final int REQUESTS = 1000;
  97     static final int REQUESTS = 20;
  98     static final boolean INSERT_DELAY = Boolean.getBoolean("test.insertDelay");
  99     static final int CHUNK_SIZE = Math.max(0,
 100            Integer.parseInt(System.getProperty("test.chunkSize", "0")));
 101     static final boolean XFIXED = Boolean.getBoolean("test.XFixed");
 102 
 103     static class TestEchoHandler extends EchoHandler {
 104         final Random rand = new Random();
 105         @Override
 106         public void handle(HttpExchange e) throws IOException {
 107             System.out.println("Server: received " + e.getRequestURI());
 108             super.handle(e);
 109         }
 110         protected void close(OutputStream os) throws IOException {
 111             if (INSERT_DELAY) {
 112                 try { Thread.sleep(rand.nextInt(200)); }
 113                 catch (InterruptedException e) {}
 114             }
 115             os.close();
 116         }
 117         protected void close(InputStream is) throws IOException {
 118             if (INSERT_DELAY) {
 119                 try { Thread.sleep(rand.nextInt(200)); }
 120                 catch (InterruptedException e) {}
 121             }
 122             is.close();
 123         }
 124     }
 125 
 126     static void test(HttpsServer server, HttpClient client) throws Exception {
 127         int port = server.getAddress().getPort();
 128         URI baseURI = new URI("https://127.0.0.1:" + port + "/foo/x");
 129         server.createContext("/foo", new TestEchoHandler());
 130         server.start();
 131 
 132         RequestLimiter limiter = new RequestLimiter(40);
 133         Random rand = new Random();
 134         CompletableFuture<?>[] results = new CompletableFuture<?>[REQUESTS];
 135         HashMap<HttpRequest,byte[]> bodies = new HashMap<>();
 136 
 137         for (int i=0; i<REQUESTS; i++) {
 138             byte[] buf = new byte[(i+1)*CHUNK_SIZE+i+1];  // different size bodies
 139             rand.nextBytes(buf);
 140             URI uri = new URI(baseURI.toString() + String.valueOf(i+1));
 141             HttpRequest r = HttpRequest.newBuilder(uri)
 142                                        .header("XFixed", "true")
 143                                        .POST(fromByteArray(buf))
 144                                        .build();
 145             bodies.put(r, buf);
 146 
 147             results[i] =
 148                 limiter.whenOkToSend()
 149                        .thenCompose((v) -> {
 150                            System.out.println("Client: sendAsync: " + r.uri());
 151                            return client.sendAsync(r, asByteArray());
 152                        })
 153                        .thenCompose((resp) -> {
 154                            limiter.requestComplete();
 155                            if (resp.statusCode() != 200) {
 156                                String s = "Expected 200, got: " + resp.statusCode();
 157                                System.out.println(s + " from "
 158                                                   + resp.request().uri().getPath());
 159                                return completedWithIOException(s);
 160                            } else {
 161                                counter++;
 162                                System.out.println("Result (" + counter + ") from "
 163                                                    + resp.request().uri().getPath());
 164                            }
 165                            return CompletableFuture.completedStage(resp.body())
 166                                       .thenApply((b) -> new Pair<>(resp, b));
 167                        })
 168                       .thenAccept((pair) -> {
 169                           HttpRequest request = pair.t.request();
 170                           byte[] requestBody = bodies.get(request);
 171                           check(Arrays.equals(requestBody, pair.u),
 172                                 "bodies not equal:[" + bytesToHexString(requestBody)
 173                                 + "] [" + bytesToHexString(pair.u) + "]");
 174 
 175                       });
 176         }
 177 
 178         // wait for them all to complete and throw exception in case of error
 179         CompletableFuture.allOf(results).join();
 180     }
 181 
 182     static <T> CompletableFuture<T> completedWithIOException(String message) {
 183         return CompletableFuture.failedFuture(new IOException(message));
 184     }
 185 
 186     static String bytesToHexString(byte[] bytes) {
 187         if (bytes == null)
 188             return "null";
 189 
 190         StringBuilder sb = new StringBuilder(bytes.length * 2);
 191 
 192         Formatter formatter = new Formatter(sb);
 193         for (byte b : bytes) {
 194             formatter.format("%02x", b);
 195         }
 196 
 197         return sb.toString();
 198     }
 199 
 200     static final class Pair<T,U> {
 201         Pair(T t, U u) {
 202             this.t = t; this.u = u;
 203         }
 204         T t;
 205         U u;
 206     }
 207 
 208     /**
 209      * A simple limiter for controlling the number of requests to be run in
 210      * parallel whenOkToSend() is called which returns a CF<Void> that allows
 211      * each individual request to proceed, or block temporarily (blocking occurs
 212      * on the waiters list here. As each request actually completes
 213      * requestComplete() is called to notify this object, and allow some
 214      * requests to continue.
 215      */
 216     static class RequestLimiter {
 217 
 218         static final CompletableFuture<Void> COMPLETED_FUTURE =
 219                 CompletableFuture.completedFuture(null);
 220 
 221         final int maxnumber;
 222         final LinkedList<CompletableFuture<Void>> waiters;
 223         int number;
 224         boolean blocked;
 225 
 226         RequestLimiter(int maximum) {
 227             waiters = new LinkedList<>();
 228             maxnumber = maximum;
 229         }
 230 
 231         synchronized void requestComplete() {
 232             number--;
 233             // don't unblock until number of requests has halved.
 234             if ((blocked && number <= maxnumber / 2) ||
 235                         (!blocked && waiters.size() > 0)) {
 236                 int toRelease = Math.min(maxnumber - number, waiters.size());
 237                 for (int i=0; i<toRelease; i++) {
 238                     CompletableFuture<Void> f = waiters.remove();
 239                     number ++;
 240                     f.complete(null);
 241                 }
 242                 blocked = number >= maxnumber;
 243             }
 244         }
 245 
 246         synchronized CompletableFuture<Void> whenOkToSend() {
 247             if (blocked || number + 1 >= maxnumber) {
 248                 blocked = true;
 249                 CompletableFuture<Void> r = new CompletableFuture<>();
 250                 waiters.add(r);
 251                 return r;
 252             } else {
 253                 number++;
 254                 return COMPLETED_FUTURE;
 255             }
 256         }
 257     }
 258 
 259     static void check(boolean cond, Object... msg) {
 260         if (cond)
 261             return;
 262         StringBuilder sb = new StringBuilder();
 263         for (Object o : msg)
 264             sb.append(o);
 265         throw new RuntimeException(sb.toString());
 266     }
 267 
 268     static class Configurator extends HttpsConfigurator {
 269         public Configurator(SSLContext ctx) {
 270             super(ctx);
 271         }
 272 
 273         public void configure(HttpsParameters params) {
 274             params.setSSLParameters(getSSLContext().getSupportedSSLParameters());
 275         }
 276     }
 277 }