Skip to content

Commit

Permalink
Spawn the vhost-server from special thread (#2014)
Browse files Browse the repository at this point in the history
* Spawn the vhost-server from special thread

* Make vhost death time easier

* Make the code more coroutiny
  • Loading branch information
komarevtsev-d committed Sep 16, 2024
1 parent be3f05e commit c10cd45
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 14 deletions.
27 changes: 20 additions & 7 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <util/stream/file.h>
#include <util/string/builder.h>
#include <util/string/cast.h>
#include <util/system/condvar.h>
#include <util/system/file.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
Expand Down Expand Up @@ -180,14 +181,14 @@ struct TChild
{}

TChild(const TChild&) = delete;
TChild& operator = (const TChild&) = delete;
TChild& operator=(const TChild&) = delete;

TChild(TChild&& rhs) noexcept
{
Swap(rhs);
}

TChild& operator = (TChild&& rhs) noexcept
TChild& operator=(TChild&& rhs) noexcept
{
Swap(rhs);

Expand Down Expand Up @@ -508,8 +509,10 @@ class TEndpointProcess final
long long logPriority = TLOG_INFO;
logRec["priority"].GetInteger(&logPriority);

STORAGE_LOG(logPriority, "[" << ClientId << "] "
<< logRec["message"].GetString());
STORAGE_LOG(
logPriority,
"vhost-server[" << Process.Pid << "] [" << ClientId << "] "
<< logRec["message"].GetString());
}
};

Expand Down Expand Up @@ -596,16 +599,25 @@ class TEndpoint final

// Starts the endpoint: spawns the vhost-server process from a detached
// background thread and blocks the caller until the process has been started.
// NOTE(review): this hunk is rendered without +/- diff markers, so the
// assignment `Process = StartProcess();` right below and the first
// `auto workFunc = ...` line look like the removed (pre-change) lines --
// confirm against the repository before relying on this text.
void Start() override
{
Process = StartProcess();

auto processStarted = NewPromise();
// To avoid a race, we need to get the shared pointer in the calling
// thread and pass it to the background thread. This guarantees that the
// background thread always works with a live `this`.
auto workFunc = [self = shared_from_this()]()
auto workFunc = [&processStarted, self = shared_from_this()]()
{
// It is important to start the vhost-server on the thread that
// outlives it. vhost-server waits for the parent-death signal via
// PR_SET_PDEATHSIG which tracks the aliveness of the thread that
// spawned the process.
self->Process = self->StartProcess();
// Signal the waiting caller that the process has been started.
processStarted.SetValue();
self->ThreadProc();
};

std::thread(std::move(workFunc)).detach();
// Waiting without a timeout is safe here, since we are in the coroutine
// thread.
Executor->WaitFor(processStarted);
}

TFuture<NProto::TError> Stop() override
Expand Down Expand Up @@ -682,6 +694,7 @@ class TEndpoint final
Logging,
Stats,
std::move(process));
process = TChild{0};

Executor->ExecuteSimple([=] {
ep->Start(Executor->GetContExecutor());
Expand Down
210 changes: 210 additions & 0 deletions cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import os
import psutil
import pytest
import requests
import tempfile
import time

from cloud.blockstore.public.sdk.python.client import CreateClient, \
ClientError
from cloud.blockstore.public.sdk.python.protos import TCmsActionRequest, \
TAction, STORAGE_MEDIA_SSD_LOCAL, IPC_VHOST, VOLUME_ACCESS_READ_ONLY

from cloud.blockstore.tests.python.lib.client import NbsClient
from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \
generate_disk_agent_txt
from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \
start_disk_agent, get_fqdn

import yatest.common as yatest_common

from contrib.ydb.tests.library.harness.kikimr_runner import \
get_unique_path_for_current_test, \
ensure_path_exists


from library.python.retry import retry


# Size in bytes of every fake local device file created for the test (1 MiB).
DEVICE_SIZE = 1 << 20

# Disk registry config payload: a single local device pool named "1Mb" whose
# allocation unit equals the device size.
KNOWN_DEVICE_POOLS = {
    "KnownDevicePools": [
        dict(
            Name="1Mb",
            Kind="DEVICE_POOL_KIND_LOCAL",
            AllocationUnit=DEVICE_SIZE,
        ),
    ],
}


@pytest.fixture(name='data_path')
def create_data_path():
    """Create and return the per-test ``dev/disk/by-partlabel`` directory.

    The directory lives under a unique "data" sub-folder of the test output
    path and is where the fake device files are placed.
    """
    test_root = get_unique_path_for_current_test(
        output_path=yatest_common.output_path(),
        sub_folder="data")

    partlabel_dir = os.path.join(test_root, "dev", "disk", "by-partlabel")
    ensure_path_exists(partlabel_dir)
    return partlabel_dir


@pytest.fixture(autouse=True)
def create_device_files(data_path):
    """Create six files NVMELOCAL01..NVMELOCAL06 of DEVICE_SIZE bytes each
    inside ``data_path``.
    """
    for device_number in range(1, 7):
        device_path = os.path.join(data_path, f"NVMELOCAL{device_number:02}")
        with open(device_path, 'wb') as device_file:
            # Writing one zero byte at the last offset extends the file to
            # exactly DEVICE_SIZE bytes.
            device_file.seek(DEVICE_SIZE - 1)
            device_file.write(b'\0')
            device_file.flush()


@pytest.fixture(name='disk_agent_configurator')
def create_disk_agent_configurator(ydb, data_path):
    """Build an NbsConfigurator for a disk agent that discovers the
    NVMELOCAL* files under ``data_path`` and puts them into the "1Mb" pool.
    """
    pool_config = {
        "PoolName": "1Mb",
        "MinSize": DEVICE_SIZE,
        "MaxSize": DEVICE_SIZE
    }
    path_config = {
        "PathRegExp": f"{data_path}/NVMELOCAL([0-9]+)",
        "PoolConfigs": [pool_config]
    }

    disk_agent_configurator = NbsConfigurator(ydb, 'disk-agent')
    disk_agent_configurator.generate_default_nbs_configs()

    disk_agent_configurator.files["disk-agent"] = generate_disk_agent_txt(
        agent_id='',
        device_erase_method='DEVICE_ERASE_METHOD_NONE',  # speed up tests
        storage_discovery_config={"PathConfigs": [path_config]})

    return disk_agent_configurator


@pytest.fixture(name='ydb')
def start_ydb_cluster():
    """Start a YDB cluster for the test and stop it on teardown."""
    cluster = start_ydb()
    yield cluster
    # Teardown: runs once the test using the fixture has finished.
    cluster.stop()


@pytest.fixture(name='nbs')
def start_nbs_daemon(ydb):
    """Start an NBS daemon with external vhost servers enabled and kill it
    on teardown.
    """
    configurator = NbsConfigurator(ydb)
    configurator.generate_default_nbs_configs()

    server_config = configurator.files["server"].ServerConfig
    server_config.VhostEnabled = True
    # Use the real vhost-server binary for the endpoints.
    server_config.VhostServerPath = yatest_common.binary_path(
        "cloud/blockstore/vhost-server/blockstore-vhost-server")
    server_config.VhostServerTimeoutAfterParentExit = 3000  # 3 seconds

    storage_config = configurator.files["storage"]
    storage_config.NonReplicatedDontSuspendDevices = True

    nbs_daemon = start_nbs(configurator)
    yield nbs_daemon
    nbs_daemon.kill()


@pytest.fixture(name='disk_agent')
def start_disk_agent_daemon(ydb, disk_agent_configurator):
    """Start a disk agent from the prepared configurator and kill it on
    teardown.
    """
    agent = start_disk_agent(disk_agent_configurator)
    yield agent
    agent.kill()


@pytest.fixture(autouse=True)
def setup_env(nbs, disk_agent, data_path):
    """Prepare the disk registry for the test.

    Makes the registry writable, uploads the known device pools, waits for
    the disk agent to register, adds the current host through a CMS action
    and finally waits until the registry backup reports no dirty devices.
    """
    grpc_client = CreateClient(f"localhost:{nbs.port}")
    grpc_client.execute_action(
        action="DiskRegistrySetWritableState",
        input_bytes=str.encode('{"State": true}'))

    grpc_client.update_disk_registry_config(KNOWN_DEVICE_POOLS)

    disk_agent.wait_for_registration()

    cms_request = TCmsActionRequest()
    add_host = cms_request.Actions.add()
    add_host.Type = TAction.ADD_HOST
    add_host.Host = get_fqdn()

    cms_response = grpc_client.cms_action(cms_request)

    assert len(cms_response.ActionResults) == 1
    for r in cms_response.ActionResults:
        assert r.Result.Code == 0, r

    # wait for devices to be cleared
    # NOTE(review): the loop has no timeout and relies on "DirtyDevices"
    # being absent from the backup (or equal to 0) once devices are clean --
    # confirm against the backup format.
    registry_client = NbsClient(nbs.port)
    while registry_client.backup_disk_registry_state()["Backup"].get(
            "DirtyDevices", 0) != 0:
        time.sleep(1)


def test_multiple_endpoints(nbs):
    """Start many vhost endpoints for one volume and check that none of the
    spawned blockstore-vhost-server processes exits unexpectedly.

    The test creates volume "vol0", asynchronously starts SOCKET_COUNT
    read-only vhost endpoints, waits until exactly that many
    blockstore-vhost-server children of the NBS process exist, and then
    asserts that the ExternalEndpointUnexpectedExit critical event counter
    is zero.
    """
    client = CreateClient(f"localhost:{nbs.port}")

    @retry(max_times=10, exception=ClientError)
    def create_vol0():
        client.create_volume(
            disk_id="vol0",
            block_size=4096,
            blocks_count=2 * DEVICE_SIZE//4096,
            storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
            storage_pool_name="1Mb")

    # The check below signals "not ready yet" with RuntimeError, so that is
    # the exception that has to be retried. Endpoints are started
    # asynchronously and may not all be running on the first scan.
    @retry(max_times=10, exception=RuntimeError)
    def wait_for_vhost_servers(nbs, expected_count):
        count = 0
        for process in psutil.process_iter():
            try:
                process_name = os.path.basename(process.exe())
                process_parent = process.parent()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # Either we are not allowed to inspect the process or it
                # exited while we were scanning; neither is one of ours.
                continue

            if process_parent is None:
                continue

            if process_name == "blockstore-vhost-server" and process_parent.pid == nbs.pid:
                count += 1
        if count != expected_count:
            raise RuntimeError(
                f"vhost count expected {expected_count}, actual {count}")

    create_vol0()

    # Start a lot of blockstore-vhost-server processes.
    SOCKET_COUNT = 30
    for i in range(0, SOCKET_COUNT):
        # The temporary file is only used to obtain a unique socket path.
        socket = tempfile.NamedTemporaryFile()
        client.start_endpoint_async(
            unix_socket_path=socket.name,
            disk_id="vol0",
            ipc_type=IPC_VHOST,
            access_mode=VOLUME_ACCESS_READ_ONLY,
            client_id=f"{socket.name}-id",
            seq_number=i+1
        )

    wait_for_vhost_servers(nbs, SOCKET_COUNT)

    # Wait for the counters to be updated
    time.sleep(15)

    crit = nbs.counters.find({
        'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
    })
    assert crit is not None
    assert crit['value'] == 0  # Vhost servers should not have restarted.
7 changes: 6 additions & 1 deletion cloud/blockstore/tests/external_endpoint/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ PY3TEST()

INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc)

TEST_SRCS(test.py)
TEST_SRCS(
multiple_endpoints.py
test.py
)

DEPENDS(
cloud/blockstore/apps/client
cloud/blockstore/apps/disk_agent
cloud/blockstore/apps/server
cloud/blockstore/tools/testing/fake-vhost-server
cloud/blockstore/vhost-server
ydb/apps/ydbd
)

PEERDIR(
cloud/blockstore/tests/python/lib
contrib/python/psutil
contrib/python/requests/py3

library/python/retry
Expand Down
18 changes: 12 additions & 6 deletions cloud/blockstore/vhost-server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,14 @@ int main(int argc, char** argv)
sigaddset(&sigset, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

auto delayAfterParentExit = options.WaitAfterParentExit;
// wait for signal to stop the server (Ctrl+C) or dump statistics.
TInstant deathTimerStartedAt;
// Wait for signal to stop the server (Ctrl+C) or dump statistics.
for (bool running = true, parentExit = false; running;) {
int sig = 0;
if (parentExit) {
TDuration delayAfterParentExit = TInstant::Now() +
options.WaitAfterParentExit -
deathTimerStartedAt;
if (!delayAfterParentExit) {
break;
}
Expand All @@ -247,14 +250,11 @@ int main(int argc, char** argv)
timespec timeout = ToTimeSpec(delayAfterParentExit);
siginfo_t info;
memset(&info, 0, sizeof(info));
TInstant startAt = TInstant::Now();
sig = ::sigtimedwait(&sigset, &info, &timeout);

// Reduce the remaining time.
delayAfterParentExit -= TInstant::Now() - startAt;
} else {
// Wait for signal without timeout.
sigwait(&sigset, &sig);
::sigwait(&sigset, &sig);
}
switch (sig) {
case SIGUSR1: {
Expand All @@ -278,10 +278,16 @@ int main(int argc, char** argv)
} break;
case SIGUSR2: {
STORAGE_INFO("Parent process exit.");
if (!deathTimerStartedAt) {
deathTimerStartedAt = TInstant::Now();
}
parentExit = true;
} break;
case SIGPIPE: {
STORAGE_INFO("Pipe to parent process broken.");
if (!deathTimerStartedAt) {
deathTimerStartedAt = TInstant::Now();
}
parentExit = true;
} break;
case -1: {
Expand Down

0 comments on commit c10cd45

Please sign in to comment.