diff --git a/cloud/blockstore/libs/diagnostics/critical_events.h b/cloud/blockstore/libs/diagnostics/critical_events.h index ba716be2a7..4cb6687d28 100644 --- a/cloud/blockstore/libs/diagnostics/critical_events.h +++ b/cloud/blockstore/libs/diagnostics/critical_events.h @@ -51,6 +51,7 @@ namespace NCloud::NBlockStore { xxx(DiskRegistryDeviceNotFoundSoft) \ xxx(DiskRegistrySourceDiskNotFound) \ xxx(EndpointSwitchFailure) \ + xxx(ExternalEndpointUnexpectedExit) \ xxx(DiskAgentSessionCacheUpdateError) \ xxx(DiskAgentSessionCacheRestoreError) \ xxx(BlockDigestMismatchInBlob) \ diff --git a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp index 29a8977959..7ad704b7c6 100644 --- a/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp +++ b/cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp @@ -2,6 +2,7 @@ #include "external_endpoint_stats.h" #include +#include #include #include @@ -485,7 +486,10 @@ class TEndpoint final break; } - // TODO: limiter + ReportExternalEndpointUnexpectedExit(TStringBuilder() + << "External endpoint for a disk " << Stats.DiskId.Quote() + << " and a client " << Stats.ClientId.Quote() + << " unexpectedly stopped: " << FormatError(error)); auto process = RestartProcess(); if (!process) { diff --git a/cloud/blockstore/libs/server/config.cpp b/cloud/blockstore/libs/server/config.cpp index 06f2522d8d..6f7d73e6fd 100644 --- a/cloud/blockstore/libs/server/config.cpp +++ b/cloud/blockstore/libs/server/config.cpp @@ -116,11 +116,7 @@ template <> TVector ConvertValue( const google::protobuf::RepeatedPtrField& value) { - TVector v; - for (const auto& x : value) { - v.push_back(x); - } - return v; + return { value.begin(), value.end() }; } template <> @@ -128,7 +124,7 @@ TVector ConvertValue( const google::protobuf::RepeatedPtrField& value) { TVector v; - for (const auto& x : value) { + for (const auto& x: value) { v.push_back({x.GetCertFile(), x.GetCertPrivateKeyFile()}); } return v; diff --git a/cloud/blockstore/public/sdk/python/client/base_client.py b/cloud/blockstore/public/sdk/python/client/base_client.py index 5f4563d535..0bf4cd8fa2 100644 --- a/cloud/blockstore/public/sdk/python/client/base_client.py +++ b/cloud/blockstore/public/sdk/python/client/base_client.py @@ -41,6 +41,8 @@ "execute_action", "kick_endpoint", "cms_action", + "update_disk_registry_config", + "describe_disk_registry_config", ] diff --git a/cloud/blockstore/public/sdk/python/client/safe_client.py b/cloud/blockstore/public/sdk/python/client/safe_client.py index fde8fd7912..226b8946cc 100644 --- a/cloud/blockstore/public/sdk/python/client/safe_client.py +++ b/cloud/blockstore/public/sdk/python/client/safe_client.py @@ -2,6 +2,8 @@ import cloud.blockstore.public.sdk.python.protos as protos +from google.protobuf.json_format import ParseDict + from .error import _handle_errors @@ -901,3 +903,69 @@ def cms_action( trace_id, request_timeout) return response + + @_handle_errors + def update_disk_registry_config_async( + self, + config, + idempotence_id=None, + timestamp=None, + trace_id=None, + request_timeout=None): + + request = ParseDict(config, protos.TUpdateDiskRegistryConfigRequest()) + + return self.__impl.update_disk_registry_config_async( + request, + idempotence_id, + timestamp, + trace_id, + request_timeout) + + @_handle_errors + def update_disk_registry_config( + self, + config, + idempotence_id=None, + timestamp=None, + trace_id=None, + request_timeout=None): + + request = ParseDict(config, protos.TUpdateDiskRegistryConfigRequest()) + + return self.__impl.update_disk_registry_config( + request, + idempotence_id, + timestamp, + trace_id, + request_timeout) + + @_handle_errors + def describe_disk_registry_config_async( + self, + idempotence_id=None, + timestamp=None, + trace_id=None, + request_timeout=None): + + return self.__impl.describe_disk_registry_config_async( + protos.TDescribeDiskRegistryConfigRequest(), + idempotence_id, + timestamp, + trace_id, + request_timeout) + + @_handle_errors + def describe_disk_registry_config( + self, + idempotence_id=None, + timestamp=None, + trace_id=None, + request_timeout=None): + + return self.__impl.describe_disk_registry_config( + protos.TDescribeDiskRegistryConfigRequest(), + idempotence_id, + timestamp, + trace_id, + request_timeout) diff --git a/cloud/blockstore/tests/external_endpoint/test.py b/cloud/blockstore/tests/external_endpoint/test.py new file mode 100644 index 0000000000..28d207815d --- /dev/null +++ b/cloud/blockstore/tests/external_endpoint/test.py @@ -0,0 +1,245 @@ +import os +import pytest +import requests +import stat +import tempfile +import time + +from cloud.blockstore.public.sdk.python.client import CreateClient, \ + ClientError +from cloud.blockstore.public.sdk.python.protos import TCmsActionRequest, \ + TAction, STORAGE_MEDIA_SSD_LOCAL, IPC_VHOST + +from cloud.blockstore.tests.python.lib.config import NbsConfigurator, \ + generate_disk_agent_txt +from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, \ + start_disk_agent, get_fqdn + +import yatest.common as yatest_common + +from contrib.ydb.tests.library.common.yatest_common import PortManager +from contrib.ydb.tests.library.harness.kikimr_runner import get_unique_path_for_current_test, \ + ensure_path_exists + +from library.python.retry import retry + + +DEVICE_SIZE = 1024**2 +KNOWN_DEVICE_POOLS = { + "KnownDevicePools": [ + {"Name": "1Mb", "Kind": "DEVICE_POOL_KIND_LOCAL", "AllocationUnit": DEVICE_SIZE}, + ]} + + +@pytest.fixture(name='data_path') +def create_data_path(): + + p = get_unique_path_for_current_test( + output_path=yatest_common.output_path(), + sub_folder="data") + + p = os.path.join(p, "dev", "disk", "by-partlabel") + ensure_path_exists(p) + + return p + + +@pytest.fixture(autouse=True) +def create_device_files(data_path): + + for i in range(6): + with open(os.path.join(data_path, f"NVMENBS{i + 1:02}"), 'wb') as f: + f.seek(DEVICE_SIZE-1) + f.write(b'\0') + f.flush() + + +@pytest.fixture(name='disk_agent_configurator') +def create_disk_agent_configurator(ydb, data_path): + + configurator = NbsConfigurator(ydb, 'disk-agent') + configurator.generate_default_nbs_configs() + + disk_agent_config = generate_disk_agent_txt( + agent_id='', + device_erase_method='DEVICE_ERASE_METHOD_NONE', # speed up tests + storage_discovery_config={ + "PathConfigs": [{ + "PathRegExp": f"{data_path}/NVMENBS([0-9]+)", + "PoolConfigs": [{ + "PoolName": "1Mb", + "MinSize": DEVICE_SIZE, + "MaxSize": DEVICE_SIZE + }]}] + }) + + configurator.files["disk-agent"] = disk_agent_config + + return configurator + + +@pytest.fixture(name='ydb') +def start_ydb_cluster(): + + ydb_cluster = start_ydb() + + yield ydb_cluster + + ydb_cluster.stop() + + +@pytest.fixture(name='fake_vhost_server') +def setup_fake_vhost_server_script(): + + class Script: + + def __init__(self, port, path): + self.port = port + self.path = path + + port = PortManager().get_port() + binary_path = yatest_common.binary_path( + "cloud/blockstore/tools/testing/fake-vhost-server/fake-vhost-server") + + with tempfile.NamedTemporaryFile(mode='w') as script: + script.write("\n".join([ + "#!/bin/bash", + f"exec {binary_path} --port {port} $@" + ])) + + os.chmod(script.name, os.stat(script.name).st_mode | stat.S_IXUSR) + + script.file.close() + + yield Script(port, script.name) + + +@pytest.fixture(name='nbs') +def start_nbs_daemon(ydb, fake_vhost_server): + + cfg = NbsConfigurator(ydb) + cfg.generate_default_nbs_configs() + + server = cfg.files["server"].ServerConfig + server.VhostEnabled = True + server.VhostServerPath = fake_vhost_server.path + + daemon = start_nbs(cfg) + + yield daemon + + daemon.kill() + + +@pytest.fixture(name='disk_agent') +def start_disk_agent_daemon(ydb, disk_agent_configurator): + + daemon = start_disk_agent(disk_agent_configurator) + + yield daemon + + daemon.kill() + + +@pytest.fixture(autouse=True) +def setup_env(nbs, disk_agent, data_path): + + client = CreateClient(f"localhost:{nbs.port}") + client.execute_action( + action="DiskRegistrySetWritableState", + input_bytes=str.encode('{"State": true}')) + + client.update_disk_registry_config(KNOWN_DEVICE_POOLS) + + disk_agent.wait_for_registration() + + request = TCmsActionRequest() + + for i in range(2): + action = request.Actions.add() + action.Type = TAction.ADD_DEVICE + action.Host = get_fqdn() + action.Device = os.path.join(data_path, f"NVMENBS{i + 1:02}") + + response = client.cms_action(request) + + assert len(response.ActionResults) == 2 + for r in response.ActionResults: + assert r.Result.Code == 0, r + + +def test_external_endpoint(nbs, fake_vhost_server): + + client = CreateClient(f"localhost:{nbs.port}") + + @retry(max_times=10, exception=ClientError) + def create_vol0(): + client.create_volume( + disk_id="vol0", + block_size=4096, + blocks_count=2 * DEVICE_SIZE//4096, + storage_media_kind=STORAGE_MEDIA_SSD_LOCAL, + storage_pool_name="1Mb") + + vhost_server_url = f"http://localhost:{fake_vhost_server.port}" + + @retry(max_times=10, exception=requests.ConnectionError) + def wait_for_vhost_server(): + r = requests.get(f"{vhost_server_url}/ping").text + assert r == "pong" + + create_vol0() + + with tempfile.NamedTemporaryFile() as unix_socket: + r = client.start_endpoint( + unix_socket_path=unix_socket.name, + disk_id="vol0", + ipc_type=IPC_VHOST, + client_id="test" + ) + + assert r["Volume"].DiskId == "vol0" + + wait_for_vhost_server() + + r = requests.get(f"{vhost_server_url}/describe").json() + devices = r["devices"] + assert len(devices) == 2 + + for d in devices: + assert d[0].find("NVMENBS") != -1 + assert d[1] == DEVICE_SIZE + assert d[2] == 0 + + crit = nbs.counters.find({ + 'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit' + }) + assert crit is not None + assert crit['value'] == 0 + + try: + _ = requests.post(f"{vhost_server_url}/terminate", data='{"exit_code": 42}') + except requests.ConnectionError: + pass + + # wait for the counters to be updated + time.sleep(15) + + crit = nbs.counters.find({ + 'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit' + }) + assert crit is not None + assert crit['value'] == 1 + + wait_for_vhost_server() + + client.stop_endpoint(unix_socket.name) + + # wait for the counters to be updated + time.sleep(15) + + crit = nbs.counters.find({ + 'sensor': 'AppCriticalEvents/ExternalEndpointUnexpectedExit' + }) + assert crit is not None + assert crit['value'] == 1 # the same value diff --git a/cloud/blockstore/tests/external_endpoint/ya.make b/cloud/blockstore/tests/external_endpoint/ya.make new file mode 100644 index 0000000000..67253ca4cc --- /dev/null +++ b/cloud/blockstore/tests/external_endpoint/ya.make @@ -0,0 +1,22 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc) + +TEST_SRCS(test.py) + +DEPENDS( + cloud/blockstore/apps/client + cloud/blockstore/apps/disk_agent + cloud/blockstore/apps/server + cloud/blockstore/tools/testing/fake-vhost-server + contrib/ydb/apps/ydbd +) + +PEERDIR( + cloud/blockstore/tests/python/lib + contrib/python/requests/py3 + + library/python/retry +) + +END() diff --git a/cloud/blockstore/tests/ya.make b/cloud/blockstore/tests/ya.make index aaffe07e88..d6b8f0bbc6 100644 --- a/cloud/blockstore/tests/ya.make +++ b/cloud/blockstore/tests/ya.make @@ -8,6 +8,7 @@ ENDIF() RECURSE( client cms + external_endpoint fio functional fuzzing diff --git a/cloud/blockstore/tools/testing/fake-vhost-server/__main__.py b/cloud/blockstore/tools/testing/fake-vhost-server/__main__.py new file mode 100644 index 0000000000..24740524e3 --- /dev/null +++ b/cloud/blockstore/tools/testing/fake-vhost-server/__main__.py @@ -0,0 +1,248 @@ +import argparse +import json +import logging +import signal +import sys + +from datetime import datetime +from http.server import HTTPServer, BaseHTTPRequestHandler +from threading import Thread + + +def _prepare_logging(verbose): + log_level = { + "error": logging.ERROR, + "warn": logging.WARNING, + "info": logging.INFO, + "debug": logging.DEBUG, + }[verbose] + + logging.basicConfig( + stream=sys.stderr, + level=log_level, + format="[%(levelname)s] [%(asctime)s] %(message)s") + + +class _DeviceChunk: + + def __init__(self, s: str): + i = s.rfind(':') + if i == -1 or i == 0: + raise argparse.ArgumentTypeError(f"invalid format: {s}") + + j = s[0:i-1].rfind(':') + if j == -1: + raise argparse.ArgumentTypeError(f"invalid format: {s}") + + self.offset = int(s[i+1:]) + self.size = int(s[j+1:i]) + self.path = s[:j] + + +class _App: + + def __init__(self): + self.exit_code = 0 + + def terminate(self, exit_code): + self.exit_code = exit_code + exit(exit_code) + + +def _create_handler(args, app: _App): + + class Handler(BaseHTTPRequestHandler): + + def do_GET(self): + self.log_request() + + if self.path == '/ping': + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write('pong'.encode('utf-8')) + return + + if self.path == '/describe': + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({ + "devices": [[d.path, d.size, d.offset] for d in args.device] + }).encode('utf-8')) + return + + self.send_error(400) + + def do_POST(self): + self.log_request() + + length = int(self.headers['content-length']) + req = self.rfile.read(length).decode('utf-8') + + if self.path == '/terminate': + data = json.loads(req) + + exit_code = data.get('exit_code', -1) + logging.info(f'exit with {exit_code}...') + app.terminate(exit_code) + return + + self.send_error(400) + + return Handler + + +def _run_server(args): + app = _App() + + server = HTTPServer(('localhost', args.port), _create_handler(args, app)) + + t0 = datetime.now() + + def signal_handler(signum, frame): + if signum == signal.SIGUSR1: + # TODO + print(json.dumps({ + "elapsed_ms": int((datetime.now() - t0).total_seconds() * 1000) + })) + return + + logging.info('shutdown...') + server.shutdown() + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGUSR1, signal_handler) + + thread = Thread(target=server.serve_forever, name='server') + + logging.info('start...') + + thread.start() + thread.join() + + logging.info('done') + + return app.exit_code + + +def main(): + parser = argparse.ArgumentParser() + + # custom options + + parser.add_argument('--port', type=int, required=True) + + # vhost-server options + + parser.add_argument( + "-v", + '--verbose', + help="output level for diagnostics messages", + default="debug", + type=str) + + parser.add_argument("-s", '--socket-path', type=str, metavar="FILE", required=True) + + parser.add_argument( + "-i", + "--serial", + type=str, + metavar="STR", + help="disk serial", + required=True) + + parser.add_argument("--disk-id", type=str, metavar="STR", help="disk id") + + parser.add_argument( + "--client-id", + type=str, + metavar="STR", + help="client id", + default="vhost-server") + + parser.add_argument( + "--device", + required=True, + type=_DeviceChunk, + metavar="STR", + action="append", + help="specify device string path:size:offset " + "(e.g. /dev/vda:1000000:0, rdma://host:10020/abcdef:1000000:0)") + + parser.add_argument( + "--device-backend", + type=str, + metavar="STR", + help="specify device backend", + choices=["aio", "rdma", "null"], + default="aio") + + parser.add_argument( + "-r", + "--read-only", + action='store_true', + help="read only mode") + + parser.add_argument( + "--no-sync", + action='store_true', + help="do not use O_SYNC") + + parser.add_argument( + "--no-chmod", + action='store_true', + help="do not chmod socket") + + parser.add_argument( + "-B", + "--batch-size", + type=int, + metavar="INT", + default=1024) + + parser.add_argument( + "--block-size", + help="size of block device", + type=int, + metavar="INT", + default=512) + + parser.add_argument( + "-q", + "--queue-count", + type=int, + metavar="INT", + default=0) + + parser.add_argument( + "--log-type", + type=str, + metavar="STR", + choices=["json", "console"], + default="json") + + parser.add_argument( + "--rdma-queue-size", + help="Rdma client queue size", + type=int, + metavar="INT", + default=256) + + parser.add_argument( + "--rdma-max-buffer-size", + help="Rdma client queue size", + type=int, + metavar="INT", + default=4*1024**2 + 4096) + + args = parser.parse_args() + + _prepare_logging(args.verbose) + + return _run_server(args) + + +if __name__ == '__main__': + exit(main()) diff --git a/cloud/blockstore/tools/testing/fake-vhost-server/ya.make b/cloud/blockstore/tools/testing/fake-vhost-server/ya.make new file mode 100644 index 0000000000..d3481ed771 --- /dev/null +++ b/cloud/blockstore/tools/testing/fake-vhost-server/ya.make @@ -0,0 +1,10 @@ +PY3_PROGRAM() + +PEERDIR( +) + +PY_SRCS( + __main__.py +) + +END() diff --git a/cloud/blockstore/tools/testing/ya.make b/cloud/blockstore/tools/testing/ya.make index 30f57cde32..7ca46d879d 100644 --- a/cloud/blockstore/tools/testing/ya.make +++ b/cloud/blockstore/tools/testing/ya.make @@ -4,6 +4,7 @@ RECURSE( eternal_tests fake-conductor fake-nbs + fake-vhost-server generate-agents infra-client loadtest