Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44010: [C++] Add arrow::RecordBatch::MakeStatisticsArray() #44252

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cpp/src/arrow/array/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <variant>

#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand All @@ -34,6 +35,22 @@ namespace arrow {
struct ARROW_EXPORT ArrayStatistics {
using ValueType = std::variant<bool, int64_t, uint64_t, double, std::string>;

static const std::shared_ptr<DataType>& ValueToArrowType(
const std::optional<ValueType>& value) {
if (!value.has_value()) {
return null();
}

struct Visitor {
const std::shared_ptr<DataType>& operator()(const bool&) { return boolean(); }
const std::shared_ptr<DataType>& operator()(const int64_t&) { return int64(); }
const std::shared_ptr<DataType>& operator()(const uint64_t&) { return uint64(); }
const std::shared_ptr<DataType>& operator()(const double&) { return float64(); }
const std::shared_ptr<DataType>& operator()(const std::string&) { return utf8(); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may forgot a bit but we don't distinct "bytes" and "utf8" in stats?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we didn't discuss it...
Let's discuss it in #44579.

We can assume "utf8" here for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add a // TODO(GH-44579) here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I should have added it...
I've added it.

} visitor;
return std::visit(visitor, value.value());
}

/// \brief The number of null values, may not be set
std::optional<int64_t> null_count = std::nullopt;

Expand All @@ -43,12 +60,16 @@ struct ARROW_EXPORT ArrayStatistics {
/// \brief The minimum value, may not be set
std::optional<ValueType> min = std::nullopt;

const std::shared_ptr<DataType>& MinArrowType() { return ValueToArrowType(min); }

/// \brief Whether the minimum value is exact or not
bool is_min_exact = false;

/// \brief The maximum value, may not be set
std::optional<ValueType> max = std::nullopt;

const std::shared_ptr<DataType>& MaxArrowType() { return ValueToArrowType(max); }

/// \brief Whether the maximum value is exact or not
bool is_max_exact = false;

Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ struct ArrowArray {
void* private_data;
};

# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT "ARROW:average_byte_width:exact"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know constexpr std::string_view is better or this is better

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use constexpr because this header may be used by C programs.

# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE \
"ARROW:average_byte_width:approximate"
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT "ARROW:distinct_count:exact"
# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE \
"ARROW:distinct_count:approximate"
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT "ARROW:max_byte_width:exact"
# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE \
"ARROW:max_byte_width:approximate"
# define ARROW_STATISTICS_KEY_MAX_VALUE_EXACT "ARROW:max_value:exact"
# define ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE "ARROW:max_value:approximate"
# define ARROW_STATISTICS_KEY_MIN_VALUE_EXACT "ARROW:min_value:exact"
# define ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE "ARROW:min_value:approximate"
# define ARROW_STATISTICS_KEY_NULL_COUNT_EXACT "ARROW:null_count:exact"
# define ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE "ARROW:null_count:approximate"
# define ARROW_STATISTICS_KEY_ROW_COUNT_EXACT "ARROW:row_count:exact"
# define ARROW_STATISTICS_KEY_ROW_COUNT_APPROXIMATE "ARROW:row_count:approximate"

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
Expand Down
219 changes: 219 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
#include <utility>

#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_union.h"
#include "arrow/array/concatenate.h"
#include "arrow/array/validate.h"
#include "arrow/c/abi.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -465,6 +470,220 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::ViewOrCopyTo(
return Make(schema_, num_rows(), std::move(copied_columns));
}

namespace {
struct EnumeratedStatistics {
int nth_statistics = 0;
bool start_new_column = false;
std::optional<int32_t> nth_column = std::nullopt;
const char* key = nullptr;
std::shared_ptr<DataType> type = nullptr;
ArrayStatistics::ValueType value = false;
};
using OnStatistics =
std::function<Status(const EnumeratedStatistics& enumerated_statistics)>;
Status EnumerateStatistics(const RecordBatch& record_batch, OnStatistics on_statistics) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually this is for a two-phase building, one pass for types, and one-pass for data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

I think that it's one of complexities.
So I sent https://lists.apache.org/thread/0c9jftkspvj7yw1lpo73s3vtp6vfjqv8 to the mailing list. But nobody agreed it. So this complexity will be acceptable...

EnumeratedStatistics statistics;
statistics.nth_statistics = 0;
statistics.start_new_column = true;
statistics.nth_column = std::nullopt;

statistics.key = ARROW_STATISTICS_KEY_ROW_COUNT_EXACT;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So RowCount is also handled as a stats 🤔?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Statistics array will be passed to consumer before consumer receives a record batch.
So this may be useful for consumer.

But DuckDB doesn't have row count in its BaseStatistics...: https://github.com/duckdb/duckdb/blob/670cd341249e266de384e0341f200f4864b41b27/src/include/duckdb/storage/statistics/base_statistics.hpp#L38-L146
This may not be useful...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this for now to demonstrate table/record batch level statistics.

statistics.type = int64();
statistics.value = record_batch.num_rows();
RETURN_NOT_OK(on_statistics(statistics));
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
statistics.start_new_column = false;

const auto num_fields = record_batch.schema()->num_fields();
for (int nth_column = 0; nth_column < num_fields; ++nth_column) {
auto column_statistics = record_batch.column(nth_column)->statistics();
if (!column_statistics) {
continue;
}

statistics.start_new_column = true;
statistics.nth_column = nth_column;
if (column_statistics->null_count.has_value()) {
statistics.nth_statistics++;
statistics.key = ARROW_STATISTICS_KEY_NULL_COUNT_EXACT;
statistics.type = int64();
statistics.value = column_statistics->null_count.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->distinct_count.has_value()) {
statistics.nth_statistics++;
statistics.key = ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT;
statistics.type = int64();
statistics.value = column_statistics->distinct_count.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->min.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_min_exact) {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MinArrowType();
statistics.value = column_statistics->min.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}

if (column_statistics->max.has_value()) {
statistics.nth_statistics++;
if (column_statistics->is_max_exact) {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_EXACT;
} else {
statistics.key = ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE;
}
statistics.type = column_statistics->MaxArrowType();
statistics.value = column_statistics->max.value();
RETURN_NOT_OK(on_statistics(statistics));
statistics.start_new_column = false;
}
}
return Status::OK();
}
} // namespace

Result<std::shared_ptr<Array>> RecordBatch::MakeStatisticsArray(
MemoryPool* memory_pool) const {
// Statistics schema:
// struct<
// column: int32,
// statistics: map<
// key: dictionary<
// indices: int32,
// dictionary: utf8,
// >,
// items: dense_union<...all needed types...>,
// >
// >

// Statistics schema doesn't define static dense union type for
// values. Each statistics schema have a dense union type that has
// needled value types. The following block collects these types.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually this is logically a "set" prepared for items?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

If there are the same types, the first type is only used.

std::vector<std::shared_ptr<Field>> values_types;
std::vector<int8_t> values_type_indexes;
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
int8_t i = 0;
for (const auto& field : values_types) {
if (field->type()->id() == statistics.type->id()) {
break;
}
i++;
}
if (i == static_cast<int8_t>(values_types.size())) {
values_types.push_back(field(statistics.type->name(), statistics.type));
}
values_type_indexes.push_back(i);
return Status::OK();
}));

// statistics.key: dictionary<indices: int32, dictionary: utf8>
auto keys_type = dictionary(int32(), utf8(), false);
// statistics.items: dense_union<...all needed types...>
auto values_type = dense_union(values_types);
// struct<
// column: int32,
// statistics: map<
// key: dictionary<
// indices: int32,
// dictionary: utf8,
// >,
// items: dense_union<...all needed types...>,
// >
// >
auto statistics_type =
struct_({field("column", int32()),
field("statistics", map(keys_type, values_type, false))});

std::vector<std::shared_ptr<ArrayBuilder>> field_builders;
// columns: int32
auto columns_builder = std::make_shared<Int32Builder>(memory_pool);
field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(columns_builder));
// statistics.key: dictionary<indices: int32, dictionary: utf8>
auto keys_builder = std::make_shared<StringDictionary32Builder>();
// statistics.items: dense_union<...all needed types...>
std::vector<std::shared_ptr<ArrayBuilder>> values_builders;
for (const auto& values_type : values_types) {
std::unique_ptr<ArrayBuilder> values_builder;
RETURN_NOT_OK(MakeBuilder(memory_pool, values_type->type(), &values_builder));
values_builders.push_back(std::shared_ptr<ArrayBuilder>(std::move(values_builder)));
}
auto items_builder = std::make_shared<DenseUnionBuilder>(
memory_pool, std::move(values_builders), values_type);
// statistics:
// map<
// key: dictionary<
// indices: int32,
// dictionary: utf8,
// >,
// items: dense_union<...all needed types...>,
// >
auto values_builder = std::make_shared<MapBuilder>(
memory_pool, std::static_pointer_cast<ArrayBuilder>(keys_builder),
std::static_pointer_cast<ArrayBuilder>(items_builder));
field_builders.push_back(std::static_pointer_cast<ArrayBuilder>(values_builder));
// struct<
// column: int32,
// statistics: map<
// key: dictionary<
// indices: int32,
// dictionary: utf8,
// >,
// items: dense_union<...all needed types...>,
// >
// >
StructBuilder builder(statistics_type, memory_pool, std::move(field_builders));

// Append statistics.
RETURN_NOT_OK(EnumerateStatistics(*this, [&](const EnumeratedStatistics& statistics) {
if (statistics.start_new_column) {
RETURN_NOT_OK(builder.Append());
if (statistics.nth_column.has_value()) {
RETURN_NOT_OK(columns_builder->Append(statistics.nth_column.value()));
} else {
RETURN_NOT_OK(columns_builder->AppendNull());
}
RETURN_NOT_OK(values_builder->Append());
}
RETURN_NOT_OK(keys_builder->Append(statistics.key,
static_cast<int32_t>(strlen(statistics.key))));
const auto values_type_index = values_type_indexes[statistics.nth_statistics];
RETURN_NOT_OK(items_builder->Append(values_type_index));
struct Visitor {
ArrayBuilder* builder;

Status operator()(const bool& value) {
return static_cast<BooleanBuilder*>(builder)->Append(value);
}
Status operator()(const int64_t& value) {
return static_cast<Int64Builder*>(builder)->Append(value);
}
Status operator()(const uint64_t& value) {
return static_cast<UInt64Builder*>(builder)->Append(value);
}
Status operator()(const double& value) {
return static_cast<DoubleBuilder*>(builder)->Append(value);
}
Status operator()(const std::string& value) {
return static_cast<StringBuilder*>(builder)->Append(
value.data(), static_cast<int32_t>(value.size()));
}
} visitor;
visitor.builder = values_builders[values_type_index].get();
RETURN_NOT_OK(std::visit(visitor, statistics.value));
return Status::OK();
}));

return builder.Finish();
}

Status RecordBatch::Validate() const {
return ValidateBatch(*this, /*full_validation=*/false);
}
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ class ARROW_EXPORT RecordBatch {

virtual DeviceAllocationType device_type() const = 0;

/// \brief Create a statistics array of this record batch
///
/// The created array follows the C data interface statistics
/// specification. See
/// https://arrow.apache.org/docs/format/CDataInterfaceStatistics.html
/// for details.
///
/// \param[in] pool the memory pool to allocate memory from
/// \return the statistics array of this record batch
Result<std::shared_ptr<Array>> MakeStatisticsArray(
MemoryPool* pool = default_memory_pool()) const;

protected:
RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);

Expand Down
Loading
Loading