Skip to content

Commit

Permalink
[stable-23-3] Merge to stable (#2064)
Browse files Browse the repository at this point in the history
* Ability to disable node broker registration in the DA (#2038)

* Ability to disable node broker registration in the DA

* Fix typo in comment

* Fix linkage

* Fix local disks test (#2039)

* Respond to HTTP requests while being in the deep idle state (#2045)

* Respond to HTTP requests while being in the deep idle state

* Review fixes

* Fix blockstore-vhost-server deadlock before execvp call (#2052)

* Fix vhost deadlock before execvp call

* Make sure no allocations are done

* Simplify the vhost server spawn (#2057)

* Deduce MediaKind from PoolKind while creating a disk from device (#2061)

* fix cmake build
  • Loading branch information
komarevtsev-d authored Sep 19, 2024
1 parent 8740632 commit 87d86ec
Show file tree
Hide file tree
Showing 27 changed files with 498 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cloud/blockstore/apps/disk_agent/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ PEERDIR(
library/cpp/getopt
)

IF (BUILD_TYPE != "PROFILE")
IF (BUILD_TYPE != "PROFILE" AND BUILD_TYPE != "DEBUG")
SPLIT_DWARF()
ENDIF()

Expand Down
9 changes: 9 additions & 0 deletions cloud/blockstore/config/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,15 @@ message TDiskAgentConfig

// Offload the parsing of all IO requests (by default offloads only write requests).
optional bool OffloadAllIORequestsParsingEnabled = 31;

// When enabled, the Disk Agent checks that devices were found either in
// the "CachedConfigPath", in the "FileDevices", or with the
// "StorageDiscoveryConfig". If none were found, the process falls into the
// deep idle state without even registering in the NodeBroker.
// WARNING: CMS configs can only be retrieved after registering in the
// NodeBroker. Enabling this option completely disables them if there were
// no devices found.
optional bool DisableNodeBrokerRegisterationOnDevicelessAgent = 32;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ target_link_libraries(blockstore-libs-disk_agent PUBLIC
core-libs-common
core-libs-daemon
core-libs-diagnostics
core-libs-http
core-libs-version
ydb-core-protos
udf-service-exception_policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target_link_libraries(blockstore-libs-disk_agent PUBLIC
core-libs-common
core-libs-daemon
core-libs-diagnostics
core-libs-http
core-libs-version
ydb-core-protos
udf-service-exception_policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target_link_libraries(blockstore-libs-disk_agent PUBLIC
core-libs-common
core-libs-daemon
core-libs-diagnostics
core-libs-http
core-libs-version
ydb-core-protos
udf-service-exception_policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ target_link_libraries(blockstore-libs-disk_agent PUBLIC
core-libs-common
core-libs-daemon
core-libs-diagnostics
core-libs-http
core-libs-version
ydb-core-protos
udf-service-exception_policy
Expand Down
79 changes: 77 additions & 2 deletions cloud/blockstore/libs/disk_agent/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <cloud/blockstore/libs/storage/core/config.h>
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/config.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/device_generator.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/device_scanner.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/probes.h>
#include <cloud/blockstore/libs/storage/disk_registry_proxy/model/config.h>
#include <cloud/blockstore/libs/storage/init/disk_agent/actorsystem.h>
Expand Down Expand Up @@ -91,6 +93,33 @@ void ParseProtoTextFromFile(const TString& fileName, T& dst)
ParseFromTextFormat(in, dst);
}

// Tells whether the disk agent has at least one device to work with.
//
// The sources are probed in the following order, stopping at the first hit:
//   1. "FileDevices" from the agent config;
//   2. the cached config, read from the agent's "CachedConfigPath" or, when
//      that one is empty, from the storage config's
//      "CachedDiskAgentConfigPath";
//   3. devices produced by FindDevices() for the "StorageDiscoveryConfig".
//
// Returns false when nothing was found or when FindDevices() reported an
// error. The log is handed over to the device generator used in step 3.
bool AgentHasDevices(
    TLog log,
    const NStorage::TStorageConfigPtr& storageConfig,
    const NStorage::TDiskAgentConfigPtr& agentConfig)
{
    const bool hasFileDevices = !agentConfig->GetFileDevices().empty();
    if (hasFileDevices) {
        return true;
    }

    // The path from the agent config takes precedence over the one from the
    // storage config.
    const TString fallbackPath = storageConfig->GetCachedDiskAgentConfigPath();
    const TString preferredPath = agentConfig->GetCachedConfigPath();
    const TString& cachePath =
        preferredPath.empty() ? fallbackPath : preferredPath;
    if (!NStorage::LoadCachedConfig(cachePath).empty()) {
        return true;
    }

    // Last resort: run the storage discovery and see if it yields anything.
    NStorage::TDeviceGenerator generator{
        std::move(log),
        agentConfig->GetAgentId()};
    const auto discoveryError = FindDevices(
        agentConfig->GetStorageDiscoveryConfig(),
        std::ref(generator));

    // The generator result is consulted only if the discovery succeeded.
    return !HasError(discoveryError) && !generator.ExtractResult().empty();
}

////////////////////////////////////////////////////////////////////////////////

class TLoggingProxy final
Expand Down Expand Up @@ -194,6 +223,17 @@ void TBootstrap::ParseOptions(int argc, char** argv)
Configs = std::make_unique<TConfigInitializer>(std::move(options));
}

// Creates the stub HTTP server used when the agent has skipped the
// NodeBroker registration. The server is only constructed here, bound to the
// NBS monitoring port from the diagnostics config; it is given a fixed
// message pointing to the config option responsible for the skipped
// registration (presumably returned to HTTP clients - confirm against
// TSimpleHttpServer).
void TBootstrap::InitHTTPServer()
{
    // The stub server is meant only for the not-initialized bootstrap.
    Y_DEBUG_ABORT_UNLESS(!Initialized);

    const auto monPort = Configs->DiagnosticsConfig->GetNbsMonPort();

    StubMonPageServer = std::make_unique<NCloud::NStorage::TSimpleHttpServer>(
        monPort,
        "This node is not registered in the NodeBroker. See "
        "\"DisableNodeBrokerRegisterationOnDevicelessAgent\" in the disk agent "
        "config.");
}

void TBootstrap::Init()
{
BootstrapLogging = CreateLoggingService("console", TLogSettings{});
Expand All @@ -203,8 +243,12 @@ void TBootstrap::Init()
Timer = CreateWallClockTimer();
Scheduler = CreateScheduler();

InitKikimrService();
if (!InitKikimrService()) {
InitHTTPServer();
return;
}

Initialized = true;
STORAGE_INFO("Kikimr service initialized");

auto diagnosticsConfig = Configs->DiagnosticsConfig;
Expand Down Expand Up @@ -279,7 +323,7 @@ void TBootstrap::InitRdmaServer(NRdma::TRdmaConfig& config)
}
}

void TBootstrap::InitKikimrService()
bool TBootstrap::InitKikimrService()
{
Configs->InitKikimrConfig();
Configs->InitServerConfig();
Expand Down Expand Up @@ -326,6 +370,23 @@ void TBootstrap::InitKikimrService()

STORAGE_INFO("Configs initialized");

if (const auto& agentConfig = Configs->DiskAgentConfig;
agentConfig->GetDisableNodeBrokerRegisterationOnDevicelessAgent())
{
if (!agentConfig->GetEnabled()) {
STORAGE_INFO(
"Agent is disabled. Skipping the node broker registration.");
return false;
}

if (!AgentHasDevices(Log, Configs->StorageConfig, agentConfig)) {
STORAGE_INFO(
"Devices were not found. Skipping the node broker "
"registration.");
return false;
}
}

auto [nodeId, scopeId, cmsConfig] = RegisterDynamicNode(
Configs->KikimrConfig,
registerOpts,
Expand Down Expand Up @@ -448,6 +509,8 @@ void TBootstrap::InitKikimrService()
if (SpdkLogInitializer) {
SpdkLogInitializer(spdkLog);
}

return true;
}

void TBootstrap::InitLWTrace()
Expand Down Expand Up @@ -520,6 +583,12 @@ void TBootstrap::InitLWTrace()

void TBootstrap::Start()
{
if (!Initialized) {
if (StubMonPageServer) {
StubMonPageServer->Start();
}
return;
}
#define START_COMPONENT(c) \
if (c) { \
c->Start(); \
Expand Down Expand Up @@ -554,6 +623,12 @@ void TBootstrap::Start()

void TBootstrap::Stop()
{
if (!Initialized) {
if (StubMonPageServer) {
StubMonPageServer->Stop();
}
return;
}
#define STOP_COMPONENT(c) \
if (c) { \
c->Stop(); \
Expand Down
8 changes: 7 additions & 1 deletion cloud/blockstore/libs/disk_agent/bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <cloud/blockstore/libs/service_local/public.h>
#include <cloud/blockstore/libs/spdk/iface/public.h>
#include <cloud/blockstore/libs/storage/disk_agent/public.h>
#include <cloud/storage/core/libs/http/simple_http_server.h>

#include <ydb/core/driver_lib/run/factories.h>

Expand Down Expand Up @@ -74,6 +75,9 @@ class TBootstrap
TProgramShouldContinue ShouldContinue;
TVector<TString> PostponedCriticalEvents;

std::unique_ptr<NCloud::NStorage::TSimpleHttpServer> StubMonPageServer;
bool Initialized = false;

public:
TBootstrap(
std::shared_ptr<NKikimr::TModuleFactories> moduleFactories,
Expand All @@ -91,7 +95,9 @@ class TBootstrap
private:
void InitLWTrace();
void InitProfileLog();
void InitKikimrService();
bool InitKikimrService();

void InitHTTPServer();

void InitRdmaServer(NRdma::TRdmaConfig& config);
};
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/disk_agent/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ PEERDIR(
cloud/storage/core/libs/common
cloud/storage/core/libs/daemon
cloud/storage/core/libs/diagnostics
cloud/storage/core/libs/http
cloud/storage/core/libs/version

ydb/core/protos
Expand Down
91 changes: 41 additions & 50 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ struct TPipe
R.Close();

TFileHandle h {fd};
Y_SCOPE_EXIT(&h) { h.Release(); };

h.LinkTo(W);
h.Release();
}
};

Expand All @@ -281,42 +281,49 @@ TChild SpawnChild(
TPipe stdOut;
TPipe stdErr;

pid_t childPid = ::fork();
// Make allocations before the fork() call. In rare cases, the tcmalloc can
// deadlock on allocation because other threads have locked mutexes that
// would never be unlocked since the threads that placed the locks are not
// duplicated in the child.
TVector<char*> qargs;
qargs.reserve(args.size() + 2);

if (childPid == -1) {
int err = errno;
char buf[64] {};
ythrow TServiceError {MAKE_SYSTEM_ERROR(err)}
<< "fork error: " << ::strerror_r(err, buf, sizeof(buf));
}
pid_t childPid = ::fork();

if (childPid) {
// Parent process.
return TChild {childPid, std::move(stdOut.R), std::move(stdErr.R)};
}
// WARNING: Don't make heap allocations here!
{
if (childPid == -1) {
int err = errno;
char buf[64]{};
ythrow TServiceError{MAKE_SYSTEM_ERROR(err)}
<< "fork error: " << ::strerror_r(err, buf, sizeof(buf));
}

// child process
if (childPid) {
// Parent process.
return TChild{childPid, std::move(stdOut.R), std::move(stdErr.R)};
}

stdOut.LinkTo(STDOUT_FILENO);
stdErr.LinkTo(STDERR_FILENO);
// child process

// Last chance to figure out what's going on.
// freopen((TString("/tmp/out.") + ToString(::getpid())).c_str(), "w", stdout);
// freopen((TString("/tmp/err.") + ToString(::getpid())).c_str(), "w", stderr);
stdOut.LinkTo(STDOUT_FILENO);
stdErr.LinkTo(STDERR_FILENO);

// Following "const_cast"s are safe:
// http://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
// Last chance to figure out what's going on.
// freopen((TString("/tmp/out.") + ToString(::getpid())).c_str(), "w",
// stdout); freopen((TString("/tmp/err.") +
// ToString(::getpid())).c_str(), "w", stderr);

TVector<char*> qargs;
qargs.reserve(args.size() + 2);
// Following "const_cast"s are safe:
// http://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
qargs.push_back(const_cast<char*>(binaryPath.data()));
for (auto& arg: args) {
qargs.push_back(const_cast<char*>(arg.data()));
}
qargs.emplace_back();

qargs.push_back(const_cast<char*>(binaryPath.data()));
for (auto& arg: args) {
qargs.push_back(const_cast<char*>(arg.data()));
::execvp(binaryPath.c_str(), qargs.data());
}
qargs.emplace_back();

::execvp(binaryPath.c_str(), qargs.data());

int err = errno;
char buf[64] {};
Expand Down Expand Up @@ -599,27 +606,8 @@ class TEndpoint final

void Start() override
{
TCondVar processStarted;
// To avoid a race, we need to get the shared pointer in the calling
// thread and pass it to the background thread. This guarantees that the
// background thread always deals with a live "this".
auto workFunc = [&processStarted, self = shared_from_this()]()
{
// It is important to start the vhost-server on the thread that
// outlives it. vhost-server waits for the parent-death signal via
// PR_SET_PDEATHSIG which tracks the aliveness of the thread that
// spawned the process.
self->Process = self->StartProcess();
processStarted.Signal();
self->ThreadProc();
};

with_lock (Mutex) {
std::thread(std::move(workFunc)).detach();
// Infinite time wait is safe here, since we are in the coroutine
// thread.
processStarted.WaitI(Mutex);
}
Process = StartProcess();
std::thread(&TEndpoint::ThreadProc, shared_from_this()).detach();
}

TFuture<NProto::TError> Stop() override
Expand Down Expand Up @@ -672,7 +660,10 @@ class TEndpoint final
}
}

StopPromise.SetValue(error);
// We must call "SetValue()" on the coroutine thread, since it will
// trigger future handlers synchronously.
Executor->ExecuteSimple([promise = this->StopPromise, error]() mutable
{ promise.SetValue(error); });
}

TIntrusivePtr<TEndpointProcess> StartProcess()
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/disk_agent/model/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace {
xxx(TemporaryAgent, bool, false )\
xxx(IOParserActorCount, ui32, 0 )\
xxx(OffloadAllIORequestsParsingEnabled, bool, false )\
xxx(DisableNodeBrokerRegisterationOnDevicelessAgent, bool, false )\
// BLOCKSTORE_AGENT_CONFIG

#define BLOCKSTORE_DECLARE_CONFIG(name, type, value) \
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/disk_agent/model/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class TDiskAgentConfig

ui32 GetIOParserActorCount() const;
bool GetOffloadAllIORequestsParsingEnabled() const;
bool GetDisableNodeBrokerRegisterationOnDevicelessAgent() const;

void Dump(IOutputStream& out) const;
void DumpHtml(IOutputStream& out) const;
Expand Down
Loading

0 comments on commit 87d86ec

Please sign in to comment.