1 /*
   2  * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 package org.openjdk.bench.java.util.concurrent;
  24 
  25 import org.openjdk.jmh.annotations.Benchmark;
  26 import org.openjdk.jmh.annotations.OutputTimeUnit;
  27 import org.openjdk.jmh.annotations.Param;
  28 import org.openjdk.jmh.annotations.Scope;
  29 import org.openjdk.jmh.annotations.Setup;
  30 import org.openjdk.jmh.annotations.State;
  31 import org.openjdk.jmh.annotations.TearDown;
  32 
  33 import java.util.ArrayList;
  34 import java.util.List;
  35 import java.util.concurrent.ArrayBlockingQueue;
  36 import java.util.concurrent.Callable;
  37 import java.util.concurrent.ExecutionException;
  38 import java.util.concurrent.ExecutorService;
  39 import java.util.concurrent.ForkJoinPool;
  40 import java.util.concurrent.Future;
  41 import java.util.concurrent.ThreadPoolExecutor;
  42 import java.util.concurrent.TimeUnit;
  43 
  44 /**
  45  * Benchmark assesses general ForkJoinPool performance with simple tasks
  46  *
  47  * @author Aleksey Shipilev (aleksey.shipilev@oracle.com)
  48  */
  49 @OutputTimeUnit(TimeUnit.SECONDS)
  50 @State(Scope.Benchmark)
  51 public class ForkJoinPoolRawCallable {
  52 
  53     /**
  54      * Implementation notes:
  55      *
  56      * This test submits empty callables.
  57      * Callables are submitted in batches, to prevent convoying by driver threads.
  58      * One driver thread can saturate up to BATCH_SIZE threads.
  59      *
  60      * One baseline includes raw throughput, without submissions to executors.
  61      * This is not considered as fair comparison, but left around as basic compute baseline.
  62      * Executors could not possibly be faster than that.
  63      *
  64      * Another baseline includes ThreadPoolExecutor.
  65      * Note that this baseline is inherently non-scalable with ABQ backing TPE.
  66      * The size of ABQ is chosen to accommodate tons of threads, which can also suffer due to cache effects.
  67      *
  68      * Tasks are reading public volatile field to break opportunistic optimizations in loops.
  69      * Tasks are pre-allocated to negate instantiation costs.
  70      */
  71 
  72     @Param("0")
  73     private int workers;
  74 
  75     @Param("1000")
  76     private int batchSize;
  77 
  78     private ThreadPoolExecutor tpe;
  79     private ForkJoinPool fjpSync;
  80     private ForkJoinPool fjpAsync;
  81     private List<SampleTask> tasks;
  82 
  83     public volatile int arg = 42;
  84 
  85     @Setup
  86     public void setup() {
  87         SampleTask task = new SampleTask();
  88 
  89         tasks = new ArrayList<>();
  90         for (int c = 0; c < batchSize; c++) {
  91             tasks.add(task);
  92         }
  93 
  94         if (workers == 0) {
  95             workers = Runtime.getRuntime().availableProcessors();
  96         }
  97 
  98         tpe = new ThreadPoolExecutor(workers, workers, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(batchSize * batchSize));
  99         fjpSync = new ForkJoinPool(workers, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, false);
 100         fjpAsync = new ForkJoinPool(workers, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
 101     }
 102 
 103     @TearDown
 104     public void teardown() {
 105         tpe.shutdownNow();
 106         fjpSync.shutdownNow();
 107         fjpAsync.shutdownNow();
 108     }
 109 
 110     @Benchmark
 111     public int baseline_raw() throws Exception {
 112         int s = 0;
 113         for (SampleTask t : tasks) {
 114             s += t.call();
 115         }
 116         return s;
 117     }
 118 
 119     @Benchmark
 120     public int baseline_TPE() throws Exception {
 121         return doWork(tpe);
 122     }
 123 
 124     @Benchmark
 125     public int testSync() throws ExecutionException, InterruptedException {
 126         return doWork(fjpSync);
 127     }
 128 
 129     @Benchmark
 130     public int testAsync() throws ExecutionException, InterruptedException {
 131         return doWork(fjpAsync);
 132     }
 133 
 134     public int doWork(ExecutorService service) throws ExecutionException, InterruptedException {
 135         List<Future<Integer>> futures = new ArrayList<>(tasks.size());
 136         for (SampleTask task : tasks) {
 137             futures.add(service.submit(task));
 138         }
 139 
 140         int s = 0;
 141         for (Future<Integer> future : futures) {
 142             s += future.get();
 143         }
 144         return s;
 145     }
 146 
 147     public class SampleTask implements Callable<Integer> {
 148         @Override
 149         public Integer call() throws Exception {
 150             return arg;
 151         }
 152     }
 153 
 154 }