From b163f566867d10598f67dc2cfeaac86d72e02c7a Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Mon, 21 Oct 2024 14:49:19 +0200 Subject: [PATCH] reduce tasks --- .../runtime/io/FrameReaderTextCSVParallel.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java index 250633e1eda..219dc98118d 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java @@ -54,7 +54,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) throws IOException { - int numThreads = OptimizerUtils.getParallelTextReadParallelism(); + final int numThreads = OptimizerUtils.getParallelTextReadParallelism(); TextInputFormat informat = new TextInputFormat(); informat.configure(job); @@ -64,29 +64,21 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, final ExecutorService pool = CommonThreadPool.get(numThreads); try { - // get number of threads pool to use the common thread pool. - //compute num rows per split ArrayList> cret = new ArrayList<>(); - for( int i=0; i offsets = new ArrayList<>(); ArrayList> tasks2 = new ArrayList<>(); - for( int i=0; i count : cret ) { - // offsets.add(offset); - // offset += count.get(); - // } - + tasks2.add(pool.submit(new ReadRowsTask(splits[splits.length-1], informat, job, dest, offset, splits.length==1))); + //read individual splits for(Future a : tasks2) a.get();