Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-33484: [C++][Compute] Implement Grouper::Reset #41352

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 135 additions & 36 deletions cpp/src/arrow/acero/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,11 @@ void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batc
ASSERT_EQ(expected_segment, segment);
offset = segment.offset + segment.length;
}
// Assert next is the last (empty) segment.
ASSERT_OK_AND_ASSIGN(auto segment, segmenter->GetNextSegment(batch, offset));
ASSERT_TRUE(segment.offset >= batch.length);
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TRUE(segment.is_open);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should something be checked about segment.length here? Or is it undetermined?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last segment.length is supposed to be 0. Updated.

ASSERT_TRUE(segment.extends);
}

Result<std::unique_ptr<Grouper>> MakeGrouper(const std::vector<TypeHolder>& key_types) {
Expand Down Expand Up @@ -682,48 +687,142 @@ TEST(RowSegmenter, Basics) {
}

TEST(RowSegmenter, NonOrdered) {
  // Non-ordered keys: the segmenter emits one segment per run of consecutive
  // equal rows, finishing with an empty open segment at end-of-batch.
  // Expected segments appear to be {offset, length, is_open, extends} —
  // confirm against the Segment field order in grouper.h.
  {
    // Single key column.
    std::vector<TypeHolder> types = {int32()};
    auto batch = ExecBatchFromJSON(types, "[[1], [1], [2], [1], [2]]");
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batch),
                 {{0, 2, false, true},
                  {2, 1, false, false},
                  {3, 1, false, false},
                  {4, 1, true, false},
                  {5, 0, true, true}});
  }
  {
    // Two key columns: same segmentation as the single-key case since rows
    // are equal/unequal in the same positions.
    std::vector<TypeHolder> types = {int32(), int32()};
    auto batch = ExecBatchFromJSON(types, "[[1, 1], [1, 1], [2, 2], [1, 2], [2, 2]]");
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batch),
                 {{0, 2, false, true},
                  {2, 1, false, false},
                  {3, 1, false, false},
                  {4, 1, true, false},
                  {5, 0, true, true}});
  }
}

TEST(RowSegmenter, EmptyBatches) {
  // Empty batches must yield no segments, and must not disturb segmentation
  // of the non-empty batches interleaved between them.
  {
    // Single key column.
    std::vector<TypeHolder> types = {int32()};
    std::vector<ExecBatch> batches = {
        ExecBatchFromJSON(types, "[]"), ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[2], [2]]"), ExecBatchFromJSON(types, "[]"),
    };
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batches[0]), {});
    TestSegments(segmenter, ExecSpan(batches[1]), {});
    TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}});
    TestSegments(segmenter, ExecSpan(batches[3]), {});
    TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}});
    TestSegments(segmenter, ExecSpan(batches[5]), {});
    TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}});
    TestSegments(segmenter, ExecSpan(batches[7]), {});
  }
  {
    // Two key columns: expectations mirror the single-key case.
    std::vector<TypeHolder> types = {int32(), int32()};
    std::vector<ExecBatch> batches = {
        ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[1, 1]]"),
        ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[1, 1]]"),
        ExecBatchFromJSON(types, "[]"),
        ExecBatchFromJSON(types, "[[2, 2], [2, 2]]"),
        ExecBatchFromJSON(types, "[]"),
    };
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batches[0]), {});
    TestSegments(segmenter, ExecSpan(batches[1]), {});
    TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}});
    TestSegments(segmenter, ExecSpan(batches[3]), {});
    TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}});
    TestSegments(segmenter, ExecSpan(batches[5]), {});
    TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}});
    TestSegments(segmenter, ExecSpan(batches[7]), {});
  }
}

TEST(RowSegmenter, MultipleSegments) {
  // Several distinct key runs within one batch; the same key value (5)
  // reappearing later must start a NEW segment, not extend the earlier one.
  {
    // Single key column.
    std::vector<TypeHolder> types = {int32()};
    auto batch =
        ExecBatchFromJSON(types, "[[1], [1], [2], [5], [3], [3], [5], [5], [4]]");
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batch),
                 {{0, 2, false, true},
                  {2, 1, false, false},
                  {3, 1, false, false},
                  {4, 2, false, false},
                  {6, 2, false, false},
                  {8, 1, true, false},
                  {9, 0, true, true}});
  }
  {
    // Two key columns: expectations mirror the single-key case.
    std::vector<TypeHolder> types = {int32(), int32()};
    auto batch = ExecBatchFromJSON(
        types,
        "[[1, 1], [1, 1], [2, 2], [5, 5], [3, 3], [3, 3], [5, 5], [5, 5], [4, 4]]");
    ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
    TestSegments(segmenter, ExecSpan(batch),
                 {{0, 2, false, true},
                  {2, 1, false, false},
                  {3, 1, false, false},
                  {4, 2, false, false},
                  {6, 2, false, false},
                  {8, 1, true, false},
                  {9, 0, true, true}});
  }
}

// Segments spanning batch boundaries: a key run continuing across batches is
// reported with extends=true on the first segment of the next batch.
// Expected segments appear to be {offset, length, is_open, extends} — confirm
// against the Segment field order in grouper.h.
TEST(RowSegmenter, MultipleSegmentsMultipleBatches) {
{
// Single key column.
std::vector<TypeHolder> types = {int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[[1], [2]]"),
ExecBatchFromJSON(types, "[[5], [3]]"),
ExecBatchFromJSON(types, "[[3], [5], [5]]"), ExecBatchFromJSON(types, "[[4]]")};

ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}});
// [1] continues the previous batch's run of 1s (extends=true); [2] starts anew.
TestSegments(segmenter, ExecSpan(batches[1]),
{{0, 1, false, true}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[2]),
{{0, 1, false, false}, {1, 1, true, false}});
// [3] continues the run of 3s from the previous batch.
TestSegments(segmenter, ExecSpan(batches[3]),
{{0, 1, false, true}, {1, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, false}});
}
{
// Two key columns: expectations mirror the single-key case.
std::vector<TypeHolder> types = {int32(), int32()};
std::vector<ExecBatch> batches = {
ExecBatchFromJSON(types, "[[1, 1]]"),
ExecBatchFromJSON(types, "[[1, 1], [2, 2]]"),
ExecBatchFromJSON(types, "[[5, 5], [3, 3]]"),
ExecBatchFromJSON(types, "[[3, 3], [5, 5], [5, 5]]"),
ExecBatchFromJSON(types, "[[4, 4]]")};

ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types));
TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}});
TestSegments(segmenter, ExecSpan(batches[1]),
{{0, 1, false, true}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[2]),
{{0, 1, false, false}, {1, 1, true, false}});
TestSegments(segmenter, ExecSpan(batches[3]),
{{0, 1, false, true}, {1, 2, true, false}});
TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, false}});
}
}

namespace {
Expand Down
43 changes: 28 additions & 15 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,18 @@ struct SimpleKeySegmenter : public BaseRowSegmenter {
struct AnyKeysSegmenter : public BaseRowSegmenter {
static Result<std::unique_ptr<RowSegmenter>> Make(
const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
ARROW_RETURN_NOT_OK(Grouper::Make(key_types, ctx)); // check types
return std::make_unique<AnyKeysSegmenter>(key_types, ctx);
ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_types, ctx)); // check types
return std::make_unique<AnyKeysSegmenter>(key_types, ctx, std::move(grouper));
}

AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx)
AnyKeysSegmenter(const std::vector<TypeHolder>& key_types, ExecContext* ctx,
std::unique_ptr<Grouper> grouper)
: BaseRowSegmenter(key_types),
ctx_(ctx),
grouper_(nullptr),
grouper_(std::move(grouper)),
save_group_id_(kNoGroupId) {}

Status Reset() override {
grouper_ = nullptr;
ARROW_RETURN_NOT_OK(grouper_->Reset());
save_group_id_ = kNoGroupId;
return Status::OK();
}
Expand All @@ -245,7 +245,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
// first row of a new segment to see if it extends the previous segment.
template <typename Batch>
Result<group_id_t> MapGroupIdAt(const Batch& batch, int64_t offset) {
if (!grouper_) return kNoGroupId;
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset,
/*length=*/1));
if (!datum.is_array()) {
Expand All @@ -264,9 +263,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
if (offset == batch.length) {
return MakeSegment(batch.length, offset, 0, kEmptyExtends);
}
// ARROW-18311: make Grouper support Reset()
// so it can be reset instead of recreated below
//
// the group id must be computed prior to resetting the grouper, since it is compared
// to save_group_id_, and after resetting the grouper produces incomparable group ids
ARROW_ASSIGN_OR_RAISE(auto group_id, MapGroupIdAt(batch, offset));
Expand All @@ -276,7 +272,7 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
return extends;
};
// resetting drops grouper's group-ids, freeing-up memory for the next segment
ARROW_ASSIGN_OR_RAISE(grouper_, Grouper::Make(key_types_, ctx_)); // TODO: reset it
ARROW_RETURN_NOT_OK(grouper_->Reset());
// GH-34475: cache the grouper-consume result across invocations of GetNextSegment
ARROW_ASSIGN_OR_RAISE(auto datum, grouper_->Consume(batch, offset));
if (datum.is_array()) {
Expand All @@ -299,7 +295,6 @@ struct AnyKeysSegmenter : public BaseRowSegmenter {
}

private:
ExecContext* const ctx_;
std::unique_ptr<Grouper> grouper_;
group_id_t save_group_id_;
};
Expand Down Expand Up @@ -354,6 +349,7 @@ struct GrouperNoKeysImpl : Grouper {
RETURN_NOT_OK(builder->Finish(&array));
return std::move(array);
}
Status Reset() override { return Status::OK(); }
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
return Datum(array);
Expand Down Expand Up @@ -419,6 +415,14 @@ struct GrouperImpl : public Grouper {
return std::move(impl);
}

// Reset all accumulated grouping state so the grouper behaves as freshly made.
// Clears the key->group-id map, the per-group key offsets/bytes, and the group
// counter; subsequent Consume calls start assigning group ids from 0 again.
Status Reset() override {
  map_.clear();
  offsets_.clear();
  key_bytes_.clear();
  num_groups_ = 0;
  // NOTE(review): encoders_ are deliberately not reset — per the PR
  // discussion, none of the row encoders holds state that changes during the
  // grouper's lifespan. Confirm against row_encoder_internal.h if encoders
  // ever become stateful.
  return Status::OK();
}

Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length));
if (offset != 0 || length != batch.length) {
Expand Down Expand Up @@ -595,7 +599,17 @@ struct GrouperFastImpl : public Grouper {
return std::move(impl);
}

~GrouperFastImpl() { map_.cleanup(); }
// Reset all accumulated grouping state so the grouper behaves as freshly made.
// Cleans the accumulated row tables and rebuilds the swiss-table map; this
// drops all group ids, freeing memory for the next segment.
Status Reset() override {
  rows_.Clean();
  rows_minibatch_.Clean();
  map_.cleanup();
  RETURN_NOT_OK(map_.init(encode_ctx_.hardware_flags, ctx_->memory_pool()));
  // TODO: It is now assumed that the dictionaries_ are identical to the first
  // batch throughout the grouper's lifespan so no resetting is needed. But if
  // we want to support different dictionaries for different batches, we need
  // to reset the dictionaries_ here.
  // NOTE(review): temp_stack_ is presumed empty here — temp vectors are
  // released by the end of each grouper member-function call. A DCHECK on
  // stack emptiness is deferred until TempVectorStack exposes a size/empty
  // accessor (see PR discussion).
  return Status::OK();
}

Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_RETURN_NOT_OK(CheckAndCapLengthForConsume(batch.length, offset, &length));
Expand Down Expand Up @@ -838,8 +852,7 @@ struct GrouperFastImpl : public Grouper {
return out;
}

static constexpr int log_minibatch_max_ = 10;
static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
static constexpr int minibatch_size_max_ = arrow::util::MiniBatch::kMiniBatchLength;
static constexpr int minibatch_size_min_ = 128;
int minibatch_size_;

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/compute/row/grouper.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class ARROW_EXPORT Grouper {
static Result<std::unique_ptr<Grouper>> Make(const std::vector<TypeHolder>& key_types,
ExecContext* ctx = default_exec_context());

/// Reset all intermediate state, make the grouper logically as just `Make`ed.
/// The underlying buffers, if any, may or may not be released though.
virtual Status Reset() = 0;

/// Consume a batch of keys, producing the corresponding group ids as an integer array,
/// over a slice defined by an offset and length, which defaults to the batch length.
/// Currently only uint32 indices will be produced, eventually the bit width will only
Expand Down
Loading