From 4b1038ba96ee324e806172659b396945bcad4906 Mon Sep 17 00:00:00 2001
From: Daniil Komarevtsev
Date: Wed, 18 Sep 2024 16:11:19 +0700
Subject: [PATCH] Simplify the vhost server spawn (#2057)

---
 .../endpoints_vhost/external_vhost_server.cpp | 28 ++-----
 .../external_endpoint/multiple_endpoints.py   | 78 +++++++++++++++++++
 cloud/blockstore/tests/python/lib/config.py   |  2 +
 cloud/blockstore/vhost-server/main.cpp        |  4 +-
 4 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
index a9abf2b6f93..7d36572b04f 100644
--- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
+++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
@@ -606,27 +606,8 @@ class TEndpoint final
 
     void Start() override
     {
-        TCondVar processStarted;
-        // To avoid a race, we need to get the shared pointer in the calling
-        // thread and pass it to the background thread. This guaranteed that the
-        // background thread will deal with a live this.
-        auto workFunc = [&processStarted, self = shared_from_this()]()
-        {
-            // It is important to start the vhost-server on the thread that
-            // outlives it. vhost-server waits for the parent-death signal via
-            // PR_SET_PDEATHSIG which tracks the aliveness of the thread that
-            // spawned the process.
-            self->Process = self->StartProcess();
-            processStarted.Signal();
-            self->ThreadProc();
-        };
-
-        with_lock (Mutex) {
-            std::thread(std::move(workFunc)).detach();
-            // Infinite time wait is safe here, since we are in the coroutine
-            // thread.
-            processStarted.WaitI(Mutex);
-        }
+        Process = StartProcess();
+        std::thread(&TEndpoint::ThreadProc, shared_from_this()).detach();
     }
 
     TFuture Stop() override
@@ -679,7 +660,10 @@
             }
         }
 
-        StopPromise.SetValue(error);
+        // We must call "SetValue()" on coroutine thread, since it will trigger
+        // future handlers synchronously.
+        Executor->ExecuteSimple([promise = this->StopPromise, error]() mutable
+            { promise.SetValue(error); });
     }
 
     TIntrusivePtr StartProcess()
diff --git a/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py b/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
index 03c3ae8f35a..b19d2f5f55b 100644
--- a/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
+++ b/cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
@@ -208,3 +208,81 @@ def wait_for_vhost_servers(nbs, expected_count):
     })
     assert crit is not None
     assert crit['value'] == 0  # Vhost servers should not have restarted.
+
+
+def test_switch_multiple_endpoints(nbs):
+    client = CreateClient(f"localhost:{nbs.port}")
+
+    @retry(max_times=10, exception=ClientError)
+    def create_vol(disk_id: str):
+        client.create_volume(
+            disk_id=disk_id,
+            block_size=4096,
+            blocks_count=DEVICE_SIZE//4096,
+            storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
+            storage_pool_name="1Mb")
+
+    @retry(max_times=10, exception=requests.ConnectionError)
+    def wait_for_vhost_servers(nbs, expected_count):
+        count = 0
+        for process in psutil.process_iter():
+            try:
+                process_name = os.path.basename(process.exe())
+                process_parent = process.parent()
+            except psutil.AccessDenied:
+                continue
+
+            if process_parent is None:
+                continue
+
+            if process_name == "blockstore-vhost-server" and process_parent.pid == nbs.pid:
+                count += 1
+        if count != expected_count:
+            raise RuntimeError(
+                f"vhost count expected {expected_count}, actual {count}")
+
+    # Create disks and start an endpoint for each one.
+    DISK_COUNT = 6
+    disks = []
+    for i in range(DISK_COUNT):
+        disks.append(f"local-{i+1}")
+    sockets = []
+    for disk in disks:
+        create_vol(disk)
+        socket = tempfile.NamedTemporaryFile()
+        sockets.append(socket)
+        client.start_endpoint(
+            unix_socket_path=socket.name,
+            disk_id=disk,
+            ipc_type=IPC_VHOST,
+            client_id=f"{socket.name}-id",
+            seq_number=1
+        )
+
+    wait_for_vhost_servers(nbs, DISK_COUNT)
+
+    # Switch the endpoints. This will restart all vhost servers.
+    for i in range(0, DISK_COUNT):
+        idx = i % DISK_COUNT
+        disk = disks[idx]
+        socket = sockets[idx]
+
+        client.start_endpoint_async(
+            unix_socket_path=socket.name,
+            disk_id=disk,
+            ipc_type=IPC_VHOST,
+            client_id=f"{socket.name}-id",
+            seq_number=2
+        )
+
+    wait_for_vhost_servers(nbs, len(disks))
+
+    # Wait for the counters to be updated
+    time.sleep(15)
+
+    crit = nbs.counters.find({
+        'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
+    })
+    assert crit is not None
+    # Vhost servers should not have restarted unexpectedly.
+    assert crit['value'] == 0
diff --git a/cloud/blockstore/tests/python/lib/config.py b/cloud/blockstore/tests/python/lib/config.py
index 43d421cfd0e..1d53e945cd4 100644
--- a/cloud/blockstore/tests/python/lib/config.py
+++ b/cloud/blockstore/tests/python/lib/config.py
@@ -239,6 +239,8 @@ def generate_log_txt():
         b"BLOCKSTORE_DISK_AGENT",
         b"BLOCKSTORE_HIVE_PROXY",
         b"BLOCKSTORE_SS_PROXY",
+        b"BLOCKSTORE_EXTERNAL_ENDPOINT",
+        b"BLOCKSTORE_VHOST",
     ]
 
     log_config = TLogConfig()
diff --git a/cloud/blockstore/vhost-server/main.cpp b/cloud/blockstore/vhost-server/main.cpp
index 7031e93cfb4..636e6c2bfa1 100644
--- a/cloud/blockstore/vhost-server/main.cpp
+++ b/cloud/blockstore/vhost-server/main.cpp
@@ -239,9 +239,9 @@
     for (bool running = true, parentExit = false; running;) {
         int sig = 0;
         if (parentExit) {
-            TDuration delayAfterParentExit = TInstant::Now() +
+            TDuration delayAfterParentExit = deathTimerStartedAt +
                 options.WaitAfterParentExit -
-                deathTimerStartedAt;
+                TInstant::Now();
             if (!delayAfterParentExit) {
                 break;
             }
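
A note on the main.cpp hunk above: the remaining grace period is now computed as "deadline minus now" (deathTimerStartedAt + options.WaitAfterParentExit - TInstant::Now()), which shrinks toward zero, so the check `if (!delayAfterParentExit)` eventually breaks the loop. The previous operand order ("now plus wait minus start") only grows over time and never reaches zero. The sketch below illustrates the same pattern in plain C++ with std::chrono rather than the repository's TInstant/TDuration types; the names parentExitedAt and waitAfterParentExit are illustrative, and the clamp to zero is written out explicitly (the util types are assumed to clamp on subtraction).

    // Sketch only: "remaining = deadline - now", clamped at zero, mirroring the
    // fixed operand order in main.cpp. Not the repository's actual types.
    #include <chrono>
    #include <thread>
    #include <cstdio>

    using Clock = std::chrono::steady_clock;

    // Time left until (parentExitedAt + waitAfterParentExit); zero once expired.
    static Clock::duration RemainingGracePeriod(
        Clock::time_point parentExitedAt,
        Clock::duration waitAfterParentExit)
    {
        const auto deadline = parentExitedAt + waitAfterParentExit;
        const auto now = Clock::now();
        // Clamp instead of letting the subtraction go negative; with the buggy
        // order ("now + wait - start") this value would keep growing instead.
        return now >= deadline ? Clock::duration::zero() : deadline - now;
    }

    int main()
    {
        const auto parentExitedAt = Clock::now();         // parent just exited
        const auto wait = std::chrono::milliseconds(50);  // grace period

        while (RemainingGracePeriod(parentExitedAt, wait) > Clock::duration::zero()) {
            // The real server spends this time waiting for signals or I/O.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::puts("grace period expired, shutting down");
    }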