Skip to content

Commit

Permalink
HPCC4J-537 Log open / close operation info
Browse files Browse the repository at this point in the history
- Added open / close logging output

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Aug 29, 2023
1 parent 378a1fe commit fa60891
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hpccsystems.dfs.client;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.dfs.client.RowServiceOutputStream;

import org.apache.logging.log4j.Logger;
Expand All @@ -34,6 +35,7 @@ public class HPCCRemoteFileWriter<T>
private BinaryRecordWriter binaryRecordWriter = null;
private IRecordAccessor recordAccessor = null;
private long recordsWritten = 0;
private long openTimeMs = 0;

/**
* A remote file writer.
Expand Down Expand Up @@ -85,6 +87,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso

this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
this.binaryRecordWriter.initialize(this.recordAccessor);

log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart()
+ " compression: " + fileCompression.name() + " "
+ " record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(this.recordDef));
openTimeMs = System.currentTimeMillis();
}

/**
Expand Down Expand Up @@ -128,6 +136,12 @@ public void close() throws Exception
{
this.report();
this.binaryRecordWriter.finalize();

long closeTimeMs = System.currentTimeMillis();
double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
+ " write time: " + writeTimeS + "s "
+ " records written: " + recordsWritten);
}

/**
Expand Down Expand Up @@ -201,7 +215,7 @@ public String getRemoteWriteMessages()

return report;
}

/**
* Reports summary of messages generated during write operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.hpccsystems.dfs.client;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -21,7 +22,7 @@
import java.util.Iterator;

/**
* Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* and constructs records via the provided @see org.hpccsystems.dfs.client#IRecordBuilder.
*/
public class HpccRemoteFileReader<T> implements Iterator<T>
Expand All @@ -34,6 +35,8 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private BinaryRecordReader binaryRecordReader;
private IRecordBuilder recordBuilder = null;
private boolean handlePrefetch = true;
private long openTimeMs = 0;
private long recordsRead = 0;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
Expand Down Expand Up @@ -72,7 +75,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @throws Exception
* the exception
Expand Down Expand Up @@ -105,20 +108,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @throws Exception
* general exception
Expand All @@ -130,22 +133,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @param resumeInfo
* @param resumeInfo
* FileReadeResumeInfo data required to restart a read from a particular point in a file
* @throws Exception
* general exception
Expand All @@ -172,6 +175,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
{
throw new Exception("IRecordBuilder does not have a valid record definition.");
}

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB);
Expand Down Expand Up @@ -200,11 +204,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos);
this.binaryRecordReader.initialize(this.recordBuilder);
}


log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart()
+ (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" )
+ " original record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(originalRD)
+ " projected record definition:\n"
+ RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition));
openTimeMs = System.currentTimeMillis();
}

/**
* Returns the stream position within the file.
*
*
* @return stream position
*/
public long getStreamPosition()
Expand All @@ -214,7 +227,7 @@ public long getStreamPosition()

/**
* Returns read resume info for the current position within the file.
*
*
* @return FileReadResumeInfo
*/
public FileReadResumeInfo getFileReadResumeInfo()
Expand All @@ -224,7 +237,7 @@ public FileReadResumeInfo getFileReadResumeInfo()

/**
* Returns read resume info for the specified position within the file.
*
*
* @param streamPosition the stream position to resume from
* @return FileReadResumeInfo
*/
Expand All @@ -242,7 +255,7 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition)

/**
* Returns the number of messages created during the reading process
*
*
* @return number of messages created
*/
public int getRemoteReadMessageCount()
Expand All @@ -256,7 +269,7 @@ public int getRemoteReadMessageCount()

/**
* Returns messages created during the file reading process
*
*
* @return Messages concatenated into a String
*/
public String getRemoteReadMessages()
Expand Down Expand Up @@ -284,7 +297,7 @@ public void prefetch()

/**
* Is there more data
*
*
* @return true if there is a next record
*/
@Override
Expand Down Expand Up @@ -323,6 +336,8 @@ public T next()
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
}

recordsRead++;
return (T) rslt;
}

Expand All @@ -336,6 +351,12 @@ public void close() throws Exception
{
report();
this.inputStream.close();

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart()
+ " read time: " + readTimeS + "s "
+ " records read: " + recordsRead);
}

/**
Expand All @@ -352,7 +373,7 @@ public int getAvailable() throws IOException

/**
* Returns the RowServiceInputStream used to read the file from dafilesrv
*
*
* @return the input stream
*/
public RowServiceInputStream getInputStream()
Expand All @@ -362,7 +383,7 @@ public RowServiceInputStream getInputStream()

/**
* Returns the BinaryRecordReader used to construct records
*
*
* @return the record reader
*/
public BinaryRecordReader getRecordReader()
Expand Down

0 comments on commit fa60891

Please sign in to comment.