1 /*
   2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
   3  *
   4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   5  *
   6  * The contents of this file are subject to the terms of either the Universal Permissive License
   7  * v 1.0 as shown at http://oss.oracle.com/licenses/upl
   8  *
   9  * or the following license:
  10  *
  11  * Redistribution and use in source and binary forms, with or without modification, are permitted
  12  * provided that the following conditions are met:
  13  *
  14  * 1. Redistributions of source code must retain the above copyright notice, this list of conditions
  15  * and the following disclaimer.
  16  *
  17  * 2. Redistributions in binary form must reproduce the above copyright notice, this list of
  18  * conditions and the following disclaimer in the documentation and/or other materials provided with
  19  * the distribution.
  20  *
  21  * 3. Neither the name of the copyright holder nor the names of its contributors may be used to
  22  * endorse or promote products derived from this software without specific prior written permission.
  23  *
  24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
  25  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
  26  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
  27  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  28  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  29  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
  30  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
  31  * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  32  */
  33 package org.openjdk.jmc.flightrecorder.internal;
  34 
  35 import java.io.DataInput;
  36 import java.io.DataInputStream;
  37 import java.io.IOException;
  38 import java.io.InputStream;
  39 import java.io.RandomAccessFile;
  40 import java.util.ArrayList;
  41 import java.util.Collection;
  42 import java.util.HashSet;
  43 import java.util.LinkedList;
  44 import java.util.List;
  45 import java.util.Set;
  46 import java.util.concurrent.ExecutionException;
  47 import java.util.concurrent.ExecutorCompletionService;
  48 import java.util.concurrent.ExecutorService;
  49 import java.util.concurrent.Executors;
  50 import java.util.concurrent.Future;
  51 import java.util.logging.Level;
  52 import java.util.logging.Logger;
  53 
  54 import org.openjdk.jmc.flightrecorder.CouldNotLoadRecordingException;
  55 import org.openjdk.jmc.flightrecorder.internal.parser.Chunk;
  56 import org.openjdk.jmc.flightrecorder.internal.parser.LoaderContext;
  57 import org.openjdk.jmc.flightrecorder.internal.parser.v0.ChunkLoaderV0;
  58 import org.openjdk.jmc.flightrecorder.internal.parser.v1.ChunkLoaderV1;
  59 import org.openjdk.jmc.flightrecorder.parser.IParserExtension;
  60 import org.openjdk.jmc.flightrecorder.parser.ParserExtensionRegistry;
  61 
  62 /**
  63  * Helper class for loading flight recordings from disk.
  64  */
  65 public final class FlightRecordingLoader {
  66 
  67         private static final Logger LOGGER = Logger.getLogger(FlightRecordingLoader.class.getName());
  68         private static final String SINGLE_THREADED_PARSER_PROPERTY_KEY = "org.openjdk.jmc.flightrecorder.parser.singlethreaded"; //$NON-NLS-1$
  69         private static final int MIN_MEMORY_PER_THREAD = 300 * 1024 * 1024; // Unless the chunks are very big, 300MB of available memory per parallel chunk load should be plenty
  70         private static final short VERSION_0 = 0; // JDK7 & JDK8
  71         private static final short VERSION_1 = 1; // JDK9
  72         private static final byte[] FLIGHT_RECORDER_MAGIC = {'F', 'L', 'R', '\0'};
  73 
  74         public static EventArray[] loadStream(InputStream stream, boolean hideExperimentals, boolean ignoreTruncatedChunk)
  75                         throws CouldNotLoadRecordingException, IOException {
  76                 return loadStream(stream, ParserExtensionRegistry.getParserExtensions(), hideExperimentals,
  77                                 ignoreTruncatedChunk);
  78         }
  79 
  80         /**
  81          * Read events from an input stream of JFR data.
  82          *
  83          * @param stream
  84          *            input stream
  85          * @param extensions
  86          *            the extensions to use when parsing the data
  87          * @param hideExperimentals
  88          *            if {@code true}, then events of types marked as experimental will be ignored when
  89          *            reading the data
  90          * @return an array of EventArrays (one event type per EventArray)
  91          */
  92         public static EventArray[] loadStream(
  93                 InputStream stream, List<? extends IParserExtension> extensions, boolean hideExperimentals,
  94                 boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException {
  95                 return readChunks(null, extensions, createChunkSupplier(stream), hideExperimentals, ignoreTruncatedChunk);
  96         }
  97 
  98         public static IChunkSupplier createChunkSupplier(final InputStream input)
  99                         throws CouldNotLoadRecordingException, IOException {
 100                 return new IChunkSupplier() {
 101 
 102                         @Override
 103                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 104                                 int value = input.read();
 105                                 if (value < 0) {
 106                                         return null;
 107                                 }
 108                                 return createChunkInput(new DataInputStream(input), value, reusableBuffer);
 109                         }
 110                 };
 111         }
 112 
 113         public static IChunkSupplier createChunkSupplier(final RandomAccessFile input)
 114                         throws CouldNotLoadRecordingException, IOException {
 115                 return new IChunkSupplier() {
 116 
 117                         @Override
 118                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 119                                 if (input.length() > input.getFilePointer()) {
 120                                         return createChunkInput(input, input.readUnsignedByte(), reusableBuffer);
 121                                 }
 122                                 return null;
 123                         }
 124 
 125                 };
 126         }
 127 
 128         public static IChunkSupplier createChunkSupplier(final RandomAccessFile input, Collection<ChunkInfo> chunks)
 129                         throws CouldNotLoadRecordingException, IOException {
 130                 final LinkedList<ChunkInfo> include = new LinkedList<>(chunks);
 131                 return new IChunkSupplier() {
 132 
 133                         @Override
 134                         public Chunk getNextChunk(byte[] reusableBuffer) throws CouldNotLoadRecordingException, IOException {
 135                                 if (include.isEmpty()) {
 136                                         return null;
 137                                 }
 138                                 input.seek(include.poll().getChunkPosistion());
 139                                 return createChunkInput(input, input.readUnsignedByte(), reusableBuffer);
 140                         }
 141 
 142                 };
 143 
 144         }
 145 
 146         private static Chunk createChunkInput(DataInput input, int firstByte, byte[] reusableBuffer)
 147                         throws CouldNotLoadRecordingException, IOException {
 148                 int i = 0;
 149                 while (FLIGHT_RECORDER_MAGIC[i] == firstByte) {
 150                         if (++i == FLIGHT_RECORDER_MAGIC.length) {
 151                                 return new Chunk(input, FLIGHT_RECORDER_MAGIC.length, reusableBuffer);
 152                         }
 153                         firstByte = input.readUnsignedByte();
 154                 }
 155                 throw new InvalidJfrFileException();
 156         }
 157 
 158         public static List<ChunkInfo> readChunkInfo(IChunkSupplier chunkSupplier)
 159                         throws CouldNotLoadRecordingException, IOException {
 160                 long nextChunkPos = 0;
 161                 final List<ChunkInfo> chunks = new ArrayList<>();
 162                 byte[] buffer = new byte[0];
 163                 Chunk nextChunk;
 164                 while ((nextChunk = chunkSupplier.getNextChunk(buffer)) != null) {
 165                         ChunkInfo info = getChunkInfo(nextChunk, nextChunkPos);
 166                         nextChunk.skip(info.getChunkSize());
 167                         buffer = nextChunk.getReusableBuffer();
 168                         nextChunkPos += info.getChunkSize();
 169                         chunks.add(info);
 170                 }
 171                 return chunks;
 172         }
 173 
 174         private static ChunkInfo getChunkInfo(Chunk nextChunk, long nextChunkPos)
 175                         throws CouldNotLoadRecordingException, IOException {
 176                 switch (nextChunk.getMajorVersion()) {
 177                 case VERSION_0:
 178                         return ChunkLoaderV0.getInfo(nextChunk, nextChunkPos);
 179                 case VERSION_1:
 180                         return ChunkLoaderV1.getInfo(nextChunk, nextChunkPos);
 181                 default:
 182                         throw new VersionNotSupportedException();
 183                 }
 184         }
 185 
 186         public static EventArray[] readChunks(
 187                 Runnable monitor, IChunkSupplier chunkSupplier, boolean hideExperimentals, boolean ignoreTruncatedChunk)
 188                         throws CouldNotLoadRecordingException, IOException {
 189                 return readChunks(monitor, ParserExtensionRegistry.getParserExtensions(), chunkSupplier, hideExperimentals,
 190                                 ignoreTruncatedChunk);
 191         }
 192 
 193         public static EventArray[] readChunks(
 194                 Runnable monitor, List<? extends IParserExtension> extensions, IChunkSupplier chunkSupplier,
 195                 boolean hideExperimentals, boolean ignoreTruncatedChunk) throws CouldNotLoadRecordingException, IOException {
 196                 LoaderContext context = new LoaderContext(extensions, hideExperimentals);
 197                 Runtime rt = Runtime.getRuntime();
 198                 long availableMemory = rt.maxMemory() - rt.totalMemory() + rt.freeMemory();
 199                 long maxBuffersCount = Math.min(Math.max(availableMemory / MIN_MEMORY_PER_THREAD, 1),
 200                                 rt.availableProcessors() - 1);
 201 
 202                 ExecutorService threadPool;
 203                 if (Boolean.getBoolean(SINGLE_THREADED_PARSER_PROPERTY_KEY)) {
 204                         threadPool = Executors.newSingleThreadExecutor();
 205                 } else {
 206                         threadPool = Executors.newCachedThreadPool();
 207                 }
 208 
 209                 int chunkCount = 0;
 210                 try {
 211                         ExecutorCompletionService<byte[]> service = new ExecutorCompletionService<>(threadPool);
 212                         byte[] buffer = new byte[0];
 213                         int outstanding = 0;
 214                         Set<Long> loadedChunkTimestamps = new HashSet<>();
 215                         IChunkLoader chunkLoader;
 216                         while ((chunkLoader = createChunkLoader(chunkSupplier, context, buffer, ignoreTruncatedChunk)) != null) {
 217                                 Long ts = chunkLoader.getTimestamp();
 218                                 if (!loadedChunkTimestamps.contains(ts)) {
 219                                         loadedChunkTimestamps.add(ts);
 220                                         service.submit(chunkLoader);
 221                                         chunkCount++;
 222                                         outstanding++;
 223                                         // Recover buffer from finished chunk loader for reuse or create a new buffer
 224                                         Future<byte[]> available = service.poll();
 225                                         if (available != null) {
 226                                                 buffer = available.get();
 227                                                 sendProgress(monitor);
 228                                                 outstanding--;
 229                                         } else if (outstanding < maxBuffersCount) {
 230                                                 buffer = new byte[0];
 231                                         } else {
 232                                                 buffer = service.take().get();
 233                                                 sendProgress(monitor);
 234                                                 outstanding--;
 235                                         }
 236                                 }
 237                         }
 238                         // Wait for all outstanding loaders to complete
 239                         while (outstanding > 0) {
 240                                 service.take().get();
 241                                 sendProgress(monitor);
 242                                 outstanding--;
 243                         }
 244                         if (chunkCount == 0) {
 245                                 // Recordings without any chunks are not allowed
 246                                 throw new InvalidJfrFileException("No readable chunks in recording"); //$NON-NLS-1$
 247                         }
 248                 } catch (InterruptedException e) {
 249                         throw new CouldNotLoadRecordingException(e);
 250                 } catch (ExecutionException e) {
 251                         Throwable cause = e.getCause();
 252                         if (cause instanceof Error) {
 253                                 throw ((Error) cause);
 254                         } else if (cause instanceof RuntimeException) {
 255                                 throw ((RuntimeException) cause);
 256                         } else if (cause instanceof IOException) {
 257                                 throw ((IOException) cause);
 258                         } else if (cause instanceof CouldNotLoadRecordingException) {
 259                                 throw ((CouldNotLoadRecordingException) cause);
 260                         } else {
 261                                 throw new CouldNotLoadRecordingException(cause);
 262                         }
 263                 } finally {
 264                         threadPool.shutdownNow();
 265                 }
 266                 LOGGER.fine("Loaded JFR with " + chunkCount + " chunks"); //$NON-NLS-1$ //$NON-NLS-2$
 267                 return context.buildEventArrays();
 268         }
 269 
 270         private static void sendProgress(Runnable listener) {
 271                 if (listener != null) {
 272                         listener.run();
 273                 }
 274         }
 275 
 276         /**
 277          * @param chunkSupplier
 278          *            chunk data source
 279          * @param context
 280          *            loader context that the returned chunk loader will send event data to
 281          * @param buffer
 282          *            Initial byte array to use for storing chunk data. See
 283          *            {@link IChunkSupplier#getNextChunk(byte[])}.
 284          * @param ignoreTruncatedChunk
 285          *            if true, then any exceptions caused by getting and reading the next chunk will be
 286          *            ignored and instead make the method return null
 287          * @return a new chunk loader or null if no more data is available from the chunk supplier
 288          */
 289         private static IChunkLoader createChunkLoader(
 290                 IChunkSupplier chunkSupplier, LoaderContext context, byte[] buffer, boolean ignoreTruncatedChunk)
 291                         throws CouldNotLoadRecordingException, IOException {
 292                 try {
 293                         Chunk chunk = chunkSupplier.getNextChunk(buffer);
 294                         if (chunk != null) {
 295                                 switch (chunk.getMajorVersion()) {
 296                                 case VERSION_0:
 297                                         return ChunkLoaderV0.create(chunk, context);
 298                                 case VERSION_1:
 299                                         return ChunkLoaderV1.create(chunk, context);
 300                                 default:
 301                                         throw new VersionNotSupportedException();
 302                                 }
 303                         }
 304                 } catch (IOException e) {
 305                         if (ignoreTruncatedChunk) {
 306                                 LOGGER.log(Level.INFO, "Ignoring exception while reading chunk", e); //$NON-NLS-1$
 307                         } else {
 308                                 throw e;
 309                         }
 310                 }
 311                 return null;
 312         }
 313 }