Skip to content

Commit

Permalink
[SYSTEMDS-3782] Bag-of-words Encoder for CP
Browse files Browse the repository at this point in the history
Closes #2130.
  • Loading branch information
e-strauss authored and mboehm7 committed Oct 25, 2024
1 parent 2f2117c commit 654cea9
Show file tree
Hide file tree
Showing 19 changed files with 3,142 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private static int distinctCountWithHistogram(int numVals, int[] invHist, int[]
}
}

private static int[] getInvertedFrequencyHistogram(int[] frequencies) {
public static int[] getInvertedFrequencyHistogram(int[] frequencies) {
try{

final int numVals = frequencies.length;
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/org/apache/sysds/runtime/data/SparseBlockCSR.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,16 @@ public void initSparse(int rlen, int nnz, DataInput in)
*/
public static long estimateSizeInMemory(long nrows, long ncols, double sparsity) {
double lnnz = Math.max(INIT_CAPACITY, Math.ceil(sparsity*nrows*ncols));

//32B overhead per array, int arr in nrows, int/double arr in nnz
return estimateSizeInMemory(nrows, (long) lnnz);
}

public static long estimateSizeInMemory(long nrows, long nnz) {
//32B overhead per array, int arr in nrows, int/double arr in nnz
double size = 16 + 4 + 4; //object + int field + padding
size += MemoryEstimates.intArrayCost(nrows+1); //ptr array (row pointers)
size += MemoryEstimates.intArrayCost((long) lnnz); //indexes array (column indexes)
size += MemoryEstimates.doubleArrayCost((long) lnnz);//values array (non-zero values)
size += MemoryEstimates.intArrayCost(nnz); //indexes array (column indexes)
size += MemoryEstimates.doubleArrayCost(nnz);//values array (non-zero values)

//robustness for long overflows
return (long) Math.min(size, Long.MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected byte toID() {

//transform methods
public enum TfMethod {
IMPUTE, RECODE, HASH, BIN, DUMMYCODE, UDF, OMIT, WORD_EMBEDDING;
IMPUTE, RECODE, HASH, BIN, DUMMYCODE, UDF, OMIT, WORD_EMBEDDING, BAG_OF_WORDS;
@Override
public String toString() {
return name().toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,20 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
private static final long serialVersionUID = 2299156350718979064L;
protected int _colID;
protected ArrayList<Integer> _sparseRowsWZeros = null;
protected int[] sparseRowPointerOffset = null; // offsets created by bag of words encoders (multiple nnz)
protected long _estMetaSize = 0;
protected int _estNumDistincts = 0;
protected int _nBuildPartitions = 0;
protected int _nApplyPartitions = 0;
protected long _avgEntrySize = 0;

//Override in ColumnEncoderWordEmbedding
public void initEmbeddings(MatrixBlock embeddings){
return;
}

protected enum TransformType{
BIN, RECODE, DUMMYCODE, FEATURE_HASH, PASS_THROUGH, UDF, WORD_EMBEDDING, N_A
BIN, RECODE, DUMMYCODE, FEATURE_HASH, PASS_THROUGH, UDF, WORD_EMBEDDING, BAG_OF_WORDS, N_A
}

protected ColumnEncoder(int colID) {
Expand Down Expand Up @@ -115,6 +117,9 @@ public MatrixBlock apply(CacheBlock<?> in, MatrixBlock out, int outputCol, int r
case WORD_EMBEDDING:
TransformStatistics.incWordEmbeddingApplyTime(t);
break;
case BAG_OF_WORDS:
TransformStatistics.incBagOfWordsApplyTime(t);
break;
case FEATURE_HASH:
TransformStatistics.incFeatureHashingApplyTime(t);
break;
Expand Down Expand Up @@ -152,6 +157,7 @@ protected void applySparse(CacheBlock<?> in, MatrixBlock out, int outputCol, int
for(int i = rowStart; i < rowEnd; i+=B) {
int lim = Math.min(i+B, rowEnd);
for (int ii=i; ii<lim; ii++) {
int indexWithOffset = sparseRowPointerOffset != null ? sparseRowPointerOffset[ii] - 1 + index : index;
if (mcsr) {
SparseRowVector row = (SparseRowVector) out.getSparseBlock().get(ii);
row.values()[index] = codes[ii-rowStart];
Expand All @@ -161,8 +167,8 @@ protected void applySparse(CacheBlock<?> in, MatrixBlock out, int outputCol, int
// Manually fill the column-indexes and values array
SparseBlockCSR csrblock = (SparseBlockCSR)out.getSparseBlock();
int rptr[] = csrblock.rowPointers();
csrblock.indexes()[rptr[ii]+index] = outputCol;
csrblock.values()[rptr[ii]+index] = codes[ii-rowStart];
csrblock.indexes()[rptr[ii]+indexWithOffset] = outputCol;
csrblock.values()[rptr[ii]+indexWithOffset] = codes[ii-rowStart];
}
}
}
Expand Down Expand Up @@ -336,6 +342,11 @@ public int getEstNumDistincts() {
return _estNumDistincts;
}

public void computeMapSizeEstimate(CacheBlock<?> in, int[] sampleIndices) {
throw new DMLRuntimeException(this + " does not need map size estimation");
}


@Override
public int compareTo(ColumnEncoder o) {
return Integer.compare(getEncoderType(this), getEncoderType(o));
Expand All @@ -355,9 +366,11 @@ public List<DependencyTask<?>> getBuildTasks(CacheBlock<?> in) {
tasks.add(getBuildTask(in));
}
else {
if(this instanceof ColumnEncoderBagOfWords)
((ColumnEncoderBagOfWords) this).initNnzPartials(in.getNumRows(), blockSizes.length);
HashMap<Integer, Object> ret = new HashMap<>();
for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++)
tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret));
tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret, i));
tasks.add(getPartialMergeBuildTask(ret));
dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null));
dep.add(tasks.subList(0, tasks.size() - 1));
Expand All @@ -370,7 +383,7 @@ public Callable<Object> getBuildTask(CacheBlock<?> in) {
}

public Callable<Object> getPartialBuildTask(CacheBlock<?> in, int startRow,
int blockSize, HashMap<Integer, Object> ret) {
int blockSize, HashMap<Integer, Object> ret, int p) {
throw new DMLRuntimeException(
"Trying to get the PartialBuild task of an Encoder which does not support partial building");
}
Expand All @@ -381,11 +394,12 @@ public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
}


public List<DependencyTask<?>> getApplyTasks(CacheBlock<?> in, MatrixBlock out, int outputCol) {
public List<DependencyTask<?>> getApplyTasks(CacheBlock<?> in, MatrixBlock out, int outputCol, int[] sparseRowPointerOffsets) {
List<Callable<Object>> tasks = new ArrayList<>();
List<List<? extends Callable<?>>> dep = null;
//for now single threaded apply for bag of words
int[] blockSizes = getBlockSizes(in.getNumRows(), _nApplyPartitions);

this.sparseRowPointerOffset = out.isInSparseFormat() ? sparseRowPointerOffsets : null;
for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){
if(out.isInSparseFormat())
tasks.add(getSparseTask(in, out, outputCol, startRow, blockSizes[i]));
Expand Down Expand Up @@ -419,7 +433,7 @@ public Set<Integer> getSparseRowsWZeros(){
return null;
}

protected void addSparseRowsWZeros(ArrayList<Integer> sparseRowsWZeros){
protected void addSparseRowsWZeros(List<Integer> sparseRowsWZeros){
synchronized (this){
if(_sparseRowsWZeros == null)
_sparseRowsWZeros = new ArrayList<>();
Expand Down
Loading

0 comments on commit 654cea9

Please sign in to comment.