/*
* Copyright (c) 2014, 2015, Dynatrace and/or its affiliates. All rights reserved.
*
* This file is part of the Lock Contention Tracing Subsystem for the HotSpot
* Virtual Machine, which is developed at Christian Doppler Laboratory on
* Monitoring and Evolution of Very-Large-Scale Software Systems. Please
* contact us at if you need additional information
* or have any questions.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work. If not, see <http://www.gnu.org/licenses/>.
*
*/
package sun.evtracing.processing.io;
import static sun.evtracing.processing.io.TraceDataFormat.BYTE_ORDER;
import static sun.evtracing.processing.io.TraceDataFormat.FLAG_COMPRESSED;
import static sun.evtracing.processing.io.TraceDataFormat.MAGIC;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import sun.evtracing.TraceBuffer;
import sun.evtracing.processing.ProgressCallback;
import sun.evtracing.processing.TraceProcessor;
/**
 * Reads trace data from a {@link ReadableByteChannel} and passes every buffer
 * it contains to a {@link TraceProcessor}.
 *
 * <p>Layout of the data as consumed by this reader (all multi-byte values use
 * {@code TraceDataFormat.BYTE_ORDER}):
 * <ul>
 * <li>a file header consisting of an {@code int} magic number and an
 * {@code int} flags word;</li>
 * <li>zero or more records, each consisting of a {@code long} thread value, an
 * {@code int} payload length and that many payload bytes.</li>
 * </ul>
 * When the flags word has {@code FLAG_COMPRESSED} set, everything after the
 * file header is read through a decompressing channel.
 */
public class TraceDataChannelReader implements AutoCloseable {
    /** Size of the file header: magic number followed by the flags word. */
    private static final int FILE_HEADER_BYTES = Integer.BYTES * 2;

    /** Size of a record header: thread value followed by the payload length. */
    private static final int RECORD_HEADER_BYTES = Long.BYTES + Integer.BYTES;

    private final ReadableByteChannel channel;
    private final TraceProcessor processor;

    /**
     * Creates a reader.
     *
     * @param channel   the channel to read trace data from; closed by {@link #close()}
     * @param processor receives one {@link TraceBuffer} per record read
     */
    public TraceDataChannelReader(ReadableByteChannel channel, TraceProcessor processor) {
        this.channel = channel;
        this.processor = processor;
    }

    /**
     * Reads the file header and then every record until the end of the stream,
     * passing each record to the processor.
     *
     * @param callback notified after each record with the running input byte
     *                 count; may be {@code null}
     * @throws EOFException     if the stream ends before the file header is
     *                          complete or in the middle of a record
     * @throws RuntimeException if the magic number does not match or a record
     *                          declares a negative length
     * @throws Exception        anything thrown by the channel or the processor
     */
    public void readToEnd(ProgressCallback callback) throws Exception {
        // Wrap the raw channel so that progress can be reported via bytesRead().
        // NOTE(review): presumably this counts bytes before decompression — confirm.
        CountingReadableByteChannel input = new CountingReadableByteChannel(channel);
        ReadableByteChannel ch = input;

        ByteBuffer fileHeader = ByteBuffer.allocate(FILE_HEADER_BYTES);
        fileHeader.order(BYTE_ORDER);
        readFully(ch, fileHeader);
        fileHeader.flip();
        if (fileHeader.getInt() != MAGIC) {
            throw new RuntimeException("expected magic");
        }
        int flags = fileHeader.getInt();
        if ((flags & FLAG_COMPRESSED) != 0) {
            // Only the data after the file header is compressed.
            ch = Compression.createDecompressingChannel(ch);
        }

        ByteBuffer recordHeader = ByteBuffer.allocate(RECORD_HEADER_BYTES);
        recordHeader.order(BYTE_ORDER);
        // A clean end of stream is only acceptable on a record boundary, which
        // is why the record header uses tryReadFully and the payload readFully.
        while (tryReadFully(ch, recordHeader)) {
            recordHeader.flip();
            long thread = recordHeader.getLong();
            int length = recordHeader.getInt();
            if (length < 0) {
                // Checked explicitly: an assert would be skipped without -ea and
                // the corrupt value would surface as an opaque allocation error.
                throw new RuntimeException("negative buffer length: " + length);
            }

            ByteBuffer events = ByteBuffer.allocate(length);
            events.order(BYTE_ORDER);
            readFully(ch, events);
            events.rewind();
            processor.process(new TraceBuffer(events, thread));

            if (callback != null) {
                callback.bufferRead();
                callback.totalInputBytesRead(input.bytesRead());
            }
            recordHeader.clear();
        }
    }

    /**
     * Reads from the channel until the buffer has no remaining space.
     *
     * @return {@code true} if the buffer was filled (trivially so when it had no
     *         remaining space to begin with); {@code false} if the channel
     *         reported end of stream before any byte was read
     * @throws EOFException if the channel reported end of stream after some,
     *                      but not all, of the requested bytes were read
     * @throws IOException  if the channel fails
     */
    private static boolean tryReadFully(ReadableByteChannel ch, ByteBuffer buffer) throws IOException {
        int total = 0;
        // Loop on remaining space rather than comparing a byte count with the
        // limit, so a non-zero start position or an empty buffer works too.
        while (buffer.hasRemaining()) {
            int bytes = ch.read(buffer);
            if (bytes < 0) {
                if (total == 0) {
                    return false;
                }
                throw new EOFException("mid-buffer");
            }
            total += bytes;
        }
        return true;
    }

    /**
     * Like {@link #tryReadFully(ReadableByteChannel, ByteBuffer)}, but treats
     * end of stream before the first byte as an error as well.
     *
     * @throws EOFException if the buffer could not be filled completely
     * @throws IOException  if the channel fails
     */
    private static void readFully(ReadableByteChannel ch, ByteBuffer buffer) throws IOException {
        if (!tryReadFully(ch, buffer)) {
            throw new EOFException("start");
        }
    }

    /**
     * Closes the channel that was passed to the constructor.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void close() throws IOException {
        channel.close();
    }
}