Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge to stable 23-3 #2022

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <util/stream/file.h>
#include <util/string/builder.h>
#include <util/string/cast.h>
#include <util/system/condvar.h>
#include <util/system/file.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
Expand Down Expand Up @@ -180,14 +181,14 @@ struct TChild
{}

TChild(const TChild&) = delete;
TChild& operator = (const TChild&) = delete;
TChild& operator=(const TChild&) = delete;

TChild(TChild&& rhs) noexcept
{
Swap(rhs);
}

TChild& operator = (TChild&& rhs) noexcept
TChild& operator=(TChild&& rhs) noexcept
{
Swap(rhs);

Expand Down Expand Up @@ -508,8 +509,10 @@ class TEndpointProcess final
long long logPriority = TLOG_INFO;
logRec["priority"].GetInteger(&logPriority);

STORAGE_LOG(logPriority, "[" << ClientId << "] "
<< logRec["message"].GetString());
STORAGE_LOG(
logPriority,
"vhost-server[" << Process.Pid << "] [" << ClientId << "] "
<< logRec["message"].GetString());
}
};

Expand Down Expand Up @@ -596,16 +599,27 @@ class TEndpoint final

void Start() override
{
Process = StartProcess();

TCondVar processStarted;
// To avoid a race, we need to get the shared pointer in the calling
// thread and pass it to the background thread. This guarantees that the
// background thread always operates on a live `this`.
auto workFunc = [self = shared_from_this()]()
auto workFunc = [&processStarted, self = shared_from_this()]()
{
// It is important to start the vhost-server on the thread that
// outlives it. vhost-server waits for the parent-death signal via
// PR_SET_PDEATHSIG which tracks the aliveness of the thread that
// spawned the process.
self->Process = self->StartProcess();
processStarted.Signal();
self->ThreadProc();
};
std::thread(std::move(workFunc)).detach();

with_lock (Mutex) {
std::thread(std::move(workFunc)).detach();
// Waiting without a timeout is safe here because we are running on
// the coroutine thread.
processStarted.WaitI(Mutex);
}
}

TFuture<NProto::TError> Stop() override
Expand Down Expand Up @@ -682,6 +696,7 @@ class TEndpoint final
Logging,
Stats,
std::move(process));
process = TChild{0};

Executor->ExecuteSimple([=] {
ep->Start(Executor->GetContExecutor());
Expand Down
210 changes: 210 additions & 0 deletions cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import os
import psutil
import pytest
import requests
import tempfile
import time

from cloud.blockstore.public.sdk.python.client import CreateClient, \
ClientError
from cloud.blockstore.public.sdk.python.protos import TCmsActionRequest, \
TAction, STORAGE_MEDIA_SSD_LOCAL, IPC_VHOST, VOLUME_ACCESS_READ_ONLY

from cloud.blockstore.tests.python.lib.client import NbsClient
from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \
generate_disk_agent_txt
from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \
start_disk_agent, get_fqdn

import yatest.common as yatest_common

from contrib.ydb.tests.library.harness.kikimr_runner import \
get_unique_path_for_current_test, \
ensure_path_exists


from library.python.retry import retry


# Size of each emulated local NVMe device, in bytes (1 MiB).
DEVICE_SIZE = 1024**2
# Disk-registry device-pool config: a single local pool named "1Mb"
# whose allocation unit matches DEVICE_SIZE, so each fake device file
# maps to exactly one allocation unit.
KNOWN_DEVICE_POOLS = {
    "KnownDevicePools": [
        {"Name": "1Mb", "Kind": "DEVICE_POOL_KIND_LOCAL",
         "AllocationUnit": DEVICE_SIZE},
    ]}


@pytest.fixture(name='data_path')
def create_data_path():
    # Per-test directory that emulates /dev/disk/by-partlabel and will
    # hold the fake NVMe device files.
    base_dir = get_unique_path_for_current_test(
        output_path=yatest_common.output_path(),
        sub_folder="data")
    device_dir = os.path.join(base_dir, "dev", "disk", "by-partlabel")
    ensure_path_exists(device_dir)
    return device_dir


@pytest.fixture(autouse=True)
def create_device_files(data_path):
    # Create six sparse files NVMELOCAL01..NVMELOCAL06, each exactly
    # DEVICE_SIZE bytes, to act as fake local NVMe devices.
    for idx in range(1, 7):
        device_path = os.path.join(data_path, f"NVMELOCAL{idx:02}")
        with open(device_path, 'wb') as device_file:
            device_file.seek(DEVICE_SIZE - 1)
            device_file.write(b'\0')
            device_file.flush()


@pytest.fixture(name='disk_agent_configurator')
def create_disk_agent_configurator(ydb, data_path):
    # Disk-agent configurator that discovers the fake NVMELOCALxx files
    # and assigns them to the "1Mb" local device pool.
    discovery_config = {
        "PathConfigs": [{
            "PathRegExp": f"{data_path}/NVMELOCAL([0-9]+)",
            "PoolConfigs": [{
                "PoolName": "1Mb",
                "MinSize": DEVICE_SIZE,
                "MaxSize": DEVICE_SIZE
            }]}]
    }

    cfg = NbsConfigurator(ydb, 'disk-agent')
    cfg.generate_default_nbs_configs()
    cfg.files["disk-agent"] = generate_disk_agent_txt(
        agent_id='',
        device_erase_method='DEVICE_ERASE_METHOD_NONE',  # speed up tests
        storage_discovery_config=discovery_config)

    return cfg


@pytest.fixture(name='ydb')
def start_ydb_cluster():
    # Bring up a YDB cluster for the tests; stop it on teardown.
    cluster = start_ydb()
    yield cluster
    cluster.stop()


@pytest.fixture(name='nbs')
def start_nbs_daemon(ydb):
    # NBS server configured to spawn external vhost-server processes;
    # killed on teardown.
    configurator = NbsConfigurator(ydb)
    configurator.generate_default_nbs_configs()

    server_config = configurator.files["server"].ServerConfig
    server_config.VhostEnabled = True
    server_config.VhostServerPath = yatest_common.binary_path(
        "cloud/blockstore/vhost-server/blockstore-vhost-server")
    server_config.VhostServerTimeoutAfterParentExit = 3000  # 3 seconds
    configurator.files["storage"].NonReplicatedDontSuspendDevices = True

    nbs_daemon = start_nbs(configurator)
    yield nbs_daemon
    nbs_daemon.kill()


@pytest.fixture(name='disk_agent')
def start_disk_agent_daemon(ydb, disk_agent_configurator):
    # Disk agent serving the fake local devices; killed on teardown.
    agent = start_disk_agent(disk_agent_configurator)
    yield agent
    agent.kill()


@pytest.fixture(autouse=True)
def setup_env(nbs, disk_agent, data_path):
    # Prepare the disk registry: make it writable, register the device
    # pools, admit the agent host via CMS, and wait until all devices
    # are cleaned and ready for allocation.
    client = CreateClient(f"localhost:{nbs.port}")
    client.execute_action(
        action="DiskRegistrySetWritableState",
        input_bytes=str.encode('{"State": true}'))
    client.update_disk_registry_config(KNOWN_DEVICE_POOLS)

    disk_agent.wait_for_registration()

    cms_request = TCmsActionRequest()
    add_host = cms_request.Actions.add()
    add_host.Type = TAction.ADD_HOST
    add_host.Host = get_fqdn()

    cms_response = client.cms_action(cms_request)
    assert len(cms_response.ActionResults) == 1
    for result in cms_response.ActionResults:
        assert result.Result.Code == 0, result

    # Block until the disk registry reports no dirty devices.
    nbs_client = NbsClient(nbs.port)
    while True:
        backup = nbs_client.backup_disk_registry_state()["Backup"]
        if backup.get("DirtyDevices", 0) == 0:
            break
        time.sleep(1)


def test_multiple_endpoints(nbs):
    """Start many vhost endpoints for one local volume and verify that
    none of the spawned vhost-server processes unexpectedly restarts
    (the ExternalEndpointUnexpectedExit critical counter stays 0)."""
    client = CreateClient(f"localhost:{nbs.port}")

    # Volume creation can race with device cleanup, so retry on failure.
    @retry(max_times=10, exception=ClientError)
    def create_vol0():
        client.create_volume(
            disk_id="vol0",
            block_size=4096,
            blocks_count=2 * DEVICE_SIZE//4096,
            storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
            storage_pool_name="1Mb")

    # Endpoints start asynchronously, so poll until the expected number
    # of vhost-server children appears. NB: retry on RuntimeError — the
    # exception this body actually raises; retrying on
    # requests.ConnectionError would fail on the first mismatch.
    @retry(max_times=10, exception=RuntimeError)
    def wait_for_vhost_servers(nbs, expected_count):
        count = 0
        for process in psutil.process_iter():
            try:
                process_name = os.path.basename(process.exe())
                process_parent = process.parent()
            except (psutil.AccessDenied,
                    psutil.NoSuchProcess,
                    psutil.ZombieProcess):
                # Process vanished or is inaccessible; skip it.
                continue

            if process_parent is None:
                continue

            if process_name == "blockstore-vhost-server" and \
                    process_parent.pid == nbs.pid:
                count += 1
        if count != expected_count:
            raise RuntimeError(
                f"vhost count expected {expected_count}, actual {count}")

    create_vol0()

    # Start a lot of blockstore-vhost-server processes. Keep references
    # to the NamedTemporaryFile objects: dropping them lets the GC close
    # and delete the socket paths while the endpoints still use them.
    SOCKET_COUNT = 30
    sockets = []
    for i in range(0, SOCKET_COUNT):
        socket = tempfile.NamedTemporaryFile()
        sockets.append(socket)
        client.start_endpoint_async(
            unix_socket_path=socket.name,
            disk_id="vol0",
            ipc_type=IPC_VHOST,
            access_mode=VOLUME_ACCESS_READ_ONLY,
            client_id=f"{socket.name}-id",
            seq_number=i+1
        )

    wait_for_vhost_servers(nbs, SOCKET_COUNT)

    # Wait for the counters to be updated
    time.sleep(15)

    crit = nbs.counters.find({
        'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
    })
    assert crit is not None
    assert crit['value'] == 0  # Vhost servers should not have restarted.
7 changes: 6 additions & 1 deletion cloud/blockstore/tests/external_endpoint/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ PY3TEST()

INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc)

TEST_SRCS(test.py)
TEST_SRCS(
multiple_endpoints.py
test.py
)

DEPENDS(
cloud/blockstore/apps/client
cloud/blockstore/apps/disk_agent
cloud/blockstore/apps/server
cloud/blockstore/tools/testing/fake-vhost-server
cloud/blockstore/vhost-server
ydb/apps/ydbd
)

PEERDIR(
cloud/blockstore/tests/python/lib
contrib/python/psutil
contrib/python/requests/py3

library/python/retry
Expand Down
18 changes: 12 additions & 6 deletions cloud/blockstore/vhost-server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,14 @@ int main(int argc, char** argv)
sigaddset(&sigset, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigset, nullptr);

auto delayAfterParentExit = options.WaitAfterParentExit;
// wait for signal to stop the server (Ctrl+C) or dump statistics.
TInstant deathTimerStartedAt;
// Wait for signal to stop the server (Ctrl+C) or dump statistics.
for (bool running = true, parentExit = false; running;) {
int sig = 0;
if (parentExit) {
TDuration delayAfterParentExit = TInstant::Now() +
options.WaitAfterParentExit -
deathTimerStartedAt;
if (!delayAfterParentExit) {
break;
}
Expand All @@ -247,14 +250,11 @@ int main(int argc, char** argv)
timespec timeout = ToTimeSpec(delayAfterParentExit);
siginfo_t info;
memset(&info, 0, sizeof(info));
TInstant startAt = TInstant::Now();
sig = ::sigtimedwait(&sigset, &info, &timeout);

// Reduce the remaining time.
delayAfterParentExit -= TInstant::Now() - startAt;
} else {
// Wait for signal without timeout.
sigwait(&sigset, &sig);
::sigwait(&sigset, &sig);
}
switch (sig) {
case SIGUSR1: {
Expand All @@ -278,10 +278,16 @@ int main(int argc, char** argv)
} break;
case SIGUSR2: {
STORAGE_INFO("Parent process exit.");
if (!deathTimerStartedAt) {
deathTimerStartedAt = TInstant::Now();
}
parentExit = true;
} break;
case SIGPIPE: {
STORAGE_INFO("Pipe to parent process broken.");
if (!deathTimerStartedAt) {
deathTimerStartedAt = TInstant::Now();
}
parentExit = true;
} break;
case -1: {
Expand Down
Loading
Loading