/*
 * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package org.openjdk.bench.java.util.concurrent;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Benchmark assessing general {@link ForkJoinPool} throughput when it is fed
 * batches of trivial {@link Callable} tasks.
 *
 * @author Aleksey Shipilev (aleksey.shipilev@oracle.com)
 */
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class ForkJoinPoolRawCallable {

    /*
     * Implementation notes:
     *
     * This test submits empty callables.
     * Callables are submitted in batches, to prevent convoying by driver threads.
     * One driver thread can saturate up to BATCH_SIZE threads.
     *
     * One baseline measures raw throughput, without submissions to executors.
     * This is not considered a fair comparison, but is left around as a basic
     * compute baseline. Executors could not possibly be faster than that.
     *
     * Another baseline uses ThreadPoolExecutor.
     * Note that this baseline is inherently non-scalable with an ABQ backing the TPE.
     * The size of the ABQ is chosen to accommodate tons of threads, which can also
     * suffer due to cache effects.
     *
     * Tasks read a public volatile field to break opportunistic optimizations in loops.
     * Tasks are pre-allocated to negate instantiation costs.
     */

    /** Thread count used for every executor; 0 is replaced by the available processor count in {@link #setup()}. */
    @Param("0")
    private int workers;

    /** Number of tasks submitted (or called directly) per benchmark invocation. */
    @Param("1000")
    private int batchSize;

    private ThreadPoolExecutor tpe;
    private ForkJoinPool fjpSync;
    private ForkJoinPool fjpAsync;
    private List<SampleTask> tasks;

    /** Value returned by every task; public and volatile on purpose (see implementation notes). */
    public volatile int arg = 42;

    /**
     * Resolves the worker count, pre-allocates the task batch and creates the
     * three executors under test.
     */
    @Setup
    public void setup() {
        if (workers == 0) {
            workers = Runtime.getRuntime().availableProcessors();
        }

        // A single task instance is shared by every slot of the batch.
        SampleTask sharedTask = new SampleTask();
        tasks = new ArrayList<>();
        for (int added = 0; added < batchSize; added++) {
            tasks.add(sharedTask);
        }

        int queueCapacity = batchSize * batchSize;
        tpe = new ThreadPoolExecutor(
                workers,
                workers,
                1,
                TimeUnit.HOURS,
                new ArrayBlockingQueue<>(queueCapacity));
        fjpSync = newPool(false);
        fjpAsync = newPool(true);
    }

    /** Creates a ForkJoinPool with {@code workers} parallelism and the given asyncMode flag. */
    private ForkJoinPool newPool(boolean asyncMode) {
        return new ForkJoinPool(
                workers,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,
                asyncMode);
    }

    /** Shuts down all three executors immediately. */
    @TearDown
    public void teardown() {
        tpe.shutdownNow();
        fjpSync.shutdownNow();
        fjpAsync.shutdownNow();
    }

    /**
     * Baseline: calls every task directly on the benchmark thread, with no executor involved.
     *
     * @return sum of all task results
     * @throws Exception if a task throws
     */
    @Benchmark
    public int baseline_raw() throws Exception {
        int sum = 0;
        for (SampleTask task : tasks) {
            sum += task.call();
        }
        return sum;
    }

    /**
     * Baseline: runs the batch through the ThreadPoolExecutor.
     *
     * @return sum of all task results
     * @throws Exception if submission or result retrieval fails
     */
    @Benchmark
    public int baseline_TPE() throws Exception {
        return doWork(tpe);
    }

    /**
     * Runs the batch through the ForkJoinPool created with asyncMode {@code false}.
     *
     * @return sum of all task results
     * @throws ExecutionException   if a task failed
     * @throws InterruptedException if interrupted while waiting for a result
     */
    @Benchmark
    public int testSync() throws ExecutionException, InterruptedException {
        return doWork(fjpSync);
    }

    /**
     * Runs the batch through the ForkJoinPool created with asyncMode {@code true}.
     *
     * @return sum of all task results
     * @throws ExecutionException   if a task failed
     * @throws InterruptedException if interrupted while waiting for a result
     */
    @Benchmark
    public int testAsync() throws ExecutionException, InterruptedException {
        return doWork(fjpAsync);
    }

    /**
     * Submits the whole pre-allocated batch to {@code service}, then waits for
     * every future in submission order and accumulates the results.
     *
     * @param service executor that receives the batch
     * @return sum of all task results
     * @throws ExecutionException   if a task failed
     * @throws InterruptedException if interrupted while waiting for a result
     */
    public int doWork(ExecutorService service) throws ExecutionException, InterruptedException {
        List<Future<Integer>> pending = new ArrayList<>(tasks.size());
        for (SampleTask task : tasks) {
            pending.add(service.submit(task));
        }

        int sum = 0;
        for (Future<Integer> result : pending) {
            sum += result.get();
        }
        return sum;
    }

    /** Trivial task: returns the current value of the enclosing benchmark's volatile {@code arg} field. */
    public class SampleTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            return arg;
        }
    }

}