Skip to content

Commit

Permalink
parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 21, 2024
1 parent 6d915f8 commit ab45f4d
Showing 1 changed file with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ private boolean containsCompressed(FrameBlock in) {
}

public static MatrixBlock encode(MultiColumnEncoder enc, FrameBlock in, int k)
throws InterruptedException, ExecutionException {
throws Exception {
return new CompressedEncode(enc, in, k).apply();
}

private MatrixBlock apply() throws InterruptedException, ExecutionException {
private MatrixBlock apply() throws Exception {
try {
final List<ColumnEncoderComposite> encoders = enc.getColumnEncoders();
final List<AColGroup> groups = isParallel() ? multiThread(encoders) : singleThread(encoders);
Expand All @@ -112,15 +112,15 @@ private boolean isParallel() {
}

private List<AColGroup> singleThread(List<ColumnEncoderComposite> encoders)
throws InterruptedException, ExecutionException {
throws Exception {
List<AColGroup> groups = new ArrayList<>(encoders.size());
for(ColumnEncoderComposite c : encoders)
groups.add(encode(c));
return groups;
}

private List<AColGroup> multiThread(List<ColumnEncoderComposite> encoders)
throws InterruptedException, ExecutionException {
throws Exception {
try {
final List<Future<AColGroup>> tasks = new ArrayList<>(encoders.size());
for(ColumnEncoderComposite c : encoders)
Expand Down Expand Up @@ -150,7 +150,7 @@ private int shiftGroups(List<AColGroup> groups) {
return cols;
}

private AColGroup encode(ColumnEncoderComposite c) throws InterruptedException, ExecutionException {
private AColGroup encode(ColumnEncoderComposite c) throws Exception {
if(c.isRecodeToDummy())
return recodeToDummy(c);
else if(c.isRecode())
Expand All @@ -170,7 +170,7 @@ else if(c.isHashToDummy())
}

@SuppressWarnings("unchecked")
private AColGroup recodeToDummy(ColumnEncoderComposite c) {
private AColGroup recodeToDummy(ColumnEncoderComposite c) throws Exception {
int colId = c._colID;
Array<?> a = in.getColumn(colId - 1);
boolean containsNull = a.containsNull();
Expand Down Expand Up @@ -336,7 +336,7 @@ private AColGroup binToDummy(ColumnEncoderComposite c) throws InterruptedExcepti
}

@SuppressWarnings("unchecked")
private AColGroup recode(ColumnEncoderComposite c) {
private AColGroup recode(ColumnEncoderComposite c) throws Exception {
int colId = c._colID;
Array<?> a = in.getColumn(colId - 1);
Map<?, Long> map = a.getRecodeMap(c._estNumDistincts);
Expand Down Expand Up @@ -364,7 +364,7 @@ private AColGroup recode(ColumnEncoderComposite c) {
}

@SuppressWarnings("unchecked")
private AColGroup passThrough(ColumnEncoderComposite c) {
private AColGroup passThrough(ColumnEncoderComposite c) throws Exception {

final IColIndex colIndexes = ColIndexFactory.create(1);
final int colId = c._colID;
Expand Down Expand Up @@ -418,29 +418,45 @@ private AColGroup passThrough(ColumnEncoderComposite c) {

}

private AMapToData createMappingAMapToData(Array<?> a, Map<?, Long> map, boolean containsNull) {
private AMapToData createMappingAMapToData(Array<?> a, Map<?, Long> map, boolean containsNull) throws Exception {
final int si = map.size();
final int nRow = in.getNumRows();
if(!containsNull && a instanceof DDCArray)
return ((DDCArray<?>)a).getMap();

final AMapToData m = MapToFactory.create(nRow, si + (containsNull ? 1 : 0));
if(containsNull)
return createMappingAMapToDataWithNull(a, map, si, nRow, m);
else
return createMappingAMapToDataNoNull(a, map, si, nRow, m);
final int blkz = Math.max(10000, (nRow + k)/k);

List<Future<?>> tasks = new ArrayList<>();
for(int i = 0; i < nRow; i += blkz){
final int start = i;
final int end = Math.min(nRow, i + blkz);

tasks.add(pool.submit(() -> {
if(containsNull)
return createMappingAMapToDataWithNull(a, map, si, m, start, end);
else
return createMappingAMapToDataNoNull(a, map, si, m, start, end);

}));

}
for( Future<?> t : tasks){
t.get();
}
return m;
}

private AMapToData createMappingAMapToDataNoNull(Array<?> a, Map<?, Long> map, int si, int nRow, AMapToData m) {
private static AMapToData createMappingAMapToDataNoNull(Array<?> a, Map<?, Long> map, int si, AMapToData m, int start, int end) {
// TODO push down to underlying array if critical performance to allow JIT compilation.
for(int i = 0; i < nRow; i++)
for(int i = start; i < end; i++)
m.set(i, map.get(a.get(i)).intValue() - 1);
return m;
}

private AMapToData createMappingAMapToDataWithNull(Array<?> a, Map<?, Long> map, int si, int nRow, AMapToData m) {
private static AMapToData createMappingAMapToDataWithNull(Array<?> a, Map<?, Long> map, int si, AMapToData m, int start, int end) {
// TODO push down to underlying array if critical performance to allow JIT compilation.
for(int i = 0; i < nRow; i++) {
for(int i = start; i < end; i++) {
final Object v = a.get(i);
if(v != null)
m.set(i, map.get(v).intValue() - 1);
Expand Down

0 comments on commit ab45f4d

Please sign in to comment.