Skip to content

Commit

Permalink
Added logic to use the sqrt on squared euclidean distance from Aerosp…
Browse files Browse the repository at this point in the history
…ike. Updated dashboard...
  • Loading branch information
randersenYB committed Jul 23, 2024
1 parent 385681a commit 4a5e442
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 52 deletions.
21 changes: 18 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,29 @@
"program": "${workspaceFolder}/aerospike/hdf_import.py",
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--hdf", "${input:enterDataset}",
"--hdf", "${input:enterHDFFile}",
"--concurrency", "5000",
"--idxdrop",
"--logfile", "./hdfimport.log",
"--loglevel", "DEBUG"
],
"console": "integratedTerminal"
},
},
{
"name": "Python Debugger: hdf_import angular (prompt HDF)",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/aerospike/hdf_import.py",
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--hdf", "${input:enterHDFFile}",
"--concurrency", "5000",
"--distancetype", "COSINE",
"--idxdrop",
"--logfile", "./hdfimport-angular.log",
],
"console": "integratedTerminal"
},
{
"name": "Python Debugger: hdf_import LB",
"type": "debugpy",
Expand Down Expand Up @@ -148,7 +163,7 @@
"-r", "10"
],
"console": "integratedTerminal"
},
},
{
"name": "Python Debugger: hdf_query (proimpt HDF)",
"type": "debugpy",
Expand Down
23 changes: 13 additions & 10 deletions aerospike/AerospikeHDFDashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
}
]
},
Expand Down Expand Up @@ -2195,7 +2194,7 @@
"list": [
{
"current": {
"selected": true,
"selected": false,
"text": "prometheus",
"value": "edj97jcoudpfkd"
},
Expand Down Expand Up @@ -2238,6 +2237,10 @@
},
{
"current": {},
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat,ns)",
"hide": 0,
"includeAll": true,
Expand All @@ -2262,7 +2265,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)",
"definition": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)",
"hide": 0,
"includeAll": true,
"label": "Set",
Expand All @@ -2271,7 +2274,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)",
"query": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand Down Expand Up @@ -2310,7 +2313,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)",
"definition": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)",
"hide": 0,
"includeAll": true,
"label": "Index name",
Expand All @@ -2319,7 +2322,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)",
"query": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand All @@ -2334,7 +2337,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)",
"definition": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)",
"hide": 0,
"includeAll": true,
"label": "Query Run",
Expand All @@ -2343,7 +2346,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)",
"query": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand Down Expand Up @@ -2386,6 +2389,6 @@
"timezone": "",
"title": "Aerospike HDF",
"uid": "fzUPYeJIkhdf",
"version": 18,
"version": 19,
"weekStart": ""
}
34 changes: 26 additions & 8 deletions aerospike/aerospikehdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from enum import Flag, auto
from typing import Iterable, List, Union, Any
from importlib.metadata import version
from math import sqrt

from aerospike_vector_search import types as vectorTypes, Client as vectorSyncClient
from aerospike_vector_search.aio import AdminClient as vectorASyncAdminClient, Client as vectorASyncClient
from aerospike_vector_search.shared.proto_generated.types_pb2_grpc import grpc as vectorResultCodes

from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, OperationActions
from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, _distanceAerospikeTypeToAnn as DistanceMapsAnn, OperationActions
from datasets import DATASETS, load_and_transform_dataset, get_dataset_fn
from metrics import all_metrics as METRICS, DummyMetric
from distance import metrics as DISTANCES, Metric as DistanceMetric
Expand Down Expand Up @@ -99,8 +100,8 @@ def parse_arguments_population(parser: argparse.ArgumentParser) -> None:
'-D', "--distancetype",
metavar="DIST",
help="The Vector's Index Distance Type. The default is to select the type based on the dataset",
type=vectorTypes.VectorDistanceMetric,
choices=list(vectorTypes.VectorDistanceMetric),
type=str,
choices=[v.name for v in vectorTypes.VectorDistanceMetric],
default=None
)
parser.add_argument(
Expand Down Expand Up @@ -220,6 +221,11 @@ def parse_arguments_query(parser: argparse.ArgumentParser) -> None:
default="k-nn",
type=str,
choices=METRICS.keys(),
)
parser.add_argument(
"--dontadustdistance",
help="Don't adjust the distance returned by Aerospike based on the distance type (e.g., Square-Euclidean)",
action='store_true'
)

BaseAerospike.parse_arguments(parser)
Expand Down Expand Up @@ -268,7 +274,9 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions):
self._idx_name = runtimeArgs.idxname
self._idx_binName = runtimeArgs.vectorbinname

self._idx_distance = runtimeArgs.distancetype
if runtimeArgs.distancetype is not None and runtimeArgs.distancetype:
self._idx_distance = next(v for v in vectorTypes.VectorDistanceMetric if v.name == runtimeArgs.distancetype)

if runtimeArgs.indexparams is None or len(runtimeArgs.indexparams) == 0:
self._idx_hnswparams = None
else:
Expand Down Expand Up @@ -296,6 +304,7 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions):
self._query_distance_aerospike : List = None
self._query_latencies : List[float] = None
self._query_produce_resultfile : bool = not runtimeArgs.noresultfile
self._query_distance_no_adjustments = runtimeArgs.dontadustdistance

if runtimeArgs.searchparams is None or len(runtimeArgs.searchparams) == 0:
self._query_hnswparams = None
Expand Down Expand Up @@ -332,9 +341,14 @@ async def get_dataset(self) -> None:
elif distance != self._ann_distance:
self.print_log(f"ANN distance types do not match! Found: {distance} Provided: {self._ann_distance}. Distance calculations could be wrong!", logging.WARN)

if self._idx_distance is None or not self._idx_distance:
if self._idx_distance is None:
self._idx_distance = DistanceMaps.get(distance)

else:
idxdistance = DistanceMaps.get(distance)
if idxdistance.name != self._idx_distance.name:
self.print_log(f"ANN distance and Idx types do not match! Found: {distance} Idx: {self._idx_distance.name}. Using {self._idx_distance.name}!", logging.WARN)
self._ann_distance = DistanceMapsAnn.get(self._idx_distance.name)

if self._idx_distance is None or not self._idx_distance:
raise ValueError(f"Distance Map '{distance}' was not found.")

Expand Down Expand Up @@ -906,8 +920,12 @@ async def query_run(self, client:vectorASyncClient, runNbr:int, distancemetric :
rundistance.append([])
runlatencies.append(latency)
continue
result_ids = [neighbor.key.key for neighbor in result]
aerospike_distances = [neighbor.distance for neighbor in result]
result_ids = [neighbor.key.key for neighbor in result]

if self._idx_distance.name == "SQUARED_EUCLIDEAN" and not self._query_distance_no_adjustments:
aerospike_distances = [sqrt(neighbor.distance) for neighbor in result]
else:
aerospike_distances = [neighbor.distance for neighbor in result]

if self._query_check:
if len(result_ids) == 0:
Expand Down
50 changes: 19 additions & 31 deletions aerospike/baseaerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
'dot': vectorTypes.VectorDistanceMetric.DOT_PRODUCT,
}

_distanceAerospikeTypeToAnn: Dict[str, str] = {
'COSINE' : 'angular',
'SQUARED_EUCLIDEAN' : 'euclidean',
'HAMMING' : 'hamming',
'DOT_PRODUCT' : 'dot',
'jaccard' : 'jaccard'
}

loggerASClient = logging.getLogger("aerospike_vector_search")
logFileHandler = None

Expand Down Expand Up @@ -290,39 +298,19 @@ def prometheus_status(self, done:bool = False) -> None:
if self._heartbeat_current_stage == self._heartbeat_stage:
return
self._heartbeat_current_stage = self._heartbeat_stage
self._prometheus_heartbeat_gauge.set(self.__cnthb__,
{"paused": "Starting"
})
return

if self._heartbeat_stage == 1:
if self._heartbeat_current_stage == self._heartbeat_stage:
return
self._heartbeat_current_stage = self._heartbeat_stage
attrs : Dict = {"dims": self._dimensions,
"poprecs": None if self._trainarray is None else len(self._trainarray),
"queries": None if self._queryarray is None else len(self._queryarray),
"querynbrlmt": self._query_nbrlimit,
"queryruns": self._query_runs,
"dataset":self._datasetname,
"paused":"Cellecting",
"action": None if self._actions is None else self._actions.name,
"hnswparams": self.hnswstr()
}
if self._namespace is not None:
attrs.update({"ns": self._namespace,
"set": self._setName})
if self._idx_namespace is not None:
attrs.update({"idxns": self._idx_namespace,
"idx": self._idx_name})

self._prometheus_heartbeat_gauge.set(self.__cnthb__,
attrs)

return


pausestate : str = None
if done:
pausestate = "Done"
elif self._heartbeat_stage == 0:
pausestate = "Starting"
elif self._heartbeat_stage == 1:
pausestate = "Collecting"
elif self._actions is not None and OperationActions.POPULATION in self._actions:
if self._waitidx:
pausestate = "Waiting"
Expand All @@ -341,10 +329,10 @@ def prometheus_status(self, done:bool = False) -> None:
queryef = self._query_hnswparams.ef

self._prometheus_heartbeat_gauge.set(self.__cnthb__,
{"ns":self._namespace,
"set":self._setName,
"idxns":self._idx_namespace,
"idx":self._idx_name,
{"ns": '' if self._namespace is None else self._namespace,
"set": '' if self._setName is None else self._setName,
"idxns":'' if self._idx_namespace is None else self._idx_namespace,
"idx": '' if self._idx_name is None else self._idx_name,
"idxbin":self._idx_binName,
"idxdist": None if self._idx_distance is None else self._idx_distance.name,
"anndist": self._ann_distance,
Expand All @@ -353,7 +341,7 @@ def prometheus_status(self, done:bool = False) -> None:
"queries": None if self._queryarray is None else len(self._queryarray),
"querynbrlmt": self._query_nbrlimit,
"queryruns": self._query_runs,
"querycurrun": self._query_current_run,
"querycurrun": '' if self._query_current_run is None else self._query_current_run,
"dataset":self._datasetname,
"paused": pausestate,
"action": None if self._actions is None else self._actions.name,
Expand Down

0 comments on commit 4a5e442

Please sign in to comment.