Skip to content

Commit

Permalink
debug statements
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent cb31c61 commit a7fdf8c
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.utils.stats.Timing;

/**
* Multi-threaded frame text csv reader.
Expand All @@ -54,6 +55,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
Timing time = new Timing(true);
final int numThreads = OptimizerUtils.getParallelTextReadParallelism();

TextInputFormat informat = new TextInputFormat();
Expand All @@ -75,6 +77,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
for( int i=0; i<splits.length - 1; i++ ) // all but last split
cret.add(pool.submit(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0)));

LOG.debug("Spawned all row counting tasks CSV : " + time.stop());
//compute row offset per split via cumsum on row counts
long offset = 0;
ArrayList<Future<Object>> tasks2 = new ArrayList<>();
Expand All @@ -85,10 +88,11 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
}
tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, (int) offset, splits.length==1)));

LOG.debug("Spawned all reading tasks CSV : " + time.stop());
//read individual splits
for(Future<Object> a : tasks2)
a.get();

LOG.debug("Finished Reading CSV : " + time.stop());
}
catch (Exception e) {
throw new IOException("Failed parallel read of text csv input.", e);
Expand Down

0 comments on commit a7fdf8c

Please sign in to comment.