diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp index 4f59c5266f..e87aae1626 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -180,14 +181,14 @@ struct TChild {} TChild(const TChild&) = delete; - TChild& operator = (const TChild&) = delete; + TChild& operator=(const TChild&) = delete; TChild(TChild&& rhs) noexcept { Swap(rhs); } - TChild& operator = (TChild&& rhs) noexcept + TChild& operator=(TChild&& rhs) noexcept { Swap(rhs); @@ -508,8 +509,10 @@ class TEndpointProcess final long long logPriority = TLOG_INFO; logRec["priority"].GetInteger(&logPriority); - STORAGE_LOG(logPriority, "[" << ClientId << "] " - << logRec["message"].GetString()); + STORAGE_LOG( + logPriority, + "vhost-server[" << Process.Pid << "] [" << ClientId << "] " + << logRec["message"].GetString()); } }; @@ -596,16 +599,27 @@ class TEndpoint final void Start() override { - Process = StartProcess(); - + TCondVar var; // To avoid a race, we need to get the shared pointer in the calling // thread and pass it to the background thread. This guaranteed that the // background thread will deal with a live this. - auto workFunc = [self = shared_from_this()]() + auto workFunc = [&var, self = shared_from_this()]() { + // It is important to start the vhost-server on the thread that + // outlives it. vhost-server waits for the parent-death signal via + // PR_SET_PDEATHSIG which tracks the aliveness of the thread that + // spawned the process. + self->Process = self->StartProcess(); + var.Signal(); self->ThreadProc(); }; - std::thread(std::move(workFunc)).detach(); + + with_lock (Mutex) { + std::thread(std::move(workFunc)).detach(); + // Infinite time wait is safe here, since we are in the coroutine + // thread. + var.WaitI(Mutex); + } } TFuture Stop() override @@ -682,6 +696,7 @@ class TEndpoint final Logging, Stats, std::move(process)); + process = TChild{0}; Executor->ExecuteSimple([=] { ep->Start(Executor->GetContExecutor()); diff --git a/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py b/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py new file mode 100644 index 0000000000..443a68148d --- /dev/null +++ b/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py @@ -0,0 +1,212 @@ +import os +import psutil +import pytest +import requests +import tempfile +import time + +from cloud.blockstore.public.sdk.python.client import CreateClient, \ + ClientError +from cloud.blockstore.public.sdk.python.protos import TCmsActionRequest, \ + TAction, STORAGE_MEDIA_SSD_LOCAL, IPC_VHOST, VOLUME_ACCESS_READ_ONLY + +from cloud.blockstore.tests.python.lib.client import NbsClient +from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \ + generate_disk_agent_txt +from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \ + start_disk_agent, get_fqdn + +import yatest.common as yatest_common + +from contrib.ydb.tests.library.harness.kikimr_runner import \ + get_unique_path_for_current_test, \ + ensure_path_exists + + +from library.python.retry import retry + + +DEVICE_SIZE = 1024**2 +KNOWN_DEVICE_POOLS = { + "KnownDevicePools": [ + {"Name": "1Mb", "Kind": "DEVICE_POOL_KIND_LOCAL", + "AllocationUnit": DEVICE_SIZE}, + ]} + + +@pytest.fixture(name='data_path') +def create_data_path(): + + p = get_unique_path_for_current_test( + output_path=yatest_common.output_path(), + sub_folder="data") + + p = os.path.join(p, "dev", "disk", "by-partlabel") + ensure_path_exists(p) + + return p + + +@pytest.fixture(autouse=True) +def create_device_files(data_path): + + for i in range(6): + with open(os.path.join(data_path, f"NVMELOCAL{i + 1:02}"), 'wb') as f: + f.seek(DEVICE_SIZE-1) + f.write(b'\0') + f.flush() + + +@pytest.fixture(name='disk_agent_configurator') +def create_disk_agent_configurator(ydb, data_path): + + configurator = NbsConfigurator(ydb, 'disk-agent') + configurator.generate_default_nbs_configs() + + disk_agent_config = generate_disk_agent_txt( + agent_id='', + device_erase_method='DEVICE_ERASE_METHOD_NONE', # speed up tests + storage_discovery_config={ + "PathConfigs": [{ + "PathRegExp": f"{data_path}/NVMELOCAL([0-9]+)", + "PoolConfigs": [{ + "PoolName": "1Mb", + "MinSize": DEVICE_SIZE, + "MaxSize": DEVICE_SIZE + }]}] + }) + + configurator.files["disk-agent"] = disk_agent_config + + return configurator + + +@pytest.fixture(name='ydb') +def start_ydb_cluster(): + + ydb_cluster = start_ydb() + + yield ydb_cluster + + ydb_cluster.stop() + + +@pytest.fixture(name='nbs') +def start_nbs_daemon(ydb): + cfg = NbsConfigurator(ydb) + cfg.generate_default_nbs_configs() + + server = cfg.files["server"].ServerConfig + server.VhostEnabled = True + server.VhostServerPath = yatest_common.binary_path( + "cloud/blockstore/vhost-server/blockstore-vhost-server") + server.VhostServerTimeoutAfterParentExit = 5000 # 5 seconds + cfg.files["storage"].NonReplicatedDontSuspendDevices = True + + daemon = start_nbs(cfg) + yield daemon + + daemon.kill() + + +@pytest.fixture(name='disk_agent') +def start_disk_agent_daemon(ydb, disk_agent_configurator): + + daemon = start_disk_agent(disk_agent_configurator) + + yield daemon + + daemon.kill() + + +@pytest.fixture(autouse=True) +def setup_env(nbs, disk_agent, data_path): + + client = CreateClient(f"localhost:{nbs.port}") + client.execute_action( + action="DiskRegistrySetWritableState", + input_bytes=str.encode('{"State": true}')) + + client.update_disk_registry_config(KNOWN_DEVICE_POOLS) + + disk_agent.wait_for_registration() + + request = TCmsActionRequest() + + action = request.Actions.add() + action.Type = TAction.ADD_HOST + action.Host = get_fqdn() + + response = client.cms_action(request) + + assert len(response.ActionResults) == 1 + for r in response.ActionResults: + assert r.Result.Code == 0, r + + # wait for devices to be cleared + nbs_client = NbsClient(nbs.port) + while True: + bkp = nbs_client.backup_disk_registry_state()["Backup"] + if bkp.get("DirtyDevices", 0) == 0: + break + time.sleep(1) + + +def test_multiple_endpoints(nbs): + client = CreateClient(f"localhost:{nbs.port}") + + @retry(max_times=10, exception=ClientError) + def create_vol0(): + client.create_volume( + disk_id="vol0", + block_size=4096, + blocks_count=2 * DEVICE_SIZE//4096, + storage_media_kind=STORAGE_MEDIA_SSD_LOCAL, + storage_pool_name="1Mb") + + @retry(max_times=10, exception=requests.ConnectionError) + def wait_for_vhost_servers(nbs, expected_count): + count = 0 + for process in psutil.process_iter(): + try: + process_name = os.path.basename(process.exe()) + process_parent = process.parent() + except psutil.AccessDenied: + continue + + if process_parent is None: + continue + + if process_name == "blockstore-vhost-server" and process_parent.pid == nbs.pid: + count += 1 + if count != expected_count: + raise RuntimeError( + f"vhost count expected {expected_count}, actual {count}") + + create_vol0() + + sockets = list() + socket_count = 30 + for i in range(0, socket_count): + socket = tempfile.NamedTemporaryFile() + r = client.start_endpoint( + unix_socket_path=socket.name, + disk_id="vol0", + ipc_type=IPC_VHOST, + access_mode=VOLUME_ACCESS_READ_ONLY, + client_id=f"{socket.name}-id", + seq_number=i+1 + ) + assert r["Volume"].DiskId == "vol0" + sockets.append(socket) + + wait_for_vhost_servers(nbs, socket_count) + + # wait for the counters to be updated + time.sleep(15) + + crit = nbs.counters.find({ + 'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit' + }) + assert crit is not None + assert crit['value'] == 0 # vhost servers should not have restarted. diff --git a/cloud/blockstore/tests/external_endpoint/ya.make b/cloud/blockstore/tests/external_endpoint/ya.make index 67253ca4cc..da29afc758 100644 --- a/cloud/blockstore/tests/external_endpoint/ya.make +++ b/cloud/blockstore/tests/external_endpoint/ya.make @@ -2,18 +2,23 @@ PY3TEST() INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc) -TEST_SRCS(test.py) +TEST_SRCS( + multiple_endpoints.py + test.py +) DEPENDS( cloud/blockstore/apps/client cloud/blockstore/apps/disk_agent cloud/blockstore/apps/server cloud/blockstore/tools/testing/fake-vhost-server + cloud/blockstore/vhost-server contrib/ydb/apps/ydbd ) PEERDIR( cloud/blockstore/tests/python/lib + contrib/python/psutil contrib/python/requests/py3 library/python/retry diff --git a/cloud/blockstore/vhost-server/main.cpp b/cloud/blockstore/vhost-server/main.cpp index 776cc032e7..e1c27cfa19 100644 --- a/cloud/blockstore/vhost-server/main.cpp +++ b/cloud/blockstore/vhost-server/main.cpp @@ -235,7 +235,8 @@ int main(int argc, char** argv) pthread_sigmask(SIG_BLOCK, &sigset, nullptr); auto delayAfterParentExit = options.WaitAfterParentExit; - // wait for signal to stop the server (Ctrl+C) or dump statistics. + TInstant deathTimerStartedAt; + // Wait for signal to stop the server (Ctrl+C) or dump statistics. for (bool running = true, parentExit = false; running;) { int sig = 0; if (parentExit) { @@ -247,14 +248,13 @@ int main(int argc, char** argv) timespec timeout = ToTimeSpec(delayAfterParentExit); siginfo_t info; memset(&info, 0, sizeof(info)); - TInstant startAt = TInstant::Now(); sig = ::sigtimedwait(&sigset, &info, &timeout); // Reduce the remaining time. - delayAfterParentExit -= TInstant::Now() - startAt; + delayAfterParentExit -= TInstant::Now() - deathTimerStartedAt; } else { // Wait for signal without timeout. - sigwait(&sigset, &sig); + ::sigwait(&sigset, &sig); } switch (sig) { case SIGUSR1: { @@ -278,10 +278,12 @@ int main(int argc, char** argv) } break; case SIGUSR2: { STORAGE_INFO("Parent process exit."); + deathTimerStartedAt = TInstant::Now(); parentExit = true; } break; case SIGPIPE: { STORAGE_INFO("Pipe to parent process broken."); + deathTimerStartedAt = TInstant::Now(); parentExit = true; } break; case -1: {