Skip to content

Commit

Permalink
id
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent f0ccb32 commit 2fda71a
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
splits = IOUtilFunctions.sortInputSplits(splits);


final ExecutorService pool = CommonThreadPool.get(numThreads);
final ExecutorService pool = CommonThreadPool.get(numThreads);
try {
if(splits.length == 1){
new ReadRowsTask(splits[0], informat, job, dest, 0, true).call();
Expand All @@ -73,7 +73,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
//compute num rows per split
ArrayList<Future<Long>> cret = new ArrayList<>();
for( int i=0; i<splits.length - 1; i++ ) // all but last split
cret.add(pool.submit(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0)));
cret.add(pool.submit(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0, i)));

//compute row offset per split via cumsum on row counts
long offset = 0;
Expand Down Expand Up @@ -117,7 +117,7 @@ protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSyst
try {
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0));
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0, i));
List<Future<Long>> cret = pool.invokeAll(tasks);
for( Future<Long> count : cret )
nrow += count.get().longValue();
Expand All @@ -140,18 +140,22 @@ private static class CountRowsTask implements Callable<Long> {
private TextInputFormat _informat;
private JobConf _job;
private boolean _hasHeader;
private int _id;

public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader) {

public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, int id ) {
_split = split;
_informat = informat;
_job = job;
_hasHeader = hasHeader;
_id = id;
}

@Override
public Long call() throws Exception {
LOG.debug("lines in split: " + _id + " --- ");
long count = countLinesInSplit(_split, _informat, _job, _hasHeader);
LOG.debug("lines in split: " + count);
LOG.debug("lines in split: " + _id + " --- " + count);
return count;
}
}
Expand Down

0 comments on commit 2fda71a

Please sign in to comment.