From 05b05bdc1bc0bf9428d05e9d802872442f171115 Mon Sep 17 00:00:00 2001 From: AlexYue Date: Fri, 19 Jul 2024 22:39:01 +0800 Subject: [PATCH] [feature](Recycler) Parallelize s3 delete operations and recycle_tablet (#37630) Previously the procedure of recycler instance is single-threaded, which is not full sufficiently parallel. And there exists many network IO operation. So this pr tries to spilt recycle tasks into different stage which can be parallel. And make the delete operations parallel. --- cloud/src/common/config.h | 2 + cloud/src/recycler/azure_obj_client.cpp | 6 +- cloud/src/recycler/azure_obj_client.h | 5 +- cloud/src/recycler/obj_storage_client.cpp | 30 +++- cloud/src/recycler/obj_storage_client.h | 11 +- cloud/src/recycler/recycler.cpp | 180 +++++++++++++++++----- cloud/src/recycler/recycler.h | 23 ++- cloud/src/recycler/recycler_service.cpp | 9 +- cloud/src/recycler/s3_accessor.cpp | 14 +- cloud/src/recycler/s3_accessor.h | 1 + cloud/src/recycler/s3_obj_client.cpp | 6 +- cloud/src/recycler/s3_obj_client.h | 5 +- cloud/src/recycler/sync_executor.h | 125 +++++++++++++++ cloud/src/recycler/util.h | 1 + cloud/test/CMakeLists.txt | 8 +- cloud/test/recycler_test.cpp | 55 ++++--- cloud/test/util_test.cpp | 150 +++++++++++++++++- 17 files changed, 545 insertions(+), 86 deletions(-) create mode 100644 cloud/src/recycler/sync_executor.h diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index c5786bedff7c04..cb4bee9648e254 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list CONF_Strings(recycle_blacklist, ""); // Comma seprated list CONF_mInt32(instance_recycler_worker_pool_size, "1"); CONF_Bool(enable_checker, "false"); +// The parallelism for parallel recycle operation +CONF_Int32(recycle_pool_parallelism, "10"); // Currently only used for recycler test CONF_Bool(enable_inverted_check, "false"); // interval for scanning instances to do checks and inspections diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp index 1ad5d5dca7dfac..39f263b1938014 100644 --- a/cloud/src/recycler/azure_obj_client.cpp +++ b/cloud/src/recycler/azure_obj_client.cpp @@ -209,7 +209,8 @@ std::unique_ptr AzureObjClient::list_objects(ObjectStoragePa // You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id // > Each batch request supports a maximum of 256 subrequests. ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, - std::vector keys) { + std::vector keys, + ObjClientOptions option) { if (keys.empty()) { return {0}; } @@ -275,8 +276,9 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) { } ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path, + ObjClientOptions option, int64_t expiration_time) { - return delete_objects_recursively_(path, expiration_time, BlobBatchMaxOperations); + return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations); } ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/azure_obj_client.h b/cloud/src/recycler/azure_obj_client.h index 49b54ca8c6db91..96212d720a34ce 100644 --- a/cloud/src/recycler/azure_obj_client.h +++ b/cloud/src/recycler/azure_obj_client.h @@ -38,12 +38,13 @@ class AzureObjClient final : public ObjStorageClient { std::unique_ptr list_objects(ObjectStoragePathRef path) override; - ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) override; + ObjectStorageResponse delete_objects(const std::string& bucket, std::vector keys, + ObjClientOptions option) override; ObjectStorageResponse delete_object(ObjectStoragePathRef path) override; ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientOptions option, int64_t expiration_time = 0) override; ObjectStorageResponse get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/obj_storage_client.cpp b/cloud/src/recycler/obj_storage_client.cpp index 1dd6435214d329..855fa110a4ce92 100644 --- a/cloud/src/recycler/obj_storage_client.cpp +++ b/cloud/src/recycler/obj_storage_client.cpp @@ -18,10 +18,13 @@ #include "recycler/obj_storage_client.h" #include "cpp/sync_point.h" +#include "recycler/sync_executor.h" +#include "recycler/util.h" namespace doris::cloud { ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path, + const ObjClientOptions& option, int64_t expired_time, size_t batch_size) { TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size); @@ -29,6 +32,10 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag ObjectStorageResponse ret; std::vector keys; + SyncExecutor concurrent_delete_executor( + option.executor, + fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key), + [](const int& ret) { return ret != 0; }); for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) { if (expired_time > 0 && obj->mtime_s > expired_time) { @@ -39,20 +46,31 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag if (keys.size() < batch_size) { continue; } - - ret = delete_objects(path.bucket, std::move(keys)); - if (ret.ret != 0) { - return ret; - } + concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable { + return delete_objects(path.bucket, std::move(k), option).ret; + }); } if (!list_iter->is_valid()) { + bool finished; + concurrent_delete_executor.when_all(&finished); return {-1}; } if (!keys.empty()) { - return delete_objects(path.bucket, std::move(keys)); + concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable { + return delete_objects(path.bucket, std::move(k), option).ret; + }); } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { + ret = -1; + } + } + + ret = finished ? ret : -1; return ret; } diff --git a/cloud/src/recycler/obj_storage_client.h b/cloud/src/recycler/obj_storage_client.h index 955a8c174e5b15..fc0211820d1a50 100644 --- a/cloud/src/recycler/obj_storage_client.h +++ b/cloud/src/recycler/obj_storage_client.h @@ -51,6 +51,12 @@ class ObjectListIterator { virtual std::optional next() = 0; }; +class SimpleThreadPool; +struct ObjClientOptions { + bool prefetch {true}; + std::shared_ptr executor; +}; + class ObjStorageClient { public: ObjStorageClient() = default; @@ -71,7 +77,8 @@ class ObjStorageClient { // According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array. virtual ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) = 0; + std::vector keys, + ObjClientOptions option) = 0; // Delete the file named key in the object storage bucket. virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0; @@ -79,6 +86,7 @@ class ObjStorageClient { // According to the prefix, recursively delete all objects under the prefix. // If `expiration_time` > 0, only delete objects with mtime earlier than `expiration_time`. virtual ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientOptions option, int64_t expiration_time = 0) = 0; // Get the objects' expiration time on the bucket @@ -91,6 +99,7 @@ class ObjStorageClient { protected: ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef path, + const ObjClientOptions& option, int64_t expiration_time, size_t batch_size); }; diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index b4c0ae84fc3ef4..f2d21e0a1b9ad7 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -47,6 +47,7 @@ #include "cpp/sync_point.h" #include "meta-service/keys.h" #include "recycler/recycler_service.h" +#include "recycler/sync_executor.h" #include "recycler/util.h" namespace doris::cloud { @@ -166,6 +167,16 @@ static inline void check_recycle_task(const std::string& instance_id, const std: Recycler::Recycler(std::shared_ptr txn_kv) : txn_kv_(std::move(txn_kv)) { ip_port_ = std::string(butil::my_ip_cstr()) + ":" + std::to_string(config::brpc_listen_port); + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); + recycle_tablet_pool->start(); + auto group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + group_recycle_function_pool->start(); + _thread_pool_group = + RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), + std::move(group_recycle_function_pool)); } Recycler::~Recycler() { @@ -225,7 +236,8 @@ void Recycler::recycle_callback() { // skip instance in recycling if (recycling_instance_map_.count(instance_id)) continue; } - auto instance_recycler = std::make_shared(txn_kv_, instance); + auto instance_recycler = + std::make_shared(txn_kv_, instance, _thread_pool_group); if (instance_recycler->init() != 0) { LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id; continue; @@ -438,11 +450,13 @@ class InstanceRecycler::InvertedIndexIdCache { std::unordered_set schemas_without_inverted_index_; }; -InstanceRecycler::InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance) +InstanceRecycler::InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, + RecyclerThreadPoolGroup thread_pool_group) : txn_kv_(std::move(txn_kv)), instance_id_(instance.instance_id()), instance_info_(instance), - inverted_index_id_cache_(std::make_unique(instance_id_, txn_kv_)) {} + inverted_index_id_cache_(std::make_unique(instance_id_, txn_kv_)), + _thread_pool_group(std::move(thread_pool_group)) {} InstanceRecycler::~InstanceRecycler() = default; @@ -539,22 +553,51 @@ int InstanceRecycler::init() { return init_storage_vault_accessors(); } +template +auto task_wrapper(Func... funcs) -> std::function { + return [funcs...]() { + return [](std::initializer_list numbers) { + int i = 0; + for (int num : numbers) { + if (num != 0) { + i = num; + } + } + return i; + }({funcs()...}); + }; +} + int InstanceRecycler::do_recycle() { TEST_SYNC_POINT("InstanceRecycler.do_recycle"); if (instance_info_.status() == InstanceInfoPB::DELETED) { return recycle_deleted_instance(); } else if (instance_info_.status() == InstanceInfoPB::NORMAL) { - int ret = recycle_indexes(); - if (recycle_partitions() != 0) ret = -1; - if (recycle_tmp_rowsets() != 0) ret = -1; - if (recycle_rowsets() != 0) ret = -1; - if (abort_timeout_txn() != 0) ret = -1; - if (recycle_expired_txn_label() != 0) ret = -1; - if (recycle_copy_jobs() != 0) ret = -1; - if (recycle_stage() != 0) ret = -1; - if (recycle_expired_stage_objects() != 0) ret = -1; - if (recycle_versions() != 0) ret = -1; - return ret; + SyncExecutor sync_executor(_thread_pool_group.group_recycle_function_pool, + fmt::format("instance id {}", instance_id_), + [](int r) { return r != 0; }); + sync_executor + .add(task_wrapper( + [this]() -> int { return InstanceRecycler::recycle_indexes(); }, + [this]() -> int { return InstanceRecycler::recycle_partitions(); }, + [this]() -> int { return InstanceRecycler::recycle_tmp_rowsets(); }, + [this]() -> int { return InstanceRecycler::recycle_rowsets(); })) + .add(task_wrapper( + [this]() { return InstanceRecycler::abort_timeout_txn(); }, + [this]() { return InstanceRecycler::recycle_expired_txn_label(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_copy_jobs(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_stage(); })) + .add(task_wrapper( + [this]() { return InstanceRecycler::recycle_expired_stage_objects(); })) + .add(task_wrapper([this]() { return InstanceRecycler::recycle_versions(); })); + bool finished = true; + std::vector rets = sync_executor.when_all(&finished); + for (int ret : rets) { + if (ret != 0) { + return ret; + } + } + return finished ? 0 : -1; } else { LOG(WARNING) << "invalid instance status: " << instance_info_.status() << " instance_id=" << instance_id_; @@ -1009,7 +1052,7 @@ int InstanceRecycler::recycle_versions() { int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_t partition_id, bool is_empty_tablet) { int num_scanned = 0; - int num_recycled = 0; + std::atomic_int num_recycled = 0; std::string tablet_key_begin, tablet_key_end; std::string stats_key_begin, stats_key_end; @@ -1051,12 +1094,20 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ .tag("num_recycled", num_recycled); }); + // The first string_view represents the tablet key which has been recycled + // The second bool represents whether the following fdb's tablet key deletion could be done using range move or not + using TabletKeyPair = std::pair; + SyncExecutor sync_executor( + _thread_pool_group.recycle_tablet_pool, + fmt::format("recycle tablets, tablet id {}, index id {}, partition id {}", table_id, + index_id, partition_id), + [](const TabletKeyPair& k) { return k.first.empty(); }); + // Elements in `tablet_keys` has the same lifetime as `it` in `scan_and_recycle` - std::vector tablet_keys; std::vector tablet_idx_keys; std::vector init_rs_keys; - bool use_range_remove = true; auto recycle_func = [&, is_empty_tablet, this](std::string_view k, std::string_view v) -> int { + bool use_range_remove = true; ++num_scanned; doris::TabletMetaCloudPB tablet_meta_pb; if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) { @@ -1067,13 +1118,19 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ int64_t tablet_id = tablet_meta_pb.tablet_id(); tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_, tablet_id})); if (!is_empty_tablet) { - if (recycle_tablet(tablet_id) != 0) { - LOG_WARNING("failed to recycle tablet") - .tag("instance_id", instance_id_) - .tag("tablet_id", tablet_id); - use_range_remove = false; - return -1; - } + sync_executor.add([this, &num_recycled, tid = tablet_id, range_move = use_range_remove, + k]() mutable -> TabletKeyPair { + if (recycle_tablet(tid) != 0) { + LOG_WARNING("failed to recycle tablet") + .tag("instance_id", instance_id_) + .tag("tablet_id", tid); + range_move = false; + return {std::string_view(), range_move}; + } + ++num_recycled; + LOG_INFO("k is {}, is empty {}", k, k.empty()); + return {k, range_move}; + }); } else { // Empty tablet only has a [0-1] init rowset init_rs_keys.push_back(meta_rowset_key({instance_id_, tablet_id, 1})); @@ -1097,19 +1154,38 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ } return true; }()); + sync_executor.add([k]() mutable -> TabletKeyPair { + LOG_INFO("k is {}, is empty {}", k, k.empty()); + return {k, true}; + }); + ++num_recycled; } - ++num_recycled; - tablet_keys.push_back(k); return 0; }; + // TODO(AlexYue): Add one ut to cover use_range_remove = false auto loop_done = [&, this]() -> int { + bool finished = true; + auto tablet_keys = sync_executor.when_all(&finished); + if (!finished) { + LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_); + return -1; + } + sync_executor.reset(); if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0; + // sort the vector using key's order + std::sort(tablet_keys.begin(), tablet_keys.end(), + [](const auto& prev, const auto& last) { return prev.first < last.first; }); + bool use_range_remove = true; + for (auto& [_, remove] : tablet_keys) { + if (!remove) { + use_range_remove = remove; + break; + } + } std::unique_ptr> defer((int*)0x01, [&](int*) { - tablet_keys.clear(); tablet_idx_keys.clear(); init_rs_keys.clear(); - use_range_remove = true; }); std::unique_ptr txn; if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { @@ -1119,10 +1195,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ std::string tablet_key_end; if (!tablet_keys.empty()) { if (use_range_remove) { - tablet_key_end = std::string(tablet_keys.back()) + '\x00'; - txn->remove(tablet_keys.front(), tablet_key_end); + tablet_key_end = std::string(tablet_keys.back().first) + '\x00'; + txn->remove(tablet_keys.front().first, tablet_key_end); } else { - for (auto k : tablet_keys) { + for (auto& [k, _] : tablet_keys) { txn->remove(k); } } @@ -1284,13 +1360,26 @@ int InstanceRecycler::delete_rowset_data(const std::vector concurrent_delete_executor(_thread_pool_group.s3_producer_pool, + "delete_rowset_data", + [](const int& ret) { return ret != 0; }); for (auto& [resource_id, file_paths] : resource_file_paths) { - auto& accessor = accessor_map_[resource_id]; - DCHECK(accessor); - if (accessor->delete_files(file_paths) != 0) { + concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int { + auto& accessor = accessor_map_[*rid]; + DCHECK(accessor); + return accessor->delete_files(*paths); + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { ret = -1; + break; } } + ret = finished ? ret : -1; return ret; } @@ -1349,15 +1438,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = -1; } + SyncExecutor concurrent_delete_executor( + _thread_pool_group.s3_producer_pool, + fmt::format("delete tablet {} s3 rowset", tablet_id), + [](const int& ret) { return ret != 0; }); + // delete all rowset data in this tablet for (auto& [_, accessor] : accessor_map_) { - if (accessor->delete_directory(tablet_path_prefix(tablet_id)) != 0) { - LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id - << " s3_path=" << accessor->uri(); + concurrent_delete_executor.add([&, accessor_ptr = &accessor]() { + if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) { + LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id + << " s3_path=" << accessor->uri(); + return -1; + } + return 0; + }); + } + bool finished = true; + std::vector rets = concurrent_delete_executor.when_all(&finished); + for (int r : rets) { + if (r != 0) { ret = -1; } } + ret = finished ? ret : -1; + if (ret == 0) { // All object files under tablet have been deleted std::lock_guard lock(recycled_tablets_mtx_); diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 130d860849d6d7..9f6a238877bde7 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -40,6 +40,24 @@ class TxnKv; class InstanceRecycler; class StorageVaultAccessor; class Checker; +class SimpleThreadPool; +struct RecyclerThreadPoolGroup { + RecyclerThreadPoolGroup() = default; + RecyclerThreadPoolGroup(std::shared_ptr s3_producer_pool, + std::shared_ptr recycle_tablet_pool, + std::shared_ptr group_recycle_function_pool) + : s3_producer_pool(std::move(s3_producer_pool)), + recycle_tablet_pool(std::move(recycle_tablet_pool)), + group_recycle_function_pool(std::move(group_recycle_function_pool)) {} + ~RecyclerThreadPoolGroup() = default; + RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default; + RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default; + RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default; + RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default; + std::shared_ptr s3_producer_pool; + std::shared_ptr recycle_tablet_pool; + std::shared_ptr group_recycle_function_pool; +}; class Recycler { public: @@ -83,11 +101,13 @@ class Recycler { WhiteBlackList instance_filter_; std::unique_ptr checker_; + RecyclerThreadPoolGroup _thread_pool_group; }; class InstanceRecycler { public: - explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance); + explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, + RecyclerThreadPoolGroup thread_pool_group); ~InstanceRecycler(); // returns 0 for success otherwise error @@ -217,6 +237,7 @@ class InstanceRecycler { std::mutex recycle_tasks_mutex; // > std::map running_recycle_tasks; + RecyclerThreadPoolGroup _thread_pool_group; }; } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index f0b8959a5176cb..52c510fb2e7da0 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -151,7 +151,8 @@ void RecyclerServiceImpl::check_instance(const std::string& instance_id, MetaSer } void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& instance_id, - MetaServiceCode& code, std::string& msg) { + MetaServiceCode& code, std::string& msg, + RecyclerThreadPoolGroup thread_pool_group) { std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -188,13 +189,13 @@ void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& return; } } - auto recycler = std::make_unique(txn_kv, instance); + + auto recycler = std::make_unique(txn_kv, instance, thread_pool_group); if (recycler->init() != 0) { LOG(WARNING) << "failed to init InstanceRecycler recycle_copy_jobs on instance " << instance_id; return; } - std::thread worker([recycler = std::move(recycler), instance_id] { LOG(INFO) << "manually trigger recycle_copy_jobs on instance " << instance_id; recycler->recycle_copy_jobs(); @@ -332,7 +333,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, status_code = 400; return; } - recycle_copy_jobs(txn_kv_, *instance_id, code, msg); + recycle_copy_jobs(txn_kv_, *instance_id, code, msg, recycler_->_thread_pool_group); response_body = msg; return; } diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp index 54869a14b52d0f..e05ec7c72eb947 100644 --- a/cloud/src/recycler/s3_accessor.cpp +++ b/cloud/src/recycler/s3_accessor.cpp @@ -34,6 +34,7 @@ #include "common/config.h" #include "common/encryption_util.h" #include "common/logging.h" +#include "common/simple_thread_pool.h" #include "common/string_util.h" #include "common/util.h" #include "cpp/obj_retry_strategy.h" @@ -200,7 +201,15 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr* accessor) { return (*accessor)->init(); } +static std::shared_ptr worker_pool; + int S3Accessor::init() { + static std::once_flag log_annotated_tags_key_once; + std::call_once(log_annotated_tags_key_once, [&]() { + LOG_INFO("start s3 accessor parallel worker pool"); + worker_pool = std::make_shared(config::recycle_pool_parallelism); + worker_pool->start(); + }); switch (conf_.provider) { case S3Conf::AZURE: { Azure::Storage::Blobs::BlobClientOptions options; @@ -252,7 +261,7 @@ int S3Accessor::delete_prefix_impl(const std::string& path_prefix, int64_t expir LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix)); return obj_client_ ->delete_objects_recursively({.bucket = conf_.bucket, .key = get_key(path_prefix)}, - expiration_time) + {.executor = worker_pool}, expiration_time) .ret; } @@ -294,7 +303,8 @@ int S3Accessor::delete_files(const std::vector& paths) { keys.emplace_back(get_key(path)); } - return obj_client_->delete_objects(conf_.bucket, std::move(keys)).ret; + return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool}) + .ret; } int S3Accessor::delete_file(const std::string& path) { diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h index 19dc08fe83e47c..4ec9716f90de62 100644 --- a/cloud/src/recycler/s3_accessor.h +++ b/cloud/src/recycler/s3_accessor.h @@ -34,6 +34,7 @@ class S3RateLimiterHolder; enum class S3RateLimitType; namespace cloud { class ObjectStoreInfoPB; +class SimpleThreadPool; struct AccessorRateLimiter { public: diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp index c44473bb9eb2eb..d81ed08b619003 100644 --- a/cloud/src/recycler/s3_obj_client.cpp +++ b/cloud/src/recycler/s3_obj_client.cpp @@ -192,7 +192,8 @@ std::unique_ptr S3ObjClient::list_objects(ObjectStoragePathR } ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket, - std::vector keys) { + std::vector keys, + ObjClientOptions option) { if (keys.empty()) { return {0}; } @@ -281,8 +282,9 @@ ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) { } ObjectStorageResponse S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path, + ObjClientOptions option, int64_t expiration_time) { - return delete_objects_recursively_(path, expiration_time, MaxDeleteBatch); + return delete_objects_recursively_(path, option, expiration_time, MaxDeleteBatch); } ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h index 7804d081816256..c486fdc18dcde5 100644 --- a/cloud/src/recycler/s3_obj_client.h +++ b/cloud/src/recycler/s3_obj_client.h @@ -39,12 +39,13 @@ class S3ObjClient final : public ObjStorageClient { std::unique_ptr list_objects(ObjectStoragePathRef path) override; - ObjectStorageResponse delete_objects(const std::string& bucket, - std::vector keys) override; + ObjectStorageResponse delete_objects(const std::string& bucket, std::vector keys, + ObjClientOptions option) override; ObjectStorageResponse delete_object(ObjectStoragePathRef path) override; ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, + ObjClientOptions option, int64_t expiration_time = 0) override; ObjectStorageResponse get_life_cycle(const std::string& bucket, diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h new file mode 100644 index 00000000000000..d7009a99ed436c --- /dev/null +++ b/cloud/src/recycler/sync_executor.h @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include "common/logging.h" +#include "common/simple_thread_pool.h" + +namespace doris::cloud { + +template +class SyncExecutor { +public: + SyncExecutor( + std::shared_ptr pool, std::string name_tag, + std::function cancel = [](const T& /**/) { return false; }) + : _pool(std::move(pool)), _cancel(std::move(cancel)), _name_tag(std::move(name_tag)) {} + auto add(std::function callback) -> SyncExecutor& { + auto task = std::make_unique(std::move(callback), _cancel, _count); + _count.add_count(); + // The actual task logic would be wrapped by one promise and passed to the threadpool. + // The result would be returned by the future once the task is finished. + // Or the task would be invalid if the whole task is cancelled. + int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); }); + CHECK(r == 0); + _res.emplace_back(std::move(task)); + return *this; + } + std::vector when_all(bool* finished) { + timespec current_time; + auto current_time_second = time(nullptr); + current_time.tv_sec = current_time_second + 300; + current_time.tv_nsec = 0; + auto msg = fmt::format("{} has already taken 5 min", _name_tag); + while (0 != _count.timed_wait(current_time)) { + current_time.tv_sec += 300; + LOG(WARNING) << msg; + } + *finished = !_stop_token; + std::vector res; + res.reserve(_res.size()); + for (auto& task : _res) { + if (!task->valid()) { + *finished = false; + return res; + } + res.emplace_back((*task).get()); + } + return res; + } + void reset() { + _res.clear(); + _stop_token = false; + } + +private: + class Task { + public: + Task(std::function callback, std::function cancel, + bthread::CountdownEvent& count) + : _callback(std::move(callback)), + _cancel(std::move(cancel)), + _count(count), + _fut(_pro.get_future()) {} + void operator()(std::atomic_bool& stop_token) { + std::unique_ptr> defer((int*)0x01, + [&](int*) { _count.signal(); }); + if (stop_token) { + _valid = false; + return; + } + T t = _callback(); + // We'll return this task result to user even if this task return error + // So we don't set _valid to false here + if (_cancel(t)) { + stop_token = true; + } + _pro.set_value(std::move(t)); + } + bool valid() { return _valid; } + T get() { return _fut.get(); } + + private: + // It's guarantted that the valid function can only be called inside SyncExecutor's `when_all()` function + // and only be called when the _count.timed_wait function returned. So there would be no data race for + // _valid then it doesn't need to be one atomic bool. + bool _valid = true; + std::function _callback; + std::function _cancel; + std::promise _pro; + bthread::CountdownEvent& _count; + std::future _fut; + }; + std::vector> _res; + // use CountdownEvent to do periodically log using CountdownEvent::time_wait() + bthread::CountdownEvent _count {0}; + std::atomic_bool _stop_token {false}; + std::shared_ptr _pool; + std::function _cancel; + std::string _name_tag; +}; +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h index 20ea66def8c0b7..be81561b75bf16 100644 --- a/cloud/src/recycler/util.h +++ b/cloud/src/recycler/util.h @@ -19,6 +19,7 @@ #include #include +#include #include diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt index a27f7a1c538f93..ba5e2909918e53 100644 --- a/cloud/test/CMakeLists.txt +++ b/cloud/test/CMakeLists.txt @@ -50,10 +50,10 @@ add_executable(s3_accessor_test s3_accessor_test.cpp) add_executable(hdfs_accessor_test hdfs_accessor_test.cpp) -add_executable(util_test util_test.cpp) - add_executable(stopwatch_test stopwatch_test.cpp) +add_executable(util_test util_test.cpp) + add_executable(network_util_test network_util_test.cpp) message("Meta-service test dependencies: ${TEST_LINK_LIBS}") @@ -85,10 +85,10 @@ target_link_libraries(s3_accessor_test ${TEST_LINK_LIBS}) target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS}) -target_link_libraries(util_test ${TEST_LINK_LIBS}) - target_link_libraries(stopwatch_test ${TEST_LINK_LIBS}) +target_link_libraries(util_test ${TEST_LINK_LIBS}) + target_link_libraries(network_util_test ${TEST_LINK_LIBS}) # FDB related tests need to be linked with libfdb_c diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index d8da067457bad2..4409b8a2c410b3 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -29,6 +29,7 @@ #include "common/config.h" #include "common/logging.h" +#include "common/simple_thread_pool.h" #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/keys.h" @@ -48,6 +49,8 @@ static const std::string instance_id = "instance_id_recycle_test"; static int64_t current_time = 0; static constexpr int64_t db_id = 1000; +static doris::cloud::RecyclerThreadPoolGroup thread_group; + int main(int argc, char** argv) { auto conf_file = "doris_cloud.conf"; if (!cloud::config::init(conf_file, true)) { @@ -63,6 +66,16 @@ int main(int argc, char** argv) { current_time = duration_cast(system_clock::now().time_since_epoch()).count(); ::testing::InitGoogleTest(&argc, argv); + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + auto recycle_tablet_pool = std::make_shared(config::recycle_pool_parallelism); + recycle_tablet_pool->start(); + auto group_recycle_function_pool = + std::make_shared(config::recycle_pool_parallelism); + group_recycle_function_pool->start(); + thread_group = + RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), + std::move(group_recycle_function_pool)); return RUN_ALL_TESTS(); } @@ -618,7 +631,7 @@ TEST(RecyclerTest, recycle_empty) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_empty"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); ASSERT_EQ(recycler.recycle_rowsets(), 0); @@ -651,7 +664,7 @@ TEST(RecyclerTest, recycle_rowsets) { sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; }); sp->enable_processing(); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -713,7 +726,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) { config::instance_recycler_worker_pool_size = 10; config::recycle_task_threshold_seconds = 0; - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); auto sp = SyncPoint::get_instance(); @@ -793,7 +806,7 @@ TEST(RecyclerTest, recycle_tmp_rowsets) { sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; }); sp->enable_processing(); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -852,7 +865,7 @@ TEST(RecyclerTest, recycle_tablet) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_tablet"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -925,7 +938,7 @@ TEST(RecyclerTest, recycle_indexes) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_indexes"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -1034,7 +1047,7 @@ TEST(RecyclerTest, recycle_partitions) { obj_info->set_bucket(config::test_s3_bucket); obj_info->set_prefix("recycle_partitions"); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); std::vector schemas; @@ -1142,7 +1155,7 @@ TEST(RecyclerTest, recycle_versions) { InstanceInfoPB instance; instance.set_instance_id(instance_id); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); // Recycle all partitions in table except 30006 ASSERT_EQ(recycler.recycle_partitions(), 0); @@ -1211,7 +1224,7 @@ TEST(RecyclerTest, abort_timeout_txn) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); ASSERT_EQ(recycler.abort_timeout_txn(), 0); @@ -1254,7 +1267,7 @@ TEST(RecyclerTest, abort_timeout_txn_and_rebegin) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); ASSERT_EQ(recycler.abort_timeout_txn(), 0); @@ -1321,7 +1334,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); recycler.abort_timeout_txn(); TxnInfoPB txn_info_pb; @@ -1372,7 +1385,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1424,7 +1437,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1483,7 +1496,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) { } InstanceInfoPB instance; instance.set_instance_id(mock_instance); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); sleep(1); recycler.abort_timeout_txn(); @@ -1620,7 +1633,7 @@ TEST(RecyclerTest, recycle_copy_jobs) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, thread_group); ASSERT_EQ(recycler.init(), 0); auto internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second; @@ -1779,7 +1792,7 @@ TEST(RecyclerTest, recycle_batch_copy_jobs) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, thread_group); ASSERT_EQ(recycler.init(), 0); const auto& internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second; @@ -1893,7 +1906,7 @@ TEST(RecyclerTest, recycle_stage) { instance.set_instance_id(mock_instance); instance.add_obj_info()->CopyFrom(object_info); - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; for (int i = 0; i < 10; ++i) { @@ -1953,7 +1966,7 @@ TEST(RecyclerTest, recycle_deleted_instance) { InstanceInfoPB instance_info; create_instance(internal_stage_id, external_stage_id, instance_info); - InstanceRecycler recycler(txn_kv, instance_info); + InstanceRecycler recycler(txn_kv, instance_info, thread_group); ASSERT_EQ(recycler.init(), 0); // create txn key for (size_t i = 0; i < 100; i++) { @@ -2531,7 +2544,7 @@ TEST(RecyclerTest, delete_rowset_data) { } { - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; int64_t txn_id_base = 114115; @@ -2565,7 +2578,7 @@ TEST(RecyclerTest, delete_rowset_data) { tmp_obj_info->set_bucket(config::test_s3_bucket); tmp_obj_info->set_prefix(resource_id); - InstanceRecycler recycler(txn_kv, tmp_instance); + InstanceRecycler recycler(txn_kv, tmp_instance, thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; // Delete multiple rowset files using one series of RowsetPB @@ -2585,7 +2598,7 @@ TEST(RecyclerTest, delete_rowset_data) { ASSERT_FALSE(list_iter->has_next()); } { - InstanceRecycler recycler(txn_kv, instance); + InstanceRecycler recycler(txn_kv, instance, thread_group); ASSERT_EQ(recycler.init(), 0); auto accessor = recycler.accessor_map_.begin()->second; // Delete multiple rowset files using one series of RowsetPB diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp index 0292117076165c..c88ef555f82806 100644 --- a/cloud/test/util_test.cpp +++ b/cloud/test/util_test.cpp @@ -15,17 +15,36 @@ // specific language governing permissions and limitations // under the License. +#include "recycler/util.h" + +#include #include +#include +#include #include #include #include "common/config.h" +#include "common/logging.h" +#include "common/simple_thread_pool.h" #include "common/string_util.h" -#include "glog/logging.h" #include "gtest/gtest.h" +#include "recycler/recycler.h" +#include "recycler/sync_executor.h" + +using namespace doris::cloud; int main(int argc, char** argv) { - doris::cloud::config::init(nullptr, true); + const auto* conf_file = "doris_cloud.conf"; + if (!config::init(conf_file, true)) { + std::cerr << "failed to init config file, conf=" << conf_file << std::endl; + return -1; + } + if (!::init_glog("util")) { + std::cerr << "failed to init glog" << std::endl; + return -1; + } + ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } @@ -89,3 +108,130 @@ TEST(StringUtilTest, test_string_strip) { // clang-format on } + +template +auto task_wrapper(Func... funcs) -> std::function { + return [funcs...]() { + return [](std::initializer_list numbers) { + int i = 0; + for (int num : numbers) { + if (num != 0) { + i = num; + } + } + return i; + }({funcs()...}); + }; +} + +TEST(UtilTest, stage_wrapper) { + std::function func1 = []() { return 0; }; + std::function func2 = []() { return -1; }; + std::function func3 = []() { return 0; }; + auto f = task_wrapper(func1, func2, func3); + ASSERT_EQ(-1, f()); + + f = task_wrapper(func1, func3); + ASSERT_EQ(0, f()); +} + +TEST(UtilTest, delay) { + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + // test normal execute + { + SyncExecutor sync_executor(s3_producer_pool, "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return -1; }; + auto f2 = []() { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return 1; + }; + sync_executor.add(f2); + sync_executor.add(f2); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(3, res.size()); + } + // test normal execute + { + SyncExecutor sync_executor( + s3_producer_pool, "normal test", + [](const std::string_view k) { return k.empty(); }); + auto f1 = []() { return ""; }; + auto f2 = []() { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return "fake"; + }; + sync_executor.add(f2); + sync_executor.add(f2); + sync_executor.add(f1); + bool finished = true; + auto res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(3, res.size()); + } +} + +TEST(UtilTest, normal) { + auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); + s3_producer_pool->start(); + // test normal execute + { + SyncExecutor sync_executor(s3_producer_pool, "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(3, res.size()); + ASSERT_EQ(finished, true); + std::for_each(res.begin(), res.end(), [](auto&& n) { ASSERT_EQ(1, n); }); + } + // test when error happen + { + SyncExecutor sync_executor(s3_producer_pool, "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + sync_executor._stop_token = true; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + ASSERT_EQ(0, res.size()); + } + { + SyncExecutor sync_executor(s3_producer_pool, "normal test", + [](int k) { return k == -1; }); + auto f1 = []() { return 1; }; + auto cancel = []() { return -1; }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(cancel); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(finished, false); + } + // test string_view + { + SyncExecutor sync_executor( + s3_producer_pool, "normal test", + [](const std::string_view k) { return k.empty(); }); + std::string s = "Hello World"; + auto f1 = [&s]() { return std::string_view(s); }; + sync_executor.add(f1); + sync_executor.add(f1); + sync_executor.add(f1); + bool finished = true; + std::vector res = sync_executor.when_all(&finished); + ASSERT_EQ(3, res.size()); + std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); }); + } +}