diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index 2c9e671b7..1b71f031f 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -13,6 +13,7 @@ package org.hpccsystems.dfs.client; import org.hpccsystems.commons.ecl.FieldDef; +import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; import org.hpccsystems.dfs.client.RowServiceOutputStream; import org.apache.logging.log4j.Logger; @@ -34,6 +35,7 @@ public class HPCCRemoteFileWriter private BinaryRecordWriter binaryRecordWriter = null; private IRecordAccessor recordAccessor = null; private long recordsWritten = 0; + private long openTimeMs = 0; /** * A remote file writer. @@ -85,6 +87,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); + + log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart() + + " compression: " + fileCompression.name() + " " + + " record definition:\n" + + RecordDefinitionTranslator.toJsonRecord(this.recordDef)); + openTimeMs = System.currentTimeMillis(); } /** @@ -128,6 +136,12 @@ public void close() throws Exception { this.report(); this.binaryRecordWriter.finalize(); + + long closeTimeMs = System.currentTimeMillis(); + double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0; + log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart() + + " write time: " + writeTimeS + "s " + + " records written: " + recordsWritten); } /** @@ -201,7 +215,7 @@ public String getRemoteWriteMessages() return report; } - + /** * Reports summary of messages generated during write operation. */ diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 716e6c5e8..75804b4d5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -13,6 +13,7 @@ package org.hpccsystems.dfs.client; import org.hpccsystems.commons.ecl.FieldDef; +import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; import org.hpccsystems.commons.errors.HpccFileException; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -21,7 +22,7 @@ import java.util.Iterator; /** - * Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition + * Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition * and constructs records via the provided @see org.hpccsystems.dfs.client#IRecordBuilder. */ public class HpccRemoteFileReader implements Iterator @@ -34,6 +35,8 @@ public class HpccRemoteFileReader implements Iterator private BinaryRecordReader binaryRecordReader; private IRecordBuilder recordBuilder = null; private boolean handlePrefetch = true; + private long openTimeMs = 0; + private long recordsRead = 0; public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; @@ -72,7 +75,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * the record defintion for the dataset * @param recBuilder * the IRecordBuilder used to construct records - * @param connectTimeout + * @param connectTimeout * the connection timeout in seconds, -1 for default * @throws Exception * the exception @@ -105,20 +108,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde /** * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. - * + * * @param dp * the part of the file, name and location * @param originalRD * the record defintion for the dataset * @param recBuilder * the IRecordBuilder used to construct records - * @param connectTimeout + * @param connectTimeout * the connection timeout in seconds, -1 for default - * @param limit + * @param limit * the maximum number of records to read from the provided data partition, -1 specifies no limit - * @param createPrefetchThread + * @param createPrefetchThread * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. - * @param readSizeKB + * @param readSizeKB * read request size in KB, -1 specifies use default value * @throws Exception * general exception @@ -130,22 +133,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde /** * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. - * + * * @param dp * the part of the file, name and location * @param originalRD * the record defintion for the dataset * @param recBuilder * the IRecordBuilder used to construct records - * @param connectTimeout + * @param connectTimeout * the connection timeout in seconds, -1 for default - * @param limit + * @param limit * the maximum number of records to read from the provided data partition, -1 specifies no limit - * @param createPrefetchThread + * @param createPrefetchThread * the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically. - * @param readSizeKB + * @param readSizeKB * read request size in KB, -1 specifies use default value - * @param resumeInfo + * @param resumeInfo * FileReadeResumeInfo data required to restart a read from a particular point in a file * @throws Exception * general exception @@ -172,6 +175,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde { throw new Exception("IRecordBuilder does not have a valid record definition."); } + if (resumeInfo == null) { this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB); @@ -200,11 +204,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos); this.binaryRecordReader.initialize(this.recordBuilder); } + + + log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart() + + (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" ) + + " original record definition:\n" + + RecordDefinitionTranslator.toJsonRecord(originalRD) + + " projected record definition:\n" + + RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition)); + openTimeMs = System.currentTimeMillis(); } /** * Returns the stream position within the file. - * + * * @return stream position */ public long getStreamPosition() @@ -214,7 +227,7 @@ public long getStreamPosition() /** * Returns read resume info for the current position within the file. - * + * * @return FileReadResumeInfo */ public FileReadResumeInfo getFileReadResumeInfo() @@ -224,7 +237,7 @@ public FileReadResumeInfo getFileReadResumeInfo() /** * Returns read resume info for the specified position within the file. - * + * * @param streamPosition the stream position to resume from * @return FileReadResumeInfo */ @@ -242,7 +255,7 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition) /** * Returns the number of messages created during the reading process - * + * * @return number of messages created */ public int getRemoteReadMessageCount() @@ -256,7 +269,7 @@ public int getRemoteReadMessageCount() /** * Returns messages created during the file reading process - * + * * @return Messages concatenated into a String */ public String getRemoteReadMessages() @@ -284,7 +297,7 @@ public void prefetch() /** * Is there more data - * + * * @return true if there is a next record */ @Override @@ -323,6 +336,8 @@ public T next() log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage()); throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); } + + recordsRead++; return (T) rslt; } @@ -336,6 +351,12 @@ public void close() throws Exception { report(); this.inputStream.close(); + + long closeTimeMs = System.currentTimeMillis(); + double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; + log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + + " read time: " + readTimeS + "s " + + " records read: " + recordsRead); } /** @@ -352,7 +373,7 @@ public int getAvailable() throws IOException /** * Returns the RowServiceInputStream used to read the file from dafilesrv - * + * * @return the input stream */ public RowServiceInputStream getInputStream() @@ -362,7 +383,7 @@ public RowServiceInputStream getInputStream() /** * Returns the BinaryRecordReader used to construct records - * + * * @return the record reader */ public BinaryRecordReader getRecordReader()