Skip to content

Commit

Permalink
Optimize hash partition pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
xqrzd committed Jul 27, 2023
1 parent 53dc03f commit 1928552
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 29 deletions.
100 changes: 71 additions & 29 deletions src/Knet.Kudu.Client/Scanner/PartitionPruner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ private static BitArray PruneHashComponent(
{
var hashBuckets = new BitArray(hashSchema.NumBuckets);
List<int> columnIdxs = IdsToIndexes(schema, hashSchema.ColumnIds);
var predicateValueList = new List<List<byte[]>>();

foreach (int idx in columnIdxs)
{
Expand All @@ -526,49 +527,90 @@ private static BitArray PruneHashComponent(
hashBuckets.SetAll(true);
return hashBuckets;
}

var predicateValues = predicate.Type == PredicateType.Equality
? new List<byte[]> { predicate.Lower! }
: new List<byte[]>(predicate.InListValues!);

predicateValueList.Add(predicateValues);
}

var rows = new List<PartialRow> { new PartialRow(schema) };
var valuesCombination = new List<byte[]>();

foreach (int idx in columnIdxs)
ComputeHashBuckets(
schema, hashSchema, hashBuckets, columnIdxs,
predicateValueList, valuesCombination);

return hashBuckets;
}

/// <summary>
/// Pick all combinations and compute their hashes.
/// </summary>
/// <param name="schema">The table schema.</param>
/// <param name="hashSchema">The hash partition schema.</param>
/// <param name="hashBuckets">The result of this algorithm; a bit set to 0 means the corresponding partition can be pruned.</param>
/// <param name="columnIdxs">Column indexes of columns in the hash partition schema.</param>
/// <param name="predicateValueList">Values in in-list predicates of these columns.</param>
/// <param name="valuesCombination">A combination of in-list and equality predicates.</param>
private static void ComputeHashBuckets(
KuduSchema schema,
HashBucketSchema hashSchema,
BitArray hashBuckets,
List<int> columnIdxs,
List<List<byte[]>> predicateValueList,
List<byte[]> valuesCombination)
{
if (hashBuckets.Cardinality() == hashSchema.NumBuckets)
{
var newRows = new List<PartialRow>();
ColumnSchema column = schema.GetColumn(idx);
KuduPredicate predicate = predicates[column.Name];
SortedSet<byte[]> predicateValues;
return;
}

if (predicate.Type == PredicateType.Equality)
{
predicateValues = new SortedSet<byte[]> { predicate.Lower! };
}
else
{
predicateValues = predicate.InListValues!;
}
int level = valuesCombination.Count;

// For each of the encoded strings, replicate it by the number of values in
// the equality and in-list predicates.
foreach (PartialRow row in rows)
if (level == columnIdxs.Count)
{
// This 'valuesCombination' is a picked combination of values for computing a hash bucket.
//
// The algorithm is a DFS-like algorithm, which picks a value for every column in
// 'predicateValueList'; 'valuesCombination' holds the picked values.
//
// The valuesCombination is a value list picked by the following algorithm:
// 1. Pick a value from predicateValueList[0] for the column that columnIdxs[0]
//    stands for. Every value in predicateValueList[0] can be picked, so there
//    are predicateValueList[0].Count possible choices.
// 2. Pick a value from predicateValueList[1] for the column that columnIdxs[1]
//    stands for. There are predicateValueList[1].Count possible choices.
// 3. Continue as in steps 1 and 2 until a value is picked from
//    'predicateValueList[columnIdxs.Count-1]' for the column that
//    columnIdxs[columnIdxs.Count-1] stands for.
//
// The algorithm ends when all combinations have been searched.
// 'valuesCombination' saves one combination of values from the in-list predicates.
// So we use the 'valuesCombination' to construct a row, then compute its hash bucket.
var row = new PartialRow(schema);
for (int i = 0; i < valuesCombination.Count; i++)
{
foreach (byte[] predicateValue in predicateValues)
{
var newRow = new PartialRow(row);
newRow.SetRaw(idx, predicateValue);
newRows.Add(newRow);
}
row.SetRaw(columnIdxs[i], valuesCombination[i]);
}

rows = newRows;
}

foreach (PartialRow row in rows)
{
int maxSize = KeyEncoder.CalculateMaxPrimaryKeySize(row);
int hash = KeyEncoder.GetHashBucket(row, hashSchema, maxSize);
hashBuckets.Set(hash, true);
}
else
{
for (int i = 0; i < predicateValueList[level].Count; i++)
{
valuesCombination.Add(predicateValueList[level][i]);

return hashBuckets;
ComputeHashBuckets(
schema, hashSchema, hashBuckets, columnIdxs,
predicateValueList, valuesCombination);

valuesCombination.RemoveAt(valuesCombination.Count - 1);
}
}
}

private readonly struct PartitionKeyRangeBuilder
Expand Down
64 changes: 64 additions & 0 deletions test/Knet.Kudu.Client.FunctionalTests/PartitionPrunerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,70 @@ await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1 }));

// a in [0, 1, 2], b in [0, 1, 2], c in [0, 1, 2];
await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1, 2 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1, 2 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1, 2 }));

// a in [0, 1, 2, 3], b in [0, 1, 2, 3], c in [0, 1, 2, 3];
await CheckPartitionsAsync(8, 8, table, partitions,
KuduPredicate.NewInListPredicate(a, new byte[] { 0, 1, 2, 3 }),
KuduPredicate.NewInListPredicate(b, new byte[] { 0, 1, 2, 3 }),
KuduPredicate.NewInListPredicate(c, new byte[] { 0, 1, 2, 3 }));

var expectedList = new List<List<int>>
{
new List<int> { 1, 1 },
new List<int> { 8, 8 },
new List<int> { 8, 8 },
new List<int> { 8, 8 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 },
new List<int> { 27, 1 }
};

for (int size = 1; size <= 10; size++)
{
int columnCount = table.Schema.Columns.Count;
var testCases = new List<List<byte>>();

for (int i = 0; i < columnCount; i++)
{
var testCase = new List<byte>();
for (int j = 0; j < size; j++)
{
testCase.Add((byte)j);
}

testCases.Add(testCase);
}

var scanBuilder = _client.NewScanBuilder(table);

var columnSchemas = new List<ColumnSchema> { a, b, c };
var predicates = new KuduPredicate[3];

for (int i = 0; i < 3; i++)
{
predicates[i] = KuduPredicate.NewInListPredicate(
columnSchemas[i], testCases[i]);

scanBuilder.AddPredicate(predicates[i]);
}

await CheckPartitionsAsync(
expectedList[size - 1][0],
expectedList[size - 1][1],
table, partitions, predicates);

var partitionSchema = scanBuilder.Table.PartitionSchema;
Assert.Equal(columnCount, partitionSchema.HashBucketSchemas.Count);
}
}

[SkippableFact]
Expand Down

0 comments on commit 1928552

Please sign in to comment.