Skip to content

Commit

Permalink
Simplify the vhost server spawn (#2057)
Browse files Browse the repository at this point in the history
  • Loading branch information
komarevtsev-d committed Sep 18, 2024
1 parent efc7153 commit 4b1038b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 24 deletions.
28 changes: 6 additions & 22 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,27 +606,8 @@ class TEndpoint final

// Starts the external vhost-server process and the thread that runs
// ThreadProc() for this endpoint.
//
// NOTE(review): this is a diff hunk rendered without +/- markers. Per the hunk
// header (-606,27 +606,8), everything from "TCondVar processStarted;" through
// the end of the with_lock block is the removed implementation, and the last
// two statements before the closing brace are its replacement. Confirm against
// the commit before reading the body as a single function.
void Start() override
{
TCondVar processStarted;
// To avoid a race, we need to get the shared pointer in the calling
// thread and pass it to the background thread. This guarantees that the
// background thread will deal with a live `this`.
auto workFunc = [&processStarted, self = shared_from_this()]()
{
// It is important to start the vhost-server on the thread that
// outlives it. vhost-server waits for the parent-death signal via
// PR_SET_PDEATHSIG which tracks the aliveness of the thread that
// spawned the process.
self->Process = self->StartProcess();
processStarted.Signal();
self->ThreadProc();
};

with_lock (Mutex) {
std::thread(std::move(workFunc)).detach();
// Infinite time wait is safe here, since we are in the coroutine
// thread.
processStarted.WaitI(Mutex);
}
// Replacement implementation: the process is spawned directly on the calling
// thread, so no condition variable handshake is needed.
Process = StartProcess();
// ThreadProc() runs on a detached thread; passing shared_from_this() as the
// object argument gives the thread its own pointer to this endpoint.
std::thread(&TEndpoint::ThreadProc, shared_from_this()).detach();
}

TFuture<NProto::TError> Stop() override
Expand Down Expand Up @@ -679,7 +660,10 @@ class TEndpoint final
}
}

StopPromise.SetValue(error);
// We must call "SetValue()" on coroutine thread, since it will trigger
// future handlers synchronously.
Executor->ExecuteSimple([promise = this->StopPromise, error]() mutable
{ promise.SetValue(error); });
}

TIntrusivePtr<TEndpointProcess> StartProcess()
Expand Down
78 changes: 78 additions & 0 deletions cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,81 @@ def wait_for_vhost_servers(nbs, expected_count):
})
assert crit is not None
assert crit['value'] == 0 # Vhost servers should not have restarted.


def test_switch_multiple_endpoints(nbs):
    """Start several vhost endpoints, switch all of them, and check that no
    vhost server exited unexpectedly.

    The test creates DISK_COUNT local SSD volumes, starts one vhost endpoint
    per volume (seq_number=1), then re-issues start_endpoint_async for every
    endpoint with seq_number=2. Afterwards it asserts that the
    'AppCriticalEvents/ExternalEndpointUnexpectedExit' counter is zero.

    :param nbs: test fixture; this test reads its `port`, `pid` and `counters`
        attributes.
    """
    client = CreateClient(f"localhost:{nbs.port}")

    @retry(max_times=10, exception=ClientError)
    def create_vol(disk_id: str):
        client.create_volume(
            disk_id=disk_id,
            block_size=4096,
            blocks_count=DEVICE_SIZE//4096,
            storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
            storage_pool_name="1Mb")

    # NOTE(review): the helper signals a count mismatch with RuntimeError while
    # the decorator is configured for requests.ConnectionError; whether a
    # mismatch is actually retried depends on `retry`, which is defined
    # outside this block - confirm.
    @retry(max_times=10, exception=requests.ConnectionError)
    def wait_for_vhost_servers(nbs, expected_count):
        """Raise RuntimeError unless exactly `expected_count` processes named
        "blockstore-vhost-server" have `nbs.pid` as their parent."""
        count = 0
        for process in psutil.process_iter():
            try:
                process_name = os.path.basename(process.exe())
                process_parent = process.parent()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # AccessDenied: not allowed to inspect the process.
                # NoSuchProcess (also covers ZombieProcess): the process
                # exited between being listed and being inspected, which is
                # expected here because vhost servers are being restarted.
                continue

            if process_parent is None:
                continue

            if process_name == "blockstore-vhost-server" and process_parent.pid == nbs.pid:
                count += 1
        if count != expected_count:
            raise RuntimeError(
                f"vhost count expected {expected_count}, actual {count}")

    # Create disks and start an endpoint for each one.
    DISK_COUNT = 6
    disks = [f"local-{i+1}" for i in range(DISK_COUNT)]
    # The NamedTemporaryFile objects are kept in a list for the whole test so
    # that the files backing the socket paths are not removed early.
    sockets = []
    for disk in disks:
        create_vol(disk)
        socket = tempfile.NamedTemporaryFile()
        sockets.append(socket)
        client.start_endpoint(
            unix_socket_path=socket.name,
            disk_id=disk,
            ipc_type=IPC_VHOST,
            client_id=f"{socket.name}-id",
            seq_number=1
        )

    wait_for_vhost_servers(nbs, DISK_COUNT)

    # Switch the endpoints. This will restart all vhost servers.
    for disk, socket in zip(disks, sockets):
        client.start_endpoint_async(
            unix_socket_path=socket.name,
            disk_id=disk,
            ipc_type=IPC_VHOST,
            client_id=f"{socket.name}-id",
            seq_number=2
        )

    wait_for_vhost_servers(nbs, len(disks))

    # Wait for the counters to be updated
    time.sleep(15)

    crit = nbs.counters.find({
        'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
    })
    assert crit is not None
    # Vhost servers should not have restarted unexpectedly.
    assert crit['value'] == 0
2 changes: 2 additions & 0 deletions cloud/blockstore/tests/python/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ def generate_log_txt():
b"BLOCKSTORE_DISK_AGENT",
b"BLOCKSTORE_HIVE_PROXY",
b"BLOCKSTORE_SS_PROXY",
b"BLOCKSTORE_EXTERNAL_ENDPOINT",
b"BLOCKSTORE_VHOST",
]

log_config = TLogConfig()
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/vhost-server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ int main(int argc, char** argv)
for (bool running = true, parentExit = false; running;) {
int sig = 0;
if (parentExit) {
TDuration delayAfterParentExit = TInstant::Now() +
TDuration delayAfterParentExit = deathTimerStartedAt +
options.WaitAfterParentExit -
deathTimerStartedAt;
TInstant::Now();
if (!delayAfterParentExit) {
break;
}
Expand Down

0 comments on commit 4b1038b

Please sign in to comment.