@needs_session
def execute_statement(self, stmt: BoundStatement, fetch_size=None) -> Iterable:
    """Execute an already bound statement synchronously on the session.

    Args:
        stmt: The bound statement to run.
        fetch_size: Optional page size for the result set. When given, it
            is set on ``stmt`` before execution; ``None`` leaves the
            statement's own setting untouched.

    Returns:
        Whatever ``self.session.execute`` returns for ``stmt``.
    """
    if fetch_size is not None:
        # The parameter used to be accepted but silently dropped; honor it
        # so callers can actually control paging of large result sets.
        stmt.fetch_size = fetch_size
    return self.session.execute(stmt)
def get_transactions_in_block(self, block: int) -> Iterable:
    """Return all transaction rows of ``block`` using one range query.

    The tx-id window is bounded below (exclusive) by the result of
    ``get_latest_tx_id_before_block(block)`` and above (inclusive) by the
    result of ``get_latest_tx_id_before_block(block + 1)``. Every id group
    between the groups of those two bounds is included in the query.

    Args:
        block: Block number whose transactions are fetched.

    Returns:
        A list with the rows yielded by the database for that window.
    """
    bucket_size = self.get_tx_bucket_size()

    lower_tx_id = self.get_latest_tx_id_before_block(block)
    upper_tx_id = self.get_latest_tx_id_before_block(block + 1)

    first_group = self.get_id_group(lower_tx_id, bucket_size)
    last_group = self.get_id_group(upper_tx_id, bucket_size)
    # Inclusive on both ends: the window may span several id groups.
    id_groups = [*range(first_group, last_group + 1)]

    keyspace = self.get_keyspace()
    query = (
        f"select * from {keyspace}.transaction where "
        "tx_id_group in :txidgroups and tx_id > :txid_lower "
        "and tx_id <= :txid_upper"
    )
    prepared = self._db.get_prepared_statement(query)

    bind_values = dict(
        txidgroups=id_groups,
        txid_lower=lower_tx_id,
        txid_upper=upper_tx_id,
    )
    bound = prepared.bind(bind_values)

    rows = self._db.execute_statement(bound)
    return [row for row in rows]
a/src/graphsenselib/deltaupdate/update/utxo/update.py +++ b/src/graphsenselib/deltaupdate/update/utxo/update.py @@ -1273,9 +1273,6 @@ def process_batch_impl_hook(self, batch) -> Tuple[Action, Optional[int]]: block ).fiat_values if fiat_values is None: - # raise Exception( - # "No exchange rate for block {block}. Abort processing." - # ) missing_rates_in_block = True fiat_values = [0, 0] rates[block] = fiat_values