wip.1206.1
lgbo-ustc committed Dec 7, 2023
1 parent 27684bf commit e36be5a
Showing 12 changed files with 260 additions and 783 deletions.
@@ -96,6 +96,12 @@ class CHTransformerApi extends TransformerApi with Logging {
       nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString)
     }

+    val maxMemoryUsageKey = settingPrefix + "max_memory_usage";
+    if (!nativeConfMap.containsKey(maxMemoryUsageKey)) {
+      val maxMemoryUsageValue = offHeapSize
+      nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString)
+    }
+
     // Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash
     val joinAlgorithmKey = settingPrefix + "join_algorithm";
     if (
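The block above seeds ClickHouse's `max_memory_usage` setting with the Spark off-heap size only when the user has not set it; a zero value would disable the memory-overflow check added in `GraceMergingAggregatedTransform` further down. A minimal C++ sketch of the same set-if-absent idea (the key and map here are illustrative stand-ins, and the real key carries the settings prefix):

```cpp
// Sketch only: apply a default without overwriting a user-provided value,
// mirroring the containsKey/put pattern in the Scala change above.
#include <cstdio>
#include <map>
#include <string>

int main()
{
    std::map<std::string, std::string> native_conf;   // stand-in for nativeConfMap
    long long off_heap_size = 8LL << 30;               // hypothetical 8 GiB off-heap size
    // try_emplace inserts only when the key is absent, so an explicit
    // user-provided max_memory_usage is left untouched.
    native_conf.try_emplace("max_memory_usage", std::to_string(off_heap_size));
    std::printf("max_memory_usage = %s\n", native_conf.at("max_memory_usage").c_str());
    return 0;
}
```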
@@ -96,10 +96,12 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric])
 object HashAggregateMetricsUpdater {
   val INCLUDING_PROCESSORS = Array(
     "AggregatingTransform",
+    "StreamingAggregatingTransform",
     "MergingAggregatedTransform",
-    "ScalableMergingAggregatedTransform")
+    "GraceMergingAggregatedTransform")
   val CH_PLAN_NODE_NAME = Array(
     "AggregatingTransform",
+    "StreamingAggregatingTransform",
     "MergingAggregatedTransform",
-    "ScalableMergingAggregatedTransform")
+    "GraceMergingAggregatedTransform")
 }
@@ -66,8 +66,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       assert(plans(2).metrics("outputRows").value === 600572)

       assert(plans(1).metrics("inputRows").value === 591673)
-      assert(plans(1).metrics("resizeInputRows").value === 4)
-      assert(plans(1).metrics("resizeOutputRows").value === 4)
       assert(plans(1).metrics("outputRows").value === 4)
       assert(plans(1).metrics("outputVectors").value === 1)

@@ -93,8 +91,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       assert(plans(2).metrics("filesSize").value === 17777735)

       assert(plans(1).metrics("inputRows").value === 591673)
-      assert(plans(1).metrics("resizeInputRows").value === 4)
-      assert(plans(1).metrics("resizeOutputRows").value === 4)
       assert(plans(1).metrics("outputRows").value === 4)
       assert(plans(1).metrics("outputVectors").value === 1)

@@ -311,7 +311,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
           .get(0)
           .getProcessors
           .get(0)
-          .getInputRows == 591677)
+          .getInputRows == 591673)

       assert(
         nativeMetricsData.metricsDataList
@@ -356,7 +356,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
           .getSteps
           .get(0)
           .getName
-          .equals("ScalableMergingAggregated"))
+          .equals("GraceMergingAggregatedTransform"))
       assert(
         nativeMetricsDataFinal.metricsDataList.get(1).getSteps.get(1).getName.equals("Expression"))
       assert(nativeMetricsDataFinal.metricsDataList.get(2).getName.equals("kProject"))
54 changes: 46 additions & 8 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
@@ -58,6 +58,7 @@ GraceMergingAggregatedStep::GraceMergingAggregatedStep(

 void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
 {
+    auto num_streams = pipeline.getNumStreams();
     auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
     pipeline.resize(1);
     auto build_transform = [&](DB::OutputPortRawPtrs outputs)
@@ -72,6 +73,7 @@ void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
         return new_processors;
     };
     pipeline.transform(build_transform);
+    pipeline.resize(num_streams, true);
 }

 void GraceMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
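Taken together, the two hunks above make `transformPipeline` remember the original parallelism, funnel the pipeline into a single stream so one `GraceMergingAggregatedTransform` sees every partial-aggregate block, and then restore the original number of streams for downstream steps. A condensed sketch of the resulting shape; the middle of `build_transform` is collapsed in the diff, so its body here is an assumption:

```cpp
// Condensed sketch of the new wiring (build_transform body assumed).
void GraceMergingAggregatedStep::transformPipeline(
    DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
{
    auto num_streams = pipeline.getNumStreams();   // parallelism before the merge
    auto transform_params
        = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
    pipeline.resize(1);                            // single stream: one transform sees all data
    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
    {
        DB::Processors new_processors;
        // ... wrap the lone output port in a GraceMergingAggregatedTransform (elided above)
        return new_processors;
    };
    pipeline.transform(build_transform);
    pipeline.resize(num_streams, true);            // restore parallelism for downstream operators
}
```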
@@ -225,6 +227,17 @@ void GraceMergingAggregatedTransform::extendBuckets()
 void GraceMergingAggregatedTransform::rehashDataVariants()
 {
     auto blocks = params->aggregator.convertToBlocks(*current_data_variants, false, 1);
+
+    size_t block_rows = 0;
+    size_t block_memory_usage = 0;
+    for (const auto & block : blocks)
+    {
+        block_rows += block.rows();
+        block_memory_usage += block.allocatedBytes();
+    }
+    if (block_rows)
+        per_key_memory_usage = block_memory_usage * 1.0 / block_rows;
+
     current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
     no_more_keys = false;
     for (auto & block : blocks)
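`rehashDataVariants` now measures the blocks it just converted and records an average footprint per aggregation key in `per_key_memory_usage`; `isMemoryOverFlow` later multiplies that figure by the number of keys currently held to project total usage. A small runnable sketch with illustrative numbers:

```cpp
// Illustrative numbers only: how the per-key estimate feeds the projection
// used by isMemoryOverFlow.
#include <cstdio>

int main()
{
    double block_memory_usage = 64.0 * (1 << 20);   // 64 MiB allocated by the converted blocks
    double block_rows = 1'000'000;                   // rows (keys) in those blocks
    double per_key_memory_usage = block_memory_usage / block_rows;   // ~67 bytes per key

    double current_mem_used = 1.0 * (1 << 30);       // 1 GiB currently tracked
    double current_result_rows = 20'000'000;         // keys in current_data_variants
    double projected = current_mem_used + per_key_memory_usage * current_result_rows;  // ~2.25 GiB

    std::printf("per key: %.1f bytes, projected: %.2f GiB\n",
                per_key_memory_usage, projected / (1 << 30));
    return 0;
}
```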
@@ -389,23 +402,48 @@ void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block)

 size_t GraceMergingAggregatedTransform::getMemoryUsage()
 {
-    size_t current_memory_usage = 0;
+    Int64 current_memory_usage = 0;
     if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker())
         if (auto * memory_tracker = memory_tracker_child->getParent())
             current_memory_usage = memory_tracker->get();
-    return current_memory_usage;
+    return current_memory_usage < 0 ? 0 : current_memory_usage;
 }

 bool GraceMergingAggregatedTransform::isMemoryOverFlow()
 {
-    auto mem_limit = context->getSettingsRef().max_bytes_before_external_group_by;
-    if (!mem_limit)
+    /// More greedy memory usage strategy.
+    if (!context->getSettingsRef().max_memory_usage)
         return false;
-    auto current_memory_usage = getMemoryUsage();
-    if (current_memory_usage > mem_limit)
+    auto max_mem_used = context->getSettingsRef().max_memory_usage * 8 / 10;
+    auto current_result_rows = current_data_variants->size();
+    auto current_mem_used = getMemoryUsage();
+    if (per_key_memory_usage > 0)
     {
-        LOG_INFO(logger, "memory is overflow. used: {}, limit: {}", ReadableSize(current_memory_usage), ReadableSize(mem_limit));
-        return true;
+        if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used)
+        {
+            LOG_INFO(
+                logger,
+                "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, buckets: {}",
+                ReadableSize(current_mem_used),
+                ReadableSize(max_mem_used),
+                ReadableSize(per_key_memory_usage),
+                current_result_rows,
+                getBucketsNum());
+            return true;
+        }
     }
+    else
+    {
+        if (current_mem_used * 2 >= context->getSettingsRef().max_memory_usage)
+        {
+            LOG_INFO(
+                logger,
+                "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}, buckets: {}",
+                ReadableSize(current_mem_used),
+                ReadableSize(context->getSettingsRef().max_memory_usage),
+                getBucketsNum());
+            return true;
+        }
+    }
     return false;
 }
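The rewritten `isMemoryOverFlow` has two paths: once a per-key estimate exists, it spills when current usage plus the projected footprint of the resident keys crosses 80% of `max_memory_usage`; before any estimate is available, it falls back to spilling at half of the limit. `getMemoryUsage` also reads the tracker as a signed value and clamps negatives to zero, since a negative reading cast to an unsigned size would look enormous. A condensed, runnable sketch of the decision with illustrative values:

```cpp
// Condensed view of the new spill decision; thresholds mirror the diff,
// the concrete numbers below are illustrative.
#include <cstdio>

static bool shouldSpill(double max_memory_usage, double current_mem_used,
                        double per_key_memory_usage, double current_result_rows)
{
    if (max_memory_usage == 0)                        // no limit configured: never spill here
        return false;
    double max_mem_used = max_memory_usage * 8 / 10;  // spill threshold: 80% of the limit
    if (per_key_memory_usage > 0)                     // estimate available after a rehash
        return current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used;
    return current_mem_used * 2 >= max_memory_usage;  // no estimate yet: spill at 50%
}

int main()
{
    double gib = 1024.0 * 1024.0 * 1024.0;
    // 1.5 GiB used + 100 bytes/key * 20M keys (~1.86 GiB projected) >= 3.2 GiB threshold -> spill
    std::printf("%d\n", shouldSpill(4 * gib, 1.5 * gib, 100.0, 20'000'000));
    // No estimate yet, and 1.9 GiB used is below half of the 4 GiB limit -> keep aggregating
    std::printf("%d\n", shouldSpill(4 * gib, 1.9 * gib, 0.0, 0));
    return 0;
}
```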
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h
@@ -97,6 +97,8 @@ class GraceMergingAggregatedTransform : public DB::IProcessor
     DB::BlocksList current_final_blocks;
     bool no_more_keys = false;

+    double per_key_memory_usage = 0;
+
     // metrics
     size_t total_input_blocks = 0;
     size_t total_input_rows = 0;