Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBSOPSNEBIUS-25: report ExternalEndpointUnexpectedExit on vhost-server crashes #288

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/blockstore/libs/diagnostics/critical_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace NCloud::NBlockStore {
xxx(DiskRegistryDeviceNotFoundSoft) \
xxx(DiskRegistrySourceDiskNotFound) \
xxx(EndpointSwitchFailure) \
xxx(ExternalEndpointUnexpectedExit) \
xxx(DiskAgentSessionCacheUpdateError) \
xxx(DiskAgentSessionCacheRestoreError) \
// BLOCKSTORE_CRITICAL_EVENTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "external_endpoint_stats.h"

#include <cloud/blockstore/libs/common/device_path.h>
#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>

Expand Down Expand Up @@ -485,7 +486,10 @@ class TEndpoint final
break;
}

// TODO: limiter
ReportExternalEndpointUnexpectedExit(TStringBuilder()
sharpeye marked this conversation as resolved.
Show resolved Hide resolved
<< "External endpoint for a disk " << Stats.DiskId.Quote()
<< " and a client " << Stats.ClientId.Quote()
<< " unexpectedly stopped: " << FormatError(error));

auto process = RestartProcess();
if (!process) {
Expand Down
8 changes: 2 additions & 6 deletions cloud/blockstore/libs/server/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,15 @@ template <>
TVector<TString> ConvertValue(
const google::protobuf::RepeatedPtrField<TString>& value)
{
TVector<TString> v;
for (const auto& x : value) {
v.push_back(x);
}
return v;
return { value.begin(), value.end() };
}

template <>
TVector<TCertificate> ConvertValue(
const google::protobuf::RepeatedPtrField<NCloud::NProto::TCertificate>& value)
{
TVector<TCertificate> v;
for (const auto& x : value) {
for (const auto& x: value) {
v.push_back({x.GetCertFile(), x.GetCertPrivateKeyFile()});
}
return v;
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/public/sdk/python/client/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
"execute_action",
"kick_endpoint",
"cms_action",
"update_disk_registry_config",
"describe_disk_registry_config",
]


Expand Down
68 changes: 68 additions & 0 deletions cloud/blockstore/public/sdk/python/client/safe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import cloud.blockstore.public.sdk.python.protos as protos

from google.protobuf.json_format import ParseDict

from .error import _handle_errors


Expand Down Expand Up @@ -901,3 +903,69 @@ def cms_action(
trace_id,
request_timeout)
return response

@_handle_errors
def update_disk_registry_config_async(
        self,
        config,
        idempotence_id=None,
        timestamp=None,
        trace_id=None,
        request_timeout=None):
    """Send a disk registry config update built from a plain dict.

    ``config`` is converted into a ``TUpdateDiskRegistryConfigRequest``
    with ``ParseDict`` and handed, together with the remaining arguments,
    to the wrapped client's ``update_disk_registry_config_async``.

    Returns whatever the wrapped call returns (presumably a future, as
    the ``_async`` suffix suggests -- confirm against the implementation).
    """
    message = protos.TUpdateDiskRegistryConfigRequest()
    # ParseDict fills the message in place and returns that same object.
    ParseDict(config, message)

    return self.__impl.update_disk_registry_config_async(
        message,
        idempotence_id,
        timestamp,
        trace_id,
        request_timeout)

@_handle_errors
def update_disk_registry_config(
        self,
        config,
        idempotence_id=None,
        timestamp=None,
        trace_id=None,
        request_timeout=None):
    """Update the disk registry config from a plain dict.

    ``config`` is converted into a ``TUpdateDiskRegistryConfigRequest``
    with ``ParseDict`` and handed, together with the remaining arguments,
    to the wrapped client's ``update_disk_registry_config``. The result of
    that call is returned unchanged.
    """
    message = protos.TUpdateDiskRegistryConfigRequest()
    # ParseDict fills the message in place and returns that same object.
    ParseDict(config, message)

    return self.__impl.update_disk_registry_config(
        message,
        idempotence_id,
        timestamp,
        trace_id,
        request_timeout)

@_handle_errors
def describe_disk_registry_config_async(
        self,
        idempotence_id=None,
        timestamp=None,
        trace_id=None,
        request_timeout=None):
    """Request the current disk registry config.

    Sends a default-constructed ``TDescribeDiskRegistryConfigRequest``
    through the wrapped client's ``describe_disk_registry_config_async``
    and returns that call's result (presumably a future -- confirm
    against the implementation).
    """
    empty_request = protos.TDescribeDiskRegistryConfigRequest()

    return self.__impl.describe_disk_registry_config_async(
        empty_request,
        idempotence_id,
        timestamp,
        trace_id,
        request_timeout)

@_handle_errors
def describe_disk_registry_config(
        self,
        idempotence_id=None,
        timestamp=None,
        trace_id=None,
        request_timeout=None):
    """Fetch the current disk registry config.

    Sends a default-constructed ``TDescribeDiskRegistryConfigRequest``
    through the wrapped client's ``describe_disk_registry_config`` and
    returns that call's result unchanged.
    """
    empty_request = protos.TDescribeDiskRegistryConfigRequest()

    return self.__impl.describe_disk_registry_config(
        empty_request,
        idempotence_id,
        timestamp,
        trace_id,
        request_timeout)
245 changes: 245 additions & 0 deletions cloud/blockstore/tests/external_endpoint/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import os
import pytest
import requests
import stat
import tempfile
import time

from cloud.blockstore.public.sdk.python.client import CreateClient, \
ClientError
from cloud.blockstore.public.sdk.python.protos import TCmsActionRequest, \
TAction, STORAGE_MEDIA_SSD_LOCAL, IPC_VHOST

from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \
generate_disk_agent_txt
from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \
start_disk_agent, get_fqdn

import yatest.common as yatest_common

from contrib.ydb.tests.library.common.yatest_common import PortManager
from contrib.ydb.tests.library.harness.kikimr_runner import get_unique_path_for_current_test, \
ensure_path_exists

from library.python.retry import retry


# Size in bytes of every fake device file created for the test (1 MiB).
DEVICE_SIZE = 1024 * 1024

# Disk registry config payload declaring the single local device pool
# ("1Mb") whose allocation unit equals the size of one fake device.
KNOWN_DEVICE_POOLS = {
    "KnownDevicePools": [
        {
            "Name": "1Mb",
            "Kind": "DEVICE_POOL_KIND_LOCAL",
            "AllocationUnit": DEVICE_SIZE,
        },
    ],
}


@pytest.fixture(name='data_path')
def create_data_path():
    """Return the ``dev/disk/by-partlabel`` directory used for device files.

    The directory is placed under the "data" sub-folder that
    ``get_unique_path_for_current_test`` derives from the test output path,
    and is passed through ``ensure_path_exists`` before being returned.
    """
    test_root = get_unique_path_for_current_test(
        output_path=yatest_common.output_path(),
        sub_folder="data")

    partlabel_dir = os.path.join(test_root, "dev", "disk", "by-partlabel")
    ensure_path_exists(partlabel_dir)

    return partlabel_dir


@pytest.fixture(autouse=True)
def create_device_files(data_path):
    """Create six fake device files NVMENBS01..NVMENBS06 in *data_path*.

    Every file ends up exactly ``DEVICE_SIZE`` bytes long.
    """
    for index in range(1, 7):
        device_path = os.path.join(data_path, f"NVMENBS{index:02}")
        with open(device_path, 'wb') as device:
            # Writing one byte at the last offset extends the file to
            # DEVICE_SIZE without writing the whole content.
            device.seek(DEVICE_SIZE - 1)
            device.write(b'\0')
            device.flush()


@pytest.fixture(name='disk_agent_configurator')
def create_disk_agent_configurator(ydb, data_path):
    """Build the ``NbsConfigurator`` used to launch the disk agent.

    On top of the default NBS configs, the "disk-agent" config is replaced
    with one whose storage discovery matches the ``NVMENBS<n>`` files in
    *data_path* and assigns them to the "1Mb" pool.
    """
    # Only files of exactly DEVICE_SIZE bytes qualify for the pool.
    pool_config = {
        "PoolName": "1Mb",
        "MinSize": DEVICE_SIZE,
        "MaxSize": DEVICE_SIZE,
    }
    path_config = {
        "PathRegExp": f"{data_path}/NVMENBS([0-9]+)",
        "PoolConfigs": [pool_config],
    }

    result = NbsConfigurator(ydb, 'disk-agent')
    result.generate_default_nbs_configs()

    result.files["disk-agent"] = generate_disk_agent_txt(
        agent_id='',
        device_erase_method='DEVICE_ERASE_METHOD_NONE',  # speed up tests
        storage_discovery_config={"PathConfigs": [path_config]})

    return result


@pytest.fixture(name='ydb')
def start_ydb_cluster():
    """Start a YDB cluster for the test and stop it during teardown."""
    cluster = start_ydb()

    # Everything after the yield runs as fixture teardown.
    yield cluster

    cluster.stop()


@pytest.fixture(name='fake_vhost_server')
def setup_fake_vhost_server_script():
    """Yield a launcher script for the fake vhost-server binary.

    A temporary executable bash script is generated that ``exec``s the
    fake-vhost-server test binary with a pre-selected ``--port`` and
    forwards every argument it receives. The yielded object exposes:

    * ``port`` -- the port passed to the binary via ``--port``;
    * ``path`` -- the filesystem path of the generated script.

    The script lives inside a ``NamedTemporaryFile`` context, so it is only
    valid for the duration of the fixture.
    """

    class Script:
        """Port and path of the generated launcher script."""

        def __init__(self, port, path):
            self.port = port
            self.path = path

    port = PortManager().get_port()
    binary_path = yatest_common.binary_path(
        "cloud/blockstore/tools/testing/fake-vhost-server/fake-vhost-server")

    with tempfile.NamedTemporaryFile(mode='w') as script:
        script.write("\n".join([
            "#!/bin/bash",
            # "$@" has to be quoted: an unquoted $@ is word-split and
            # glob-expanded, which would mangle any forwarded argument
            # that contains whitespace or wildcard characters.
            f'exec {binary_path} --port {port} "$@"'
        ]))

        # Add the owner-execute bit while keeping the existing mode bits.
        os.chmod(script.name, os.stat(script.name).st_mode | stat.S_IXUSR)

        # Close only the underlying file object so the buffered content is
        # flushed to disk while the temporary-file wrapper stays open.
        # NOTE(review): presumably also needed because a file that is open
        # for writing cannot be exec'ed on Linux -- confirm.
        script.file.close()

        yield Script(port, script.name)


@pytest.fixture(name='nbs')
def start_nbs_daemon(ydb, fake_vhost_server):
    """Start an NBS daemon that uses the fake vhost-server launcher.

    The default server config is adjusted to enable vhost and to point
    ``VhostServerPath`` at the generated launcher script. The daemon is
    killed during fixture teardown.
    """
    configurator = NbsConfigurator(ydb)
    configurator.generate_default_nbs_configs()

    server_config = configurator.files["server"].ServerConfig
    server_config.VhostServerPath = fake_vhost_server.path
    server_config.VhostEnabled = True

    nbs_daemon = start_nbs(configurator)

    # Everything after the yield runs as fixture teardown.
    yield nbs_daemon

    nbs_daemon.kill()


@pytest.fixture(name='disk_agent')
def start_disk_agent_daemon(ydb, disk_agent_configurator):
    """Start a disk agent daemon and kill it during teardown.

    ``ydb`` is not used directly; requesting it makes the YDB fixture a
    dependency of this one.
    """
    agent = start_disk_agent(disk_agent_configurator)

    # Everything after the yield runs as fixture teardown.
    yield agent

    agent.kill()


@pytest.fixture(autouse=True)
def setup_env(nbs, disk_agent, data_path):
    """Prepare the disk registry and register two local devices.

    Steps, in order: switch the disk registry to the writable state, upload
    the known device pools, wait for the disk agent to register, then add
    the devices ``NVMENBS01`` and ``NVMENBS02`` through a CMS action and
    check that every action result reports code 0.
    """
    devices_to_add = 2

    client = CreateClient(f"localhost:{nbs.port}")
    client.execute_action(
        action="DiskRegistrySetWritableState",
        input_bytes='{"State": true}'.encode())

    client.update_disk_registry_config(KNOWN_DEVICE_POOLS)

    disk_agent.wait_for_registration()

    request = TCmsActionRequest()
    for index in range(1, devices_to_add + 1):
        action = request.Actions.add()
        action.Type = TAction.ADD_DEVICE
        action.Host = get_fqdn()
        action.Device = os.path.join(data_path, f"NVMENBS{index:02}")

    response = client.cms_action(request)

    assert len(response.ActionResults) == devices_to_add
    for result in response.ActionResults:
        assert result.Result.Code == 0, result


def test_external_endpoint(nbs, fake_vhost_server):
    """Check that a vhost-server crash is reported as a critical event.

    Scenario: create a two-device local SSD volume, start a vhost endpoint
    for it, verify the devices reported by the fake vhost-server, ask the
    server to terminate with a non-zero exit code and expect the
    ``ExternalEndpointUnexpectedExit`` counter to go from 0 to 1. After the
    server is reachable again, stop the endpoint and check that the counter
    stays at 1.
    """
    counter_update_delay = 15  # seconds to wait for the counters to refresh

    client = CreateClient(f"localhost:{nbs.port}")
    vhost_server_url = f"http://localhost:{fake_vhost_server.port}"

    def unexpected_exit_count():
        # Look up the critical event sensor and return its current value.
        crit = nbs.counters.find({
            'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit'
        })
        assert crit is not None
        return crit['value']

    @retry(max_times=10, exception=ClientError)
    def create_vol0():
        client.create_volume(
            disk_id="vol0",
            block_size=4096,
            blocks_count=2 * DEVICE_SIZE // 4096,
            storage_media_kind=STORAGE_MEDIA_SSD_LOCAL,
            storage_pool_name="1Mb")

    @retry(max_times=10, exception=requests.ConnectionError)
    def wait_for_vhost_server():
        reply = requests.get(f"{vhost_server_url}/ping").text
        assert reply == "pong"

    create_vol0()

    with tempfile.NamedTemporaryFile() as unix_socket:
        endpoint = client.start_endpoint(
            unix_socket_path=unix_socket.name,
            disk_id="vol0",
            ipc_type=IPC_VHOST,
            client_id="test"
        )

        assert endpoint["Volume"].DiskId == "vol0"

        wait_for_vhost_server()

        description = requests.get(f"{vhost_server_url}/describe").json()
        devices = description["devices"]
        assert len(devices) == 2

        for device in devices:
            assert "NVMENBS" in device[0]
            assert device[1] == DEVICE_SIZE
            assert device[2] == 0

        assert unexpected_exit_count() == 0

        # The server may drop the connection while exiting, so a
        # connection error here is expected and ignored.
        try:
            requests.post(
                f"{vhost_server_url}/terminate",
                data='{"exit_code": 42}')
        except requests.ConnectionError:
            pass

        # wait for the counters to be updated
        time.sleep(counter_update_delay)

        assert unexpected_exit_count() == 1

        wait_for_vhost_server()

        client.stop_endpoint(unix_socket.name)

        # wait for the counters to be updated
        time.sleep(counter_update_delay)

        assert unexpected_exit_count() == 1  # the same value
Loading
Loading