diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp index d6029ada65..62564f7d40 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp @@ -59,6 +59,39 @@ void RejectVolumeRequest( NCloud::Send(ctx, caller, std::move(response), callerCookie); } +//////////////////////////////////////////////////////////////////////////////// + +void CopyDataBuffer(TEvService::TEvWriteBlocksLocalRequest& request) +{ + auto& record = request.Record; + auto g = record.Sglist.Acquire(); + if (!g) { + return; + } + + const auto& sgList = g.Get(); + STORAGE_VERIFY_C( + record.GetBlocks().BuffersSize() == 0, + TWellKnownEntityTypes::DISK, + record.GetDiskId(), + TStringBuilder() << "Buffers: " << record.GetBlocks().BuffersSize()); + TSgList newSgList; + newSgList.reserve(sgList.size()); + for (const auto& block: sgList) { + auto& buffer = *record.MutableBlocks()->AddBuffers(); + buffer.ReserveAndResize(block.Size()); + memcpy(buffer.begin(), block.Data(), block.Size()); + newSgList.emplace_back(buffer.data(), buffer.size()); + } + record.Sglist.SetSgList(std::move(newSgList)); +} + +template +void CopyDataBuffer(T& t) +{ + Y_UNUSED(t); +} + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -719,6 +752,10 @@ void TVolumeActor::ForwardRequest( * to the underlying (storage) layer. */ if constexpr (IsWriteMethod) { + if (State->GetUseIntermediateWriteBuffer()) { + CopyDataBuffer(*msg); + } + const auto range = BuildRequestBlockRange( *msg, State->GetBlockSize()); diff --git a/cloud/blockstore/libs/storage/volume/volume_state.cpp b/cloud/blockstore/libs/storage/volume/volume_state.cpp index c66eefaf44..d264fdadb6 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state.cpp @@ -239,6 +239,7 @@ void TVolumeState::Reset() UseFastPath = false; UseRdmaForThisVolume = false; AcceptInvalidDiskAllocationResponse = false; + UseIntermediateWriteBuffer = false; if (IsDiskRegistryMediaKind()) { if (Meta.GetDevices().size()) { @@ -297,6 +298,8 @@ void TVolumeState::Reset() TDuration::TryParse(value, MaxTimedOutDeviceStateDuration); } else if (tag == "use-fastpath") { UseFastPath = true; + } else if (tag == "use-intermediate-write-buffer") { + UseIntermediateWriteBuffer = true; } } diff --git a/cloud/blockstore/libs/storage/volume/volume_state.h b/cloud/blockstore/libs/storage/volume/volume_state.h index 3b421005dc..6df2dd1dee 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.h +++ b/cloud/blockstore/libs/storage/volume/volume_state.h @@ -226,6 +226,7 @@ class TVolumeState bool UseRdmaForThisVolume = false; bool RdmaUnavailable = false; TDuration MaxTimedOutDeviceStateDuration; + bool UseIntermediateWriteBuffer = false; bool UseMirrorResync = false; bool ForceMirrorResync = false; @@ -677,6 +678,11 @@ class TVolumeState return MaxTimedOutDeviceStateDuration; } + bool GetUseIntermediateWriteBuffer() const + { + return UseIntermediateWriteBuffer; + } + size_t GetUsedBlockCount() const { return UsedBlocks ? UsedBlocks->Count() : 0; diff --git a/cloud/blockstore/libs/storage/volume/volume_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_ut.cpp index 56c8890dd7..7278cede09 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut.cpp @@ -21,10 +21,10 @@ using namespace NTestVolume; using namespace NTestVolumeHelpers; -//////////////////////////////////////////////////////////////////////////////// - namespace NTestVolumeHelpers { +//////////////////////////////////////////////////////////////////////////////// + TBlockRange64 GetBlockRangeById(ui32 blockIndex) { return TBlockRange64::WithLength(1024 * blockIndex, 1024); @@ -9178,6 +9178,135 @@ Y_UNIT_TEST_SUITE(TVolumeTest) v.GetReplicas(1).GetDevices(0).GetTransportId()); } } + + TVector WriteToM3DiskWithInflightDataCorruptionAndReadResults( + const TString& tags) + { + NProto::TStorageServiceConfig config; + config.SetAcquireNonReplicatedDevices(true); + auto state = MakeIntrusive(); + auto runtime = PrepareTestActorRuntime(config, state); + + TVolumeClient volume(*runtime); + + state->ReplicaCount = 2; + + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3, + 1024, + "vol0", + "cloud", + "folder", + 1, // partition count + 0, // blocksPerStripe + tags); + + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(clientInfo); + + // intercepting write request to one of the replicas + TAutoPtr writeToReplica; + THashMap sender2WriteCount; + auto obs = [&] (TTestActorRuntimeBase&, TAutoPtr& event) { + if (event->GetTypeRewrite() + == TEvService::EvWriteBlocksLocalRequest) + { + auto& writeCount = sender2WriteCount[event->Sender]; + ++writeCount; + // intercepting write request to the 3rd replica + if (writeCount == 3) { + writeToReplica = event.Release(); + return true; + } + } + + return false; + }; + + runtime->SetEventFilter(obs); + + const auto range = TBlockRange64::WithLength(0, 1); + const TString adata(4_KB, 'a'); + const TString bdata(4_KB, 'b'); + TString blockData; + // using explicit memcpy to avoid COW + blockData.ReserveAndResize(adata.size()); + memcpy(blockData.begin(), adata.c_str(), adata.size()); + + // sending write request to the volume - the request should hang + { + volume.SendWriteBlocksLocalRequest( + range, + clientInfo.GetClientId(), + blockData); + + runtime->DispatchEvents({}, TDuration::MilliSeconds(100)); + UNIT_ASSERT(writeToReplica); + TEST_NO_RESPONSE(runtime, WriteBlocksLocal); + + } + + // replacing block data + memcpy(blockData.begin(), bdata.c_str(), bdata.size()); + + // releasing the intercepted request + runtime->Send(writeToReplica.Release()); + runtime->DispatchEvents({}, TDuration::MilliSeconds(100)); + { + auto response = volume.RecvWriteBlocksLocalResponse(); + UNIT_ASSERT_VALUES_EQUAL_C( + S_OK, + response->GetStatus(), + response->GetErrorReason()); + } + + // the data in all replicas should be the same and should be equal to + // the version before the replacement + TVector results; + for (ui32 i = 0; i < 3; ++i) { + auto response = volume.ReadBlocks(range, clientInfo.GetClientId()); + const auto& bufs = response->Record.GetBlocks().GetBuffers(); + UNIT_ASSERT_VALUES_EQUAL(1, bufs.size()); + results.push_back(bufs[0]); + } + + volume.RemoveClient(clientInfo.GetClientId()); + + return results; + } + + Y_UNIT_TEST(ShouldCopyWriteRequestDataBeforeWritingToStorageIfTagIsSet) + { + auto results = WriteToM3DiskWithInflightDataCorruptionAndReadResults( + "use-intermediate-write-buffer"); + const TString adata(4_KB, 'a'); + const TString bdata(4_KB, 'b'); + UNIT_ASSERT_VALUES_EQUAL(adata, results[0]); + UNIT_ASSERT_VALUES_EQUAL(adata, results[1]); + UNIT_ASSERT_VALUES_EQUAL(adata, results[2]); + } + + Y_UNIT_TEST(ShouldHaveDifferentDataInReplicasUponInflightBufferCorruption) + { + auto results = + WriteToM3DiskWithInflightDataCorruptionAndReadResults(""); + const TString adata(4_KB, 'a'); + const TString bdata(4_KB, 'b'); + UNIT_ASSERT_VALUES_EQUAL(adata, results[0]); + UNIT_ASSERT_VALUES_EQUAL(adata, results[1]); + UNIT_ASSERT_VALUES_EQUAL(bdata, results[2]); + } } } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_ut.h b/cloud/blockstore/libs/storage/volume/volume_ut.h index 6f60491fdf..f980bf2592 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut.h +++ b/cloud/blockstore/libs/storage/volume/volume_ut.h @@ -1,6 +1,6 @@ #pragma once -#include "testlib/test_env.h" +#include namespace NCloud::NBlockStore::NStorage::NTestVolumeHelpers {