Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-537 Log open / close operation info #637

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hpccsystems.dfs.client;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.dfs.client.RowServiceOutputStream;

import org.apache.logging.log4j.Logger;
Expand All @@ -34,6 +35,7 @@ public class HPCCRemoteFileWriter<T>
private BinaryRecordWriter binaryRecordWriter = null;
private IRecordAccessor recordAccessor = null;
private long recordsWritten = 0;
private long openTimeMs = 0;

/**
* A remote file writer.
Expand Down Expand Up @@ -85,6 +87,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);

log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart()
+ " compression: " + fileCompression.name());
log.trace("Record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(this.recordDef));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the bloat factor of this output?
Was there discussion of reserving this output for error/warning situations only?

openTimeMs = System.currentTimeMillis();
}

/**
Expand Down Expand Up @@ -128,6 +136,12 @@ public void close() throws Exception
{
this.report();
this.binaryRecordWriter.finalize();

long closeTimeMs = System.currentTimeMillis();
double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
+ " write time: " + writeTimeS + "s "
+ " records written: " + recordsWritten);
}

/**
Expand Down Expand Up @@ -201,7 +215,7 @@ public String getRemoteWriteMessages()

return report;
}

/**
* Reports summary of messages generated during write operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hpccsystems.dfs.client;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -21,7 +22,7 @@
import java.util.Iterator;

/**
* Remote file reader that reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* Remote file reader that reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* and constructs records via the provided @see org.hpccsystems.dfs.client.IRecordBuilder.
*/
public class HpccRemoteFileReader<T> implements Iterator<T>
Expand All @@ -34,6 +35,8 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private BinaryRecordReader binaryRecordReader;
private IRecordBuilder recordBuilder = null;
private boolean handlePrefetch = true;
private long openTimeMs = 0;
private long recordsRead = 0;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
Expand Down Expand Up @@ -72,7 +75,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* the record definition for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @throws Exception
* the exception
Expand Down Expand Up @@ -105,20 +108,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record definition for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false, prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @throws Exception
* general exception
Expand All @@ -130,22 +133,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record definition for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false, prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @param resumeInfo
* @param resumeInfo
* FileReadResumeInfo data required to restart a read from a particular point in a file
* @throws Exception
* general exception
Expand All @@ -172,6 +175,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
{
throw new Exception("IRecordBuilder does not have a valid record definition.");
}

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB);
Expand Down Expand Up @@ -200,11 +204,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos);
this.binaryRecordReader.initialize(this.recordBuilder);
}


log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart()
+ (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" ));
log.trace("Original record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(originalRD)
+ " projected record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition));
openTimeMs = System.currentTimeMillis();
}

/**
* Returns the stream position within the file.
*
*
* @return stream position
*/
public long getStreamPosition()
Expand All @@ -214,7 +227,7 @@ public long getStreamPosition()

/**
* Returns read resume info for the current position within the file.
*
*
* @return FileReadResumeInfo
*/
public FileReadResumeInfo getFileReadResumeInfo()
Expand All @@ -224,7 +237,7 @@ public FileReadResumeInfo getFileReadResumeInfo()

/**
* Returns read resume info for the specified position within the file.
*
*
* @param streamPosition the stream position to resume from
* @return FileReadResumeInfo
*/
Expand All @@ -242,7 +255,7 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition)

/**
* Returns the number of messages created during the reading process
*
*
* @return number of messages created
*/
public int getRemoteReadMessageCount()
Expand All @@ -256,7 +269,7 @@ public int getRemoteReadMessageCount()

/**
* Returns messages created during the file reading process
*
*
* @return Messages concatenated into a String
*/
public String getRemoteReadMessages()
Expand Down Expand Up @@ -284,7 +297,7 @@ public void prefetch()

/**
* Is there more data
*
*
* @return true if there is a next record
*/
@Override
Expand Down Expand Up @@ -323,6 +336,8 @@ public T next()
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
}

recordsRead++;
return (T) rslt;
}

Expand All @@ -336,6 +351,12 @@ public void close() throws Exception
{
report();
this.inputStream.close();

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart()
+ " read time: " + readTimeS + "s "
+ " records read: " + recordsRead);
}

/**
Expand All @@ -352,7 +373,7 @@ public int getAvailable() throws IOException

/**
* Returns the RowServiceInputStream used to read the file from dafilesrv
*
*
* @return the input stream
*/
public RowServiceInputStream getInputStream()
Expand All @@ -362,7 +383,7 @@ public RowServiceInputStream getInputStream()

/**
* Returns the BinaryRecordReader used to construct records
*
*
* @return the record reader
*/
public BinaryRecordReader getRecordReader()
Expand Down
Loading