Skip to content

Commit

Permalink
[SYSTEMDS-3444][SYSTEMDS-2699] Compressed I/O
Browse files Browse the repository at this point in the history
This commit is a major overhaul of the writing and reading of compressed
matrices.

The design now writes dictionaries separately, and reading works in both
local and Spark execution, where a Spark read combines the dictionaries
written in a distributed execution.

This PR also contains updates and refinements of the schema apply, which
now, by fusing update and apply, can compress a matrix at around
669MiB/s single-threaded and around 2GiB/s multi-threaded.
This is done by first fully materializing the compressed format in
memory, meaning that there is potential for further speedup if we
relocate this compression onto the IO path, but this is left for future
work.

One major improvement that also makes our default compression faster is
that ACountHashMap.java now generalizes the counting hashmap across
co-coded columns and single columns, and optimizes the increment calls
for improved performance.

The Co-Coding algorithm has also been slightly modified in this PR
to add a small fraction to the cost of column groups depending on their
column indexes. This makes column groups with the same cost sort by
their average column indexes, which in turn improves the compression
time of highly compressible data such as binary or ultra-sparse data.

The PR also changes compression so that NaN is no longer treated
specially. This allows us to compress matrices containing NaN and
afterward replace NaN in the already compressed representation.
Previously, the behavior was to replace all NaN values with 0.

Future work is to parallelize the reading of compressed matrices, which
currently is only single-threaded in the CP case.

In the serialization performance benchmark, this commit moves the size
calculation outside of the timed part and improves the general code
evaluation of individual functions.

Closes apache#1880
  • Loading branch information
Baunsgaard committed Aug 21, 2023
1 parent 884ad3a commit a54f513
Show file tree
Hide file tree
Showing 136 changed files with 5,308 additions and 3,190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.sysds.runtime.compress.lib.CLALibDecompress;
import org.apache.sysds.runtime.compress.lib.CLALibMMChain;
import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
import org.apache.sysds.runtime.compress.lib.CLALibMerge;
import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
Expand Down Expand Up @@ -98,7 +99,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
/**
* Debugging flag for Compressed Matrices
*/
public static boolean debug = true;
public static boolean debug = false;

/**
* Column groups
Expand Down Expand Up @@ -510,7 +511,8 @@ public void append(MatrixValue v2, ArrayList<IndexedMatrixValue> outlist, int bl
}

@Override
public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) {
public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype,
int k) {

checkMMChain(ctype, v, w);
// multi-threaded MMChain of single uncompressed ColGroup
Expand Down Expand Up @@ -1099,7 +1101,8 @@ public void appendRow(int r, SparseRow row, boolean deep) {
}

@Override
public void appendRowToSparse(SparseBlock dest, MatrixBlock src, int i, int rowoffset, int coloffset, boolean deep) {
public void appendRowToSparse(SparseBlock dest, MatrixBlock src, int i, int rowoffset, int coloffset,
boolean deep) {
throw new DMLCompressionException("Can't append row to compressed Matrix");
}

Expand Down Expand Up @@ -1159,8 +1162,8 @@ public void sparseToDense() {
}

@Override
public void merge(MatrixBlock that, boolean appendOnly, boolean par, boolean deep) {
throw new NotImplementedException();
public MatrixBlock merge(MatrixBlock that, boolean appendOnly, boolean par, boolean deep) {
return CLALibMerge.merge(this, that, appendOnly, par, deep);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
final int startSize = colInfos.getInfo().size();
if(startSize == 1)
return colInfos; // nothing to join when there only is one column
else if(startSize <= 5) {// Greedy all compare all if small number of columns
else if(startSize <= 16) {// Greedy all compare all if small number of columns
LOG.debug("Hybrid chose to do greedy cocode because of few columns");
CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
return colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,17 @@ public long getExactSizeOnDisk() {
* dictionary and _columnIndexes correctly aligned with the expected sliced compressed matrix.
*/
public final AColGroup sliceColumns(int cl, int cu) {
AColGroup ret = (cu - cl == 1) ? sliceColumn(cl) : sliceMultiColumns(cl, cu);
return ret;
if(cl <= _colIndexes.get(0) && cu > _colIndexes.get(_colIndexes.size() - 1)) {
if(cl == 0)
return this;
else
return this.shiftColIndices(-cl);
}
else if(cu - cl == 1)
return sliceColumn(cl);
else
return sliceMultiColumns(cl, cu);

}

/**
Expand Down Expand Up @@ -595,8 +604,8 @@ public AColGroup addVector(double[] v) {
public abstract boolean isEmpty();

/**
* Append the other column group to this column group. This method tries to combine them to return a new column group
* containing both. In some cases it is possible in reasonable time, in others it is not.
* Append the other column group to this column group. This method tries to combine them to return a new column
* group containing both. In some cases it is possible in reasonable time, in others it is not.
*
* The result is first this column group followed by the other column group in higher row values.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.DMLScriptException;
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.data.SparseBlock;
Expand Down Expand Up @@ -205,7 +205,7 @@ public final void tsmm(MatrixBlock ret, int nRows) {

protected abstract void tsmm(double[] result, int numColumns, int nRows);

protected static void tsmm(double[] result, int numColumns, int[] counts, ADictionary dict, IColIndex colIndexes) {
protected static void tsmm(double[] result, int numColumns, int[] counts, IDictionary dict, IColIndex colIndexes) {
dict = dict.getMBDict(colIndexes.size());
final MatrixBlock mb = ((MatrixBlockDictionary) dict).getMatrixBlock();
if(mb.isInSparseFormat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.io.DataOutput;
import java.io.IOException;

import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.utils.MemoryEstimates;

Expand All @@ -46,7 +46,7 @@ public abstract class AColGroupOffset extends APreAgg {
/** If the column group contains unassigned rows. */
protected final boolean _zeros;

protected AColGroupOffset(IColIndex colIndices, int numRows, boolean zeros, ADictionary dict, int[] ptr, char[] data, int[] cachedCounts) {
protected AColGroupOffset(IColIndex colIndices, int numRows, boolean zeros, IDictionary dict, int[] ptr, char[] data, int[] cachedCounts) {
super(colIndices, dict, cachedCounts);
_numRows = numRows;
_zeros = zeros;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.lang.ref.SoftReference;

import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.functionobjects.Builtin;
Expand All @@ -36,14 +36,14 @@ public abstract class AColGroupValue extends ADictBasedColGroup {
private SoftReference<int[]> counts = null;

/**
* A abstract class for column groups that contain ADictionary for values.
* A abstract class for column groups that contain IDictionary for values.
*
* @param colIndices The Column indexes
* @param dict The dictionary to contain the distinct tuples
* @param cachedCounts The cached counts of the distinct tuples (can be null since it should be possible to
* reconstruct the counts on demand)
*/
protected AColGroupValue(IColIndex colIndices, ADictionary dict, int[] cachedCounts) {
protected AColGroupValue(IColIndex colIndices, IDictionary dict, int[] cachedCounts) {
super(colIndices, dict);
if(cachedCounts != null)
counts = new SoftReference<>(cachedCounts);
Expand All @@ -55,9 +55,9 @@ public int getNumValues() {
}

/**
* Returns the counts of values inside the dictionary. If already calculated it will return the previous counts. This
* produce an overhead in cases where the count is calculated, but the overhead will be limited to number of distinct
* tuples in the dictionary.
* Returns the counts of values inside the dictionary. If already calculated it will return the previous counts.
* This produce an overhead in cases where the count is calculated, but the overhead will be limited to number of
* distinct tuples in the dictionary.
*
* The returned counts always contains the number of zero tuples as well if there are some contained, even if they
* are not materialized.
Expand Down Expand Up @@ -143,7 +143,7 @@ protected AColGroup sliceSingleColumn(int idx) {
if(_colIndexes.size() == 1)
return copyAndSet(retIndexes, _dict);

final ADictionary retDict = _dict.sliceOutColumnRange(idx, idx + 1, _colIndexes.size());
final IDictionary retDict = _dict.sliceOutColumnRange(idx, idx + 1, _colIndexes.size());
if(retDict == null)
return new ColGroupEmpty(retIndexes);
else
Expand All @@ -153,7 +153,7 @@ protected AColGroup sliceSingleColumn(int idx) {

@Override
protected AColGroup sliceMultiColumns(int idStart, int idEnd, IColIndex outputCols) {
final ADictionary retDict = _dict.sliceOutColumnRange(idStart, idEnd, _colIndexes.size());
final IDictionary retDict = _dict.sliceOutColumnRange(idStart, idEnd, _colIndexes.size());
if(retDict == null)
return new ColGroupEmpty(outputCols);

Expand Down Expand Up @@ -184,7 +184,7 @@ public long estimateInMemorySize() {

@Override
public AColGroup replace(double pattern, double replace) {
ADictionary replaced = _dict.replace(pattern, replace, _colIndexes.size());
IDictionary replaced = _dict.replace(pattern, replace, _colIndexes.size());
return copyAndSet(replaced);
}

Expand All @@ -196,7 +196,7 @@ public CM_COV_Object centralMoment(CMOperator op, int nRows) {
@Override
public AColGroup rexpandCols(int max, boolean ignore, boolean cast, int nRows) {
try {
ADictionary d = _dict.rexpandCols(max, ignore, cast, _colIndexes.size());
IDictionary d = _dict.rexpandCols(max, ignore, cast, _colIndexes.size());
if(d == null)
return ColGroupEmpty.create(max);
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.HashSet;
import java.util.Set;

import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IdentityDictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
Expand All @@ -37,23 +37,23 @@
public abstract class ADictBasedColGroup extends AColGroupCompressed implements IContainADictionary {
private static final long serialVersionUID = -3737025296618703668L;
/** Distinct value tuples associated with individual bitmaps. */
protected final ADictionary _dict;
protected final IDictionary _dict;

/**
* A Abstract class for column groups that contain ADictionary for values.
* A Abstract class for column groups that contain IDictionary for values.
*
* @param colIndices The Column indexes
* @param dict The dictionary to contain the distinct tuples
*/
protected ADictBasedColGroup(IColIndex colIndices, ADictionary dict) {
protected ADictBasedColGroup(IColIndex colIndices, IDictionary dict) {
super(colIndices);
_dict = dict;
if(dict == null)
throw new NullPointerException("null dict is invalid");

}

public ADictionary getDictionary() {
public IDictionary getDictionary() {
return _dict;
}

Expand Down Expand Up @@ -197,14 +197,14 @@ public final AColGroup rightMultByMatrix(MatrixBlock right, IColIndex allCols) {
return null;

final int nVals = getNumValues();
final ADictionary preAgg = (right.isInSparseFormat()) ? // Chose Sparse or Dense
final IDictionary preAgg = (right.isInSparseFormat()) ? // Chose Sparse or Dense
rightMMPreAggSparse(nVals, right.getSparseBlock(), agCols, 0, nCol) : // sparse
_dict.preaggValuesFromDense(nVals, _colIndexes, agCols, right.getDenseBlockValues(), nCol); // dense
return allocateRightMultiplication(right, agCols, preAgg);
}

protected abstract AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes,
ADictionary preAgg);
IDictionary preAgg);

/**
* Find the minimum number of columns that are effected by the right multiplication
Expand Down Expand Up @@ -269,7 +269,7 @@ protected IColIndex rightMMGetColsSparse(SparseBlock b, int retCols, IColIndex a
return ColIndexFactory.create(aggregateColumns);
}

private ADictionary rightMMPreAggSparse(int numVals, SparseBlock b, IColIndex aggregateColumns, int cl, int cu) {
private IDictionary rightMMPreAggSparse(int numVals, SparseBlock b, IColIndex aggregateColumns, int cl, int cu) {
final double[] ret = new double[numVals * aggregateColumns.size()];
for(int h = 0; h < _colIndexes.size(); h++) {
final int colIdx = _colIndexes.get(h);
Expand Down Expand Up @@ -300,10 +300,19 @@ public final AColGroup copyAndSet(IColIndex colIndexes) {
return copyAndSet(colIndexes, _dict);
}

protected final AColGroup copyAndSet(ADictionary newDictionary) {
/**
* This method copies the column group and sets the dictionary to another dictionary. The method shallow copies
* underlying data structures.
*
* NOTE: Be very careful with this since it invalidates the contracts maintained via standard compression.
*
* @param newDictionary A new dictionary to point to.
* @return A new shallow copy of the column group
*/
public final AColGroup copyAndSet(IDictionary newDictionary) {
return copyAndSet(_colIndexes, newDictionary);
}

protected abstract AColGroup copyAndSet(IColIndex colIndexes, ADictionary newDictionary);
protected abstract AColGroup copyAndSet(IColIndex colIndexes, IDictionary newDictionary);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.sysds.runtime.compress.colgroup;

import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.lib.CLALibLeftMultBy;
import org.apache.sysds.runtime.compress.lib.CLALibTSMM;
Expand All @@ -37,14 +37,14 @@ public abstract class AMorphingMMColGroup extends AColGroupValue {
private static final long serialVersionUID = -4265713396790607199L;

/**
* A Abstract class for column groups that contain ADictionary for values.
* A Abstract class for column groups that contain IDictionary for values.
*
* @param colIndices The Column indexes
* @param dict The dictionary to contain the distinct tuples
* @param cachedCounts The cached counts of the distinct tuples (can be null since it should be possible to
* reconstruct the counts on demand)
*/
protected AMorphingMMColGroup(IColIndex colIndices, ADictionary dict, int[] cachedCounts) {
protected AMorphingMMColGroup(IColIndex colIndices, IDictionary dict, int[] cachedCounts) {
super(colIndices, dict, cachedCounts);
}

Expand Down Expand Up @@ -161,7 +161,7 @@ protected IColIndex rightMMGetColsSparse(SparseBlock b, int nCols, IColIndex all
}

@Override
protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, ADictionary preAgg) {
protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, IDictionary preAgg) {
LOG.warn("right mm should not be called directly on a morphing column group");
final double[] common = getCommon();
final int rc = right.getNumColumns();
Expand Down Expand Up @@ -195,7 +195,7 @@ protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex col
}

protected abstract AColGroup allocateRightMultiplicationCommon(double[] common, IColIndex colIndexes,
ADictionary preAgg);
IDictionary preAgg);

/**
* extract common value from group and return non morphing group
Expand Down
Loading

0 comments on commit a54f513

Please sign in to comment.