Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge to stable-23-3 #697

Merged
merged 6 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cloud/blockstore/libs/disk_agent/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,8 @@ void TBootstrap::Start()
START_COMPONENT(TraceProcessor);
START_COMPONENT(Spdk);
START_COMPONENT(RdmaServer);
START_COMPONENT(ActorSystem);
START_COMPONENT(FileIOService);
START_COMPONENT(ActorSystem);

// we need to start scheduler after all other components for 2 reasons:
// 1) any component can schedule a task that uses a dependency that hasn't
Expand Down Expand Up @@ -553,8 +553,11 @@ void TBootstrap::Stop()
// scheduled tasks and shutting down of component dependencies
STOP_COMPONENT(Scheduler);

STOP_COMPONENT(FileIOService);
STOP_COMPONENT(ActorSystem);
// stop FileIOService after ActorSystem to ensure that there are no
// in-flight I/O requests from TDiskAgentActor
STOP_COMPONENT(FileIOService);

STOP_COMPONENT(Spdk);
STOP_COMPONENT(RdmaServer);
STOP_COMPONENT(TraceProcessor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "session_cache_actor.h"

#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/blockstore/libs/kikimr/components.h>

#include <cloud/storage/core/libs/actors/helpers.h>
#include <cloud/storage/core/libs/common/error.h>

#include <contrib/ydb/library/actors/core/actor_bootstrapped.h>
#include <contrib/ydb/library/actors/core/events.h>
#include <contrib/ydb/library/actors/core/log.h>

#include <library/cpp/protobuf/util/pb_io.h>

#include <util/system/fs.h>

using namespace NActors;

namespace NCloud::NBlockStore::NStorage::NDiskAgent {

namespace {

////////////////////////////////////////////////////////////////////////////////

// One-shot actor: on bootstrap it serializes the given device sessions to the
// cache file and then passes the prepared response to NCloud::Reply.
class TSessionCacheActor
    : public TActorBootstrapped<TSessionCacheActor>
{
    // Final location of the session cache file.
    const TString CachePath;

    // Sessions to be written; Bootstrap moves them into the serialized proto.
    TVector<NProto::TDiskAgentDeviceSession> Sessions;
    // Request descriptor handed to NCloud::Reply together with Response.
    TRequestInfoPtr RequestInfo;
    // Response prepared by the creator of this actor; sent from Bootstrap.
    NActors::IEventBasePtr Response;

public:
    TSessionCacheActor(
            TVector<NProto::TDiskAgentDeviceSession> sessions,
            TString cachePath,
            TRequestInfoPtr requestInfo,
            NActors::IEventBasePtr response)
        : CachePath(std::move(cachePath))
        , Sessions(std::move(sessions))
        , RequestInfo(std::move(requestInfo))
        , Response(std::move(response))
    {
        ActivityType = TBlockStoreComponents::DISK_AGENT_WORKER;
    }

    // Performs the whole job of the actor (write cache, reply, die).
    void Bootstrap(const TActorContext& ctx);
};

////////////////////////////////////////////////////////////////////////////////

// strerror_r has two incompatible signatures: the GNU variant returns a pointer
// to the message (and may leave the buffer untouched), while the XSI variant
// returns an int status and writes the message into the buffer. These overloads
// turn either result into the message text, so the caller never streams the
// integer status into an error message by accident.
[[maybe_unused]] const char* GetStrerrorText(
    const char* text,
    const char* /*buf*/)
{
    return text;
}

[[maybe_unused]] const char* GetStrerrorText(int rc, const char* buf)
{
    return rc == 0 ? buf : "unknown error";
}

////////////////////////////////////////////////////////////////////////////////

// Writes the sessions to the cache file and responds to the original request.
//
// The cache is serialized in text format to `CachePath + ".tmp"` first and then
// renamed over `CachePath`. Any exception raised along the way is logged and
// reported via ReportDiskAgentSessionCacheUpdateError(); it does not prevent
// the response from being sent. The actor dies right after replying.
void TSessionCacheActor::Bootstrap(const TActorContext& ctx)
{
    try {
        NProto::TDiskAgentDeviceSessionCache proto;
        // The actor dies at the end of this function, so the sessions can be
        // moved into the proto instead of being copied.
        proto.MutableSessions()->Assign(
            std::make_move_iterator(Sessions.begin()),
            std::make_move_iterator(Sessions.end())
        );

        const TString tmpPath {CachePath + ".tmp"};

        SerializeToTextFormat(proto, tmpPath);

        if (!NFs::Rename(tmpPath, CachePath)) {
            // Capture errno immediately, before anything else can change it.
            const auto ec = errno;

            char buf[64] = {};
            const char* text =
                GetStrerrorText(strerror_r(ec, buf, sizeof(buf)), buf);

            ythrow TServiceError {MAKE_SYSTEM_ERROR(ec)} << text;
        }
    } catch (...) {
        LOG_ERROR_S(
            ctx,
            ActivityType,
            "Can't update session cache: " << CurrentExceptionMessage());

        ReportDiskAgentSessionCacheUpdateError();
    }

    // The response is sent regardless of whether the cache update succeeded.
    NCloud::Reply(
        ctx,
        *RequestInfo,
        std::move(Response));

    Die(ctx);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////

// Builds the actor that writes `sessions` to `cachePath` and afterwards sends
// `response` for the request described by `requestInfo`. All arguments are
// moved into the actor.
std::unique_ptr<IActor> CreateSessionCacheActor(
    TVector<NProto::TDiskAgentDeviceSession> sessions,
    TString cachePath,
    TRequestInfoPtr requestInfo,
    NActors::IEventBasePtr response)
{
    std::unique_ptr<IActor> cacheActor = std::make_unique<TSessionCacheActor>(
        std::move(sessions),
        std::move(cachePath),
        std::move(requestInfo),
        std::move(response));

    return cacheActor;
}

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <cloud/blockstore/config/disk.pb.h>

#include <cloud/blockstore/libs/storage/core/request_info.h>

#include <contrib/ydb/library/actors/core/actor.h>

#include <memory>

namespace NCloud::NBlockStore::NStorage::NDiskAgent {

////////////////////////////////////////////////////////////////////////////////

// Creates an actor that, once bootstrapped, serializes `sessions` to the file
// at `cachePath` (through a temporary "<cachePath>.tmp" file that is renamed
// into place), then sends `response` for the request described by
// `requestInfo` and dies. A failure to update the cache is logged and reported
// but the response is still sent.
std::unique_ptr<NActors::IActor> CreateSessionCacheActor(
TVector<NProto::TDiskAgentDeviceSession> sessions,
TString cachePath,
TRequestInfoPtr requestInfo,
NActors::IEventBasePtr response);

} // namespace NCloud::NBlockStore::NStorage::NDiskAgent
20 changes: 20 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/actors/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Library target for the disk agent helper actors located in this directory.
LIBRARY()

# Sources compiled into this library.
SRCS(
session_cache_actor.cpp
)

# Libraries this target depends on.
PEERDIR(
cloud/blockstore/libs/storage/disk_agent/model

cloud/storage/core/libs/actors
cloud/storage/core/protos

contrib/ydb/library/actors/core
contrib/ydb/core/protos
)

END()

# No test directories are listed for this library yet.
RECURSE_FOR_TESTS(
)
21 changes: 21 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "disk_agent_actor.h"

#include "actors/session_cache_actor.h"

#include <cloud/blockstore/libs/nvme/nvme.h>
#include <cloud/blockstore/libs/service/storage_provider.h>
#include <cloud/storage/core/libs/diagnostics/monitoring.h>
Expand Down Expand Up @@ -110,6 +112,25 @@ void TDiskAgentActor::UpdateActorStats()
}
}

// Hands the current sessions over to a separate session cache actor, which
// writes them to the configured cache path and then sends `response` for the
// request described by `requestInfo`. The actor is registered in the IO pool
// (NKikimr::AppData()->IOPoolId).
void TDiskAgentActor::UpdateSessionCacheAndRespond(
    const NActors::TActorContext& ctx,
    TRequestInfoPtr requestInfo,
    NActors::IEventBasePtr response)
{
    LOG_INFO(ctx, TBlockStoreComponents::DISK_AGENT, "Update the session cache");

    std::unique_ptr<NActors::IActor> cacheWriter =
        NDiskAgent::CreateSessionCacheActor(
            State->GetSessions(),
            AgentConfig->GetCachedSessionsPath(),
            std::move(requestInfo),
            std::move(response));

    const auto ioPoolId = NKikimr::AppData()->IOPoolId;

    // Ownership of the actor passes to the actor system on registration.
    ctx.Register(cacheWriter.release(), TMailboxType::HTSwap, ioPoolId);
}

////////////////////////////////////////////////////////////////////////////////

void TDiskAgentActor::HandlePoisonPill(
Expand Down
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cloud/blockstore/libs/storage/api/disk_registry_proxy.h>
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/core/pending_request.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/config.h>
#include <cloud/blockstore/libs/storage/disk_agent/recent_blocks_tracker.h>

Expand Down Expand Up @@ -132,6 +133,11 @@ class TDiskAgentActor final

TRecentBlocksTracker& GetRecentBlocksTracker(const TString& deviceUUID);

void UpdateSessionCacheAndRespond(
const NActors::TActorContext& ctx,
TRequestInfoPtr requestInfo,
NActors::IEventBasePtr response);

private:
STFUNC(StateInit);
STFUNC(StateWork);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <cloud/blockstore/libs/storage/core/request_info.h>

#include <contrib/ydb/core/base/appdata.h>

#include <util/string/join.h>

namespace NCloud::NBlockStore::NStorage {
Expand Down Expand Up @@ -56,7 +58,7 @@ void TDiskAgentActor::HandleAcquireDevices(
<< ", uuids=" << JoinSeq(",", uuids)
);

State->AcquireDevices(
const bool updated = State->AcquireDevices(
uuids,
clientId,
ctx.Now(),
Expand All @@ -66,6 +68,17 @@ void TDiskAgentActor::HandleAcquireDevices(
record.GetVolumeGeneration());

if (!Spdk || !record.HasRateLimits()) {
// If something has changed in sessions we should update the session
// cache (if it was configured). To do this, we spawn a special actor
// that updates the session cache and responds to the acquire request.
if (updated && AgentConfig->GetCachedSessionsPath()) {
UpdateSessionCacheAndRespond(
ctx,
std::move(requestInfo),
std::make_unique<TEvDiskAgent::TEvAcquireDevicesResponse>());
return;
sharpeye marked this conversation as resolved.
Show resolved Hide resolved
}

reply(NProto::TError());
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "disk_agent_actor.h"

#include <contrib/ydb/core/base/appdata.h>

namespace NCloud::NBlockStore::NStorage {

using namespace NActors;
Expand All @@ -25,6 +27,20 @@ void TDiskAgentActor::HandleReleaseDevices(
record.GetHeaders().GetClientId(),
record.GetDiskId(),
record.GetVolumeGeneration());

// We should update the session cache (if it was configured) with every
// release request.
if (AgentConfig->GetCachedSessionsPath()) {
UpdateSessionCacheAndRespond(
ctx,
CreateRequestInfo(
ev->Sender,
ev->Cookie,
ev->Get()->CallContext),
std::move(response));

return;
}
} catch (const TServiceError& e) {
*response->Record.MutableError() = MakeError(e.GetCode(), e.what());
}
Expand Down
Loading
Loading