
GH-43124: [C++] Initialize offset vector head as 0 after memory allocated in grouper.cc #43123

Open · wants to merge 3 commits into main

Conversation


@flashzxi flashzxi commented Jul 3, 2024

bug description

In grouper.cc:780, memory is allocated for the offsets vector of a variable-length column, but that vector is only initialized inside encoder_.DecodeFixedLengthBuffers, which is never called when num_groups==0 (see line 786). fixedlen_bufs[i][0] is then left uninitialized, i.e. it holds a random uint32_t. That random value is later passed to AllocatePaddedBuffer(varlen_size), so anywhere up to 4 GB may be allocated for an empty result. Because the allocation usually succeeds, the program can keep running normally and the bug goes unnoticed.
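
To make the mechanism concrete, here is a minimal standalone sketch of the same pattern (hypothetical names, not the actual grouper.cc code): the buffer is allocated uninitialized, the only code that would write it is skipped when there are zero groups, and its head is then read as a size.

#include <cstdint>
#include <cstdlib>
#include <iostream>

int main() {
    int num_groups = 0;  // an aggregation that produced no groups
    // Allocate num_groups + 1 offsets without initializing them (like malloc).
    auto* offsets =
        static_cast<uint32_t*>(std::malloc((num_groups + 1) * sizeof(uint32_t)));
    if (num_groups > 0) {
        // Only this branch would fill offsets[0..num_groups]; skipped when empty.
    }
    // Read unconditionally: garbage, anywhere from 0 to ~4 GB.
    uint32_t varlen_size = offsets[num_groups];
    std::cout << "would allocate " << varlen_size << " bytes\n";
    std::free(offsets);
    return 0;
}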

how to fix

Set the head of the offsets vector to 0 immediately after the memory is allocated, so it has a defined value even when num_groups==0 and the vector would otherwise never be initialized.
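
A sketch of the fix (the surrounding grouper.cc code is abbreviated and the buffer names are taken from the description above, so treat this as approximate):

// After allocating the offsets buffer of a varlen column, zero its head so
// that the offset read later is well-defined even when num_groups == 0 and
// encoder_.DecodeFixedLengthBuffers() is never called to fill the buffer.
uint32_t* offsets = reinterpret_cast<uint32_t*>(fixedlen_bufs[i]->mutable_data());
offsets[0] = 0;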


github-actions bot commented Jul 3, 2024

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}

In the case of PARQUET issues on JIRA the title also supports:

PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}


@mapleFU
Member

mapleFU commented Jul 3, 2024

Thanks for the contribution! It would be better to have a test that reproduces the problem.

@flashzxi
Author

flashzxi commented Jul 3, 2024

@mapleFU
Test code:

#include <gtest/gtest.h>

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/compute/api_aggregate.h>
#include <arrow/status.h>

int main(int argc, char* argv[])
{
    testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

inline void CHECK_OK(const arrow::Status& status, const std::string& msg){
    if(!status.ok()) {
        std::cout << "status not ok! msg: " << msg << " " << status.ToString() << std::endl;
        exit(-1);
    }
}

namespace arrow_test {
const int const_build_len = 1030;
const int const_match_keys = 11;
const int const_batch_size = 100000;
int basic_test_len = 400 * 1024;

const std::string long_prefix = "123456";
const std::string short_prefix = "short_prefix_";
const std::string key_prefix = long_prefix;

uint64_t milliseconds_now(){
    auto now = std::chrono::system_clock::now();
 
    // Convert the timestamp to milliseconds since the epoch.
    auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
    auto value = now_ms.time_since_epoch().count();
    return value;
}

class SourceReader : public arrow::RecordBatchReader {
private:
    std::shared_ptr<arrow::Schema> schema_;
    int64_t batch_size_;
    int64_t batch_count_ = 0;
    int64_t total_len_ = 0;
    
    std::string prefix_;
    int64_t offset_;
    bool with_null_;

public:
    std::string loooooong_prefix;
    SourceReader(const std::string& prefix, 
                std::vector<std::shared_ptr<arrow::DataType>>& types, 
                int64_t batch_size, 
                int total_len, 
                bool with_null = false,
                std::string prefix_binary = long_prefix)
        : batch_size_{batch_size}, 
          prefix_(prefix), 
          total_len_(total_len), 
          offset_(0),
          with_null_(with_null),
          loooooong_prefix(prefix_binary) {
            int i = 0;
            std::vector<std::shared_ptr<arrow::Field>> fields;
            for(std::shared_ptr<arrow::DataType> &type : types){
                std::string field_name = prefix + "_" + std::to_string(i);
                fields.emplace_back(arrow::field(field_name, type));
                ++i;
            }
            schema_ = std::make_shared<arrow::Schema>(fields);
        }

    std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

    // Emit one RecordBatch per call; streaming execution.
    arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override {
        // Simulate a RocksDB iterator: every batch_size_ scanned rows become one RecordBatch that is streamed to the downstream node.
        if (total_len_ - offset_ <= 0) {
            out->reset();
            return arrow::Status::OK();
        }
        int64_t batch_size = std::min(batch_size_, total_len_ - offset_);
        auto status = build_row_batch(batch_size, out);
        CHECK_OK(status, "next batch: ");
        offset_ += batch_size;
        std::cout << prefix_ << ": " << offset_ << std::endl;
        return arrow::Status::OK();
    }

private:
    arrow::Status build_row_batch(int64_t batch_size, std::shared_ptr<arrow::RecordBatch>* out){
        arrow::Status status;
        // Elements are default-initialized to nullptr; build_column fills each one.
        std::vector<std::shared_ptr<arrow::Array>> arrays(schema_->num_fields());
 
        for(int i = 0 ; i < schema_->num_fields(); ++i){
            status = build_column(batch_size, arrays[i], i);
            CHECK_OK(status, "build_row_batch: ");
        }
        *out = arrow::RecordBatch::Make(schema_, batch_size, std::move(arrays));
        return arrow::Status::OK();
    }

    arrow::Status build_large_binary_key_col(
                std::unique_ptr<arrow::ArrayBuilder>& builder, 
                int64_t batch_size, 
                std::shared_ptr<arrow::Array>& output){
        auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder*>((builder).get());
        for(int64_t i = 0 ; i < batch_size ; ++i){
            arrow::Status status;
            if(with_null_ && i % 10 == 9){
                status = type_builder->AppendNull();
            }else{
                status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
            }
            CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
        }
        auto status = builder->Finish(&output);
        return status;
    }

    arrow::Status build_binary_key_col(
                std::unique_ptr<arrow::ArrayBuilder>& builder, 
                int64_t batch_size, 
                std::shared_ptr<arrow::Array>& output){
        auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
        for(int64_t i = 0 ; i < batch_size ; ++i){
            arrow::Status status;
            if(with_null_ && i % 10 == 9){
                status = type_builder->AppendNull();
            }else{
                status = type_builder->Append(key_prefix + std::to_string(offset_ + i));
            }
            CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
        }
        auto status = builder->Finish(&output);
        return status;
    }


    template<typename T>
    arrow::Status build(std::unique_ptr<arrow::ArrayBuilder>& builder, int64_t batch_size, std::shared_ptr<arrow::Array>& output){
        auto type_builder = dynamic_cast<T*>(builder.get());
        for(int64_t i = 0 ; i < batch_size ; ++i){
            auto status = type_builder->Append(static_cast<typename T::value_type>(offset_ + i));
            CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
        }
        auto status = builder->Finish(&output);
        return status;
    }

    arrow::Status build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index);
};

template< >
arrow::Status SourceReader::build<arrow::BinaryBuilder>(
            std::unique_ptr<arrow::ArrayBuilder>& builder, 
            int64_t batch_size, 
            std::shared_ptr<arrow::Array>& output){
    auto type_builder = dynamic_cast<arrow::BinaryBuilder*>((builder).get());
    for(int64_t i = 0 ; i < batch_size ; ++i){
        arrow::Status status;
        if(with_null_ && i % 10 == 9){
            status = type_builder->AppendNull();
        }else{
            status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
        }
        CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
    }
    auto status = builder->Finish(&output);
    return status;
}

template< >
arrow::Status SourceReader::build<arrow::LargeBinaryBuilder>(
            std::unique_ptr<arrow::ArrayBuilder>& builder, 
            int64_t batch_size, 
            std::shared_ptr<arrow::Array>& output){
    auto type_builder = dynamic_cast<arrow::LargeBinaryBuilder*>((builder).get());
    for(int64_t i = 0 ; i < batch_size ; ++i){
        arrow::Status status;
        if(with_null_ && i % 10 == 9){
            status = type_builder->AppendNull();
        }else{
            status = type_builder->Append(loooooong_prefix + std::to_string(offset_ + i));
        }
        CHECK_OK(status, "build_key_column: Append: row: " + std::to_string(offset_ + i));
    }
    auto status = builder->Finish(&output);
    return status;
}

arrow::Status SourceReader::build_column(int64_t batch_size, std::shared_ptr<arrow::Array>& array, int col_index) {
    auto type = schema_->field(col_index)->type();
    std::unique_ptr<arrow::ArrayBuilder> builder;
    ARROW_RETURN_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(), type, &builder));

    if(col_index == 0){
        if(type->id() == arrow::Type::type::BINARY){
            return build_binary_key_col(builder, batch_size, array);
        }else if(type->id() == arrow::Type::type::LARGE_BINARY){
            return build_large_binary_key_col(builder, batch_size, array);
        }
    }
    
    auto typeId = type->id();
    switch(typeId){
        case arrow::Type::type::INT32:
            return build<arrow::Int32Builder>(builder, batch_size, array);
        case arrow::Type::type::UINT64:
            return build<arrow::UInt64Builder>(builder, batch_size, array);
        case arrow::Type::type::INT64:
            return build<arrow::Int64Builder>(builder, batch_size, array);
        case arrow::Type::type::FLOAT:
            return build<arrow::FloatBuilder>(builder, batch_size, array);
        case arrow::Type::type::DOUBLE:
            return build<arrow::DoubleBuilder>(builder, batch_size, array);
        case arrow::Type::type::BINARY:
            return build<arrow::BinaryBuilder>(builder, batch_size, array);
        case arrow::Type::type::LARGE_BINARY:
            return build<arrow::LargeBinaryBuilder>(builder, batch_size, array);
        default:
            return arrow::Status::Invalid("unsupported type");
    }
}

void run_agg_without_join_key_as_binary(int table_len, bool is_large_binary = false, bool check_result = true) {
    // Two columns: a binary key (table_0) and an int64 value (table_1).
    std::vector<std::shared_ptr<arrow::DataType>> types{arrow::binary(), arrow::int64()};
    std::shared_ptr<SourceReader> reader = std::make_shared<SourceReader>("table", types, const_batch_size, table_len);
    
    arrow::acero::Declaration data_source{"record_batch_reader_source", arrow::acero::RecordBatchReaderSourceNodeOptions(reader)};

    std::vector<arrow::compute::Aggregate> aggregates;
    aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_1"), /*new field name*/"col1");
    // aggregates.emplace_back("hash_max" , /*options*/nullptr, arrow::FieldRef("table_2"), /*new field name*/"col2");

    std::vector<arrow::FieldRef> group_by_fields;
    group_by_fields.emplace_back(arrow::FieldRef("table_0"));

    arrow::acero::AggregateNodeOptions agg_options{aggregates, group_by_fields};

    arrow::acero::Declaration agg{"aggregate", {std::move(data_source)}, std::move(agg_options)};
    uint64_t start_time = milliseconds_now();
    auto result = arrow::acero::DeclarationToTable(std::move(agg), /*use_threads=*/true);
    uint64_t end_time = milliseconds_now();
    auto final_table = result.ValueOrDie();
    std::cout << "aggregation produced " << final_table->num_rows() << " rows in "
              << (end_time - start_time) << " ms" << std::endl;
}

TEST(test_arrow_vector_execute, group_basic_key_as_binary){
  // This loop is needed to reproduce the bug reliably: the offsets head is
  // uninitialized memory, so a single run may happen to read a harmless 0.
  for (int i = 0 ; i < 10; ++i) {
    run_agg_without_join_key_as_binary(/* table_len = */0);
  }
}
}  // namespace arrow_test

When the bug triggers, varlen_size ends up as a random uint32_t, as shown in the attached debugger screenshot (image not reproduced here).

@mapleFU mapleFU changed the title from "Initialize offset vector head as 0 after memory allocated in grouper.cc" to "GH-43124: [C++] Initialize offset vector head as 0 after memory allocated in grouper.cc" Jul 3, 2024

github-actions bot commented Jul 3, 2024

⚠️ GitHub issue #43124 has been automatically assigned in GitHub to PR creator.

@pitrou
Member

pitrou commented Jul 10, 2024

@zanmato1984 Would you like to give this a look?

@zanmato1984
Collaborator

Yes. I'll take a look.

Collaborator

@zanmato1984 zanmato1984 left a comment


Thanks for fixing this. The issue is well stated and the fix is straightforward. Two things:

  1. I've suggested a refinement to the comment.
  2. Is it possible for you to write a small test to replicate the issue? If you have any trouble with that, I will be glad to help then.

EDIT: By "small test" I mean something to put into https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/row/grouper_test.cc
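
For reference, a rough sketch of what such a test might look like (the Grouper API names here are assumed from arrow/compute/row/grouper.h and may need adjusting):

#include <gtest/gtest.h>

#include "arrow/compute/row/grouper.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"

namespace arrow {
namespace compute {

TEST(Grouper, GetUniquesWithZeroGroups) {
  // A variable-length (binary) key exercises the offsets-buffer path.
  ASSERT_OK_AND_ASSIGN(auto grouper, Grouper::Make({binary()}));
  // Nothing was consumed, so num_groups == 0 and the decode step that would
  // initialize the offsets buffer never runs; GetUniques() must still return
  // a well-formed empty batch instead of reading uninitialized memory.
  ASSERT_OK_AND_ASSIGN(ExecBatch uniques, grouper->GetUniques());
  ASSERT_EQ(uniques.length, 0);
}

}  // namespace compute
}  // namespace arrow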

cpp/src/arrow/compute/row/grouper.cc (review comment; outdated, resolved)
@github-actions github-actions bot added the "awaiting committer review" label and removed the "awaiting review" label Jul 10, 2024
@zanmato1984
Collaborator

Hi @flashzxi , I think this is a good fix. I put some comments a while ago.

Are you still willing to move on with it? Or is there anything I can do to help? Thanks.

@zanmato1984
Collaborator

Hi @flashzxi, I've created a PR in your fork flashzxi#1 that includes my test to reproduce the issue and verify the fix. Could you merge it? Thanks.

Also, a question for some old friends @pitrou @assignUser: is there a way I can directly make changes to this PR in case the author doesn't respond? I am a committer now and have write access, but I still haven't figured out how to do that. Thanks.

@mapleFU
Member

mapleFU commented Nov 1, 2024

bf2aff0

Didn't you update this successfully?

@zanmato1984
Collaborator

zanmato1984 commented Nov 1, 2024

Hi @flashzxi, I've created a PR in your fork flashzxi#1 that includes my test to reproduce the issue and verify the fix. Could you merge it? Thanks.

Also, a question for some old friends @pitrou @assignUser: is there a way I can directly make changes to this PR in case the author doesn't respond? I am a committer now and have write access, but I still haven't figured out how to do that. Thanks.

Oops, I just made it using push -f. @pitrou, would you please help review the test? Thanks!

bf2aff0

Didn't you update this successfully?

That was my other suggestion made directly on the website. But never mind :)

@assignUser
Member

Just for completeness' sake: yes, committers can push to PRs from third parties as long as the author has left the "Maintainers are allowed to edit this pull request." option enabled (which it is by default).

Your process here with multiple pings before you took over the PR looks good 👍
