Showing 4 changed files with 543 additions and 14 deletions.
cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp (397 additions, 0 deletions)

#include <iterator>
#include <type_traits>
#include <typeinfo>
#include <Operator/ScalableMergingAggregatedStep.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
}

namespace local_engine
{
static DB::ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
    return DB::ITransformingStep::Traits
    {
        {
            .returns_single_stream = should_produce_results_in_order_of_bucket_number,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };
}

ScalableMergingAggregatedStep::ScalableMergingAggregatedStep(
    const DB::ContextPtr context_,
    const DB::DataStream & input_stream_,
    DB::Aggregator::Params params_,
    bool should_produce_results_in_order_of_bucket_number_)
    : DB::ITransformingStep(
        input_stream_, params_.getHeader(input_stream_.header, true), getTraits(should_produce_results_in_order_of_bucket_number_))
    , context(context_)
    , params(std::move(params_))
{
}

void ScalableMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
{
    auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getHeader(), params, true);
    pipeline.resize(1);
    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
    {
        DB::Processors new_processors;
        for (auto & output : outputs)
        {
            auto op = std::make_shared<ScalableMergingAggregatedTransform>(pipeline.getHeader(), transform_params, context);
            new_processors.push_back(op);
            DB::connect(*output, op->getInputs().front());
        }
        return new_processors;
    };
    pipeline.transform(build_transform);
}

void ScalableMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const
{
    return params.explain(settings.out, settings.offset);
}

void ScalableMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const
{
    params.explain(map);
}

void ScalableMergingAggregatedStep::updateOutputStream()
{
    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, true), getDataStreamTraits());
}

ScalableMergingAggregatedTransform::ScalableMergingAggregatedTransform(
    const DB::Block & header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_)
    : DB::IProcessor({header_}, {params_->getHeader()})
    , header(header_)
    , params(params_)
    , context(context_)
    , tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
{
    buckets_data_variants.resize(max_bucket_number + 1, nullptr);
    bucket_tmp_files.resize(max_bucket_number + 1, nullptr);
}
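
/// Bucket layout: bucket -1 holds blocks coming from single-level hash tables, buckets
/// 0 .. max_bucket_number - 1 hold two-level data, and bucket N lives at index N + 1 of
/// buckets_data_variants / bucket_tmp_files (see getBucketDataVariants below).
/// max_bucket_number is declared in the header, which is not part of this diff; the split of
/// bucket -1 into 256 buckets in workImpl suggests it is 256.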

bool ScalableMergingAggregatedTransform::isMemoryOverFlow()
{
    UInt64 current_memory_usage = getMemoryUsage();
    if (params->params.max_bytes_before_external_group_by && current_memory_usage > params->params.max_bytes_before_external_group_by)
    {
        LOG_INFO(
            logger,
            "Memory usage exceeds the limit. current_memory_usage: {}, max_bytes_before_external_group_by: {}",
            ReadableSize(current_memory_usage),
            ReadableSize(params->params.max_bytes_before_external_group_by));
        return true;
    }
    return false;
}

void ScalableMergingAggregatedTransform::swithMode()
{
    if (params->params.keys.empty())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot switch mode for aggregation without keys");
    has_two_level = true;
    mode = OVERFLOW;
}

size_t ScalableMergingAggregatedTransform::spillBucketDataToDisk(Int32 bucket, DB::Block block)
{
    if (!block.rows())
        return 0;
    Stopwatch watch;
    auto * tmp_file = getBucketTempFile(bucket);
    block.info.bucket_num = bucket;
    auto bytes = tmp_file->write(block);
    LOG_TRACE(logger, "Spilled one block to disk. bucket: {}, rows: {}, bytes: {}, time: {} ms", bucket, block.rows(), bytes, watch.elapsedMilliseconds());
    return bytes;
}
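
/// IProcessor state machine. prepare() only moves chunks across the ports and decides whether
/// work() should run; all merging and spilling happens in work()/workImpl(). While the input is
/// open we pull one chunk at a time; once the input is finished we keep returning Ready so that
/// workImpl() can emit the buckets one by one, and the output port is finished after the last
/// bucket has been produced.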
DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare()
{
    auto & output = outputs.front();
    auto & input = inputs.front();
    if (output.isFinished())
    {
        input.close();
        return Status::Finished;
    }
    if (!output.canPush())
    {
        input.setNotNeeded();
        return Status::PortFull;
    }

    if (has_output)
    {
        output.push(std::move(output_chunk));
        has_output = false;
        return Status::PortFull;
    }

    if (!input_finished)
    {
        if (!has_input)
        {
            if (input.isFinished())
            {
                input_finished = true;
                return Status::Ready;
            }
            input.setNeeded();
            if (!input.hasData())
                return Status::NeedData;

            input_chunk = input.pull(true);
            const auto & info = input_chunk.getChunkInfo();
            if (!info)
                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Chunk info was not set for chunk in ScalableMergingAggregatedTransform.");
            if (const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(info.get()))
            {
                has_single_level |= agg_info->bucket_num == -1;
                has_two_level |= agg_info->bucket_num >= 0;
                if (agg_info->is_overflows)
                {
                    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Overflow blocks are not supported.");
                }
            }
            else
            {
                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown chunk info type.");
            }

            has_input = true;
        }
        return Status::Ready;
    }
    else
    {
        /// All aggregated data has been output.
        if (current_bucket_num >= max_bucket_number)
        {
            output.finish();
            return Status::Finished;
        }
    }
    return Status::Ready;
}

void ScalableMergingAggregatedTransform::work()
{
    if (has_input && isMemoryOverFlow())
    {
        swithMode();
    }
    workImpl();
}
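
/// workImpl() has two phases. While input is still arriving, each incoming block is merged into
/// the in-memory variants of its bucket, or spilled to that bucket's temporary file once the
/// bucket is no longer kept in memory (mode == OVERFLOW). After the input is finished, buckets
/// are emitted in ascending order: a bucket that is still in memory is converted to a block
/// directly, otherwise its spilled blocks are read back from disk and merged first.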
void ScalableMergingAggregatedTransform::workImpl()
{
    if (input_finished)
    {
        for (; current_bucket_num < max_bucket_number; ++current_bucket_num)
        {
            auto bucket_data_variants = getBucketDataVariants(current_bucket_num);
            if (bucket_data_variants)
            {
                Stopwatch watch;
                auto block = params->aggregator.convertToSingleBlock(*bucket_data_variants, true);
                releaseBucketDataVariants(current_bucket_num);
                output_chunk = blockToChunk(block);
                has_output = true;
                LOG_TRACE(logger, "Loaded bucket data from memory. bucket: {}, rows: {}, bytes: {}, time: {} ms", current_bucket_num, block.rows(), ReadableSize(block.bytes()), watch.elapsedMilliseconds());
                return;
            }
            Stopwatch watch;
            auto block = loadBucketDataFromDiskAndMerge(current_bucket_num);
            if (block)
            {
                output_chunk = blockToChunk(block);
                has_output = true;
                LOG_INFO(
                    logger,
                    "Loaded bucket data from disk and merged it. bucket: {}, rows: {}, bytes: {}, time: {} ms",
                    current_bucket_num,
                    block.rows(),
                    ReadableSize(block.bytes()),
                    watch.elapsedMilliseconds());
                return;
            }
        }
    }
    else if (has_input)
    {
        auto add_block = [&](DB::Block & block, Int32 bucket_num, bool create_on_miss)
        {
            if (!block)
                return;
            auto bucket_data_variants = getBucketDataVariants(bucket_num, create_on_miss);
            if (bucket_data_variants)
            {
                params->aggregator.mergeOnBlock(block, *bucket_data_variants, no_more_keys);
            }
            else
            {
                spillBucketDataToDisk(bucket_num, block);
            }
        };

        if (has_single_level && (has_two_level || mode == OVERFLOW))
        {
            auto bucket_data_variants = getBucketDataVariants(-1, false);
            if (bucket_data_variants)
            {
                auto blocks_list = params->aggregator.convertToBlocks(*bucket_data_variants, false, 1);
                if (blocks_list.size() > 1)
                {
                    /// Two-level hash table.
                    for (auto & blk : blocks_list)
                        add_block(blk, blk.info.bucket_num, true);
                }
                else if (blocks_list.size() == 1)
                {
                    /// Single-level hash table.
                    auto blocks_vector = params->aggregator.convertBlockToTwoLevel(blocks_list.front());
                    for (auto & blk : blocks_vector)
                        add_block(blk, blk.info.bucket_num, true);
                }
                releaseBucketDataVariants(-1);
            }
            has_single_level = false;
        }
        /// In case the OOM is caused by bucket -1, we first split bucket -1 into 256 buckets, and then spill one bucket here.
        if (isMemoryOverFlow() && mode == OVERFLOW)
        {
            spillOneBucket();
        }

        /// If we have at least one two-level block, transform all single-level blocks into two-level blocks.
        const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(input_chunk.getChunkInfo().get());
        if (agg_info->bucket_num == -1 && (has_two_level || mode == OVERFLOW))
        {
            auto block = chunkToBlock(input_chunk);
            auto block_struct = block.dumpStructure();
            auto blocks_vector = params->aggregator.convertBlockToTwoLevel(block);
            for (auto & blk : blocks_vector)
            {
                add_block(blk, blk.info.bucket_num, mode != OVERFLOW);
            }
            has_single_level = false;
        }
        else
        {
            auto block = chunkToBlock(input_chunk);
            add_block(block, agg_info->bucket_num, mode != OVERFLOW);
        }
        input_chunk = {};
        has_input = false;
    }
}

DB::AggregatedDataVariantsPtr ScalableMergingAggregatedTransform::getBucketDataVariants(Int32 bucket, bool create_on_miss, size_t size_hint [[maybe_unused]])
{
    UInt32 index = static_cast<UInt32>(1 + bucket);
    if (!buckets_data_variants[index] && create_on_miss)
    {
        in_memory_buckets_num += 1;
        buckets_data_variants[index] = std::make_shared<DB::AggregatedDataVariants>();
    }
    return buckets_data_variants[index];
}

void ScalableMergingAggregatedTransform::releaseBucketDataVariants(Int32 bucket)
{
    UInt32 index = static_cast<UInt32>(1 + bucket);
    buckets_data_variants[index] = nullptr;
    in_memory_buckets_num -= 1;
}
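
/// Reads back every block spilled for this bucket, merges them into a fresh
/// AggregatedDataVariants, and converts the result into a single block. Returns an empty block
/// if nothing was spilled for the bucket.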
DB::Block ScalableMergingAggregatedTransform::loadBucketDataFromDiskAndMerge(Int32 bucket)
{
    UInt32 index = static_cast<UInt32>(1 + bucket);
    if (!bucket_tmp_files[index])
        return {};
    bucket_tmp_files[index]->finishWriting();
    auto data_variant = std::make_shared<DB::AggregatedDataVariants>();
    bool has_data = false;
    while (true)
    {
        auto block = bucket_tmp_files[index]->read();
        if (!block)
            break;
        has_data = true;
        params->aggregator.mergeOnBlock(block, *data_variant, no_more_keys);
    }
    if (!has_data)
        return {};
    return params->aggregator.convertToSingleBlock(*data_variant, true);
}

DB::Chunk ScalableMergingAggregatedTransform::blockToChunk(DB::Block & block)
{
    auto info = std::make_shared<DB::AggregatedChunkInfo>();
    info->bucket_num = block.info.bucket_num;
    info->is_overflows = block.info.is_overflows;

    UInt64 num_rows = block.rows();
    DB::Chunk chunk(block.getColumns(), num_rows);
    chunk.setChunkInfo(std::move(info));
    return chunk;
}

DB::Block ScalableMergingAggregatedTransform::chunkToBlock(DB::Chunk & chunk)
{
    auto block = header.cloneWithColumns(chunk.detachColumns());
    const auto & info = chunk.getChunkInfo();
    if (const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(info.get()))
    {
        block.info.bucket_num = agg_info->bucket_num;
        block.info.is_overflows = agg_info->is_overflows;
    }
    return block;
}

Int64 ScalableMergingAggregatedTransform::getMemoryUsage()
{
    Int64 current_memory_usage = 0;
    if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker())
        if (auto * memory_tracker = memory_tracker_child->getParent())
            current_memory_usage = memory_tracker->get();
    return current_memory_usage;
}
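
/// Spills the in-memory bucket with the highest bucket number to its temporary file and releases
/// its variants. Spilling is refused when at most one bucket remains in memory; that case is
/// treated as a logical error.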
void ScalableMergingAggregatedTransform::spillOneBucket()
{
    if (in_memory_buckets_num <= 1)
    {
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot spill one bucket.");
    }
    for (size_t i = max_bucket_number; i > 0; --i)
    {
        if (buckets_data_variants[i])
        {
            auto block = params->aggregator.convertToSingleBlock(*buckets_data_variants[i], false);
            auto write_bytes = spillBucketDataToDisk(i - 1, block);
            releaseBucketDataVariants(i - 1);
            break;
        }
    }
}
}
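
For context, a minimal sketch of how this step could be attached to a query plan. This is an
illustration only: buildAggregatorParams is a hypothetical helper standing in for however the
caller constructs DB::Aggregator::Params, and the real wiring in Gluten's plan translation is not
part of this diff.

#include <memory>
#include <Interpreters/Aggregator.h>
#include <Operator/ScalableMergingAggregatedStep.h>
#include <Processors/QueryPlan/QueryPlan.h>

/// Hypothetical helper, defined elsewhere: builds the same Aggregator::Params that the upstream
/// partial-aggregation step used.
DB::Aggregator::Params buildAggregatorParams(DB::ContextPtr context, const DB::Block & input_header);

void addScalableMergingAggregated(DB::QueryPlan & plan, DB::ContextPtr context)
{
    auto params = buildAggregatorParams(context, plan.getCurrentDataStream().header);
    auto step = std::make_unique<local_engine::ScalableMergingAggregatedStep>(
        context,
        plan.getCurrentDataStream(),
        std::move(params),
        /*should_produce_results_in_order_of_bucket_number_*/ false);
    plan.addStep(std::move(step));
}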