From 4a5e442f9cd5ee88391a06646dbf548584086380 Mon Sep 17 00:00:00 2001 From: randersenyb Date: Tue, 23 Jul 2024 14:11:42 -0700 Subject: [PATCH] Added logic to use the sqrt on squared euclidean distance from Aerospike. Updated dashboard... --- .vscode/launch.json | 21 ++++++++++-- aerospike/AerospikeHDFDashboard.json | 23 +++++++------ aerospike/aerospikehdf.py | 34 ++++++++++++++----- aerospike/baseaerospike.py | 50 +++++++++++----------------- 4 files changed, 76 insertions(+), 52 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 3ae4b551..09ea221b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -84,14 +84,29 @@ "program": "${workspaceFolder}/aerospike/hdf_import.py", "cwd": "${workspaceFolder}/aerospike", "args": [ - "--hdf", "${input:enterDataset}", + "--hdf", "${input:enterHDFFile}", "--concurrency", "5000", "--idxdrop", "--logfile", "./hdfimport.log", "--loglevel", "DEBUG" ], "console": "integratedTerminal" - }, + }, + { + "name": "Python Debugger: hdf_import angular (prompt HDF)", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/aerospike/hdf_import.py", + "cwd": "${workspaceFolder}/aerospike", + "args": [ + "--hdf", "${input:enterHDFFile}", + "--concurrency", "5000", + "--distancetype", "COSINE", + "--idxdrop", + "--logfile", "./hdfimport-angular.log", + ], + "console": "integratedTerminal" + }, { "name": "Python Debugger: hdf_import LB", "type": "debugpy", @@ -148,7 +163,7 @@ "-r", "10" ], "console": "integratedTerminal" - }, + }, { "name": "Python Debugger: hdf_query (proimpt HDF)", "type": "debugpy", diff --git a/aerospike/AerospikeHDFDashboard.json b/aerospike/AerospikeHDFDashboard.json index 78c33b14..9c6f899d 100644 --- a/aerospike/AerospikeHDFDashboard.json +++ b/aerospike/AerospikeHDFDashboard.json @@ -752,8 +752,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" } ] }, @@ -2195,7 +2194,7 @@ "list": [ { "current": { - "selected": true, + "selected": false, "text": "prometheus", "value": "edj97jcoudpfkd" }, @@ -2238,6 +2237,10 @@ }, { "current": {}, + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, "definition": "label_values(aerospike_hdf_heartbeat,ns)", "hide": 0, "includeAll": true, @@ -2262,7 +2265,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "definition": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)", + "definition": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)", "hide": 0, "includeAll": true, "label": "Set", @@ -2271,7 +2274,7 @@ "options": [], "query": { "qryType": 1, - "query": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)", + "query": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)", "refId": "PrometheusVariableQueryEditor-VariableQuery" }, "refresh": 2, @@ -2310,7 +2313,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "definition": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)", + "definition": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)", "hide": 0, "includeAll": true, "label": "Index name", @@ -2319,7 +2322,7 @@ "options": [], "query": { "qryType": 1, - "query": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)", + "query": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)", "refId": "PrometheusVariableQueryEditor-VariableQuery" }, "refresh": 2, @@ -2334,7 +2337,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "definition": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)", + "definition": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)", "hide": 0, "includeAll": true, "label": "Query Run", @@ -2343,7 +2346,7 @@ "options": [], "query": { "qryType": 1, - "query": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)", + "query": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)", "refId": "PrometheusVariableQueryEditor-VariableQuery" }, "refresh": 2, @@ -2386,6 +2389,6 @@ "timezone": "", "title": "Aerospike HDF", "uid": "fzUPYeJIkhdf", - "version": 18, + "version": 19, "weekStart": "" } \ No newline at end of file diff --git a/aerospike/aerospikehdf.py b/aerospike/aerospikehdf.py index f034f015..180a6b86 100644 --- a/aerospike/aerospikehdf.py +++ b/aerospike/aerospikehdf.py @@ -9,12 +9,13 @@ from enum import Flag, auto from typing import Iterable, List, Union, Any from importlib.metadata import version +from math import sqrt from aerospike_vector_search import types as vectorTypes, Client as vectorSyncClient from aerospike_vector_search.aio import AdminClient as vectorASyncAdminClient, Client as vectorASyncClient from aerospike_vector_search.shared.proto_generated.types_pb2_grpc import grpc as vectorResultCodes -from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, OperationActions +from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, _distanceAerospikeTypeToAnn as DistanceMapsAnn, OperationActions from datasets import DATASETS, load_and_transform_dataset, get_dataset_fn from metrics import all_metrics as METRICS, DummyMetric from distance import metrics as DISTANCES, Metric as DistanceMetric @@ -99,8 +100,8 @@ def parse_arguments_population(parser: argparse.ArgumentParser) -> None: '-D', "--distancetype", metavar="DIST", help="The Vector's Index Distance Type. The default is to select the type based on the dataset", - type=vectorTypes.VectorDistanceMetric, - choices=list(vectorTypes.VectorDistanceMetric), + type=str, + choices=[v.name for v in vectorTypes.VectorDistanceMetric], default=None ) parser.add_argument( @@ -220,6 +221,11 @@ def parse_arguments_query(parser: argparse.ArgumentParser) -> None: default="k-nn", type=str, choices=METRICS.keys(), + ) + parser.add_argument( + "--dontadustdistance", + help="Don't adjust the distance returned by Aerospike based on the distance type (e.g., Square-Euclidean)", + action='store_true' ) BaseAerospike.parse_arguments(parser) @@ -268,7 +274,9 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions): self._idx_name = runtimeArgs.idxname self._idx_binName = runtimeArgs.vectorbinname - self._idx_distance = runtimeArgs.distancetype + if runtimeArgs.distancetype is not None and runtimeArgs.distancetype: + self._idx_distance = next(v for v in vectorTypes.VectorDistanceMetric if v.name == runtimeArgs.distancetype) + if runtimeArgs.indexparams is None or len(runtimeArgs.indexparams) == 0: self._idx_hnswparams = None else: @@ -296,6 +304,7 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions): self._query_distance_aerospike : List = None self._query_latencies : List[float] = None self._query_produce_resultfile : bool = not runtimeArgs.noresultfile + self._query_distance_no_adjustments = runtimeArgs.dontadustdistance if runtimeArgs.searchparams is None or len(runtimeArgs.searchparams) == 0: self._query_hnswparams = None @@ -332,9 +341,14 @@ async def get_dataset(self) -> None: elif distance != self._ann_distance: self.print_log(f"ANN distance types do not match! Found: {distance} Provided: {self._ann_distance}. Distance calculations could be wrong!", logging.WARN) - if self._idx_distance is None or not self._idx_distance: + if self._idx_distance is None: self._idx_distance = DistanceMaps.get(distance) - + else: + idxdistance = DistanceMaps.get(distance) + if idxdistance.name != self._idx_distance.name: + self.print_log(f"ANN distance and Idx types do not match! Found: {distance} Idx: {self._idx_distance.name}. Using {self._idx_distance.name}!", logging.WARN) + self._ann_distance = DistanceMapsAnn.get(self._idx_distance.name) + if self._idx_distance is None or not self._idx_distance: raise ValueError(f"Distance Map '{distance}' was not found.") @@ -906,8 +920,12 @@ async def query_run(self, client:vectorASyncClient, runNbr:int, distancemetric : rundistance.append([]) runlatencies.append(latency) continue - result_ids = [neighbor.key.key for neighbor in result] - aerospike_distances = [neighbor.distance for neighbor in result] + result_ids = [neighbor.key.key for neighbor in result] + + if self._idx_distance.name == "SQUARED_EUCLIDEAN" and not self._query_distance_no_adjustments: + aerospike_distances = [sqrt(neighbor.distance) for neighbor in result] + else: + aerospike_distances = [neighbor.distance for neighbor in result] if self._query_check: if len(result_ids) == 0: diff --git a/aerospike/baseaerospike.py b/aerospike/baseaerospike.py index 25a51a55..bb2ff6b5 100644 --- a/aerospike/baseaerospike.py +++ b/aerospike/baseaerospike.py @@ -31,6 +31,14 @@ 'dot': vectorTypes.VectorDistanceMetric.DOT_PRODUCT, } +_distanceAerospikeTypeToAnn: Dict[str, str] = { + 'COSINE' : 'angular', + 'SQUARED_EUCLIDEAN' : 'euclidean', + 'HAMMING' : 'hamming', + 'DOT_PRODUCT' : 'dot', + 'jaccard' : 'jaccard' +} + loggerASClient = logging.getLogger("aerospike_vector_search") logFileHandler = None @@ -290,39 +298,19 @@ def prometheus_status(self, done:bool = False) -> None: if self._heartbeat_current_stage == self._heartbeat_stage: return self._heartbeat_current_stage = self._heartbeat_stage - self._prometheus_heartbeat_gauge.set(self.__cnthb__, - {"paused": "Starting" - }) - return + if self._heartbeat_stage == 1: if self._heartbeat_current_stage == self._heartbeat_stage: return self._heartbeat_current_stage = self._heartbeat_stage - attrs : Dict = {"dims": self._dimensions, - "poprecs": None if self._trainarray is None else len(self._trainarray), - "queries": None if self._queryarray is None else len(self._queryarray), - "querynbrlmt": self._query_nbrlimit, - "queryruns": self._query_runs, - "dataset":self._datasetname, - "paused":"Cellecting", - "action": None if self._actions is None else self._actions.name, - "hnswparams": self.hnswstr() - } - if self._namespace is not None: - attrs.update({"ns": self._namespace, - "set": self._setName}) - if self._idx_namespace is not None: - attrs.update({"idxns": self._idx_namespace, - "idx": self._idx_name}) - - self._prometheus_heartbeat_gauge.set(self.__cnthb__, - attrs) - - return - + pausestate : str = None if done: pausestate = "Done" + elif self._heartbeat_stage == 0: + pausestate = "Starting" + elif self._heartbeat_stage == 1: + pausestate = "Collecting" elif self._actions is not None and OperationActions.POPULATION in self._actions: if self._waitidx: pausestate = "Waiting" @@ -341,10 +329,10 @@ def prometheus_status(self, done:bool = False) -> None: queryef = self._query_hnswparams.ef self._prometheus_heartbeat_gauge.set(self.__cnthb__, - {"ns":self._namespace, - "set":self._setName, - "idxns":self._idx_namespace, - "idx":self._idx_name, + {"ns": '' if self._namespace is None else self._namespace, + "set": '' if self._setName is None else self._setName, + "idxns":'' if self._idx_namespace is None else self._idx_namespace, + "idx": '' if self._idx_name is None else self._idx_name, "idxbin":self._idx_binName, "idxdist": None if self._idx_distance is None else self._idx_distance.name, "anndist": self._ann_distance, @@ -353,7 +341,7 @@ def prometheus_status(self, done:bool = False) -> None: "queries": None if self._queryarray is None else len(self._queryarray), "querynbrlmt": self._query_nbrlimit, "queryruns": self._query_runs, - "querycurrun": self._query_current_run, + "querycurrun": '' if self._query_current_run is None else self._query_current_run, "dataset":self._datasetname, "paused": pausestate, "action": None if self._actions is None else self._actions.name,