diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java index 84cd1ca6483..789e8df067a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java @@ -63,7 +63,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, splits = IOUtilFunctions.sortInputSplits(splits); - final ExecutorService pool = CommonThreadPool.get(numThreads); + final ExecutorService pool = CommonThreadPool.get(numThreads); try { if(splits.length == 1){ new ReadRowsTask(splits[0], informat, job, dest, 0, true).call(); @@ -73,7 +73,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, //compute num rows per split ArrayList> cret = new ArrayList<>(); for( int i=0; i computeCSVSize( Path path, JobConf job, FileSyst try { ArrayList tasks = new ArrayList<>(); for( int i=0; i> cret = pool.invokeAll(tasks); for( Future count : cret ) nrow += count.get().longValue(); @@ -140,18 +140,22 @@ private static class CountRowsTask implements Callable { private TextInputFormat _informat; private JobConf _job; private boolean _hasHeader; + private int _id; - public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader) { + + public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, int id ) { _split = split; _informat = informat; _job = job; _hasHeader = hasHeader; + _id = id; } @Override public Long call() throws Exception { + LOG.debug("lines in split: " + _id + " --- "); long count = countLinesInSplit(_split, _informat, _job, _hasHeader); - LOG.debug("lines in split: " + count); + LOG.debug("lines in split: " + _id + " --- " + count); return count; } }