From 17080e73cd15d6bc5889e3837ac2c5a28627edac Mon Sep 17 00:00:00 2001 From: LiuNeng <1398775315@qq.com> Date: Wed, 6 Nov 2024 13:59:34 +0800 Subject: [PATCH] [CH] Rename Mergetree part file name to avoid duplicated file name (#7769) What changes were proposed in this pull request? Rename Mergetree part file name to avoid duplicated file name and support prefetch mergetree data file How was this patch tested? unit tests (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) --- ...tenClickHouseMergeTreeWriteOnS3Suite.scala | 4 +-- cpp-ch/local-engine/Common/GlutenConfig.cpp | 6 +++++ cpp-ch/local-engine/Common/GlutenConfig.h | 9 +++++++ .../CompactObjectStorageDiskTransaction.cpp | 4 +-- .../CompactObjectStorageDiskTransaction.h | 3 +++ .../Storages/Cache/CacheManager.cpp | 7 ++++- .../Storages/Cache/CacheManager.h | 2 +- .../MergeTree/SparkStorageMergeTree.cpp | 26 +++++++++++++------ .../MergeTree/SparkStorageMergeTree.h | 4 ++- 9 files changed, 50 insertions(+), 15 deletions(-) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index af67b01f49f0..331009d12c96 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -196,9 +196,9 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val objectName = obj.get().objectName() if (objectName.contains("metadata.gluten")) { metadataGlutenExist = true - } else if (objectName.contains("meta.bin")) { + } else if (objectName.contains("part_meta.gluten")) { metadataBinExist = true - } else if (objectName.contains("data.bin")) { + } else if (objectName.contains("part_data.gluten")) { dataBinExist = true } else if (objectName.contains("_commits")) { // Spark 35 has _commits directory diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp b/cpp-ch/local-engine/Common/GlutenConfig.cpp index 93d074ecc21c..eb6c2dcab622 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.cpp +++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp @@ -137,4 +137,10 @@ GlutenJobSchedulerConfig GlutenJobSchedulerConfig::loadFromContext(const DB::Con config.job_scheduler_max_threads = context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10); return config; } +MergeTreeCacheConfig MergeTreeCacheConfig::loadFromContext(const DB::ContextPtr & context) +{ + MergeTreeCacheConfig config; + config.enable_data_prefetch = context->getConfigRef().getBool(ENABLE_DATA_PREFETCH, config.enable_data_prefetch); + return config; +} } \ No newline at end of file diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index 82402eaafa47..11220afb4878 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -142,4 +142,13 @@ struct GlutenJobSchedulerConfig static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & context); }; + +struct MergeTreeCacheConfig +{ + inline static const String ENABLE_DATA_PREFETCH = "enable_data_prefetch"; + + bool enable_data_prefetch = true; + + static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr & context); +}; } diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp index 82afeb85e2b8..5b1fe63a09c5 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp @@ -31,8 +31,8 @@ bool isMetaDataFile(const std::string & path) void CompactObjectStorageDiskTransaction::commit() { auto metadata_tx = disk.getMetadataStorage()->createTransaction(); - std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin"; - std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin"; + std::filesystem::path data_path = std::filesystem::path(prefix_path) / PART_DATA_FILE_NAME; + std::filesystem::path meta_path = std::filesystem::path(prefix_path) / PART_META_FILE_NAME; auto object_storage = disk.getObjectStorage(); auto data_key = object_storage->generateObjectKeyForPath(data_path, std::nullopt); diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h index e15c362f304a..becb5371aad2 100644 --- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h +++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h @@ -34,6 +34,9 @@ namespace local_engine class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction { public: + static inline const String PART_DATA_FILE_NAME = "part_data.gluten"; + static inline const String PART_META_FILE_NAME = "part_meta.gluten"; + explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_) : disk(disk_), tmp_data(tmp_) { diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index e2ba48e9d272..3218db2741d4 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -88,7 +88,9 @@ Task CacheManager::cachePart( job_context.table.parts.clear(); job_context.table.parts.push_back(part); job_context.table.snapshot_id = ""; - Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]() + MergeTreeCacheConfig config = MergeTreeCacheConfig::loadFromContext(context); + Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache, + prefetch_data = config.enable_data_prefetch]() { try { @@ -106,6 +108,9 @@ Task CacheManager::cachePart( job_detail.table.parts.front().name); return; } + // prefetch part data + if (prefetch_data) + storage->prefetchPartDataFile({job_detail.table.parts.front().name}); auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); NamesAndTypesList names_and_types_list; diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index 8fd26d249abc..c44026ce0bbc 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -29,7 +29,7 @@ struct MergeTreePart; struct MergeTreeTableInstance; /*** - * Manage the cache of the MergeTree, mainly including meta.bin, data.bin, metadata.gluten + * Manage the cache of the MergeTree, mainly including part_data.gluten, part_meta.gluten, metadata.gluten */ class CacheManager { diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index 45be9dcf7442..15da00fbee74 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -16,6 +16,7 @@ */ #include "SparkStorageMergeTree.h" +#include #include #include #include @@ -159,27 +160,36 @@ SparkStorageMergeTree::SparkStorageMergeTree( std::atomic SparkStorageMergeTree::part_num; -void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set parts) const +void SparkStorageMergeTree::prefetchPartDataFile(const std::unordered_set& parts) const +{ + prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_DATA_FILE_NAME); +} + +void SparkStorageMergeTree::prefetchPartFiles(const std::unordered_set& parts, String file_name) const { auto disk = getDisks().front(); if (!disk->isRemote()) return; - std::vector meta_paths; - std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); }); + std::vector data_paths; + std::ranges::for_each(parts, [&](const String & name) { data_paths.emplace_back(fs::path(relative_data_path) / name / file_name); }); auto read_settings = ReadSettings{}; - // read_settings.enable_filesystem_cache = false; read_settings.remote_fs_method = RemoteFSReadMethod::read; - for (const auto & meta_path : meta_paths) + for (const auto & data_path : data_paths) { - if (!disk->existsDirectory(meta_path)) + if (!disk->existsFile(data_path)) continue; - - auto in = disk->readFile(meta_path, read_settings); + LOG_DEBUG(log, "Prefetching part file {}", data_path); + auto in = disk->readFile(data_path, read_settings); String ignore_data; readStringUntilEOF(ignore_data, *in); } } +void SparkStorageMergeTree::prefetchMetaDataFile(const std::unordered_set& parts) const +{ + prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_META_FILE_NAME); +} + std::vector SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set & parts) { Stopwatch watch; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h index cec1597eab08..237cf6919208 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h @@ -71,6 +71,7 @@ class SparkStorageMergeTree : public MergeTreeData std::map getUnfinishedMutationCommands() const override; std::vector loadDataPartsWithNames(const std::unordered_set & parts); void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach); + void prefetchPartDataFile(const std::unordered_set& parts) const; MergeTreeDataSelectExecutor reader; MergeTreeDataMergerMutator merger_mutator; @@ -91,7 +92,8 @@ class SparkStorageMergeTree : public MergeTreeData static std::atomic part_num; SimpleIncrement increment; - void prefetchMetaDataFile(std::unordered_set parts) const; + void prefetchPartFiles(const std::unordered_set& parts, String file_name) const; + void prefetchMetaDataFile(const std::unordered_set& parts) const; void startBackgroundMovesIfNeeded() override; std::unique_ptr getDefaultSettings() const override; LoadPartResult loadDataPart(