Skip to content

Commit

Permalink
[CH] Rename Mergetree part file name to avoid duplicated file name (a…
Browse files Browse the repository at this point in the history
…pache#7769)

What changes were proposed in this pull request?
Rename Mergetree part file name to avoid duplicated file name and support prefetch mergetree data file

How was this patch tested?
unit tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
liuneng1994 authored Nov 6, 2024
1 parent 3099799 commit 17080e7
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val objectName = obj.get().objectName()
if (objectName.contains("metadata.gluten")) {
metadataGlutenExist = true
} else if (objectName.contains("meta.bin")) {
} else if (objectName.contains("part_meta.gluten")) {
metadataBinExist = true
} else if (objectName.contains("data.bin")) {
} else if (objectName.contains("part_data.gluten")) {
dataBinExist = true
} else if (objectName.contains("_commits")) {
// Spark 35 has _commits directory
Expand Down
6 changes: 6 additions & 0 deletions cpp-ch/local-engine/Common/GlutenConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,10 @@ GlutenJobSchedulerConfig GlutenJobSchedulerConfig::loadFromContext(const DB::Con
config.job_scheduler_max_threads = context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10);
return config;
}
MergeTreeCacheConfig MergeTreeCacheConfig::loadFromContext(const DB::ContextPtr & context)
{
MergeTreeCacheConfig config;
config.enable_data_prefetch = context->getConfigRef().getBool(ENABLE_DATA_PREFETCH, config.enable_data_prefetch);
return config;
}
}
9 changes: 9 additions & 0 deletions cpp-ch/local-engine/Common/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,13 @@ struct GlutenJobSchedulerConfig

static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & context);
};

struct MergeTreeCacheConfig
{
inline static const String ENABLE_DATA_PREFETCH = "enable_data_prefetch";

bool enable_data_prefetch = true;

static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr & context);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ bool isMetaDataFile(const std::string & path)
void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin";
std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin";
std::filesystem::path data_path = std::filesystem::path(prefix_path) / PART_DATA_FILE_NAME;
std::filesystem::path meta_path = std::filesystem::path(prefix_path) / PART_META_FILE_NAME;

auto object_storage = disk.getObjectStorage();
auto data_key = object_storage->generateObjectKeyForPath(data_path, std::nullopt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ namespace local_engine

class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
public:
static inline const String PART_DATA_FILE_NAME = "part_data.gluten";
static inline const String PART_META_FILE_NAME = "part_meta.gluten";

explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const DB::DiskPtr tmp_)
: disk(disk_), tmp_data(tmp_)
{
Expand Down
7 changes: 6 additions & 1 deletion cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ Task CacheManager::cachePart(
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]()
MergeTreeCacheConfig config = MergeTreeCacheConfig::loadFromContext(context);
Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache,
prefetch_data = config.enable_data_prefetch]()
{
try
{
Expand All @@ -106,6 +108,9 @@ Task CacheManager::cachePart(
job_detail.table.parts.front().name);
return;
}
// prefetch part data
if (prefetch_data)
storage->prefetchPartDataFile({job_detail.table.parts.front().name});

auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Storages/Cache/CacheManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct MergeTreePart;
struct MergeTreeTableInstance;

/***
* Manage the cache of the MergeTree, mainly including meta.bin, data.bin, metadata.gluten
* Manage the cache of the MergeTree, mainly including part_data.gluten, part_meta.gluten, metadata.gluten
*/
class CacheManager
{
Expand Down
26 changes: 18 additions & 8 deletions cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include "SparkStorageMergeTree.h"

#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
Expand Down Expand Up @@ -159,27 +160,36 @@ SparkStorageMergeTree::SparkStorageMergeTree(

std::atomic<int> SparkStorageMergeTree::part_num;

void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string> parts) const
void SparkStorageMergeTree::prefetchPartDataFile(const std::unordered_set<std::string>& parts) const
{
prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_DATA_FILE_NAME);
}

void SparkStorageMergeTree::prefetchPartFiles(const std::unordered_set<std::string>& parts, String file_name) const
{
auto disk = getDisks().front();
if (!disk->isRemote())
return;
std::vector<String> meta_paths;
std::ranges::for_each(parts, [&](const String & name) { meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
std::vector<String> data_paths;
std::ranges::for_each(parts, [&](const String & name) { data_paths.emplace_back(fs::path(relative_data_path) / name / file_name); });
auto read_settings = ReadSettings{};
// read_settings.enable_filesystem_cache = false;
read_settings.remote_fs_method = RemoteFSReadMethod::read;
for (const auto & meta_path : meta_paths)
for (const auto & data_path : data_paths)
{
if (!disk->existsDirectory(meta_path))
if (!disk->existsFile(data_path))
continue;

auto in = disk->readFile(meta_path, read_settings);
LOG_DEBUG(log, "Prefetching part file {}", data_path);
auto in = disk->readFile(data_path, read_settings);
String ignore_data;
readStringUntilEOF(ignore_data, *in);
}
}

void SparkStorageMergeTree::prefetchMetaDataFile(const std::unordered_set<std::string>& parts) const
{
prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_META_FILE_NAME);
}

std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set<std::string> & parts)
{
Stopwatch watch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SparkStorageMergeTree : public MergeTreeData
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
std::vector<MergeTreeDataPartPtr> loadDataPartsWithNames(const std::unordered_set<std::string> & parts);
void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach);
void prefetchPartDataFile(const std::unordered_set<std::string>& parts) const;

MergeTreeDataSelectExecutor reader;
MergeTreeDataMergerMutator merger_mutator;
Expand All @@ -91,7 +92,8 @@ class SparkStorageMergeTree : public MergeTreeData
static std::atomic<int> part_num;
SimpleIncrement increment;

void prefetchMetaDataFile(std::unordered_set<std::string> parts) const;
void prefetchPartFiles(const std::unordered_set<std::string>& parts, String file_name) const;
void prefetchMetaDataFile(const std::unordered_set<std::string>& parts) const;
void startBackgroundMovesIfNeeded() override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
LoadPartResult loadDataPart(
Expand Down

0 comments on commit 17080e7

Please sign in to comment.