Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simlify the vhost server spawn #2057

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 6 additions & 22 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,27 +606,8 @@ class TEndpoint final

void Start() override
{
TCondVar processStarted;
// To avoid a race, we need to get the shared pointer in the calling
// thread and pass it to the background thread. This guaranteed that the
// background thread will deal with a live this.
auto workFunc = [&processStarted, self = shared_from_this()]()
{
// It is important to start the vhost-server on the thread that
// outlives it. vhost-server waits for the parent-death signal via
// PR_SET_PDEATHSIG which tracks the aliveness of the thread that
// spawned the process.
self->Process = self->StartProcess();
processStarted.Signal();
self->ThreadProc();
};

with_lock (Mutex) {
std::thread(std::move(workFunc)).detach();
// Infinite time wait is safe here, since we are in the coroutine
// thread.
processStarted.WaitI(Mutex);
}
Process = StartProcess();
std::thread(&TEndpoint::ThreadProc, shared_from_this()).detach();
}

TFuture<NProto::TError> Stop() override
Expand Down Expand Up @@ -679,7 +660,10 @@ class TEndpoint final
}
}

StopPromise.SetValue(error);
// We must call "SetValue()" on coroutine thread, since it will trigger
// future handlers synchronously.
Executor->ExecuteSimple([promise = this->StopPromise, error]() mutable
{ promise.SetValue(error); });
}

TIntrusivePtr<TEndpointProcess> StartProcess()
Expand Down
78 changes: 78 additions & 0 deletions cloud/blockstore/tests/external_endpoint/multiple_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,81 @@ def wait_for_vhost_servers(nbs, expected_count):
})
assert crit is not None
assert crit['value'] == 0 # Vhost servers should not have restarted.


def test_switch_multiple_endpoints(nbs):
client = CreateClient(f"localhost:{nbs.port}")

@retry(max_times=10, exception=ClientError)
def create_vol(disk_id: str):
client.create_volume(
disk_id=disk_id,
block_size=4096,
blocks_count=DEVICE_SIZE//4096,
storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
storage_pool_name="1Mb")

@retry(max_times=10, exception=requests.ConnectionError)
def wait_for_vhost_servers(nbs, expected_count):
count = 0
for process in psutil.process_iter():
try:
process_name = os.path.basename(process.exe())
process_parent = process.parent()
except psutil.AccessDenied:
continue

if process_parent is None:
continue

if process_name == "blockstore-vhost-server" and process_parent.pid == nbs.pid:
count += 1
if count != expected_count:
raise RuntimeError(
f"vhost count expected {expected_count}, actual {count}")

# Create disks and start an endpoint for each one.
DISK_COUNT = 6
disks = []
for i in range(DISK_COUNT):
disks.append(f"local-{i+1}")
sockets = []
for disk in disks:
create_vol(disk)
socket = tempfile.NamedTemporaryFile()
sockets.append(socket)
client.start_endpoint(
unix_socket_path=socket.name,
disk_id=disk,
ipc_type=IPC_VHOST,
client_id=f"{socket.name}-id",
seq_number=1
)

wait_for_vhost_servers(nbs, DISK_COUNT)

# Switch the endpoints. This will restart all vhost servers.
for i in range(0, DISK_COUNT):
idx = i % DISK_COUNT
disk = disks[idx]
socket = sockets[idx]

client.start_endpoint_async(
unix_socket_path=socket.name,
disk_id=disk,
ipc_type=IPC_VHOST,
client_id=f"{socket.name}-id",
seq_number=2
)

wait_for_vhost_servers(nbs, len(disks))

# Wait for the counters to be updated
time.sleep(15)

crit = nbs.counters.find({
'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
})
assert crit is not None
# Vhost servers should not have restarted unexpectedly.
assert crit['value'] == 0
2 changes: 2 additions & 0 deletions cloud/blockstore/tests/python/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ def generate_log_txt():
b"BLOCKSTORE_DISK_AGENT",
b"BLOCKSTORE_HIVE_PROXY",
b"BLOCKSTORE_SS_PROXY",
b"BLOCKSTORE_EXTERNAL_ENDPOINT",
b"BLOCKSTORE_VHOST",
]

log_config = TLogConfig()
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/vhost-server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ int main(int argc, char** argv)
for (bool running = true, parentExit = false; running;) {
int sig = 0;
if (parentExit) {
TDuration delayAfterParentExit = TInstant::Now() +
TDuration delayAfterParentExit = deathTimerStartedAt +
options.WaitAfterParentExit -
deathTimerStartedAt;
TInstant::Now();
komarevtsev-d marked this conversation as resolved.
Show resolved Hide resolved
if (!delayAfterParentExit) {
break;
}
Expand Down
Loading