1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * The contents of this file are subject to the terms of either the Universal Permissive License 7 * v 1.0 as shown at http://oss.oracle.com/licenses/upl 8 * 9 * or the following license: 10 * 11 * Redistribution and use in source and binary forms, with or without modification, are permitted 12 * provided that the following conditions are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright notice, this list of conditions 15 * and the following disclaimer. 16 * 17 * 2. Redistributions in binary form must reproduce the above copyright notice, this list of 18 * conditions and the following disclaimer in the documentation and/or other materials provided with 19 * the distribution. 20 * 21 * 3. Neither the name of the copyright holder nor the names of its contributors may be used to 22 * endorse or promote products derived from this software without specific prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR 25 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 26 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 27 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 30 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY 31 * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 package org.openjdk.jmc.flightrecorder.internal; 34 35 import java.io.DataInput; 36 import java.io.DataInputStream; 37 import java.io.IOException; 38 import java.io.InputStream; 39 import java.io.RandomAccessFile; 40 import java.util.ArrayList; 41 import java.util.Collection; 42 import java.util.HashSet; 43 import java.util.LinkedList; 44 import java.util.List; 45 import java.util.Set; 46 import java.util.concurrent.ExecutionException; 47 import java.util.concurrent.ExecutorCompletionService; 48 import java.util.concurrent.ExecutorService; 49 import java.util.concurrent.Executors; 50 import java.util.concurrent.Future; 51 import java.util.logging.Level; 52 import java.util.logging.Logger; 53 54 import org.openjdk.jmc.flightrecorder.CouldNotLoadRecordingException; 55 import org.openjdk.jmc.flightrecorder.internal.parser.Chunk; 56 import org.openjdk.jmc.flightrecorder.internal.parser.LoaderContext; 57 import org.openjdk.jmc.flightrecorder.internal.parser.v0.ChunkLoaderV0; 58 import org.openjdk.jmc.flightrecorder.internal.parser.v1.ChunkLoaderV1; 59 import org.openjdk.jmc.flightrecorder.parser.IParserExtension; 60 import org.openjdk.jmc.flightrecorder.parser.ParserExtensionRegistry; 61 62 /** 63 * Helper class for loading flight recordings from disk. 64 */ 65 public final class FlightRecordingLoader { 66 67 private static final Logger LOGGER = Logger.getLogger(FlightRecordingLoader.class.getName()); 68 private static final String SINGLE_THREADED_PARSER_PROPERTY_KEY = "org.openjdk.jmc.flightrecorder.parser.singlethreaded"; //$NON-NLS-1$ 69 private static final int MIN_MEMORY_PER_THREAD = 300 * 1024 * 1024; // Unless the chunks are very big, 300MB of available memory per parallel chunk load should be plenty 70 private static final short VERSION_0 = 0; // JDK7 & JDK8 71 private static final short VERSION_1 = 1; // JDK9 72 private static final byte[] FLIGHT_RECORDER_MAGIC = {'F', 'L', 'R', '\0'}; 73 74 public static EventArray[] loadStream(InputStream stream, boolean hideExperimentals, boolean ignoreTruncatedChunk) 75 throws CouldNotLoadRecordingException, IOException { 76 return loadStream(stream, ParserExtensionRegistry.getParserExtensions(), hideExperimentals, 77 ignoreTruncatedChunk); 78 } 79 80 /** 81 * Read events from an input stream of JFR data. 82 * 83 * @param stream 84 * input stream 85 * @param extensions 86 * the extensions to use when parsing the data 87 * @param hideExperimentals 88 * if {@code true}, then events of types marked as experimental will be ignored when 89 * reading the data 90 * @return an array of EventArrays (one event type per EventArray) 91 */ 92 public static EventArray[] loadStream( 93 InputStream stream, List<? extends IParserExtension> extensions, boolean hideExperimentals, 94 boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException { 95 return readChunks(null, extensions, createChunkSupplier(stream), hideExperimentals, ignoreTruncatedChunk); 96 } 97 98 public static IChunkSupplier createChunkSupplier(final InputStream input) 99 throws CouldNotLoadRecordingException, IOException { 100 return new IChunkSupplier() { 101 102 @Override 103 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 104 int value = input.read(); 105 if (value < 0) { 106 return null; 107 } 108 return createChunkInput(new DataInputStream(input), value, reusableBuffer); 109 } 110 }; 111 } 112 113 public static IChunkSupplier createChunkSupplier(final RandomAccessFile input) 114 throws CouldNotLoadRecordingException, IOException { 115 return new IChunkSupplier() { 116 117 @Override 118 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 119 if (input.length() > input.getFilePointer()) { 120 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer); 121 } 122 return null; 123 } 124 125 }; 126 } 127 128 public static IChunkSupplier createChunkSupplier(final RandomAccessFile input, Collection<ChunkInfo> chunks) 129 throws CouldNotLoadRecordingException, IOException { 130 final LinkedList<ChunkInfo> include = new LinkedList<>(chunks); 131 return new IChunkSupplier() { 132 133 @Override 134 public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException { 135 if (include.isEmpty()) { 136 return null; 137 } 138 input.seek(include.poll().getChunkPosistion()); 139 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer); 140 } 141 142 }; 143 144 } 145 146 private static Chunk createChunkInput(DataInput input, int firstByte, byte[] reusableBuffer) 147 throws CouldNotLoadRecordingException, IOException { 148 int i = 0; 149 while (FLIGHT_RECORDER_MAGIC[i] == firstByte) { 150 if (++i == FLIGHT_RECORDER_MAGIC.length) { 151 return new Chunk(input, FLIGHT_RECORDER_MAGIC.length, reusableBuffer); 152 } 153 firstByte = input.readUnsignedByte(); 154 } 155 throw new InvalidJfrFileException(); 156 } 157 158 public static List<ChunkInfo> readChunkInfo(IChunkSupplier chunkSupplier) 159 throws CouldNotLoadRecordingException, IOException { 160 long nextChunkPos = 0; 161 final List<ChunkInfo> chunks = new ArrayList<>(); 162 byte[] buffer = new byte[0]; 163 Chunk nextChunk; 164 while ((nextChunk = chunkSupplier.getNextChunk(buffer)) != null) { 165 ChunkInfo info = getChunkInfo(nextChunk, nextChunkPos); 166 nextChunk.skip(info.getChunkSize()); 167 buffer = nextChunk.getReusableBuffer(); 168 nextChunkPos += info.getChunkSize(); 169 chunks.add(info); 170 } 171 return chunks; 172 } 173 174 private static ChunkInfo getChunkInfo(Chunk nextChunk, long nextChunkPos) 175 throws CouldNotLoadRecordingException, IOException { 176 switch (nextChunk.getMajorVersion()) { 177 case VERSION_0: 178 return ChunkLoaderV0.getInfo(nextChunk, nextChunkPos); 179 case VERSION_1: 180 return ChunkLoaderV1.getInfo(nextChunk, nextChunkPos); 181 default: 182 throw new VersionNotSupportedException(); 183 } 184 } 185 186 public static EventArray[] readChunks( 187 Runnable monitor, IChunkSupplier chunkSupplier, boolean hideExperimentals, boolean ignoreTruncatedChunk) 188 throws CouldNotLoadRecordingException, IOException { 189 return readChunks(monitor, ParserExtensionRegistry.getParserExtensions(), chunkSupplier, hideExperimentals, 190 ignoreTruncatedChunk); 191 } 192 193 public static EventArray[] readChunks( 194 Runnable monitor, List<? extends IParserExtension> extensions, IChunkSupplier chunkSupplier, 195 boolean hideExperimentals, boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException { 196 LoaderContext context = new LoaderContext(extensions, hideExperimentals); 197 Runtime rt = Runtime.getRuntime(); 198 long availableMemory = rt.maxMemory() - rt.totalMemory() + rt.freeMemory(); 199 long maxBuffersCount = Math.min(Math.max(availableMemory / MIN_MEMORY_PER_THREAD, 1), 200 rt.availableProcessors() - 1); 201 202 ExecutorService threadPool; 203 if (Boolean.getBoolean(SINGLE_THREADED_PARSER_PROPERTY_KEY)) { 204 threadPool = Executors.newSingleThreadExecutor(); 205 } else { 206 threadPool = Executors.newCachedThreadPool(); 207 } 208 209 int chunkCount = 0; 210 try { 211 ExecutorCompletionService<byte[]> service = new ExecutorCompletionService<>(threadPool); 212 byte[] buffer = new byte[0]; 213 int outstanding = 0; 214 Set<Long> loadedChunkTimestamps = new HashSet<>(); 215 IChunkLoader chunkLoader; 216 while ((chunkLoader = createChunkLoader(chunkSupplier, context, buffer, ignoreTruncatedChunk)) != null) { 217 Long ts = chunkLoader.getTimestamp(); 218 if (!loadedChunkTimestamps.contains(ts)) { 219 loadedChunkTimestamps.add(ts); 220 service.submit(chunkLoader); 221 chunkCount++; 222 outstanding++; 223 // Recover buffer from finished chunk loader for reuse or create a new buffer 224 Future<byte[]> available = service.poll(); 225 if (available != null) { 226 buffer = available.get(); 227 sendProgress(monitor); 228 outstanding--; 229 } else if (outstanding < maxBuffersCount) { 230 buffer = new byte[0]; 231 } else { 232 buffer = service.take().get(); 233 sendProgress(monitor); 234 outstanding--; 235 } 236 } 237 } 238 // Wait for all outstanding loaders to complete 239 while (outstanding > 0) { 240 service.take().get(); 241 sendProgress(monitor); 242 outstanding--; 243 } 244 if (chunkCount == 0) { 245 // Recordings without any chunks are not allowed 246 throw new InvalidJfrFileException("No readable chunks in recording"); //$NON-NLS-1$ 247 } 248 } catch (InterruptedException e) { 249 throw new CouldNotLoadRecordingException(e); 250 } catch (ExecutionException e) { 251 Throwable cause = e.getCause(); 252 if (cause instanceof Error) { 253 throw ((Error) cause); 254 } else if (cause instanceof RuntimeException) { 255 throw ((RuntimeException) cause); 256 } else if (cause instanceof IOException) { 257 throw ((IOException) cause); 258 } else if (cause instanceof CouldNotLoadRecordingException) { 259 throw ((CouldNotLoadRecordingException) cause); 260 } else { 261 throw new CouldNotLoadRecordingException(cause); 262 } 263 } finally { 264 threadPool.shutdownNow(); 265 } 266 LOGGER.fine("Loaded JFR with " + chunkCount + " chunks"); //$NON-NLS-1$ //$NON-NLS-2$ 267 return context.buildEventArrays(); 268 } 269 270 private static void sendProgress(Runnable listener) { 271 if (listener != null) { 272 listener.run(); 273 } 274 } 275 276 /** 277 * @param chunkSupplier 278 * chunk data source 279 * @param context 280 * loader context that the returned chunk loader will send event data to 281 * @param buffer 282 * Initial byte array to use for storing chunk data. See 283 * {@link IChunkSupplier#getNextChunk(byte[])}. 284 * @param ignoreTruncatedChunk 285 * if true, then any exceptions caused by getting and reading the next chunk will be 286 * ignored and instead make the method return null 287 * @return a new chunk loader or null if no more data is available from the chunk supplier 288 */ 289 private static IChunkLoader createChunkLoader( 290 IChunkSupplier chunkSupplier, LoaderContext context, byte[] buffer, boolean ignoreTruncatedChunk) 291 throws CouldNotLoadRecordingException, IOException { 292 try { 293 Chunk chunk = chunkSupplier.getNextChunk(buffer); 294 if (chunk != null) { 295 switch (chunk.getMajorVersion()) { 296 case VERSION_0: 297 return ChunkLoaderV0.create(chunk, context); 298 case VERSION_1: 299 return ChunkLoaderV1.create(chunk, context); 300 default: 301 throw new VersionNotSupportedException(); 302 } 303 } 304 } catch (IOException e) { 305 if (ignoreTruncatedChunk) { 306 LOGGER.log(Level.INFO, "Ignoring exception while reading chunk", e); //$NON-NLS-1$ 307 } else { 308 throw e; 309 } 310 } 311 return null; 312 } 313 }