diff --git a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp
new file mode 100644
index 0000000000000..19fb969f515d9
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp
@@ -0,0 +1,397 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+static DB::ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
+{
+    return DB::ITransformingStep::Traits
+    {
+        {
+            .returns_single_stream = should_produce_results_in_order_of_bucket_number,
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }
+    };
+}
+
+ScalableMergingAggregatedStep::ScalableMergingAggregatedStep(
+    const DB::ContextPtr context_,
+    const DB::DataStream & input_stream_,
+    DB::Aggregator::Params params_,
+    bool should_produce_results_in_order_of_bucket_number_)
+    : DB::ITransformingStep(
+        input_stream_, params_.getHeader(input_stream_.header, true), getTraits(should_produce_results_in_order_of_bucket_number_))
+    , context(context_)
+    , params(std::move(params_))
+{
+}
+
+void ScalableMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
+{
+    auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
+    pipeline.resize(1);
+    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
+    {
+        DB::Processors new_processors;
+        for (auto & output : outputs)
+        {
+            auto op = std::make_shared<ScalableMergingAggregatedTransform>(pipeline.getHeader(), transform_params, context);
+            new_processors.push_back(op);
+            DB::connect(*output, op->getInputs().front());
+        }
+        return new_processors;
+    };
+    pipeline.transform(build_transform);
+}
+
+void ScalableMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
+{
+    return params.explain(settings.out, settings.offset);
+}
+
+void ScalableMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const
+{
+    params.explain(map);
+}
+
+void ScalableMergingAggregatedStep::updateOutputStream()
+{
+    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, true), getDataStreamTraits());
+}
+
+ScalableMergingAggregatedTransform::ScalableMergingAggregatedTransform(
+    const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_)
+    : DB::IProcessor({header_}, {params_->getHeader()})
+    , header(header_)
+    , params(params_)
+    , context(context_)
+    , tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
+{
+    buckets_data_variants.resize(max_bucket_number + 1, nullptr);
+    bucket_tmp_files.resize(max_bucket_number + 1, nullptr);
+}
+
+bool ScalableMergingAggregatedTransform::isMemoryOverFlow()
+{
+    UInt64 current_memory_usage = getMemoryUsage();
+    if (params->params.max_bytes_before_external_group_by && current_memory_usage > params->params.max_bytes_before_external_group_by)
+    {
+        LOG_INFO(
+            logger,
+            "Memory usage exceeds the limit. current_memory_usage: {}, max_bytes_before_external_group_by: {}",
+            ReadableSize(current_memory_usage),
+            ReadableSize(params->params.max_bytes_before_external_group_by));
+        return true;
+    }
+    return false;
+}
+
+void ScalableMergingAggregatedTransform::switchMode()
+{
+    if (params->params.keys.empty())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot switch mode for aggregation without keys");
+    has_two_level = true;
+    mode = OVERFLOW;
+}
+
+size_t ScalableMergingAggregatedTransform::spillBucketDataToDisk(Int32 bucket, DB::Block block)
+{
+    if (!block.rows())
+        return 0;
+    Stopwatch watch;
+    auto * tmp_file = getBucketTempFile(bucket);
+    block.info.bucket_num = bucket;
+    auto bytes = tmp_file->write(block);
+    LOG_TRACE(logger, "Spilled one block to disk. bucket: {}, rows: {}, bytes: {}, time: {} ms", bucket, block.rows(), bytes, watch.elapsedMilliseconds());
+    return bytes;
+}
+
+DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare()
+{
+    auto & output = outputs.front();
+    auto & input = inputs.front();
+    if (output.isFinished())
+    {
+        input.close();
+        return Status::Finished;
+    }
+    if (!output.canPush())
+    {
+        input.setNotNeeded();
+        return Status::PortFull;
+    }
+
+    if (has_output)
+    {
+        output.push(std::move(output_chunk));
+        has_output = false;
+        return Status::PortFull;
+    }
+
+    if (!input_finished)
+    {
+        if (!has_input)
+        {
+            if (input.isFinished())
+            {
+                input_finished = true;
+                return Status::Ready;
+            }
+            input.setNeeded();
+            if (!input.hasData())
+                return Status::NeedData;
+
+            input_chunk = input.pull(true);
+            const auto & info = input_chunk.getChunkInfo();
+            if (!info)
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in ScalableMergingAggregatedTransform.");
+            if (const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(info.get()))
+            {
+                has_single_level |= agg_info->bucket_num == -1;
+                has_two_level |= agg_info->bucket_num >= 0;
+                if (agg_info->is_overflows)
+                {
+                    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Overflow blocks are not supported.");
+                }
+            }
+            else
+            {
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown chunk info type.");
+            }
+
+            has_input = true;
+        }
+        return Status::Ready;
+    }
+    else
+    {
+        /// All aggregated data has been output.
+        if (current_bucket_num >= max_bucket_number)
+        {
+            output.finish();
+            return Status::Finished;
+        }
+    }
+    return Status::Ready;
+}
+
+void ScalableMergingAggregatedTransform::work()
+{
+    if (has_input && isMemoryOverFlow())
+    {
+        switchMode();
+    }
+    workImpl();
+}
+
+void ScalableMergingAggregatedTransform::workImpl()
+{
+    if (input_finished)
+    {
+        for (; current_bucket_num < max_bucket_number; ++current_bucket_num)
+        {
+            auto bucket_data_variants = getBucketDataVariants(current_bucket_num);
+            if (bucket_data_variants)
+            {
+                Stopwatch watch;
+                auto block = params->aggregator.convertToSingleBlock(*bucket_data_variants, true);
+                releaseBucketDataVariants(current_bucket_num);
+                output_chunk = blockToChunk(block);
+                has_output = true;
+                LOG_TRACE(logger, "Loaded bucket data from memory. bucket: {}, rows: {}, bytes: {}, time: {} ms", current_bucket_num, block.rows(), ReadableSize(block.bytes()), watch.elapsedMilliseconds());
+                return;
+            }
+            Stopwatch watch;
+            auto block = loadBucketDataFromDiskAndMerge(current_bucket_num);
+            if (block)
+            {
+                output_chunk = blockToChunk(block);
+                has_output = true;
+                LOG_INFO(
+                    logger,
+                    "Loaded bucket data from disk and merged. bucket: {}, rows: {}, bytes: {}, time: {} ms",
+                    current_bucket_num,
+                    block.rows(),
+                    ReadableSize(block.bytes()),
+                    watch.elapsedMilliseconds());
+                return;
+            }
+        }
+    }
+    else if (has_input)
+    {
+        auto add_block = [&](DB::Block & block, Int32 bucket_num, bool create_on_miss)
+        {
+            if (!block)
+                return;
+            auto bucket_data_variants = getBucketDataVariants(bucket_num, create_on_miss);
+            if (bucket_data_variants)
+            {
+                params->aggregator.mergeOnBlock(block, *bucket_data_variants, no_more_keys);
+            }
+            else
+            {
+                spillBucketDataToDisk(bucket_num, block);
+            }
+        };
+
+        if (has_single_level && (has_two_level || mode == OVERFLOW))
+        {
+            auto bucket_data_variants = getBucketDataVariants(-1, false);
+            if (bucket_data_variants)
+            {
+                auto blocks_list = params->aggregator.convertToBlocks(*bucket_data_variants, false, 1);
+                if (blocks_list.size() > 1)
+                {
+                    /// Two-level hash table.
+                    for (auto & blk : blocks_list)
+                        add_block(blk, blk.info.bucket_num, true);
+                }
+                else if (blocks_list.size() == 1)
+                {
+                    /// Single-level hash table.
+                    auto blocks_vector = params->aggregator.convertBlockToTwoLevel(blocks_list.front());
+                    for (auto & blk : blocks_vector)
+                        add_block(blk, blk.info.bucket_num, true);
+                }
+                releaseBucketDataVariants(-1);
+            }
+            has_single_level = false;
+        }
+        /// In case the OOM is caused by bucket -1, we first split bucket -1 into 256 buckets, and then spill one bucket here.
+        if (isMemoryOverFlow() && mode == OVERFLOW)
+        {
+            spillOneBucket();
+        }
+
+        /// If we have at least one two-level block, transform all single-level blocks into two-level blocks.
+        const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(input_chunk.getChunkInfo().get());
+        if (agg_info->bucket_num == -1 && (has_two_level || mode == OVERFLOW))
+        {
+            auto block = chunkToBlock(input_chunk);
+            auto blocks_vector = params->aggregator.convertBlockToTwoLevel(block);
+            for (auto & blk : blocks_vector)
+            {
+                add_block(blk, blk.info.bucket_num, mode != OVERFLOW);
+            }
+            has_single_level = false;
+        }
+        else
+        {
+            auto block = chunkToBlock(input_chunk);
+            add_block(block, agg_info->bucket_num, mode != OVERFLOW);
+        }
+        input_chunk = {};
+        has_input = false;
+    }
+}
+
+DB::AggregatedDataVariantsPtr ScalableMergingAggregatedTransform::getBucketDataVariants(Int32 bucket, bool create_on_miss, size_t size_hint [[maybe_unused]])
+{
+    UInt32 index = static_cast<UInt32>(1 + bucket);
+    if (!buckets_data_variants[index] && create_on_miss)
+    {
+        in_memory_buckets_num += 1;
+        buckets_data_variants[index] = std::make_shared<DB::AggregatedDataVariants>();
+    }
+    return buckets_data_variants[index];
+}
+
+void ScalableMergingAggregatedTransform::releaseBucketDataVariants(Int32 bucket)
+{
+    UInt32 index = static_cast<UInt32>(1 + bucket);
+    buckets_data_variants[index] = nullptr;
+    in_memory_buckets_num -= 1;
+}
+
+DB::Block ScalableMergingAggregatedTransform::loadBucketDataFromDiskAndMerge(Int32 bucket)
+{
+    UInt32 index = static_cast<UInt32>(1 + bucket);
+    if (!bucket_tmp_files[index])
+        return {};
+    bucket_tmp_files[index]->finishWriting();
+    auto data_variant = std::make_shared<DB::AggregatedDataVariants>();
+    bool has_data = false;
+    while (true)
+    {
+        auto block = bucket_tmp_files[index]->read();
+        if (!block)
+            break;
+        has_data = true;
+        params->aggregator.mergeOnBlock(block, *data_variant, no_more_keys);
+    }
+    if (!has_data)
+        return {};
+    return params->aggregator.convertToSingleBlock(*data_variant, true);
+}
+
+DB::Chunk ScalableMergingAggregatedTransform::blockToChunk(DB::Block & block)
+{
+    auto info = std::make_shared<DB::AggregatedChunkInfo>();
+    info->bucket_num = block.info.bucket_num;
+    info->is_overflows = block.info.is_overflows;
+
+    UInt64 num_rows = block.rows();
+    DB::Chunk chunk(block.getColumns(), num_rows);
+    chunk.setChunkInfo(std::move(info));
+    return chunk;
+}
+
+DB::Block ScalableMergingAggregatedTransform::chunkToBlock(DB::Chunk & chunk)
+{
+    auto block = header.cloneWithColumns(chunk.detachColumns());
+    const auto & info = chunk.getChunkInfo();
+    if (const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(info.get()))
+    {
+        block.info.bucket_num = agg_info->bucket_num;
+        block.info.is_overflows = agg_info->is_overflows;
+    }
+    return block;
+}
+
+Int64 ScalableMergingAggregatedTransform::getMemoryUsage()
+{
+    Int64 current_memory_usage = 0;
+    if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker())
+        if (auto * memory_tracker = memory_tracker_child->getParent())
+            current_memory_usage = memory_tracker->get();
+    return current_memory_usage;
+}
+
+void ScalableMergingAggregatedTransform::spillOneBucket()
+{
+    if (in_memory_buckets_num <= 1)
+    {
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot spill one bucket.");
+    }
+    for (size_t i = max_bucket_number; i > 0; --i)
+    {
+        if (buckets_data_variants[i])
+        {
+            auto block = params->aggregator.convertToSingleBlock(*buckets_data_variants[i], false);
+            spillBucketDataToDisk(i - 1, block);
+            releaseBucketDataVariants(i - 1);
+            break;
+        }
+    }
+}
+}
diff --git a/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h
new file mode 100644
index 0000000000000..4d30a6de1ecb3
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h
@@ -0,0 +1,120 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace local_engine
+{
+
+/// A more memory-efficient implementation for merging aggregated data into final blocks.
+/// If the memory consumption exceeds the limit, we split the data into 256 buckets and
+/// spill some of them to disk.
+class ScalableMergingAggregatedStep : public DB::ITransformingStep
+{
+public:
+    explicit ScalableMergingAggregatedStep(
+        const DB::ContextPtr context_,
+        const DB::DataStream & input_stream_,
+        DB::Aggregator::Params params_,
+        bool should_produce_results_in_order_of_bucket_number_);
+    ~ScalableMergingAggregatedStep() override = default;
+
+    String getName() const override { return "ScalableMergingAggregated"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override;
+
+    void describeActions(DB::JSONBuilder::JSONMap & map) const override;
+    void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override;
+
+private:
+    DB::ContextPtr context;
+    DB::Aggregator::Params params;
+    void updateOutputStream() override;
+};
+
+/// Accepts one input port and one output port.
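+///
+/// Overview of the merging strategy implemented below:
+/// - Buckets are addressed by index = bucket_num + 1: index 0 holds the single-level data
+///   (bucket_num == -1) and indexes 1..256 hold the two-level buckets 0..255.
+/// - In NORMAL mode every bucket is merged in memory. When getMemoryUsage() exceeds
+///   max_bytes_before_external_group_by, the transform switches to OVERFLOW mode (this
+///   requires grouping keys): single-level data is converted into two-level buckets, and
+///   blocks for buckets without in-memory state are appended to per-bucket temporary
+///   files on disk.
+/// - Once the input is finished, buckets are emitted one by one, converted directly from
+///   memory or re-read from their temporary file and merged.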
+class ScalableMergingAggregatedTransform : public DB::IProcessor
+{
+public:
+    static constexpr Int32 max_bucket_number = 256;
+    using Status = DB::IProcessor::Status;
+    explicit ScalableMergingAggregatedTransform(const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_);
+    ~ScalableMergingAggregatedTransform() override = default;
+
+    Status prepare() override;
+    void work() override;
+
+    String getName() const override { return "ScalableMergingAggregatedTransform"; }
+
+private:
+    DB::Block header;
+    DB::AggregatingTransformParamsPtr params;
+    DB::ContextPtr context;
+    Poco::Logger * logger = &Poco::Logger::get("ScalableMergingAggregatedTransform");
+
+    /// In NORMAL mode, we keep all data in memory.
+    /// In OVERFLOW mode, we spill all data to disk, except for the data in bucket 0.
+    /// If the bucket number of the blocks coming from upstream is -1, we cannot switch the mode.
+    enum Mode
+    {
+        NORMAL = 0,
+        OVERFLOW = 1,
+    };
+    Mode mode = NORMAL;
+    bool has_single_level = false;
+    bool has_two_level = false;
+    size_t in_memory_buckets_num = 0;
+    std::vector<DB::AggregatedDataVariantsPtr> buckets_data_variants;
+
+    Int32 current_bucket_num = -1;
+    DB::TemporaryDataOnDiskPtr tmp_data_disk;
+    std::vector<DB::TemporaryFileStream *> bucket_tmp_files;
+
+    bool no_more_keys = false;
+
+    bool isMemoryOverFlow();
+    /// Switch from NORMAL to OVERFLOW.
+    void switchMode();
+
+    inline DB::TemporaryFileStream * getBucketTempFile(Int32 bucket)
+    {
+        UInt32 index = static_cast<UInt32>(bucket + 1);
+        if (!bucket_tmp_files[index])
+        {
+            auto * tmp_file = &tmp_data_disk->createStream(header);
+            bucket_tmp_files[index] = tmp_file;
+        }
+        return bucket_tmp_files[index];
+    }
+
+    DB::AggregatedDataVariantsPtr getBucketDataVariants(Int32 bucket, bool create_on_miss = false, size_t size_hint = 0);
+    void releaseBucketDataVariants(Int32 bucket);
+
+    bool input_finished = false;
+    DB::Chunk input_chunk;
+    DB::Chunk output_chunk;
+    bool has_input = false;
+    bool has_output = false;
+
+    void workImpl();
+    /// Append one block to the tmp file.
+    size_t spillBucketDataToDisk(Int32 bucket, DB::Block block);
+    /// Load all blocks from the tmp file and merge them.
+    DB::Block loadBucketDataFromDiskAndMerge(Int32 bucket);
+
+    DB::Block chunkToBlock(DB::Chunk & chunk);
+    DB::Chunk blockToChunk(DB::Block & block);
+
+    Int64 getMemoryUsage();
+
+    void spillOneBucket();
+};
+}
diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
index 961a5e7ab8ef1..99bd64b898fb9 100644
--- a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
@@ -31,6 +31,7 @@
 
 #include "DataTypes/IDataType.h"
 #include
+#include <Operator/ScalableMergingAggregatedStep.h>
 
 namespace DB
 {
@@ -237,20 +238,29 @@ void AggregateRelParser::addMergingAggregatedStep()
     buildAggregateDescriptions(aggregate_descriptions);
     auto settings = getContext()->getSettingsRef();
     Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size);
-    auto merging_step = std::make_unique<MergingAggregatedStep>(
-        plan->getCurrentDataStream(),
-        params,
-        true,
-        false,
-        1,
-        1,
-        false,
-        settings.max_block_size,
-        settings.aggregation_in_order_max_block_bytes,
-        SortDescription(),
-        settings.enable_memory_bound_merging_of_aggregation_results);
-    steps.emplace_back(merging_step.get());
-    plan->addStep(std::move(merging_step));
+    if (settings.distributed_aggregation_memory_efficient)
+    {
+        auto merging_step = std::make_unique<ScalableMergingAggregatedStep>(getContext(), plan->getCurrentDataStream(), params, false);
+        steps.emplace_back(merging_step.get());
+        plan->addStep(std::move(merging_step));
+    }
+    else
+    {
+        auto merging_step = std::make_unique<MergingAggregatedStep>(
+            plan->getCurrentDataStream(),
+            params,
+            true,
+            false,
+            1,
+            1,
+            false,
+            settings.max_block_size,
+            settings.aggregation_in_order_max_block_bytes,
+            SortDescription(),
+            settings.enable_memory_bound_merging_of_aggregation_results);
+        steps.emplace_back(merging_step.get());
+        plan->addStep(std::move(merging_step));
+    }
 }
 
 void AggregateRelParser::addAggregatingStep()
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
index 16d56e9bb8e88..86528f3aaf87e 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -94,6 +94,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions
 
 void CachedShuffleWriter::split(DB::Block & block)
 {
+    auto block_info = block.info;
     initOutputIfNeeded(block);
     Stopwatch split_time_watch;
     split_time_watch.start();
@@ -110,6 +111,7 @@ void CachedShuffleWriter::split(DB::Block & block)
     {
         out_block.insert(block.getByPosition(output_columns_indicies[col]));
     }
+    out_block.info = block_info;
     partition_writer->write(partition_info, out_block);
 
     if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold)