Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filestore, merge to stable 23-3] issue-1350: restoring follower sessions after follower restarts, issue-1350: increasing default IdleSessionTimeout to 1 hour, issue-1814: passing StorageStatusFlags for written blobs from StorageServiceActor to tablets via AddData requests #1819

Merged
merged 5 commits into from
Aug 19, 2024
2 changes: 1 addition & 1 deletion cloud/filestore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ using TAliases = NProto::TStorageConfig::TFilestoreAliases;
xxx(PipeClientMaxRetryTime, TDuration, TDuration::Seconds(4) )\
\
xxx(EstablishSessionTimeout, TDuration, TDuration::Seconds(30) )\
xxx(IdleSessionTimeout, TDuration, TDuration::Minutes(5) )\
xxx(IdleSessionTimeout, TDuration, TDuration::Hours(1) )\
\
xxx(WriteBatchEnabled, bool, false )\
xxx(WriteBatchTimeout, TDuration, TDuration::MilliSeconds(0) )\
Expand Down
10 changes: 10 additions & 0 deletions cloud/filestore/libs/storage/service/service_actor_writedata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
// in flight
TMaybe<TInFlightRequest> InFlightRequest;
TVector<std::unique_ptr<TInFlightRequest>> InFlightBSRequests;
TVector<ui32> StorageStatusFlags;
const NCloud::NProto::EStorageMediaKind MediaKind;

public:
Expand Down Expand Up @@ -173,6 +174,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>

RequestInfo->CallContext->RequestType = EFileStoreRequest::WriteBlob;
InFlightBSRequests.reserve(RemainingBlobsToWrite);
StorageStatusFlags.resize(GenerateBlobIdsResponse.BlobsSize());
for (const auto& blob: GenerateBlobIdsResponse.GetBlobs()) {
NKikimr::TLogoBlobID blobId =
LogoBlobIDFromLogoBlobID(blob.GetBlobId());
Expand Down Expand Up @@ -249,11 +251,14 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
ui64 blobIdx = msg->Id.Cookie();
// It is implicitly expected that cookies are generated in increasing
// order starting from 0.
// TODO: replace this TABLET_VERIFY with a critical event + WriteData
// fallback
TABLET_VERIFY(
blobIdx < InFlightBSRequests.size() &&
InFlightBSRequests[blobIdx] &&
!InFlightBSRequests[blobIdx]->IsCompleted());
InFlightBSRequests[blobIdx]->Complete(ctx.Now(), {});
StorageStatusFlags[blobIdx] = msg->StatusFlags.Raw;

--RemainingBlobsToWrite;
if (RemainingBlobsToWrite == 0) {
Expand All @@ -277,6 +282,11 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
request->Record.AddBlobIds()->Swap(blob.MutableBlobId());
}
request->Record.SetCommitId(GenerateBlobIdsResponse.GetCommitId());
request->Record.MutableStorageStatusFlags()->Reserve(
StorageStatusFlags.size());
for (const auto flags: StorageStatusFlags) {
request->Record.AddStorageStatusFlags(flags);
}

if (Range.Offset < BlobRange.Offset) {
auto& unalignedHead = *request->Record.AddUnalignedDataRanges();
Expand Down
304 changes: 304 additions & 0 deletions cloud/filestore/libs/storage/service/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2609,6 +2609,70 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
// clang-format on
}

Y_UNIT_TEST(ShouldSendBSGroupFlagsToTabletViaAddDataRequests)
{
TTestEnv env;
env.CreateSubDomain("nfs");

ui32 nodeIdx = env.CreateNode("nfs");

TServiceClient service(env.GetRuntime(), nodeIdx);
const TString fs = "test";
service.CreateFileStore(fs, 1000);

{
NProto::TStorageConfig newConfig;
newConfig.SetThreeStageWriteEnabled(true);
const auto response =
ExecuteChangeStorageConfig(std::move(newConfig), service);
env.GetRuntime().DispatchEvents({}, TDuration::Seconds(1));
}

auto headers = service.InitSession(fs, "client");
ui64 nodeId = service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle = service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();

const auto yellowFlag =
NKikimrBlobStorage::EStatusFlags::StatusDiskSpaceYellowStop;

NProtoPrivate::TAddDataRequest addData;
using TFlags = NKikimr::TStorageStatusFlags;
env.GetRuntime().SetEventFilter(
[&](auto& runtime, auto& event)
{
Y_UNUSED(runtime);

switch (event->GetTypeRewrite()) {
case TEvBlobStorage::EvPutResult: {
auto* msg =
event->template Get<TEvBlobStorage::TEvPutResult>();
const_cast<TFlags&>(msg->StatusFlags).Raw |=
ui32(yellowFlag);
break;
}

case TEvIndexTablet::EvAddDataRequest: {
addData = event->template Get<
TEvIndexTablet::TEvAddDataRequest>()->Record;
break;
}
}
return false;
});

TString data = GenerateValidateData(256_KB);
service.WriteData(headers, fs, nodeId, handle, 0, data);
UNIT_ASSERT_VALUES_EQUAL(1, addData.BlobIdsSize());
UNIT_ASSERT_VALUES_EQUAL(1, addData.StorageStatusFlagsSize());
UNIT_ASSERT(NKikimr::TStorageStatusFlags(
addData.GetStorageStatusFlags(0)).Check(yellowFlag));
}

void ConfigureFollowers(
TServiceClient& service,
const TString& fsId,
Expand Down Expand Up @@ -2781,6 +2845,246 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
}
}

Y_UNIT_TEST(ShouldRestoreSessionInFollowerAfterFollowerRestart)
{
const auto idleSessionTimeout = TDuration::Minutes(2);
NProto::TStorageConfig config;
config.SetIdleSessionTimeout(idleSessionTimeout.MilliSeconds());
TTestEnv env({}, config);
env.CreateSubDomain("nfs");

ui32 nodeIdx = env.CreateNode("nfs");

const TString fsId = "test";
const auto shard1Id = fsId + "-f1";
const auto shard2Id = fsId + "-f2";

TServiceClient service(env.GetRuntime(), nodeIdx);

TActorId leaderActorId;
ui64 shard1TabletId = -1;
ui64 shard2TabletId = -1;
const auto startupEventType =
TEvIndexTabletPrivate::EvLoadCompactionMapChunkRequest;
env.GetRuntime().SetEventFilter(
[&] (auto& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
switch (event->GetTypeRewrite()) {
case TEvSSProxy::EvDescribeFileStoreResponse: {
using TDesc = TEvSSProxy::TEvDescribeFileStoreResponse;
const auto* msg = event->Get<TDesc>();
const auto& desc =
msg->PathDescription.GetFileStoreDescription();
if (desc.GetConfig().GetFileSystemId() == shard1Id) {
shard1TabletId = desc.GetIndexTabletId();
}

if (desc.GetConfig().GetFileSystemId() == shard2Id) {
shard2TabletId = desc.GetIndexTabletId();
}

break;
}

case startupEventType: {
if (!leaderActorId) {
leaderActorId = event->Sender;
}

break;
}
}

return false;
});

service.CreateFileStore(fsId, 1'000);
service.CreateFileStore(shard1Id, 1'000);
service.CreateFileStore(shard2Id, 1'000);

ConfigureFollowers(service, fsId, shard1Id, shard2Id);

auto headers = service.InitSession(fsId, "client");
auto headers1 = headers;
headers1.FileSystemId = shard1Id;
auto headers2 = headers;
headers2.FileSystemId = shard2Id;

// creating nodes and handles in both shards

ui64 nodeId1 =
service
.CreateNode(headers1, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();

UNIT_ASSERT_VALUES_EQUAL((1LU << 56U) + 2, nodeId1);

ui64 handle1 =
service
.CreateHandle(
headers1,
headers1.FileSystemId,
nodeId1,
"",
TCreateHandleArgs::RDWR)
->Record.GetHandle();

UNIT_ASSERT_C(
handle1 >= (1LU << 56U) && handle1 < (2LU << 56U),
handle1);

ui64 nodeId2 =
service
.CreateNode(headers2, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();

UNIT_ASSERT_VALUES_EQUAL((2LU << 56U) + 2, nodeId2);

ui64 handle2 =
service
.CreateHandle(
headers2,
headers2.FileSystemId,
nodeId2,
"",
TCreateHandleArgs::RDWR)
->Record.GetHandle();

UNIT_ASSERT_C(
handle2 >= (2LU << 56U) && handle2 < (3LU << 56U),
handle2);

// rebooting shards

TVector<TActorId> fsActorIds;
env.GetRuntime().SetEventFilter(
[&] (auto& runtime, TAutoPtr<IEventHandle>& event) {
Y_UNUSED(runtime);
switch (event->GetTypeRewrite()) {
case TEvIndexTabletPrivate::EvLoadCompactionMapChunkRequest: {
// catching one of the startup events to detect shard
// actor ids

if (Find(fsActorIds, event->Sender) == fsActorIds.end()) {
fsActorIds.push_back(event->Sender);
}

break;
}
}

return false;
});

TIndexTabletClient shard1(
env.GetRuntime(),
nodeIdx,
shard1TabletId,
{}, // config
false // updateConfig
);
shard1.RebootTablet();

TIndexTabletClient shard2(
env.GetRuntime(),
nodeIdx,
shard2TabletId,
{}, // config
false // updateConfig
);
shard2.RebootTablet();

// triggering follower sessions sync
// sending the event manually since registration observers which enable
// scheduling for actors are reset upon tablet reboot

env.GetRuntime().AdvanceCurrentTime(idleSessionTimeout / 2);

{
using TRequest = TEvIndexTabletPrivate::TEvSyncSessionsRequest;

env.GetRuntime().Send(
new IEventHandle(
leaderActorId, // recipient
TActorId(), // sender
new TRequest(),
0, // flags
0),
0);
}

env.GetRuntime().DispatchEvents({}, TDuration::MilliSeconds(100));
// waiting for idle session expiration
// sending the event manually since registration observers which enable
// scheduling for actors are reset upon tablet reboot

env.GetRuntime().AdvanceCurrentTime(idleSessionTimeout / 2);
service.PingSession(headers);

for (const auto& actorId: fsActorIds) {
using TRequest = TEvIndexTabletPrivate::TEvCleanupSessionsRequest;

env.GetRuntime().Send(
new IEventHandle(
actorId, // recipient
TActorId(), // sender
new TRequest(),
0, // flags
0),
0);
}

// need to pass deadline instead of timeout here since otherwise the
// adjusted time gets added to the timeout
env.GetRuntime().DispatchEvents(
{},
TInstant::Now() + TDuration::MilliSeconds(100));

// shard sessions should exist

for (const auto& id: {fsId, shard1Id, shard2Id}) {
NProtoPrivate::TDescribeSessionsRequest request;
request.SetFileSystemId(id);

TString buf;
google::protobuf::util::MessageToJsonString(request, &buf);
auto jsonResponse = service.ExecuteAction("describesessions", buf);
NProtoPrivate::TDescribeSessionsResponse response;
UNIT_ASSERT(google::protobuf::util::JsonStringToMessage(
jsonResponse->Record.GetOutput(), &response).ok());

const auto& sessions = response.GetSessions();
UNIT_ASSERT_VALUES_EQUAL(1, sessions.size());

UNIT_ASSERT_VALUES_EQUAL(
headers.SessionId,
sessions[0].GetSessionId());
UNIT_ASSERT_VALUES_EQUAL(
headers.ClientId,
sessions[0].GetClientId());
}

// handles should be alive

service.WriteData(
headers1,
headers1.FileSystemId,
nodeId1,
handle1,
0,
TString(1_MB, 'a'));

service.WriteData(
headers2,
headers2.FileSystemId,
nodeId2,
handle2,
0,
TString(1_MB, 'a'));
}

Y_UNIT_TEST(ShouldCreateNodeInFollowerViaLeader)
{
NProto::TStorageConfig config;
Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/storage/tablet/subsessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ NActors::TActorId TSubSessions::UpdateSubSession(
if (!readOnly) {
MaxSeenRwSeqNo = std::max(MaxSeenRwSeqNo, seqNo);
}
auto subsession = FindIf(
auto* subsession = FindIf(
SubSessions,
[&] (const auto& subsession) {
return subsession.SeqNo == seqNo;
Expand Down
Loading
Loading