Skip to content

Commit

Permalink
merge to stable-23-3: blockstore-server: use-intermediate-write-buffe…
Browse files Browse the repository at this point in the history
…r tag implementation; filestore: compaction tuning, cli tuning, shard autocreation bits (#2470)

* moved unstable output from test canondata to test stderr (#2416)

* issue-2421: added volume tag which forces user write buffer copying to our internal buffers to avoid writing different data to different replicas of the same volume in case of concurrent buffer modifications by the client (#2431)

* issue-1932: automatic filesystem shard configuration upon filesystem creation (#2415)

* issue-1932: autosharding params

* issue-1932: automatic filesystem shard creation+configuration upon filesystem creation

* issue-1932: properly limiting max shard count

* issue-1932: fixed signed/unsigned comparison

* issue-1932: fixed use-after-free in ut (#2441)

* filestore-client: outputting progress in findgarbage; properly processing ForcedOperationStatus errors in forcedcompaction (#2442)

* issue-2137: if all ranges in CompactionMap have the 'compacted' flag then GetTop{Compaction,Garbage}Score should return zero (in order not to trigger dud Compaction iterations all the time) (#2455)

* issue-2421: safer code + extra ut for use-intermediate-write-buffer volume tag (#2452)

* issue-2421: nonrepl disk ut with use-intermediate-write-buffer volume tag

* issue-2421: nonrepl disk ut with use-intermediate-write-buffer volume tag - making the intentions a bit more clear

* issue-2421: made local->remote request conversion in partition_nonrepl safer, added some other safety checks and comments

* fixed build after incorrect merge 5887fb6

* fixed build 2
  • Loading branch information
qkrorlqr authored Nov 12, 2024
1 parent e91494f commit a72868b
Show file tree
Hide file tree
Showing 28 changed files with 1,082 additions and 142 deletions.
1 change: 1 addition & 0 deletions cloud/blockstore/libs/diagnostics/critical_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ namespace NCloud::NBlockStore {
xxx(DiskRegistryInsertToPendingCleanupFailed) \
xxx(OverlappingRangesDuringMigrationDetected) \
xxx(StartExternalEndpointError) \
xxx(EmptyRequestSgList) \
// BLOCKSTORE_IMPOSSIBLE_EVENTS

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,35 @@ void TNonreplicatedPartitionActor::HandleWriteBlocksLocal(
return;
}

if (guard.Get().empty()) {
// can happen only if there is a bug in the code of the layers above
// this one
ReportEmptyRequestSgList();
replyError(
ctx,
*requestInfo,
E_ARGUMENT,
"empty SgList in request");
return;
}

// convert local request to remote

// copying request data into a new TIOVector and moving it to msg->Record
// afterwards since msg->Record.Blocks can be holding current request data
// or parts of it
NProto::TIOVector blocks;
SgListCopy(
guard.Get(),
ResizeIOVector(
*msg->Record.MutableBlocks(),
blocks,
msg->Record.BlocksCount,
PartConfig->GetBlockSize()));
*msg->Record.MutableBlocks() = std::move(blocks);

// explicitly clearing request data (SgList) just in case anyone adds some
// code to TDiskAgentWriteActor that tries to use it
msg->Record.Sglist.SetSgList({});

const bool assignVolumeRequestId =
Config->GetAssignIdToWriteAndZeroRequestsEnabled() &&
Expand Down
50 changes: 50 additions & 0 deletions cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,40 @@ void RejectVolumeRequest(
NCloud::Send(ctx, caller, std::move(response), callerCookie);
}

////////////////////////////////////////////////////////////////////////////////

// Copies the data referenced by the request's SgList into buffers owned by
// the request itself (request.Record.Blocks) and then repoints the SgList at
// those owned buffers.
//
// Does nothing if the SgList guard cannot be acquired. Verifies that the
// record does not already hold any Blocks buffers before copying.
void CopySgListIntoRequestBuffers(
    TEvService::TEvWriteBlocksLocalRequest& request)
{
    auto& record = request.Record;

    auto guard = record.Sglist.Acquire();
    if (!guard) {
        return;
    }

    const auto& sourceSgList = guard.Get();

    // the owned buffers are about to be filled from scratch, so there must
    // be none yet
    STORAGE_VERIFY_C(
        record.GetBlocks().BuffersSize() == 0,
        TWellKnownEntityTypes::DISK,
        record.GetDiskId(),
        TStringBuilder() << "Buffers: " << record.GetBlocks().BuffersSize());

    TSgList ownedSgList;
    ownedSgList.reserve(sourceSgList.size());

    for (const auto& source: sourceSgList) {
        // one owned buffer per source block, same size, same contents
        auto* owned = record.MutableBlocks()->AddBuffers();
        owned->ReserveAndResize(source.Size());
        memcpy(owned->begin(), source.Data(), source.Size());
        ownedSgList.emplace_back(owned->data(), owned->size());
    }

    // from now on the request refers to the copies
    record.Sglist.SetSgList(std::move(ownedSgList));
}

// Fallback overload for every request type other than
// TEvService::TEvWriteBlocksLocalRequest: intentionally a no-op.
template <typename T>
void CopySgListIntoRequestBuffers(T& /*request*/)
{}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -714,6 +748,22 @@ void TVolumeActor::ForwardRequest(
}
}

/*
* Support for copying request data from the user-supplied buffer to some
* buffers which we own to protect the layer below Volume from buggy
* guests which modify buffer contents before receiving write response.
*
* Impacts performance (due to extra copying) and is thus not switched on
* by default.
*
* See https://github.com/ydb-platform/nbs/issues/2421
*/
if constexpr (IsWriteMethod<TMethod>) {
if (State->GetUseIntermediateWriteBuffer()) {
CopySgListIntoRequestBuffers(*msg);
}
}

/*
* Processing overlapping writes. Overlapping writes should not be sent
* to the underlying (storage) layer.
Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/volume/volume_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ void TVolumeState::Reset()
UseFastPath = false;
UseRdmaForThisVolume = false;
AcceptInvalidDiskAllocationResponse = false;
UseIntermediateWriteBuffer = false;

if (IsDiskRegistryMediaKind()) {
if (Meta.GetDevices().size()) {
Expand Down Expand Up @@ -297,6 +298,8 @@ void TVolumeState::Reset()
TDuration::TryParse(value, MaxTimedOutDeviceStateDuration);
} else if (tag == "use-fastpath") {
UseFastPath = true;
} else if (tag == "use-intermediate-write-buffer") {
UseIntermediateWriteBuffer = true;
}
}

Expand Down
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/volume/volume_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class TVolumeState
bool UseRdmaForThisVolume = false;
bool RdmaUnavailable = false;
TDuration MaxTimedOutDeviceStateDuration;
bool UseIntermediateWriteBuffer = false;

bool UseMirrorResync = false;
bool ForceMirrorResync = false;
Expand Down Expand Up @@ -677,6 +678,11 @@ class TVolumeState
return MaxTimedOutDeviceStateDuration;
}

// Returns the UseIntermediateWriteBuffer flag (false by default).
bool GetUseIntermediateWriteBuffer() const
{
return UseIntermediateWriteBuffer;
}

size_t GetUsedBlockCount() const
{
return UsedBlocks ? UsedBlocks->Count() : 0;
Expand Down
170 changes: 168 additions & 2 deletions cloud/blockstore/libs/storage/volume/volume_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ using namespace NTestVolume;

using namespace NTestVolumeHelpers;

////////////////////////////////////////////////////////////////////////////////

namespace NTestVolumeHelpers {

////////////////////////////////////////////////////////////////////////////////

TBlockRange64 GetBlockRangeById(ui32 blockIndex)
{
return TBlockRange64::WithLength(1024 * blockIndex, 1024);
Expand Down Expand Up @@ -9180,6 +9180,172 @@ Y_UNIT_TEST_SUITE(TVolumeTest)
v.GetReplicas(1).GetDevices(0).GetTransportId());
}
}

// Test helper shared by the "inflight buffer corruption" tests.
//
// Scenario:
//  1. creates a volume of the given mediaKind with the given tags and adds a
//     read-write local client;
//  2. sends a WriteBlocksLocal request for block 0 filled with 'a' and
//     intercepts the writeNumberToIntercept-th EvWriteBlocksLocalRequest
//     event observed by the runtime;
//  3. overwrites the client-side buffer with 'b' while that event is held;
//  4. re-sends the intercepted event and waits for a successful response;
//  5. reads block 0 three times.
//
// Returns the three buffers that were read, in read order. What they are
// expected to contain depends on the tags and is asserted by the callers.
TVector<TString> WriteToDiskWithInflightDataCorruptionAndReadResults(
NCloud::NProto::EStorageMediaKind mediaKind,
ui32 writeNumberToIntercept,
const TString& tags)
{
NProto::TStorageServiceConfig config;
config.SetAcquireNonReplicatedDevices(true);
auto state = MakeIntrusive<TDiskRegistryState>();
auto runtime = PrepareTestActorRuntime(config, state);

TVolumeClient volume(*runtime);

state->ReplicaCount = 2;

volume.UpdateVolumeConfig(
0,
0,
0,
0,
false,
1,
mediaKind,
1024,
"vol0",
"cloud",
"folder",
1, // partition count
0, // blocksPerStripe
tags);

volume.WaitReady();

auto clientInfo = CreateVolumeClientInfo(
NProto::VOLUME_ACCESS_READ_WRITE,
NProto::VOLUME_MOUNT_LOCAL,
0);
volume.AddClient(clientInfo);

// intercepting write request to one of the replicas
// (the event is taken out of the runtime and kept in writeToReplica;
// NOTE(review): returning true presumably makes the runtime drop the event -
// confirm against the test runtime's event filter contract)
TAutoPtr<IEventHandle> writeToReplica;
ui32 writeNo = 0;
auto obs = [&] (TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
if (event->GetTypeRewrite()
== TEvService::EvWriteBlocksLocalRequest)
{
// counting every EvWriteBlocksLocalRequest seen by the filter
++writeNo;
if (writeNo == writeNumberToIntercept) {
writeToReplica = event.Release();
return true;
}
}

return false;
};

runtime->SetEventFilter(obs);

const auto range = TBlockRange64::WithLength(0, 1);
const TString adata(4_KB, 'a');
const TString bdata(4_KB, 'b');
TString blockData;
// using explicit memcpy to avoid COW
blockData.ReserveAndResize(adata.size());
memcpy(blockData.begin(), adata.c_str(), adata.size());

// sending write request to the volume - the request should hang
{
volume.SendWriteBlocksLocalRequest(
range,
clientInfo.GetClientId(),
blockData);

runtime->DispatchEvents({}, TDuration::MilliSeconds(100));
// the filter must have captured the target event and no response must
// have been produced yet
UNIT_ASSERT(writeToReplica);
TEST_NO_RESPONSE(runtime, WriteBlocksLocal);

}

// replacing block data
// (in place, in the same buffer that was passed to the write request)
memcpy(blockData.begin(), bdata.c_str(), bdata.size());

// releasing the intercepted request
runtime->Send(writeToReplica.Release());
runtime->DispatchEvents({}, TDuration::MilliSeconds(100));
{
auto response = volume.RecvWriteBlocksLocalResponse();
UNIT_ASSERT_VALUES_EQUAL_C(
S_OK,
response->GetStatus(),
response->GetErrorReason());
}

// reading the block 3 times and collecting the results; whether the results
// are expected to match the data before or after the replacement is decided
// by the caller (it depends on the tags)
TVector<TString> results;
for (ui32 i = 0; i < 3; ++i) {
auto response = volume.ReadBlocks(range, clientInfo.GetClientId());
const auto& bufs = response->Record.GetBlocks().GetBuffers();
UNIT_ASSERT_VALUES_EQUAL(1, bufs.size());
results.push_back(bufs[0]);
}

volume.RemoveClient(clientInfo.GetClientId());

return results;
}

// mirror3 disk with the use-intermediate-write-buffer tag: changing the
// client buffer while one of the writes is held back must not be visible -
// all three reads are expected to return the original 'a' data.
Y_UNIT_TEST(ShouldCopyWriteRequestDataBeforeWritingToStorageIfTagIsSetM3)
{
auto results = WriteToDiskWithInflightDataCorruptionAndReadResults(
NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3,
// 1 - to volume
// 1 - to mirror actor
// 3 - to 3 replicas
// i.e. the 5th write request event is the one that gets intercepted
1 + 1 + 3,
"use-intermediate-write-buffer");
const TString adata(4_KB, 'a');
UNIT_ASSERT_VALUES_EQUAL(adata, results[0]);
UNIT_ASSERT_VALUES_EQUAL(adata, results[1]);
UNIT_ASSERT_VALUES_EQUAL(adata, results[2]);
}

// mirror3 disk without the tag (baseline demonstrating the problem): the
// buffer change made while one write is held back ends up in storage - two
// reads are expected to return the original 'a' data and one the replaced
// 'b' data.
Y_UNIT_TEST(ShouldHaveDifferentDataInReplicasUponInflightBufferCorruptionM3)
{
auto results = WriteToDiskWithInflightDataCorruptionAndReadResults(
NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3,
// 1 - to volume
// 1 - to mirror actor
// 3 - to 3 replicas (nonrepl part actors)
// i.e. the 5th write request event is the one that gets intercepted
1 + 1 + 3,
"");
const TString adata(4_KB, 'a');
const TString bdata(4_KB, 'b');
UNIT_ASSERT_VALUES_EQUAL(adata, results[0]);
UNIT_ASSERT_VALUES_EQUAL(adata, results[1]);
UNIT_ASSERT_VALUES_EQUAL(bdata, results[2]);
}

// nonreplicated disk with the use-intermediate-write-buffer tag: changing the
// client buffer while the write is held back must not be visible - all three
// reads are expected to return the original 'a' data.
Y_UNIT_TEST(ShouldCopyWriteRequestDataBeforeWritingToStorageIfTagIsSetNonrepl)
{
auto results = WriteToDiskWithInflightDataCorruptionAndReadResults(
NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED,
// 1 - to volume
// 1 - to nonrepl part actor
// i.e. the 2nd write request event is the one that gets intercepted
1 + 1,
"use-intermediate-write-buffer");
const TString adata(4_KB, 'a');
UNIT_ASSERT_VALUES_EQUAL(adata, results[0]);
UNIT_ASSERT_VALUES_EQUAL(adata, results[1]);
UNIT_ASSERT_VALUES_EQUAL(adata, results[2]);
}

// nonreplicated disk without the tag (baseline demonstrating the problem):
// the buffer change made while the write is held back ends up in storage -
// all three reads are expected to return the replaced 'b' data.
Y_UNIT_TEST(ShouldHaveChangedDataInStorageUponInflightBufferCorruptionNonrepl)
{
auto results = WriteToDiskWithInflightDataCorruptionAndReadResults(
NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED,
// 1 - to volume
// 1 - to nonrepl part actor
// i.e. the 2nd write request event is the one that gets intercepted
1 + 1,
"");
const TString bdata(4_KB, 'b');
UNIT_ASSERT_VALUES_EQUAL(bdata, results[0]);
UNIT_ASSERT_VALUES_EQUAL(bdata, results[1]);
UNIT_ASSERT_VALUES_EQUAL(bdata, results[2]);
}
}

} // namespace NCloud::NBlockStore::NStorage
2 changes: 1 addition & 1 deletion cloud/blockstore/libs/storage/volume/volume_ut.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "testlib/test_env.h"
#include <cloud/blockstore/libs/storage/volume/testlib/test_env.h>

namespace NCloud::NBlockStore::NStorage::NTestVolumeHelpers {

Expand Down
10 changes: 9 additions & 1 deletion cloud/filestore/apps/client/lib/find_garbage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,16 @@ class TFindGarbageCommand final
auto shardSessionGuard =
CreateCustomSession(shard, shard + "::" + ClientId);
auto& shardSession = shardSessionGuard.AccessSession();
FetchAll(shardSession, shard, RootNodeId, &shard2Nodes[shard]);
auto& shardNodes = shard2Nodes[shard];
STORAGE_INFO("Fetching nodes for shard " << shard);
FetchAll(shardSession, shard, RootNodeId, &shardNodes);
STORAGE_INFO("Fetched " << shardNodes.size() << " nodes");
}

STORAGE_INFO("Fetching nodes for leader");
TVector<TNode> leaderNodes;
FetchAll(session, FileSystemId, RootNodeId, &leaderNodes);
STORAGE_INFO("Fetched " << leaderNodes.size() << " nodes");

THashSet<TString> shardNames;
for (const auto& node: leaderNodes) {
Expand Down Expand Up @@ -183,8 +188,11 @@ class TFindGarbageCommand final
auto& shardSession = shardSessionGuard.AccessSession();
for (const auto& node: nodes) {
if (!shardNames.contains(node.Name)) {
STORAGE_INFO("Node " << node.Name << " not found in shard"
", calling stat");
auto stat =
Stat(shardSession, shard, RootNodeId, node.Name);
STORAGE_INFO("Stat done");

if (stat) {
results.push_back({shard, node.Name, stat->GetSize()});
Expand Down
6 changes: 3 additions & 3 deletions cloud/filestore/apps/client/lib/forced_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class TForcedCompactionCommand final
continue;
}

CheckResponse(statusResponse);

const auto processed = statusResponse.GetProcessedRangeCount();
const auto total = statusResponse.GetRangeCount();
if (processed >= total) {
Expand All @@ -111,9 +113,7 @@ class TForcedCompactionCommand final
break;
}

CheckResponse(statusResponse);

Cout << "progress: " << statusResponse.GetProcessedRangeCount()
Cerr << "progress: " << statusResponse.GetProcessedRangeCount()
<< "/" << statusResponse.GetRangeCount() << ", last="
<< statusResponse.GetLastProcessedRangeId() << Endl;

Expand Down
Loading

0 comments on commit a72868b

Please sign in to comment.