Skip to content

Commit

Permalink
debug splits
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent 9a312e1 commit b28c577
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
cret.add(pool.submit(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0)));

//compute row offset per split via cumsum on row counts
int offset = 0;
long offset = 0;
ArrayList<Future<Object>> tasks2 = new ArrayList<>();
for( int i=0; i<splits.length -1; i++ ){
tasks2.add(pool.submit(new ReadRowsTask(splits[i], informat, job, dest, offset, i==0)));
tasks2.add(pool.submit(new ReadRowsTask(splits[i], informat, job, dest, (int) offset, i==0)));
offset += cret.get(i).get();
}
tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, offset, splits.length==1)));
tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, (int) offset, splits.length==1)));

//read individual splits
for(Future<Object> a : tasks2)
Expand Down Expand Up @@ -149,7 +149,9 @@ public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, bo

@Override
public Long call() throws Exception {
return countLinesInSplit(_split, _informat, _job, _hasHeader);
long count = countLinesInSplit(_split, _informat, _job, _hasHeader);
LOG.debug("lines in split: " + count);
return count;
}
}

Expand Down Expand Up @@ -178,6 +180,8 @@ public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job,
public Object call() throws Exception {
readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(),
_dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);

LOG.debug("read csv : " + _offset);
return null;
}
}
Expand Down

0 comments on commit b28c577

Please sign in to comment.