diff --git a/velox/common/hyperloglog/HllUtils.h b/velox/common/hyperloglog/HllUtils.h index 6ad82de77329..0c32bdb5204b 100644 --- a/velox/common/hyperloglog/HllUtils.h +++ b/velox/common/hyperloglog/HllUtils.h @@ -23,7 +23,8 @@ namespace facebook::velox::common::hll { constexpr double kLowestMaxStandardError = 0.0040625; constexpr double kHighestMaxStandardError = 0.26000; -constexpr double kDefaultStandardError = 0.023; +constexpr double kDefaultApproxDistinctStandardError = 0.023; +constexpr double kDefaultApproxSetStandardError = 0.01625; const int8_t kPrestoSparseV2 = 2; const int8_t kPrestoDenseV2 = 3; diff --git a/velox/functions/prestosql/HyperLogLogFunctions.h b/velox/functions/prestosql/HyperLogLogFunctions.h index 0865ad29ee7f..8dc85d1ef58f 100644 --- a/velox/functions/prestosql/HyperLogLogFunctions.h +++ b/velox/functions/prestosql/HyperLogLogFunctions.h @@ -48,7 +48,7 @@ struct EmptyApproxSetFunction { FOLLY_ALWAYS_INLINE bool call(out_type& result) { static const std::string kEmpty = - common::hll::SparseHll::serializeEmpty(11); + common::hll::SparseHll::serializeEmpty(12); result.resize(kEmpty.size()); memcpy(result.data(), kEmpty.data(), kEmpty.size()); diff --git a/velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp b/velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp index 56757f853d5c..55ed6fdc03ba 100644 --- a/velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp +++ b/velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp @@ -131,10 +131,12 @@ class ApproxDistinctAggregate : public exec::Aggregate { explicit ApproxDistinctAggregate( const TypePtr& resultType, bool hllAsFinalResult, - bool hllAsRawInput) + bool hllAsRawInput, + double defaultError) : exec::Aggregate(resultType), hllAsFinalResult_{hllAsFinalResult}, - hllAsRawInput_{hllAsRawInput} {} + hllAsRawInput_{hllAsRawInput}, + indexBitLength_{common::hll::toIndexBitLength(defaultError)} {} int32_t accumulatorFixedWidthSize() const override { return sizeof(HllAccumulator); @@ -405,8 +407,7 @@ class ApproxDistinctAggregate : public exec::Aggregate { /// serialized HLLs. const bool hllAsRawInput_; - int8_t indexBitLength_{ - common::hll::toIndexBitLength(common::hll::kDefaultStandardError)}; + int8_t indexBitLength_; double maxStandardError_{-1}; DecodedVector decodedValue_; DecodedVector decodedMaxStandardError_; @@ -417,10 +418,11 @@ template std::unique_ptr createApproxDistinct( const TypePtr& resultType, bool hllAsFinalResult, - bool hllAsRawInput) { + bool hllAsRawInput, + double defaultError) { using T = typename TypeTraits::NativeType; return std::make_unique>( - resultType, hllAsFinalResult, hllAsRawInput); + resultType, hllAsFinalResult, hllAsRawInput, defaultError); } exec::AggregateRegistrationResult registerApproxDistinct( @@ -428,7 +430,8 @@ exec::AggregateRegistrationResult registerApproxDistinct( bool hllAsFinalResult, bool hllAsRawInput, bool withCompanionFunctions, - bool overwrite) { + bool overwrite, + double defaultError) { auto returnType = hllAsFinalResult ? "hyperloglog" : "bigint"; std::vector> signatures; @@ -484,7 +487,7 @@ exec::AggregateRegistrationResult registerApproxDistinct( return exec::registerAggregateFunction( name, std::move(signatures), - [name, hllAsFinalResult, hllAsRawInput]( + [name, hllAsFinalResult, hllAsRawInput, defaultError]( core::AggregationNode::Step /*step*/, const std::vector& argTypes, const TypePtr& resultType, @@ -496,7 +499,8 @@ exec::AggregateRegistrationResult registerApproxDistinct( type->kind(), resultType, hllAsFinalResult, - hllAsRawInput); + hllAsRawInput, + defaultError); }, withCompanionFunctions, overwrite); @@ -516,11 +520,24 @@ void registerApproxDistinctAggregates( false, false, withCompanionFunctions, - overwrite); + overwrite, + common::hll::kDefaultApproxDistinctStandardError); // approx_set and merge are already companion functions themselves. Don't // register companion functions for them. - registerApproxDistinct(prefix + kApproxSet, true, false, false, overwrite); - registerApproxDistinct(prefix + kMerge, true, true, false, overwrite); + registerApproxDistinct( + prefix + kApproxSet, + true, + false, + false, + overwrite, + common::hll::kDefaultApproxSetStandardError); + registerApproxDistinct( + prefix + kMerge, + true, + true, + false, + overwrite, + common::hll::kDefaultApproxSetStandardError); } } // namespace facebook::velox::aggregate::prestosql diff --git a/velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp b/velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp index 9f76d10f2be1..5156a084ce42 100644 --- a/velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp +++ b/velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp @@ -32,8 +32,7 @@ class ApproxDistinctTest : public AggregationTestBase { void testGlobalAgg( const VectorPtr& values, double maxStandardError, - int64_t expectedResult, - bool testWithTableScan = true) { + int64_t expectedResult) { auto vectors = makeRowVector({values}); auto expected = makeRowVector({makeNullableFlatVector({expectedResult})}); @@ -63,7 +62,7 @@ class ApproxDistinctTest : public AggregationTestBase { void testGlobalAgg( const VectorPtr& values, int64_t expectedResult, - bool testWithTableScan = true) { + bool testApproxSet = true) { auto vectors = makeRowVector({values}); auto expected = makeRowVector({makeNullableFlatVector({expectedResult})}); @@ -78,8 +77,10 @@ class ApproxDistinctTest : public AggregationTestBase { {}, {expected}); - testAggregations( - {vectors}, {}, {"approx_set(c0)"}, {"cardinality(a0)"}, {expected}); + if (testApproxSet) { + testAggregations( + {vectors}, {}, {"approx_set(c0)"}, {"cardinality(a0)"}, {expected}); + } } template @@ -100,7 +101,8 @@ class ApproxDistinctTest : public AggregationTestBase { void testGroupByAgg( const VectorPtr& keys, const VectorPtr& values, - const std::unordered_map& expectedResults) { + const std::unordered_map& expectedResults, + bool testApproxSet = true) { auto vectors = makeRowVector({keys, values}); auto expected = toRowVector(expectedResults); @@ -114,12 +116,14 @@ class ApproxDistinctTest : public AggregationTestBase { {}, {expected}); - testAggregations( - {vectors}, - {"c0"}, - {"approx_set(c1)"}, - {"c0", "cardinality(a0)"}, - {expected}); + if (testApproxSet) { + testAggregations( + {vectors}, + {"c0"}, + {"approx_set(c1)"}, + {"c0", "cardinality(a0)"}, + {expected}); + } } }; @@ -170,7 +174,13 @@ TEST_F(ApproxDistinctTest, groupByHighCardinalityIntegers) { auto keys = makeFlatVector(size, [](auto row) { return row % 2; }); auto values = makeFlatVector(size, [](auto row) { return row; }); - testGroupByAgg(keys, values, {{0, 488}, {1, 493}}); + testGroupByAgg(keys, values, {{0, 488}, {1, 493}}, false); + testAggregations( + {makeRowVector({keys, values})}, + {"c0"}, + {"approx_set(c1)"}, + {"c0", "cardinality(a0)"}, + {toRowVector({{0, 500}, {1, 500}})}); } TEST_F(ApproxDistinctTest, groupByVeryLowCardinalityIntegers) { @@ -230,7 +240,13 @@ TEST_F(ApproxDistinctTest, globalAggHighCardinalityIntegers) { vector_size_t size = 1'000; auto values = makeFlatVector(size, [](auto row) { return row; }); - testGlobalAgg(values, 977); + testGlobalAgg(values, 977, false); + testAggregations( + {makeRowVector({values})}, + {}, + {"approx_set(c0)"}, + {"cardinality(a0)"}, + {makeRowVector({makeFlatVector(std::vector({997}))})}); } TEST_F(ApproxDistinctTest, globalAggVeryLowCardinalityIntegers) { @@ -244,7 +260,13 @@ TEST_F(ApproxDistinctTest, toIndexBitLength) { ASSERT_EQ( common::hll::toIndexBitLength(common::hll::kHighestMaxStandardError), 4); ASSERT_EQ( - common::hll::toIndexBitLength(common::hll::kDefaultStandardError), 11); + common::hll::toIndexBitLength( + common::hll::kDefaultApproxDistinctStandardError), + 11); + ASSERT_EQ( + common::hll::toIndexBitLength( + common::hll::kDefaultApproxSetStandardError), + 12); ASSERT_EQ( common::hll::toIndexBitLength(common::hll::kLowestMaxStandardError), 16); @@ -317,14 +339,16 @@ TEST_F(ApproxDistinctTest, globalAggAllNulls) { TEST_F(ApproxDistinctTest, hugeInt) { auto hugeIntValues = makeFlatVector(50000, [](auto row) { return row; }); - // Last param is set false to disable tablescan test - // as DWRF writer doesn't have hugeint support. - // Refer:https://github.com/facebookincubator/velox/issues/7775 testGlobalAgg(hugeIntValues, 49669, false); - testGlobalAgg( - hugeIntValues, common::hll::kLowestMaxStandardError, 50110, false); - testGlobalAgg( - hugeIntValues, common::hll::kHighestMaxStandardError, 41741, false); + testAggregations( + {makeRowVector({hugeIntValues})}, + {}, + {"approx_set(c0)"}, + {"cardinality(a0)"}, + {makeRowVector( + {makeFlatVector(std::vector({49958}))})}); + testGlobalAgg(hugeIntValues, common::hll::kLowestMaxStandardError, 50110); + testGlobalAgg(hugeIntValues, common::hll::kHighestMaxStandardError, 41741); } TEST_F(ApproxDistinctTest, streaming) { @@ -366,5 +390,22 @@ TEST_F(ApproxDistinctTest, memoryLeakInMerge) { toPlanStats(task->taskStats()).at(finalAgg).peakMemoryBytes, 180'000); } +TEST_F(ApproxDistinctTest, mergeWithEmpty) { + constexpr int kSize = 500; + auto input = makeRowVector({ + makeFlatVector(kSize, [](auto i) { return std::min(i, 1); }), + makeFlatVector( + kSize, folly::identity, [](auto i) { return i == 0; }), + }); + auto op = PlanBuilder() + .values({input}) + .singleAggregation({"c0"}, {"approx_set(c1)"}) + .project({"coalesce(a0, empty_approx_set())"}) + .singleAggregation({}, {"merge(p0)"}) + .project({"cardinality(a0)"}) + .planNode(); + ASSERT_EQ(readSingleValue(op).value(), 499); +} + } // namespace } // namespace facebook::velox::aggregate::test