1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * The contents of this file are subject to the terms of either the Universal Permissive License 7 * v 1.0 as shown at http://oss.oracle.com/licenses/upl 8 * 9 * or the following license: 10 * 11 * Redistribution and use in source and binary forms, with or without modification, are permitted 12 * provided that the following conditions are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright notice, this list of conditions 15 * and the following disclaimer. 16 * 17 * 2. Redistributions in binary form must reproduce the above copyright notice, this list of 18 * conditions and the following disclaimer in the documentation and/or other materials provided with 19 * the distribution. 20 * 21 * 3. Neither the name of the copyright holder nor the names of its contributors may be used to 22 * endorse or promote products derived from this software without specific prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR 25 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 26 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 27 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 30 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY 31 * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 package org.openjdk.jmc.flightrecorder.internal; 34 35 import java.io.DataInput; 36 import java.io.DataInputStream; 37 import java.io.IOException; 38 import java.io.InputStream; 39 import java.io.RandomAccessFile; 40 import java.util.ArrayList; 41 import java.util.Collection; 42 import java.util.HashSet; 43 import java.util.LinkedList; 44 import java.util.List; 45 import java.util.Set; 46 import java.util.concurrent.ExecutionException; 47 import java.util.concurrent.ExecutorCompletionService; 48 import java.util.concurrent.ExecutorService; 49 import java.util.concurrent.Executors; 50 import java.util.concurrent.Future; 51 import java.util.logging.Level; 52 import java.util.logging.Logger; 53 54 import org.openjdk.jmc.flightrecorder.CouldNotLoadRecordingException; 55 import org.openjdk.jmc.flightrecorder.internal.parser.Chunk; 56 import org.openjdk.jmc.flightrecorder.internal.parser.LoaderContext; 57 import org.openjdk.jmc.flightrecorder.internal.parser.v0.ChunkLoaderV0; 58 import org.openjdk.jmc.flightrecorder.internal.parser.v1.ChunkLoaderV1; 59 import org.openjdk.jmc.flightrecorder.parser.IParserExtension; 60 import org.openjdk.jmc.flightrecorder.parser.ParserExtensionRegistry; 61 62 /** 63 * Helper class for loading flight recordings from disk. 64 */ 65 public final class FlightRecordingLoader { 66 67 private static final Logger LOGGER = Logger.getLogger(FlightRecordingLoader.class.getName()); 68 private static final String SINGLE_THREADED_PARSER_PROPERTY_KEY = "org.openjdk.jmc.flightrecorder.parser.singlethreaded"; //$NON-NLS-1$ 69 private static final int MIN_MEMORY_PER_THREAD = 300 * 1024 * 1024; // Unless the chunks are very big, 300MB of available memory per parallel chunk load should be plenty 70 private static final short VERSION_0 = 0; // JDK7 & JDK8 71 private static final short VERSION_1 = 1; // JDK9 & JDK10 72 private static final short VERSION_2 = 2; // JDK11 73 private static final byte[] FLIGHT_RECORDER_MAGIC = {'F', 'L', 'R', '\0'}; 74 75 public static EventArray[] loadStream(InputStream stream, boolean hideExperimentals, boolean ignoreTruncatedChunk) 76 throws CouldNotLoadRecordingException, IOException { 77 return loadStream(stream, ParserExtensionRegistry.getParserExtensions(), hideExperimentals, 78 ignoreTruncatedChunk); 79 } 80 81 /** 82 * Read events from an input stream of JFR data. 83 * 84 * @param stream 85 * input stream 86 * @param extensions 87 * the extensions to use when parsing the data 88 * @param hideExperimentals 89 * if {@code true}, then events of types marked as experimental will be ignored when 90 * reading the data 91 * @return an array of EventArrays (one event type per EventArray) 92 */ 93 public static EventArray[] loadStream( 94 InputStream stream, List<? extends IParserExtension> extensions, boolean hideExperimentals, 95 boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException { 96 return readChunks(null, extensions, createChunkSupplier(stream), hideExperimentals, ignoreTruncatedChunk); 97 } 98 99 public static IChunkSupplier createChunkSupplier(final InputStream input) 100 throws CouldNotLoadRecordingException, IOException { 101 return new IChunkSupplier() { 102 103 @Override 104 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 105 int value = input.read(); 106 if (value < 0) { 107 return null; 108 } 109 return createChunkInput(new DataInputStream(input), value, reusableBuffer); 110 } 111 }; 112 } 113 114 public static IChunkSupplier createChunkSupplier(final RandomAccessFile input) 115 throws CouldNotLoadRecordingException, IOException { 116 return new IChunkSupplier() { 117 118 @Override 119 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 120 if (input.length() > input.getFilePointer()) { 121 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer); 122 } 123 return null; 124 } 125 126 }; 127 } 128 129 public static IChunkSupplier createChunkSupplier(final RandomAccessFile input, Collection<ChunkInfo> chunks) 130 throws CouldNotLoadRecordingException, IOException { 131 final LinkedList<ChunkInfo> include = new LinkedList<>(chunks); 132 return new IChunkSupplier() { 133 134 @Override 135 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 136 if (include.isEmpty()) { 137 return null; 138 } 139 input.seek(include.poll().getChunkPosistion()); 140 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer); 141 } 142 143 }; 144 145 } 146 147 private static Chunk createChunkInput(DataInput input, int firstByte, byte[] reusableBuffer) 148 throws CouldNotLoadRecordingException, IOException { 149 int i = 0; 150 while (FLIGHT_RECORDER_MAGIC[i] == firstByte) { 151 if (++i == FLIGHT_RECORDER_MAGIC.length) { 152 return new Chunk(input, FLIGHT_RECORDER_MAGIC.length, reusableBuffer); 153 } 154 firstByte = input.readUnsignedByte(); 155 } 156 throw new InvalidJfrFileException(); 157 } 158 159 public static List<ChunkInfo> readChunkInfo(IChunkSupplier chunkSupplier) 160 throws CouldNotLoadRecordingException, IOException { 161 long nextChunkPos = 0; 162 final List<ChunkInfo> chunks = new ArrayList<>(); 163 byte[] buffer = new byte[0]; 164 Chunk nextChunk; 165 while ((nextChunk = chunkSupplier.getNextChunk(buffer)) != null) { 166 ChunkInfo info = getChunkInfo(nextChunk, nextChunkPos); 167 nextChunk.skip(info.getChunkSize()); 168 buffer = nextChunk.getReusableBuffer(); 169 nextChunkPos += info.getChunkSize(); 170 chunks.add(info); 171 } 172 return chunks; 173 } 174 175 private static ChunkInfo getChunkInfo(Chunk nextChunk, long nextChunkPos) 176 throws CouldNotLoadRecordingException, IOException { 177 switch (nextChunk.getMajorVersion()) { 178 case VERSION_0: 179 return ChunkLoaderV0.getInfo(nextChunk, nextChunkPos); 180 case VERSION_1: 181 case VERSION_2: 182 return ChunkLoaderV1.getInfo(nextChunk, nextChunkPos); 183 default: 184 throw new VersionNotSupportedException(); 185 } 186 } 187 188 public static EventArray[] readChunks( 189 Runnable monitor, IChunkSupplier chunkSupplier, boolean hideExperimentals, boolean ignoreTruncatedChunk) 190 throws CouldNotLoadRecordingException, IOException { 191 return readChunks(monitor, ParserExtensionRegistry.getParserExtensions(), chunkSupplier, hideExperimentals, 192 ignoreTruncatedChunk); 193 } 194 195 public static EventArray[] readChunks( 196 Runnable monitor, List<? extends IParserExtension> extensions, IChunkSupplier chunkSupplier, 197 boolean hideExperimentals, boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException { 198 LoaderContext context = new LoaderContext(extensions, hideExperimentals); 199 Runtime rt = Runtime.getRuntime(); 200 long availableMemory = rt.maxMemory() - rt.totalMemory() + rt.freeMemory(); 201 long maxBuffersCount = Math.min(Math.max(availableMemory / MIN_MEMORY_PER_THREAD, 1), 202 rt.availableProcessors() - 1); 203 204 ExecutorService threadPool; 205 if (Boolean.getBoolean(SINGLE_THREADED_PARSER_PROPERTY_KEY)) { 206 threadPool = Executors.newSingleThreadExecutor(); 207 } else { 208 threadPool = Executors.newCachedThreadPool(); 209 } 210 211 int chunkCount = 0; 212 try { 213 ExecutorCompletionService<byte[]> service = new ExecutorCompletionService<>(threadPool); 214 byte[] buffer = new byte[0]; 215 int outstanding = 0; 216 Set<Long> loadedChunkTimestamps = new HashSet<>(); 217 IChunkLoader chunkLoader; 218 while ((chunkLoader = createChunkLoader(chunkSupplier, context, buffer, ignoreTruncatedChunk)) != null) { 219 Long ts = chunkLoader.getTimestamp(); 220 if (!loadedChunkTimestamps.contains(ts)) { 221 loadedChunkTimestamps.add(ts); 222 service.submit(chunkLoader); 223 chunkCount++; 224 outstanding++; 225 // Recover buffer from finished chunk loader for reuse or create a new buffer 226 Future<byte[]> available = service.poll(); 227 if (available != null) { 228 buffer = available.get(); 229 sendProgress(monitor); 230 outstanding--; 231 } else if (outstanding < maxBuffersCount) { 232 buffer = new byte[0]; 233 } else { 234 buffer = service.take().get(); 235 sendProgress(monitor); 236 outstanding--; 237 } 238 } 239 } 240 // Wait for all outstanding loaders to complete 241 while (outstanding > 0) { 242 service.take().get(); 243 sendProgress(monitor); 244 outstanding--; 245 } 246 if (chunkCount == 0) { 247 // Recordings without any chunks are not allowed 248 throw new InvalidJfrFileException("No readable chunks in recording"); //$NON-NLS-1$ 249 } 250 } catch (InterruptedException e) { 251 throw new CouldNotLoadRecordingException(e); 252 } catch (ExecutionException e) { 253 Throwable cause = e.getCause(); 254 if (cause instanceof Error) { 255 throw ((Error) cause); 256 } else if (cause instanceof RuntimeException) { 257 throw ((RuntimeException) cause); 258 } else if (cause instanceof IOException) { 259 throw ((IOException) cause); 260 } else if (cause instanceof CouldNotLoadRecordingException) { 261 throw ((CouldNotLoadRecordingException) cause); 262 } else { 263 throw new CouldNotLoadRecordingException(cause); 264 } 265 } finally { 266 threadPool.shutdownNow(); 267 } 268 LOGGER.fine("Loaded JFR with " + chunkCount + " chunks"); //$NON-NLS-1$ //$NON-NLS-2$ 269 return context.buildEventArrays(); 270 } 271 272 private static void sendProgress(Runnable listener) { 273 if (listener != null) { 274 listener.run(); 275 } 276 } 277 278 /** 279 * @param chunkSupplier 280 * chunk data source 281 * @param context 282 * loader context that the returned chunk loader will send event data to 283 * @param buffer 284 * Initial byte array to use for storing chunk data. See 285 * {@link IChunkSupplier#getNextChunk(byte[])}. 286 * @param ignoreTruncatedChunk 287 * if true, then any exceptions caused by getting and reading the next chunk will be 288 * ignored and instead make the method return null 289 * @return a new chunk loader or null if no more data is available from the chunk supplier 290 */ 291 private static IChunkLoader createChunkLoader( 292 IChunkSupplier chunkSupplier, LoaderContext context, byte[] buffer, boolean ignoreTruncatedChunk) 293 throws CouldNotLoadRecordingException, IOException { 294 try { 295 Chunk chunk = chunkSupplier.getNextChunk(buffer); 296 if (chunk != null) { 297 switch (chunk.getMajorVersion()) { 298 case VERSION_0: 299 return ChunkLoaderV0.create(chunk, context); 300 case VERSION_1: 301 case VERSION_2: 302 return ChunkLoaderV1.create(chunk, context); 303 default: 304 throw new VersionNotSupportedException(); 305 } 306 } 307 } catch (IOException e) { 308 if (ignoreTruncatedChunk) { 309 LOGGER.log(Level.INFO, "Ignoring exception while reading chunk", e); //$NON-NLS-1$ 310 } else { 311 throw e; 312 } 313 } 314 return null; 315 } 316 }