Reduce space complexity of hash partition pruning #260

Merged (1 commit) on Jul 27, 2023
100 changes: 71 additions & 29 deletions src/Knet.Kudu.Client/Scanner/PartitionPruner.cs
@@ -514,6 +514,7 @@ private static BitArray PruneHashComponent(
{
var hashBuckets = new BitArray(hashSchema.NumBuckets);
List<int> columnIdxs = IdsToIndexes(schema, hashSchema.ColumnIds);
var predicateValueList = new List<List<byte[]>>();

foreach (int idx in columnIdxs)
{
@@ -526,49 +527,90 @@ private static BitArray PruneHashComponent(
hashBuckets.SetAll(true);
return hashBuckets;
}

var predicateValues = predicate.Type == PredicateType.Equality
? new List<byte[]> { predicate.Lower! }
: new List<byte[]>(predicate.InListValues!);

predicateValueList.Add(predicateValues);
}

var rows = new List<PartialRow> { new PartialRow(schema) };
var valuesCombination = new List<byte[]>();

foreach (int idx in columnIdxs)
ComputeHashBuckets(
schema, hashSchema, hashBuckets, columnIdxs,
predicateValueList, valuesCombination);

return hashBuckets;
}

/// <summary>
/// Enumerates all combinations of predicate values and computes their hash buckets.
/// </summary>
/// <param name="schema">The table schema.</param>
/// <param name="hashSchema">The hash partition schema.</param>
/// <param name="hashBuckets">The result of the algorithm; a 0 bit means the corresponding partition can be pruned.</param>
/// <param name="columnIdxs">Indexes of the columns in the hash partition schema.</param>
/// <param name="predicateValueList">The equality/in-list predicate values for each of these columns.</param>
/// <param name="valuesCombination">The values picked so far from those predicates.</param>
private static void ComputeHashBuckets(
KuduSchema schema,
HashBucketSchema hashSchema,
BitArray hashBuckets,
List<int> columnIdxs,
List<List<byte[]>> predicateValueList,
List<byte[]> valuesCombination)
{
if (hashBuckets.Cardinality() == hashSchema.NumBuckets)
{
var newRows = new List<PartialRow>();
ColumnSchema column = schema.GetColumn(idx);
KuduPredicate predicate = predicates[column.Name];
SortedSet<byte[]> predicateValues;
return;
}

if (predicate.Type == PredicateType.Equality)
{
predicateValues = new SortedSet<byte[]> { predicate.Lower! };
}
else
{
predicateValues = predicate.InListValues!;
}
int level = valuesCombination.Count;

// Replicate each row once for every value in the
// equality or in-list predicate.
foreach (PartialRow row in rows)
if (level == columnIdxs.Count)
{
// 'valuesCombination' now holds one fully picked combination of values, used to
// compute a hash bucket.
//
// The algorithm is a DFS-like search that picks one value per column from
// 'predicateValueList'; 'valuesCombination' records the values picked so far:
// 1. Pick a value from predicateValueList[0] for the column that columnIdxs[0]
//    refers to. Every value in predicateValueList[0] can be picked, so there are
//    predicateValueList[0].Count choices at this level.
// 2. Pick a value from predicateValueList[1] for the column that columnIdxs[1]
//    refers to, giving predicateValueList[1].Count choices.
// 3. Continue in the same way until a value has been picked from
//    predicateValueList[columnIdxs.Count - 1] for the column that
//    columnIdxs[columnIdxs.Count - 1] refers to.
//
// The search ends once every combination has been visited. At this point
// 'valuesCombination' holds one complete combination of equality/in-list values,
// so we use it to construct a row and then compute that row's hash bucket.
var row = new PartialRow(schema);
for (int i = 0; i < valuesCombination.Count; i++)
{
foreach (byte[] predicateValue in predicateValues)
{
var newRow = new PartialRow(row);
newRow.SetRaw(idx, predicateValue);
newRows.Add(newRow);
}
row.SetRaw(columnIdxs[i], valuesCombination[i]);
}

rows = newRows;
}

foreach (PartialRow row in rows)
{
int maxSize = KeyEncoder.CalculateMaxPrimaryKeySize(row);
int hash = KeyEncoder.GetHashBucket(row, hashSchema, maxSize);
hashBuckets.Set(hash, true);
}
else
{
for (int i = 0; i < predicateValueList[level].Count; i++)
{
valuesCombination.Add(predicateValueList[level][i]);

return hashBuckets;
ComputeHashBuckets(
schema, hashSchema, hashBuckets, columnIdxs,
predicateValueList, valuesCombination);

valuesCombination.RemoveAt(valuesCombination.Count - 1);
}
}
}

private readonly struct PartitionKeyRangeBuilder
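The DFS enumeration above can be hard to follow inside the diff, so here is a minimal standalone sketch of the same backtracking idea, with the Kudu-specific types (PartialRow, KeyEncoder) replaced by plain strings. EnumerateCombinations and onCombination are illustrative names, not part of the PR.

using System;
using System.Collections.Generic;

static void EnumerateCombinations(
    List<List<string>> predicateValueList,
    List<string> valuesCombination,
    Action<IReadOnlyList<string>> onCombination)
{
    int level = valuesCombination.Count;

    if (level == predicateValueList.Count)
    {
        // One value has been picked for every column; hand the combination off.
        // In ComputeHashBuckets this is where the PartialRow is built and hashed.
        onCombination(valuesCombination);
        return;
    }

    foreach (string value in predicateValueList[level])
    {
        valuesCombination.Add(value);
        EnumerateCombinations(predicateValueList, valuesCombination, onCombination);
        valuesCombination.RemoveAt(valuesCombination.Count - 1);
    }
}

// Example: two columns with values {a, b} and {0, 1} yield four combinations,
// but only the current combination is held in memory at any time.
var values = new List<List<string>>
{
    new List<string> { "a", "b" },
    new List<string> { "0", "1" },
};
EnumerateCombinations(values, new List<string>(), c => Console.WriteLine(string.Join(",", c)));

Only the current combination (at most one picked value per column) and a single row are alive at once, instead of one PartialRow per combination as the replaced loop materialized; that is where the space reduction in the PR title comes from.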
64 changes: 64 additions & 0 deletions test/Knet.Kudu.Client.FunctionalTests/PartitionPrunerTests.cs
@@ -524,6 +524,70 @@ await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1 }));

// a in [0, 1, 2], b in [0, 1, 2], c in [0, 1, 2];
await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1, 2 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1, 2 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1, 2 }));

// a in [0, 1, 2, 3], b in [0, 1, 2, 3], c in [0, 1, 2, 3];
await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1, 2, 3 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1, 2, 3 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1, 2, 3 }));

var expectedList = new List<List<int>>
{
new List<int> { 1, 1 },
new List<int> { 8, 8 },
new List<int> { 8, 8 },
new List<int> { 8, 8 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 }
};

for (int size = 1; size <= 10; size++)
{
int columnCount = table.Schema.Columns.Count;
var testCases = new List<List<byte>>();

for (int i = 0; i < columnCount; i++)
{
var testCase = new List<byte>();
for (int j = 0; j < size; j++)
{
testCase.Add((byte)j);
}

testCases.Add(testCase);
}

var scanBuilder = _client.NewScanBuilder(table);

var columnSchemas = new List<ColumnSchema> { a, b, c };
var predicates = new KuduPredicate[3];

for (int i = 0; i < 3; i++)
{
predicates[i] = KuduPredicate.NewInListPredicate(
columnSchemas[i], testCases[i]);

scanBuilder.AddPredicate(predicates[i]);
}

await CheckPartitionsAsync(
expectedList[size - 1][0],
expectedList[size - 1][1],
table, partitions, predicates);

var partitionSchema = scanBuilder.Table.PartitionSchema;
Assert.Equal(columnCount, partitionSchema.HashBucketSchemas.Count);
}
}
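A rough note on scale (assumed arithmetic, not asserted by the test): with three hash columns each carrying an in-list of size N, the pruner visits N^3 value combinations, so the size = 10 case above walks 1,000 combinations. The previous implementation kept one PartialRow per combination in memory before hashing; the recursive version holds at most one row and one picked value per column at a time.

using System;

// Rough illustration (assumed numbers): combination counts for three in-list
// predicates of equal size, matching the size = 1..10 loop in the test above.
int columns = 3;
for (int size = 1; size <= 10; size++)
{
    long combinations = 1;
    for (int i = 0; i < columns; i++)
        combinations *= size;   // N^3 combinations; previously N^3 rows were kept at once

    Console.WriteLine($"size={size}: {combinations} combinations, at most {columns} picked values on the stack");
}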

[SkippableFact]