Skip to content

Commit

Permalink
grpc fix
Browse files Browse the repository at this point in the history
  • Loading branch information
proller committed Sep 26, 2024
1 parent ff0574b commit 823e1b7
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 48 deletions.
139 changes: 92 additions & 47 deletions cloud/filestore/tools/testing/loadtest/lib/request_replay_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class TReplayRequestGeneratorGRPC final

TNodeLocal NodeIdMapped(const TNodeLog id)
{
TGuard<TMutex> guard(StateLock);

if (const auto it = NodesLogToLocal.find(id);
it != NodesLogToLocal.end())
{
Expand All @@ -153,6 +155,8 @@ class TReplayRequestGeneratorGRPC final

THandleLocal HandleIdMapped(const THandleLog id)
{
TGuard<TMutex> guard(StateLock);

if (const auto it = HandlesLogToActual.find(id);
it != HandlesLogToActual.end())
{
Expand Down Expand Up @@ -323,9 +327,6 @@ class TReplayRequestGeneratorGRPC final
// node_name=ini, flags=14, mode=436, node_id=66,
// handle=11024287581389312, size=0}

TGuard<TMutex> guard(StateLock);
auto started = TInstant::Now();

auto request = CreateRequest<NProto::TCreateHandleRequest>();
auto name = logRequest.GetNodeInfo().GetNodeName();

Expand All @@ -339,25 +340,39 @@ class TReplayRequestGeneratorGRPC final
request->SetFlags(logRequest.GetNodeInfo().GetFlags());
request->SetMode(logRequest.GetNodeInfo().GetMode());

STORAGE_DEBUG(
"open " << " handle=" << logRequest.GetNodeInfo().GetHandle()
<< " flags=" << logRequest.GetNodeInfo().GetFlags() << " "
<< HandleFlagsToString(logRequest.GetNodeInfo().GetFlags())
<< " mode=" << logRequest.GetNodeInfo().GetMode()
<< " node=" << node << " <- "
<< logRequest.GetNodeInfo().GetNodeId());

auto self = weak_from_this();
return Session->CreateHandle(CreateCallContext(), std::move(request))
.Apply(
[=, name = std::move(name)](
const TFuture<NProto::TCreateHandleResponse>& future)
{
if (auto ptr = self.lock()) {
return ptr->HandleCreateHandle(
future,
name,
const auto future =
Session->CreateHandle(CreateCallContext(), std::move(request))
.Apply(
[=, started = Started, name = std::move(name)](
const TFuture<NProto::TCreateHandleResponse>& future)
{
if (auto ptr = self.lock()) {
return ptr->HandleCreateHandle(
future,
name,
started,
logRequest);
}

return MakeFuture(TCompletedRequest{
NProto::ACTION_CREATE_HANDLE,
started,
logRequest);
}

return MakeFuture(TCompletedRequest{
NProto::ACTION_CREATE_HANDLE,
started,
MakeError(E_FAIL, "cancelled")});
});
MakeError(E_FAIL, "cancelled")});
});
const auto& response = future.GetValueSync();
return MakeFuture(TCompletedRequest{
NProto::ACTION_CREATE_HANDLE,
Started,
response.Error});
}

TFuture<TCompletedRequest> HandleCreateHandle(
Expand Down Expand Up @@ -518,19 +533,36 @@ class TReplayRequestGeneratorGRPC final
started,
MakeError(E_PRECONDITION_FAILED, "disabled")});
}

TGuard<TMutex> guard(StateLock);

auto request = CreateRequest<NProto::TWriteDataRequest>();
const auto handle = HandleIdMapped(logRequest.GetRanges(0).GetHandle());
const auto logHandle = logRequest.GetRanges(0).GetHandle();
const auto handle = HandleIdMapped(logHandle);

if (!handle) {
return MakeFuture(TCompletedRequest{});
}
request->SetHandle(handle);

request->SetOffset(logRequest.GetRanges(0).GetOffset());
const auto offset = logRequest.GetRanges(0).GetOffset();
request->SetOffset(offset);
auto bytes = logRequest.GetRanges(0).GetBytes();
auto buffer = NUnitTest::RandomString(bytes);

TString buffer;

if (Spec.GetWriteRandom()) {
buffer = NUnitTest::RandomString(bytes, logHandle);
} else if (Spec.GetWriteEmpty()) {
buffer = TString{bytes, ' '};
} else {
buffer = MakeBuffer(
bytes,
offset,
TStringBuilder{} << "handle=" << logHandle << " node="
<< logRequest.GetNodeInfo().GetNodeId()
<< " bytes=" << bytes << " offset=" << offset);
}

*request->MutableBuffer() = std::move(buffer);

Expand Down Expand Up @@ -600,8 +632,6 @@ class TReplayRequestGeneratorGRPC final
MakeError(E_PRECONDITION_FAILED, "disabled")});
}

TGuard<TMutex> guard(StateLock);

auto request = CreateRequest<NProto::TCreateNodeRequest>();

const auto parentNode =
Expand Down Expand Up @@ -648,24 +678,30 @@ class TReplayRequestGeneratorGRPC final
}
Cerr << "createnoderec" << *request << "\n";
auto self = weak_from_this();
return Session->CreateNode(CreateCallContext(), std::move(request))
.Apply(
[=, started = Started](
const TFuture<NProto::TCreateNodeResponse>& future)
{
if (auto ptr = self.lock()) {
return ptr->HandleCreateNode(
future,
name,
const auto future =
Session->CreateNode(CreateCallContext(), std::move(request))
.Apply(
[=, started = Started](
const TFuture<NProto::TCreateNodeResponse>& future)
{
if (auto ptr = self.lock()) {
return ptr->HandleCreateNode(
future,
name,
started,
logRequest);
}

return TCompletedRequest{
NProto::ACTION_CREATE_NODE,
started,
logRequest);
}

return TCompletedRequest{
NProto::ACTION_CREATE_NODE,
started,
MakeError(E_CANCELLED, "cancelled")};
});
MakeError(E_CANCELLED, "cancelled")};
});
const auto& response = future.GetValueSync();
return MakeFuture(TCompletedRequest{
NProto::ACTION_CREATE_NODE,
Started,
response.Error});
}

TCompletedRequest HandleCreateNode(
Expand All @@ -674,9 +710,8 @@ class TReplayRequestGeneratorGRPC final
TInstant started,
const NCloud::NFileStore::NProto::TProfileLogRequestInfo& logRequest)
{
TGuard<TMutex> guard(StateLock);

try {
TGuard<TMutex> guard(StateLock);
auto response = future.GetValue();
CheckResponse(response);
// Nodes[name] = TNode{name, response.GetNode()};
Expand Down Expand Up @@ -887,7 +922,6 @@ class TReplayRequestGeneratorGRPC final
TGuard<TMutex> guard(StateLock);

// TODO: by parent + name //
// {"TimestampMcs":1726503153650998,"DurationMcs":7163,"RequestType":35,"ErrorCode":2147942422,"NodeInfo":{"NodeName":"security.capability","NewNodeName":"","NodeId":5,"Size":0}}
// {"TimestampMcs":1726615533406265,"DurationMcs":192,"RequestType":33,"ErrorCode":2147942402,"NodeInfo":{"ParentNodeId":17033,"NodeName":"CPackSourceConfig.cmake","Flags":0,"Mode":0,"NodeId":0,"Handle":0,"Size":0}}
// {"TimestampMcs":240399000,"DurationMcs":163,"RequestType":33,"NodeInfo":{"ParentNodeId":3,"NodeName":"branches","Flags":0,"Mode":0,"NodeId":0,"Handle":0,"Size":0}}
// nfs GetNodeAttr 0.006847s S_OK {parent_node_id=1,
Expand All @@ -897,14 +931,25 @@ class TReplayRequestGeneratorGRPC final
const auto node =
NodeIdMapped(logRequest.GetNodeInfo().GetParentNodeId());
if (!node) {
return MakeFuture(TCompletedRequest{});
return MakeFuture(TCompletedRequest{
NProto::ACTION_GET_NODE_ATTR,
started,
MakeError(
E_NOT_FOUND,
TStringBuilder{}
<< "Node missing in mapping "
<< logRequest.GetNodeInfo().GetParentNodeId())});
}
request->SetNodeId(node);
auto name = logRequest.GetNodeInfo().GetNodeName();
request->SetName(logRequest.GetNodeInfo().GetNodeName());
request->SetFlags(logRequest.GetNodeInfo().GetFlags());
auto self = weak_from_this();
STORAGE_DEBUG("GetNodeAttr client started");
STORAGE_DEBUG(
"GetNodeAttr client started name="
<< name << " node=" << node << " <- "
<< logRequest.GetNodeInfo().GetParentNodeId() // why only parent?
<< " request=" << *request);
return Session->GetNodeAttr(CreateCallContext(), std::move(request))
.Apply(
[=, name = std::move(name)](
Expand Down
11 changes: 10 additions & 1 deletion cloud/filestore/tools/testing/loadtest/lib/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,16 @@ class TLoadTest final
}
});
if (RequestGenerator->InstantProcessQueue()) {
if (future.HasValue()) {
if (future.HasException()) {
try {
future.TryRethrow();
} catch (const std::exception& ex) {
DUMP(ex.what());
}
--CurrentIoDepth;
}

if (future.HasValue() || future.HasException()) {
ProcessCompletedRequests();
}
}
Expand Down

0 comments on commit 823e1b7

Please sign in to comment.