diff --git a/cloud/blockstore/apps/client/lib/command.cpp b/cloud/blockstore/apps/client/lib/command.cpp index bd2884fbbda..aa5a161e43d 100644 --- a/cloud/blockstore/apps/client/lib/command.cpp +++ b/cloud/blockstore/apps/client/lib/command.cpp @@ -60,13 +60,6 @@ const TString DefaultIamTokenFile = "~/.nbs-client/iam-token"; //////////////////////////////////////////////////////////////////////////////// -static const TMap EncryptionModes = { - { "no", NProto::NO_ENCRYPTION }, - { "aes-xts", NProto::ENCRYPTION_AES_XTS }, -}; - -//////////////////////////////////////////////////////////////////////////////// - TString ResolvePath(const TString& path) { if (path.StartsWith('~')) { @@ -334,46 +327,6 @@ TString TCommand::NormalizeCommand(TString command) return command; } -NProto::EEncryptionMode TCommand::EncryptionModeFromString(const TString& str) -{ - auto it = EncryptionModes.find(str); - if (it != EncryptionModes.end()) { - return it->second; - } - - ythrow yexception() << "invalid encryption mode: " << str; -} - -NProto::TEncryptionSpec TCommand::CreateEncryptionSpec( - NProto::EEncryptionMode mode, - const TString& keyPath, - const TString& keyHash) -{ - if (mode == NProto::NO_ENCRYPTION) { - if (keyHash || keyPath) { - throw yexception() << "invalid encryption options: " - << " set encryption mode or remove key hash and key path"; - } - return {}; - } - - if (keyHash && keyPath) { - throw yexception() << "invalid encryption options: " - << " set key path or key hash, not both"; - } - - if (!keyHash && !keyPath) { - throw yexception() << "invalid encryption options: " - << " set key hash or key path or remove encryption mode"; - } - - NProto::TEncryptionSpec encryptionSpec; - encryptionSpec.SetMode(mode); - encryptionSpec.SetKeyHash(keyHash); - encryptionSpec.MutableKeyPath()->SetFilePath(keyPath); - return encryptionSpec; -} - NProto::TMountVolumeResponse TCommand::MountVolume( TString diskId, TString mountToken, diff --git a/cloud/blockstore/apps/client/lib/command.h b/cloud/blockstore/apps/client/lib/command.h index 217ad15a8fa..93adade780e 100644 --- a/cloud/blockstore/apps/client/lib/command.h +++ b/cloud/blockstore/apps/client/lib/command.h @@ -142,13 +142,6 @@ class TCommand protected: virtual bool DoExecute() = 0; - static NProto::EEncryptionMode EncryptionModeFromString(const TString& str); - - static NProto::TEncryptionSpec CreateEncryptionSpec( - NProto::EEncryptionMode mode, - const TString& keyPath, - const TString& keyHash); - // For read/write/zero blocks requests NProto::TMountVolumeResponse MountVolume( TString diskId, diff --git a/cloud/blockstore/apps/client/lib/create_volume.cpp b/cloud/blockstore/apps/client/lib/create_volume.cpp index 59a6b07e132..af8c1005b85 100644 --- a/cloud/blockstore/apps/client/lib/create_volume.cpp +++ b/cloud/blockstore/apps/client/lib/create_volume.cpp @@ -3,6 +3,7 @@ #include "volume_manipulation_params.h" #include +#include #include #include #include diff --git a/cloud/blockstore/apps/client/lib/read_blocks.cpp b/cloud/blockstore/apps/client/lib/read_blocks.cpp index 59196ac0857..76378b9419d 100644 --- a/cloud/blockstore/apps/client/lib/read_blocks.cpp +++ b/cloud/blockstore/apps/client/lib/read_blocks.cpp @@ -1,9 +1,9 @@ #include "read_blocks.h" #include +#include #include #include - #include #include #include diff --git a/cloud/blockstore/apps/client/lib/start_endpoint.cpp b/cloud/blockstore/apps/client/lib/start_endpoint.cpp index 63a393efa81..ad6d9f79397 100644 --- a/cloud/blockstore/apps/client/lib/start_endpoint.cpp +++ b/cloud/blockstore/apps/client/lib/start_endpoint.cpp @@ -1,5 +1,6 @@ #include "start_endpoint.h" +#include #include #include #include @@ -110,6 +111,7 @@ class TStartEndpointCommand final TString EncryptionKeyHash; bool Persistent = false; TString NbdDeviceFile; + THashSet CGroups; public: TStartEndpointCommand(IBlockStorePtr client) @@ -185,6 +187,10 @@ class TStartEndpointCommand final Opts.AddLongOption("nbd-device", "nbd device file which nbd-client connected to") .RequiredArgument("STR") .StoreResult(&NbdDeviceFile); + + Opts.AddLongOption("cgroup", "cgroup to place into") + .RequiredArgument("STR") + .InsertTo(&CGroups); } protected: @@ -231,6 +237,7 @@ class TStartEndpointCommand final EncryptionKeyHash)); request->SetPersistent(Persistent); request->SetNbdDeviceFile(NbdDeviceFile); + request->MutableClientCGroups()->Assign(CGroups.begin(), CGroups.end()); } STORAGE_DEBUG("Sending StartEndpoint request"); diff --git a/cloud/blockstore/apps/client/lib/write_blocks.cpp b/cloud/blockstore/apps/client/lib/write_blocks.cpp index dc19b9e0a46..cf44c1e8def 100644 --- a/cloud/blockstore/apps/client/lib/write_blocks.cpp +++ b/cloud/blockstore/apps/client/lib/write_blocks.cpp @@ -2,9 +2,9 @@ #include #include +#include #include #include - #include #include #include diff --git a/cloud/blockstore/apps/client/lib/ya.make b/cloud/blockstore/apps/client/lib/ya.make index ea5efa1fb43..8f25a1c7a64 100644 --- a/cloud/blockstore/apps/client/lib/ya.make +++ b/cloud/blockstore/apps/client/lib/ya.make @@ -57,6 +57,7 @@ PEERDIR( cloud/blockstore/libs/common cloud/blockstore/libs/diagnostics cloud/blockstore/libs/encryption + cloud/blockstore/libs/encryption/model cloud/blockstore/libs/endpoint_proxy/client cloud/blockstore/libs/service diff --git a/cloud/blockstore/apps/client/lib/zero_blocks.cpp b/cloud/blockstore/apps/client/lib/zero_blocks.cpp index 5b2908a385d..a86e196a1b3 100644 --- a/cloud/blockstore/apps/client/lib/zero_blocks.cpp +++ b/cloud/blockstore/apps/client/lib/zero_blocks.cpp @@ -1,6 +1,7 @@ #include "zero_blocks.h" #include +#include #include #include #include diff --git a/cloud/blockstore/libs/encryption/CMakeLists.linux-x86_64.txt b/cloud/blockstore/libs/encryption/CMakeLists.linux-x86_64.txt index e7a4382921b..fd9f7c5e2ec 100644 --- a/cloud/blockstore/libs/encryption/CMakeLists.linux-x86_64.txt +++ b/cloud/blockstore/libs/encryption/CMakeLists.linux-x86_64.txt @@ -29,4 +29,5 @@ target_sources(blockstore-libs-encryption PRIVATE ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/encryption/encryption_service.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/encryption/encryption_test.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/encryption/encryptor.cpp + ${CMAKE_SOURCE_DIR}/cloud/blockstore/libs/encryption/model/utils.cpp ) diff --git a/cloud/blockstore/libs/encryption/model/utils.cpp b/cloud/blockstore/libs/encryption/model/utils.cpp new file mode 100644 index 00000000000..852f8a9ba25 --- /dev/null +++ b/cloud/blockstore/libs/encryption/model/utils.cpp @@ -0,0 +1,69 @@ +#include "utils.h" + +#include +#include + +namespace NCloud::NBlockStore { + +namespace { + +/////////////////////////////////////////////////////////////////////////////// + +const TMap EncryptionModes = { + {"no", NProto::NO_ENCRYPTION}, + {"aes-xts", NProto::ENCRYPTION_AES_XTS}, +}; + +} // namespace + +NProto::EEncryptionMode EncryptionModeFromString(const TString& str) +{ + auto it = EncryptionModes.find(str); + if (it != EncryptionModes.end()) { + return it->second; + } + + ythrow yexception() << "invalid encryption mode: " << str; +} + +TString EncryptionModeToString(NProto::EEncryptionMode encryptionMode) +{ + for (const auto& [key, value]: EncryptionModes) { + if (value == encryptionMode) { + return key; + } + } + ythrow yexception() << "invalid encryption mode: " + << static_cast(encryptionMode); +} + +NProto::TEncryptionSpec CreateEncryptionSpec( + NProto::EEncryptionMode mode, + const TString& keyPath, + const TString& keyHash) +{ + if (mode == NProto::NO_ENCRYPTION) { + Y_ENSURE( + keyHash.empty() && keyPath.empty(), + "invalid encryption options: set encryption mode or remove key " + "hash and key path"); + return {}; + } + + Y_ENSURE( + keyHash.empty() || keyPath.empty(), + "invalid encryption options: set key path or key hash, not both"); + + Y_ENSURE( + keyHash || keyPath, + "invalid encryption options: set key hash or key path or remove " + "encryption mode"); + + NProto::TEncryptionSpec encryptionSpec; + encryptionSpec.SetMode(mode); + encryptionSpec.SetKeyHash(keyHash); + encryptionSpec.MutableKeyPath()->SetFilePath(keyPath); + return encryptionSpec; +} + +} // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/encryption/model/utils.h b/cloud/blockstore/libs/encryption/model/utils.h new file mode 100644 index 00000000000..23c403721ae --- /dev/null +++ b/cloud/blockstore/libs/encryption/model/utils.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace NCloud::NBlockStore { + +NProto::EEncryptionMode EncryptionModeFromString(const TString& str); + +TString EncryptionModeToString(NProto::EEncryptionMode encryptionMode); + +NProto::TEncryptionSpec CreateEncryptionSpec( + NProto::EEncryptionMode mode, + const TString& keyPath, + const TString& keyHash); + +} // namespace NCloud::NBlockStore diff --git a/cloud/blockstore/libs/encryption/model/ya.make b/cloud/blockstore/libs/encryption/model/ya.make new file mode 100644 index 00000000000..72107f5e35e --- /dev/null +++ b/cloud/blockstore/libs/encryption/model/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + utils.cpp +) + +PEERDIR( + cloud/blockstore/public/api/protos +) + +END() diff --git a/cloud/blockstore/libs/encryption/ya.make b/cloud/blockstore/libs/encryption/ya.make index 6c4517c91a4..e0ef1809541 100644 --- a/cloud/blockstore/libs/encryption/ya.make +++ b/cloud/blockstore/libs/encryption/ya.make @@ -13,8 +13,9 @@ PEERDIR( cloud/blockstore/libs/common cloud/blockstore/libs/diagnostics + cloud/blockstore/libs/encryption/model cloud/blockstore/libs/service - + cloud/storage/core/libs/common cloud/storage/core/libs/endpoints/keyring diff --git a/cloud/blockstore/libs/endpoints_vhost/external_endpoint_stats.cpp b/cloud/blockstore/libs/endpoints_vhost/external_endpoint_stats.cpp index 11ff73895c3..2a0b11de5d2 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_endpoint_stats.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_endpoint_stats.cpp @@ -1,7 +1,7 @@ #include "external_endpoint_stats.h" #include - +#include #include #include @@ -67,7 +67,8 @@ void BatchCompleted( request, requestStats["count"].GetUInteger(), requestStats["bytes"].GetUInteger(), - requestStats["errors"].GetUInteger(), + requestStats["errors"].GetUInteger() + + requestStats["encryptor_errors"].GetUInteger(), times, sizes); } @@ -91,6 +92,17 @@ void TEndpointStats::Update(const NJson::TJsonValue& stats) stats["write"], ClientId, DiskId); + + // Report critical events + if (stats.Has("crit_events")) { + for (const auto& event: stats["crit_events"].GetArray()) { + ReportCriticalEvent( + GetCriticalEventFullName(event["name"].GetString()), + event["message"].GetString(), + false // verifyDebug + ); + } + } } } // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp index 3f1a1cacc10..4f59c5266f5 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp @@ -6,8 +6,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -37,6 +39,8 @@ #include #include +#include + namespace NCloud::NBlockStore::NServer { using namespace NThreading; @@ -56,6 +60,11 @@ constexpr ui64 ReadFirstStatRetryCount = 10; // COMPLETION_STATS_WAIT_DURATION. constexpr auto StatReadDuration = TDuration::Seconds(1); +// Backoff delays for external-vhost server restart. +constexpr auto RestartMinDelay = TDuration::MilliSeconds(100); +constexpr auto RestartMaxDelay = TDuration::Seconds(30); +constexpr auto RestartWasTooLongAgo = TDuration::Seconds(60); + enum class EEndpointType { Local, @@ -527,8 +536,7 @@ void AddToCGroups(pid_t pid, const TVector& cgroups) class TEndpoint final : public IExternalEndpoint , public std::enable_shared_from_this - , public ISimpleThread // XXX: pidfd_open is not implemented for Linux 4.14 -{ // so we are forced to use a thread for waitpid. +{ private: const ILoggingServicePtr Logging; const TExecutorPtr Executor; @@ -545,6 +553,8 @@ class TEndpoint final TIntrusivePtr Process; std::atomic_bool ShouldStop = false; + TInstant LastRestartAt; + TBackoffDelayProvider RestartBackoff{RestartMinDelay, RestartMaxDelay}; TPromise StopPromise = NewPromise(); @@ -588,7 +598,14 @@ class TEndpoint final { Process = StartProcess(); - ISimpleThread::Start(); + // To avoid a race, we need to get the shared pointer in the calling + // thread and pass it to the background thread. This guaranteed that the + // background thread will deal with a live this. + auto workFunc = [self = shared_from_this()]() + { + self->ThreadProc(); + }; + std::thread(std::move(workFunc)).detach(); } TFuture Stop() override @@ -604,12 +621,10 @@ class TEndpoint final } private: - void* ThreadProc() override + // XXX: pidfd_open is not implemented for Linux 4.14 + // so we are forced to use a thread for waitpid. + void ThreadProc() { - auto holder = shared_from_this(); - - Detach(); // don't call Join in the same thread - ::NCloud::SetCurrentThreadName("waitEP"); NProto::TError error; @@ -644,8 +659,6 @@ class TEndpoint final } StopPromise.SetValue(error); - - return nullptr; } TIntrusivePtr StartProcess() @@ -657,7 +670,12 @@ class TEndpoint final } }; - AddToCGroups(process.Pid, Cgroups); + try { + AddToCGroups(process.Pid, Cgroups); + } catch (...) { + ShouldStop = true; + throw; + } auto ep = MakeIntrusive( ClientId, @@ -674,9 +692,25 @@ class TEndpoint final TIntrusivePtr RestartProcess() { + if (TInstant::Now() - LastRestartAt > RestartWasTooLongAgo) { + // The last restart happened a long time ago, restart immediately. + RestartBackoff.Reset(); + } else { + auto delay = RestartBackoff.GetDelayAndIncrease(); + STORAGE_WARN( + "[" << ClientId << "] Will restart external endpoint after " + << delay.ToString()); + Sleep(delay); + } + + if (ShouldStop) { + return nullptr; + } + STORAGE_WARN("[" << ClientId << "] Restart external endpoint"); try { + LastRestartAt = TInstant::Now(); return StartProcess(); } catch (...) { STORAGE_ERROR("[" << ClientId << "] Can't restart external endpoint: " @@ -1040,6 +1074,25 @@ class TExternalVhostEndpointListener final args.emplace_back("--read-only"); } + const auto& encryptionSpec = request.GetEncryptionSpec(); + if (encryptionSpec.GetMode() != NProto::NO_ENCRYPTION) { + args.emplace_back("--encryption-mode"); + args.emplace_back(EncryptionModeToString(encryptionSpec.GetMode())); + + const auto& keyPath = encryptionSpec.GetKeyPath(); + if (keyPath.HasFilePath()) { + args.emplace_back("--encryption-key-path"); + args.emplace_back(keyPath.GetFilePath()); + } else if (keyPath.HasKeyringId()) { + args.emplace_back("--encryption-keyring-id"); + args.emplace_back(ToString(keyPath.GetKeyringId())); + } else { + ythrow yexception() + << "EncryptionSpec should has FilePath or KeyringId " + << encryptionSpec.AsJSON(); + } + } + TVector cgroups( request.GetClientCGroups().begin(), request.GetClientCGroups().end() diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp index 3e351a15436..657feedd5ed 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp @@ -483,6 +483,82 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest) } } + Y_UNIT_TEST_F(ShouldStartEncryptedAioExternalEndpointWithPath, TFixture) + { + auto request = CreateDefaultStartEndpointRequest(); + auto* encryption = request.MutableEncryptionSpec(); + encryption->SetMode(NProto::EEncryptionMode::ENCRYPTION_AES_XTS); + encryption->MutableKeyPath()->SetFilePath("/tmp/secret.key"); + + auto error = + Listener->StartEndpoint(request, Volume, Session).GetValueSync(); + UNIT_ASSERT_C(!HasError(error), error); + + auto* create = std::get_if(&History[0]); + + /* + --serial local0 2 + --disk-id vol0 2 + --socket-path /tmp/socket.vhost 2 + -q 2 2 + --device ... 2 + --device ... 2 + --read-only 1 + --wait-after-parent-exit ... 2 + --wait-after-parent-exit ... 2 + --encryption-mode ... 2 + --encryption-key-path ... 2 + 19 + */ + + UNIT_ASSERT_VALUES_EQUAL(19, create->CmdArgs.size()); + + UNIT_ASSERT_VALUES_EQUAL( + "aes-xts", + GetArg(create->CmdArgs, "--encryption-mode")); + UNIT_ASSERT_VALUES_EQUAL( + "/tmp/secret.key", + GetArg(create->CmdArgs, "--encryption-key-path")); + } + + Y_UNIT_TEST_F(ShouldStartEncryptedAioExternalEndpointWithKeyring, TFixture) + { + auto request = CreateDefaultStartEndpointRequest(); + auto* encryption = request.MutableEncryptionSpec(); + encryption->SetMode(NProto::EEncryptionMode::ENCRYPTION_AES_XTS); + encryption->MutableKeyPath()->SetKeyringId(100); + + auto error = + Listener->StartEndpoint(request, Volume, Session).GetValueSync(); + UNIT_ASSERT_C(!HasError(error), error); + + auto* create = std::get_if(&History[0]); + + /* + --serial local0 2 + --disk-id vol0 2 + --socket-path /tmp/socket.vhost 2 + -q 2 2 + --device ... 2 + --device ... 2 + --read-only 1 + --wait-after-parent-exit ... 2 + --wait-after-parent-exit ... 2 + --encryption-mode ... 2 + --encryption-keyring-id ... 2 + 19 + */ + + UNIT_ASSERT_VALUES_EQUAL(19, create->CmdArgs.size()); + + UNIT_ASSERT_VALUES_EQUAL( + "aes-xts", + GetArg(create->CmdArgs, "--encryption-mode")); + UNIT_ASSERT_VALUES_EQUAL( + "100", + GetArg(create->CmdArgs, "--encryption-keyring-id")); + } + Y_UNIT_TEST_F(ShouldStartRdmaExternalEndpoint, TFixture) { UNIT_ASSERT_VALUES_EQUAL(0, History.size()); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp index 2f1246e2f62..9258742305b 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_monitoring.cpp @@ -1159,10 +1159,26 @@ void TVolumeActor::RenderConfig(IOutputStream& out) const } TABLER() { - TABLED() { out << "Encryption mode"; } + TABLED() { out << "Encryption"; } TABLED() { - out << NProto::EEncryptionMode_Name( - (NProto::EEncryptionMode)volumeConfig.GetEncryptionDesc().GetMode()); + DIV() + { + auto encryptionMode = + static_cast( + volumeConfig.GetEncryptionDesc().GetMode()); + out << NProto::EEncryptionMode_Name(encryptionMode); + } + DIV() + { + const auto& keyHash = + volumeConfig.GetEncryptionDesc().GetKeyHash(); + if (keyHash.empty()) { + out << "Binding to the encryption key has not " + "yet occurred."; + } else { + out << "Encryption key hash: " << keyHash; + } + } } } diff --git a/cloud/blockstore/tests/vhost-server/ya.make b/cloud/blockstore/tests/vhost-server/ya.make index 1d29fa1eb7a..aa49767f356 100644 --- a/cloud/blockstore/tests/vhost-server/ya.make +++ b/cloud/blockstore/tests/vhost-server/ya.make @@ -12,7 +12,4 @@ DEPENDS( DATA( ) -PEERDIR( -) - END() diff --git a/cloud/blockstore/tests/ya.make b/cloud/blockstore/tests/ya.make index cbefe618f20..ed10ff3d96d 100644 --- a/cloud/blockstore/tests/ya.make +++ b/cloud/blockstore/tests/ya.make @@ -29,4 +29,5 @@ RECURSE( stats_aggregator_perf storage_discovery vhost-server + vhost-server/run_and_die ) diff --git a/cloud/blockstore/tools/nbd/CMakeLists.linux-x86_64.txt b/cloud/blockstore/tools/nbd/CMakeLists.linux-x86_64.txt index 0daf304ed2e..4e5b1ea2c11 100644 --- a/cloud/blockstore/tools/nbd/CMakeLists.linux-x86_64.txt +++ b/cloud/blockstore/tools/nbd/CMakeLists.linux-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(blockstore-nbd PUBLIC blockstore-libs-client blockstore-libs-common blockstore-libs-diagnostics + blockstore-libs-encryption blockstore-libs-nbd blockstore-libs-service core-libs-grpc diff --git a/cloud/blockstore/tools/nbd/options.cpp b/cloud/blockstore/tools/nbd/options.cpp index a71bc417c7c..01cc63da536 100644 --- a/cloud/blockstore/tools/nbd/options.cpp +++ b/cloud/blockstore/tools/nbd/options.cpp @@ -1,5 +1,6 @@ #include "options.h" +#include #include #include @@ -66,23 +67,6 @@ NProto::EVolumeMountMode MountModeFromString(const TString& s) ythrow yexception() << "invalid mount mode: " << s; } -//////////////////////////////////////////////////////////////////////////////// - -static const TMap EncryptionModes = { - { "no", NProto::NO_ENCRYPTION }, - { "aes-xts", NProto::ENCRYPTION_AES_XTS }, -}; - -NProto::EEncryptionMode EncryptionModeFromString(const TString& str) -{ - auto it = EncryptionModes.find(str); - if (it != EncryptionModes.end()) { - return it->second; - } - - ythrow yexception() << "invalid encryption mode: " << str; -} - } // namespace //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/tools/nbd/ya.make b/cloud/blockstore/tools/nbd/ya.make index 4b92acea08b..2ff8add4e74 100644 --- a/cloud/blockstore/tools/nbd/ya.make +++ b/cloud/blockstore/tools/nbd/ya.make @@ -12,6 +12,7 @@ PEERDIR( cloud/blockstore/config cloud/blockstore/libs/client cloud/blockstore/libs/common + cloud/blockstore/libs/encryption/model cloud/blockstore/libs/diagnostics cloud/blockstore/libs/nbd cloud/blockstore/libs/service diff --git a/cloud/blockstore/vhost-server/CMakeLists.linux-x86_64.txt b/cloud/blockstore/vhost-server/CMakeLists.linux-x86_64.txt index fb5d5f560d4..ab049641c7c 100644 --- a/cloud/blockstore/vhost-server/CMakeLists.linux-x86_64.txt +++ b/cloud/blockstore/vhost-server/CMakeLists.linux-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(blockstore-vhost-server PUBLIC yutil library-cpp-cpuid_check blockstore-libs-client + blockstore-libs-encryption libs-rdma-impl blockstore-libs-service_local vhost-server @@ -43,6 +44,7 @@ target_sources(blockstore-vhost-server PRIVATE ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/backend_aio.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/backend_rdma.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/backend_null.cpp + ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/critical_event.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/histogram.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/options.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/request_aio.cpp diff --git a/cloud/blockstore/vhost-server/backend.h b/cloud/blockstore/vhost-server/backend.h index 2b339738d4a..e5befd727d1 100644 --- a/cloud/blockstore/vhost-server/backend.h +++ b/cloud/blockstore/vhost-server/backend.h @@ -12,8 +12,6 @@ namespace NCloud::NBlockStore::NVHostServer { struct IBackend: public IStartable { - virtual ~IBackend() = default; - virtual vhd_bdev_info Init(const TOptions& options) = 0; virtual void ProcessQueue( ui32 queueIndex, diff --git a/cloud/blockstore/vhost-server/backend_aio.cpp b/cloud/blockstore/vhost-server/backend_aio.cpp index 89f01e2048b..40402807a27 100644 --- a/cloud/blockstore/vhost-server/backend_aio.cpp +++ b/cloud/blockstore/vhost-server/backend_aio.cpp @@ -13,15 +13,20 @@ #include #include +#include namespace NCloud::NBlockStore::NVHostServer { +using namespace NThreading; + namespace { //////////////////////////////////////////////////////////////////////////////// void CompleteRequest( - TAioRequest* req, + TLog& log, + IEncryptor* encryptor, + TAioRequestHolder req, vhd_bdev_io_result status, TSimpleStats& stats, TCpuCycles now) @@ -29,45 +34,59 @@ void CompleteRequest( auto* bio = vhd_get_bdev_io(req->Io); const ui64 bytes = bio->total_sectors * VHD_SECTOR_SIZE; - stats.Requests[bio->type].Errors += status != VHD_BDEV_SUCCESS; - stats.Requests[bio->type].Count += status == VHD_BDEV_SUCCESS; - stats.Requests[bio->type].Bytes += bytes; - stats.Requests[bio->type].Unaligned += req->BounceBuf; + auto& requestStat = stats.Requests[bio->type]; + requestStat.Errors += status != VHD_BDEV_SUCCESS; + requestStat.Count += status == VHD_BDEV_SUCCESS; + requestStat.Bytes += bytes; + requestStat.Unaligned += req->Unaligned; if (status == VHD_BDEV_SUCCESS) { stats.Times[bio->type].Increment(now - req->SubmitTs); stats.Sizes[bio->type].Increment(bytes); } - if (req->BounceBuf) { + if (req->BufferAllocated || encryptor) { if (bio->type == VHD_BDEV_READ && status == VHD_BDEV_SUCCESS) { - SgListCopy( + NSan::Unpoison(req->Data[0].iov_base, req->Data[0].iov_len); + const bool success = SgListCopyWithOptionalDecryption( + log, static_cast(req->Data[0].iov_base), - bio->sglist); + bio->sglist, + encryptor, + bio->first_sector); + if (!success) { + status = VHD_BDEV_IOERR; + stats.EncryptorErrors++; + } } - std::free(req->Data[0].iov_base); } vhd_complete_bio(req->Io, status); - std::free(req); } -void CompleteRequest( - iocb* sub, - TAioCompoundRequest* req, +void CompleteCompoundRequest( + TLog& log, + IEncryptor* encryptor, + TAioSubRequestHolder sub, vhd_bdev_io_result status, TSimpleStats& stats, TCpuCycles now) { - req->Errors += status != VHD_BDEV_SUCCESS; + auto* req = sub->GetParentRequest(); - auto* bio = vhd_get_bdev_io(req->Io); + req->Errors += status != VHD_BDEV_SUCCESS; if (req->Inflight.fetch_sub(1) == 1) { + // This is the last subrequest. Take ownership of the parent request and + // release it when leave the scope. + auto holder = sub->TakeParentRequest(); + + auto* bio = vhd_get_bdev_io(req->Io); const ui64 bytes = bio->total_sectors * VHD_SECTOR_SIZE; - stats.Requests[bio->type].Errors += req->Errors != 0; - stats.Requests[bio->type].Count += 1; - stats.Requests[bio->type].Bytes += bytes; + auto& requestStat = stats.Requests[bio->type]; + requestStat.Errors += req->Errors != 0; + requestStat.Count += 1; + requestStat.Bytes += bytes; if (status == VHD_BDEV_SUCCESS) { stats.Times[bio->type].Increment(now - req->SubmitTs); @@ -75,15 +94,20 @@ void CompleteRequest( } if (bio->type == VHD_BDEV_READ && status == VHD_BDEV_SUCCESS) { - SgListCopy(req->Buffer, bio->sglist); + NSan::Unpoison(req->Buffer.get(), bytes); + const bool success = SgListCopyWithOptionalDecryption( + log, + req->Buffer.get(), + bio->sglist, + encryptor, + bio->first_sector); + if (!success) { + status = VHD_BDEV_IOERR; + stats.EncryptorErrors++; + } } - std::free(req->Buffer); - vhd_complete_bio(req->Io, status); - std::free(req); } - - std::free(sub); } //////////////////////////////////////////////////////////////////////////////// @@ -94,6 +118,7 @@ class TAioBackend final: public IBackend const ILoggingServicePtr Logging; TLog Log; + IEncryptorPtr Encryptor; TVector Devices; io_context_t Io = {}; @@ -106,7 +131,7 @@ class TAioBackend final: public IBackend ICompletionStatsPtr CompletionStats; public: - explicit TAioBackend(ILoggingServicePtr logging); + TAioBackend(IEncryptorPtr encryptor, ILoggingServicePtr logging); vhd_bdev_info Init(const TOptions& options) override; void Start() override; @@ -118,17 +143,26 @@ class TAioBackend final: public IBackend std::optional GetCompletionStats(TDuration timeout) override; private: + // Dequeue requests from |queue| and start processing them. Returns the + // number of dequeued requests. + // Puts started io-requests iocb to |batch|. Some requests may splitted for + // cross-device read/write and produce two iocb. size_t PrepareBatch( vhd_request_queue* queue, TVector& batch, - TCpuCycles now); + TCpuCycles now, + TSimpleStats& queueStats); + void CompletionThreadFunc(); }; //////////////////////////////////////////////////////////////////////////////// -TAioBackend::TAioBackend(ILoggingServicePtr logging) +TAioBackend::TAioBackend( + IEncryptorPtr encryptor, + ILoggingServicePtr logging) : Logging{std::move(logging)} + , Encryptor(std::move(encryptor)) , CompletionStats(CreateCompletionStats()) { Log = Logging->CreateLog("AIO"); @@ -137,7 +171,9 @@ TAioBackend::TAioBackend(ILoggingServicePtr logging) vhd_bdev_info TAioBackend::Init(const TOptions& options) { STORAGE_INFO("Initializing AIO backend"); - + if (Encryptor) { + STORAGE_INFO("Encryption enabled"); + } BatchSize = options.BatchSize; Y_ABORT_UNLESS(io_setup(BatchSize, &Io) >= 0, "io_setup"); @@ -159,7 +195,7 @@ vhd_bdev_info TAioBackend::Init(const TOptions& options) i64 totalBytes = 0; - for (auto& chunk: options.Layout) { + for (const auto& chunk: options.Layout) { TFileHandle file{chunk.DevicePath, flags}; if (!file.IsOpen()) { @@ -255,7 +291,7 @@ void TAioBackend::ProcessQueue( const TCpuCycles now = GetCycleCount(); // append new requests to the tail of the batch - queueStats.Dequeued += PrepareBatch(queue, batch, now); + queueStats.Dequeued += PrepareBatch(queue, batch, now, queueStats); if (batch.empty()) { break; @@ -279,15 +315,18 @@ void TAioBackend::ProcessQueue( ++queueStats.SubFailed; if (batch[0]->data) { - CompleteRequest( - batch[0], - static_cast(batch[0]->data), + CompleteCompoundRequest( + Log, + Encryptor.get(), + TAioSubRequest::FromIocb(batch[0]), VHD_BDEV_IOERR, queueStats, now); } else { CompleteRequest( - static_cast(batch[0]), + Log, + Encryptor.get(), + TAioRequest::FromIocb(batch[0]), VHD_BDEV_IOERR, queueStats, now); @@ -311,18 +350,26 @@ std::optional TAioBackend::GetCompletionStats(TDuration timeout) size_t TAioBackend::PrepareBatch( vhd_request_queue* queue, TVector& batch, - TCpuCycles now) + TCpuCycles now, + TSimpleStats& queueStats) { - const size_t size = batch.size(); + const size_t initialSize = batch.size(); - vhd_request req; + vhd_request req{}; while (batch.size() < BatchSize && vhd_dequeue_request(queue, &req)) { Y_DEBUG_ABORT_UNLESS(vhd_vdev_get_priv(req.vdev) == this); - PrepareIO(Log, Devices, req.io, batch, now); + PrepareIO( + Log, + Encryptor.get(), + Devices, + req.io, + batch, + now, + queueStats); } - return batch.size() - size; + return batch.size() - initialSize; } void TAioBackend::CompletionThreadFunc() @@ -338,7 +385,7 @@ void TAioBackend::CompletionThreadFunc() TVector events(BatchSize); TSimpleStats stats; - timespec timeout{.tv_sec = 1}; + timespec timeout{.tv_sec = 1, .tv_nsec = 0}; for (;;) { // TODO: try AIO_RING_MAGIC trick @@ -363,10 +410,7 @@ void TAioBackend::CompletionThreadFunc() for (int i = 0; i != ret; ++i) { if (events[i].data) { - auto* req = static_cast(events[i].data); - NSan::Acquire(req); - iocb* sub = events[i].obj; - NSan::Acquire(sub); + auto sub = TAioSubRequest::FromIocb(events[i].obj); vhd_bdev_io_result result = VHD_BDEV_SUCCESS; @@ -384,13 +428,18 @@ void TAioBackend::CompletionThreadFunc() strerror(-events[i].res)); } - CompleteRequest(sub, req, result, stats, now); + CompleteCompoundRequest( + Log, + Encryptor.get(), + std::move(sub), + result, + stats, + now); continue; } - auto* req = static_cast(events[i].obj); - NSan::Acquire(req); + auto req = TAioRequest::FromIocb(events[i].obj); vhd_bdev_io_result result = VHD_BDEV_SUCCESS; auto* bio = vhd_get_bdev_io(req->Io); @@ -411,7 +460,13 @@ void TAioBackend::CompletionThreadFunc() strerror(-events[i].res)); } - CompleteRequest(req, result, stats, now); + CompleteRequest( + Log, + Encryptor.get(), + std::move(req), + result, + stats, + now); } stats.Completed += ret; @@ -422,9 +477,13 @@ void TAioBackend::CompletionThreadFunc() //////////////////////////////////////////////////////////////////////////////// -IBackendPtr CreateAioBackend(ILoggingServicePtr logging) +IBackendPtr CreateAioBackend( + IEncryptorPtr encryptor, + ILoggingServicePtr logging) { - return std::make_shared(std::move(logging)); + return std::make_shared( + std::move(encryptor), + std::move(logging)); } } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/backend_aio.h b/cloud/blockstore/vhost-server/backend_aio.h index 3f438a91431..8a19eb1e92a 100644 --- a/cloud/blockstore/vhost-server/backend_aio.h +++ b/cloud/blockstore/vhost-server/backend_aio.h @@ -2,12 +2,15 @@ #include "public.h" +#include #include namespace NCloud::NBlockStore::NVHostServer { //////////////////////////////////////////////////////////////////////////////// -IBackendPtr CreateAioBackend(ILoggingServicePtr logging); +IBackendPtr CreateAioBackend( + IEncryptorPtr encryptor, + ILoggingServicePtr logging); } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/critical_event.cpp b/cloud/blockstore/vhost-server/critical_event.cpp new file mode 100644 index 00000000000..65b56f79733 --- /dev/null +++ b/cloud/blockstore/vhost-server/critical_event.cpp @@ -0,0 +1,56 @@ +#include "critical_event.h" + +#include + +#include +#include + +namespace NCloud::NBlockStore::NVHostServer { + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +constexpr size_t MaxStoredCriticalEventCount = 1024; + +TLog Log; +TCriticalEvents CriticalEventsAccumulator; +TAdaptiveLock Guard; + +} // namespace + +void SetCriticalEventsLog(TLog log) +{ + Log = std::move(log); +} + +void ReportCriticalEvent(TString sensorName, TString message) +{ + if (Log.IsOpen()) { + TStringBuilder fullMessage; + fullMessage << "CRITICAL_EVENT:" << sensorName; + if (message) { + fullMessage << ":" << message; + } + STORAGE_ERROR(fullMessage); + } + + with_lock (Guard) { + if (CriticalEventsAccumulator.size() >= MaxStoredCriticalEventCount) { + return; + } + CriticalEventsAccumulator.push_back( + TCriticalEvent{std::move(sensorName), std::move(message)}); + } +} + +TCriticalEvents TakeAccumulatedCriticalEvents() +{ + TCriticalEvents result; + with_lock (Guard) { + CriticalEventsAccumulator.swap(result); + } + return result; +} + +} // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/critical_event.h b/cloud/blockstore/vhost-server/critical_event.h new file mode 100644 index 00000000000..264eeabf832 --- /dev/null +++ b/cloud/blockstore/vhost-server/critical_event.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NVHostServer { + +//////////////////////////////////////////////////////////////////////////////// + +struct TCriticalEvent +{ + TString SensorName; + TString Message; +}; +using TCriticalEvents = TVector; + +void SetCriticalEventsLog(TLog log); +void ReportCriticalEvent(TString sensorName, TString message); + +TCriticalEvents TakeAccumulatedCriticalEvents(); + +} // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/gtest/ya.make b/cloud/blockstore/vhost-server/gtest/ya.make new file mode 100644 index 00000000000..00fe4d10b85 --- /dev/null +++ b/cloud/blockstore/vhost-server/gtest/ya.make @@ -0,0 +1,39 @@ +GTEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/small.inc) + +SRCS( + ../backend.cpp + ../backend_aio.cpp + ../critical_event.cpp + ../histogram.cpp + ../options.cpp + ../request_aio.cpp + ../server.cpp + ../stats.cpp + + ../server_ut.cpp +) + +ADDINCL( + cloud/contrib/vhost +) + +PEERDIR( + cloud/blockstore/libs/common + cloud/blockstore/libs/encryption + cloud/blockstore/libs/encryption/model + cloud/contrib/vhost + + cloud/storage/core/libs/common + cloud/storage/core/libs/diagnostics + cloud/storage/core/libs/vhost-client + + library/cpp/getopt + library/cpp/getopt/small + library/cpp/testing/gtest + + contrib/libs/libaio +) + +END() diff --git a/cloud/blockstore/vhost-server/main.cpp b/cloud/blockstore/vhost-server/main.cpp index 1545eeb8468..776cc032e7e 100644 --- a/cloud/blockstore/vhost-server/main.cpp +++ b/cloud/blockstore/vhost-server/main.cpp @@ -1,8 +1,11 @@ #include "backend_aio.h" #include "backend_null.h" #include "backend_rdma.h" +#include "critical_event.h" #include "server.h" +#include +#include #include #include @@ -14,7 +17,7 @@ #include #include -using namespace NCloud::NBlockStore::NVHostServer; +namespace NCloud::NBlockStore::NVHostServer { namespace { @@ -113,12 +116,41 @@ NCloud::ILoggingServicePtr CreateLogService(const TOptions& options) return std::make_shared(logLevel); } +IEncryptorPtr CreateEncryptor( + const TOptions& options, + NCloud::ILoggingServicePtr logging) +{ + auto encryptionSpec = options.GetEncryptionSpec(); + if (encryptionSpec.GetMode() == NProto::NO_ENCRYPTION) { + return {}; + } + + auto Log = logging->CreateLog("SERVER"); + + STORAGE_INFO("Encryption. Get key " << encryptionSpec.AsJSON()); + + auto keyProvider = CreateDefaultEncryptionKeyProvider(); + auto keyFuture = + keyProvider->GetKey(options.GetEncryptionSpec(), options.DiskId); + auto keyOrError = std::move(keyFuture).ExtractValue(); + if (HasError(keyOrError)) { + Y_ABORT( + "Error getting encryption key: %s", + ToString(keyOrError.GetError()).c_str()); + } + auto key = keyOrError.ExtractResult(); + STORAGE_ERROR("Got encryption key with hash " << key.GetHash()); + return CreateAesXtsEncryptor(std::move(key)); +} + IBackendPtr CreateBackend( const TOptions& options, NCloud::ILoggingServicePtr logging) { + auto encryptor = CreateEncryptor(options, logging); + if (options.DeviceBackend == "aio") { - return CreateAioBackend(logging); + return CreateAioBackend(std::move(encryptor), logging); } else if (options.DeviceBackend == "rdma") { return CreateRdmaBackend(logging); } else if (options.DeviceBackend == "null") { @@ -153,8 +185,12 @@ void SetProcessMark(const TString& diskId) } // namespace +} // namespace NCloud::NBlockStore::NVHostServer + //////////////////////////////////////////////////////////////////////////////// +using namespace NCloud::NBlockStore::NVHostServer; + int main(int argc, char** argv) { TOptions options; @@ -182,6 +218,7 @@ int main(int argc, char** argv) auto backend = CreateBackend(options, logService); auto server = CreateServer(logService, backend); auto Log = logService->CreateLog("SERVER"); + SetCriticalEventsLog(Log); server->Start(options); @@ -221,11 +258,11 @@ int main(int argc, char** argv) } switch (sig) { case SIGUSR1: { - auto stats = server->GetStats(prevStats); + auto completeStats = server->GetStats(prevStats); auto now = Now(); try { DumpStats( - stats, + completeStats, prevStats, now - ts, Cout, diff --git a/cloud/blockstore/vhost-server/options.cpp b/cloud/blockstore/vhost-server/options.cpp index c6b234d2ac2..f96963661e0 100644 --- a/cloud/blockstore/vhost-server/options.cpp +++ b/cloud/blockstore/vhost-server/options.cpp @@ -1,5 +1,7 @@ #include "options.h" +#include + #include #include @@ -136,6 +138,23 @@ void TOptions::Parse(int argc, char** argv) .NoArgument() .SetFlag(&RdmaClient.AlignedData); + opts.AddLongOption("encryption-mode", "encryption mode [no|aes-xts|test]") + .RequiredArgument("STR") + .Handler1T([this](const auto& s) + { EncryptionMode = EncryptionModeFromString(s); }); + + opts.AddLongOption( + "encryption-key-path", + "path to file with encryption key") + .RequiredArgument("STR") + .StoreResult(&EncryptionKeyPath); + + opts.AddLongOption( + "encryption-keyring-id", + "keyring id with encryption key") + .RequiredArgument("INT") + .StoreResult(&EncryptionKeyringId); + TOptsParseResultException res(&opts, argc, argv); if (res.FindLongOptParseResult("verbose") && VerboseLevel.empty()) { @@ -162,4 +181,17 @@ void TOptions::Parse(int argc, char** argv) } } +NProto::TEncryptionSpec TOptions::GetEncryptionSpec() const +{ + NProto::TEncryptionSpec result; + result.SetMode(EncryptionMode); + if (EncryptionKeyPath) { + result.MutableKeyPath()->SetFilePath(EncryptionKeyPath); + } + if (EncryptionKeyringId) { + result.MutableKeyPath()->SetKeyringId(EncryptionKeyringId); + } + return result; +} + } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/options.h b/cloud/blockstore/vhost-server/options.h index b406c053d8c..e6ddf1f39cd 100644 --- a/cloud/blockstore/vhost-server/options.h +++ b/cloud/blockstore/vhost-server/options.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -40,6 +41,12 @@ struct TOptions TString ClientId = "vhost-server"; TDuration WaitAfterParentExit = TDuration::Seconds(60); + NProto::EEncryptionMode EncryptionMode = NProto::NO_ENCRYPTION; + TString EncryptionKeyPath; + ui32 EncryptionKeyringId = 0; + + NProto::TEncryptionSpec GetEncryptionSpec() const; + struct { ui32 QueueSize = 256; diff --git a/cloud/blockstore/vhost-server/request_aio.cpp b/cloud/blockstore/vhost-server/request_aio.cpp index 900dc93e749..795d5e0c360 100644 --- a/cloud/blockstore/vhost-server/request_aio.cpp +++ b/cloud/blockstore/vhost-server/request_aio.cpp @@ -1,23 +1,74 @@ #include "request_aio.h" +#include "critical_event.h" + +#include #include #include +#include #include #include +#include +#include namespace NCloud::NBlockStore::NVHostServer { namespace { //////////////////////////////////////////////////////////////////////////////// +template +[[nodiscard]] bool DoCryptoOperation( + IEncryptor& encryptor, + TBlockDataRef src, + TBlockDataRef dst, + ui64 startSector) +{ + const size_t sectorCount = src.Size() / VHD_SECTOR_SIZE; + + for (size_t i = 0; i < sectorCount; ++i) { + TBlockDataRef srcRef(src.Data() + i * VHD_SECTOR_SIZE, VHD_SECTOR_SIZE); + TBlockDataRef dstRef(dst.Data() + i * VHD_SECTOR_SIZE, VHD_SECTOR_SIZE); + + if constexpr (DoDecrypt) { + if (IsAllZeroes(srcRef.Data(), srcRef.Size())) { + // If there was a reading from a block that has not yet been + // written, then we return a block consisting of only zeros. + memset(const_cast(dstRef.Data()), 0, dstRef.Size()); + continue; + } + + if (!encryptor.Decrypt(srcRef, dstRef, startSector + i)) { + // Something went wrong inside the decryption operation. + return false; + } + } else { + if (!encryptor.Encrypt(srcRef, dstRef, startSector + i)) { + // Something went wrong inside the encryption operation. + return false; + } + + if (IsAllZeroes(dstRef.Data(), dstRef.Size())) { + ReportCriticalEvent( + "EncryptorGeneratedZeroBlock", + TStringBuilder() << "Encryptor has generated a zero block #" + << startSector + i << " !"); + return false; + } + } + } + return true; +} + void PrepareCompoundIO( + IEncryptor* encryptor, TLog& Log, const TVector& devices, vhd_io* io, TVector& batch, - TCpuCycles now) + TCpuCycles now, + TSimpleStats& queueStats) { auto* bio = vhd_get_bdev_io(io); const i64 logicalOffset = bio->first_sector * VHD_SECTOR_SIZE; @@ -31,7 +82,7 @@ void PrepareCompoundIO( return device.EndOffset <= offset; }); - auto end = std::lower_bound( + const auto end = std::lower_bound( it, devices.end(), logicalOffset + totalBytes, @@ -50,85 +101,153 @@ void PrepareCompoundIO( bio->first_sector, bio->total_sectors); - auto* req = static_cast( - std::calloc(1, sizeof(TAioCompoundRequest))); - - req->Inflight = n; - req->Io = io; - req->Buffer = static_cast(std::aligned_alloc( - VHD_SECTOR_SIZE, - totalBytes)); - req->SubmitTs = now; + auto req = TAioCompoundRequest::CreateNew(n, io, totalBytes, now); if (bio->type == VHD_BDEV_WRITE) { - SgListCopy(bio->sglist, req->Buffer); + const bool success = SgListCopyWithOptionalEncryption( + Log, + bio->sglist, + req->Buffer.get(), + encryptor, + bio->first_sector); + if (!success) { + ++queueStats.EncryptorErrors; + vhd_complete_bio(req->Io, VHD_BDEV_IOERR); + return; + } } ui64 deviceOffset = logicalOffset - it->StartOffset; - char* ptr = req->Buffer; + char* ptr = req->Buffer.get(); for (; it != end; ++it) { - iocb* sreq = static_cast(std::calloc(1, sizeof(iocb))); + auto subRequest = TAioSubRequest::CreateNew(); const ui64 count = Min( totalBytes, it->EndOffset - it->StartOffset - deviceOffset); if (bio->type == VHD_BDEV_READ) { - io_prep_pread(sreq, it->File, ptr, count, it->FileOffset + deviceOffset); + io_prep_pread( + subRequest.get(), + it->File, + ptr, + count, + it->FileOffset + deviceOffset); } else { - io_prep_pwrite(sreq, it->File, ptr, count, it->FileOffset + deviceOffset); + io_prep_pwrite( + subRequest.get(), + it->File, + ptr, + count, + it->FileOffset + deviceOffset); } - sreq->data = req; + // Save the address of TAioCompoundRequest in each subrequest to share + // ownership among all subrequests. + subRequest->data = req.get(); - batch.push_back(sreq); - NSan::Release(sreq); + NSan::Release(subRequest.get()); + batch.push_back(subRequest.release()); ptr += count; totalBytes -= count; deviceOffset = 0; } - NSan::Release(req); + + // Ownership transferred to subrequests. + NSan::Release(req.get()); + req.release(); } } // namespace //////////////////////////////////////////////////////////////////////////////// -void SgListCopy(const char* src, const vhd_sglist& dst) +bool SgListCopyWithOptionalDecryption( + TLog& Log, + const char* src, + const vhd_sglist& dst, + IEncryptor* encryptor, + ui64 startSector) { - auto [count, bufs] = dst; + auto buffers = std::span{dst.buffers, dst.nbuffers}; - for (ui32 i = 0; i != count; ++i) { - std::memcpy(bufs[i].base, src, bufs[i].len); - src += bufs[i].len; + if (!encryptor) { + for (auto& buffer: buffers) { + std::memcpy(buffer.base, src, buffer.len); + src += buffer.len; + } + return true; } + + for (auto& buffer: buffers) { + TBlockDataRef srcRef{src, buffer.len}; + TBlockDataRef dstRef{static_cast(buffer.base), buffer.len}; + if (!DoCryptoOperation(*encryptor, srcRef, dstRef, startSector)) { + STORAGE_ERROR( + "Decryption error. Start block %" PRIu64 + ", blocks count %" PRIu64, + startSector, + buffers.size()); + return false; + } + startSector += buffer.len / VHD_SECTOR_SIZE; + src += buffer.len; + } + return true; } -void SgListCopy(const vhd_sglist& src, char* dst) +bool SgListCopyWithOptionalEncryption( + TLog& Log, + const vhd_sglist& src, + char* dst, + IEncryptor* encryptor, + ui64 startSector) { - auto [count, bufs] = src; + auto buffers = std::span{src.buffers, src.nbuffers}; - for (ui32 i = 0; i != count; ++i) { - std::memcpy(dst, bufs[i].base, bufs[i].len); - dst += bufs[i].len; + if (!encryptor) { + for (auto& buffer: buffers) { + std::memcpy(dst, buffer.base, buffer.len); + dst += buffer.len; + } + return true; } + + for (auto& buffer: buffers) { + TBlockDataRef srcRef{static_cast(buffer.base), buffer.len}; + TBlockDataRef dstRef{dst, buffer.len}; + if (!DoCryptoOperation(*encryptor, srcRef, dstRef, startSector)) + { + STORAGE_ERROR( + "Encryption error. Start block %" PRIu64 + ", blocks count %" PRIu64, + startSector, + buffers.size()); + return false; + } + startSector += buffer.len / VHD_SECTOR_SIZE; + dst += buffer.len; + } + return true; } //////////////////////////////////////////////////////////////////////////////// void PrepareIO( TLog& Log, + IEncryptor* encryptor, const TVector& devices, vhd_io* io, TVector& batch, - TCpuCycles now) + TCpuCycles now, + TSimpleStats& queueStats) { auto* bio = vhd_get_bdev_io(io); const i64 logicalOffset = bio->first_sector * VHD_SECTOR_SIZE; const i64 totalBytes = bio->total_sectors * VHD_SECTOR_SIZE; - auto it = std::lower_bound( + const auto *it = std::lower_bound( devices.begin(), devices.end(), logicalOffset, @@ -139,7 +258,8 @@ void PrepareIO( Y_ABORT_UNLESS(it != devices.end()); if (it->EndOffset < logicalOffset + totalBytes) { - PrepareCompoundIO(Log, devices, io, batch, now); + // The request is cross-device, so we split it into two. + PrepareCompoundIO(encryptor, Log, devices, io, batch, now, queueStats); return; } @@ -150,58 +270,180 @@ void PrepareIO( bio->first_sector, bio->total_sectors); - auto [nbufs, buffers] = bio->sglist; - - auto* req = static_cast(std::calloc(1, - sizeof(TAioRequest) + sizeof(iovec) * nbufs)); - - req->Io = io; - req->SubmitTs = now; - - for (ui32 i = 0; i != nbufs; ++i) { - // Windows allows i/o with buffers not aligned to i/o block size, but - // Linux doesn't, so use bounce buffer in this case. - // Note: the required alignment is the logical block size of the - // underlying storage; assume it to equal the sector size as BIOS - // requires sector-granular i/o anyway. - if (!VHD_IS_ALIGNED((uintptr_t) buffers[i].base, VHD_SECTOR_SIZE) || - !VHD_IS_ALIGNED(buffers[i].len, VHD_SECTOR_SIZE)) + auto buffers = + std::span{bio->sglist.buffers, bio->sglist.nbuffers}; + + // Windows allows i/o with buffers not aligned to i/o block size, but + // Linux doesn't, so use bounce buffer in this case. + // Note: the required alignment is the logical block size of the + // underlying storage; assume it to equal the sector size as BIOS + // requires sector-granular i/o anyway. + const bool isAllBuffersAligned = AllOf( + buffers, + [](const vhd_buffer& buffer) { - req->BounceBuf = true; - req->Data[0].iov_len = totalBytes; - req->Data[0].iov_base = std::aligned_alloc( - VHD_SECTOR_SIZE, - req->Data[0].iov_len); - - if (bio->type == VHD_BDEV_WRITE) { - char* dst = static_cast(req->Data[0].iov_base); - for (ui32 i = 0; i != nbufs; ++i) { - std::memcpy(dst, buffers[i].base, buffers[i].len); - dst += buffers[i].len; - } + return VHD_IS_ALIGNED((uintptr_t)buffer.base, VHD_SECTOR_SIZE) && + VHD_IS_ALIGNED(buffer.len, VHD_SECTOR_SIZE); + } + ); + + const bool needToAllocateBuffer = + !isAllBuffersAligned || (encryptor && bio->type == VHD_BDEV_WRITE); + + auto req = TAioRequest::CreateNew( + needToAllocateBuffer ? 1 : buffers.size(), + needToAllocateBuffer ? totalBytes : 0, + io, + now); + + if (needToAllocateBuffer) { + req->Unaligned = !isAllBuffersAligned; + if (bio->type == VHD_BDEV_WRITE) { + const bool success = SgListCopyWithOptionalEncryption( + Log, + bio->sglist, + static_cast(req->Data[0].iov_base), + encryptor, + bio->first_sector); + if (!success) { + ++queueStats.EncryptorErrors; + vhd_complete_bio(req->Io, VHD_BDEV_IOERR); + return; } - - nbufs = 1; - - break; } - - req->Data[i].iov_base = buffers[i].base; - req->Data[i].iov_len = buffers[i].len; + // Instead of multiple buffers, we have allocated one large buffer. + buffers = buffers.subspan(0, 1); + } else { + for (ui32 i = 0; i != buffers.size(); ++i) { + req->Data[i].iov_base = buffers[i].base; + req->Data[i].iov_len = buffers[i].len; + } } const auto offset = it->FileOffset + logicalOffset - it->StartOffset; if (bio->type == VHD_BDEV_READ) { - io_prep_preadv(req, it->File, req->Data, nbufs, offset); + io_prep_preadv(req.get(), it->File, req->Data, buffers.size(), offset); } else { - io_prep_pwritev(req, it->File, req->Data, nbufs, offset); + io_prep_pwritev(req.get(), it->File, req->Data, buffers.size(), offset); } - STORAGE_DEBUG("Prepared IO request with addr: %p", req); + STORAGE_DEBUG("Prepared IO request with addr: %p", req.get()); - batch.push_back(req); - NSan::Release(req); + NSan::Release(req.get()); + batch.push_back(req.release()); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFreeDeleter::operator()(void* obj) +{ + std::free(obj); +} + +void TAioRequestDeleter::operator()(TAioRequest* obj) +{ + if (obj->BufferAllocated) { + std::free(obj->Data[0].iov_base); + } + std::free(obj); +} + +TAioRequest::TAioRequest( + size_t allocatedBufferSize, + vhd_io* io, + TCpuCycles submitTs) + : iocb() + , Io(io) + , SubmitTs(submitTs) + , BufferAllocated(allocatedBufferSize != 0) +{ + if (allocatedBufferSize) { + Data[0].iov_len = allocatedBufferSize; + Data[0].iov_base = + std::aligned_alloc(VHD_SECTOR_SIZE, allocatedBufferSize); + } +} + +// static +TAioRequestHolder TAioRequest::CreateNew( + size_t bufferCount, + size_t allocatedBufferSize, + vhd_io* io, + TCpuCycles submitTs) +{ + const size_t totalSize = sizeof(TAioRequest) + sizeof(iovec) * bufferCount; + return TAioRequestHolder{ + new (std::calloc(1, totalSize)) + TAioRequest(allocatedBufferSize, io, submitTs)}; +} + +// static +TAioRequestHolder TAioRequest::FromIocb(iocb* cb) +{ + NSan::Acquire(cb); + Y_ABORT_UNLESS(cb->data == nullptr); + return TAioRequestHolder{static_cast(cb)}; +} + +// static +TAioSubRequestHolder TAioSubRequest::CreateNew() +{ + const size_t size = sizeof(TAioSubRequest); + return TAioSubRequestHolder{new (std::calloc(1, size)) TAioSubRequest}; +} + +// static +TAioSubRequestHolder TAioSubRequest::FromIocb(iocb* cb) { + NSan::Acquire(cb); + Y_ABORT_UNLESS(cb->data != nullptr); + return TAioSubRequestHolder{static_cast(cb)}; +} + +TAioCompoundRequest* TAioSubRequest::GetParentRequest() const +{ + NSan::Acquire(data); + Y_ABORT_UNLESS(data != nullptr); + return static_cast(data); +} + +TAioCompoundRequestHolder TAioSubRequest::TakeParentRequest() +{ + NSan::Acquire(data); + Y_ABORT_UNLESS(data != nullptr); + auto result = + TAioCompoundRequestHolder{static_cast(data)}; + data = nullptr; + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +TAioCompoundRequest::TAioCompoundRequest( + int inflight, + vhd_io* io, + size_t bufferSize, + TCpuCycles submitTs) + : Inflight(inflight) + , Io(io) + , SubmitTs(submitTs) + , Buffer{ + static_cast(std::aligned_alloc(VHD_SECTOR_SIZE, bufferSize)), + } +{} + +// static +std::unique_ptr TAioCompoundRequest::CreateNew( + int inflight, + vhd_io* io, + size_t bufferSize, + TCpuCycles submitTs) +{ + return std::make_unique( + inflight, + io, + bufferSize, + submitTs); } } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/request_aio.h b/cloud/blockstore/vhost-server/request_aio.h index 6f8301a3bfe..9be85041785 100644 --- a/cloud/blockstore/vhost-server/request_aio.h +++ b/cloud/blockstore/vhost-server/request_aio.h @@ -2,6 +2,7 @@ #include "stats.h" +#include #include #include #include @@ -10,10 +11,10 @@ #include #include -#include - #include -#include // iovec +#include // iovec + +#include class TLog; @@ -21,6 +22,29 @@ namespace NCloud::NBlockStore::NVHostServer { //////////////////////////////////////////////////////////////////////////////// +struct TAioRequest; +struct TAioSubRequest; +struct TAioCompoundRequest; + +// The unique_ptr<> deleter that performs release via std std::free(). It used +// for memory blocks allocated via std::calloc(). +struct TFreeDeleter +{ + void operator()(void* obj); +}; + +// The unique_ptr<> deleter for TAioRequest. +struct TAioRequestDeleter +{ + void operator()(TAioRequest* obj); +}; + +using TAioRequestHolder = std::unique_ptr; +using TAioSubRequestHolder = std::unique_ptr; +using TAioCompoundRequestHolder = std::unique_ptr; + +//////////////////////////////////////////////////////////////////////////////// + struct TAioDevice { i64 StartOffset = 0; @@ -30,35 +54,92 @@ struct TAioDevice }; // Single IO request. Also map libvhost's vhd_buffer to iovec. +// The memory for these objects is allocated via std::calloc. +// The size of the allocated memory is enough so that the Data can keep all the +// request buffers. Therefore, Data is the last field in the class. struct TAioRequest : iocb { vhd_io* Io; TCpuCycles SubmitTs; - bool BounceBuf; + bool BufferAllocated = false; + bool Unaligned = false; iovec Data[ /* Bio->sglist.nbuffers */ ]; + + static TAioRequestHolder CreateNew( + size_t bufferCount, + size_t allocatedBufferSize, + vhd_io* io, + TCpuCycles submitTs); + static TAioRequestHolder FromIocb(iocb* cb); + +private: + TAioRequest(size_t allocatedBufferSize, vhd_io* io, TCpuCycles submitTs); +}; + +// Cross-device sub IO request. +// The memory for these objects is allocated via std::calloc. +struct TAioSubRequest: public iocb +{ + static TAioSubRequestHolder CreateNew(); + static TAioSubRequestHolder FromIocb(iocb* cb); + + [[nodiscard]] TAioCompoundRequest* GetParentRequest() const; + [[nodiscard]] TAioCompoundRequestHolder TakeParentRequest(); + +private: + TAioSubRequest() = default;; }; -// Compound IO request. +// Cross-device request shared info. struct TAioCompoundRequest { std::atomic Inflight; std::atomic Errors; vhd_io* Io; TCpuCycles SubmitTs; - char* Buffer; + std::unique_ptr Buffer; + + TAioCompoundRequest( + int inflight, + vhd_io* io, + size_t bufferSize, + TCpuCycles submitTs); + + static TAioCompoundRequestHolder CreateNew( + int inflight, + vhd_io* io, + size_t bufferSize, + TCpuCycles submitTs); }; //////////////////////////////////////////////////////////////////////////////// void PrepareIO( TLog& log, + IEncryptor* encryptor, const TVector& devices, vhd_io* io, TVector& batch, - TCpuCycles now); + TCpuCycles now, + TSimpleStats& queueStats); + +// Copies the data, and if an encryptor is specified, encrypt it. Returns true +// if successful. +[[nodiscard]] bool SgListCopyWithOptionalEncryption( + TLog& Log, + const vhd_sglist& src, + char* dst, + IEncryptor* encryptor, + ui64 startSector); -void SgListCopy(const vhd_sglist& src, char* dst); -void SgListCopy(const char* src, const vhd_sglist& dst); +// Copies the data, and if an encryptor is specified, decrypt it. Returns true +// if successful. +[[nodiscard]] bool SgListCopyWithOptionalDecryption( + TLog& Log, + const char* src, + const vhd_sglist& dst, + IEncryptor* encryptor, + ui64 startSector); } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/request_aio_ut.cpp b/cloud/blockstore/vhost-server/request_aio_ut.cpp index a4d2c54046c..99c7f61cb66 100644 --- a/cloud/blockstore/vhost-server/request_aio_ut.cpp +++ b/cloud/blockstore/vhost-server/request_aio_ut.cpp @@ -13,7 +13,7 @@ #include #include -using namespace NCloud::NBlockStore::NVHostServer; +namespace NCloud::NBlockStore::NVHostServer { namespace { @@ -26,19 +26,6 @@ struct virtio_blk_io { struct vhd_bdev_io bdev_io; }; -void Free(TVector& batch) -{ - for (iocb* cb: batch) { - std::free(cb); - } -} - -void Free(TAioCompoundRequest* req) -{ - std::free(req->Buffer); - std::free(req); -} - //////////////////////////////////////////////////////////////////////////////// struct TTestBackend @@ -144,13 +131,11 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(1, batch.size()); - auto* req = static_cast(batch[0]); - Y_SCOPE_EXIT(req) { - std::free(req); - }; + auto req = TAioRequest::FromIocb(batch[0]); UNIT_ASSERT_VALUES_EQUAL(nullptr, req->data); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); @@ -160,7 +145,8 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) UNIT_ASSERT_VALUES_EQUAL(offset - Devices[2].StartOffset, req->u.c.offset); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT(!req->BounceBuf); + UNIT_ASSERT(!req->Unaligned); + UNIT_ASSERT(!req->BufferAllocated); UNIT_ASSERT_VALUES_EQUAL(buffers[0].base, req->Data[0].iov_base); UNIT_ASSERT_VALUES_EQUAL(buffers[0].len, req->Data[0].iov_len); @@ -202,14 +188,11 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(1, batch.size()); - auto* req = static_cast(batch[0]); - Y_SCOPE_EXIT(req) { - std::free(req->Data[0].iov_base); - std::free(req); - }; + auto req = TAioRequest::FromIocb(batch[0]); UNIT_ASSERT_VALUES_EQUAL(nullptr, req->data); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); @@ -219,7 +202,8 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) UNIT_ASSERT_VALUES_EQUAL(offset - Devices[2].StartOffset, req->u.c.offset); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT(req->BounceBuf); + UNIT_ASSERT(req->Unaligned); + UNIT_ASSERT(req->BufferAllocated); UNIT_ASSERT_VALUES_UNEQUAL(buffers[0].base, req->Data[0].iov_base); UNIT_ASSERT_VALUES_UNEQUAL(buffers[1].base, req->Data[0].iov_base); @@ -260,24 +244,26 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); - Y_SCOPE_EXIT(&batch) { Free(batch); }; + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(2, batch.size()); UNIT_ASSERT_VALUES_UNEQUAL(nullptr, batch[0]->data); - auto* req = static_cast(batch[0]->data); - Y_SCOPE_EXIT(req) { Free(req); }; + auto sub1 = TAioSubRequest::FromIocb(batch[0]); + auto sub2 = TAioSubRequest::FromIocb(batch[1]); + auto req = sub1->GetParentRequest(); + UNIT_ASSERT_VALUES_EQUAL(req, sub2->GetParentRequest()); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); UNIT_ASSERT_VALUES_EQUAL(req, batch[1]->data); UNIT_ASSERT_VALUES_EQUAL(batch.size(), req->Inflight.load()); UNIT_ASSERT_VALUES_EQUAL(0, req->Errors.load()); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer); + UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer.get()); { - iocb* sub = batch[0]; + iocb* sub = sub1.get(); UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(static_cast(Devices[1].File), sub->aio_fildes); @@ -286,13 +272,15 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) } { - iocb* sub = batch[1]; + iocb* sub = sub2.get(); UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(static_cast(Devices[2].File), sub->aio_fildes); UNIT_ASSERT_VALUES_EQUAL(10_KB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(0, sub->u.c.offset); } + + auto holder = sub1->TakeParentRequest(); } Y_UNIT_TEST_F(ShouldPrepareCompoundIOForSmallDevices, TTestBackend) @@ -328,54 +316,60 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); - Y_SCOPE_EXIT(&batch) { Free(batch); }; + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(3, batch.size()); UNIT_ASSERT_VALUES_UNEQUAL(nullptr, batch[0]->data); - auto* req = static_cast(batch[0]->data); - Y_SCOPE_EXIT(req) { Free(req); }; + auto sub1 = TAioSubRequest::FromIocb(batch[0]); + auto sub2 = TAioSubRequest::FromIocb(batch[1]); + auto sub3 = TAioSubRequest::FromIocb(batch[2]); + auto req = sub1->GetParentRequest(); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); + UNIT_ASSERT_VALUES_EQUAL(req, batch[0]->data); UNIT_ASSERT_VALUES_EQUAL(req, batch[1]->data); + UNIT_ASSERT_VALUES_EQUAL(req, batch[2]->data); UNIT_ASSERT_VALUES_EQUAL(batch.size(), req->Inflight.load()); UNIT_ASSERT_VALUES_EQUAL(0, req->Errors.load()); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer); + UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer.get()); { - iocb* sub = batch[0]; + iocb* sub = sub1.get(); UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(static_cast(Devices[1].File), sub->aio_fildes); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL(req->Buffer.get(), sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(128_KB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(1_MB, Devices[1].StartOffset); UNIT_ASSERT_VALUES_EQUAL(offset - Devices[1].StartOffset, sub->u.c.offset); } { - iocb* sub = batch[1]; + iocb* sub = sub2.get(); UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(static_cast(Devices[2].File), sub->aio_fildes); UNIT_ASSERT_VALUES_EQUAL(2_MB, Devices[2].StartOffset); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer + 128_KB, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL(req->Buffer.get() + 128_KB, sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(1_MB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(0, sub->u.c.offset); } { - iocb* sub = batch[2]; + iocb* sub = sub3.get(); UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(static_cast(Devices[3].File), sub->aio_fildes); UNIT_ASSERT_VALUES_EQUAL(3_MB, Devices[3].StartOffset); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer + 128_KB + 1_MB, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL(req->Buffer.get() + 128_KB + 1_MB, sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(1_MB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(0, sub->u.c.offset); } + + auto holder = sub1->TakeParentRequest(); } Y_UNIT_TEST_F(ShouldPrepareIOForSplitDevices, TTestBackend) @@ -413,10 +407,13 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); + + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(1, batch.size()); - auto* req = static_cast(batch[0]); + auto req = TAioRequest::FromIocb(batch[0]); + batch.clear(); UNIT_ASSERT_VALUES_EQUAL(nullptr, req->data); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); @@ -426,15 +423,14 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) UNIT_ASSERT_VALUES_EQUAL(Devices[0].FileOffset, req->u.c.offset); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT(!req->BounceBuf); + UNIT_ASSERT(!req->Unaligned); + UNIT_ASSERT(!req->BufferAllocated); UNIT_ASSERT_VALUES_EQUAL(buffers[0].base, req->Data[0].iov_base); UNIT_ASSERT_VALUES_EQUAL(buffers[0].len, req->Data[0].iov_len); UNIT_ASSERT_VALUES_EQUAL(buffers[1].base, req->Data[1].iov_base); UNIT_ASSERT_VALUES_EQUAL(buffers[1].len, req->Data[1].iov_len); - - Free(batch); } // read the 2nd device @@ -468,10 +464,12 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(1, batch.size()); - auto* req = static_cast(batch[0]); + auto req = TAioRequest::FromIocb(batch[0]); + batch.clear(); UNIT_ASSERT_VALUES_EQUAL(nullptr, req->data); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); @@ -484,15 +482,14 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) req->u.c.offset); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT(!req->BounceBuf); + UNIT_ASSERT(!req->Unaligned); + UNIT_ASSERT(!req->BufferAllocated); UNIT_ASSERT_VALUES_EQUAL(buffers[0].base, req->Data[0].iov_base); UNIT_ASSERT_VALUES_EQUAL(buffers[0].len, req->Data[0].iov_len); UNIT_ASSERT_VALUES_EQUAL(buffers[1].base, req->Data[1].iov_base); UNIT_ASSERT_VALUES_EQUAL(buffers[1].len, req->Data[1].iov_len); - - Free(batch); } } @@ -533,27 +530,31 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) const ui64 now = GetCycleCount(); TVector batch; - PrepareIO(Log, Devices, &bio.io, batch, now); + TSimpleStats queueStats; + PrepareIO(Log, nullptr, Devices, &bio.io, batch, now, queueStats); UNIT_ASSERT_VALUES_EQUAL(3, batch.size()); UNIT_ASSERT_VALUES_UNEQUAL(nullptr, batch[0]->data); - auto* req = static_cast(batch[0]->data); + auto sub1 = TAioSubRequest::FromIocb(batch[0]); + auto sub2 = TAioSubRequest::FromIocb(batch[1]); + auto sub3 = TAioSubRequest::FromIocb(batch[2]); + auto req = sub1->GetParentRequest(); UNIT_ASSERT_VALUES_EQUAL(now, req->SubmitTs); UNIT_ASSERT_VALUES_EQUAL(req, batch[1]->data); UNIT_ASSERT_VALUES_EQUAL(batch.size(), req->Inflight.load()); UNIT_ASSERT_VALUES_EQUAL(0, req->Errors.load()); UNIT_ASSERT_VALUES_EQUAL(&bio.io, req->Io); - UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer); + UNIT_ASSERT_VALUES_UNEQUAL(nullptr, req->Buffer.get()); { - iocb* sub = batch[0]; + iocb* sub = sub1.get(); TAioDevice& device = Devices[0]; UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(SplitFileHandle, sub->aio_fildes); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL(req->Buffer.get(), sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(512_KB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(0, device.StartOffset); UNIT_ASSERT_VALUES_EQUAL( @@ -562,30 +563,33 @@ Y_UNIT_TEST_SUITE(TAioRequestTest) } { - iocb* sub = batch[1]; + iocb* sub = sub2.get(); TAioDevice& device = Devices[1]; UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(SplitFileHandle, sub->aio_fildes); UNIT_ASSERT_VALUES_EQUAL(1_MB, device.StartOffset); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer + 512_KB, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL(req->Buffer.get() + 512_KB, sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(1_MB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(device.FileOffset, sub->u.c.offset); } { - iocb* sub = batch[2]; + iocb* sub = sub3.get(); TAioDevice& device = Devices[2]; UNIT_ASSERT_EQUAL(IO_CMD_PREAD, sub->aio_lio_opcode); UNIT_ASSERT_VALUES_EQUAL(SplitFileHandle, sub->aio_fildes); UNIT_ASSERT_VALUES_EQUAL(2_MB, device.StartOffset); - UNIT_ASSERT_VALUES_EQUAL(req->Buffer + 512_KB + 1_MB, sub->u.c.buf); + UNIT_ASSERT_VALUES_EQUAL( + req->Buffer.get() + 512_KB + 1_MB, + sub->u.c.buf); UNIT_ASSERT_VALUES_EQUAL(256_KB, sub->u.c.nbytes); UNIT_ASSERT_VALUES_EQUAL(device.FileOffset, sub->u.c.offset); } - Free(req); - Free(batch); + auto holder = sub1->TakeParentRequest(); } } + +} // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/server.cpp b/cloud/blockstore/vhost-server/server.cpp index 1d81a514f34..cb5245808f5 100644 --- a/cloud/blockstore/vhost-server/server.cpp +++ b/cloud/blockstore/vhost-server/server.cpp @@ -101,12 +101,12 @@ class TServer final TVector QueueThreads; public: - explicit TServer(ILoggingServicePtr logging, IBackendPtr backend); + TServer(ILoggingServicePtr logging, IBackendPtr backend); void Start(const TOptions& options) override; void Stop() override; - TSimpleStats GetStats(const TSimpleStats& prevStats) override; + TCompleteStats GetStats(const TSimpleStats& prevStats) override; private: void QueueThreadFunc(ui32 queueIndex); @@ -214,21 +214,25 @@ void TServer::Stop() STORAGE_INFO("Server has been stopped."); } -TSimpleStats TServer::GetStats(const TSimpleStats& prevStats) +TCompleteStats TServer::GetStats(const TSimpleStats& prevStats) { auto completionStats = Backend->GetCompletionStats(COMPLETION_STATS_WAIT_DURATION); if (!completionStats) { - return prevStats; + return TCompleteStats{ + .SimpleStats{prevStats}, + .CriticalEvents{TakeAccumulatedCriticalEvents()}}; } - TSimpleStats stats = *std::move(completionStats); + TCompleteStats result{ + .SimpleStats{*completionStats}, + .CriticalEvents = TakeAccumulatedCriticalEvents()}; for (ui32 i = 0; i != Queues.size(); ++i) { - stats += QueueStats[i]; + result.SimpleStats += QueueStats[i]; } - return stats; + return result; } void TServer::QueueThreadFunc(ui32 queueIndex) @@ -262,6 +266,7 @@ void TServer::SyncQueueStats(ui32 queueIndex, const TSimpleStats& queueStats) stats.Dequeued = queueStats.Dequeued; stats.Submitted = queueStats.Submitted; stats.SubFailed = queueStats.SubFailed; + stats.EncryptorErrors = queueStats.EncryptorErrors; stats.Requests[0] = queueStats.Requests[VHD_BDEV_READ]; stats.Requests[1] = queueStats.Requests[VHD_BDEV_WRITE]; diff --git a/cloud/blockstore/vhost-server/server.h b/cloud/blockstore/vhost-server/server.h index 9b2705e23c5..38e02c8cfd8 100644 --- a/cloud/blockstore/vhost-server/server.h +++ b/cloud/blockstore/vhost-server/server.h @@ -7,9 +7,7 @@ #include -#include #include -#include namespace NCloud::NBlockStore::NVHostServer { @@ -21,7 +19,7 @@ struct IServer virtual void Start(const TOptions& options) = 0; virtual void Stop() = 0; - virtual TSimpleStats GetStats(const TSimpleStats& prevStats) = 0; + virtual TCompleteStats GetStats(const TSimpleStats& prevStats) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/vhost-server/server_ut.cpp b/cloud/blockstore/vhost-server/server_ut.cpp index d9e2a0b7521..2d863e805ae 100644 --- a/cloud/blockstore/vhost-server/server_ut.cpp +++ b/cloud/blockstore/vhost-server/server_ut.cpp @@ -2,21 +2,29 @@ #include "backend_aio.h" +#include +#include +#include +#include #include -#include #include +#include -#include - -#include +#include +#include +#include #include #include +#include #include +#include + +#include #include -using namespace NCloud::NBlockStore::NVHostServer; +namespace NCloud::NBlockStore::NVHostServer { using NVHost::TMonotonicBufferResource; @@ -24,25 +32,85 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -struct TFixture - : public NUnitTest::TBaseFixture +const TString DefaultEncryptionKey("1234567890123456789012345678901"); + +using TSectorsInRequest = size_t; +using TUnaligned = bool; +using TTestParams = + std::tuple; + +class TMockEncryptor: public IEncryptor { +public: + enum class EBehaviour + { + ReturnError, + EncryptToAllZeroes, + }; + +private: + EBehaviour Behaviour; + +public: + explicit TMockEncryptor(EBehaviour behaviour) + : Behaviour(behaviour) + {} + + bool Encrypt( + const TBlockDataRef& src, + const TBlockDataRef& dst, + ui64 blockIndex) override + { + Y_UNUSED(src); + Y_UNUSED(blockIndex); + switch (Behaviour) { + case EBehaviour::ReturnError: { + return false; + } + case EBehaviour::EncryptToAllZeroes:{ + memset(const_cast(dst.Data()), 0, dst.Size()); + return true; + } + } + } + + bool Decrypt( + const TBlockDataRef& src, + const TBlockDataRef& dst, + ui64 blockIndex) override + { + Y_UNUSED(src); + Y_UNUSED(dst); + Y_UNUSED(blockIndex); + return false; + } +}; + +class TServerTest + : public testing::TestWithParam +{ +public: static constexpr ui32 QueueCount = 8; static constexpr ui32 QueueIndex = 4; static constexpr ui64 ChunkCount = 3; static constexpr ui64 ChunkByteCount = 16_KB; static constexpr ui64 TotalByteCount = ChunkByteCount * ChunkCount; - static constexpr ui64 SectorSize = 512; + static constexpr ui64 SectorSize = VHD_SECTOR_SIZE; + static constexpr ui64 TotalSectorCount = TotalByteCount / SectorSize; - const i64 HeaderSize = 4_KB; - const i64 PaddingSize = 1_KB; + static constexpr i64 HeaderSize = 4_KB; + static constexpr i64 PaddingSize = 1_KB; + const NProto::EEncryptionMode EncryptionMode = std::get<0>(GetParam()); + const size_t SectorsPerRequest = std::get<1>(GetParam()); + const bool Unaligned = std::get<2>(GetParam()); const TString SocketPath = "server_ut.vhost"; const TString Serial = "server_ut"; NCloud::ILoggingServicePtr Logging; std::shared_ptr Server; - TVector Files; + TVector Files; + IEncryptorPtr Encryptor; TOptions Options { .SocketPath = SocketPath, @@ -57,16 +125,22 @@ struct TFixture TMonotonicBufferResource Memory; public: + TServerTest() + { + if (EncryptionMode == NProto::EEncryptionMode::ENCRYPTION_AES_XTS) { + Encryptor = + CreateAesXtsEncryptor(TEncryptionKey(DefaultEncryptionKey)); + } + } + void StartServer() { - Server = CreateServer(Logging, CreateAioBackend(Logging)); + Server = CreateServer(Logging, CreateAioBackend(Encryptor, Logging)); Options.Layout.reserve(ChunkCount); Files.reserve(ChunkCount); for (ui32 i = 0; i != ChunkCount; ++i) { - auto& file = Files.emplace_back( - "nrd_" + ToString(i), - EOpenModeFlag::CreateAlways); + auto& file = Files.emplace_back(MakeTempName()); Options.Layout.push_back({ .DevicePath = file.GetName(), @@ -78,14 +152,14 @@ struct TFixture Server->Start(Options); - UNIT_ASSERT(Client.Init()); + ASSERT_TRUE(Client.Init()); Memory = TMonotonicBufferResource {Client.GetMemory()}; } void StartServerWithSplitDevices() { - Server = CreateServer(Logging, CreateAioBackend(Logging)); + Server = CreateServer(Logging, CreateAioBackend(Encryptor, Logging)); // H - header // D - device @@ -103,7 +177,7 @@ struct TFixture std::swap(offsets.front(), offsets.back()); - auto& file = Files.emplace_back("nrd_0", EOpenModeFlag::CreateAlways); + auto& file = Files.emplace_back(MakeTempName()); const size_t fileSize = HeaderSize + ChunkCount * ChunkByteCount @@ -143,33 +217,30 @@ struct TFixture Server->Start(Options); - UNIT_ASSERT(Client.Init()); + ASSERT_TRUE(Client.Init()); Memory = TMonotonicBufferResource {Client.GetMemory()}; } - void SetUp(NUnitTest::TTestContext& context) override + void SetUp() override { - Y_UNUSED(context); - Logging = NCloud::CreateLoggingService( "console", {.FiltrationLevel = TLOG_DEBUG}); } - void TearDown(NUnitTest::TTestContext& context) override + void TearDown() override { - Y_UNUSED(context); - - Client.DeInit(); - Server->Stop(); - Server.reset(); - + if (Server) { + Client.DeInit(); + Server->Stop(); + Server.reset(); + } Files.clear(); Options.Layout.clear(); } - TSimpleStats GetStats(ui64 expectedCompleted) const + TCompleteStats GetStats(ui64 expectedCompleted) const { // Without I/O, stats are synced every second and only if there is a // pending GetStats call. The first call to GetStats might not bring the @@ -177,10 +248,10 @@ struct TFixture // backend will sync the stats. TSimpleStats prevStats; - TSimpleStats stats; + TCompleteStats stats; for (int i = 0; i != 5; ++i) { stats = Server->GetStats(prevStats); - if (stats.Completed == expectedCompleted) { + if (stats.SimpleStats.Completed == expectedCompleted) { break; } Sleep(TDuration::Seconds(1)); @@ -188,6 +259,71 @@ struct TFixture return stats; } + + TString LoadRawSector(ui64 sector) + { + const ui64 sectorsPerChunk = ChunkByteCount / SectorSize; + const ui64 chunkIndex = sector/ sectorsPerChunk; + const auto& chunkLayout = Options.Layout[chunkIndex]; + + auto it = FindIf( + Files, + [&](const TFile& f) + { return f.GetName() == chunkLayout.DevicePath; }); + if (it == Files.end()) { + return "File " + chunkLayout.DevicePath + " not found"; + } + auto & file = *it; + + const ui64 fileOffset = + chunkLayout.Offset + (sector % sectorsPerChunk) * SectorSize; + + TString buffer; + buffer.resize(SectorSize); + file.Seek(fileOffset, SeekDir::sSet); + file.Load(&buffer[0], buffer.size()); + + return buffer; + } + + TString LoadSectorAndDecrypt(ui64 sector) + { + TString buffer = LoadRawSector(sector); + if (Encryptor && !IsAllZeroes(buffer.data(), buffer.size())) { + Encryptor->Decrypt( + TBlockDataRef(buffer.data(), buffer.size()), + TBlockDataRef(buffer.data(), buffer.size()), + sector); + } + return buffer; + } + + bool SaveRawSector(ui64 sector, const TString& data) + { + const ui64 sectorsPerChunk = ChunkByteCount / SectorSize; + const ui64 chunkIndex = sector/ sectorsPerChunk; + const auto& chunkLayout = Options.Layout[chunkIndex]; + + auto it = FindIf( + Files, + [&](const TFile& f) + { return f.GetName() == chunkLayout.DevicePath; }); + if (it == Files.end()) { + return false; + } + auto & file = *it; + + const ui64 fileOffset = + chunkLayout.Offset + (sector % sectorsPerChunk) * SectorSize; + + if (data.size() != SectorSize) { + return false; + } + file.Seek(fileOffset, SeekDir::sSet); + file.Write(&data[0], data.size()); + + return true; + } }; //////////////////////////////////////////////////////////////////////////////// @@ -210,251 +346,648 @@ auto Hdr(TMonotonicBufferResource& mem, virtio_blk_req_hdr hdr) return Create(mem, hdr); } +TString MakeRandomPattern(size_t size) { + TString result; + result.resize(size); + + for (size_t i = 0; i < size; ++i) { + result[i] = RandomNumber(255); + } + return result; +} + } // namespace //////////////////////////////////////////////////////////////////////////////// -Y_UNIT_TEST_SUITE(TServerTest) +TEST_P(TServerTest, ShouldGetDeviceID) { - Y_UNIT_TEST_F(ShouldGetDeviceID, TFixture) + StartServer(); + + std::span hdr = Hdr(Memory, {.type = VIRTIO_BLK_T_GET_ID}); + std::span serial = Memory.Allocate(VIRTIO_BLK_DISKID_LENGTH); + std::span status = Memory.Allocate(1); + + const ui32 len = + Client.WriteAsync(QueueIndex, {hdr}, {serial, status}).GetValueSync(); + + EXPECT_EQ(serial.size() + status.size(), len); + EXPECT_EQ(Serial, TStringBuf(serial.data())); + EXPECT_EQ(VIRTIO_BLK_S_OK, status[0]); +} + +TEST_P(TServerTest, ShouldReadAndWrite) +{ + StartServer(); + + auto getFillChar = [](size_t sector) -> ui8 { - StartServer(); + return ('A' + sector) % 256; + }; + auto makePatten = [&](size_t startSector) -> TString + { + TString result; + result.resize(SectorSize * SectorsPerRequest); + for (size_t i = 0; i < SectorsPerRequest; ++i) { + memset( + const_cast(result.data()) + SectorSize * i, + getFillChar(startSector + i), + SectorSize); + } + return result; + }; - std::span hdr = Hdr(Memory, { .type = VIRTIO_BLK_T_GET_ID }); - std::span serial = Memory.Allocate(VIRTIO_BLK_DISKID_LENGTH); + // write data + size_t writesCount = 0; + { + std::span hdr = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); std::span status = Memory.Allocate(1); - const ui32 len = Client.WriteAsync( - QueueIndex, - { hdr }, - { serial, status }).GetValueSync(); + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + reinterpret_cast(hdr.data())->sector = i; + TString expectedData = makePatten(i); + memcpy( + writeBuffer.data(), + expectedData.data(), + expectedData.size()); + auto writeOp = + Client.WriteAsync(QueueIndex, {hdr, writeBuffer}, {status}); + EXPECT_EQ(status.size(), writeOp.GetValueSync()); + EXPECT_EQ(VIRTIO_BLK_S_OK, status[0]); + ++writesCount; + } + } + + // read data + size_t readsCount = 0; + { + std::span hdr = Hdr(Memory, {.type = VIRTIO_BLK_T_IN}); + std::span readBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span status = Memory.Allocate(1); + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + reinterpret_cast(hdr.data())->sector = i; + auto readOp = + Client.WriteAsync(QueueIndex, {hdr}, {readBuffer, status}); + EXPECT_EQ(status.size() + readBuffer.size(), readOp.GetValueSync()); + EXPECT_EQ(VIRTIO_BLK_S_OK, status[0]); + + TString readData(readBuffer.data(), readBuffer.size()); + EXPECT_EQ(makePatten(i), readData); + ++readsCount; + } + } - UNIT_ASSERT_VALUES_EQUAL(serial.size() + status.size(), len); - UNIT_ASSERT_VALUES_EQUAL(Serial, TStringBuf(serial.data())); - UNIT_ASSERT_VALUES_EQUAL(0, status[0]); + // validate storage + for (ui64 i = 0; i != TotalSectorCount; ++i) { + const TString expectedData(SectorSize, getFillChar(i)); + const TString realData = LoadSectorAndDecrypt(i); + EXPECT_EQ(expectedData, realData); } - Y_UNIT_TEST_F(ShouldWriteUnaligned, TFixture) + // validate stats + const auto splittedReads = (SectorsPerRequest - 1) * (ChunkCount - 1); + const auto splittedWrites = (SectorsPerRequest - 1) * (ChunkCount - 1); + const auto expectedTotalRequestCount = + writesCount + readsCount + splittedReads + splittedWrites; + const auto completeStats = GetStats(expectedTotalRequestCount); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(expectedTotalRequestCount, stats.Completed); + EXPECT_EQ(expectedTotalRequestCount, stats.Dequeued); + EXPECT_EQ(expectedTotalRequestCount, stats.Submitted); + { + const auto& read = stats.Requests[0]; + EXPECT_EQ(readsCount, read.Count); + EXPECT_EQ(readsCount * SectorsPerRequest * SectorSize, read.Bytes); + EXPECT_EQ(0u, read.Errors); + EXPECT_EQ(Unaligned ? readsCount - splittedReads : 0, read.Unaligned); + } { - StartServer(); + const auto& write = stats.Requests[1]; + EXPECT_EQ(writesCount, write.Count); + EXPECT_EQ(writesCount * SectorsPerRequest * SectorSize, write.Bytes); + EXPECT_EQ(0u, write.Errors); + EXPECT_EQ( + Unaligned ? writesCount - splittedWrites : 0, + write.Unaligned); + } +} - const ui64 sectorsPerChunk = ChunkByteCount / SectorSize; - const ui64 sectorCount = sectorsPerChunk * ChunkCount; +TEST_P(TServerTest, ShouldWriteToSplitDevices) +{ + if (SectorsPerRequest != 1) { + return; + } + StartServerWithSplitDevices(); - std::span hdr = Hdr(Memory, { .type = VIRTIO_BLK_T_OUT }); - std::span sector = Memory.Allocate(SectorSize); - std::span status = Memory.Allocate(1); + std::vector sectorsFill(TotalSectorCount); - // write data - for (ui64 i = 0; i != sectorCount; ++i) { - reinterpret_cast(hdr.data())->sector = i; + // layout: [ H | --- D --- | P | --- D --- | P | --- D --- | ... ] + auto verifyLayoutAndData = [&]() + { + char header[HeaderSize]; + TFile file{Files[0].GetName(), EOpenModeFlag::OpenAlways}; - memset(sector.data(), 'A' + i, sector.size_bytes()); + // Check header + file.Load(header, HeaderSize); + EXPECT_EQ(HeaderSize, std::count(header, header + HeaderSize, 'H')); - const ui32 len = Client.WriteAsync( - QueueIndex, - { hdr, sector }, - { status } - ).GetValueSync(); + const TString paddingData(PaddingSize, 'P'); + // Check paddings + for (ui32 i = 0; i != ChunkCount; ++i) { + // Skip sectors data + file.Seek(ChunkByteCount, SeekDir::sCur); + + // Check padding + if (i + 1 != ChunkCount) { + TString realPadding(PaddingSize, 0); + file.Load(const_cast(realPadding.data()), PaddingSize); + EXPECT_EQ(paddingData, realPadding); + } + } - UNIT_ASSERT_VALUES_EQUAL(status.size(), len); - UNIT_ASSERT_VALUES_EQUAL(0, status[0]); + // Check sectors + for (ui32 i = 0; i < TotalSectorCount; ++i) { + const TString expectedData(SectorSize, sectorsFill[i]); + const TString realData = LoadSectorAndDecrypt(i); + EXPECT_EQ(expectedData, realData); } + }; + // initial verification + verifyLayoutAndData(); - // validate - TString buffer; - buffer.resize(SectorSize); - for (ui64 i = 0; i != sectorCount; ++i) { - const TString expectedData(SectorSize, 'A' + i); + // disk: [ --- Dn-1 --- | --- D1 --- | ... | --- Dn-2 --- | --- D0 --- ] + // write: ^------------^ + // offset: ChunkByteCount/2 + // size: ChunkByteCount + { + std::span hdr = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span buffer = + Memory.Allocate(ChunkByteCount, Unaligned ? 1 : SectorSize); + std::span status = Memory.Allocate(1); + + const ui64 startSector = ChunkByteCount / 2 / SectorSize; + reinterpret_cast(hdr.data())->sector = startSector; - auto& file = Files[i / sectorsPerChunk]; - file.Load(&buffer[0], buffer.size()); - UNIT_ASSERT_VALUES_EQUAL(expectedData, buffer); + for (ui32 i = 0; i < ChunkByteCount / SectorSize; ++i) { + const ui8 sectorFill = (startSector + i) % 256; + sectorsFill[startSector + i] = sectorFill; + std::memset(buffer.data() + i * SectorSize, sectorFill, SectorSize); } - const auto stats = GetStats(sectorCount); + auto writeOp = Client.WriteAsync(QueueIndex, {hdr, buffer}, {status}); + const ui32 len = writeOp.GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(0, stats.CompFailed); - UNIT_ASSERT_VALUES_EQUAL(0, stats.SubFailed); - UNIT_ASSERT_VALUES_EQUAL(sectorCount, stats.Completed); - UNIT_ASSERT_VALUES_EQUAL(sectorCount, stats.Dequeued); - UNIT_ASSERT_VALUES_EQUAL(sectorCount, stats.Submitted); + EXPECT_EQ(status.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, status[0]); + } - const auto& read = stats.Requests[0]; - UNIT_ASSERT_VALUES_EQUAL(0, read.Count); - UNIT_ASSERT_VALUES_EQUAL(0, read.Bytes); - UNIT_ASSERT_VALUES_EQUAL(0, read.Errors); - UNIT_ASSERT_VALUES_EQUAL(0, read.Unaligned); + // verification after cross-chunk write + verifyLayoutAndData(); +} - const auto& write = stats.Requests[1]; - UNIT_ASSERT_VALUES_EQUAL(sectorCount, write.Count); - UNIT_ASSERT_VALUES_EQUAL(TotalByteCount, write.Bytes); - UNIT_ASSERT_VALUES_EQUAL(0, write.Errors); - UNIT_ASSERT_VALUES_EQUAL(sectorCount, write.Unaligned); +TEST_P(TServerTest, ShouldHandleMultipleQueues) +{ + if (!Unaligned) { + // TODO fix data-race + return; } + StartServer(); - Y_UNIT_TEST_F(ShouldWriteToSplitDevices, TFixture) - { - StartServerWithSplitDevices(); + std::vector sectorsFill(TotalSectorCount); + const ui32 requestCount = 10; - // layout: [ H | --- D --- | P | --- D --- | P | --- D --- | ... ] + TVector> statuses; + TVector> futures; - // initial verification - { - char header[HeaderSize]; - TFile file { Files[0].GetName(), EOpenModeFlag::OpenAlways }; + for (ui64 i = 0; i != requestCount; ++i) { + ui64 startSector = (i * SectorsPerRequest) % TotalSectorCount; + std::span hdr = + Hdr(Memory, {.type = VIRTIO_BLK_T_OUT, .sector = startSector}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span status = Memory.Allocate(1); - file.Load(header, HeaderSize); - UNIT_ASSERT_VALUES_EQUAL( - HeaderSize, std::count(header, header + HeaderSize, 'H')); + EXPECT_EQ(SectorSize * SectorsPerRequest, writeBuffer.size()); + EXPECT_EQ(1u, status.size()); - char padding[PaddingSize]; - TVector buffer(ChunkByteCount); - - for (ui32 i = 0; i != ChunkCount; ++i) { - file.Load(buffer.data(), ChunkByteCount); - UNIT_ASSERT_VALUES_EQUAL( - ChunkByteCount, std::count(buffer.begin(), buffer.end(), 0)); - - if (i + 1 != ChunkCount) { - file.Load(padding, PaddingSize); - UNIT_ASSERT_VALUES_EQUAL( - PaddingSize, - std::count(padding, padding + PaddingSize, 'P')); - } - } + ui8 sectorFill = 'A' + i % 26; + memset(writeBuffer.data(), sectorFill, writeBuffer.size_bytes()); + for (size_t j = 0; j < SectorsPerRequest; ++j) { + sectorsFill[startSector + j] = sectorFill; } - // disk: [ --- Dn-1 --- | --- D1 --- | ... | --- Dn-2 --- | --- D0 --- ] - // write: ^------------^ - // offset: ChunkByteCount/2 - // size: ChunkByteCount - { - std::span hdr = Hdr(Memory, { .type = VIRTIO_BLK_T_OUT }); - std::span buffer = Memory.Allocate(ChunkByteCount); - std::span status = Memory.Allocate(1); + statuses.push_back(status); + futures.push_back( + Client.WriteAsync(i % QueueCount, {hdr, writeBuffer}, {status})); + } - reinterpret_cast(hdr.data()) - ->sector = ChunkByteCount / 2 / SectorSize; + WaitAll(futures).Wait(); - std::memset(buffer.data(), 'X', ChunkByteCount); + const auto completeStats = GetStats(requestCount); + const auto& stats = completeStats.SimpleStats; - const ui32 len = Client.WriteAsync( - QueueIndex, - { hdr, buffer }, - { status } - ).GetValueSync(); + EXPECT_EQ(requestCount, stats.Submitted); + EXPECT_EQ(requestCount, stats.Completed); + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); - UNIT_ASSERT_VALUES_EQUAL(status.size(), len); - UNIT_ASSERT_VALUES_EQUAL(0, status[0]); - } + for (ui32 i = 0; i != requestCount; ++i) { + const ui32 len = futures[i].GetValueSync(); - // check the write - { - char header[HeaderSize]; - TFile file { Files[0].GetName(), EOpenModeFlag::OpenAlways }; + EXPECT_EQ(statuses[i].size(), len); + EXPECT_EQ(char(0), statuses[i][0]); + } - file.Load(header, HeaderSize); - UNIT_ASSERT_VALUES_EQUAL( - HeaderSize, std::count(header, header + HeaderSize, 'H')); + // Check sectors + for (ui32 i = 0; i < TotalSectorCount; ++i) { + const TString expectedData(SectorSize, sectorsFill[i]); + const TString realData = LoadSectorAndDecrypt(i); + EXPECT_EQ(expectedData, realData); + } +} - char padding[PaddingSize]; - TVector buffer(ChunkByteCount); - - for (ui32 i = 0; i != ChunkCount; ++i) { - file.Load(buffer.data(), ChunkByteCount); - - switch (i) { - case ChunkCount - 1: { - const char* p = buffer.data(); - const auto none = std::count(p, p + ChunkByteCount / 2, 0); - const auto data = std::count( - p + ChunkByteCount / 2, - p + ChunkByteCount, - 'X'); - - UNIT_ASSERT_VALUES_EQUAL(ChunkByteCount / 2, none); - UNIT_ASSERT_VALUES_EQUAL(ChunkByteCount / 2, data); - break; - } - case 1: { - const char* p = buffer.data(); - const auto data = std::count(p, p + ChunkByteCount / 2, 'X'); - const auto none = std::count( - p + ChunkByteCount / 2, - p + ChunkByteCount, - 0); - - UNIT_ASSERT_VALUES_EQUAL(ChunkByteCount / 2, data); - UNIT_ASSERT_VALUES_EQUAL(ChunkByteCount / 2, none); - break; - } - default: { - UNIT_ASSERT_VALUES_EQUAL( - ChunkByteCount, - std::count(buffer.begin(), buffer.end(), 0)); - break; - } - } - - if (i + 1 != ChunkCount) { - file.Load(padding, PaddingSize); - UNIT_ASSERT_VALUES_EQUAL( - PaddingSize, - std::count(padding, padding + PaddingSize, 'P')); - } - } +TEST_P(TServerTest, ShouldWriteMultipleAndReadByOne) +{ + if (SectorsPerRequest == 1) { + return; + } + StartServer(); + + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + + std::span hdr_r = Hdr(Memory, {.type = VIRTIO_BLK_T_IN}); + std::span readBuffer = + Memory.Allocate(SectorSize, Unaligned ? 1 : SectorSize); + std::span readStatus = Memory.Allocate(1); + + size_t readCount = 0; + size_t writeCount = 0; + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + const TString pattern = MakeRandomPattern(writeBuffer.size_bytes()); + + // write SectorsPerRequest at once + reinterpret_cast(hdr_w.data())->sector = i; + memcpy(writeBuffer.data(), pattern.data(), writeBuffer.size_bytes()); + auto writeOp = + Client.WriteAsync(QueueIndex, {hdr_w, writeBuffer}, {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(writeStatus.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, writeStatus[0]); + writeCount++; + + // read one sector at a time + for (size_t j = 0; j < SectorsPerRequest; ++j) { + reinterpret_cast(hdr_r.data())->sector = i + j; + auto readOp = Client.WriteAsync( + QueueIndex, + {hdr_r}, + {readBuffer, readStatus}); + const ui32 len = readOp.GetValueSync(); + EXPECT_EQ(readStatus.size() + readBuffer.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, readStatus[0]); + std::string_view readData(readBuffer.data(), readBuffer.size()); + std::string_view expectedData( + pattern.data() + SectorSize * j, + SectorSize); + EXPECT_EQ(expectedData, readData); + ++readCount; } } - Y_UNIT_TEST_F(ShouldHandleMutlipleQueues, TFixture) + // validate stats + const auto splittedReads = 0; + const auto splittedWrites = (SectorsPerRequest - 1) * (ChunkCount - 1); + const auto expectedTotalRequestCount = + writeCount + readCount + splittedReads + splittedWrites; + const auto completeStats = GetStats(expectedTotalRequestCount); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(expectedTotalRequestCount, stats.Completed); + EXPECT_EQ(expectedTotalRequestCount, stats.Dequeued); + EXPECT_EQ(expectedTotalRequestCount, stats.Submitted); { - StartServer(); + const auto& read = stats.Requests[0]; + EXPECT_EQ(readCount, read.Count); + EXPECT_EQ(readCount * SectorSize, read.Bytes); + EXPECT_EQ(0u, read.Errors); + EXPECT_EQ(Unaligned ? readCount - splittedReads : 0, read.Unaligned); + } + { + const auto& write = stats.Requests[1]; + EXPECT_EQ(writeCount, write.Count); + EXPECT_EQ(writeCount * SectorsPerRequest * SectorSize, write.Bytes); + EXPECT_EQ(0u, write.Errors); + EXPECT_EQ(Unaligned ? writeCount - splittedWrites : 0, write.Unaligned); + } +} - const ui32 requestCount = 10; - const ui64 sectorsPerChunk = ChunkByteCount / SectorSize; - const ui64 sectorCount = sectorsPerChunk * ChunkCount; +TEST_P(TServerTest, ShouldWriteByOneAndReadMultiple) +{ + if (SectorsPerRequest == 1) { + return; + } + StartServer(); + + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = + Memory.Allocate(SectorSize, Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + + std::span hdr_r = Hdr(Memory, {.type = VIRTIO_BLK_T_IN}); + std::span readBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span readStatus = Memory.Allocate(1); + + size_t readCount = 0; + size_t writeCount = 0; + + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + const TString pattern = + MakeRandomPattern(SectorSize * SectorsPerRequest); + + // write one sectors at a time + for (size_t j = 0; j < SectorsPerRequest; ++j) { + reinterpret_cast(hdr_w.data())->sector = i + j; + memcpy( + writeBuffer.data(), + pattern.data() + SectorSize * j, + writeBuffer.size_bytes()); + auto writeOp = Client.WriteAsync( + QueueIndex, + {hdr_w, writeBuffer}, + {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(writeStatus.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, writeStatus[0]); + ++writeCount; + } - TVector> statuses; - TVector> futures; + // read SectorsPerRequest at once + reinterpret_cast(hdr_r.data())->sector = i; + auto read_result = + Client.WriteAsync(QueueIndex, {hdr_r}, {readBuffer, readStatus}); + const ui32 len = read_result.GetValueSync(); + EXPECT_EQ(readStatus.size() + readBuffer.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, readStatus[0]); + std::string_view readData(readBuffer.data(), readBuffer.size()); + std::string_view expectedData(pattern.data(), readBuffer.size()); + EXPECT_EQ(expectedData, readData); + ++readCount; + } - for (ui64 i = 0; i != requestCount; ++i) { - std::span hdr = Hdr(Memory, { - .type = VIRTIO_BLK_T_OUT, - .sector = i % sectorCount - }); - std::span sector = Memory.Allocate(SectorSize); - std::span status = Memory.Allocate(1); + // validate stats + const auto splittedReads = (SectorsPerRequest - 1) * (ChunkCount - 1); + const auto splittedWrites = 0; + const auto expectedTotalRequestCount = + writeCount + readCount + splittedReads + splittedWrites; + const auto completeStats = GetStats(expectedTotalRequestCount); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(expectedTotalRequestCount, stats.Completed); + EXPECT_EQ(expectedTotalRequestCount, stats.Dequeued); + EXPECT_EQ(expectedTotalRequestCount, stats.Submitted); + { + const auto& read = stats.Requests[0]; + EXPECT_EQ(readCount, read.Count); + EXPECT_EQ(readCount * SectorsPerRequest * SectorSize, read.Bytes); + EXPECT_EQ(0u, read.Errors); + EXPECT_EQ(Unaligned ? readCount - splittedReads : 0, read.Unaligned); + } + { + const auto& write = stats.Requests[1]; + EXPECT_EQ(writeCount, write.Count); + EXPECT_EQ(writeCount * SectorSize, write.Bytes); + EXPECT_EQ(0u, write.Errors); + EXPECT_EQ(Unaligned ? writeCount - splittedWrites : 0, write.Unaligned); + } +} - UNIT_ASSERT_VALUES_EQUAL(SectorSize, sector.size()); - UNIT_ASSERT_VALUES_EQUAL(1, status.size()); +TEST_P(TServerTest, ShouldHandleWrongSectorIndex) +{ + StartServer(); - memset(sector.data(), 'A' + i % 26, sector.size_bytes()); + { + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + + reinterpret_cast(hdr_w.data())->sector = + TotalSectorCount; + auto writeOp = + Client.WriteAsync(QueueIndex, {hdr_w, writeBuffer}, {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(writeStatus.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_IOERR, writeStatus[0]); + } - statuses.push_back(status); - futures.push_back(Client.WriteAsync( - i % QueueCount, - { hdr, sector }, - { status } - )); + { + std::span hdr_r = Hdr(Memory, {.type = VIRTIO_BLK_T_IN}); + std::span readBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span readStatus = Memory.Allocate(1); + + reinterpret_cast(hdr_r.data())->sector = + TotalSectorCount; + auto read_result = + Client.WriteAsync(QueueIndex, {hdr_r}, {readBuffer, readStatus}); + const ui32 len = read_result.GetValueSync(); + EXPECT_EQ(readStatus.size() + readBuffer.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_IOERR, readStatus[0]); + } + // validate stats + const auto completeStats = GetStats(0); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(0u, stats.Completed); + EXPECT_EQ(0u, stats.Dequeued); + EXPECT_EQ(0u, stats.Submitted); +} + +TEST_P(TServerTest, ShouldStoreEncryptedZeroes) +{ + StartServer(); + + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + const auto pattern = TString(writeBuffer.size_bytes(), 0); + + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + // write SectorsPerRequest at once + reinterpret_cast(hdr_w.data())->sector = i; + memcpy(writeBuffer.data(), pattern.data(), writeBuffer.size_bytes()); + auto writeOp = + Client.WriteAsync(QueueIndex, {hdr_w, writeBuffer}, {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(writeStatus.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_OK, writeStatus[0]); + + for (size_t j = 0; j < SectorsPerRequest; ++j) { + const auto rawSector = LoadRawSector(i + j); + const bool shouldReadZeroes = + EncryptionMode == NProto::EEncryptionMode::NO_ENCRYPTION; + EXPECT_EQ( + shouldReadZeroes, + IsAllZeroes(rawSector.data(), rawSector.size())); } + } +} - WaitAll(futures).Wait(); +TEST_P(TServerTest, ShouldStatEncryptorErrors) +{ + if (EncryptionMode != NProto::EEncryptionMode::ENCRYPTION_AES_XTS) { + return; + } - const auto stats = GetStats(requestCount); + Encryptor = std::make_shared( + TMockEncryptor::EBehaviour::ReturnError); + StartServer(); - UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Submitted); - UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Completed); - UNIT_ASSERT_VALUES_EQUAL(0, stats.CompFailed); - UNIT_ASSERT_VALUES_EQUAL(0, stats.SubFailed); + // Fill storage with random data + { + auto randomSector = MakeRandomPattern(SectorSize); + for (size_t i = 0; i < TotalSectorCount; ++i) { + SaveRawSector(i, randomSector); + } + } - for (ui32 i = 0; i != requestCount; ++i) { - const ui32 len = futures[i].GetValueSync(); + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + + std::span hdr_r = Hdr(Memory, {.type = VIRTIO_BLK_T_IN}); + std::span readBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span readStatus = Memory.Allocate(1); + + size_t readCount = 0; + size_t writeCount = 0; + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + // check writes + { + reinterpret_cast(hdr_w.data())->sector = i; + auto writeOp = Client.WriteAsync( + QueueIndex, + {hdr_w, writeBuffer}, + {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(1u, len); + EXPECT_EQ(VIRTIO_BLK_S_IOERR, writeStatus[0]); + ++writeCount; + } - UNIT_ASSERT_VALUES_EQUAL_C( - statuses[i].size(), len, - sectorsPerChunk << " | " << i); - UNIT_ASSERT_VALUES_EQUAL(0, statuses[i][0]); + // check reads + { + reinterpret_cast(hdr_r.data())->sector = i; + auto readOp = Client.WriteAsync( + QueueIndex, + {hdr_r}, + {readBuffer, readStatus}); + const ui32 len = readOp.GetValueSync(); + EXPECT_EQ(readStatus.size() + readBuffer.size(), len); + EXPECT_EQ(VIRTIO_BLK_S_IOERR, readStatus[0]); + ++readCount; } } + + // validate stats + const auto splittedReads = (SectorsPerRequest - 1) * (ChunkCount - 1); + const auto completeStats = GetStats(readCount + splittedReads); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(readCount + splittedReads, stats.Completed); + EXPECT_EQ(readCount + splittedReads, stats.Dequeued); + EXPECT_EQ(readCount + splittedReads, stats.Submitted); + EXPECT_EQ(readCount + writeCount, stats.EncryptorErrors); } + +TEST_P(TServerTest, ShouldStatAllZeroesBlocks) +{ + if (EncryptionMode != NProto::EEncryptionMode::ENCRYPTION_AES_XTS) { + return; + } + + Encryptor = std::make_shared( + TMockEncryptor::EBehaviour::EncryptToAllZeroes); + StartServer(); + + std::span hdr_w = Hdr(Memory, {.type = VIRTIO_BLK_T_OUT}); + std::span writeBuffer = Memory.Allocate( + SectorSize * SectorsPerRequest, + Unaligned ? 1 : SectorSize); + std::span writeStatus = Memory.Allocate(1); + + size_t writeCount = 0; + for (ui64 i = 0; i <= TotalSectorCount - SectorsPerRequest; ++i) { + reinterpret_cast(hdr_w.data())->sector = i; + auto writeOp = + Client.WriteAsync(QueueIndex, {hdr_w, writeBuffer}, {writeStatus}); + const ui32 len = writeOp.GetValueSync(); + EXPECT_EQ(1u, len); + EXPECT_EQ(VIRTIO_BLK_S_IOERR, writeStatus[0]); + ++writeCount; + } + + // validate stats + const auto completeStats = GetStats(0); + const auto& stats = completeStats.SimpleStats; + + EXPECT_EQ(0u, stats.CompFailed); + EXPECT_EQ(0u, stats.SubFailed); + EXPECT_EQ(0u, stats.Completed); + EXPECT_EQ(0u, stats.Dequeued); + EXPECT_EQ(0u, stats.Submitted); + EXPECT_EQ(writeCount, stats.EncryptorErrors); + + // validate crit events + EXPECT_EQ(writeCount, completeStats.CriticalEvents.size()); + for (auto& [sensorName, message]: completeStats.CriticalEvents) { + EXPECT_EQ("EncryptorGeneratedZeroBlock", sensorName); + EXPECT_EQ( + true, + message.StartsWith("Encryptor has generated a zero block #")); + } +} + +INSTANTIATE_TEST_SUITE_P( + ValueParametrized, + TServerTest, + testing::Combine( + testing::Values( + NProto::EEncryptionMode::NO_ENCRYPTION, + NProto::EEncryptionMode::ENCRYPTION_AES_XTS), + testing::Values(1, 2, 4), // Sectors per request + testing::Values(true, false) // Unaligned + )); + +} // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/stats.cpp b/cloud/blockstore/vhost-server/stats.cpp index 8a11e130e35..c328db3dfa6 100644 --- a/cloud/blockstore/vhost-server/stats.cpp +++ b/cloud/blockstore/vhost-server/stats.cpp @@ -1,5 +1,7 @@ #include "stats.h" +#include "critical_event.h" + #include #include @@ -21,8 +23,6 @@ class TCompletionStats TAutoEvent CompletionStatsEvent; public: - virtual ~TCompletionStats() = default; - std::optional Get(TDuration timeout) override { if (!IsCompletionStatsWaitTimeout) { @@ -49,6 +49,7 @@ class TCompletionStats CompletionStats.Completed = stats.Completed; CompletionStats.CompFailed = stats.CompFailed; + CompletionStats.EncryptorErrors = stats.EncryptorErrors; CompletionStats.Requests = stats.Requests; CompletionStats.Times = stats.Times; @@ -125,12 +126,14 @@ void WriteSizes(NJsonWriter::TBuf& buf, const auto& hist, const auto& prevHist) //////////////////////////////////////////////////////////////////////////////// void DumpStats( - const TSimpleStats& stats, + const TCompleteStats& completeStats, TSimpleStats& old, TDuration elapsed, IOutputStream& stream, ui64 cyclesPerMs) { + const auto & stats = completeStats.SimpleStats; + NJsonWriter::TBuf buf {NJsonWriter::HEM_DONT_ESCAPE_HTML, &stream}; auto write = [&] (TStringBuf key, ui64 value) { @@ -162,6 +165,21 @@ void DumpStats( write("submission_failed", stats.SubFailed - old.SubFailed); write("completed", stats.Completed - old.Completed); write("failed", stats.CompFailed - old.CompFailed); + write("encryptor_errors", stats.EncryptorErrors - old.EncryptorErrors); + + if (completeStats.CriticalEvents) { + buf.WriteKey("crit_events"); + buf.BeginList(); + for (const auto& criticalEvent: completeStats.CriticalEvents) { + buf.BeginObject(); + buf.WriteKey("name"); + buf.WriteString(criticalEvent.SensorName); + buf.WriteKey("message"); + buf.WriteString(criticalEvent.Message); + buf.EndObject(); + } + buf.EndList(); + } request(0, "read"); request(1, "write"); diff --git a/cloud/blockstore/vhost-server/stats.h b/cloud/blockstore/vhost-server/stats.h index b3cfc3f21d0..7f3b1630d15 100644 --- a/cloud/blockstore/vhost-server/stats.h +++ b/cloud/blockstore/vhost-server/stats.h @@ -2,6 +2,7 @@ #include "public.h" +#include "critical_event.h" #include "histogram.h" #include @@ -10,7 +11,6 @@ #include #include #include -#include class IOutputStream; @@ -33,7 +33,7 @@ struct TRequestStats TRequestStats() = default; template - TRequestStats(const TRequestStats& rhs) noexcept + explicit TRequestStats(const TRequestStats& rhs) noexcept : Count{rhs.Count} , Bytes{rhs.Bytes} , Errors{rhs.Errors} @@ -82,6 +82,7 @@ struct TStats T SubFailed = {}; T Completed = {}; T CompFailed = {}; + T EncryptorErrors = {}; std::array, 2> Requests = {}; std::array Times = {}; @@ -90,18 +91,16 @@ struct TStats TStats() = default; template - TStats(const TStats& rhs) noexcept + explicit TStats(const TStats& rhs) noexcept : Dequeued{rhs.Dequeued} , Submitted{rhs.Submitted} , SubFailed{rhs.SubFailed} , Completed{rhs.Completed} , CompFailed{rhs.CompFailed} + , EncryptorErrors{rhs.EncryptorErrors} , Requests{rhs.Requests[0], rhs.Requests[1]} , Times{rhs.Times[0], rhs.Times[1]} - , Sizes{ - rhs.Sizes[0], - rhs.Sizes[1], - } + , Sizes{rhs.Sizes[0], rhs.Sizes[1]} {} template @@ -112,6 +111,7 @@ struct TStats SubFailed = rhs.SubFailed; Completed = rhs.Completed; CompFailed = rhs.CompFailed; + EncryptorErrors = rhs.EncryptorErrors; Requests = rhs.Requests; Times = rhs.Times; Sizes = rhs.Sizes; @@ -127,6 +127,7 @@ struct TStats SubFailed += rhs.SubFailed; Completed += rhs.Completed; CompFailed += rhs.CompFailed; + EncryptorErrors += rhs.EncryptorErrors; for (size_t i = 0; i != Requests.size(); ++i) { Requests[i] += rhs.Requests[i]; @@ -147,6 +148,12 @@ struct TStats using TAtomicStats = TStats>; using TSimpleStats = TStats; +//////////////////////////////////////////////////////////////////////////////// +struct TCompleteStats { + TSimpleStats SimpleStats; + TCriticalEvents CriticalEvents; +}; + //////////////////////////////////////////////////////////////////////////////// struct ICompletionStats @@ -163,7 +170,7 @@ struct ICompletionStats ICompletionStatsPtr CreateCompletionStats(); void DumpStats( - const TSimpleStats& stats, + const TCompleteStats& completeStats, TSimpleStats& old, TDuration elapsed, IOutputStream& stream, diff --git a/cloud/blockstore/vhost-server/stats_ut.cpp b/cloud/blockstore/vhost-server/stats_ut.cpp index a6fcc946e79..45dcefd369c 100644 --- a/cloud/blockstore/vhost-server/stats_ut.cpp +++ b/cloud/blockstore/vhost-server/stats_ut.cpp @@ -1,4 +1,5 @@ #include "stats.h" +#include "critical_event.h" #include #include @@ -28,9 +29,11 @@ Y_UNIT_TEST_SUITE(TStatsTest) auto dump = [&] (auto dt) { TStringStream ss; - auto curStats = completionStats; + auto curStats = TCompleteStats{ + .SimpleStats = completionStats, + .CriticalEvents = TakeAccumulatedCriticalEvents()}; for (auto& s: queueStats) { - curStats += s; + curStats.SimpleStats += s; } DumpStats( @@ -96,6 +99,15 @@ Y_UNIT_TEST_SUITE(TStatsTest) write(32_MB, 32ms); } + // Fill crit events + TMultiMap testCritEvents{ + {"event1", "message 1"}, + {"event1", "message 2"}, + {"event2", "message n"}}; + for (const auto& [name, message]: testCritEvents) { + ReportCriticalEvent(name, message); + } + { auto stats = dump(1s); @@ -190,6 +202,17 @@ Y_UNIT_TEST_SUITE(TStatsTest) UNIT_ASSERT_VALUES_EQUAL_C(32_MB, size, "32M"); UNIT_ASSERT_VALUES_EQUAL(50, count); } + + { // Check crit events + UNIT_ASSERT_VALUES_EQUAL(true, stats.Has("crit_events")); + TMultiMap critEvents; + for (const auto& event: stats["crit_events"].GetArray()) { + const auto& name = event["name"].GetString(); + const auto& message = event["message"].GetString(); + critEvents.emplace(name, message); + } + UNIT_ASSERT_VALUES_EQUAL(testCritEvents, critEvents); + } } for (int i = 0; i != 800; ++i) { @@ -315,6 +338,12 @@ Y_UNIT_TEST_SUITE(TStatsTest) UNIT_ASSERT_VALUES_EQUAL_C(96_GB, size, "100G"); UNIT_ASSERT_VALUES_EQUAL(3, count); } + { + // The critical events list were taken in the last dump. + // ReportCriticalEvent() was not called after that, so the key + // "crit_events" absent. + UNIT_ASSERT_VALUES_EQUAL(false, stats.Has("crit_events")); + } } } } diff --git a/cloud/blockstore/vhost-server/ut/CMakeLists.linux-x86_64.txt b/cloud/blockstore/vhost-server/ut/CMakeLists.linux-x86_64.txt index 6025e016693..44313539cf7 100644 --- a/cloud/blockstore/vhost-server/ut/CMakeLists.linux-x86_64.txt +++ b/cloud/blockstore/vhost-server/ut/CMakeLists.linux-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(cloud-blockstore-vhost-server-ut PUBLIC contrib-libs-cxxsupp yutil library-cpp-cpuid_check + blockstore-libs-encryption cpp-testing-unittest_main vhost-server core-libs-common @@ -40,6 +41,7 @@ target_link_options(cloud-blockstore-vhost-server-ut PRIVATE target_sources(cloud-blockstore-vhost-server-ut PRIVATE ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/backend.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/backend_aio.cpp + ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/critical_event.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/histogram.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/histogram_ut.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/options.cpp @@ -47,7 +49,6 @@ target_sources(cloud-blockstore-vhost-server-ut PRIVATE ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/request_aio.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/request_aio_ut.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/server.cpp - ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/server_ut.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/stats.cpp ${CMAKE_SOURCE_DIR}/cloud/blockstore/vhost-server/stats_ut.cpp ) diff --git a/cloud/blockstore/vhost-server/ut/ya.make b/cloud/blockstore/vhost-server/ut/ya.make index 4e5ddcd6586..6b476821d53 100644 --- a/cloud/blockstore/vhost-server/ut/ya.make +++ b/cloud/blockstore/vhost-server/ut/ya.make @@ -6,6 +6,8 @@ SRCS( backend.cpp backend_aio.cpp + critical_event.cpp + histogram.cpp histogram_ut.cpp @@ -15,9 +17,6 @@ SRCS( request_aio.cpp request_aio_ut.cpp - server.cpp - server_ut.cpp - stats.cpp stats_ut.cpp ) @@ -27,6 +26,8 @@ ADDINCL( ) PEERDIR( + cloud/blockstore/libs/common + cloud/blockstore/libs/encryption/model cloud/contrib/vhost cloud/storage/core/libs/common cloud/storage/core/libs/diagnostics diff --git a/cloud/blockstore/vhost-server/ya.make b/cloud/blockstore/vhost-server/ya.make index a33d3378e85..614e3d0b960 100644 --- a/cloud/blockstore/vhost-server/ya.make +++ b/cloud/blockstore/vhost-server/ya.make @@ -7,6 +7,7 @@ SRCS( backend_aio.cpp backend_rdma.cpp backend_null.cpp + critical_event.cpp histogram.cpp options.cpp request_aio.cpp @@ -21,6 +22,9 @@ ADDINCL( PEERDIR( cloud/blockstore/libs/client + cloud/blockstore/libs/common + cloud/blockstore/libs/encryption + cloud/blockstore/libs/encryption/model cloud/blockstore/libs/rdma/impl cloud/blockstore/libs/service_local @@ -39,4 +43,7 @@ PEERDIR( END() -RECURSE_FOR_TESTS(ut) +RECURSE_FOR_TESTS( + gtest + ut +) diff --git a/cloud/storage/core/libs/diagnostics/critical_events.cpp b/cloud/storage/core/libs/diagnostics/critical_events.cpp index f9edddc0224..e3ac8cad87b 100644 --- a/cloud/storage/core/libs/diagnostics/critical_events.cpp +++ b/cloud/storage/core/libs/diagnostics/critical_events.cpp @@ -30,6 +30,11 @@ void InitCriticalEventsCounter(NMonitoring::TDynamicCountersPtr counters) #undef STORAGE_INIT_CRITICAL_EVENT_COUNTER } +TString GetCriticalEventFullName(const TString& name) +{ + return "AppCriticalEvents/" + name; +} + TString ReportCriticalEvent( const TString& sensorName, const TString& message, diff --git a/cloud/storage/core/libs/diagnostics/critical_events.h b/cloud/storage/core/libs/diagnostics/critical_events.h index e4959e99faa..a128371376e 100644 --- a/cloud/storage/core/libs/diagnostics/critical_events.h +++ b/cloud/storage/core/libs/diagnostics/critical_events.h @@ -17,6 +17,8 @@ namespace NCloud { void InitCriticalEventsCounter(NMonitoring::TDynamicCountersPtr counters); +TString GetCriticalEventFullName(const TString& name); + TString ReportCriticalEvent( const TString& sensorName, const TString& message,