Skip to content

Commit

Permalink
add inject
Browse files Browse the repository at this point in the history
  • Loading branch information
ByteYue committed Apr 16, 2024
1 parent ac1ac6c commit 4275129
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
41 changes: 41 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ void register_suites() {
pair->second = true;
});
});
suite_map.emplace("test_s3_file_writer", []{
auto* sp = SyncPoint::get_instance();
sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache", [](auto&&) {
std::srand(static_cast<unsigned int>(std::time(nullptr)));
int random_sleep_time_second = std::rand() % 10 + 1;
std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
});
sp->set_call_back("UploadFileBuffer::upload_to_local_file_cache_inject", [](auto&& args) {
auto pair = try_any_cast<std::pair<Status, bool>*>(args.back());
pair->first =
Status::IOError<false>("failed to write into file cache due to inject error");
pair->second = true;
});
});
suite_map.emplace("test_storage_vault", [] {
auto *sp = SyncPoint::get_instance();
sp->set_call_back("HdfsFileWriter::appendv_delay", [](auto&&) {
std::srand(static_cast<unsigned int>(std::time(nullptr)));
int random_sleep_time_second = std::rand() % 10 + 1;
std::this_thread::sleep_for(std::chrono::seconds(random_sleep_time_second));
});
sp->set_call_back("HdfsFileWriter::appendv_error", [](auto&& args) {
auto pair = try_any_cast<std::pair<Status, bool>*>(args.back());
pair->second = true;
});
sp->set_call_back("HdfsFileWriter::close_hdfsFlush", [](auto&& args) {
auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
pair->first = -1;
pair->second = true;
});
sp->set_call_back("HdfsFileWriter::close_hdfsCloseFile", [](auto&& args) {
auto pair = try_any_cast<std::pair<int, bool>*>(args.back());
pair->first = -1;
pair->second = true;
});
sp->set_call_back("HdfsFileReader:read_error", [](auto&& args) {
auto pair = try_any_cast<std::pair<Status, bool>*>(args.back());
pair->first = Status::InternalError("read hdfs error");
pair->second = true;
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/sync_point.h"
#include "io/fs/err_utils.h"
#include "io/hdfs_util.h"
#include "service/backend_options.h"
Expand Down Expand Up @@ -126,6 +127,10 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
while (has_read < bytes_req) {
tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read,
to + has_read, bytes_req - has_read);
{
[[maybe_unused]] Status error_ret;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileReader:read_error", &error_ret);
}
if (loop_read < 0) {
// invoker maybe just skip Status.NotFound and continue
// so we need distinguish between it and other kinds of errors
Expand Down
10 changes: 10 additions & 0 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "io/fs/err_utils.h"
#include "io/fs/hdfs_file_system.h"
#include "io/hdfs_util.h"
Expand Down Expand Up @@ -54,6 +55,7 @@ Status HdfsFileWriter::close() {
}
_closed = true;
int result = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::close_hdfsFlush", &result);
if (result == -1) {
std::stringstream ss;
ss << "failed to flush hdfs file. "
Expand All @@ -64,6 +66,7 @@ Status HdfsFileWriter::close() {

result = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
_hdfs_file = nullptr;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::close_hdfsCloseFile", &result);
if (result != 0) {
std::string err_msg = hdfs_error();
return Status::InternalError(
Expand All @@ -83,7 +86,14 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
size_t left_bytes = result.size;
const char* p = result.data;
while (left_bytes > 0) {
TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::appendv_delay");
int64_t written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, p, left_bytes);
{
[[maybe_unused]] Status error_ret = Status::InternalError(
"write hdfs failed. fs_name: {}, path: {}, error: size exceeds", _fs_name,
_path.native());
TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::appendv_error", &error_ret);
}
if (written_bytes < 0) {
return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}",
_fs_name, _path.native(), hdfs_error());
Expand Down

0 comments on commit 4275129

Please sign in to comment.