Skip to content

Commit

Permalink
Fixed exceptions due to API change, changed __str__ to only output indx…
Browse files Browse the repository at this point in the history
…/dataset
  • Loading branch information
randersenYB committed Jul 18, 2024
1 parent ba18d2e commit 5bc8daf
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
"program": "${workspaceFolder}/aerospike/hdf_import.py",
"cwd": "${workspaceFolder}/aerospike",
"args": [
"--dataset", "random-xs-20-angular",
"--dataset", "${input:enterDataset}",
"--concurrency", "5000",
"--idxdrop",
"--logfile", "./hdfimport.log",
Expand Down
56 changes: 20 additions & 36 deletions aerospike/aerospikehdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,20 +345,26 @@ async def _resourceexhaused_handler(self, key: int, embedding, i: int, client: v
async def put_vector(self, key, embedding, i: int, client: vectorASyncClient, retry: bool = False) -> None:
try:
try:
if type(key).__module__ == np.__name__:
key = key.item()
await client.upsert(namespace=self._namespace,
set_name=self._setName,
key=key,
record_data={
self._idx_binName:embedding.tolist()
}
)
try:
if type(key).__module__ == np.__name__:
key = key.item()
await client.upsert(namespace=self._namespace,
set_name=self._setName,
key=key,
record_data={
self._idx_binName:embedding.tolist()
}
)
except vectorTypes.AVSServerError as avse:
if self._idx_resource_event != 0 and not retry and avse.rpc_error.code() == vectorResultCodes.StatusCode.RESOURCE_EXHAUSTED:
await self._resourceexhaused_handler(key, embedding, i, client)
else:
raise
except vectorTypes.AVSServerError as avse:
if self._idx_resource_event != 0 and not retry and avse.rpc_error.code() == vectorResultCodes.StatusCode.RESOURCE_EXHAUSTED:
await self._resourceexhaused_handler(key, embedding, i, client)
else:
raise
if str(avse.rpc_error.details()).find("Server memory error") != -1:
raise RuntimeError(f"A Stop Write was detected which indicates the Aerospike DB is out of memory. AVS Error: {avse.rpc_error.debug_error_string()}")
else:
raise
except Exception as e:
print(f'\n** Count: {i} Key: {key} Exception: "{e}" **\r\n')
logger.exception(f"Put Failure on Count: {i}, Key: {key}, Idx: {self._idx_namespace}.{self._idx_name}, Retry: {retry}")
Expand Down Expand Up @@ -686,26 +692,4 @@ async def vector_search(self, client:vectorASyncClient, query:List[float], runNb
return result

def __str__(self):
arrayLen = None
nbrArrayLen = None
if self._trainarray is not None:
arrayLen = len(self._trainarray)
if self._neighbors is not None:
if len(self._neighbors) > 0:
nbrArrayLen = f"{len(self._neighbors)}x{len(self._neighbors[0])}"
else:
nbrArrayLen = "0x0"
if OperationActions.POPULATION in self._actions:
popstr = f", DropIdx: {self._idx_drop}, Concurrency: {self._concurrency}, MaxRecs: {self._idx_maxrecs}, WaitIdxCompletion: {not self._idx_nowait} Exhausted Evt: {self._idx_resource_event}"
else:
popstr = ""
if OperationActions.QUERY in self._actions:
qrystr = f", Runs: {self._query_runs}, Parallel: {self._query_parallel}, Check: {self._query_check}"
else:
qrystr = ""
if self._query_metric is None:
metricstr = ""
else:
typestr = self._query_metric["type"]
metricstr = f", recall:{typestr}"
return f"Aerospike([{self.basestring()}, Actions: {self._actions}, Dimensions: {self._dimensions}, Array: {arrayLen}, NbrResult: {nbrArrayLen}, DS: {self._datasetname}{popstr}{qrystr}{metricstr}]"
return super().__str__()
7 changes: 6 additions & 1 deletion aerospike/baseaerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,9 @@ def basestring(self) -> str:
return f"BaseAerospike([[{hosts}], {self._useloadbalancer}, {fullName}, {self._idx_distance}, {{{hnswparams}}}{searchhnswparams}])"

def __str__(self):
return self.basestring()
if self._idx_namespace == self._namespace:
fullName = f"{self._namespace}.{self._setName}.{self._idx_name}"
else:
fullName = f"{self._namespace}.{self._setName}.{self._idx_namespace}.{self._idx_name}"

return f"{fullName}({self._datasetname})"
6 changes: 6 additions & 0 deletions aerospike/bigann/bigann_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self, runtimeArgs: argparse.Namespace, ds : DatasetCompetitionForma
self._bigann_query : np.ndarray
self._bigann_neighbors : np.ndarray
self._bigann_distances : np.ndarray
self._bigann_searchtype : str
self._bigann_nbrneighbors : int

if os.path.exists(self._hdf_filepath):
print(f"Warn: ANN HDF File '{self._hdf_filepath}' exist and will be overwritten")
Expand Down Expand Up @@ -63,6 +65,8 @@ async def bigann_getinfo(self) -> None:
await asyncio.gather(*gettasks)

self._hdf_dimension = self._bigann_dataset.shape[1]
self._bigann_searchtype = str(self._bigann_ds.search_type())
self._bigann_nbrneighbors = int(self._bigann_ds.default_count())

async def create_hdf(self) -> None:
import h5py
Expand All @@ -73,7 +77,9 @@ async def create_hdf(self) -> None:
f.attrs["sourcedataset"] = self._bigann_ds.short_name()
f.attrs["distance"] = self._hdf_distance
f.attrs["dimension"] = self._hdf_dimension
f.attrs["searchtype"] = self._bigann_searchtype
f.attrs["point_type"] = self._bigann_dataset[0].dtype.name.rstrip(digits)
f.attrs["nbrneighbors"] = self._bigann_nbrneighbors
print(f"train size: {self._bigann_dataset.shape[0]} * {self._bigann_dataset.shape[1]}")
print(f"test size: {self._bigann_query.shape[0]} * {self._bigann_query.shape[1]}")
f.create_dataset("train", data=self._bigann_dataset)
Expand Down

0 comments on commit 5bc8daf

Please sign in to comment.