< prev index next >
   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /**
  25  * @test
  26  * @bug 8087112
  27  * @library /lib/testlibrary/
  28  * @build jdk.testlibrary.SimpleSSLContext
  29  * @compile ../../../com/sun/net/httpserver/LogFilter.java
  30  * @compile ../../../com/sun/net/httpserver/FileServerHandler.java
  31  * @run main/othervm ManyRequests
  32  * @summary Send a large number of requests asynchronously
  33  */
  34 
  35 //package javaapplication16;
  36 
  37 import com.sun.net.httpserver.HttpsConfigurator;
  38 import com.sun.net.httpserver.HttpsServer;
  39 import java.io.IOException;
  40 import java.io.UncheckedIOException;
  41 import java.net.http.HttpClient;
  42 import java.net.http.HttpRequest;
  43 import java.net.http.HttpResponse;
  44 import java.net.InetSocketAddress;
  45 import java.net.URI;
  46 import java.util.Arrays;
  47 import java.util.HashMap;
  48 import java.util.LinkedList;
  49 import java.util.Random;
  50 import java.util.concurrent.CompletableFuture;
  51 import javax.net.ssl.SSLContext;
  52 import jdk.testlibrary.SimpleSSLContext;
  53 
  54 public class ManyRequests {
  55 
  56     public static void main(String[] args) throws Exception {
  57         SSLContext ctx = new SimpleSSLContext().get();
  58 
  59         InetSocketAddress addr = new InetSocketAddress(0);
  60         HttpsServer server = HttpsServer.create(addr, 0);
  61         server.setHttpsConfigurator(new HttpsConfigurator(ctx));
  62 
  63         HttpClient client = HttpClient.create()
  64                                       .sslContext(ctx)
  65                                       .build();
  66         try {
  67             test(server, client);
  68             System.out.println("OK");
  69         } finally {
  70             server.stop(0);
  71             client.executorService().shutdownNow();
  72         }
  73     }
  74 
  75     static final int REQUESTS = 1000;
  76 
  77     static void test(HttpsServer server, HttpClient client) throws Exception {
  78         int port = server.getAddress().getPort();
  79         URI uri = new URI("https://127.0.0.1:" + port + "/foo/x");
  80         server.createContext("/foo", new EchoHandler());
  81         server.start();
  82 
  83         RequestLimiter limiter = new RequestLimiter(40);
  84         Random rand = new Random();
  85         CompletableFuture<Void>[] results = new CompletableFuture[REQUESTS];
  86         HashMap<HttpRequest,byte[]> bodies = new HashMap<>();
  87 
  88         for (int i=0; i<REQUESTS; i++) {
  89             byte[] buf = new byte[i+1];  // different size bodies
  90             rand.nextBytes(buf);
  91             HttpRequest r = client.request(uri)
  92                                   .body(HttpRequest.fromByteArray(buf))
  93                                   .POST();
  94             bodies.put(r, buf);
  95 
  96             results[i] =
  97                 limiter.whenOkToSend()
  98                        .thenCompose((v) -> r.responseAsync())
  99                        .thenCompose((resp) -> {
 100                            limiter.requestComplete();
 101                            if (resp.statusCode() != 200) {
 102                                resp.bodyAsync(HttpResponse.ignoreBody());
 103                                String s = "Expected 200, got: " + resp.statusCode();
 104                                return completedWithIOException(s);
 105                            }
 106                            return resp.bodyAsync(HttpResponse.asByteArray())
 107                                       .thenApply((b) -> new Pair<>(resp, b));
 108                        })
 109                       .thenAccept((pair) -> {
 110                           HttpRequest request = pair.t.request();
 111                           byte[] requestBody = bodies.get(request);
 112                           check(Arrays.equals(requestBody, pair.u),
 113                                 "bodies not equal");
 114 
 115                       });
 116         }
 117         // wait for them all to complete and throw exception in case of error
 118         CompletableFuture.allOf(results).join();
 119     }
 120 
 121     static <T> CompletableFuture<T> completedWithIOException(String message) {
 122         CompletableFuture<T> cf = new CompletableFuture<>();
 123         cf.completeExceptionally(new IOException(message));
 124         return cf;
 125     }
 126 
 127     static final class Pair<T,U> {
 128         Pair(T t, U u) {
 129             this.t = t; this.u = u;
 130         }
 131         T t;
 132         U u;
 133     }
 134 
 135     /**
 136      * A simple limiter for controlling the number of requests to be run in
 137      * parallel whenOkToSend() is called which returns a CF<Void> that allows
 138      * each individual request to proceed, or block temporarily (blocking occurs
 139      * on the waiters list here. As each request actually completes
 140      * requestComplete() is called to notify this object, and allow some
 141      * requests to continue.
 142      */
 143     static class RequestLimiter {
 144 
 145         static final CompletableFuture<Void> COMPLETED_FUTURE =
 146                 CompletableFuture.completedFuture(null);
 147 
 148         final int maxnumber;
 149         final LinkedList<CompletableFuture<Void>> waiters;
 150         int number;
 151         boolean blocked;
 152 
 153         RequestLimiter(int maximum) {
 154             waiters = new LinkedList<>();
 155             maxnumber = maximum;
 156         }
 157 
 158         synchronized void requestComplete() {
 159             number--;
 160             // don't unblock until number of requests has halved.
 161             if ((blocked && number <= maxnumber / 2) ||
 162                         (!blocked && waiters.size() > 0)) {
 163                 int toRelease = Math.min(maxnumber - number, waiters.size());
 164                 for (int i=0; i<toRelease; i++) {
 165                     CompletableFuture<Void> f = waiters.remove();
 166                     number ++;
 167                     f.complete(null);
 168                 }
 169                 blocked = number >= maxnumber;
 170             }
 171         }
 172 
 173         synchronized CompletableFuture<Void> whenOkToSend() {
 174             if (blocked || number + 1 >= maxnumber) {
 175                 blocked = true;
 176                 CompletableFuture<Void> r = new CompletableFuture<>();
 177                 waiters.add(r);
 178                 return r;
 179             } else {
 180                 number++;
 181                 return COMPLETED_FUTURE;
 182             }
 183         }
 184     }
 185 
 186     static void check(boolean cond, Object... msg) {
 187         if (cond)
 188             return;
 189         StringBuilder sb = new StringBuilder();
 190         for (Object o : msg)
 191             sb.append(o);
 192         throw new RuntimeException(sb.toString());
 193     }
 194 }
< prev index next >