1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  *
   4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5  *
   6  * The contents of this file are subject to the terms of either the Universal Permissive License
   7  * v 1.0 as shown at http://oss.oracle.com/licenses/upl
   8  *
   9  * or the following license:
  10  *
  11  * Redistribution and use in source and binary forms, with or without modification, are permitted
  12  * provided that the following conditions are met:
  13  *
  14  * 1. Redistributions of source code must retain the above copyright notice, this list of conditions
  15  * and the following disclaimer.
  16  *
  17  * 2. Redistributions in binary form must reproduce the above copyright notice, this list of
  18  * conditions and the following disclaimer in the documentation and/or other materials provided with
  19  * the distribution.
  20  *
  21  * 3. Neither the name of the copyright holder nor the names of its contributors may be used to
  22  * endorse or promote products derived from this software without specific prior written permission.
  23  *
  24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
  25  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
  26  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
  27  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  28  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  29  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  30  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
  31  * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  32  */
  33 package org.openjdk.jmc.flightrecorder.internal;
  34 
  35 import java.io.DataInput;
  36 import java.io.DataInputStream;
  37 import java.io.IOException;
  38 import java.io.InputStream;
  39 import java.io.RandomAccessFile;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.HashSet;
  43 import java.util.LinkedList;
  44 import java.util.List;
  45 import java.util.Set;
  46 import java.util.concurrent.ExecutionException;
  47 import java.util.concurrent.ExecutorCompletionService;
  48 import java.util.concurrent.ExecutorService;
  49 import java.util.concurrent.Executors;
  50 import java.util.concurrent.Future;
  51 import java.util.logging.Level;
  52 import java.util.logging.Logger;
  53 
  54 import org.openjdk.jmc.flightrecorder.CouldNotLoadRecordingException;
  55 import org.openjdk.jmc.flightrecorder.internal.parser.Chunk;
  56 import org.openjdk.jmc.flightrecorder.internal.parser.LoaderContext;
  57 import org.openjdk.jmc.flightrecorder.internal.parser.v0.ChunkLoaderV0;
  58 import org.openjdk.jmc.flightrecorder.internal.parser.v1.ChunkLoaderV1;
  59 import org.openjdk.jmc.flightrecorder.parser.IParserExtension;
  60 import org.openjdk.jmc.flightrecorder.parser.ParserExtensionRegistry;
  61 
  62 /**
 * Helper class for loading flight recordings from input streams and random access files.
  64  */
  65 public final class FlightRecordingLoader {
  66 
        /** Logger named after this class. */
        private static final Logger LOGGER = Logger.getLogger(FlightRecordingLoader.class.getName());
        /**
         * Name of the system property checked with {@link Boolean#getBoolean(String)} when chunks are
         * read; if it is {@code true}, chunk loaders are run on a single thread executor.
         */
        private static final String SINGLE_THREADED_PARSER_PROPERTY_KEY = "org.openjdk.jmc.flightrecorder.parser.singlethreaded"; //$NON-NLS-1$
        /**
         * Divisor applied to the available memory to limit how many chunk buffers may be in use at the
         * same time.
         */
        private static final int MIN_MEMORY_PER_THREAD = 300 * 1024 * 1024; // Unless the chunks are very big, 300MB of available memory per parallel chunk load should be plenty
        // Chunk major versions; version 0 is handled by ChunkLoaderV0, versions 1 and 2 by ChunkLoaderV1
        private static final short VERSION_0 = 0; // JDK7 & JDK8
        private static final short VERSION_1 = 1; // JDK9 & JDK10
        private static final short VERSION_2 = 2; // JDK11
        /** Byte sequence that every chunk must start with; anything else is rejected as an invalid file. */
        private static final byte[] FLIGHT_RECORDER_MAGIC = {'F', 'L', 'R', '\0'};
  74 
  75         public static EventArray[] loadStream(InputStream stream, boolean hideExperimentals, boolean ignoreTruncatedChunk)
  76                         throws CouldNotLoadRecordingException, IOException {
  77                 return loadStream(stream, ParserExtensionRegistry.getParserExtensions(), hideExperimentals,
  78                                 ignoreTruncatedChunk);
  79         }
  80 
  81         /**
  82          * Read events from an input stream of JFR data.
  83          *
  84          * @param stream
  85          *            input stream
  86          * @param extensions
  87          *            the extensions to use when parsing the data
  88          * @param hideExperimentals
  89          *            if {@code true}, then events of types marked as experimental will be ignored when
  90          *            reading the data
  91          * @return an array of EventArrays (one event type per EventArray)
  92          */
  93         public static EventArray[] loadStream(
  94                 InputStream stream, List<? extends IParserExtension> extensions, boolean hideExperimentals,
  95                 boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException {
  96                 return readChunks(null, extensions, createChunkSupplier(stream), hideExperimentals, ignoreTruncatedChunk);
  97         }
  98 
  99         public static IChunkSupplier createChunkSupplier(final InputStream input)
 100                         throws CouldNotLoadRecordingException, IOException {
 101                 return new IChunkSupplier() {
 102 
 103                         @Override
 104                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 105                                 int value = input.read();
 106                                 if (value < 0) {
 107                                         return null;
 108                                 }
 109                                 return createChunkInput(new DataInputStream(input), value, reusableBuffer);
 110                         }
 111                 };
 112         }
 113 
 114         public static IChunkSupplier createChunkSupplier(final RandomAccessFile input)
 115                         throws CouldNotLoadRecordingException, IOException {
 116                 return new IChunkSupplier() {
 117 
 118                         @Override
 119                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 120                                 if (input.length() > input.getFilePointer()) {
 121                                         return createChunkInput(input, input.readUnsignedByte(), reusableBuffer);
 122                                 }
 123                                 return null;
 124                         }
 125 
 126                 };
 127         }
 128 
 129         public static IChunkSupplier createChunkSupplier(final RandomAccessFile input, Collection<ChunkInfo> chunks)
 130                         throws CouldNotLoadRecordingException, IOException {
 131                 final LinkedList<ChunkInfo> include = new LinkedList<>(chunks);
 132                 return new IChunkSupplier() {
 133 
 134                         @Override
 135                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 136                                 if (include.isEmpty()) {
 137                                         return null;
 138                                 }
 139                                 input.seek(include.poll().getChunkPosistion());
 140                                 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer);
 141                         }
 142 
 143                 };
 144 
 145         }
 146 
 147         private static Chunk createChunkInput(DataInput input, int firstByte, byte[] reusableBuffer)
 148                         throws CouldNotLoadRecordingException, IOException {
 149                 int i = 0;
 150                 while (FLIGHT_RECORDER_MAGIC[i] == firstByte) {
 151                         if (++i == FLIGHT_RECORDER_MAGIC.length) {
 152                                 return new Chunk(input, FLIGHT_RECORDER_MAGIC.length, reusableBuffer);
 153                         }
 154                         firstByte = input.readUnsignedByte();
 155                 }
 156                 throw new InvalidJfrFileException();
 157         }
 158 
 159         public static List<ChunkInfo> readChunkInfo(IChunkSupplier chunkSupplier)
 160                         throws CouldNotLoadRecordingException, IOException {
 161                 long nextChunkPos = 0;
 162                 final List<ChunkInfo> chunks = new ArrayList<>();
 163                 byte[] buffer = new byte[0];
 164                 Chunk nextChunk;
 165                 while ((nextChunk = chunkSupplier.getNextChunk(buffer)) != null) {
 166                         ChunkInfo info = getChunkInfo(nextChunk, nextChunkPos);
 167                         nextChunk.skip(info.getChunkSize());
 168                         buffer = nextChunk.getReusableBuffer();
 169                         nextChunkPos += info.getChunkSize();
 170                         chunks.add(info);
 171                 }
 172                 return chunks;
 173         }
 174 
 175         private static ChunkInfo getChunkInfo(Chunk nextChunk, long nextChunkPos)
 176                         throws CouldNotLoadRecordingException, IOException {
 177                 switch (nextChunk.getMajorVersion()) {
 178                 case VERSION_0:
 179                         return ChunkLoaderV0.getInfo(nextChunk, nextChunkPos);
 180                 case VERSION_1:
 181                 case VERSION_2:
 182                         return ChunkLoaderV1.getInfo(nextChunk, nextChunkPos);
 183                 default:
 184                         throw new VersionNotSupportedException();
 185                 }
 186         }
 187 
 188         public static EventArray[] readChunks(
 189                 Runnable monitor, IChunkSupplier chunkSupplier, boolean hideExperimentals, boolean ignoreTruncatedChunk)
 190                         throws CouldNotLoadRecordingException, IOException {
 191                 return readChunks(monitor, ParserExtensionRegistry.getParserExtensions(), chunkSupplier, hideExperimentals,
 192                                 ignoreTruncatedChunk);
 193         }
 194 
 195         public static EventArray[] readChunks(
 196                 Runnable monitor, List<? extends IParserExtension> extensions, IChunkSupplier chunkSupplier,
 197                 boolean hideExperimentals, boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException {
 198                 LoaderContext context = new LoaderContext(extensions, hideExperimentals);
 199                 Runtime rt = Runtime.getRuntime();
 200                 long availableMemory = rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
 201                 long maxBuffersCount = Math.min(Math.max(availableMemory / MIN_MEMORY_PER_THREAD, 1),
 202                                 rt.availableProcessors() - 1);
 203 
 204                 ExecutorService threadPool;
 205                 if (Boolean.getBoolean(SINGLE_THREADED_PARSER_PROPERTY_KEY)) {
 206                         threadPool = Executors.newSingleThreadExecutor();
 207                 } else {
 208                         threadPool = Executors.newCachedThreadPool();
 209                 }
 210 
 211                 int chunkCount = 0;
 212                 try {
 213                         ExecutorCompletionService<byte[]> service = new ExecutorCompletionService<>(threadPool);
 214                         byte[] buffer = new byte[0];
 215                         int outstanding = 0;
 216                         Set<Long> loadedChunkTimestamps = new HashSet<>();
 217                         IChunkLoader chunkLoader;
 218                         while ((chunkLoader = createChunkLoader(chunkSupplier, context, buffer, ignoreTruncatedChunk)) != null) {
 219                                 Long ts = chunkLoader.getTimestamp();
 220                                 if (!loadedChunkTimestamps.contains(ts)) {
 221                                         loadedChunkTimestamps.add(ts);
 222                                         service.submit(chunkLoader);
 223                                         chunkCount++;
 224                                         outstanding++;
 225                                         // Recover buffer from finished chunk loader for reuse or create a new buffer
 226                                         Future<byte[]> available = service.poll();
 227                                         if (available != null) {
 228                                                 buffer = available.get();
 229                                                 sendProgress(monitor);
 230                                                 outstanding--;
 231                                         } else if (outstanding < maxBuffersCount) {
 232                                                 buffer = new byte[0];
 233                                         } else {
 234                                                 buffer = service.take().get();
 235                                                 sendProgress(monitor);
 236                                                 outstanding--;
 237                                         }
 238                                 }
 239                         }
 240                         // Wait for all outstanding loaders to complete
 241                         while (outstanding > 0) {
 242                                 service.take().get();
 243                                 sendProgress(monitor);
 244                                 outstanding--;
 245                         }
 246                         if (chunkCount == 0) {
 247                                 // Recordings without any chunks are not allowed
 248                                 throw new InvalidJfrFileException("No readable chunks in recording"); //$NON-NLS-1$
 249                         }
 250                 } catch (InterruptedException e) {
 251                         throw new CouldNotLoadRecordingException(e);
 252                 } catch (ExecutionException e) {
 253                         Throwable cause = e.getCause();
 254                         if (cause instanceof Error) {
 255                                 throw ((Error) cause);
 256                         } else if (cause instanceof RuntimeException) {
 257                                 throw ((RuntimeException) cause);
 258                         } else if (cause instanceof IOException) {
 259                                 throw ((IOException) cause);
 260                         } else if (cause instanceof CouldNotLoadRecordingException) {
 261                                 throw ((CouldNotLoadRecordingException) cause);
 262                         } else {
 263                                 throw new CouldNotLoadRecordingException(cause);
 264                         }
 265                 } finally {
 266                         threadPool.shutdownNow();
 267                 }
 268                 LOGGER.fine("Loaded JFR with " + chunkCount + " chunks"); //$NON-NLS-1$ //$NON-NLS-2$
 269                 return context.buildEventArrays();
 270         }
 271 
 272         private static void sendProgress(Runnable listener) {
 273                 if (listener != null) {
 274                         listener.run();
 275                 }
 276         }
 277 
 278         /**
 279          * @param chunkSupplier
 280          *            chunk data source
 281          * @param context
 282          *            loader context that the returned chunk loader will send event data to
 283          * @param buffer
 284          *            Initial byte array to use for storing chunk data. See
 285          *            {@link IChunkSupplier#getNextChunk(byte[])}.
 286          * @param ignoreTruncatedChunk
 287          *            if true, then any exceptions caused by getting and reading the next chunk will be
 288          *            ignored and instead make the method return null
 289          * @return a new chunk loader or null if no more data is available from the chunk supplier
 290          */
 291         private static IChunkLoader createChunkLoader(
 292                 IChunkSupplier chunkSupplier, LoaderContext context, byte[] buffer, boolean ignoreTruncatedChunk)
 293                         throws CouldNotLoadRecordingException, IOException {
 294                 try {
 295                         Chunk chunk = chunkSupplier.getNextChunk(buffer);
 296                         if (chunk != null) {
 297                                 switch (chunk.getMajorVersion()) {
 298                                 case VERSION_0:
 299                                         return ChunkLoaderV0.create(chunk, context);
 300                                 case VERSION_1:
 301                                 case VERSION_2:
 302                                         return ChunkLoaderV1.create(chunk, context);
 303                                 default:
 304                                         throw new VersionNotSupportedException();
 305                                 }
 306                         }
 307                 } catch (IOException e) {
 308                         if (ignoreTruncatedChunk) {
 309                                 LOGGER.log(Level.INFO, "Ignoring exception while reading chunk", e); //$NON-NLS-1$
 310                         } else {
 311                                 throw e;
 312                         }
 313                 }
 314                 return null;
 315         }
 316 }