From ddef0a31d349654fb35219b0316815268dfd1916 Mon Sep 17 00:00:00 2001 From: ByteYue Date: Tue, 16 Apr 2024 15:57:17 +0800 Subject: [PATCH] add inject --- be/src/cloud/injection_point_action.cpp | 41 +++++++++++++++++++++++++ be/src/io/fs/hdfs_file_reader.cpp | 5 +++ be/src/io/fs/hdfs_file_writer.cpp | 10 ++++++ 3 files changed, 56 insertions(+) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index adcc6cad70878f0..cff29f38a2cdac2 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -52,6 +52,47 @@ void register_suites() { pair->second = true; }); }); + suite_map.emplace("test_s3_file_writer", []{ + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", [](auto&&) { + std::srand(static_cast(std::time(nullptr))); + int random_sleep_time_second = std::rand() % 10 + 1; + std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); + }); + sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) { + auto pair = try_any_cast*>(args.back()); + pair->first = + Status::IOError("failed to write into file cache due to inject error"); + pair->second = true; + }); + }); + suite_map.emplace("test_storage_vault", [] { + auto *sp = SyncPoint::get_instance(); + sp->set_call_back("HdfsFileWriter::appendv_delay", [](auto&&) { + std::srand(static_cast(std::time(nullptr))); + int random_sleep_time_second = std::rand() % 10 + 1; + std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second)); + }); + sp->set_call_back("HdfsFileWriter::appendv_error", [](auto&& args) { + auto pair = try_any_cast*>(args.back()); + pair->second = true; + }); + sp->set_call_back("HdfsFileWriter::close_hdfsFlush", [](auto&& args) { + auto pair = try_any_cast*>(args.back()); + pair->first = -1; + pair->second = true; + }); + sp->set_call_back("HdfsFileWriter::close_hdfsCloseFile", [](auto&& args) { + auto pair = try_any_cast*>(args.back()); + pair->first = -1; + pair->second = true; + }); + sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) { + auto pair = try_any_cast*>(args.back()); + pair->first = Status::InternalError("read hdfs error"); + pair->second = true; + }); + }); } void set_sleep(const std::string& point, HttpRequest* req) { diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 45be6ffd60acdfd..3b8d76d55dc061b 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -26,6 +26,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" +#include "common/sync_point.h" #include "io/fs/err_utils.h" #include "io/hdfs_util.h" #include "service/backend_options.h" @@ -126,6 +127,10 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r while (has_read < bytes_req) { tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read, to + has_read, bytes_req - has_read); + { + [[maybe_unused]] Status error_ret; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", &error_ret); + } if (loop_read < 0) { // invoker maybe just skip Status.NotFound and continue // so we need distinguish between it and other kinds of errors diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index fcb4ccfd74acd6a..eb06bb74f74d7b6 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -26,6 +26,7 @@ #include "common/logging.h" #include "common/status.h" +#include "common/sync_point.h" #include "io/fs/err_utils.h" #include "io/fs/hdfs_file_system.h" #include "io/hdfs_util.h" @@ -54,6 +55,7 @@ Status HdfsFileWriter::close() { } _closed = true; int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file); + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::close_hdfsFlush", &result); if (result == -1) { std::stringstream ss; ss << "failed to flush hdfs file. " @@ -64,6 +66,7 @@ Status HdfsFileWriter::close() { result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); _hdfs_file = nullptr; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::close_hdfsCloseFile", &result); if (result != 0) { std::string err_msg = hdfs_error(); return Status::InternalError( @@ -83,7 +86,14 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { size_t left_bytes = result.size; const char* p = result.data; while (left_bytes > 0) { + TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::appendv_delay"); int64_t written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, p, left_bytes); + { + [[maybe_unused]] Status error_ret = Status::InternalError( + "write hdfs failed. fs_name: {}, path: {}, error: size exceeds", _fs_name, + _path.native()); + TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::appendv_error", &error_ret); + } if (written_bytes < 0) { return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}", _fs_name, _path.native(), hdfs_error());