Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/bigann'
Browse files Browse the repository at this point in the history
  • Loading branch information
randersenYB committed Jul 23, 2024
2 parents 7ab304e + 4a5e442 commit 032907b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 52 deletions.
21 changes: 18 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,29 @@
"program": "${workspaceFolder}/aerospike/hdf_import.py",
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--hdf", "${input:enterDataset}",
"--hdf", "${input:enterHDFFile}",
"--concurrency", "5000",
"--idxdrop",
"--logfile", "./hdfimport.log",
"--loglevel", "DEBUG"
],
"console": "integratedTerminal"
},
},
{
"name": "Python Debugger: hdf_import angular (prompt HDF)",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/aerospike/hdf_import.py",
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--hdf", "${input:enterHDFFile}",
"--concurrency", "5000",
"--distancetype", "COSINE",
"--idxdrop",
"--logfile", "./hdfimport-angular.log",
],
"console": "integratedTerminal"
},
{
"name": "Python Debugger: hdf_import LB",
"type": "debugpy",
Expand Down Expand Up @@ -148,7 +163,7 @@
"-r", "10"
],
"console": "integratedTerminal"
},
},
{
"name": "Python Debugger: hdf_query (prompt HDF)",
"type": "debugpy",
Expand Down
23 changes: 13 additions & 10 deletions aerospike/AerospikeHDFDashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,7 @@
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
"color": "green"
}
]
},
Expand Down Expand Up @@ -2195,7 +2194,7 @@
"list": [
{
"current": {
"selected": true,
"selected": false,
"text": "prometheus",
"value": "edj97jcoudpfkd"
},
Expand Down Expand Up @@ -2238,6 +2237,10 @@
},
{
"current": {},
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat,ns)",
"hide": 0,
"includeAll": true,
Expand All @@ -2262,7 +2265,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)",
"definition": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)",
"hide": 0,
"includeAll": true,
"label": "Set",
Expand All @@ -2271,7 +2274,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_heartbeat{ns=\"$namespace\"},set)",
"query": "label_values(aerospike_hdf_heartbeat{ns=~\"$namespace\"},set)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand Down Expand Up @@ -2310,7 +2313,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)",
"definition": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)",
"hide": 0,
"includeAll": true,
"label": "Index name",
Expand All @@ -2319,7 +2322,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_heartbeat{idxns=\"$idxns\"},idx)",
"query": "label_values(aerospike_hdf_heartbeat{idxns=~\"$idxns\"},idx)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand All @@ -2334,7 +2337,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"definition": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)",
"definition": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)",
"hide": 0,
"includeAll": true,
"label": "Query Run",
Expand All @@ -2343,7 +2346,7 @@
"options": [],
"query": {
"qryType": 1,
"query": "label_values(aerospike_hdf_query_total{idx=\"$idxname\", ns=\"$idxns\"},run)",
"query": "label_values(aerospike_hdf_query_total{idx=~\"$idxname\", ns=~\"$idxns\"},run)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
},
"refresh": 2,
Expand Down Expand Up @@ -2386,6 +2389,6 @@
"timezone": "",
"title": "Aerospike HDF",
"uid": "fzUPYeJIkhdf",
"version": 18,
"version": 19,
"weekStart": ""
}
34 changes: 26 additions & 8 deletions aerospike/aerospikehdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from enum import Flag, auto
from typing import Iterable, List, Union, Any
from importlib.metadata import version
from math import sqrt

from aerospike_vector_search import types as vectorTypes, Client as vectorSyncClient
from aerospike_vector_search.aio import AdminClient as vectorASyncAdminClient, Client as vectorASyncClient
from aerospike_vector_search.shared.proto_generated.types_pb2_grpc import grpc as vectorResultCodes

from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, OperationActions
from baseaerospike import BaseAerospike, _distanceNameToAerospikeType as DistanceMaps, _distanceAerospikeTypeToAnn as DistanceMapsAnn, OperationActions
from datasets import DATASETS, load_and_transform_dataset, get_dataset_fn
from metrics import all_metrics as METRICS, DummyMetric
from distance import metrics as DISTANCES, Metric as DistanceMetric
Expand Down Expand Up @@ -99,8 +100,8 @@ def parse_arguments_population(parser: argparse.ArgumentParser) -> None:
'-D', "--distancetype",
metavar="DIST",
help="The Vector's Index Distance Type. The default is to select the type based on the dataset",
type=vectorTypes.VectorDistanceMetric,
choices=list(vectorTypes.VectorDistanceMetric),
type=str,
choices=[v.name for v in vectorTypes.VectorDistanceMetric],
default=None
)
parser.add_argument(
Expand Down Expand Up @@ -220,6 +221,11 @@ def parse_arguments_query(parser: argparse.ArgumentParser) -> None:
default="k-nn",
type=str,
choices=METRICS.keys(),
)
parser.add_argument(
"--dontadustdistance",
help="Don't adjust the distance returned by Aerospike based on the distance type (e.g., Square-Euclidean)",
action='store_true'
)

BaseAerospike.parse_arguments(parser)
Expand Down Expand Up @@ -268,7 +274,9 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions):
self._idx_name = runtimeArgs.idxname
self._idx_binName = runtimeArgs.vectorbinname

self._idx_distance = runtimeArgs.distancetype
if runtimeArgs.distancetype is not None and runtimeArgs.distancetype:
self._idx_distance = next(v for v in vectorTypes.VectorDistanceMetric if v.name == runtimeArgs.distancetype)

if runtimeArgs.indexparams is None or len(runtimeArgs.indexparams) == 0:
self._idx_hnswparams = None
else:
Expand Down Expand Up @@ -296,6 +304,7 @@ def __init__(self, runtimeArgs: argparse.Namespace, actions: OperationActions):
self._query_distance_aerospike : List = None
self._query_latencies : List[float] = None
self._query_produce_resultfile : bool = not runtimeArgs.noresultfile
self._query_distance_no_adjustments = runtimeArgs.dontadustdistance

if runtimeArgs.searchparams is None or len(runtimeArgs.searchparams) == 0:
self._query_hnswparams = None
Expand Down Expand Up @@ -332,9 +341,14 @@ async def get_dataset(self) -> None:
elif distance != self._ann_distance:
self.print_log(f"ANN distance types do not match! Found: {distance} Provided: {self._ann_distance}. Distance calculations could be wrong!", logging.WARN)

if self._idx_distance is None or not self._idx_distance:
if self._idx_distance is None:
self._idx_distance = DistanceMaps.get(distance)

else:
idxdistance = DistanceMaps.get(distance)
if idxdistance.name != self._idx_distance.name:
self.print_log(f"ANN distance and Idx types do not match! Found: {distance} Idx: {self._idx_distance.name}. Using {self._idx_distance.name}!", logging.WARN)
self._ann_distance = DistanceMapsAnn.get(self._idx_distance.name)

if self._idx_distance is None or not self._idx_distance:
raise ValueError(f"Distance Map '{distance}' was not found.")

Expand Down Expand Up @@ -906,8 +920,12 @@ async def query_run(self, client:vectorASyncClient, runNbr:int, distancemetric :
rundistance.append([])
runlatencies.append(latency)
continue
result_ids = [neighbor.key.key for neighbor in result]
aerospike_distances = [neighbor.distance for neighbor in result]
result_ids = [neighbor.key.key for neighbor in result]

if self._idx_distance.name == "SQUARED_EUCLIDEAN" and not self._query_distance_no_adjustments:
aerospike_distances = [sqrt(neighbor.distance) for neighbor in result]
else:
aerospike_distances = [neighbor.distance for neighbor in result]

if self._query_check:
if len(result_ids) == 0:
Expand Down
50 changes: 19 additions & 31 deletions aerospike/baseaerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
'dot': vectorTypes.VectorDistanceMetric.DOT_PRODUCT,
}

# Reverse lookup table: Aerospike distance-metric member name -> ANN distance
# name. It is looked up with the `.name` of the selected index distance metric
# to derive the matching ANN distance.
# NOTE(review): the lowercase 'jaccard' key does not follow the upper-case
# member-name convention of the other keys -- confirm it is intentional.
_distanceAerospikeTypeToAnn: Dict[str, str] = dict(
    COSINE='angular',
    SQUARED_EUCLIDEAN='euclidean',
    HAMMING='hamming',
    DOT_PRODUCT='dot',
    jaccard='jaccard',
)

loggerASClient = logging.getLogger("aerospike_vector_search")
logFileHandler = None

Expand Down Expand Up @@ -290,39 +298,19 @@ def prometheus_status(self, done:bool = False) -> None:
if self._heartbeat_current_stage == self._heartbeat_stage:
return
self._heartbeat_current_stage = self._heartbeat_stage
self._prometheus_heartbeat_gauge.set(self.__cnthb__,
{"paused": "Starting"
})
return

if self._heartbeat_stage == 1:
if self._heartbeat_current_stage == self._heartbeat_stage:
return
self._heartbeat_current_stage = self._heartbeat_stage
attrs : Dict = {"dims": self._dimensions,
"poprecs": None if self._trainarray is None else len(self._trainarray),
"queries": None if self._queryarray is None else len(self._queryarray),
"querynbrlmt": self._query_nbrlimit,
"queryruns": self._query_runs,
"dataset":self._datasetname,
"paused":"Cellecting",
"action": None if self._actions is None else self._actions.name,
"hnswparams": self.hnswstr()
}
if self._namespace is not None:
attrs.update({"ns": self._namespace,
"set": self._setName})
if self._idx_namespace is not None:
attrs.update({"idxns": self._idx_namespace,
"idx": self._idx_name})

self._prometheus_heartbeat_gauge.set(self.__cnthb__,
attrs)

return


pausestate : str = None
if done:
pausestate = "Done"
elif self._heartbeat_stage == 0:
pausestate = "Starting"
elif self._heartbeat_stage == 1:
pausestate = "Collecting"
elif self._actions is not None and OperationActions.POPULATION in self._actions:
if self._waitidx:
pausestate = "Waiting"
Expand All @@ -341,10 +329,10 @@ def prometheus_status(self, done:bool = False) -> None:
queryef = self._query_hnswparams.ef

self._prometheus_heartbeat_gauge.set(self.__cnthb__,
{"ns":self._namespace,
"set":self._setName,
"idxns":self._idx_namespace,
"idx":self._idx_name,
{"ns": '' if self._namespace is None else self._namespace,
"set": '' if self._setName is None else self._setName,
"idxns":'' if self._idx_namespace is None else self._idx_namespace,
"idx": '' if self._idx_name is None else self._idx_name,
"idxbin":self._idx_binName,
"idxdist": None if self._idx_distance is None else self._idx_distance.name,
"anndist": self._ann_distance,
Expand All @@ -353,7 +341,7 @@ def prometheus_status(self, done:bool = False) -> None:
"queries": None if self._queryarray is None else len(self._queryarray),
"querynbrlmt": self._query_nbrlimit,
"queryruns": self._query_runs,
"querycurrun": self._query_current_run,
"querycurrun": '' if self._query_current_run is None else self._query_current_run,
"dataset":self._datasetname,
"paused": pausestate,
"action": None if self._actions is None else self._actions.name,
Expand Down

0 comments on commit 032907b

Please sign in to comment.