From e36be5ab8b870419d7683208e06c30183cf971de Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 6 Dec 2023 15:14:56 +0800 Subject: [PATCH] wip.1206.1 --- .../clickhouse/CHTransformerApi.scala | 6 + .../metrics/HashAggregateMetricsUpdater.scala | 6 +- ...seTPCHColumnarShuffleParquetAQESuite.scala | 4 - .../GlutenClickHouseTPCHMetricsSuite.scala | 4 +- .../Operator/GraceMergingAggregatedStep.cpp | 54 +- .../Operator/GraceMergingAggregatedStep.h | 2 + .../ScalableMergingAggregatedStep.cpp | 468 ------------------ .../Operator/ScalableMergingAggregatedStep.h | 142 ------ .../Operator/StreamingAggregatingStep.cpp | 220 ++++---- .../Operator/StreamingAggregatingStep.h | 22 +- .../Parser/AggregateRelParser.cpp | 114 +++-- .../Shuffle/CachedShuffleWriter.cpp | 1 - 12 files changed, 260 insertions(+), 783 deletions(-) delete mode 100644 cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp delete mode 100644 cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala index 17c8fa11add6..4aa1875bc79b 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala @@ -96,6 +96,12 @@ class CHTransformerApi extends TransformerApi with Logging { nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString) } + val maxMemoryUsageKey = settingPrefix + "max_memory_usage"; + if (!nativeConfMap.containsKey(maxMemoryUsageKey)) { + val maxMemoryUsageValue = offHeapSize + nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString) + } + // Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash val joinAlgorithmKey = settingPrefix + "join_algorithm"; if ( diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala index 7a04f3861a1f..2a6015a280c3 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala @@ -96,10 +96,12 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) object HashAggregateMetricsUpdater { val INCLUDING_PROCESSORS = Array( "AggregatingTransform", + "StreamingAggregatingTransform", "MergingAggregatedTransform", - "ScalableMergingAggregatedTransform") + "GraceMergingAggregatedTransform") val CH_PLAN_NODE_NAME = Array( "AggregatingTransform", + "StreamingAggregatingTransform", "MergingAggregatedTransform", - "ScalableMergingAggregatedTransform") + "GraceMergingAggregatedTransform") } diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 0814c3c8c7d7..51b685f0fdf7 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -66,8 +66,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(2).metrics("outputRows").value === 600572) assert(plans(1).metrics("inputRows").value === 591673) - assert(plans(1).metrics("resizeInputRows").value === 4) - assert(plans(1).metrics("resizeOutputRows").value === 4) assert(plans(1).metrics("outputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) @@ -93,8 +91,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(2).metrics("filesSize").value === 17777735) assert(plans(1).metrics("inputRows").value === 591673) - assert(plans(1).metrics("resizeInputRows").value === 4) - assert(plans(1).metrics("resizeOutputRows").value === 4) assert(plans(1).metrics("outputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 4229c38b8bb5..153ed4f2ede0 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -311,7 +311,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite .get(0) .getProcessors .get(0) - .getInputRows == 591677) + .getInputRows == 591673) assert( nativeMetricsData.metricsDataList @@ -356,7 +356,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite .getSteps .get(0) .getName - .equals("ScalableMergingAggregated")) + .equals("GraceMergingAggregatedTransform")) assert( nativeMetricsDataFinal.metricsDataList.get(1).getSteps.get(1).getName.equals("Expression")) assert(nativeMetricsDataFinal.metricsDataList.get(2).getName.equals("kProject")) diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp index 60c06e13da81..203c546b3de1 100644 --- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp @@ -58,6 +58,7 @@ GraceMergingAggregatedStep::GraceMergingAggregatedStep( void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) { + auto num_streams = pipeline.getNumStreams(); auto transform_params = std::make_shared(pipeline.getHeader(), params, true); pipeline.resize(1); auto build_transform = [&](DB::OutputPortRawPtrs outputs) @@ -72,6 +73,7 @@ void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pi return new_processors; }; pipeline.transform(build_transform); + pipeline.resize(num_streams, true); } void GraceMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const @@ -225,6 +227,17 @@ void GraceMergingAggregatedTransform::extendBuckets() void GraceMergingAggregatedTransform::rehashDataVariants() { auto blocks = params->aggregator.convertToBlocks(*current_data_variants, false, 1); + + size_t block_rows = 0; + size_t block_memory_usage = 0; + for (const auto & block : blocks) + { + block_rows += block.rows(); + block_memory_usage += block.allocatedBytes(); + } + if (block_rows) + per_key_memory_usage = block_memory_usage * 1.0 / block_rows; + current_data_variants = std::make_shared(); no_more_keys = false; for (auto & block : blocks) @@ -389,23 +402,48 @@ void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block) size_t GraceMergingAggregatedTransform::getMemoryUsage() { - size_t current_memory_usage = 0; + Int64 current_memory_usage = 0; if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker()) if (auto * memory_tracker = memory_tracker_child->getParent()) current_memory_usage = memory_tracker->get(); - return current_memory_usage; + return current_memory_usage < 0 ? 0 : current_memory_usage; } bool GraceMergingAggregatedTransform::isMemoryOverFlow() { - auto mem_limit = context->getSettingsRef().max_bytes_before_external_group_by; - if (!mem_limit) + /// More greedy memory usage strategy. + if (!context->getSettingsRef().max_memory_usage) return false; - auto current_memory_usage = getMemoryUsage(); - if (current_memory_usage > mem_limit) + auto max_mem_used = context->getSettingsRef().max_memory_usage * 8 / 10; + auto current_result_rows = current_data_variants->size(); + auto current_mem_used = getMemoryUsage(); + if (per_key_memory_usage > 0) { - LOG_INFO(logger, "memory is overflow. used: {}, limit: {}", ReadableSize(current_memory_usage), ReadableSize(mem_limit)); - return true; + if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) + { + LOG_INFO( + logger, + "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, buckets: {}", + ReadableSize(current_mem_used), + ReadableSize(max_mem_used), + ReadableSize(per_key_memory_usage), + current_result_rows, + getBucketsNum()); + return true; + } + } + else + { + if (current_mem_used * 2 >= context->getSettingsRef().max_memory_usage) + { + LOG_INFO( + logger, + "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}, buckets: {}", + ReadableSize(current_mem_used), + ReadableSize(context->getSettingsRef().max_memory_usage), + getBucketsNum()); + return true; + } } return false; } diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h index 71619476ce4d..1746d37c603b 100644 --- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h @@ -97,6 +97,8 @@ class GraceMergingAggregatedTransform : public DB::IProcessor DB::BlocksList current_final_blocks; bool no_more_keys = false; + double per_key_memory_usage = 0; + // metrics size_t total_input_blocks = 0; size_t total_input_rows = 0; diff --git a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp deleted file mode 100644 index a2fabfaaf05f..000000000000 --- a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp +++ /dev/null @@ -1,468 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} -} - -namespace local_engine -{ -static DB::ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number) -{ - return DB::ITransformingStep::Traits - { - { - .returns_single_stream = should_produce_results_in_order_of_bucket_number, - .preserves_number_of_streams = false, - .preserves_sorting = false, - }, - { - .preserves_number_of_rows = false, - } - }; -} - -ScalableMergingAggregatedStep::ScalableMergingAggregatedStep( - const DB::ContextPtr context_, - const DB::DataStream & input_stream_, - DB::Aggregator::Params params_, - bool should_produce_results_in_order_of_bucket_number_) - : DB::ITransformingStep( - input_stream_, params_.getHeader(input_stream_.header, true), getTraits(should_produce_results_in_order_of_bucket_number_)) - , context(context_) - , params(std::move(params_)) -{ -} - -void ScalableMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) -{ - auto transform_params = std::make_shared(pipeline.getHeader(), params, true); - pipeline.resize(1); - auto build_transform = [&](DB::OutputPortRawPtrs outputs) - { - DB::Processors new_processors; - for (auto & output : outputs) - { - auto op = std::make_shared(pipeline.getHeader(), transform_params, context); - new_processors.push_back(op); - DB::connect(*output, op->getInputs().front()); - } - return new_processors; - }; - pipeline.transform(build_transform); -} - -void ScalableMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const -{ - return params.explain(settings.out, settings.offset); -} - -void ScalableMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const -{ - params.explain(map); -} - -void ScalableMergingAggregatedStep::updateOutputStream() -{ - output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, true), getDataStreamTraits()); -} - -ScalableMergingAggregatedTransform::ScalableMergingAggregatedTransform( - const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_) - : DB::IProcessor({header_}, {params_->getHeader()}) - , header(header_) - , params(params_) - , context(context_) - , tmp_data_disk(std::make_unique(context_->getTempDataOnDisk())) -{ - buckets_data_variants.resize(max_bucket_number + 1, nullptr); - bucket_tmp_files.resize(max_bucket_number + 1, nullptr); -} - -ScalableMergingAggregatedTransform::~ScalableMergingAggregatedTransform() -{ - LOG_INFO( - logger, - "time cost. merge_blocks_time_ms: {}, spill_time_ms: {}, load_time_ms: {}, convert_to_blocks_time_ms: {}", - merge_blocks_time_ms, - spill_time_ms, - load_time_ms, - convert_to_blocks_time_ms); -} - -bool ScalableMergingAggregatedTransform::isMemoryOverFlow() -{ - UInt64 current_memory_usage = getMemoryUsage(); - if (params->params.max_bytes_before_external_group_by && current_memory_usage > params->params.max_bytes_before_external_group_by) - { - LOG_INFO( - logger, - "Memory is overflow. current_memory_usage: {}, max_bytes_before_external_group_by: {}", - ReadableSize(current_memory_usage), - ReadableSize(params->params.max_bytes_before_external_group_by)); - return true; - } - return false; -} - -void ScalableMergingAggregatedTransform::swithMode() -{ - if (params->params.keys.empty()) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot switch mode for aggregation without keys"); - has_two_level = true; - mode = OVERFLOW; -} - -size_t ScalableMergingAggregatedTransform::spillBucketDataToDisk(Int32 bucket, DB::Block block) -{ - if (!block.rows()) - return 0; - Stopwatch watch; - auto * tmp_file = getBucketTempFile(bucket); - block.info.bucket_num = bucket; - auto bytes = tmp_file->write(block); - spill_time_ms += watch.elapsedMilliseconds(); - LOG_TRACE(logger, "spilling one block to disk. bucket: {}, rows: {}, bytes: {}, time: {}", bucket, block.rows(), bytes, watch.elapsedMilliseconds()); - return bytes; -} - -DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare() -{ - auto & output = outputs.front(); - auto & input = inputs.front(); - if (output.isFinished()) - { - input.close(); - return Status::Finished; - } - if (!output.canPush()) - { - input.setNotNeeded(); - return Status::PortFull; - } - - if (has_output) - { - output.push(std::move(output_chunk)); - has_output = false; - return Status::PortFull; - } - - if (!input_finished) - { - if (!has_input) - { - if (input.isFinished()) - { - input_finished = true; - return Status::Ready; - } - input.setNeeded(); - if (!input.hasData()) - return Status::NeedData; - - input_chunk = input.pull(true); - const auto & info = input_chunk.getChunkInfo(); - if (!info) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in ScalableMergingAggregatedTransform."); - if (const auto * agg_info = typeid_cast(info.get())) - { - has_single_level |= agg_info->bucket_num == -1; - has_two_level |= agg_info->bucket_num >= 0; - if (agg_info->is_overflows) - { - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Not support overflow blocks."); - } - } - else - { - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknow chunk info type."); - } - - has_input = true; - } - return Status::Ready; - } - else - { - /// All aggregated data has been output. - if (current_bucket_num >= max_bucket_number) - { - output.finish(); - return Status::Finished; - } - } - return Status::Ready; -} - -void ScalableMergingAggregatedTransform::work() -{ - if (has_input && isMemoryOverFlow()) - { - swithMode(); - } - workImpl(); -} - -void ScalableMergingAggregatedTransform::workImpl() -{ - if (input_finished) - { - if (current_output_blocks.empty()) - { - for (; current_bucket_num < max_bucket_number; ++current_bucket_num) - { - auto bucket_data_variants = getBucketDataVariants(current_bucket_num); - if (bucket_data_variants) - { - Stopwatch watch; - current_output_blocks = params->aggregator.convertToBlocks(*bucket_data_variants, true, 1); - convert_to_blocks_time_ms += watch.elapsedMilliseconds(); - releaseBucketDataVariants(current_bucket_num); - size_t blocks_bytes = 0; - size_t blocks_rows = 0; - for (const auto & block : current_output_blocks) - { - blocks_bytes += block.bytes(); - blocks_rows += block.rows(); - } - LOG_INFO( - logger, - "load bucket data from memory. bucket: {}, blocks: {}, rows: {}, bytes: {}, time: {}", - current_bucket_num, - current_output_blocks.size(), - blocks_rows, - ReadableSize(blocks_bytes), - watch.elapsedMilliseconds()); - break; - } - - Stopwatch watch; - current_output_blocks = loadBucketDataFromDiskAndMerge(current_bucket_num); - if (!current_output_blocks.empty()) - { - size_t blocks_bytes = 0; - size_t blocks_rows = 0; - for (const auto & block : current_output_blocks) - { - blocks_bytes += block.bytes(); - blocks_rows += block.rows(); - } - LOG_INFO( - logger, - "load bucket data from disk and merge. bucket: {}, blocks: {}, rows: {}, bytes: {}, time: {}", - current_bucket_num, - current_output_blocks.size(), - blocks_rows, - ReadableSize(blocks_bytes), - watch.elapsedMilliseconds()); - break; - } - } - } - if (!current_output_blocks.empty()) - { - output_chunk = blockToChunk(current_output_blocks.front()); - current_output_blocks.pop_front(); - has_output = true; - } - } - else if (has_input) - { - auto add_block = [&](DB::Block & block, Int32 bucket_num, bool create_on_miss) - { - if (!block) - return; - auto bucket_data_variants = getBucketDataVariants(bucket_num, create_on_miss); - if (bucket_data_variants) - { - Stopwatch watch; - params->aggregator.mergeOnBlock(block, *bucket_data_variants, no_more_keys); - merge_blocks_time_ms += watch.elapsedMilliseconds(); - } - else - { - spillBucketDataToDisk(bucket_num, block); - } - }; - - if (has_single_level && (has_two_level || mode == OVERFLOW)) - { - auto bucket_data_variants = getBucketDataVariants(-1, false); - if (bucket_data_variants) - { - auto blocks_list = params->aggregator.convertToBlocks(*bucket_data_variants, false, 1); - if (blocks_list.size() > 1) - { - /// two level hash table. - for (auto & blk : blocks_list) - add_block(blk, blk.info.bucket_num, true); - } - else if (blocks_list.size() == 1) - { - /// Single level hash table - auto blocks_vector = params->aggregator.convertBlockToTwoLevel(blocks_list.front()); - for (auto & blk : blocks_vector) - add_block(blk, blk.info.bucket_num, true); - } - releaseBucketDataVariants(-1); - } - has_single_level = false; - } - /// In case the OOM is caused by bucket -1, we first split the bucket -1 into 256 buckets. and then spill one bucket here. - if (isMemoryOverFlow() && mode == OVERFLOW) - { - spillOneBucket(); - } - - /// If we have at least one two level block, transform all single level blocks into two level blocks. - const auto * agg_info = typeid_cast(input_chunk.getChunkInfo().get()); - if (agg_info->bucket_num == -1 && (has_two_level || mode == OVERFLOW)) - { - auto block = chunkToBlock(input_chunk); - auto block_struct = block.dumpStructure(); - auto blocks_vector = params->aggregator.convertBlockToTwoLevel(block); - for (auto & blk : blocks_vector) - { - add_block(blk, blk.info.bucket_num, mode != OVERFLOW); - } - has_single_level = false; - } - else - { - auto block = chunkToBlock(input_chunk); - add_block(block, agg_info->bucket_num, mode != OVERFLOW); - } - input_chunk = {}; - has_input = false; - } -} -DB::AggregatedDataVariantsPtr ScalableMergingAggregatedTransform::getBucketDataVariants(Int32 bucket, bool create_on_miss, size_t size_hint [[maybe_unused]]) -{ - UInt32 index = static_cast(1 + bucket); - if (!buckets_data_variants[index] && create_on_miss) - { - in_memory_buckets_num += 1; - buckets_data_variants[index] = std::make_shared(); - } - return buckets_data_variants[index]; -} - -void ScalableMergingAggregatedTransform::releaseBucketDataVariants(Int32 bucket) -{ - UInt32 index = static_cast(1 + bucket); - buckets_data_variants[index] = nullptr; - in_memory_buckets_num -= 1; -} - -DB::BlocksList ScalableMergingAggregatedTransform::loadBucketDataFromDiskAndMerge(Int32 bucket) -{ - UInt32 index = static_cast(1 + bucket); - if (!bucket_tmp_files[index]) - return {}; - Stopwatch watch; - bucket_tmp_files[index]->finishWriting(); - auto data_variant = std::make_shared(); - bool has_data = false; - while(true) - { - watch.start(); - auto block = bucket_tmp_files[index]->read(); - load_time_ms += watch.elapsedMilliseconds(); - if (!block) - break; - has_data = true; - watch.start(); - params->aggregator.mergeOnBlock(block, *data_variant, no_more_keys); - merge_blocks_time_ms += watch.elapsedMilliseconds(); - } - if (!has_data) - return {}; - - watch.start(); - auto blocks = params->aggregator.convertToBlocks(*data_variant, true, 1); - convert_to_blocks_time_ms += watch.elapsedMilliseconds(); - return blocks; -} - -DB::Chunk ScalableMergingAggregatedTransform::blockToChunk(DB::Block & block) -{ - auto info = std::make_shared(); - info->bucket_num = block.info.bucket_num; - info->is_overflows = block.info.is_overflows; - - UInt64 num_rows = block.rows(); - DB::Chunk chunk(block.getColumns(), num_rows); - chunk.setChunkInfo(std::move(info)); - return chunk; -} - -DB::Block ScalableMergingAggregatedTransform::chunkToBlock(DB::Chunk & chunk) -{ - auto block = header.cloneWithColumns(chunk.detachColumns()); - const auto & info = chunk.getChunkInfo(); - if (const auto * agg_info = typeid_cast(info.get())) - { - block.info.bucket_num = agg_info->bucket_num; - block.info.is_overflows = agg_info->is_overflows; - } - return block; -} - -Int64 ScalableMergingAggregatedTransform::getMemoryUsage() -{ - Int64 current_memory_usage = 0; - if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker()) - if (auto * memory_tracker = memory_tracker_child->getParent()) - current_memory_usage = memory_tracker->get(); - return current_memory_usage; -} - -void ScalableMergingAggregatedTransform::spillOneBucket() -{ - - if (in_memory_buckets_num <= 1) - { - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot spill one bucket."); - } - for (size_t i = max_bucket_number; i > 0; --i) - { - if (buckets_data_variants[i]) - { - Stopwatch watch; - auto block = params->aggregator.convertToSingleBlock(*buckets_data_variants[i], false); - convert_to_blocks_time_ms += watch.elapsedMilliseconds(); - size_t block_bytes = block.bytes(); - auto write_bytes = spillBucketDataToDisk(i - 1, block); - releaseBucketDataVariants(i - 1); - break; - } - } -} -} diff --git a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h deleted file mode 100644 index 857b5bc023c0..000000000000 --- a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace local_engine -{ - -/// A more memory efficient implementation for merging aggregated date into final blocks. -/// If the memory consumption exceeds the limit, we will spilt data into 256 buckets, and spill -/// some of them into disk. -class ScalableMergingAggregatedStep : public DB::ITransformingStep -{ -public: - explicit ScalableMergingAggregatedStep( - const DB::ContextPtr context_, - const DB::DataStream & input_stream_, - DB::Aggregator::Params params_, - bool should_produce_results_in_order_of_bucket_number_); - ~ScalableMergingAggregatedStep() override = default; - - String getName() const override { return "ScalableMergingAggregated"; } - - void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override; - - void describeActions(DB::JSONBuilder::JSONMap & map) const override; - void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override; -private: - DB::ContextPtr context; - DB::Aggregator::Params params; - void updateOutputStream() override; -}; - -/// Accept one input port and one output port. -class ScalableMergingAggregatedTransform : public DB::IProcessor -{ -public: - static constexpr Int32 max_bucket_number = 256; - using Status = DB::IProcessor::Status; - explicit ScalableMergingAggregatedTransform(const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_); - ~ScalableMergingAggregatedTransform() override; - - Status prepare() override; - void work() override; - - String getName() const override { return "ScalableMergingAggregatedTransform"; } - -private: - DB::Block header; - DB::AggregatingTransformParamsPtr params; - DB::ContextPtr context; - Poco::Logger * logger = &Poco::Logger::get("ScalableMergingAggregatedTransform"); - - /// In NORMAL model, we will keep all data in memory. - /// In OVERFLOW model, we will spill all data to disk,except for the data in bucket 0. - /// If the bucket number of the blocks come from upstream is -1, we cannot swith mode. - enum Mode - { - NORMAL = 0, - OVERFLOW = 1, - }; - Mode mode = NORMAL; - bool has_single_level = false; - bool has_two_level = false; - size_t in_memory_buckets_num = 0; - std::vector buckets_data_variants; - - Int32 current_bucket_num = -1; - DB::TemporaryDataOnDiskPtr tmp_data_disk; - std::vector bucket_tmp_files; - - bool no_more_keys = false; - size_t merge_blocks_time_ms = 0; - size_t spill_time_ms = 0; - size_t load_time_ms = 0; - size_t convert_to_blocks_time_ms = 0; - - bool isMemoryOverFlow(); - /// Switch from NORMAL to OVERFLOW. - void swithMode(); - - inline DB::TemporaryFileStream * getBucketTempFile(Int32 bucket) - { - UInt32 index = static_cast(bucket + 1); - if (!bucket_tmp_files[index]) - { - auto * tmp_file = &tmp_data_disk->createStream(header); - bucket_tmp_files[index] = tmp_file; - } - return bucket_tmp_files[index]; - } - - DB::AggregatedDataVariantsPtr getBucketDataVariants(Int32 bucket, bool create_on_miss = false, size_t size_hint = 0); - void releaseBucketDataVariants(Int32 bucket); - - bool input_finished = false; - DB::Chunk input_chunk; - DB::Chunk output_chunk; - bool has_input = false; - bool has_output = false; - DB::BlocksList current_output_blocks; - - void workImpl(); - // Append one block to the tmp file. - size_t spillBucketDataToDisk(Int32 bucket, DB::Block block); - /// Load all blocks from the tmp file and merge them. - DB::BlocksList loadBucketDataFromDiskAndMerge(Int32 bucket); - - DB::Block chunkToBlock(DB::Chunk & chunk); - DB::Chunk blockToChunk(DB::Block & block); - - Int64 getMemoryUsage(); - - void spillOneBucket(); -}; -} diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp index a0dfd27401aa..043460585fb5 100644 --- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp +++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp @@ -40,13 +40,21 @@ StreamingAggregatingTransform::StreamingAggregatingTransform(DB::ContextPtr cont , aggregate_columns(params_->params.aggregates_size) , params(params_) { - params = std::make_shared(); - variants = std::make_shared(); } StreamingAggregatingTransform::~StreamingAggregatingTransform() { - LOG_ERROR(logger, "xxx input rows: {}, output rows: {}", input_rows, output_rows); + LOG_INFO( + logger, + "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, " + "total_clear_data_variants_num: {}, total_aggregate_time: {}, total_convert_data_variants_time: {}", + total_input_blocks, + total_input_rows, + total_output_blocks, + total_output_rows, + total_clear_data_variants_num, + total_aggregate_time, + total_convert_data_variants_time); } StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() @@ -62,7 +70,8 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() { if (output.canPush()) { - output_rows += output_chunk.getNumRows(); + total_output_rows += output_chunk.getNumRows(); + total_output_blocks++; output.push(std::move(output_chunk)); has_output = false; } @@ -72,17 +81,9 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() if (has_input) return Status::Ready; - if (is_consume_finished) - { - input.close(); - output.finish(); - return Status::Finished; - } - if (input.isFinished()) { - is_input_finished = true; - if (is_clear_aggregator) + if (!data_variants) { output.finish(); return Status::Finished; @@ -92,14 +93,15 @@ StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() return Status::Ready; } } + input.setNeeded(); if (!input.hasData()) { return Status::NeedData; } input_chunk = input.pull(true); - input_rows += input_chunk.getNumRows(); - LOG_ERROR(logger, "xxx input one chunk. rows: {}", input_chunk.getNumRows()); + total_input_rows += input_chunk.getNumRows(); + total_input_blocks++; has_input = true; return Status::Ready; } @@ -110,124 +112,130 @@ static UInt64 getMemoryUsage() if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker()) if (auto * memory_tracker = memory_tracker_child->getParent()) current_memory_usage = memory_tracker->get(); - return current_memory_usage; -} - -template -static Int32 getAggregatedDataVariantsBucketNum(Method & method [[maybe_unused]]) -{ - return Method::Data::NUM_BUCKETS; + return current_memory_usage < 0 ? 0 : current_memory_usage; } bool StreamingAggregatingTransform::isMemoryOverFlow() { - return getMemoryUsage() > context->getSettingsRef().max_bytes_before_external_group_by; + /// More greedy memory usage strategy. + if (!context->getSettingsRef().max_memory_usage) + return false; + auto max_mem_used = context->getSettingsRef().max_memory_usage * 8 / 10; + auto current_result_rows = data_variants->size(); + auto current_mem_used = getMemoryUsage(); + if (per_key_memory_usage > 0) + { + if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) + { + LOG_INFO( + logger, + "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}", + ReadableSize(current_mem_used), + ReadableSize(max_mem_used), + ReadableSize(per_key_memory_usage), + current_result_rows); + return true; + } + } + else + { + if (current_mem_used * 2 >= context->getSettingsRef().max_memory_usage) + { + LOG_INFO( + logger, + "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}", + ReadableSize(current_mem_used), + ReadableSize(context->getSettingsRef().max_memory_usage)); + return true; + } + } + return false; } + void StreamingAggregatingTransform::work() { - if (has_input) + auto pop_one_pending_block = [&]() { - if (pending_buckets > 0) + while (!pending_blocks.empty()) { - if (input_chunk.hasRows()) + if (!pending_blocks.front().rows()) { - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "input chunk must be empty here, but got: {}", input_chunk.dumpStructure()); + pending_blocks.pop_front(); + continue; } - output_chunk = DB::convertToChunk(params->aggregator.convertOneBucketToBlock(*variants, variants->aggregates_pool, false, pending_buckets - 1)); + // downstream is GraceMergingAggregatedStep, don't need this bock_info. + // make it be default value. + pending_blocks.front().info = DB::BlockInfo(); + + output_chunk = DB::convertToChunk(pending_blocks.front()); + pending_blocks.pop_front(); has_output = true; - total_data_variants_rows += output_chunk.getNumRows(); - total_data_variants_bytes += output_chunk.bytes(); - LOG_ERROR(logger, "xxx flush one bucket. rows: {}, bytes: {}, bucket: {}", output_chunk.getNumRows(), ReadableSize(output_chunk.bytes()), pending_buckets - 1); + has_input = !pending_blocks.empty(); + return true; + } + return false; + }; - pending_buckets -= 1; - if (pending_buckets <= 0) - { - auto mem1 = getMemoryUsage(); - is_clear_aggregator = true; - variants = std::make_shared(); - LOG_ERROR(logger, "xxx flush one variant. rows: {}, bytes: {}, mem used: {} -> {}", total_data_variants_rows, ReadableSize(total_data_variants_bytes), ReadableSize(mem1), ReadableSize(getMemoryUsage())); - total_data_variants_rows = 0; - total_data_variants_bytes = 0; - } - has_input = pending_buckets > 0; + if (has_input) + { + if (pop_one_pending_block()) + return; + + if (!input_chunk.getNumRows()) + { + has_input = false; + return; } - else + + if (!data_variants) + data_variants = std::make_shared(); + + auto num_rows = input_chunk.getNumRows(); + Stopwatch watch; + params->aggregator.executeOnBlock( + input_chunk.detachColumns(), 0, num_rows, *data_variants, key_columns, aggregate_columns, no_more_keys); + total_aggregate_time += watch.elapsedMicroseconds(); + has_input = false; + + if (isMemoryOverFlow()) { - const UInt64 num_rows = input_chunk.getNumRows(); - bool need_to_evict_aggregator = false; - try - { - is_clear_aggregator = false; - if (!params->aggregator.executeOnBlock( - input_chunk.detachColumns(), 0, num_rows, *variants, key_columns, aggregate_columns, no_more_keys)) - { - is_consume_finished = true; - need_to_evict_aggregator = true; - } - LOG_ERROR(logger, "xxxx mem used: {} after merging one chunk", ReadableSize(getMemoryUsage())); - input_chunk = {}; - } - catch (DB::Exception & e) - { - if (e.code() == DB::ErrorCodes::LOGICAL_ERROR - && e.message() == "Cannot write to temporary file because temporary file is not initialized") - { + Stopwatch convert_watch; + /// When convert data variants to blocks, memory usage may be double. + pending_blocks = params->aggregator.convertToBlocks(*data_variants, false, 1); - LOG_ERROR(logger, "xxx exception on spill data into disk. mem limit: {}", ReadableSize(params->params.max_bytes_before_external_group_by)); - need_to_evict_aggregator = true; - input_chunk = {}; - } - else - { - LOG_ERROR(logger, "xxx unknow exception: {}", e.message()); - throw; - } - } - if (need_to_evict_aggregator) + size_t total_mem_used = 0; + size_t total_rows = 0; + for (const auto & block : pending_blocks) { - LOG_ERROR(logger, "xxx 2. prepare to clear data variants. size: {}", variants->size()); - if (variants->isTwoLevel()) - { - prepareFlushOneTwoLevelDataVariants(); - } - else - { - auto block = params->aggregator.convertToSingleBlock(*variants, false); - output_chunk = DB::convertToChunk(block); - has_output = true; - has_input = false; - variants = std::make_shared(); - is_clear_aggregator = true; - } + total_mem_used += block.allocatedBytes(); + total_rows += block.rows(); } + if (total_rows) + per_key_memory_usage = total_mem_used * 1.0 / total_rows; + + total_convert_data_variants_time += convert_watch.elapsedMicroseconds(); + total_clear_data_variants_num++; + data_variants = nullptr; + pop_one_pending_block(); } - has_input = pending_buckets > 0; } else { - if (!is_input_finished) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "input should be finished"); + // NOLINTNEXTLINE + if (!data_variants->size()) + { + has_output = false; + } + Stopwatch convert_watch; pending_blocks = params->aggregator.convertToBlocks(*data_variants, false, 1); - data_variants = std::make_shared(); + total_clear_data_variants_num++; + total_aggregate_time += convert_watch.elapsedMicroseconds(); + data_variants = nullptr; + pop_one_pending_block(); } } -void StreamingAggregatingTransform::prepareFlushOneTwoLevelDataVariants() -{ -#define M(NAME) \ - else if (variants->type == DB::AggregatedDataVariants::Type::NAME) \ - { \ - pending_buckets = getAggregatedDataVariantsBucketNum(*(variants->NAME)); \ - } - - if (false) {} // NOLINT - APPLY_FOR_VARIANTS_TWO_LEVEL(M) -#undef M - has_output = false; - has_input = pending_buckets > 0; -} - static DB::ITransformingStep::Traits getTraits() { return DB::ITransformingStep::Traits diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h index 9b7493bf36cd..7aa37024cd81 100644 --- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h +++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h @@ -15,6 +15,7 @@ * limitations under the License. */ #pragma once +#include #include #include #include @@ -49,25 +50,28 @@ class StreamingAggregatingTransform : public DB::IProcessor bool no_more_keys = false; bool is_consume_finished = false; - bool is_input_finished = false; bool is_clear_aggregator = false; - DB::AggregatedDataVariantsPtr data_variants; + DB::AggregatedDataVariantsPtr data_variants = nullptr; bool has_input = false; bool has_output = false; DB::Chunk input_chunk; DB::Chunk output_chunk; DB::BlocksList pending_blocks; - Int32 pending_buckets = 0; - size_t total_data_variants_rows = 0; - size_t total_data_variants_bytes = 0; Poco::Logger * logger = &Poco::Logger::get("StreamingAggregatingTransform"); - size_t input_rows = 0; - size_t output_rows = 0; - void prepareFlushOneTwoLevelDataVariants(); - bool isMemoryOverFlow(); + double per_key_memory_usage = 0; + + // metrics + size_t total_input_blocks = 0; + size_t total_input_rows = 0; + size_t total_output_blocks = 0; + size_t total_output_rows = 0; + size_t total_clear_data_variants_num = 0; + size_t total_aggregate_time = 0; + size_t total_convert_data_variants_time = 0; + bool isMemoryOverFlow(); }; class StreamingAggregatingStep : public DB::ITransformingStep diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp index 51d51d66b783..fd6de6d2e28d 100644 --- a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp +++ b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp @@ -20,17 +20,18 @@ #include #include #include +#include #include #include +#include +#include +#include #include #include #include #include #include "Common/PODArray.h" #include -#include "DataTypes/IDataType.h" -#include -#include namespace DB { @@ -237,9 +238,10 @@ void AggregateRelParser::addMergingAggregatedStep() buildAggregateDescriptions(aggregate_descriptions); auto settings = getContext()->getSettingsRef(); Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size); + if (settings.distributed_aggregation_memory_efficient) { - auto merging_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params, false); + auto merging_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params); steps.emplace_back(merging_step.get()); plan->addStep(std::move(merging_step)); } @@ -267,44 +269,74 @@ void AggregateRelParser::addAggregatingStep() AggregateDescriptions aggregate_descriptions; buildAggregateDescriptions(aggregate_descriptions); auto settings = getContext()->getSettingsRef(); - Aggregator::Params params( - grouping_keys, - aggregate_descriptions, - false, - settings.max_rows_to_group_by, - settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - getContext()->getTempDataOnDisk(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data, - true, - 3, - settings.max_block_size, - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings.optimize_group_by_constant_keys); - auto aggregating_step = std::make_unique( - plan->getCurrentDataStream(), - params, - GroupingSetsParamsList(), - false, - settings.max_block_size, - settings.aggregation_in_order_max_block_bytes, - 1, - 1, - false, - false, - SortDescription(), - SortDescription(), - false, - false, - false); - steps.emplace_back(aggregating_step.get()); - plan->addStep(std::move(aggregating_step)); + if (settings.distributed_aggregation_memory_efficient) + { + // Disable spilling to disk. + Aggregator::Params params( + grouping_keys, + aggregate_descriptions, + false, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + 0, + settings.empty_result_for_aggregation_by_empty_set, + nullptr, + settings.max_threads, + settings.min_free_disk_space_for_temporary_data, + true, + 3, + settings.max_block_size, + /*enable_prefetch*/ true, + /*only_merge*/ false, + settings.optimize_group_by_constant_keys); + auto aggregating_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params); + steps.emplace_back(aggregating_step.get()); + plan->addStep(std::move(aggregating_step)); + } + else + { + Aggregator::Params params( + grouping_keys, + aggregate_descriptions, + false, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + getContext()->getTempDataOnDisk(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data, + true, + 3, + settings.max_block_size, + /*enable_prefetch*/ true, + /*only_merge*/ false, + settings.optimize_group_by_constant_keys); + + auto aggregating_step = std::make_unique( + plan->getCurrentDataStream(), + params, + GroupingSetsParamsList(), + false, + settings.max_block_size, + settings.aggregation_in_order_max_block_bytes, + 1, + 1, + false, + false, + SortDescription(), + SortDescription(), + false, + false, + false); + steps.emplace_back(aggregating_step.get()); + plan->addStep(std::move(aggregating_step)); + } } // Only be called in final stage. diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index e331ff7b3d90..191b8efb0ff3 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -94,7 +94,6 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitO split_result.raw_partition_lengths.resize(options.partition_num, 0); } - void CachedShuffleWriter::split(DB::Block & block) { auto block_info = block.info;