Skip to content

Commit

Permalink
partition and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 23, 2024
1 parent 992bd00 commit 78df127
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected static MapToChar readFields(DataInput in) throws IOException {
final int length = in.readInt();
final char[] data = new char[length];
for(int i = 0; i < length; i++)
data[i] = in.readChar();
data[i] = (char)in.readUnsignedShort();
return new MapToChar(unique, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
package org.apache.sysds.runtime.frame.data.columns;

import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
import org.apache.sysds.runtime.matrix.data.Pair;
import org.apache.sysds.runtime.util.CommonThreadPool;

/**
* Generic, resizable native arrays for the internal representation of the columns in the FrameBlock. We use this custom
Expand Down Expand Up @@ -119,12 +126,58 @@ public synchronized final Map<T, Long> getRecodeMap(int estimate) {
* @return The recode map
*/
protected Map<T, Long> createRecodeMap(int estimate) {
	final int s = size();
	final int k = OptimizerUtils.getTransformNumThreads();
	// A single thread or fewer than 10000 rows: build the map in one sequential pass over [0, s).
	if(k <= 1 || s < 10000)
		return createRecodeMap(estimate, 0, s);
	// Otherwise build partial maps over row ranges in parallel and merge them.
	return parallelCreateRecodeMap(estimate, s, k);
}

/**
 * Build the recode map in parallel: the rows [0, s) are split into contiguous ranges, one partial recode map is
 * built per range on a thread pool, and the partial maps are merged in range order into the map of the first range.
 *
 * @param estimate Estimated number of distinct values, forwarded to the per-range map construction
 * @param s        Number of rows to process
 * @param k        Parallelization degree requested from the thread pool
 * @return The merged recode map
 * @throws RuntimeException if a task fails or the calling thread is interrupted while waiting
 */
private Map<T, Long> parallelCreateRecodeMap(int estimate, final int s, int k) {
	final ExecutorService pool = CommonThreadPool.get(k);
	try {
		// Each task covers at least 10000 rows, so small inputs are not split into tiny ranges.
		final int blk = Math.max(10000, (s + k) / k);
		final List<Future<Map<T, Long>>> tasks = new ArrayList<>();
		for(int i = 0; i < s; i += blk) {
			final int start = i;
			final int end = Math.min(i + blk, s);
			tasks.add(pool.submit(() -> createRecodeMap(estimate, start, end)));
		}
		// Merge in submission order; the first range's map is reused as the result.
		final Map<T, Long> map = tasks.get(0).get();
		for(int i = 1; i < tasks.size(); i++)
			mergeRecodeMaps(map, tasks.get(i).get());
		return map;
	}
	catch(InterruptedException e) {
		// Keep the interrupt visible to callers instead of silently clearing it.
		Thread.currentThread().interrupt();
		throw new RuntimeException(e);
	}
	catch(Exception e) {
		throw new RuntimeException(e);
	}
	finally {
		pool.shutdown();
	}
}

/**
 * Merge a partial recode map into the target map. Keys of {@code from} are visited in increasing order of their id
 * in {@code from}; a key already present in {@code target} keeps its id, a new key gets the next free id.
 *
 * NOTE(review): assumes the ids in both maps are the contiguous range 1..size(), as produced by the range-based
 * createRecodeMap whose id counter starts at 1 - confirm against addValRecodeMap.
 *
 * @param target The map to merge into (modified in place)
 * @param from   The partial map whose keys are added to the target
 */
private void mergeRecodeMaps(Map<T, Long> target, Map<T, Long> from) {
	// Invert 'from': position (id - 1) holds the key that was assigned that 1-based id.
	final List<T> fromEntriesOrdered = new ArrayList<>(Collections.nCopies(from.size(), null));
	for(Map.Entry<T, Long> e : from.entrySet())
		fromEntriesOrdered.set(e.getValue().intValue() - 1, e.getKey());
	// Existing ids occupy 1..target.size(), so the next free id is size + 1.
	long id = target.size() + 1;
	for(T e : fromEntriesOrdered) {
		if(target.putIfAbsent(e, id) == null)
			id++;
	}
}

/**
 * Build a recode map from the rows in the range [s, e).
 *
 * @param estimate Estimated number of distinct values, used to pre-size the map
 * @param s        First row index (inclusive)
 * @param e        Last row index (exclusive)
 * @return The recode map of the range; the id counter passed to addValRecodeMap starts at 1
 */
private Map<T, Long> createRecodeMap(final int estimate, final int s, final int e) {
	// Initial capacity: the smaller of twice the estimate and the number of rows in the range.
	final Map<T, Long> map = new HashMap<>((int) Math.min((long) estimate * 2, e - s));
	long id = 1;
	for(int i = s; i < e; i++)
		id = addValRecodeMap(map, id, i);
	return map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,8 @@ public void setM(Map<T, Long> map, int si, AMapToData m, int i) {

@Override
protected long addValRecodeMap(Map<T, Long> map, long id, int i) {
	// Only rows whose flag in _n is set are forwarded to the wrapped array _a;
	// for all other rows the map and the id are left untouched.
	if(_n.get(i))
		id = _a.addValRecodeMap(map, id, i);
	return id;
}

Expand Down

0 comments on commit 78df127

Please sign in to comment.