Skip to content

Commit

Permalink
Merge to stable (#2179)
Browse files Browse the repository at this point in the history
* Fix flaky test (#2172)

* Add agent state to DA node id response (#2140)

* fix flaky test 2 (#2180)

* Fix TPendingCleanup initialization (#2198)

* Fix TPendingCleanup initialization

* Review fix
  • Loading branch information
komarevtsev-d authored Oct 8, 2024
1 parent b9351c7 commit afd77c1
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ void TDiskRegistryActor::HandleDeallocateDisk(

if (msg->Record.GetSync() && State->HasPendingCleanup(diskId)) {
LOG_INFO(ctx, TBlockStoreComponents::DISK_REGISTRY,
"[%lu] Postpone DeallocateDisk response",
TabletID());
"[%lu] Postpone DeallocateDisk response. DiskId=%s",
TabletID(), diskId.c_str());

AddPendingDeallocation(ctx, diskId, std::move(requestInfo));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ void TDiskRegistryActor::HandleGetAgentNodeId(
return;
}

const auto* info = AgentRegInfo.FindPtr(agent->GetAgentId());
const bool connected = info && info->Connected;

response->Record.SetNodeId(agent->GetNodeId());
response->Record.SetAgentState(agent->GetState());
response->Record.SetConnected(connected);
NCloud::Reply(ctx, *ev, std::move(response));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,9 @@ void TDiskRegistryState::ProcessDisksToCleanup(TVector<TString> disksToCleanup)
// Feeds the given dirty devices into PendingCleanup.
//
// Each entry is destructured as (uuid, diskId). Entries whose diskId is empty
// are skipped; every other entry is inserted into PendingCleanup under its
// disk id.
//
// dirtyDevices is taken by value, so moving uuid out of its elements does not
// affect the caller's data.
void TDiskRegistryState::ProcessDirtyDevices(TVector<TDirtyDevice> dirtyDevices)
{
    for (auto&& [uuid, diskId]: dirtyDevices) {
        // Insert exactly once, and only behind the emptiness check. An
        // unconditional Insert(std::move(diskId), std::move(uuid)) ahead of
        // this guard would register entries with an empty disk id and leave
        // both strings moved-from before they are read here.
        if (!diskId.empty()) {
            PendingCleanup.Insert(diskId, std::move(uuid));
        }
    }
}

Expand Down Expand Up @@ -3753,6 +3755,7 @@ TVector<TDiskRegistryState::TDeviceId> TDiskRegistryState::TryUpdateDevices(
for (const auto& uuid: uuids) {
auto [agent, device] = FindDeviceLocation(uuid);
if (!agent || !device) {
STORAGE_WARN("Could not update device: %s", uuid.c_str());
continue;
}
ret.push_back(uuid);
Expand All @@ -3762,6 +3765,7 @@ TVector<TDiskRegistryState::TDeviceId> TDiskRegistryState::TryUpdateDevices(

for (const auto& agentId: agentsSet) {
auto* agent = AgentList.FindAgent(agentId);
Y_DEBUG_ABORT_UNLESS(agent);
if (!agent) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,71 @@ Y_UNIT_TEST_SUITE(TDiskRegistryStatePendingCleanupTest)
UNIT_ASSERT_VALUES_EQUAL("vol0", diskId);
});
}

// Scenario: device "uuid-1.1" is handed to the state builder as both suspended
// and dirty (the dirty entry carries an empty second field). The test then
//   1. force-creates disk "vol0" on top of that device,
//   2. marks the disk for cleanup and deallocates it,
//   3. marks the device as clean and expects "vol0" to be returned.
Y_UNIT_TEST(ShouldEraseDiskCreatedFromSuspendedDevice)
{
TTestExecutor executor;
// Run InitSchema() inside a write transaction before anything else.
executor.WriteTx([&](TDiskRegistryDatabase db) { db.InitSchema(); });

// A single agent config (first argument 1) exposing four devices.
const TVector agents{AgentConfig(
1,
{Device("dev-1", "uuid-1.1"),
Device("dev-2", "uuid-1.2"),
Device("dev-3", "uuid-1.3"),
Device("dev-4", "uuid-1.4")})};

// "uuid-1.1" is passed to both WithSuspendedDevices and WithDirtyDevices.
// NOTE(review): the empty string in TDirtyDevice{"uuid-1.1", ""} is presumably
// the disk id, i.e. the device is dirty without an owning disk - confirm
// against the TDirtyDevice definition.
TDiskRegistryState state =
TDiskRegistryStateBuilder()
.WithAgents(agents)
.WithSuspendedDevices({"uuid-1.1"})
.WithDirtyDevices({TDirtyDevice{"uuid-1.1", ""}})
.Build();

// Create a disk.
// The disk "vol0" is built from exactly one device, "uuid-1.1", with
// force == true; the call must succeed and report that same device back.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
TDiskRegistryState::TAllocateDiskResult result;
NProto::TDeviceConfig device = state.GetDevice("uuid-1.1");
auto error = state.CreateDiskFromDevices(
TInstant::Zero(),
db,
/*force=*/true,
"vol0",
4_KB,
{device},
&result);

UNIT_ASSERT_VALUES_EQUAL_C(error.GetCode(), S_OK, error);
UNIT_ASSERT_VALUES_EQUAL(1, result.Devices.size());
UNIT_ASSERT_EQUAL(
device.GetDeviceUUID(),
result.Devices[0].GetDeviceUUID());
});

// Create pending deallocation with the disk.
// Both MarkDiskForCleanup and DeallocateDisk are required to succeed.
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
UNIT_ASSERT_SUCCESS(state.MarkDiskForCleanup(db, "vol0"));
auto error = state.DeallocateDisk(db, "vol0");
UNIT_ASSERT_VALUES_EQUAL_C(error.GetCode(), S_OK, error);
});

// Marking the device as clean removes it from PendingCleanup.
// The database must list exactly one dirty device, "uuid-1.1", and
// MarkDeviceAsClean for it must return the disk id "vol0".
executor.WriteTx(
[&](TDiskRegistryDatabase db)
{
TVector<TDirtyDevice> dirtyDevices;
UNIT_ASSERT(db.ReadDirtyDevices(dirtyDevices));
UNIT_ASSERT_VALUES_EQUAL(1, dirtyDevices.size());
UNIT_ASSERT_VALUES_EQUAL("uuid-1.1", dirtyDevices[0].Id);

// back() is the same element as dirtyDevices[0]: the size was asserted to be 1.
auto diskId =
state.MarkDeviceAsClean(Now(), db, dirtyDevices.back().Id);
UNIT_ASSERT_VALUES_EQUAL("vol0", diskId);
});
}
}

} // namespace NCloud::NBlockStore::NStorage
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/protos/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,12 @@ message TGetAgentNodeIdResponse

// Node that agent is running on.
uint32 NodeId = 2;

// Agent state.
EAgentState AgentState = 3;

// Whether the agent is connected to the DR or not.
bool Connected = 4;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ void TGetDiskAgentNodeIdActor::HandleGetAgentNodeIdResponse(
SetErrorProtoFlag(error, NCloud::NProto::EF_SILENT);
}

auto response =
std::make_unique<TEvService::TEvExecuteActionResponse>(std::move(error));
auto response = std::make_unique<TEvService::TEvExecuteActionResponse>(
std::move(error));

google::protobuf::util::JsonPrintOptions options;
options.always_print_primitive_fields = true;
google::protobuf::util::MessageToJsonString(
msg->Record,
response->Record.MutableOutput());
response->Record.MutableOutput(),
options);

ReplyAndDie(ctx, std::move(response));
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/tests/client/test_with_multiple_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ def test_wait_dependent_disks_to_switch_node_timeout():
agent_id = make_agent_id(0)
node_id_response = json.loads(client.get_disk_agent_node_id(agent_id))
assert node_id_response["NodeId"] > 50000
assert node_id_response["AgentState"] == "AGENT_STATE_ONLINE"
assert node_id_response["Connected"]

# This should return immediately.
wait_response = client.wait_dependent_disks_to_switch_node(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def wait_for_vhost_servers(nbs, expected_count):
try:
process_name = os.path.basename(process.exe())
process_parent = process.parent()
except psutil.AccessDenied:
except psutil.Error:
continue

if process_parent is None:
Expand Down Expand Up @@ -229,7 +229,7 @@ def wait_for_vhost_servers(nbs, expected_count):
try:
process_name = os.path.basename(process.exe())
process_parent = process.parent()
except psutil.AccessDenied:
except psutil.Error:
continue

if process_parent is None:
Expand Down

0 comments on commit afd77c1

Please sign in to comment.