GH-43124: [C++] Initialize offset vector head as 0 after memory allocated in grouper.cc #43123
base: main
Conversation
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the openness of the Apache Arrow project. Could you also rename the pull request title in the following format?
or
In the case of PARQUET issues on JIRA the title also supports:
See also:
Thanks for the contribution! It would be better to have a test to reproduce the problem.
@mapleFU
#include <gtest/gtest.h>
#include <climits>
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <arrow/ipc/writer.h>
#include <arrow/ipc/reader.h>
#include <arrow/api.h>
#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/dictionary.h>
#include <arrow/result.h>
#include <arrow/compute/function.h>
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/function_internal.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include <arrow/acero/options.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/status.h>
#include <vector>
#include "arrow/type_fwd.h"
#include <chrono>
#include <unistd.h>
int main(int argc, char* argv[])
{
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
inline void CHECK_OK(const arrow::Status& status, const std::string& msg){
if(!status.ok()) {
std::cout<< "status not ok! msg: " << msg << std::endl;
exit(-1);
}
}
namespace arrow_test {
const int const_build_len = 1030;
const int const_match_keys = 11;
const int const_batch_size = 100000;
int basic_test_len = 400 * 1024;
const std::string long_prefix = "123456";
const std::string short_prefix = "short_prefix_";
const std::string key_prefix = long_prefix;
uint64_t milliseconds_now(){
auto now = std::chrono::system_clock::now();
// Convert the time point to milliseconds since the epoch
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
auto value = now_ms.time_since_epoch().count();
return value;
}
class SourceReader : public arrow::RecordBatchReader {
private:
std::shared_ptr<arrow::Schema> schema_;
int64_t batch_size_;
int64_t batch_count_ = 0;
int64_t total_len_ = 0;
std::string prefix_;
int64_t offset_;
bool with_null_;
public:
std::string loooooong_prefix;
SourceReader(const std::string& prefix,
std::vector<std::shared_ptr<arrow::DataType>>& types,
int64_t batch_size,
int total_len,
bool with_null = false,
std::string prefix_binary = long_prefix)
: batch_size_{batch_size},
prefix_(prefix),
total_len_(total_len),
offset_(0),
with_null_(with_null),
loooooong_prefix(prefix_binary) {
int i = 0;
std::vector<std::shared_ptr<arrow::Field>> fields;
for(std::shared_ptr<arrow::DataType> &type : types){
std::string field_name = prefix + "_" + std::to_string(i);
fields.emplace_back(arrow::field(field_name, type));
++i;
}
schema_ = std::make_shared<arrow::Schema>(fields);
}
std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
// Emit one RecordBatch per call, streaming execution
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override {
// Simulate a RocksDB iterator: build a RecordBatch for every batch_size_ rows scanned and stream it to the downstream operator
if (total_len_ - offset_ <= 0) {
out->reset();
return arrow::Status::OK();
}
int64_t batch_size = std::min(batch_size_, total_len_ - offset_);
auto status = build_row_batch(batch_size, out);
CHECK_OK(status, "next batch: ");
offset_ += batch_size;
std::cout << prefix_ << ": " << offset_ << std::endl;
return arrow::Status::OK();
}
private:
arrow::Status build_row_batch(int64_t batch_size, std::shared_ptr<arrow::RecordBatch>* out){
arrow::Status status;
std::vector<std::shared_ptr<arrow::Array>> arrays(schema_->num_fields());
// The vector's shared_ptr elements are already value-initialized to nullptr.
for(int i = 0 ; i < schema_->num_fields(); ++i){
status = build_column(batch_size, arrays[i], i);
CHECK_OK(status, "build_row_batch: ");
}
*out = arrow::RecordBatch::Make(schema_, batch_size, std::move(arrays));
return arrow::Status::OK();
}
arrow::Status build_large_binary_key_col(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder*>((builder).get());
for(int64_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status build_binary_key_col(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
for(int64_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
template<typename T>
arrow::Status build(std::unique_ptr<arrow::ArrayBuilder>& builder, int64_t batch_size, std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<T*>(builder.get());
for(int64_t i = 0 ; i < batch_size ; ++i){
auto status = type_builder->Append(static_cast<typename T::value_type>(offset_ + i));
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index);
};
template< >
arrow::Status SourceReader::build<arrow::BinaryBuilder>(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
for(int64_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
template< >
arrow::Status SourceReader::build<arrow::LargeBinaryBuilder>(
std::unique_ptr<arrow::ArrayBuilder>& builder,
int64_t batch_size,
std::shared_ptr<arrow::Array>& output){
auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder *>((builder).get());
for(int64_t i = 0 ; i < batch_size ; ++i){
arrow::Status status;
if(with_null_ && i % 10 == 9){
status = type_builder->AppendNull();
}else{
status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
}
CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
}
auto status = builder->Finish(&output);
return status;
}
arrow::Status SourceReader::build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index) {
auto type = schema_->field(col_index)->type();
std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
CHECK_OK(arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder), "build_column: MakeBuilder");
if(col_index == 0){
if(type->id() == arrow::Type::type::BINARY){
return build_binary_key_col(builder, batch_size, array);
}else if(type->id() == arrow::Type::type::LARGE_BINARY){
return build_large_binary_key_col(builder, batch_size, array);
}
}
auto typeId = type->id();
switch(typeId){
case arrow::Type::type::INT32:
return build<arrow::Int32Builder>(builder, batch_size, array);
case arrow::Type::type::UINT64:
return build<arrow::UInt64Builder>(builder, batch_size, array);
case arrow::Type::type::INT64:
return build<arrow::Int64Builder>(builder, batch_size, array);
case arrow::Type::type::FLOAT:
return build<arrow::FloatBuilder>(builder, batch_size, array);
case arrow::Type::type::DOUBLE:
return build<arrow::DoubleBuilder>(builder, batch_size, array);
case arrow::Type::type::BINARY:
return build<arrow::BinaryBuilder>(builder, batch_size, array);
case arrow::Type::type::LARGE_BINARY:
return build<arrow::LargeBinaryBuilder>(builder, batch_size, array);
default:
return arrow::Status::Invalid("unsupported type");
}
}
void run_agg_without_join_key_as_binary(int table_len, bool is_large_binary = false, bool check_result = true) {
// Two columns: the first is a binary key, the second an int64 value
std::vector<std::shared_ptr<arrow::DataType>> types{arrow::binary(), arrow::int64()};
std::shared_ptr<SourceReader> reader = std::make_shared<SourceReader>("table", types, const_batch_size, table_len);
arrow::acero::Declaration data_source{"record_batch_reader_source", arrow::acero::RecordBatchReaderSourceNodeOptions(reader)};
std::vector<arrow::compute::Aggregate> aggregates;
aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_1"), /*new field name*/"col1");
// aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_2"), /*new field name*/"col2");
std::vector<arrow::FieldRef> group_by_fields;
group_by_fields.emplace_back(arrow::FieldRef("table_0"));
arrow::acero::AggregateNodeOptions agg_options{aggregates, group_by_fields};
arrow::acero::Declaration agg{"aggregate", {std::move(data_source)}, std::move(agg_options)};
uint64_t start_time = milliseconds_now();
auto result = arrow::acero::DeclarationToTable(std::move(agg), /*use_threads=*/true);
uint64_t end_time = milliseconds_now();
std::cout << "aggregate cost: " << (end_time - start_time) << " ms" << std::endl;
auto final_table = result.ValueOrDie();
}
TEST(test_arrow_vector_execute, group_basic_key_as_binary){
// this loop is necessary to replay the bug
for (int i = 0 ; i < 10; ++i) {
run_agg_without_join_key_as_binary(/* table_len = */0);
}
}
} // namespace arrow_test
@zanmato1984 Would you like to give this a look?
Yes. I'll take a look.
Thanks for fixing this. The issue is well stated and the fix is straightforward. Two things:
- I've suggested a refinement to the comment.
- Is it possible for you to write a small test to replicate the issue? If you have any trouble with that, I will be glad to help then.
EDIT: By "small test" I mean something to put into https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/row/grouper_test.cc
Hi @flashzxi , I think this is a good fix. I put some comments a while ago. Are you still willing to move on with it? Or is there anything I can do to help? Thanks.
Hi @flashzxi , I've created a PR in your fork flashzxi#1 which includes my test to reproduce the issue and verify the fix. Could you merge it? Thanks. Or, asking some old friends @pitrou @assignUser: is there a way I can directly make changes to this PR in case the author doesn't respond? I am a committer now and have write access but still haven't figured out how to do that. Thanks.
Didn't you update this successfully?
Oops, just made it using
That was my other suggestion made directly on the website. But never mind :) |
Just for completeness sake: Yes committers can push to PRs from third parties as long as they have set the 'Maintainers are allowed to edit this pull request.' option (which is on by default). Your process here with multiple pings before you took over the PR looks good 👍 |
bug description
In grouper.cc:780, memory is allocated for the offset vector of a varlen column. However, the vector is only initialized in encoder_.DecodeFixedLengthBuffers, which is never called when num_groups == 0 (see line 786). As a result, fixedlen_bufs[i][0] is an uninitialized value, i.e. a random uint32_t. This random value is later passed to AllocatePaddedBuffer(varlen_size). Since a random uint32_t requests at most 4 GB, the allocation usually succeeds and the program may appear to run normally, which is why the bug is easy to miss.
how to fix
Set the offset vector head to 0 right after the memory is allocated, so that it is well defined even when num_groups == 0 and the decode step never initializes it.