Skip to content

Commit

Permalink
reduce tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent a741775 commit b163f56
Showing 1 changed file with 5 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
int numThreads = OptimizerUtils.getParallelTextReadParallelism();
final int numThreads = OptimizerUtils.getParallelTextReadParallelism();

TextInputFormat informat = new TextInputFormat();
informat.configure(job);
Expand All @@ -64,29 +64,21 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,

final ExecutorService pool = CommonThreadPool.get(numThreads);
try {
// get number of threads pool to use the common thread pool.

//compute num rows per split
ArrayList<Future<Long>> cret = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
for( int i=0; i<splits.length - 1; i++ )
cret.add(pool.submit(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0)));


//compute row offset per split via cumsum on row counts
int offset = 0;

// offsets.add(0);
// List<Integer> offsets = new ArrayList<>();
ArrayList<Future<Object>> tasks2 = new ArrayList<>();
for( int i=0; i<splits.length; i++ ){
for( int i=0; i<splits.length -1; i++ ){
tasks2.add(pool.submit(new ReadRowsTask(splits[i], informat, job, dest, offset, i==0)));
offset += cret.get(i).get();
}
// for( Future<Long> count : cret ) {
// offsets.add(offset);
// offset += count.get();
// }

tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, offset, splits.length==1)));

//read individual splits
for(Future<Object> a : tasks2)
a.get();
Expand Down

0 comments on commit b163f56

Please sign in to comment.