diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py index 0e990609..f7af9cfd 100644 --- a/osbenchmark/utils/dataset.py +++ b/osbenchmark/utils/dataset.py @@ -9,7 +9,6 @@ from abc import ABC, ABCMeta, abstractmethod from enum import Enum from typing import cast - import h5py import numpy as np @@ -159,6 +158,20 @@ def parse_context(context: Context) -> str: raise Exception("Unsupported context") +def context_string_to_context(context_string: str) -> Context: + if context_string == "neighbors": + return Context.NEIGHBORS + elif context_string == "train": + return Context.INDEX + elif context_string == "test": + return Context.QUERY + elif context_string == "max_distance_neighbors": + return Context.MAX_DISTANCE_NEIGHBORS + elif context_string == "min_score_neighbors": + return Context.MIN_SCORE_NEIGHBORS + else: + raise ValueError(f"Invalid context string: {context_string}") + class BigANNDataSet(DataSet): diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 7d8d69de..93bd1521 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -515,7 +515,6 @@ async def __call__(self, opensearch, params): if not detailed_results: opensearch.return_raw_response() request_context_holder.on_client_request_start() - if with_action_metadata: api_kwargs.pop("index", None) # only half of the lines are documents diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index 0d811423..35ea49a3 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1477,6 +1477,12 @@ def _create_corpora(self, corpora_specs, indices, data_streams): default_value=workload.Documents.SOURCE_FORMAT_BULK) default_action_and_meta_data = self._r(corpus_spec, "includes-action-and-meta-data", mandatory=False, default_value=False) + default_generate_increasing_vector_ids = self._r(corpus_spec, "generate-increasing-vector-ids", mandatory=False, + default_value=False) + default_id_field_name = self._r(corpus_spec, "id-field-name", mandatory=False, + default_value=None) + default_vector_field_name = self._r(corpus_spec, "vector-field-name", mandatory=False, + default_value=None) corpus_target_idx = None corpus_target_ds = None corpus_target_type = None @@ -1518,6 +1524,12 @@ def _create_corpora(self, corpora_specs, indices, data_streams): includes_action_and_meta_data = self._r(doc_spec, "includes-action-and-meta-data", mandatory=False, default_value=default_action_and_meta_data) + generate_increasing_vector_ids = self._r(doc_spec, "generate-increasing-vector-ids", mandatory=False, + default_value=default_generate_increasing_vector_ids) + id_field_name = self._r(doc_spec, "id-field-name", mandatory=False, + default_value=default_id_field_name) + vector_field_name = self._r(doc_spec, "vector-field-name", mandatory=False, + default_value=default_vector_field_name) if includes_action_and_meta_data: target_idx = None target_type = None @@ -1558,6 +1570,9 @@ def _create_corpora(self, corpora_specs, indices, data_streams): base_url=base_url, source_url=source_url, includes_action_and_meta_data=includes_action_and_meta_data, + generate_increasing_vector_ids=generate_increasing_vector_ids, + id_field_name = id_field_name, + vector_field_name = vector_field_name, number_of_documents=num_docs, compressed_size_in_bytes=compressed_bytes, uncompressed_size_in_bytes=uncompressed_bytes, diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index 92cc61c9..8bb71852 
100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -34,12 +34,13 @@ from abc import ABC, abstractmethod from enum import Enum from typing import List, Dict, Any, Optional, Tuple +import json import numpy as np from osbenchmark import exceptions from osbenchmark.utils import io -from osbenchmark.utils.dataset import DataSet, get_data_set, Context +from osbenchmark.utils.dataset import DataSet, get_data_set, Context, HDF5DataSet from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter from osbenchmark.workload import workload @@ -643,12 +644,21 @@ def __init__(self, workload, params, **kwargs): for corpus in self.corpora: for document_set in corpus.documents: - if document_set.includes_action_and_meta_data and self.id_conflicts != IndexIdConflict.NoConflicts: + if ((document_set.includes_action_and_meta_data or document_set.generate_increasing_vector_ids) + and self.id_conflicts != IndexIdConflict.NoConflicts): file_name = document_set.document_archive if document_set.has_compressed_corpus() else document_set.document_file raise exceptions.InvalidSyntax("Cannot generate id conflicts [%s] as [%s] in document corpus [%s] already contains an " "action and meta-data line." % (id_conflicts, file_name, corpus)) + if document_set.includes_action_and_meta_data and document_set.generate_increasing_vector_ids: + file_name = document_set.document_archive if document_set.has_compressed_corpus() else document_set.document_file + raise exceptions.InvalidSyntax( + "Cannot specify generate_increasing_vector_ids=True and includes_action_and_meta_data=True " + "for file [%s] in document corpus [%s]." % (file_name, corpus) + ) self.pipeline = params.get("pipeline", None) try: self.bulk_size = int(params["bulk-size"]) @@ -671,6 +681,7 @@ def __init__(self, workload, params, **kwargs): raise exceptions.InvalidSyntax("'batch-size' must be numeric") self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100) + self.param_source = PartitionBulkIndexParamSource(self.corpora, self.batch_size, self.bulk_size, self.ingest_percentage, self.id_conflicts, self.conflict_probability, self.on_conflict, @@ -691,15 +702,16 @@ def used_corpora(self, t, params): corpora = [] workload_corpora_names = [corpus.name for corpus in t.corpora] corpora_names = params.get("corpora", workload_corpora_names) + source_format = params.get("source_format", workload.Documents.SOURCE_FORMAT_BULK) if isinstance(corpora_names, str): corpora_names = [corpora_names] - for corpus in t.corpora: if corpus.name in corpora_names: - filtered_corpus = corpus.filter(source_format=workload.Documents.SOURCE_FORMAT_BULK, + filtered_corpus = corpus.filter(source_format=source_format, target_indices=params.get("indices"), target_data_streams=params.get("data-streams")) - if filtered_corpus.number_of_documents(source_format=workload.Documents.SOURCE_FORMAT_BULK) > 0: + + if filtered_corpus.number_of_documents(source_format=source_format) > 0: corpora.append(filtered_corpus) # the workload has corpora but none of them match @@ -753,6 +765,8 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic # use a value > 0 so percent_completed returns a sensible value self.total_bulks = 1 self.infinite = False + self.dataset_context = original_params.get("vector_dataset_context", None) + logging.getLogger(__name__).info("dataset context: %s", self.dataset_context) def 
partition(self, partition_index, total_partitions): if self.total_partitions is None: @@ -770,6 +784,7 @@ def params(self): if self.current_bulk == self.total_bulks: raise StopIteration() self.current_bulk += 1 + return next(self.internal_params) def _init_internal_params(self): @@ -777,11 +792,12 @@ def _init_internal_params(self): self.partitions = sorted(self.partitions) start_index = self.partitions[0] end_index = self.partitions[-1] - + # Convert the corpus files into bulk requests. + # self.internal_params is a generator that yields one bulk at a time. self.internal_params = bulk_data_based(self.total_partitions, start_index, end_index, self.corpora, self.batch_size, self.bulk_size, self.id_conflicts, self.conflict_probability, self.on_conflict, self.recency, - self.pipeline, self.original_params, self.create_reader) + self.pipeline, self.original_params, self.create_reader, self.dataset_context) all_bulks = number_of_bulks(self.corpora, start_index, end_index, self.total_partitions, self.bulk_size) self.total_bulks = math.ceil((all_bulks * self.ingest_percentage) / 100) @@ -1532,8 +1549,7 @@ def chain(*iterables): def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, conflict_probability, - on_conflict, recency): - source = Slice(io.MmapSource, offset, num_lines) + on_conflict, recency, dataset_context=None): target = None use_create = False if docs.target_index: @@ -1545,8 +1561,20 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si # can only create docs in data streams raise exceptions.BenchmarkError("Conflicts cannot be generated with append only data streams") - if docs.includes_action_and_meta_data: - return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, target, docs.target_type) + if dataset_context is None: + dataset_context = "index" + if docs.source_format == workload.Documents.SOURCE_FORMAT_HDF5: + if docs.generate_increasing_vector_ids: + source = HDF5SourceGenerateIds(offset, num_lines, dataset_context, target, docs.id_field_name, docs.vector_field_name) + else: + source = HDF5Source(offset, num_lines, dataset_context, target, docs.id_field_name, docs.vector_field_name) + else: + source = Slice(io.MmapSource, offset, num_lines) + + if docs.includes_action_and_meta_data or docs.generate_increasing_vector_ids: + return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, target, docs.target_type, + docs.generate_increasing_vector_ids) + else: am_handler = GenerateActionMetaData(target, docs.target_type, build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability, @@ -1555,7 +1583,7 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si def create_readers(num_clients, start_client_index, end_client_index, corpora, batch_size, bulk_size, id_conflicts, - conflict_probability, on_conflict, recency, create_reader): + conflict_probability, on_conflict, recency, create_reader, dataset_context=None): logger = logging.getLogger(__name__) readers = [] for corpus in corpora: @@ -1570,7 +1598,7 @@ def create_readers(num_clients, start_client_index, end_client_index, corpora, b "from corpus [%s].", start_client_index, end_client_index, num_docs, 
offset, target, corpus.name) readers.append(create_reader(docs, offset, num_lines, num_docs, batch_size, bulk_size, id_conflicts, - conflict_probability, on_conflict, recency)) + conflict_probability, on_conflict, recency, dataset_context)) else: logger.info("Task-relative clients at index [%d-%d] skip [%s] (no documents to read).", start_client_index, end_client_index, corpus.name) @@ -1630,7 +1658,8 @@ def bulk_generator(readers, pipeline, original_params): def bulk_data_based(num_clients, start_client_index, end_client_index, corpora, batch_size, bulk_size, id_conflicts, - conflict_probability, on_conflict, recency, pipeline, original_params, create_reader=create_default_reader): + conflict_probability, on_conflict, recency, pipeline, original_params, create_reader=create_default_reader, + context=None): """ Calculates the necessary schedule for bulk operations. @@ -1654,7 +1683,7 @@ def bulk_data_based(num_clients, start_client_index, end_client_index, corpora, :return: A generator for the bulk operations of the given client. """ readers = create_readers(num_clients, start_client_index, end_client_index, corpora, batch_size, bulk_size, - id_conflicts, conflict_probability, on_conflict, recency, create_reader) + id_conflicts, conflict_probability, on_conflict, recency, create_reader, context) return bulk_generator(chain(*readers), pipeline, original_params) @@ -1779,6 +1808,122 @@ def __str__(self): return "%s[%d;%d]" % (self.source, self.offset, self.offset + self.number_of_lines) + +class HDF5Source: + """Reads vectors from an HDF5 data set and yields them as JSON-encoded bulk document lines.""" + + def __init__(self, offset, number_of_lines, dataset_context, index, id_field_name, vector_field_name): + self.offset = offset + self.number_of_lines = number_of_lines + self.current_line = 0 + self.dataset_context = dataset_context + self.index_name = index + self.id_field_name = id_field_name + self.vector_field_name: str = vector_field_name + self.logger = logging.getLogger(__name__) + self.dataset = None + + def open(self, file_name, mode, bulk_size): + self.source: HDF5DataSet = HDF5DataSet(file_name, Context.INDEX) + + # bulk size used in the read method + self.bulk_size = bulk_size + + if self.offset >= self.source.size(): + self.logger.error("Offset [%d] is out of range for dataset of size [%d].", self.offset, self.source.size()) + raise IndexError(f"Offset {self.offset} is out of range for dataset of size {self.source.size()}.") + + self.logger.info("Will read [%d] lines from [%s] starting from line [%d] with bulk size [%d].", + self.number_of_lines, file_name, self.offset, self.bulk_size) + + start = time.perf_counter() + self.source.seek(self.offset) + end = time.perf_counter() + + self.logger.debug("Skipping [%d] lines took [%f] s.", self.offset, end - start) + return self + + def close(self): + self.logger.info("Unimplemented HDF5 close.") + self.source = None + + def __iter__(self): + return self + + def encode_dict(self, doc: Dict): + return (json.dumps(doc) + "\n").encode("utf-8") + + def __next__(self): + if self.current_line >= self.number_of_lines: + raise StopIteration() + else: + vectors = self.source.read(min(self.bulk_size, self.number_of_lines - self.current_line)) + + if len(vectors) == 0: + raise StopIteration() + + to_return = [self.encode_dict({self.vector_field_name: vector.tolist()}) for vector in vectors] + + self.current_line += len(vectors) + + return to_return + +class HDF5SourceGenerateIds(HDF5Source): + """HDF5Source that also generates an action/metadata line with increasing document ids for each vector.""" + + def __init__(self, offset, number_of_lines, dataset_context, index, id_field_name, vector_field_name, attributes_list=None): + 
super().__init__(offset, number_of_lines, dataset_context, index, id_field_name, vector_field_name) + self.DEFAULT_ID_FIELD_NAME = "_id" + + def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]: + """Partitions and transforms a list of vectors into OpenSearch's bulk + ingestion format. + Args: + partition: An array of vectors to transform. + action: Bulk API action. + Returns: + An array of transformed vectors in bulk format. + """ + self.current = self.current_line + + actions = [] + for i in range(len(partition)): + actions.extend([action(self.id_field_name, i + self.current + self.offset), None]) + bulk_contents = [] + add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME + for vec, identifier in zip(partition.tolist(), range(self.current, self.current + len(partition))): + row = {self.vector_field_name: vec} + if add_id_field_to_body: + row.update({self.id_field_name: identifier}) + bulk_contents.append(self.encode_dict(row)) + actions[1::2] = bulk_contents + return actions + + def __next__(self): + if self.current_line >= self.number_of_lines: + raise StopIteration() + else: + + def action(id_field_name, doc_id): + # support only index operation + bulk_action = 'index' + metadata = { + '_index': self.index_name + } + # Add id field to metadata only if it is _id + if id_field_name == self.DEFAULT_ID_FIELD_NAME: + metadata.update({id_field_name: doc_id}) + return self.encode_dict({bulk_action: metadata}) + + # ensure we don't read past the allowed number of lines. + vectors = self.source.read(min(self.bulk_size, self.number_of_lines - self.current_line)) + + if len(vectors) == 0: + raise StopIteration() + + to_return = self.bulk_transform(vectors, action) + + self.current_line += len(vectors) + return to_return + class IndexDataReader: """ Reads a file in bulks into an array and also adds a meta-data line before each document if necessary. @@ -1869,7 +2014,9 @@ def _read_bulk_regular(self): action_metadata_item = next(self.action_metadata) if action_metadata_item: action_type, action_metadata_line = action_metadata_item + current_bulk.append(action_metadata_line.encode("utf-8")) + + if action_type == "update": # remove the trailing "\n" as the doc needs to fit on one line doc = doc.strip() @@ -1882,10 +2029,15 @@ class SourceOnlyIndexDataReader(IndexDataReader): - def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name): - # keep batch size as it only considers documents read, not lines read but increase the bulk size as - # documents are only on every other line. - super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name) + def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name, is_vector_generate_ids=False): + if is_vector_generate_ids: + # in this case, the source contains exclusively vectors and the HDF5SourceGenerateIds object + # will generate id metadata for each line of the bulk. + super().__init__(data_file, batch_size, bulk_size, file_source, index_name, type_name) + else: + # keep batch size as it only considers documents read, not lines read but increase the bulk size as + # documents are only on every other line. 
+ super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name) def read_bulk(self): bulk_items = next(self.file_source) diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 848cb5b3..d0fa9c31 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -191,7 +191,7 @@ class Documents: SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN] def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, source_url=None, - includes_action_and_meta_data=False, + includes_action_and_meta_data=False, generate_increasing_vector_ids=False, id_field_name=None, vector_field_name=None, number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None, target_data_stream=None, target_type=None, meta_data=None): """ @@ -227,6 +227,9 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas self.base_url = base_url self.source_url = source_url self.includes_action_and_meta_data = includes_action_and_meta_data + self.generate_increasing_vector_ids = generate_increasing_vector_ids + self.id_field_name = id_field_name + self.vector_field_name = vector_field_name self._number_of_documents = number_of_documents self._compressed_size_in_bytes = compressed_size_in_bytes self._uncompressed_size_in_bytes = uncompressed_size_in_bytes @@ -296,19 +299,61 @@ def __repr__(self): return ", ".join(r) def __hash__(self): - return hash(self.source_format) ^ hash(self.document_file) ^ hash(self.document_archive) ^ hash(self.base_url) ^ \ - hash(self.source_url) ^ hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ \ - hash(self.compressed_size_in_bytes) ^ hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ \ - hash(self.target_data_stream) ^ hash(self.target_type) ^ hash(frozenset(self.meta_data.items())) + return ( + hash(self.source_format) + ^ hash(self.document_file) + ^ hash(self.document_archive) + ^ hash(self.base_url) + ^ hash(self.source_url) + ^ hash(self.includes_action_and_meta_data) + ^ hash(self.id_field_name) + ^ hash(self.vector_field_name) + ^ hash(self.generate_increasing_vector_ids) + ^ hash(self.number_of_documents) + ^ hash(self.compressed_size_in_bytes) + ^ hash(self.uncompressed_size_in_bytes) + ^ hash(self.target_index) + ^ hash(self.target_data_stream) + ^ hash(self.target_type) + ^ hash(frozenset(self.meta_data.items())) + ) def __eq__(self, othr): - return (isinstance(othr, type(self)) and - (self.source_format, self.document_file, self.document_archive, self.base_url, self.source_url, - self.includes_action_and_meta_data, self.number_of_documents, self.compressed_size_in_bytes, - self.uncompressed_size_in_bytes, self.target_type, self.target_data_stream, self.target_type, self.meta_data) == - (othr.source_format, othr.document_file, othr.document_archive, othr.base_url, self.source_url, - othr.includes_action_and_meta_data, othr.number_of_documents, othr.compressed_size_in_bytes, - othr.uncompressed_size_in_bytes, othr.target_type, othr.target_data_stream, othr.target_type, othr.meta_data)) + return isinstance(othr, type(self)) and ( + self.source_format, + self.document_file, + self.document_archive, + self.base_url, + self.source_url, + self.includes_action_and_meta_data, + self.generate_increasing_vector_ids, + self.id_field_name, + self.vector_field_name, + self.number_of_documents, + self.compressed_size_in_bytes, + 
self.uncompressed_size_in_bytes, + self.target_index, + self.target_data_stream, + self.target_type, + self.meta_data, + ) == ( + othr.source_format, + othr.document_file, + othr.document_archive, + othr.base_url, + othr.source_url, + othr.includes_action_and_meta_data, + othr.generate_increasing_vector_ids, + othr.id_field_name, + othr.vector_field_name, + othr.number_of_documents, + othr.compressed_size_in_bytes, + othr.uncompressed_size_in_bytes, + othr.target_index, + othr.target_data_stream, + othr.target_type, + othr.meta_data, + ) class DocumentCorpus: diff --git a/tests/utils/dataset_test.py b/tests/utils/dataset_test.py index 02f89d53..a765a95c 100644 --- a/tests/utils/dataset_test.py +++ b/tests/utils/dataset_test.py @@ -6,7 +6,7 @@ import tempfile from unittest import TestCase -from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet +from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet, context_string_to_context from osbenchmark.utils.parse import ConfigurationError from tests.utils.dataset_helper import create_data_set, create_ground_truth @@ -66,3 +66,28 @@ def testBigANNGroundTruthAsAcceptableDataSetFormat(self): def testUnSupportedDataSetFormat(self): with self.assertRaises(ConfigurationError) as _: get_data_set("random", "/some/path", Context.INDEX) + +class TestContextStringToContext(TestCase): + def test_neighbors_context(self): + context = context_string_to_context("neighbors") + self.assertEqual(context, Context.NEIGHBORS) + + def test_index_context(self): + context = context_string_to_context("train") + self.assertEqual(context, Context.INDEX) + + def test_query_context(self): + context = context_string_to_context("test") + self.assertEqual(context, Context.QUERY) + + def test_max_distance_neighbors_context(self): + context = context_string_to_context("max_distance_neighbors") + self.assertEqual(context, Context.MAX_DISTANCE_NEIGHBORS) + + def test_min_score_neighbors_context(self): + context = context_string_to_context("min_score_neighbors") + self.assertEqual(context, Context.MIN_SCORE_NEIGHBORS) + + def test_invalid_context_string(self): + with self.assertRaises(ValueError): + context_string_to_context("invalid_string") diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 51921f5f..4402b8e9 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -27,6 +27,8 @@ import shutil import tempfile from unittest import TestCase +import re +import json import numpy as np @@ -1159,6 +1161,591 @@ def test_create_with_conflict_probability_not_numeric(self): self.assertEqual("'conflict-probability' must be numeric", ctx.exception.args[0]) +class HDF5SourceTests(TestCase): + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + DEFAULT_ID_FIELD_NAME = "_id" + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_hdf5_source_with_source_larger_than_slice(self): + # Create a dataset with DEFAULT_NUM_VECTORS vectors of DEFAULT_DIMENSION dimension + data_set_path = create_data_set( + num_vectors=self.DEFAULT_NUM_VECTORS, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + 
data_set_dir=self.data_set_dir, + ) + + source = params.HDF5Source( + offset=2, + number_of_lines=5, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 2) # Open with bulk size of 2 + + # Collect the data read from the source + result = list(next(source)) + + # Define the regular expression to match the JSON output with float values + float_vector_regex = re.compile( + r'\{"' + + re.escape(self.DEFAULT_FIELD_NAME) + + r'": \[([-+]?\d*\.\d+|\d+)(, [-+]?\d*\.\d+|\d+)*\]\}\n' + ) + + # Assert that each line in the result matches the expected pattern + for line in result: + # line_str = line.decode() + + self.assertRegex(line.decode(), float_vector_regex) + + source.close() + + def test_hdf5_source_with_slice_larger_than_source(self): + # Create a dataset with 3 vectors of DEFAULT_DIMENSION + data_set_path = create_data_set( + num_vectors=3, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5Source( + offset=0, + number_of_lines=10, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 5) # Open with bulk size of 5 + + result = list(next(source)) + # Replace with actual expected vector values + # expected_result = [ + # b'{"' + self.DEFAULT_FIELD_NAME.encode() + b'": [some_values]}\n', + # # Add more expected results according to the dataset created + # ] + + self.assertEqual(len(result), 3) + source.close() + + def test_hdf5_source_with_offset_greater_than_dataset_size(self): + # Create a dataset with 3 vectors of DEFAULT_DIMENSION + data_set_path = create_data_set( + num_vectors=3, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5Source( + offset=10, + number_of_lines=5, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + with self.assertRaises(IndexError): + source.open(data_set_path, "r", 2) # Open with bulk size of 2 + + def test_hdf5_source_happy_path(self): + # Create a valid dataset with DEFAULT_NUM_VECTORS vectors of DEFAULT_DIMENSION dimension + data_set_path = create_data_set( + num_vectors=self.DEFAULT_NUM_VECTORS, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5Source( + offset=0, + number_of_lines=self.DEFAULT_NUM_VECTORS, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 5) # Open with a bulk size of 5 + + # Collect the data read from the source + result = list(next(source)) + + # Define the regular expression to match the JSON output with float values + float_vector_regex = re.compile( + r'\{"' + + re.escape(self.DEFAULT_FIELD_NAME) + + r'": \[([-+]?\d*\.\d+|\d+)(, [-+]?\d*\.\d+|\d+)*\]\}\n' + ) + + # Assert that each line in the result matches the expected pattern + for line in result: + # line_str = line.decode() + + self.assertRegex(line.decode(), 
float_vector_regex) + + source.close() + + +class HDF5SourceGenerateIdsTests(TestCase): + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_ID_FIELD_NAME = "_id" + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_hdf5_source_generate_ids_with_source_larger_than_slice(self): + # Create a dataset with DEFAULT_NUM_VECTORS vectors of DEFAULT_DIMENSION dimension + data_set_path = create_data_set( + num_vectors=self.DEFAULT_NUM_VECTORS, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5SourceGenerateIds( + offset=2, + number_of_lines=5, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 2) # Open with bulk size of 2 + + # Collect the data read from the source + result = list(next(source)) + # Optimized regex for metadata lines + metadata_regex = re.compile( + r'\{"index":\s*\{"_index":\s*"' + + re.escape(self.DEFAULT_INDEX_NAME) + + r'"\s*,\s*"_id":\s*\d+\s*\}\}\n' + ) + + # Optimized regex for vector data lines + vector_data_regex = re.compile( + r'\{"%s":\s*\[(?:-?\d+(?:\.\d+)?\s*,\s*)*-?\d+(?:\.\d+)?\s*\]\}\n' + % re.escape(self.DEFAULT_FIELD_NAME) + ) + # Assert that each line matches either the metadata pattern or the vector data pattern + for i, line in enumerate(result): + if i % 2 == 0: # Metadata line + self.assertRegex(line.decode(), metadata_regex) + else: # Vector data line + self.assertRegex(line.decode(), vector_data_regex) + + source.close() + + def test_hdf5_source_generate_ids_with_slice_larger_than_source(self): + # Create a dataset with 3 vectors of DEFAULT_DIMENSION + data_set_path = create_data_set( + num_vectors=3, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5SourceGenerateIds( + offset=0, + number_of_lines=10, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 5) # Open with bulk size of 5 + + result = list(next(source)) + + self.assertEqual(len(result), 6) + source.close() + + def test_hdf5_source_generate_ids_with_offset_greater_than_dataset_size(self): + # Create a dataset with 3 vectors of DEFAULT_DIMENSION + data_set_path = create_data_set( + num_vectors=3, + dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5SourceGenerateIds( + offset=10, + number_of_lines=5, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + with self.assertRaises(IndexError): + source.open(data_set_path, "r", 2) # Open with bulk size of 2 + + def test_hdf5_source_generate_ids_happy_path(self): + # Create a valid dataset with DEFAULT_NUM_VECTORS vectors of DEFAULT_DIMENSION dimension + data_set_path = create_data_set( + num_vectors=self.DEFAULT_NUM_VECTORS, + 
dimension=self.DEFAULT_DIMENSION, + extension=self.DEFAULT_TYPE, + data_set_context=self.DEFAULT_CONTEXT, + data_set_dir=self.data_set_dir, + ) + + source = params.HDF5SourceGenerateIds( + offset=0, + number_of_lines=self.DEFAULT_NUM_VECTORS, + dataset_context=self.DEFAULT_CONTEXT, + index=self.DEFAULT_INDEX_NAME, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_FIELD_NAME, + ) + + source.open(data_set_path, "r", 5) # Open with a bulk size of 5 + + # Collect the data read from the source + result = list(next(source)) + + metadata_regex = re.compile( + r'\{"index":\s*\{"_index":\s*"' + + re.escape(self.DEFAULT_INDEX_NAME) + + r'"\s*,\s*"_id":\s*\d+\s*\}\}\n' + ) + + # Optimized regex for vector data lines + vector_data_regex = re.compile( + r'\{"%s":\s*\[(?:-?\d+(?:\.\d+)?\s*,\s*)*-?\d+(?:\.\d+)?\s*\]\}\n' + % re.escape(self.DEFAULT_FIELD_NAME) + ) + # Assert that each line matches either the metadata pattern or the vector data pattern + for i, line in enumerate(result): + if i % 2 == 0: # Metadata line + self.assertRegex(line.decode(), metadata_regex) + else: # Vector data line + self.assertRegex(line.decode(), vector_data_regex) + + source.close() + + +class BulkIndexWithHDF5Tests(TestCase): + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_VECTOR_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + DEFAULT_ID_FIELD_NAME = "_id" + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_from_VectorSearchParamSourceTests(self): + num_vectors = 100 + num_partitions = 10 + corpus_name = "random-hdf5-corpus" + + hdf5_data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + HDF5DataSet.FORMAT_NAME, + self.DEFAULT_CONTEXT, + self.data_set_dir, + ) + + corpora = [ + workload.DocumentCorpus( + name=corpus_name, + documents=[ + workload.Documents( + source_format=workload.Documents.SOURCE_FORMAT_HDF5, + number_of_documents=num_vectors, + document_file=hdf5_data_set_path, + ) + ], + ), + ] + + test_param_source = params.BulkIndexParamSource( + workload=workload.Workload(name="unit-test", corpora=corpora), + params={ + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_VECTOR_FIELD_NAME, + "data_set_format": HDF5DataSet.FORMAT_NAME, + "data_set_path": hdf5_data_set_path, + "bulk-size": 5, + "source_format": workload.Documents.SOURCE_FORMAT_HDF5, + "context": "neighbors", + }, + ) + + vectors_per_partition = num_vectors // num_partitions + + self._test_partition(test_param_source, num_partitions, vectors_per_partition) + + def _test_partition( + self, + test_param_source: params.BulkIndexParamSource, + num_partitions: int, + vec_per_partition: int, + ): + def schedule(param_source): + while True: + try: + yield param_source.params() + except StopIteration: + return + + for i in range(num_partitions): + partition = test_param_source.partition(i, num_partitions) + + partition._init_internal_params() + + bulk_schedule = list(schedule(partition)) + self.assertEqual(2, len(bulk_schedule)) + + def test_params_default(self): + num_vectors = 49 + bulk_size = 10 + data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.INDEX, + self.data_set_dir, + ) + corpus_name = "random-hdf5-corpus" + + corpora = [ + workload.DocumentCorpus( + name=corpus_name, + documents=[ + 
workload.Documents( + source_format=workload.Documents.SOURCE_FORMAT_HDF5, + number_of_documents=num_vectors, + document_file=data_set_path, + ) + ], + ), + ] + + bulk_param_source = params.BulkIndexParamSource( + workload=workload.Workload(name="unit-test", corpora=corpora), + params={ + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_VECTOR_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "bulk-size": bulk_size, + "id-field-name": self.DEFAULT_ID_FIELD_NAME, + "source_format": workload.Documents.SOURCE_FORMAT_HDF5, + "context": "neighbors", + }, + ) + # bulk_param_source = BulkVectorsFromDataSetParamSource( + # workload.Workload(name="unit-test"), test_param_source_params) + bulk_param_source_partition = bulk_param_source.partition(0, 1) + # Check each payload returned + vectors_consumed = 0 + while vectors_consumed < num_vectors: + expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size) + actual_params = bulk_param_source_partition.params() + self._check_params_no_ids( + actual_params, + "None", + "null", + self.DEFAULT_DIMENSION, + expected_num_vectors, + "None", + ) + vectors_consumed += expected_num_vectors + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + bulk_param_source_partition.params() + + def test_params_generate_ids(self): + num_vectors = 49 + bulk_size = 10 + data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.INDEX, + self.data_set_dir, + ) + corpus_name = "random-hdf5-corpus" + + corpora = [ + workload.DocumentCorpus( + name=corpus_name, + documents=[ + workload.Documents( + source_format=workload.Documents.SOURCE_FORMAT_HDF5, + number_of_documents=num_vectors, + document_file=data_set_path, + generate_increasing_vector_ids=True, + id_field_name=self.DEFAULT_ID_FIELD_NAME, + vector_field_name=self.DEFAULT_VECTOR_FIELD_NAME, + target_index=self.DEFAULT_INDEX_NAME, + ) + ], + ), + ] + + bulk_param_source = params.BulkIndexParamSource( + workload=workload.Workload(name="unit-test", corpora=corpora), + params={ + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_VECTOR_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "bulk-size": bulk_size, + "id-field-name": self.DEFAULT_ID_FIELD_NAME, + "source_format": workload.Documents.SOURCE_FORMAT_HDF5, + "context": "neighbors", + }, + ) + # bulk_param_source = BulkVectorsFromDataSetParamSource( + # workload.Workload(name="unit-test"), test_param_source_params) + bulk_param_source_partition = bulk_param_source.partition(0, 1) + # Check each payload returned + vectors_consumed = 0 + while vectors_consumed < num_vectors: + expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size) + actual_params = bulk_param_source_partition.params() + self._check_params( + actual_params, + self.DEFAULT_INDEX_NAME, + self.DEFAULT_VECTOR_FIELD_NAME, + self.DEFAULT_DIMENSION, + expected_num_vectors, + self.DEFAULT_ID_FIELD_NAME, + ) + vectors_consumed += expected_num_vectors + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + bulk_param_source_partition.params() + + def _check_params_no_ids( + self, + actual_params: dict, + expected_index: str, + expected_vector_field: str, + expected_dimension: int, + expected_num_vectors_in_payload: int, + expected_id_field: str, + ): + # size = actual_params.get("size") + # self.assertEqual(size, expected_num_vectors_in_payload) + body_bytes = actual_params.get("body") + self.assertIsInstance(body_bytes, bytes) + body_string = 
body_bytes.decode() + self.assertIsInstance(body_string, str) + body = body_string.split("\n") + self.assertEqual(len(body) // 2, expected_num_vectors_in_payload) + + # Bulk payload has 2 parts: first one is the header and the second one + # is the body. The header will have the index name and the body will + # have the vector + for header, req_body in ( + (json.loads(h), json.loads(b)) for h, b in zip(*[iter(body)] * 2) + ): + + index = header.get("index") + self.assertIsInstance(index, dict) + + index_name = index.get("_index") + self.assertEqual(index_name, expected_index) + + vector = req_body.get(expected_vector_field) + self.assertIsInstance(vector, list) + self.assertEqual(len(vector), expected_dimension) + # Below only happens for HDF5SourceGenerateIds + # if expected_id_field in index: + # self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field) + # self.assertFalse(expected_id_field in req_body) + # continue + # self.assertTrue(expected_id_field in req_body) + + def _check_params( + self, + actual_params: dict, + expected_index: str, + expected_vector_field: str, + expected_dimension: int, + expected_num_vectors_in_payload: int, + expected_id_field: str, + ): + # size = actual_params.get("size") + # self.assertEqual(size, expected_num_vectors_in_payload) + body_bytes = actual_params.get("body") + self.assertIsInstance(body_bytes, bytes) + body_string = body_bytes.decode() + self.assertIsInstance(body_string, str) + body = body_string.split("\n") + self.assertEqual(len(body) // 2, expected_num_vectors_in_payload) + + # Bulk payload has 2 parts: first one is the header and the second one + # is the body. The header will have the index name and the body will + # have the vector + for header, req_body in ( + (json.loads(h), json.loads(b)) for h, b in zip(*[iter(body)] * 2) + ): + + index = header.get("index") + self.assertIsInstance(index, dict) + + index_name = index.get("_index") + self.assertEqual(index_name, expected_index) + + vector = req_body.get(expected_vector_field) + self.assertIsInstance(vector, list) + self.assertEqual(len(vector), expected_dimension) + # Below only happens for HDF5SourceGenerateIds + if expected_id_field in index: + self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field) + self.assertFalse(expected_id_field in req_body) + continue + self.assertTrue(expected_id_field in req_body) + + class BulkDataGeneratorTests(TestCase): @classmethod