diff --git a/osbenchmark/results_publisher.py b/osbenchmark/results_publisher.py index 32b2e4ec..4b3ebebd 100644 --- a/osbenchmark/results_publisher.py +++ b/osbenchmark/results_publisher.py @@ -27,6 +27,8 @@ import io import logging import sys +import re +from enum import Enum import tabulate @@ -43,6 +45,11 @@ ------------------------------------------------------ """ +class Throughput(Enum): + MEAN = "mean" + MAX = "max" + MIN = "min" + MEDIAN = "median" def summarize(results, cfg): SummaryResultsPublisher(results, cfg).publish() @@ -126,6 +133,17 @@ def __init__(self, results, config): "throughput":comma_separated_string_to_number_list(config.opts("workload", "throughput.percentiles", mandatory=False)), "latency": comma_separated_string_to_number_list(config.opts("workload", "latency.percentiles", mandatory=False)) } + self.logger = logging.getLogger(__name__) + + def publish_operational_statistics(self, metrics_table: list, warnings: list, record, task): + metrics_table.extend(self._publish_throughput(record, task)) + metrics_table.extend(self._publish_latency(record, task)) + metrics_table.extend(self._publish_service_time(record, task)) + # this is mostly needed for debugging purposes but not so relevant to end users + if self.show_processing_time: + metrics_table.extend(self._publish_processing_time(record, task)) + metrics_table.extend(self._publish_error_rate(record, task)) + self.add_warnings(warnings, record, task) def publish(self): print_header(FINAL_SCORE) @@ -145,16 +163,33 @@ def publish(self): metrics_table.extend(self._publish_transform_stats(stats)) + # These variables are used with the clients_list parameter in test_procedures to find the max throughput. + max_throughput = -1 + record_with_best_throughput = None + + throughput_pattern = r"_(\d+)_clients$" + + for record in stats.op_metrics: task = record["task"] - metrics_table.extend(self._publish_throughput(record, task)) - metrics_table.extend(self._publish_latency(record, task)) - metrics_table.extend(self._publish_service_time(record, task)) - # this is mostly needed for debugging purposes but not so relevant to end users - if self.show_processing_time: - metrics_table.extend(self._publish_processing_time(record, task)) - metrics_table.extend(self._publish_error_rate(record, task)) - self.add_warnings(warnings, record, task) + is_task_part_of_throughput_testing = re.search(throughput_pattern, task) + if is_task_part_of_throughput_testing: + # assumption: all units are the same and only maximizing throughput over one operation (i.e. not both ingest and search). + # To maximize throughput over multiple operations, would need a list/dictionary of maximum throughputs. + task_throughput = record["throughput"][Throughput.MEAN.value] + self.logger.info("Task %s has throughput %s", task, task_throughput) + if task_throughput > max_throughput: + max_throughput = task_throughput + record_with_best_throughput = record + + else: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record, task=task) + + # The following code is run when the clients_list parameter is specified and publishes the max throughput. + if max_throughput != -1 and record_with_best_throughput is not None: + self.publish_operational_statistics(metrics_table=metrics_table, warnings=warnings, record=record_with_best_throughput, + task=record_with_best_throughput["task"]) + metrics_table.extend(self._publish_best_client_settings(record_with_best_throughput, record_with_best_throughput["task"])) for record in stats.correctness_metrics: task = record["task"] @@ -217,6 +252,10 @@ def _publish_recall(self, values, task): self._line("Mean recall@1", task, recall_1_mean, "", lambda v: "%.2f" % v) ) + def _publish_best_client_settings(self, record, task): + num_clients = re.search(r"_(\d+)_clients$", task).group(1) + return self._join(self._line("Number of clients that achieved max throughput", "", num_clients, "")) + def _publish_percentiles(self, name, task, value, unit="ms"): lines = [] percentiles = self.display_percentiles.get(name, metrics.GlobalStatsCalculator.OTHER_PERCENTILES) diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index a84c0755..0d811423 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1596,11 +1596,25 @@ def _create_test_procedures(self, workload_spec): schedule = [] for op in self._r(test_procedure_spec, "schedule", error_ctx=name): - if "parallel" in op: - task = self.parse_parallel(op["parallel"], ops, name) + if "clients_list" in op: + self.logger.info("Clients list specified: %s. Running multiple search tasks, "\ + "each scheduled with the corresponding number of clients from the list.", op["clients_list"]) + for num_clients in op["clients_list"]: + op["clients"] = num_clients + + new_name = self._rename_task_based_on_num_clients(name, num_clients) + + new_name = name + "_" + str(num_clients) + "_clients" + new_task = self.parse_task(op, ops, new_name) + new_task.name = new_name + schedule.append(new_task) else: - task = self.parse_task(op, ops, name) - schedule.append(task) + if "parallel" in op: + task = self.parse_parallel(op["parallel"], ops, name) + else: + task = self.parse_task(op, ops, name) + + schedule.append(task) # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing). known_task_names = set() @@ -1635,6 +1649,18 @@ def _create_test_procedures(self, workload_spec): % ", ".join([c.name for c in test_procedures])) return test_procedures + def _rename_task_based_on_num_clients(self, name: str, num_clients: int) -> str: + has_underscore = "_" in name + has_hyphen = "-" in name + if has_underscore and has_hyphen: + self.logger.warning("The test procedure name %s contains a mix of _ and -. "\ + "Consider changing the name to avoid frustrating bugs in the future.", name) + return name + "_" + str(num_clients) + "_clients" + elif has_hyphen: + return name + "-" + str(num_clients) + "-clients" + else: + return name + "_" + str(num_clients) + "_clients" + def _get_test_procedure_specs(self, workload_spec): schedule = self._r(workload_spec, "schedule", mandatory=False) test_procedure = self._r(workload_spec, "test_procedure", mandatory=False) diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index eeccc14e..285e5277 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -2477,6 +2477,70 @@ def test_parse_unique_task_names(self): self.assertEqual("search-two-clients", schedule[1].name) self.assertEqual("search", schedule[1].operation.name) + def test_parse_clients_list(self): + workload_specification = { + "description": "description for unit test", + "operations": [ + { + "name": "search", + "operation-type": "search", + "index": "_all" + } + ], + "test_procedure": { + "name": "default-test-procedure", + "schedule": [ + { + "name": "search-one-client", + "operation": "search", + "clients": 1, + "clients_list": [1,2,3] + }, + { + "name": "search-two-clients", + "operation": "search", + "clients": 2 + } + ] + } + } + + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure") + resulting_workload = reader("unittest", workload_specification, "/mappings") + self.assertEqual("unittest", resulting_workload.name) + test_procedure = resulting_workload.test_procedures[0] + self.assertTrue(test_procedure.selected) + schedule = test_procedure.schedule + self.assertEqual(4, len(schedule)) + + self.assertEqual("default-test-procedure_1_clients", schedule[0].name) + self.assertEqual("search", schedule[0].operation.name) + self.assertEqual("default-test-procedure_2_clients", schedule[1].name) + self.assertEqual("search", schedule[1].operation.name) + self.assertEqual("default-test-procedure_3_clients", schedule[2].name) + self.assertEqual("search", schedule[2].operation.name) + + self.assertEqual("search-two-clients", schedule[3].name) + self.assertEqual("search", schedule[3].operation.name) + # pylint: disable=W0212 + def test_naming_with_clients_list(self): + reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + # Test case 1: name contains both "_" and "-" + result = reader._rename_task_based_on_num_clients("test_name-task", 5) + self.assertEqual(result, "test_name-task_5_clients") + + # Test case 2: name contains only "-" + result = reader._rename_task_based_on_num_clients("test-name", 3) + self.assertEqual(result, "test-name-3-clients") + + # Test case 3: name contains only "_" + result = reader._rename_task_based_on_num_clients("test_name", 2) + self.assertEqual(result, "test_name_2_clients") + + # Test case 4: name contains neither "_" nor "-" + result = reader._rename_task_based_on_num_clients("testname", 1) + self.assertEqual(result, "testname_1_clients") + def test_parse_indices_valid_workload_specification(self): workload_specification = { "description": "description for unit test",