diff --git a/cloud/blockstore/config/disk.proto b/cloud/blockstore/config/disk.proto index d9d31058ddd..7ca055fdb26 100644 --- a/cloud/blockstore/config/disk.proto +++ b/cloud/blockstore/config/disk.proto @@ -213,6 +213,40 @@ message TDiskAgentConfig // The path where Disk Agent will store the cached config. optional string CachedConfigPath = 27; + + // The path where Disk Agent will store active sessions. + optional string CachedSessionsPath = 28; +} + +//////////////////////////////////////////////////////////////////////////////// + +message TDiskAgentDeviceSession +{ + // Owner of the session. + optional string ClientId = 1; + + // List of device UUIDs. + repeated string DeviceIds = 2; + + // Access mode. + optional bool ReadOnly = 3; + + // MountSeqNumber from volume state. + optional uint64 MountSeqNumber = 4; + + // Disk id. + optional string DiskId = 5; + + // Volume generation. + optional uint32 VolumeGeneration = 6; + + // Last activity timestamp of the session (in microseconds). + optional uint64 LastActivityTs = 7; +}; + +message TDiskAgentDeviceSessionCache +{ + repeated TDiskAgentDeviceSession Sessions = 1; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/diagnostics/critical_events.h b/cloud/blockstore/libs/diagnostics/critical_events.h index 4646973187f..194bda70a35 100644 --- a/cloud/blockstore/libs/diagnostics/critical_events.h +++ b/cloud/blockstore/libs/diagnostics/critical_events.h @@ -50,6 +50,9 @@ namespace NCloud::NBlockStore { xxx(DiskAgentConfigMismatch) \ xxx(DiskRegistryDeviceNotFoundSoft) \ xxx(DiskRegistrySourceDiskNotFound) \ + xxx(EndpointSwitchFailure) \ + xxx(DiskAgentSessionCacheUpdateError) \ + xxx(DiskAgentSessionCacheRestoreError) \ // BLOCKSTORE_CRITICAL_EVENTS #define BLOCKSTORE_IMPOSSIBLE_EVENTS(xxx) \ diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp index ee839b46ab0..7be7586c09d 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp @@ -19,12 +19,16 @@ #include #include +#include #include #include #include #include +#include +#include +#include #include namespace NCloud::NBlockStore::NStorage { @@ -252,6 +256,7 @@ TDiskAgentState::TDiskAgentState( , ProfileLog(std::move(profileLog)) , BlockDigestGenerator(std::move(blockDigestGenerator)) , Logging(std::move(logging)) + , Log(Logging->CreateLog("BLOCKSTORE_DISK_AGENT")) , RdmaServer(std::move(rdmaServer)) , NvmeManager(std::move(nvmeManager)) { @@ -455,6 +460,8 @@ TFuture TDiskAgentState::Initialize( Logging->CreateLog("BLOCKSTORE_DISK_AGENT")); InitRdmaTarget(std::move(rdmaTargetConfig)); + + RestoreSessions(*DeviceClient); }); } @@ -751,18 +758,16 @@ void TDiskAgentState::AcquireDevices( const TString& diskId, ui32 volumeGeneration) { - auto error = DeviceClient->AcquireDevices( + CheckError(DeviceClient->AcquireDevices( uuids, clientId, now, accessMode, mountSeqNumber, diskId, - volumeGeneration); + volumeGeneration)); - if (HasError(error)) { - ythrow TServiceError(error); - } + UpdateSessionCache(*DeviceClient); } void TDiskAgentState::ReleaseDevices( @@ -771,15 +776,13 @@ void TDiskAgentState::ReleaseDevices( const TString& diskId, ui32 volumeGeneration) { - auto error = DeviceClient->ReleaseDevices( + CheckError(DeviceClient->ReleaseDevices( uuids, clientId, diskId, - volumeGeneration); + volumeGeneration)); - if (HasError(error)) { - ythrow TServiceError(error); - } + UpdateSessionCache(*DeviceClient); } void TDiskAgentState::DisableDevice(const TString& uuid) @@ -815,4 +818,104 @@ void TDiskAgentState::StopTarget() } } +void TDiskAgentState::UpdateSessionCache(TDeviceClient& client) const +{ + const auto path = AgentConfig->GetCachedSessionsPath(); + + if (path.empty()) { + STORAGE_INFO("Session cache is not configured."); + return; + } + + try { + auto sessions = client.GetSessions(); + + NProto::TDiskAgentDeviceSessionCache proto; + proto.MutableSessions()->Assign( + std::make_move_iterator(sessions.begin()), + std::make_move_iterator(sessions.end()) + ); + + const TString tmpPath {path + ".tmp"}; + + SerializeToTextFormat(proto, tmpPath); + + if (!NFs::Rename(tmpPath, path)) { + const auto ec = errno; + ythrow TServiceError {MAKE_SYSTEM_ERROR(ec)} << strerror(ec); + } + } catch (...) { + STORAGE_ERROR("Can't update session cache: " << CurrentExceptionMessage()); + ReportDiskAgentSessionCacheUpdateError(); + } +} + +void TDiskAgentState::RestoreSessions(TDeviceClient& client) const +{ + const auto path = AgentConfig->GetCachedSessionsPath(); + + if (path.empty()) { + STORAGE_INFO("Session cache is not configured."); + return; + } + + if (!NFs::Exists(path)) { + STORAGE_INFO("Session cache is empty."); + return; + } + + try { + NProto::TDiskAgentDeviceSessionCache proto; + + ParseProtoTextFromFileRobust(path, proto); + + auto& sessions = *proto.MutableSessions(); + + STORAGE_INFO("Found " << sessions.size() + << " sessions in the session cache: " << JoinSeq(" ", sessions)); + + int errors = 0; + + for (auto& session: sessions) { + TVector uuids( + std::make_move_iterator(session.MutableDeviceIds()->begin()), + std::make_move_iterator(session.MutableDeviceIds()->end())); + + const auto error = client.AcquireDevices( + uuids, + session.GetClientId(), + TInstant::MicroSeconds(session.GetLastActivityTs()), + session.GetReadOnly() + ? NProto::VOLUME_ACCESS_READ_ONLY + : NProto::VOLUME_ACCESS_READ_WRITE, + session.GetMountSeqNumber(), + session.GetDiskId(), + session.GetVolumeGeneration()); + + if (HasError(error)) { + ++errors; + + STORAGE_ERROR("Can't restore session " + << session.GetClientId().Quote() << " from the cache: " + << FormatError(error)); + + client.ReleaseDevices( + uuids, + session.GetClientId(), + session.GetDiskId(), + session.GetVolumeGeneration()); + } + } + + if (errors) { + ReportDiskAgentSessionCacheRestoreError( + "some sessions have not recovered"); + } + } catch (...) { + STORAGE_ERROR("Can't restore sessions from the cache: " + << CurrentExceptionMessage()); + ReportDiskAgentSessionCacheRestoreError(); + } +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h index 4c5fcb1b849..5eb7a1f7115 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h @@ -48,6 +48,7 @@ class TDiskAgentState const IBlockDigestGeneratorPtr BlockDigestGenerator; ILoggingServicePtr Logging; + TLog Log; NSpdk::ISpdkTargetPtr SpdkTarget; NRdma::IServerPtr RdmaServer; IRdmaTargetPtr RdmaTarget; @@ -156,6 +157,9 @@ class TDiskAgentState NThreading::TFuture InitAioStorage(); void InitRdmaTarget(TRdmaTargetConfig rdmaTargetConfig); + + void UpdateSessionCache(TDeviceClient& client) const; + void RestoreSessions(TDeviceClient& client) const; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state_ut.cpp index cfd6e4755d8..53791a95ecc 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state_ut.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state_ut.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -111,6 +112,7 @@ struct TNullConfigParams NProto::TStorageDiscoveryConfig DiscoveryConfig; TString CachedConfigPath; + TString CachedSessionsPath; }; auto CreateNullConfig(TNullConfigParams params) @@ -132,6 +134,7 @@ auto CreateNullConfig(TNullConfigParams params) *config.MutableStorageDiscoveryConfig() = std::move(params.DiscoveryConfig); config.SetCachedConfigPath(std::move(params.CachedConfigPath)); + config.SetCachedSessionsPath(std::move(params.CachedSessionsPath)); return std::make_shared(std::move(config), "rack"); } @@ -293,6 +296,7 @@ struct TFiles }; const TString CachedConfigPath = TempDir.Path() / "nbs-disk-agent.txt"; + const TString CachedSessionsPath = TempDir.Path() / "nbs-disk-agent-sessions.txt"; const ui64 DefaultFileSize = DefaultDeviceBlockSize * DefaultBlocksCount; @@ -1596,6 +1600,311 @@ Y_UNIT_TEST_SUITE(TDiskAgentStateTest) } } } + + Y_UNIT_TEST_F(ShouldCacheSessions, TFiles) + { + PrepareFile(TempDir.Path() / "NVMENBS01", DefaultFileSize); + PrepareFile(TempDir.Path() / "NVMENBS02", DefaultFileSize); + PrepareFile(TempDir.Path() / "NVMENBS03", DefaultFileSize); + PrepareFile(TempDir.Path() / "NVMENBS04", DefaultFileSize); + + auto counters = MakeIntrusive(); + InitCriticalEventsCounter(counters); + + auto restoreError = counters->GetCounter( + "AppCriticalEvents/DiskAgentSessionCacheRestoreError", + true); + + UNIT_ASSERT_EQUAL(0, *restoreError); + + auto createState = [&] { + NProto::TStorageDiscoveryConfig discovery; + auto& path = *discovery.AddPathConfigs(); + path.SetPathRegExp(TempDir.Path() / "NVMENBS([0-9]+)"); + + auto& pool = *path.AddPoolConfigs(); + pool.SetMaxSize(DefaultFileSize); + + auto state = CreateDiskAgentStateNull( + CreateNullConfig({ + .AcquireRequired = true, + .DiscoveryConfig = discovery, + .CachedSessionsPath = CachedSessionsPath + }) + ); + + auto future = state->Initialize({}); + const auto& r = future.GetValue(WaitTimeout); + UNIT_ASSERT_VALUES_EQUAL_C(0, r.Errors.size(), r.Errors[0]); + + return state; + }; + + auto state = createState(); + + UNIT_ASSERT_EQUAL(0, *restoreError); + + TVector devices; + for (auto& d: state->GetDevices()) { + devices.push_back(d.GetDeviceUUID()); + } + Sort(devices); + + UNIT_ASSERT_VALUES_EQUAL(4, devices.size()); + + // acquire a bunch of devices + + state->AcquireDevices( + { devices[0], devices[1] }, + "writer-1", + TInstant::FromValue(1), + NProto::VOLUME_ACCESS_READ_WRITE, + 42, // MountSeqNumber + "vol0", + 1000); // VolumeGeneration + + state->AcquireDevices( + { devices[0], devices[1] }, + "reader-1", + TInstant::FromValue(2), + NProto::VOLUME_ACCESS_READ_ONLY, + -1, // MountSeqNumber + "vol0", + 1001); // VolumeGeneration + + state->AcquireDevices( + { devices[2], devices[3] }, + "reader-2", + TInstant::FromValue(3), + NProto::VOLUME_ACCESS_READ_ONLY, + -1, // MountSeqNumber + "vol1", + 2000); // VolumeGeneration + + // validate the cache file + + { + NProto::TDiskAgentDeviceSessionCache cache; + ParseProtoTextFromFileRobust(CachedSessionsPath, cache); + + UNIT_ASSERT_VALUES_EQUAL(3, cache.SessionsSize()); + + SortBy(*cache.MutableSessions(), [] (auto& session) { + return session.GetClientId(); + }); + + { + auto& session = *cache.MutableSessions(0); + Sort(*session.MutableDeviceIds()); + + UNIT_ASSERT_VALUES_EQUAL("reader-1", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + ASSERT_VECTORS_EQUAL( + TVector({devices[0], devices[1]}), + session.GetDeviceIds()); + + // MountSeqNumber is not applicable to read sessions + UNIT_ASSERT_VALUES_EQUAL(0, session.GetMountSeqNumber()); + UNIT_ASSERT_VALUES_EQUAL("vol0", session.GetDiskId()); + UNIT_ASSERT_VALUES_EQUAL(1001, session.GetVolumeGeneration()); + UNIT_ASSERT_VALUES_EQUAL(2, session.GetLastActivityTs()); + UNIT_ASSERT(session.GetReadOnly()); + } + + { + auto& session = *cache.MutableSessions(1); + + UNIT_ASSERT_VALUES_EQUAL("reader-2", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + ASSERT_VECTORS_EQUAL( + TVector({devices[2], devices[3]}), + session.GetDeviceIds()); + + // MountSeqNumber is not applicable to read sessions + UNIT_ASSERT_VALUES_EQUAL(0, session.GetMountSeqNumber()); + UNIT_ASSERT_VALUES_EQUAL("vol1", session.GetDiskId()); + UNIT_ASSERT_VALUES_EQUAL(2000, session.GetVolumeGeneration()); + UNIT_ASSERT_VALUES_EQUAL(3, session.GetLastActivityTs()); + UNIT_ASSERT(session.GetReadOnly()); + } + + { + auto& session = *cache.MutableSessions(2); + Sort(*session.MutableDeviceIds()); + + UNIT_ASSERT_VALUES_EQUAL("writer-1", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + ASSERT_VECTORS_EQUAL( + TVector({devices[0], devices[1]}), + session.GetDeviceIds()); + + UNIT_ASSERT_VALUES_EQUAL(42, session.GetMountSeqNumber()); + UNIT_ASSERT_VALUES_EQUAL("vol0", session.GetDiskId()); + + // VolumeGeneration was updated by reader-1 + UNIT_ASSERT_VALUES_EQUAL(1001, session.GetVolumeGeneration()); + UNIT_ASSERT_VALUES_EQUAL(1, session.GetLastActivityTs()); + UNIT_ASSERT(!session.GetReadOnly()); + } + } + + // restart + state = createState(); + + UNIT_ASSERT_EQUAL(0, *restoreError); + + auto write = [&] (auto clientId, auto uuid) { + NProto::TWriteDeviceBlocksRequest request; + request.MutableHeaders()->SetClientId(clientId); + request.SetDeviceUUID(uuid); + request.SetStartIndex(1); + request.SetBlockSize(4096); + + ResizeIOVector(*request.MutableBlocks(), 10, 4096); + + return state->Write(Now(), std::move(request)) + .GetValue(WaitTimeout); + }; + + auto read = [&] (auto clientId, auto uuid) { + NProto::TReadDeviceBlocksRequest request; + request.MutableHeaders()->SetClientId(clientId); + request.SetDeviceUUID(uuid); + request.SetStartIndex(1); + request.SetBlockSize(4096); + request.SetBlocksCount(10); + + return state->Read(Now(), std::move(request)) + .GetValue(WaitTimeout); + }; + + // should reject a request with a wrong client id + UNIT_ASSERT_EXCEPTION_SATISFIES( + write("unknown", devices[1]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + + // should reject a request with a wrong client id + UNIT_ASSERT_EXCEPTION_SATISFIES( + write("reader-2", devices[0]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + + // should reject a request with a wrong client id + UNIT_ASSERT_EXCEPTION_SATISFIES( + read("reader-2", devices[1]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + + // should reject a write request for the read only session + UNIT_ASSERT_EXCEPTION_SATISFIES( + write("reader-1", devices[1]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + + // should be ok + + { + auto error = write("writer-1", devices[0]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = write("writer-1", devices[1]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = read("reader-1", devices[0]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = read("reader-1", devices[1]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = read("reader-2", devices[2]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = read("reader-2", devices[3]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + state->ReleaseDevices( + { devices[0], devices[1] }, + "writer-1", + "vol0", + 1001); // VolumeGeneration + + // remove reader-2's file + TFsPath { state->GetDeviceName(devices[3]) }.DeleteIfExists(); + + // restart + state = createState(); + + // reader-2 is broken + UNIT_ASSERT_EQUAL(1, *restoreError); + + { + UNIT_ASSERT_EXCEPTION_SATISFIES( + write("writer-1", devices[0]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + } + + { + UNIT_ASSERT_EXCEPTION_SATISFIES( + write("writer-1", devices[1]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + } + + { + UNIT_ASSERT_EXCEPTION_SATISFIES( + read("reader-2", devices[2]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_BS_INVALID_SESSION; + }); + } + + { + UNIT_ASSERT_EXCEPTION_SATISFIES( + read("reader-2", devices[3]), + TServiceError, + [] (auto& e) { + return e.GetCode() == E_NOT_FOUND; + }); + } + + // should be ok + + { + auto error = read("reader-1", devices[0]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + + { + auto error = read("reader-1", devices[1]).GetError(); + UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error); + } + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/disk_agent/model/config.cpp b/cloud/blockstore/libs/storage/disk_agent/model/config.cpp index 2ec3bf43238..cae058c9dbf 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/config.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/config.cpp @@ -37,6 +37,7 @@ namespace { xxx(DeviceLockingEnabled, bool, false )\ xxx(DeviceHealthCheckDisabled, bool, false )\ xxx(CachedConfigPath, TString, "" )\ + xxx(CachedSessionsPath, TString, "" )\ // BLOCKSTORE_AGENT_CONFIG #define BLOCKSTORE_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/blockstore/libs/storage/disk_agent/model/config.h b/cloud/blockstore/libs/storage/disk_agent/model/config.h index a99ef824ae1..a091eab5803 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/config.h +++ b/cloud/blockstore/libs/storage/disk_agent/model/config.h @@ -99,6 +99,7 @@ class TDiskAgentConfig } TString GetCachedConfigPath() const; + TString GetCachedSessionsPath() const; void Dump(IOutputStream& out) const; void DumpHtml(IOutputStream& out) const; diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp index 70a90ee7364..63f82ea356d 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client.cpp @@ -21,6 +21,59 @@ TDeviceClient::TDeviceClient( , Log(std::move(log)) {} +TVector TDeviceClient::GetSessions() const +{ + THashMap sessions; + + for (const auto& [id, state]: Devices) { + TReadGuard g(state->Lock); + + if (!state->WriterSession.Id.empty()) { + auto& session = sessions[state->WriterSession.Id]; + + if (session.GetClientId().empty()) { + const auto& ws = state->WriterSession; + + session.SetClientId(ws.Id); + session.SetReadOnly(false); + session.SetDiskId(state->DiskId); + session.SetVolumeGeneration(state->VolumeGeneration); + session.SetMountSeqNumber(ws.MountSeqNumber); + session.SetLastActivityTs(ws.LastActivityTs.MicroSeconds()); + } + *session.AddDeviceIds() = id; + } + + for (const auto& rs: state->ReaderSessions) { + auto& session = sessions[rs.Id]; + if (session.GetClientId().empty()) { + session.SetClientId(rs.Id); + session.SetReadOnly(true); + session.SetDiskId(state->DiskId); + session.SetVolumeGeneration(state->VolumeGeneration); + session.SetMountSeqNumber(rs.MountSeqNumber); + session.SetLastActivityTs(rs.LastActivityTs.MicroSeconds()); + } + *session.AddDeviceIds() = id; + } + } + + TVector r; + r.reserve(sessions.size()); + + for (auto&& [_, session]: sessions) { + Sort(*session.MutableDeviceIds()); + + r.push_back(std::move(session)); + } + + SortBy(r, [] (const auto& session) { + return TStringBuf {session.GetClientId()}; + }); + + return r; +} + NCloud::NProto::TError TDeviceClient::AcquireDevices( const TVector& uuids, const TString& clientId, @@ -34,6 +87,7 @@ NCloud::NProto::TError TDeviceClient::AcquireDevices( return MakeError(E_ARGUMENT, "empty client id"); } + // check devices for (const auto& uuid: uuids) { TDeviceState* deviceState = GetDeviceState(uuid); if (!deviceState) { @@ -55,9 +109,6 @@ NCloud::NProto::TError TDeviceClient::AcquireDevices( << ", LastGeneration: " << deviceState->VolumeGeneration); } - deviceState->DiskId = diskId; - deviceState->VolumeGeneration = volumeGeneration; - if (IsReadWriteMode(accessMode) && deviceState->WriterSession.Id && deviceState->WriterSession.Id != clientId @@ -73,6 +124,7 @@ NCloud::NProto::TError TDeviceClient::AcquireDevices( } } + // acquire devices for (const auto& uuid: uuids) { TDeviceState& ds = *GetDeviceState(uuid); @@ -135,7 +187,7 @@ NCloud::NProto::TError TDeviceClient::ReleaseDevices( return MakeError(E_ARGUMENT, "empty client id"); } - for (const auto& uuid : uuids) { + for (const auto& uuid: uuids) { auto* deviceState = GetDeviceState(uuid); if (deviceState == nullptr) { diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client.h b/cloud/blockstore/libs/storage/disk_agent/model/device_client.h index f3dd3f9db22..94df2aa276f 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client.h +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -46,7 +47,7 @@ class TDeviceClient final TDeviceClient( TDuration releaseInactiveSessionsTimeout, TVector uuids, - TLog Log); + TLog log); TDeviceClient(const TDeviceClient&) = delete; TDeviceClient& operator=(const TDeviceClient&) = delete; @@ -81,6 +82,8 @@ class TDeviceClient final void EnableDevice(const TString& uuid) const; bool IsDeviceDisabled(const TString& uuid) const; + TVector GetSessions() const; + private: static TDevicesState MakeDevices(TVector uuids); [[nodiscard]] TDeviceState* GetDeviceState(const TString& uuid) const; diff --git a/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp b/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp index 0c39625ce08..107950ace47 100644 --- a/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/model/device_client_ut.cpp @@ -592,6 +592,80 @@ Y_UNIT_TEST_SUITE(TDeviceClientTest) .SetVolumeGeneration(1)); UNIT_ASSERT_VALUES_EQUAL_C(S_OK, error.GetCode(), error.GetMessage()); } + + Y_UNIT_TEST_F(TestGetSessions, TFixture) + { + auto client = CreateClient({ + .Devices = {"uuid1", "uuid2"} + }); + + UNIT_ASSERT_VALUES_EQUAL(0, client.GetSessions().size()); + + AcquireDevices( + client, + TAcquireParamsBuilder() + .SetUuids({"uuid2", "uuid1"}) + .SetClientId("writer") + .SetDiskId("vol1") + .SetNow(TInstant::Seconds(42)) + .SetVolumeGeneration(1)); + + { + auto sessions = client.GetSessions(); + UNIT_ASSERT_VALUES_EQUAL(1, sessions.size()); + + auto& session = sessions[0]; + UNIT_ASSERT_VALUES_EQUAL("writer", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(1, session.GetVolumeGeneration()); + UNIT_ASSERT(!session.GetReadOnly()); + UNIT_ASSERT_VALUES_EQUAL( + TInstant::Seconds(42), + TInstant::MicroSeconds(session.GetLastActivityTs())); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); + UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); + } + + AcquireDevices( + client, + TAcquireParamsBuilder() + .SetUuids({"uuid2", "uuid1"}) + .SetClientId("reader") + .SetDiskId("vol1") + .SetAccessMode(NProto::VOLUME_ACCESS_READ_ONLY) + .SetNow(TInstant::Seconds(100)) + .SetVolumeGeneration(2)); + + { + auto sessions = client.GetSessions(); + UNIT_ASSERT_VALUES_EQUAL(2, sessions.size()); + { + auto& session = sessions[0]; + UNIT_ASSERT_VALUES_EQUAL("reader", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.GetVolumeGeneration()); + UNIT_ASSERT(session.GetReadOnly()); + UNIT_ASSERT_VALUES_EQUAL( + TInstant::Seconds(100), + TInstant::MicroSeconds(session.GetLastActivityTs())); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); + UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); + } + + { + auto& session = sessions[1]; + UNIT_ASSERT_VALUES_EQUAL("writer", session.GetClientId()); + UNIT_ASSERT_VALUES_EQUAL(2, session.GetVolumeGeneration()); + UNIT_ASSERT(!session.GetReadOnly()); + UNIT_ASSERT_VALUES_EQUAL( + TInstant::Seconds(42), + TInstant::MicroSeconds(session.GetLastActivityTs())); + UNIT_ASSERT_VALUES_EQUAL(2, session.DeviceIdsSize()); + UNIT_ASSERT_VALUES_EQUAL("uuid1", session.GetDeviceIds(0)); + UNIT_ASSERT_VALUES_EQUAL("uuid2", session.GetDeviceIds(1)); + } + } + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/testlib/ut_helpers.h b/cloud/blockstore/libs/storage/testlib/ut_helpers.h index 924e7300fe0..421915d8fc6 100644 --- a/cloud/blockstore/libs/storage/testlib/ut_helpers.h +++ b/cloud/blockstore/libs/storage/testlib/ut_helpers.h @@ -15,12 +15,12 @@ namespace NCloud::NBlockStore::NStorage { } \ // ASSERT_MAP_EQUAL -#define ASSERT_VECTORS_EQUAL(li, ri) [l = li, r = ri] { \ +#define ASSERT_VECTORS_EQUAL(li, ri) [] (const auto& l, const auto& r) { \ for (size_t i = 0; i < Min(l.size(), r.size()); ++i) { \ UNIT_ASSERT_VALUES_EQUAL_C(l[i], r[i], i); \ } \ UNIT_ASSERT_VALUES_EQUAL(l.size(), r.size()); \ -}() \ +}((li), (ri)) \ // ASSERT_VECTORS_EQUAL #define ASSERT_VECTOR_CONTENTS_EQUAL(li, ri) { \ diff --git a/cloud/blockstore/public/sdk/python/client/safe_client.py b/cloud/blockstore/public/sdk/python/client/safe_client.py index 6230557d8c8..fde8fd79122 100644 --- a/cloud/blockstore/public/sdk/python/client/safe_client.py +++ b/cloud/blockstore/public/sdk/python/client/safe_client.py @@ -62,6 +62,7 @@ def create_volume_async( base_disk_id="", base_disk_checkpoint_id="", partitions_count=1, + storage_pool_name=None, idempotence_id=None, timestamp=None, trace_id=None, @@ -79,7 +80,8 @@ def create_volume_async( TabletVersion=tablet_version, BaseDiskId=base_disk_id, BaseDiskCheckpointId=base_disk_checkpoint_id, - PartitionsCount=partitions_count + PartitionsCount=partitions_count, + StoragePoolName=storage_pool_name ) return self.__impl.create_volume_async( request, @@ -103,6 +105,7 @@ def create_volume( base_disk_id="", base_disk_checkpoint_id="", partitions_count=1, + storage_pool_name=None, idempotence_id=None, timestamp=None, trace_id=None, @@ -120,7 +123,8 @@ def create_volume( TabletVersion=tablet_version, BaseDiskId=base_disk_id, BaseDiskCheckpointId=base_disk_checkpoint_id, - PartitionsCount=partitions_count + PartitionsCount=partitions_count, + StoragePoolName=storage_pool_name ) self.__impl.create_volume( request, diff --git a/cloud/blockstore/tests/python/lib/config.py b/cloud/blockstore/tests/python/lib/config.py index 5050e280a0d..36bea6271e8 100644 --- a/cloud/blockstore/tests/python/lib/config.py +++ b/cloud/blockstore/tests/python/lib/config.py @@ -8,7 +8,7 @@ from cloud.blockstore.config.diagnostics_pb2 import TDiagnosticsConfig from cloud.blockstore.config.disk_pb2 import TDiskRegistryProxyConfig, \ TDiskAgentConfig, TFileDeviceArgs, TStorageDiscoveryConfig, \ - DISK_AGENT_BACKEND_AIO + DISK_AGENT_BACKEND_AIO, EDeviceEraseMethod from cloud.blockstore.config.storage_pb2 import TStorageServiceConfig from cloud.blockstore.config.server_pb2 import TServerAppConfig, \ @@ -274,6 +274,7 @@ def generate_dr_proxy_txt(): def generate_disk_agent_txt( agent_id, file_devices=None, + device_erase_method=None, storage_discovery_config=None): config = TDiskAgentConfig() @@ -287,6 +288,9 @@ def generate_disk_agent_txt( config.RegisterRetryTimeout = 1000 # 1 second config.ShutdownTimeout = 0 + if device_erase_method is not None: + config.DeviceEraseMethod = EDeviceEraseMethod.Value(device_erase_method) + if file_devices is not None: for device in file_devices: config.FileDevices.add().CopyFrom( diff --git a/cloud/blockstore/tests/python/lib/daemon.py b/cloud/blockstore/tests/python/lib/daemon.py index 6748e720859..75aa3b9d38c 100644 --- a/cloud/blockstore/tests/python/lib/daemon.py +++ b/cloud/blockstore/tests/python/lib/daemon.py @@ -120,14 +120,16 @@ def mon_port(self): def counters(self): return _get_counters(self.mon_port) - def wait_for_registration(self, delay_sec=1, max_retry_number=None): + def is_registered(self): url = f"http://localhost:{self.mon_port}/blockstore/disk_agent" + r = requests.get(url, timeout=10) + r.raise_for_status() + return r.text.find('Registered') != -1 + def wait_for_registration(self, delay_sec=1, max_retry_number=None): i = 0 while True: - r = requests.get(url, timeout=10) - r.raise_for_status() - if r.text.find('Registered') != -1: + if self.is_registered(): return True i += 1 if max_retry_number is not None and i >= max_retry_number: @@ -137,12 +139,12 @@ def wait_for_registration(self, delay_sec=1, max_retry_number=None): return False -def start_nbs(config: NbsConfigurator): +def start_nbs(config: NbsConfigurator, name='nbs-server'): exe_path = yatest_common.binary_path("cloud/blockstore/apps/server/nbsd") cwd = get_unique_path_for_current_test( output_path=yatest_common.output_path(), - sub_folder="nbs-server" + sub_folder=f"{name}-{config.ic_port}" ) ensure_path_exists(cwd) @@ -168,12 +170,12 @@ def start_nbs(config: NbsConfigurator): return nbs -def start_disk_agent(config: NbsConfigurator): +def start_disk_agent(config: NbsConfigurator, name='disk-agent'): exe_path = yatest_common.binary_path("cloud/blockstore/apps/disk_agent/diskagentd") cwd = get_unique_path_for_current_test( output_path=yatest_common.output_path(), - sub_folder="nbs-disk-agent" + sub_folder=f"{name}-{config.ic_port}" ) ensure_path_exists(cwd) diff --git a/cloud/blockstore/tests/session_cache/test.py b/cloud/blockstore/tests/session_cache/test.py new file mode 100644 index 00000000000..ba00f3a69b2 --- /dev/null +++ b/cloud/blockstore/tests/session_cache/test.py @@ -0,0 +1,180 @@ +import hashlib +import os +import pytest + +import cloud.blockstore.public.sdk.python.protos as protos + +from cloud.blockstore.public.sdk.python.client import CreateClient, \ + ClientError, Session +from cloud.blockstore.tests.python.lib.client import NbsClient +from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \ + generate_disk_agent_txt +from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \ + start_disk_agent, get_fqdn + +import yatest.common as yatest_common + +from contrib.ydb.tests.library.harness.kikimr_runner import get_unique_path_for_current_test, \ + ensure_path_exists + +from library.python.retry import retry + + +DEVICE_SIZE = 1024**2 +KNOWN_DEVICE_POOLS = { + "KnownDevicePools": [ + {"Name": "1Mb", "Kind": "DEVICE_POOL_KIND_LOCAL", "AllocationUnit": DEVICE_SIZE}, + ]} + + +@pytest.fixture(name='ydb') +def start_ydb_cluster(): + + ydb_cluster = start_ydb() + + yield ydb_cluster + + ydb_cluster.stop() + + +@pytest.fixture(name='agent_id') +def get_agent_id(): + return get_fqdn() + + +@pytest.fixture(name='data_path') +def create_data_path(): + + p = get_unique_path_for_current_test( + output_path=yatest_common.output_path(), + sub_folder="data") + + p = os.path.join(p, "dev", "disk", "by-partlabel") + ensure_path_exists(p) + + return p + + +@pytest.fixture(autouse=True) +def create_device_files(data_path): + + for i in range(6): + with open(os.path.join(data_path, f"NVMENBS{i + 1:02}"), 'wb') as f: + f.seek(DEVICE_SIZE-1) + f.write(b'\0') + f.flush() + + +@pytest.fixture(name='disk_agent_configurator') +def create_disk_agent_configurator(ydb, data_path): + + configurator = NbsConfigurator(ydb, 'disk-agent') + configurator.generate_default_nbs_configs() + + disk_agent_config = generate_disk_agent_txt( + agent_id='', + device_erase_method='DEVICE_ERASE_METHOD_NONE', # speed up test + storage_discovery_config={ + "PathConfigs": [{ + "PathRegExp": f"{data_path}/NVMENBS([0-9]+)", + "PoolConfigs": [{ + "PoolName": "1Mb", + "MinSize": DEVICE_SIZE, + "MaxSize": DEVICE_SIZE + }]}] + }) + + disk_agent_config.CachedSessionsPath = os.path.join( + get_unique_path_for_current_test(yatest_common.output_path(), ''), + "nbs-disk-agent-sessions.txt") + + configurator.files["disk-agent"] = disk_agent_config + + return configurator + + +@pytest.fixture(name='nbs_volume_host') +def start_nbs_volume_host(ydb): + + cfg = NbsConfigurator(ydb) + cfg.generate_default_nbs_configs() + cfg.files["storage"].DisableLocalService = 1 + + daemon = start_nbs(cfg, 'volume-host') + + yield daemon + + daemon.kill() + + +def _md5(s): + return hashlib.md5(s.encode("utf-8")).hexdigest() + + +@pytest.fixture(name='nbs_dr_host') +def start_nbs_dr_host(ydb, agent_id): + + cfg = NbsConfigurator(ydb) + cfg.generate_default_nbs_configs() + + cfg.files["storage"].DisableLocalService = 0 + cfg.files["storage"].NonReplicatedDontSuspendDevices = True + + daemon = start_nbs(cfg, name='dr-host') + + client = NbsClient(daemon.port) + client.disk_registry_set_writable_state() + client.update_disk_registry_config({ + "KnownAgents": [{ + "AgentId": agent_id, + "KnownDevices": + [{"DeviceUUID": _md5(f"{agent_id}-{i + 1:02}")} for i in range(6)] + }], + } | KNOWN_DEVICE_POOLS) + + yield daemon + + daemon.kill() + + +def test_session_cache( + nbs_volume_host, + nbs_dr_host, + disk_agent_configurator): + + disk_agent = start_disk_agent(disk_agent_configurator) + disk_agent.wait_for_registration() + + grpc_client = CreateClient(f"localhost:{nbs_volume_host.port}") + + @retry(max_times=10, exception=ClientError) + def create_vol0(): + grpc_client.create_volume( + disk_id="vol0", + block_size=4096, + blocks_count=6*DEVICE_SIZE//4096, + storage_media_kind=protos.STORAGE_MEDIA_SSD_LOCAL, + storage_pool_name="1Mb") + + create_vol0() + + session = Session(grpc_client, "vol0", "") + session.mount_volume() + session.write_blocks(0, [b'\1' * 4096]) + + # kill DR + nbs_dr_host.kill() + + # restart DA + disk_agent.kill() + disk_agent = start_disk_agent(disk_agent_configurator) + + assert not disk_agent.is_registered() + + # I/O should work + session.write_blocks(1, [b'\1' * 4096]) + session.unmount_volume() + + assert not disk_agent.is_registered() + + disk_agent.kill() diff --git a/cloud/blockstore/tests/session_cache/ya.make b/cloud/blockstore/tests/session_cache/ya.make new file mode 100644 index 00000000000..ab918d044da --- /dev/null +++ b/cloud/blockstore/tests/session_cache/ya.make @@ -0,0 +1,25 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc) + +TEST_SRCS(test.py) + +DEPENDS( + cloud/blockstore/apps/client + cloud/blockstore/apps/disk_agent + cloud/blockstore/apps/server + contrib/ydb/apps/ydbd +) + +DATA( + arcadia/cloud/blockstore/tests/certs/server.crt + arcadia/cloud/blockstore/tests/certs/server.key +) + +PEERDIR( + cloud/blockstore/tests/python/lib + + library/python/retry +) + +END() diff --git a/cloud/blockstore/tests/ya.make b/cloud/blockstore/tests/ya.make index fe3325331ef..aaffe07e88e 100644 --- a/cloud/blockstore/tests/ya.make +++ b/cloud/blockstore/tests/ya.make @@ -22,6 +22,7 @@ RECURSE( rdma recipes resize-disk + session_cache spare_node stats_aggregator_perf storage_discovery