Skip to content

Commit

Permalink
NBS-4665, NBS-4262: make decision on saving checkpoint request before…
Browse files Browse the repository at this point in the history
… executing transaction; small fixes
  • Loading branch information
gy2411 committed Jan 30, 2024
1 parent 17b03a1 commit 0464a93
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 118 deletions.
13 changes: 9 additions & 4 deletions cloud/blockstore/libs/storage/volume/model/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,20 +200,25 @@ class TPostponedCheckpointRequestsQueue
Requests[checkpointId].push(std::move(request));
}

std::optional<TRequest> TakePostponedRequest(TString checkpointId)
std::optional<TRequest> GetPostponedRequest(const TString& checkpointId) const
{
auto queue = Requests.FindPtr(checkpointId);
if (!queue) {
return std::nullopt;
}

auto request = std::move(queue->front());
return queue->front();
}

void RemovePostponedRequest(const TString& checkpointId)
{
auto queue = Requests.FindPtr(checkpointId);
Y_DEBUG_ABORT_UNLESS(queue);

queue->pop();
if (queue->empty()) {
Requests.erase(checkpointId);
}

return request;
}
};

Expand Down
10 changes: 7 additions & 3 deletions cloud/blockstore/libs/storage/volume/model/checkpoint_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,20 +717,24 @@ Y_UNIT_TEST_SUITE(TCheckpointStore)
UNIT_ASSERT_VALUES_EQUAL(true, postponedRequests.HasPostponedRequest("ch_2"));
UNIT_ASSERT_VALUES_EQUAL(false, postponedRequests.HasPostponedRequest("ch_0"));

auto request = postponedRequests.TakePostponedRequest("ch_1");
auto request = postponedRequests.GetPostponedRequest("ch_1");
UNIT_ASSERT_VALUES_EQUAL(true, request.has_value());
UNIT_ASSERT_VALUES_EQUAL(1, request->RequestId);

postponedRequests.RemovePostponedRequest("ch_1");
UNIT_ASSERT_VALUES_EQUAL(true, postponedRequests.HasPostponedRequest("ch_1"));

request = postponedRequests.TakePostponedRequest("ch_1");
request = postponedRequests.GetPostponedRequest("ch_1");
UNIT_ASSERT_VALUES_EQUAL(true, request.has_value());
UNIT_ASSERT_VALUES_EQUAL(2, request->RequestId);

postponedRequests.RemovePostponedRequest("ch_1");
UNIT_ASSERT_VALUES_EQUAL(false, postponedRequests.HasPostponedRequest("ch_1"));

postponedRequests.AddPostponedRequest("ch_1", {4, 4});
UNIT_ASSERT_VALUES_EQUAL(true, postponedRequests.HasPostponedRequest("ch_1"));

request = postponedRequests.TakePostponedRequest("ch_0");
request = postponedRequests.GetPostponedRequest("ch_0");
UNIT_ASSERT_VALUES_EQUAL(false, request.has_value());
}
}
Expand Down
17 changes: 5 additions & 12 deletions cloud/blockstore/libs/storage/volume/volume_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,25 +648,18 @@ class TVolumeActor final
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info);

void ProcessPostponedCheckpointRequests(
const NActors::TActorContext& ctx,
const TString& checkpointId);

void TrySaveCheckpointRequest(
void SaveCheckpointRequest(
const NActors::TActorContext& ctx,
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info);

void ReplyToCheckpointRequestWithoutSaving(
void ProcessPostponedCheckpointRequests(
const NActors::TActorContext& ctx,
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info,
const NProto::TError& error);
const TString& checkpointId);

ECheckpointRequestValidityStatus ValidateCheckpointRequest(
std::optional<NProto::TError> ValidateCheckpointRequest(
const NActors::TActorContext& ctx,
const TCheckpointRequest& request,
NProto::TError* error);
const TCheckpointRequest& request) const;

void ProcessNextCheckpointRequest(const NActors::TActorContext& ctx);

Expand Down
174 changes: 84 additions & 90 deletions cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ void TVolumeActor::AddCheckpointRequest(

if (PostponedCheckpointRequests.HasPostponedRequest(request.CheckpointId)) {
LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s is postopned, it can not be saved because "
"[%lu] CheckpointRequest %lu %s is postponed, it can not be saved because "
"there is already saved request for the same checkpoint id",
TabletID(),
request.RequestId,
Expand All @@ -971,47 +971,50 @@ void TVolumeActor::AddCheckpointRequest(
return;
}

// Saves checkpoint request if there is no another saved request with the same checkpoint id.
TrySaveCheckpointRequest(ctx, request, info);
if (!State->GetCheckpointStore().GetCheckpoitsWithSavedRequest().contains(
request.CheckpointId)) {
SaveCheckpointRequest(ctx, request, info);
return;
}

LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s is postponed, it can not be saved because "
"there is another saved request for the same checkpoint id",
TabletID(),
request.RequestId,
request.CheckpointId.Quote().c_str());
PostponedCheckpointRequests.AddPostponedRequest(
request.CheckpointId,
{request.RequestId,
info});
}

bool TVolumeActor::TryReplyToCheckpointRequestWithoutSaving(
const TActorContext& ctx,
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info)
{
NProto::TError error;
ECheckpointRequestValidityStatus validityStatus =
ValidateCheckpointRequest(ctx, request, &error);
std::optional<NProto::TError> error =
ValidateCheckpointRequest(ctx, request);

if (validityStatus == ECheckpointRequestValidityStatus::Ok) {
if (!error) {
return false;
}

ReplyToCheckpointRequestWithoutSaving(ctx, request, info, error);
return true;
}

void TVolumeActor::ReplyToCheckpointRequestWithoutSaving(
const TActorContext& ctx,
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info,
const NProto::TError& error)
{
NActors::IEventBasePtr response;

switch (request.ReqType) {
case ECheckpointRequestType::Create:
case ECheckpointRequestType::CreateWithoutData: {
response = std::make_unique<TCreateCheckpointMethod::TResponse>(error);
response = std::make_unique<TCreateCheckpointMethod::TResponse>(*error);
break;
}
case ECheckpointRequestType::Delete: {
response = std::make_unique<TDeleteCheckpointMethod::TResponse>(error);
response = std::make_unique<TDeleteCheckpointMethod::TResponse>(*error);
break;
}
case ECheckpointRequestType::DeleteData: {
response = std::make_unique<TDeleteCheckpointDataMethod::TResponse>(error);
response = std::make_unique<TDeleteCheckpointDataMethod::TResponse>(*error);
break;
}
}
Expand All @@ -1020,21 +1023,8 @@ void TVolumeActor::ReplyToCheckpointRequestWithoutSaving(
ctx,
*info.RequestInfo,
std::move(response));
}

void TVolumeActor::TrySaveCheckpointRequest(
const TActorContext& ctx,
const TCheckpointRequest& request,
const TCheckpointRequestInfo& info)
{
AddTransaction(*info.RequestInfo);

ExecuteTx<TSaveCheckpointRequest>(
ctx,
info.RequestInfo,
request.RequestId,
info.IsTraced,
info.TraceTs);
return true;
}

void TVolumeActor::ProcessPostponedCheckpointRequests(
Expand All @@ -1043,7 +1033,7 @@ void TVolumeActor::ProcessPostponedCheckpointRequests(
{
while (true) {
auto ponstponedRequestInfo =
PostponedCheckpointRequests.TakePostponedRequest(checkpointId);
PostponedCheckpointRequests.GetPostponedRequest(checkpointId);
if (!ponstponedRequestInfo) {
break;
}
Expand All @@ -1052,18 +1042,48 @@ void TVolumeActor::ProcessPostponedCheckpointRequests(
ponstponedRequestInfo->RequestId);

if (!TryReplyToCheckpointRequestWithoutSaving(ctx, request, ponstponedRequestInfo->Info)) {
TrySaveCheckpointRequest(ctx, request, ponstponedRequestInfo->Info);
if (!State->GetCheckpointStore().GetCheckpoitsWithSavedRequest().contains(
request.CheckpointId)) {
PostponedCheckpointRequests.RemovePostponedRequest(checkpointId);
SaveCheckpointRequest(ctx, request, ponstponedRequestInfo->Info);
}

LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s remains postponed, it can not be saved because "
"there is another saved request for the same checkpoint id",
TabletID(),
request.RequestId,
request.CheckpointId.Quote().c_str());
return;
}

PostponedCheckpointRequests.RemovePostponedRequest(checkpointId);
}

ProcessNextCheckpointRequest(ctx);
}

ECheckpointRequestValidityStatus TVolumeActor::ValidateCheckpointRequest(
void TVolumeActor::SaveCheckpointRequest(
const TActorContext& ctx,
const TCheckpointRequest& request,
NProto::TError* error)
const TCheckpointRequestInfo& info)
{
State->GetCheckpointStore().GetCheckpoitsWithSavedRequest().insert(
request.CheckpointId);

AddTransaction(*info.RequestInfo);

ExecuteTx<TSaveCheckpointRequest>(
ctx,
info.RequestInfo,
request.RequestId,
info.IsTraced,
info.TraceTs);
}

std::optional<NProto::TError> TVolumeActor::ValidateCheckpointRequest(
const TActorContext& ctx,
const TCheckpointRequest& request) const
{
TString message;
ECheckpointRequestValidityStatus validityStatus =
Expand All @@ -1074,7 +1094,7 @@ ECheckpointRequestValidityStatus TVolumeActor::ValidateCheckpointRequest(
&message);

if (validityStatus == ECheckpointRequestValidityStatus::Ok) {
return validityStatus;
return std::nullopt;
}

auto messageWithRequest = TStringBuilder()
Expand All @@ -1083,32 +1103,31 @@ ECheckpointRequestValidityStatus TVolumeActor::ValidateCheckpointRequest(
<< ", checkpointId = " << request.CheckpointId
<< ", checkpointType = " << ToString(request.Type);

bool succeeded = validityStatus == ECheckpointRequestValidityStatus::Already;

if (succeeded) {
TString message = TStringBuilder()
if (validityStatus == ECheckpointRequestValidityStatus::Already) {
TString messageTotal = TStringBuilder()
<< "Checkpoint request is duplicate, nothing to do: "
<< messageWithRequest.c_str();
LOG_INFO(ctx, TBlockStoreComponents::VOLUME,
"[%lu] %s: %s",
TabletID(),
TCreateCheckpointMethod::Name,
message.c_str());
} else {
TString message = TStringBuilder()
<< "Checkpoint request is invalid: %s: "
<< messageWithRequest.c_str();
LOG_ERROR(ctx, TBlockStoreComponents::VOLUME,
"[%lu] %s: %s",
TabletID(),
TCreateCheckpointMethod::Name,
messageWithRequest.c_str());
ui32 flags = 0;
SetProtoFlag(flags, NProto::EF_SILENT);
*error = MakeError(E_PRECONDITION_FAILED, message, flags);
messageTotal.c_str());

return MakeError(S_ALREADY, message);
}

return validityStatus;
TString messageTotal = TStringBuilder()
<< "Checkpoint request is invalid: %s: "
<< messageWithRequest.c_str();
LOG_ERROR(ctx, TBlockStoreComponents::VOLUME,
"[%lu] %s: %s",
TabletID(),
TCreateCheckpointMethod::Name,
messageTotal.c_str());

ui32 flags = 0;
SetProtoFlag(flags, NProto::EF_SILENT);
return MakeError(E_PRECONDITION_FAILED, message, flags);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1302,15 +1321,6 @@ void TVolumeActor::ExecuteSaveCheckpointRequest(
Y_DEBUG_ABORT_UNLESS(
checkpointRequestCopy.State == ECheckpointRequestState::Received);

auto& checkpointsWithSavedRequest =
State->GetCheckpointStore().GetCheckpoitsWithSavedRequest();
args.CanSave =
!checkpointsWithSavedRequest.contains(checkpointRequestCopy.CheckpointId);
if (!args.CanSave) {
return;
}
checkpointsWithSavedRequest.insert(checkpointRequestCopy.CheckpointId);

TVolumeDatabase db(tx.DB);
checkpointRequestCopy.State = ECheckpointRequestState::Saved;
db.WriteCheckpointRequest(checkpointRequestCopy);
Expand All @@ -1323,32 +1333,16 @@ void TVolumeActor::CompleteSaveCheckpointRequest(
const auto& checkpointRequest =
State->GetCheckpointStore().GetRequestById(args.RequestId);

if (!args.CanSave) {
LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s is postopned, it can not be saved because "
"there is another saved request for the same checkpoint id",
TabletID(),
checkpointRequest.RequestId,
checkpointRequest.CheckpointId.Quote().c_str());

PostponedCheckpointRequests.AddPostponedRequest(
checkpointRequest.CheckpointId,
{args.RequestId,
{args.RequestInfo,
args.IsTraced,
args.TraceTs}});
} else {
LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s saved",
TabletID(),
checkpointRequest.RequestId,
checkpointRequest.CheckpointId.Quote().c_str());
LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME,
"[%lu] CheckpointRequest %lu %s saved",
TabletID(),
checkpointRequest.RequestId,
checkpointRequest.CheckpointId.Quote().c_str());

State->GetCheckpointStore().SetCheckpointRequestSaved(args.RequestId);
State->GetCheckpointStore().SetCheckpointRequestSaved(args.RequestId);

CheckpointRequests.insert(
{args.RequestId, {args.RequestInfo, args.IsTraced, args.TraceTs}});
}
CheckpointRequests.insert(
{args.RequestId, {args.RequestInfo, args.IsTraced, args.TraceTs}});

RemoveTransaction(*args.RequestInfo);
ProcessNextCheckpointRequest(ctx);
Expand Down
2 changes: 0 additions & 2 deletions cloud/blockstore/libs/storage/volume/volume_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,6 @@ struct TTxVolume
const bool IsTraced;
const ui64 TraceTs;

bool CanSave = false;

TSaveCheckpointRequest(
TRequestInfoPtr requestInfo,
ui64 requestId,
Expand Down
Loading

0 comments on commit 0464a93

Please sign in to comment.