From bade50db566f77ce88e541f8845147ab8aacbff7 Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Sun, 24 Dec 2023 21:59:11 +0800 Subject: [PATCH] [chore](test) Add testing util sync point (#28924) --- be/src/common/sync_point.cpp | 236 ++++++++++++++++++++++++++++++++++ be/src/common/sync_point.h | 240 +++++++++++++++++++++++++++++++++++ 2 files changed, 476 insertions(+) create mode 100644 be/src/common/sync_point.cpp create mode 100644 be/src/common/sync_point.h diff --git a/be/src/common/sync_point.cpp b/be/src/common/sync_point.cpp new file mode 100644 index 00000000000000..816c5a82a94bac --- /dev/null +++ b/be/src/common/sync_point.cpp @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Most code of this file is copied from rocksdb SyncPoint. 
+// https://github.com/facebook/rocksdb + +// clang-format off +#include "sync_point.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace doris { + +struct SyncPoint::Data { // implementation object that SyncPoint owns through impl_ +public: + Data() : enabled_(false) { } + virtual ~Data() {} + void process(const std::string& point, std::vector&& cb_args); + void load_dependency(const std::vector& dependencies); + void load_dependency_and_markers( + const std::vector& dependencies, + const std::vector& markers); + bool predecessors_all_cleared(const std::string& point); + void set_call_back(const std::string& point, + const std::function&&)>& callback); + void clear_call_back(const std::string& point); + void clear_all_call_backs(); + void enable_processing(); + void disable_processing(); + void clear_trace(); +private: + bool disable_by_marker(const std::string& point, std::thread::id thread_id); +private: + // successor/predecessor maps filled by load_dependency and load_dependency_and_markers + std::unordered_map> successors_; + std::unordered_map> predecessors_; + std::unordered_map&&)>> callbacks_; + std::unordered_map> markers_; + std::unordered_map marked_thread_id_; + std::mutex mutex_; + std::condition_variable cv_; + // sync points already processed; emptied by clear_trace and by the load_dependency* calls + std::unordered_set cleared_points_; + std::atomic enabled_; + int num_callbacks_running_ = 0; +}; + +SyncPoint* SyncPoint::get_instance() { + static SyncPoint sync_point; + return &sync_point; +} +SyncPoint::SyncPoint() : + impl_(new Data) { +} +SyncPoint:: ~SyncPoint() { + delete impl_; +} +void SyncPoint::load_dependency(const std::vector& dependencies) { + impl_->load_dependency(dependencies); +} +void SyncPoint::load_dependency_and_markers( + const std::vector& dependencies, + const std::vector& markers) { + impl_->load_dependency_and_markers(dependencies, markers); +} +void SyncPoint::set_call_back(const std::string& point, + const std::function&&)>& callback) { + impl_->set_call_back(point, callback); +} +void
SyncPoint::clear_call_back(const std::string& point) { + impl_->clear_call_back(point); +} +void SyncPoint::clear_all_call_backs() { + impl_->clear_all_call_backs(); +} +void SyncPoint::enable_processing() { + impl_->enable_processing(); +} +void SyncPoint::disable_processing() { + impl_->disable_processing(); +} +void SyncPoint::clear_trace() { + impl_->clear_trace(); +} +void SyncPoint::process(const std::string& point, std::vector&& cb_arg) { + impl_->process(point, std::move(cb_arg)); +} + +// ============================================================================= +// SyncPoint::Data implementation +// ============================================================================= + +void SyncPoint::Data::load_dependency( + const std::vector& dependencies) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + cv_.notify_all(); +} + +/** + * Markers are dependencies too: each pair goes into successors_/predecessors_ and also into markers_ + */ +void SyncPoint::Data::load_dependency_and_markers( + const std::vector& dependencies, + const std::vector& markers) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::predecessors_all_cleared(const std::string& point) { + for (const auto& pred :
predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::clear_call_back(const std::string& point) { + std::unique_lock lock(mutex_); + callbacks_.erase(point); +} + +void SyncPoint::Data::clear_all_call_backs() { + std::unique_lock lock(mutex_); + callbacks_.clear(); +} + +void SyncPoint::Data::process(const std::string& point, std::vector&& cb_arg) { + if (!enabled_) { + return; + } + std::unique_lock lock(mutex_); + auto thread_id = std::this_thread::get_id(); + auto marker_iter = markers_.find(point); + // if the current sync point is a marker, + // record this thread in marked_thread_id_ for all its successors + if (marker_iter != markers_.end()) { + for (auto& marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + } + } + // skip this point if it is a marker's successor reached on a different thread + if (disable_by_marker(point, thread_id)) { + return; + } + while (!predecessors_all_cleared(point)) { + cv_.wait(lock); + if (disable_by_marker(point, thread_id)) { + return; + } + } + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + auto callback = callback_pair->second; + lock.unlock(); // release through `lock` so its destructor stays correct if the callback throws + callback(std::move(cb_arg)); + lock.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point); + cv_.notify_all(); +} + +bool SyncPoint::Data::disable_by_marker(const std::string& point, + std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() // is a successor + && thread_id != marked_point_iter->second; +} + +void SyncPoint::Data::set_call_back(const std::string& point, + const std::function&&)>& callback) { + std::lock_guard lock(mutex_); + callbacks_[point] = callback; +} + +void SyncPoint::Data::clear_trace() { + std::lock_guard lock(mutex_); + cleared_points_.clear(); +} + +void SyncPoint::Data::enable_processing() { + enabled_ = true;
+} + +void SyncPoint::Data::disable_processing() { + enabled_ = false; +} + +} // namespace doris +// clang-format on +// vim: et tw=80 ts=2 sw=2 cc=80: diff --git a/be/src/common/sync_point.h b/be/src/common/sync_point.h new file mode 100644 index 00000000000000..18b3a63c05e700 --- /dev/null +++ b/be/src/common/sync_point.h @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Most code of this file is copied from rocksdb SyncPoint. +// https://github.com/facebook/rocksdb + +#pragma once +// clang-format off +#include +#include +#include +#include +#include + +namespace doris { + +#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, ...) \ + [&]() mutable { \ + TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, decltype((expr)) {}, __VA_ARGS__); \ + return (expr); \ + }() + +// This class provides a facility to reproduce race conditions deterministically +// in unit tests. +// Developers can specify sync points in the codebase via TEST_SYNC_POINT. +// Each sync point represents a position in the execution stream of a thread. +// In the unit test, 'Happens After' relationships among sync points can be +// set up via SyncPoint::load_dependency, to reproduce a desired interleaving of +// thread execution.
+// Refer to (DBTest,TransactionLogIteratorRace), for an example use case. +class SyncPoint { +public: + static SyncPoint* get_instance(); + SyncPoint(const SyncPoint&) = delete; + SyncPoint& operator=(const SyncPoint&) = delete; + ~SyncPoint(); + struct SyncPointPair { + std::string predecessor; + std::string successor; + }; + + // call once at the beginning of a test to set up the dependencies between + // sync points + // + // Example: + // load_dependency({{"point1", "point2"}, + // {"point2", "point3"}, + // {"point3", "point4"}}); + // + // test case thread thread for object being tested + // | | + // | | + // | \-------------0-------------\ | + // | \-> x sync point1 set in code + // | /----------1----------------/ | + // point2 o <-/ /-> x sync point4 set in code + // | / | + // z / | + // z /---------2-----------/ | there may be nothing + // | / | between point1 point4 + // point3 o --/ | they are for sync + // | | between test case and object + // v v + // + // vertical arrow means the procedure of each thread, the running order will + // be: + // test case thread -> point1 -> point2 -> point3 -> point4 -> object being + // tested + // + // we may do a lot of things between point2 and point3, say, change the + // object's status, call another method, propagate a data race, etc. + void load_dependency(const std::vector& dependencies); + + // call once at the beginning of a test to set up the dependencies between + // sync points and set up markers indicating the successor is only enabled + // when it is processed on the same thread as the predecessor. + // When adding a marker, it implicitly adds a dependency for the marker pair. + void load_dependency_and_markers( + const std::vector& dependencies, + const std::vector& markers); + + // The arguments to the callback are passed through from + // TEST_SYNC_POINT_CALLBACK(); the vector is empty if TEST_SYNC_POINT or + // TEST_IDX_SYNC_POINT was used.
+ void set_call_back(const std::string& point, + const std::function&&)>& callback); + + // Clear callback function by point + void clear_call_back(const std::string& point); + + // Clear all call back functions. + void clear_all_call_backs(); + + // Enable sync point processing (disabled on startup) + void enable_processing(); + + // Disable sync point processing + void disable_processing(); + + // Remove the execution trace of all sync points + void clear_trace(); + + // Triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + // And/or call registered callback function, with argument `cb_args` + void process(const std::string& point, std::vector&& cb_args = {}); + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + // We want this to be public so we can subclass the implementation + struct Data; + +private: + // Singleton + SyncPoint(); + Data* impl_; // implementation, hidden in the cpp file +}; + +template +T try_any_cast(const std::any& a) { + try { + return std::any_cast(a); + } catch (const std::bad_any_cast& e) { + std::cerr << e.what() << " expected=" << typeid(T).name() << " actual=" << a.type().name() << std::endl; + throw; // rethrow the caught exception itself rather than a copy of it + } +} + +template +auto try_any_cast_ret(std::vector& any) { + return try_any_cast*>(any.back()); +} + +} // namespace doris + +#define SYNC_POINT(x) doris::SyncPoint::get_instance()->process(x) +#define IDX_SYNC_POINT(x, index) \ + doris::SyncPoint::get_instance()->process(x + std::to_string(index)) +#define SYNC_POINT_CALLBACK(x, ...) doris::SyncPoint::get_instance()->process(x, {__VA_ARGS__}) +#define SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) \ +{ \ + std::pair ret {default_ret_val, false}; \ + std::vector args {__VA_ARGS__}; \ + args.push_back(&ret); \ + doris::SyncPoint::get_instance()->process(x, std::move(args)); \ + if (ret.second) return std::move(ret.first); \ +} +#define SYNC_POINT_RETURN_WITH_VOID(x, ...)
\ +{ \ + bool pred = false; \ + std::vector args {__VA_ARGS__}; \ + args.push_back(&pred); \ + doris::SyncPoint::get_instance()->process(x, std::move(args)); \ + if (pred) return; \ +} +#define SYNC_POINT_SINGLETON() (void)doris::SyncPoint::get_instance() + +// TEST_SYNC_POINT is a no-op unless BE_TEST is defined. +// Turn on this feature by defining the BE_TEST macro +#ifndef BE_TEST +# define TEST_SYNC_POINT(x) +# define TEST_IDX_SYNC_POINT(x, index) +# define TEST_SYNC_POINT_CALLBACK(x, ...) +# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) +# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...) +// seldom called +# define TEST_SYNC_POINT_SINGLETON() +#else +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependency on other sync points, +// configured at runtime via SyncPoint::load_dependency. This could be +// utilized to re-produce race conditions between threads. +# define TEST_SYNC_POINT(x) SYNC_POINT(x) +# define TEST_IDX_SYNC_POINT(x, index) IDX_SYNC_POINT(x, index) +# define TEST_SYNC_POINT_CALLBACK(x, ...) SYNC_POINT_CALLBACK(x, __VA_ARGS__) +# define TEST_SYNC_POINT_SINGLETON() SYNC_POINT_SINGLETON() + +/** + * Inject return points for testing. + * + * Currently we can only insert more points to get context from tested thread + * and process in testing thread, e.g. + * + * tested thread: + * ... + * TEST_SYNC_POINT_RETURN_WITH_VALUE("point_ret", int(0), ctx0); + * ... + * + * testing thread: + * sync_point->set_call_back("point_ret", [](auto&& args) { + * auto ctx0 = try_any_cast(args[0]); + * auto pair = try_any_cast*>(args.back()); + * pair->first = ...; + * pair->second = ctx0; }); + * + * See sync_point_test.cpp for more details. + */ +#pragma GCC diagnostic ignored "-Waddress" +# define TEST_SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__) +# define TEST_SYNC_POINT_RETURN_WITH_VOID(x, ...)
SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__) + +#endif // BE_TEST + +// TODO: define injection point in production env. +// the `if` condition can be a live (runtime) config of the application +#ifndef ENABLE_INJECTION_POINT +# define TEST_INJECTION_POINT(x) +# define TEST_IDX_INJECTION_POINT(x, index) +# define TEST_INJECTION_POINT_CALLBACK(x, ...) +# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) +# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) +# define TEST_INJECTION_POINT_SINGLETON() +#else +namespace doris::config { +extern bool enable_injection_point; +} +# define TEST_INJECTION_POINT(x) if (doris::config::enable_injection_point) { SYNC_POINT(x); } +# define TEST_IDX_INJECTION_POINT(x, index) if (doris::config::enable_injection_point) { IDX_SYNC_POINT(x, index); } +# define TEST_INJECTION_POINT_CALLBACK(x, ...) if (doris::config::enable_injection_point) { SYNC_POINT_CALLBACK(x, __VA_ARGS__); } +# define TEST_INJECTION_POINT_SINGLETON() if (doris::config::enable_injection_point) { SYNC_POINT_SINGLETON(); } +# define TEST_INJECTION_POINT_RETURN_WITH_VALUE(x, default_ret_val, ...) if (doris::config::enable_injection_point) { SYNC_POINT_RETURN_WITH_VALUE(x, default_ret_val, __VA_ARGS__); } +# define TEST_INJECTION_POINT_RETURN_WITH_VOID(x, ...) if (doris::config::enable_injection_point) { SYNC_POINT_RETURN_WITH_VOID(x, __VA_ARGS__); } +#endif // ENABLE_INJECTION_POINT + +// clang-format on +// vim: et tw=80 ts=2 sw=2 cc=80: