Skip to content

Commit

Permalink
mapping updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Oct 2, 2024
1 parent 2c7fb4b commit e37d0c0
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -95,7 +99,6 @@ public final void setUnique(int nUnique) {
*/
public abstract int getIndex(int n);


/**
* Shortcut method to support Integer objects, not really efficient but for the purpose of reusing code.
*
Expand Down Expand Up @@ -813,7 +816,11 @@ protected void copyInt(MapToInt d) {
*
* @param d The array to copy
*/
public abstract void copyInt(int[] d);
public void copyInt(int[] d) {
copyInt(d, 0, size());
}

public abstract void copyInt(int[] d, int start, int end);

public abstract void copyBit(BitSet d);

Expand Down Expand Up @@ -887,7 +894,8 @@ public int countRuns(AOffset off) {

@Override
public boolean equals(Object e) {
return e instanceof AMapToData && (this == e || this.equals((AMapToData) e));
return this == e || // same object or
(e instanceof AMapToData && this.equals((AMapToData) e));
}

/**
Expand All @@ -903,13 +911,13 @@ public void verify() {
if(CompressedMatrixBlock.debug) {
for(int i = 0; i < size(); i++) {
if(getIndex(i) >= nUnique) {
throw new DMLCompressionException("invalid construction of Mapping data containing values above unique");
throw new DMLCompressionException("Invalid construction of Mapping data containing values above unique");
}
}
}
}

public void lmSparseMatrixRow(SparseBlock sb, final int r, DenseBlock db, final IColIndex colIndexes,
protected void lmSparseMatrixRow(SparseBlock sb, final int r, DenseBlock db, final IColIndex colIndexes,
final IDictionary dict) {
if(sb.isEmpty(r))
return;
Expand All @@ -934,7 +942,7 @@ public void decompressToRange(double[] c, int rl, int ru, int offR, double[] val
decompressToRangeOff(c, rl, ru, offR, values);
}

public void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
protected void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
for(int i = rl, offT = rl + offR; i < ru; i++, offT++)
c[offT] += values[getIndex(i)];
}
Expand All @@ -950,14 +958,73 @@ protected void decompressToRangeNoOffBy8(double[] c, int r, double[] values) {
c[r + 7] += values[getIndex(r + 7)];
}

public void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
protected void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
final int h = (ru - rl) % 8;
for(int rc = rl; rc < rl + h; rc++)
c[rc] += values[getIndex(rc)];
for(int rc = rl + h; rc < ru; rc += 8)
decompressToRangeNoOffBy8(c, rc, values);
}

/**
 * Split this mapping round robin into {@code multiplier} smaller mappings.
 *
 * Entry {@code j} of this mapping is written to position {@code j / multiplier} of the returned mapping at index
 * {@code j % multiplier}. Each returned mapping is allocated with {@code size() / multiplier} entries and the same
 * number of unique values as this mapping.
 *
 * @param multiplier The number of smaller mappings to construct
 * @return The array of smaller mappings
 */
public AMapToData[] splitReshapeDDC(final int multiplier) {
	final int nRows = size();
	final AMapToData[] parts = new AMapToData[multiplier];
	final int partSize = nRows / multiplier;
	int p = 0;
	while(p < multiplier)
		parts[p++] = MapToFactory.create(partSize, getUnique());

	// The chunk length is a whole multiple of multiplier, so every chunk starts on a round robin group boundary.
	final int chunk = Math.max(partSize / 8, 2048) * multiplier;
	int from = 0;
	while(from < nRows) {
		splitReshapeDDCBlock(parts, multiplier, from, Math.min(from + chunk, nRows));
		from += chunk;
	}

	return parts;
}

/**
 * Split this mapping round robin into {@code multiplier} smaller mappings, using the given thread pool.
 *
 * The input is cut into chunks whose length is a whole multiple of {@code multiplier}; each chunk is submitted to
 * the pool as one task, and this method blocks until every task has completed. The pool is not shut down here.
 *
 * NOTE(review): tasks call {@code set} on the same output mappings at different offsets; this assumes that is
 * safe for the concrete mapping type returned by the factory - confirm for bit-packed mappings.
 *
 * @param multiplier The number of smaller mappings to construct
 * @param pool       The executor the chunk tasks are submitted to
 * @return The array of smaller mappings
 * @throws Exception If waiting on one of the submitted tasks fails
 */
public AMapToData[] splitReshapeDDCPushDown(final int multiplier, final ExecutorService pool) throws Exception {
	final int nRows = size();
	final AMapToData[] parts = new AMapToData[multiplier];
	final int partSize = nRows / multiplier;
	int p = 0;
	while(p < multiplier)
		parts[p++] = MapToFactory.create(partSize, getUnique());

	final int chunk = Math.max(partSize / 8, 2048) * multiplier;
	final List<Future<?>> pending = new ArrayList<>();
	int from = 0;
	while(from < nRows) {
		final int lo = from;
		final int hi = Math.min(from + chunk, nRows);
		pending.add(pool.submit(() -> splitReshapeDDCBlock(parts, multiplier, lo, hi)));
		from += chunk;
	}

	// Block until all chunks are written before handing the result back.
	for(Future<?> f : pending)
		f.get();

	return parts;
}

/**
 * Process the round robin groups that start in the range [start, end), stepping one group ({@code multiplier}
 * entries) at a time.
 *
 * @param ret        The output mappings to write into
 * @param multiplier The number of output mappings, and the length of one group
 * @param start      The first input index to process (inclusive)
 * @param end        The input index to stop at (exclusive)
 */
private void splitReshapeDDCBlock(final AMapToData[] ret, final int multiplier, final int start, final int end) {
	int row = start;
	while(row < end) {
		splitReshapeDDCRow(ret, multiplier, row);
		row += multiplier;
	}
}

/**
 * Scatter one group of {@code multiplier} consecutive entries, starting at input index {@code i}, over the output
 * mappings. All entries of the group are written to the same position {@code i / multiplier}.
 *
 * @param ret        The output mappings to write into
 * @param multiplier The number of output mappings, and the length of one group
 * @param i          The input index of the first entry in the group
 */
private void splitReshapeDDCRow(final AMapToData[] ret, final int multiplier, final int i) {
	final int target = i / multiplier;
	final int stop = i + multiplier;
	int src = i;
	while(src < stop) {
		ret[src % multiplier].set(target, getIndex(src));
		src++;
	}
}

@Override
public String toString() {
final int sz = size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ public void copy(AMapToData d) {
}

@Override
public void copyInt(int[] d) {
for(int i = 0; i < _size; i++)
public void copyInt(int[] d, int start, int end) {
for(int i = start; i < end; i++)
set(i, d[i]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public void replace(int v, int r) {
}

@Override
public void copyInt(int[] d) {
for(int i = 0; i < _data.length; i++)
public void copyInt(int[] d, int start, int end) {
for(int i = start; i < end; i++)
_data[i] = (byte) d[i];
}

Expand Down Expand Up @@ -320,13 +320,13 @@ public void decompressToRange(double[] c, int rl, int ru, int offR, double[] val
}

@Override
public void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
protected void decompressToRangeOff(double[] c, int rl, int ru, int offR, double[] values) {
for(int i = rl, offT = rl + offR; i < ru; i++, offT++)
c[offT] += values[getIndex(i)];
}

@Override
public void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
protected void decompressToRangeNoOff(double[] c, int rl, int ru, double[] values) {
// OVERWRITTEN FOR JIT COMPILE!
final int h = (ru - rl) % 8;
for(int rc = rl; rc < rl + h; rc++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.sysds.runtime.compress.colgroup.IMapToDataGroup;
Expand Down Expand Up @@ -92,6 +96,10 @@ public void set(int n, int v) {
_data[n] = (char) v;
}

/**
 * Store a char value at the given position of the backing array, without any cast.
 *
 * @param n The position to write
 * @param v The char value to store
 */
public void set(int n, char v) {
	this._data[n] = v;
}

@Override
public int setAndGet(int n, int v) {
return _data[n] = (char) v;
Expand Down Expand Up @@ -208,8 +216,8 @@ public int getUpperBoundValue() {
}

@Override
public void copyInt(int[] d) {
for(int i = 0; i < _data.length; i++)
public void copyInt(int[] d, int start, int end) {
for(int i = start; i < end; i++)
_data[i] = (char) d[i];
}

Expand Down Expand Up @@ -391,4 +399,38 @@ protected final void preAggregateDDC_DDCSingleCol_vecChar(MapToChar tm, double[]
v[getIndex(r8)] += td[tm.getIndex(r8)];
}

/**
 * Char specialized parallel round robin split: the outputs are allocated directly as MapToChar and the tasks
 * copy between the backing char arrays.
 *
 * The input is cut into chunks whose length is a whole multiple of {@code multiplier}, so each task writes its
 * own range of output positions. This method blocks until every task has completed and does not shut down the
 * pool.
 *
 * @param multiplier The number of smaller mappings to construct
 * @param pool       The executor the chunk tasks are submitted to
 * @return The array of smaller mappings
 * @throws Exception If waiting on one of the submitted tasks fails
 */
@Override
public AMapToData[] splitReshapeDDCPushDown(final int multiplier, final ExecutorService pool) throws Exception {
	final int nRows = size();
	final MapToChar[] parts = new MapToChar[multiplier];
	final int partSize = nRows / multiplier;
	int p = 0;
	while(p < multiplier)
		parts[p++] = new MapToChar(getUnique(), partSize);

	final int chunk = Math.max(partSize / 8, 2048) * multiplier;
	final List<Future<?>> pending = new ArrayList<>();
	int from = 0;
	while(from < nRows) {
		final int lo = from;
		final int hi = Math.min(from + chunk, nRows);
		pending.add(pool.submit(() -> splitReshapeDDCBlock(parts, multiplier, lo, hi)));
		from += chunk;
	}

	// Block until all chunks are written before handing the result back.
	for(Future<?> f : pending)
		f.get();

	return parts;
}

/**
 * Process the round robin groups that start in the range [start, end), stepping one group ({@code multiplier}
 * entries) at a time.
 *
 * @param ret        The output char mappings to write into
 * @param multiplier The number of output mappings, and the length of one group
 * @param start      The first input index to process (inclusive)
 * @param end        The input index to stop at (exclusive)
 */
private void splitReshapeDDCBlock(final MapToChar[] ret, final int multiplier, final int start, final int end) {
	int row = start;
	while(row < end) {
		splitReshapeDDCRow(ret, multiplier, row);
		row += multiplier;
	}
}

/**
 * Copy one group of {@code multiplier} consecutive chars, starting at input index {@code i}, straight into the
 * backing arrays of the output mappings. All entries of the group land at position {@code i / multiplier}.
 *
 * @param ret        The output char mappings to write into
 * @param multiplier The number of output mappings, and the length of one group
 * @param i          The input index of the first entry in the group
 */
private void splitReshapeDDCRow(final MapToChar[] ret, final int multiplier, final int i) {
	final int target = i / multiplier;
	final int stop = i + multiplier;
	int src = i;
	while(src < stop) {
		ret[src % multiplier]._data[target] = _data[src];
		src++;
	}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public int getUpperBoundValue() {
}

@Override
public void copyInt(int[] d) {
for(int i = 0; i < d.length; i++)
public void copyInt(int[] d, int start, int end) {
for(int i = start; i < end; i++)
set(i, d[i]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.utils.IntArrayList;
import org.apache.sysds.runtime.util.CommonThreadPool;

/** Interface for the factory design pattern for constructing all AMapToData. */
public interface MapToFactory {
Expand Down Expand Up @@ -63,6 +69,26 @@ public static AMapToData create(int size, int[] values, int nUnique) {
return _data;
}

/**
 * Create a mapping sized to the number of entries in the given list and fill it from the list's values.
 *
 * @param unique The number of unique values the mapping must support
 * @param values The list of values to copy into the mapping
 * @return The newly allocated and filled mapping
 */
public static AMapToData create(int unique, IntArrayList values) {
	final int len = values.size();
	final AMapToData map = create(len, unique);
	final int[] raw = values.extractValues();
	map.copyInt(raw);
	return map;
}

/**
 * Create a mapping of the given size and fill it from {@code values}, copying in parallel with up to {@code k}
 * threads.
 *
 * The values are cut into blocks that are copied by separate tasks. This method waits for every task to finish
 * before returning, so the returned mapping is fully populated, and it always shuts down the pool it obtained.
 * With {@code k <= 1} or an empty array the copy is done on the calling thread.
 *
 * @param size    The number of entries to allocate in the mapping
 * @param values  The values to copy; indexes 0 to values.length are copied
 * @param nUnique The number of unique values the mapping must support
 * @param k       The parallelization degree
 * @return The newly allocated and filled mapping
 * @throws IllegalStateException If the calling thread is interrupted while waiting, or a copy task fails
 */
public static AMapToData create(int size, int[] values, int nUnique, int k) {
	final AMapToData _data = create(size, nUnique);
	if(k <= 1 || values.length == 0) {
		// Nothing to gain from a pool; this also avoids the division by k below when k is 0.
		_data.copyInt(values, 0, values.length);
		return _data;
	}

	final ExecutorService pool = CommonThreadPool.get(k);
	try {
		int blk = Math.max((values.length / k), 1024);
		// Keep block starts on multiples of 64; presumably so bit-packed mappings never have two tasks writing
		// the same 64-bit word - TODO confirm against the bit mapping implementation.
		blk -= blk % 64;
		final List<Future<?>> tasks = new ArrayList<>();
		for(int i = 0; i < values.length; i += blk) {
			final int start = i;
			final int end = Math.min(i + blk, values.length);
			tasks.add(pool.submit(() -> _data.copyInt(values, start, end)));
		}
		// Wait for all copies; without this the mapping could be returned partially filled and task failures
		// would go unnoticed.
		for(Future<?> t : tasks)
			t.get();
		return _data;
	}
	catch(InterruptedException e) {
		Thread.currentThread().interrupt();
		throw new IllegalStateException("Interrupted while copying values into mapping", e);
	}
	catch(java.util.concurrent.ExecutionException e) {
		throw new IllegalStateException("Failed parallel copy of values into mapping", e);
	}
	finally {
		pool.shutdown();
	}
}

/**
* Create and allocate a map with the given size and support for up to the num tuples argument of values
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ public int getUpperBoundValue() {
}

@Override
public void copyInt(int[] d) {
for(int i = 0; i < _data.length; i++)
public void copyInt(int[] d, int start, int end) {
for(int i = start; i < end; i++)
_data[i] = d[i];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void preAggregateDDC_DDCMultiCol(AMapToData tm, IDictionary td, double[]
}

@Override
public void copyInt(int[] d) {
public void copyInt(int[] d, int start, int end) {
// do nothing
}

Expand Down
Loading

0 comments on commit e37d0c0

Please sign in to comment.