read token config from cassandra table; handle cases where cassandra is ahead of fs-cache; adjust highest processed block indexing

Tommel71 committed Feb 29, 2024
1 parent e8b6147 commit 81e8aa3
Showing 8 changed files with 137 additions and 51 deletions.
46 changes: 36 additions & 10 deletions src/graphsenselib/db/analytics.py
@@ -12,6 +12,7 @@
 from functools import lru_cache, partial
 from typing import Iterable, List, Optional, Sequence, Tuple, Union

+import pandas as pd
 from cassandra import WriteTimeout
 from tenacity import Retrying, retry_if_exception_type, stop_after_attempt
@@ -74,11 +75,12 @@ def get_cql_statement(self, keyspace):


 class KeyspaceConfig:
-    def __init__(self, keyspace_name, db_type, address_type, tx_hash_type):
+    def __init__(self, keyspace_name, db_type, address_type, tx_hash_type, currency):
         self._keyspace_name = keyspace_name
         self._db_type = db_type
         self._address_type = address_type
         self._tx_hash_type = tx_hash_type
+        self._currency = currency

     @property
     def keyspace_name(self):
@@ -463,8 +465,16 @@ def get_item(index):
             batch = self.get_exchange_rates_for_block_batch([index])
             return 0 if has_er_value(batch) else 1

-        r = binary_search(GenericArrayFacade(get_item), 1, lo=start, hi=hb)
-        # r = get_last_notnone(GenericArrayFacade(get_item), start, hb)
+        def get_item_first_rates(index):
+            batch = self.get_exchange_rates_for_block_batch([index])
+            return 1 if has_er_value(batch) else 0
+
+        # find first block with exchange rates
+        first_rates = max(
+            binary_search(GenericArrayFacade(get_item_first_rates), 1, lo=start, hi=hb),
+            0,
+        )
+        r = binary_search(GenericArrayFacade(get_item), 1, lo=first_rates, hi=hb)

         if r == -1:
             # minus one could mean two things, either
@@ -675,23 +685,32 @@ def get_highest_cluster_id(self, sanity_check=True) -> Optional[int]:

     def get_highest_block(self) -> Optional[int]:
         stats = self.get_summary_statistics()
-        return int(stats.no_blocks) + 1 if stats is not None else None
+        if stats is None:
+            return None
+
+        # minus one when starting to count at 0
+        height_minus_noblocks = -1
+        height = height_minus_noblocks + int(stats.no_blocks)
+
+        return height

-    def is_first_dalta_update_run(self) -> bool:
+    def is_first_delta_update_run(self) -> bool:
         stats = self.get_summary_statistics()
         return stats is not None and int(stats.no_blocks) == int(
             stats.no_blocks_transform
         )

     def get_highest_block_fulltransform(self) -> Optional[int]:
         stats = self.get_summary_statistics()
-        if stats is not None and hasattr(stats, "no_blocks_transform"):
-            return int(stats.no_blocks_transform) + 1
-        if stats is not None:
-            return int(stats.no_blocks) + 1
-        else:
-            return None
+        height_minus_noblocks = -1
+        if stats is None:
+            return None
+
+        if hasattr(stats, "no_blocks_transform"):
+            return height_minus_noblocks + int(stats.no_blocks_transform)
+        else:
+            return height_minus_noblocks + int(stats.no_blocks)

     def get_highest_exchange_rate_block(self, sanity_check=True) -> Optional[int]:
         res = self.select("exchange_rates", columns=["block_id"], per_partition_limit=1)
         m = max([x.block_id for x in res])
@@ -791,6 +810,13 @@ def known_addresses_batch(
             for (a, row) in self._db.await_batch(results)
         }

+    def get_token_configuration(self):
+        stmt = self.select_stmt("token_configuration", limit=100)
+        res = self._db.execute(stmt)
+        df = pd.DataFrame(res)
+        df["token_address"] = df["token_address"].apply(lambda x: "0x" + x.hex())
+        return df
+
     def get_address_id_async_batch(self, addresses: List[str]):
         stmt = self.select_stmt(
             "address_ids_by_address_prefix",
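For context, the shape of the DataFrame that the new get_token_configuration helper returns can be sketched from the columns the rest of this commit reads; the rows below are illustrative stand-ins, not real table contents:

import pandas as pd

# Stand-in for the token_configuration table; token_address is stored as
# bytes in Cassandra and rewritten to a 0x-prefixed hex string by the helper.
token_config = pd.DataFrame(
    [
        {"token_address": "0x" + "aa" * 20, "currency_ticker": "USDT",
         "peg_currency": "USD", "decimals": 6},
        {"token_address": "0x" + "bb" * 20, "currency_ticker": "WETH",
         "peg_currency": "ETH", "decimals": 18},
    ]
)

# The decoder changes further below compare peg_currency against the network
# currency and against "USD" to derive the coin/usd equivalence flags.
print(token_config[token_config["peg_currency"] == "USD"])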
10 changes: 9 additions & 1 deletion src/graphsenselib/db/factory.py
@@ -51,10 +51,16 @@ def from_config(self, env, currency) -> AnalyticsDb:
             ks.transformed_keyspace_name,
             ks.schema_type,
             e.cassandra_nodes,
+            currency,
         )

     def from_name(
-        self, raw_keyspace_name, transformed_keyspace_name, schema_type, cassandra_nodes
+        self,
+        raw_keyspace_name,
+        transformed_keyspace_name,
+        schema_type,
+        cassandra_nodes,
+        currency,
     ) -> AnalyticsDb:
         db_types = get_db_types_by_schema_type(schema_type)
         return AnalyticsDb(
@@ -63,12 +69,14 @@ def from_name(
                 db_types.raw_db_type,
                 db_types.address_type,
                 db_types.transaction_type,
+                currency,
             ),
             transformed=KeyspaceConfigDB(
                 transformed_keyspace_name,
                 db_types.transformed_db_type,
                 db_types.address_type,
                 db_types.transaction_type,
+                currency,
             ),
             db=CassandraDb(cassandra_nodes),
         )
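A hedged usage sketch of the widened factory signature; the keyspace names and node list are placeholder values, and the factory class is assumed to be this module's DbFactory:

from graphsenselib.db.factory import DbFactory

# currency is now threaded through from_name into both KeyspaceConfig objects.
db = DbFactory().from_name(
    raw_keyspace_name="eth_raw",  # placeholder
    transformed_keyspace_name="eth_transformed",  # placeholder
    schema_type="account",
    cassandra_nodes=["localhost"],
    currency="eth",  # new required argument in this commit
)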
26 changes: 24 additions & 2 deletions src/graphsenselib/deltaupdate/deltaupdater.py
@@ -34,6 +34,23 @@ def find_import_range(
     hb_du = 0 if hb_du is None else hb_du
     hb_ft = 0 if hb_ft is None else hb_ft

+    if start_block_overwrite is not None:
+        last_block = max([i for i in (hb_du, hb_ft) if i is not None])
+        if start_block_overwrite <= last_block:
+            raise Exception(
+                f"Start block {start_block_overwrite} is before last "
+                f"delta update {hb_du}."
+                f" Or before last full transform {hb_ft}."
+                f" This would corrupt the state of balances. Exiting."
+            )
+        if start_block_overwrite > last_block + 1:
+            raise Exception(
+                f"Start block {start_block_overwrite} is in the future."
+                f" It looks like blocks are left out in the transformation."
+                f" Start block should be {last_block+1}."
+                f" Tried starting at {start_block_overwrite}. Exiting."
+            )
+
     start_block = hb_du + 1 if start_block_overwrite is None else start_block_overwrite
     latest_address_id = db.transformed.get_highest_address_id()
     latest_cluster_id = db.transformed.get_highest_cluster_id()
@@ -128,7 +145,12 @@ def update_transformed(
                 f"Done with {min(b) - start_block}, {end_block - min(b) + 1} to go."
             )

-            updater.process_batch(b)
+            final_block = updater.process_batch(b)
+            if final_block == -1:
+                logger.warning(
+                    f"First block in batch {min(b)} is empty. " f"Finishing update."
+                )
+                break
             updater.persist_updater_progress()

             blocks_processed = (updater.last_block_processed - start_block) + 1
@@ -180,7 +202,7 @@ def update(
         db, start_block, end_block, forward_fill_rates=forward_fill_rates
     )
     if end_block >= start_block:
-        is_first_delta_run = db.transformed.is_first_dalta_update_run()
+        is_first_delta_run = db.transformed.is_first_delta_update_run()
         if is_first_delta_run:
             # Full transform set nr_blocks a bit different (currency dep).
             # To be sure about the block we look at exchange rates table
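The start-block guard added to find_import_range above only accepts an overwrite that lands exactly one past the highest processed block; a standalone sketch of the same rule, where hb_du and hb_ft are the highest delta-updated and full-transformed blocks:

def check_start_block(start_block_overwrite, hb_du, hb_ft):
    # Sketch of the validation added to find_import_range, not the real code.
    if start_block_overwrite is None:
        return hb_du + 1  # default: resume right after the last delta update
    last_block = max(hb_du, hb_ft)
    if start_block_overwrite <= last_block:
        raise Exception("start block would re-apply blocks and corrupt balances")
    if start_block_overwrite > last_block + 1:
        raise Exception("start block would leave a gap of untransformed blocks")
    return start_block_overwrite  # exactly last_block + 1

assert check_start_block(None, hb_du=100, hb_ft=90) == 101
assert check_start_block(101, hb_du=100, hb_ft=90) == 101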
6 changes: 4 additions & 2 deletions src/graphsenselib/deltaupdate/update/abstractupdater.py
@@ -197,10 +197,12 @@ def process_batch(self, batch: Iterable[int]):
         self.import_exchange_rates(batch_int)

         with LoggerScope.debug(logger, "Transform data"):
-            self.process_batch_impl_hook(batch_int)
+            final_block = self.process_batch_impl_hook(batch_int)

         self._time_last_batch = time.time() - self._batch_start_time
-        self._last_block_processed = batch[-1]
+        if final_block != -1:
+            self._last_block_processed = final_block
+        return final_block


 class LegacyUpdateStrategy(AbstractUpdateStrategy):
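The return value creates a small contract between process_batch and update_transformed: the impl hook reports the last block it actually processed, or -1 when even the first block of the batch had no data, and progress only advances on a real block number. A minimal sketch of that contract, with stand-in names:

def process_batch_sketch(batch, impl_hook, state):
    # Hedged stand-in for AbstractUpdateStrategy.process_batch, not the real class.
    final_block = impl_hook(batch)
    if final_block != -1:
        # advance to the block actually processed instead of assuming batch[-1]
        state["last_block_processed"] = final_block
    return final_block  # the caller breaks off the run when this is -1

state = {}
assert process_batch_sketch([1, 2, 3], lambda b: b[-1], state) == 3
assert state["last_block_processed"] == 3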
39 changes: 16 additions & 23 deletions src/graphsenselib/deltaupdate/update/account/tokens.py
@@ -1,16 +1,10 @@
 from dataclasses import dataclass

+import pandas as pd
 from eth_abi import decode_single
 from eth_utils import function_abi_to_4byte_selector, to_hex
 from web3 import Web3

-from graphsenselib.deltaupdate.update.resources.supported_tokens_eth import (
-    SUPPORTED_TOKENS as eth_tokens,
-)
-from graphsenselib.deltaupdate.update.resources.supported_tokens_trx import (
-    SUPPORTED_TOKENS as trx_tokens,
-)
-

 @dataclass
 class TokenTransfer:
@@ -27,7 +21,7 @@ class TokenTransfer:


 class ERC20Decoder:
-    def __init__(self, network="eth"):
+    def __init__(self, currency: str, supported_tokens: pd.DataFrame):
         self.w3 = Web3()

         self.token_transfer_event_abi = {
@@ -44,25 +38,23 @@ def __init__(self, network="eth"):
         self.token_transfer_event_selector = self.get_event_selector(
             self.token_transfer_event_abi
         )
-        self.network = network

-        if self.network == "eth":
-            self.supported_tokens = eth_tokens
-        elif self.network == "trx":
-            self.supported_tokens = trx_tokens
-        else:
-            raise Exception("Unsupported network")
+        self.currency = currency.upper()
+        self.supported_tokens = supported_tokens

     def get_event_selector(self, event_abi):
         return to_hex(function_abi_to_4byte_selector(event_abi))

     def log_to_transfer(self, log):
-        if "0x" + log.address.hex() in self.supported_tokens["address"].values:
+        if "0x" + log.address.hex() in self.supported_tokens["token_address"].values:
             return self.decode_transfer(log)

     def decode_transfer(self, log):
         if "0x" + log.topics[0].hex()[:8] == self.token_transfer_event_selector:
-            if "0x" + log.address.hex() not in self.supported_tokens["address"].values:
+            if (
+                "0x" + log.address.hex()
+                not in self.supported_tokens["token_address"].values
+            ):
                 raise Exception(
                     "Unsupported token, use the log_to_transfer function instead"
                 )
@@ -79,12 +71,13 @@ def decode_transfer(self, log):
                 ]
             )
             value = decode_single("uint256", log.data)
-            mask = self.supported_tokens["address"] == "0x" + log.address.hex()
-            asset = self.supported_tokens[mask]["asset"].values[0]
-            coin_equivalent = self.supported_tokens[mask]["coin_equivalent"].values[
-                0
-            ]
-            usd_equivalent = self.supported_tokens[mask]["usd_equivalent"].values[0]
+            mask = (
+                self.supported_tokens["token_address"] == "0x" + log.address.hex()
+            )
+            asset = self.supported_tokens[mask]["currency_ticker"].values[0]
+            peg = self.supported_tokens[mask]["peg_currency"].values[0]
+            coin_equivalent = peg == self.currency
+            usd_equivalent = peg == "USD"
             decimals = self.supported_tokens[mask]["decimals"].values[0]

             return TokenTransfer(
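With the hard-coded ETH/TRX token lists removed, the decoder now accepts whatever token table the transformed keyspace provides. A hedged construction sketch using a hand-built stand-in for that table:

import pandas as pd
from graphsenselib.deltaupdate.update.account.tokens import ERC20Decoder

# Illustrative row; in the real flow this DataFrame comes from
# get_token_configuration() on the transformed keyspace.
tokens = pd.DataFrame(
    [{"token_address": "0x" + "aa" * 20, "currency_ticker": "USDT",
      "peg_currency": "USD", "decimals": 6}]
)

decoder = ERC20Decoder("eth", tokens)
# decoder.log_to_transfer(log) returns None for token addresses absent from
# the table and a TokenTransfer for addresses present in it.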
29 changes: 21 additions & 8 deletions src/graphsenselib/deltaupdate/update/account/update.py
@@ -158,7 +158,7 @@ def get_block_data(self, cache, block):
     def get_fee_data(self, cache, txs):
         return cache.get(("fee", txs), [{"fee": None}])[0]["fee"]

-    def process_batch_impl_hook(self, batch):
+    def process_batch_impl_hook(self, batch: List[int]) -> int:
         rates = {}
         transactions = []
         traces = []
@@ -195,11 +195,23 @@ def process_batch_impl_hook(self, batch):

             if len(blocks_new) == 0:
                 msg = (
-                    f"Block {block} is not present in cache. Please ingest with"
-                    f"option --sinks fs-cache"
+                    f"Block {block} is not present in cache. Please ingest"
+                    f" more blocks with option --sinks fs-cache"
                 )
-                log.error(msg)
-                raise Exception(msg)
+
+                if block == batch[0]:
+                    log.warning("First block of batch not in cache.")
+                    return -1
+                else:
+                    log.warning(
+                        "Skipping block without data in cache. "
+                        "Continuing with next block."
+                    )
+                    break
+
+            final_block = block

             transactions.extend(txs_new)
             traces.extend(traces_new)
             logs.extend(logs_new)
@@ -250,7 +262,6 @@ def process_batch_impl_hook(self, batch):
         )

         changes.extend(tx_changes)
-        last_block_processed = batch[-1]

         if self.currency == "trx":
             nr_new_tx = len([tx for tx in transactions if tx.receipt_status == 1])
@@ -262,14 +273,14 @@ def process_batch_impl_hook(self, batch):
             bookkeeping_changes = get_bookkeeping_changes(
                 self._statistics,
                 self._db.transformed.get_summary_statistics(),
-                last_block_processed,
+                final_block,
                 nr_new_address_relations,
                 nr_new_addresses,
                 nr_new_tx,
                 self.highest_address_id,
                 runtime_seconds,
                 bts,
-                len(batch),
+                len(blocks),
                 patch_mode=self._patch_mode,
             )

@@ -279,6 +290,7 @@ def process_batch_impl_hook(self, batch):
             # They are applied at the end of the batch in
             # persist_updater_progress
             self.changes = changes
+            return final_block

         else:
             raise ValueError(
@@ -343,7 +355,8 @@ def get_address_prefix(address_str):
         hash_to_tx = dict(zip(tx_hashes, transactions))

         with LoggerScope.debug(logger, "Decode logs to token transfers"):
-            tokendecoder = ERC20Decoder(self.currency)
+            supported_tokens = self._db.transformed.get_token_configuration()
+            tokendecoder = ERC20Decoder(currency, supported_tokens)
             token_transfers = no_nones(
                 [tokendecoder.log_to_transfer(log) for log in logs]
            )
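The cache-miss handling above distinguishes two cases: an empty first block aborts the batch with the -1 sentinel, while a gap later in the batch just truncates it. A condensed sketch of that control flow, where blocks_in_cache stands in for the fs-cache lookup:

def scan_batch(batch, blocks_in_cache):
    # Hedged sketch of the block loop in process_batch_impl_hook.
    final_block = -1
    for block in batch:
        if block not in blocks_in_cache:
            if block == batch[0]:
                return -1  # nothing usable: tell the caller to finish the run
            break  # cassandra ahead of fs-cache: process the partial batch
        final_block = block
    return final_block

assert scan_batch([5, 6, 7], {5, 6}) == 6  # trailing block missing -> partial
assert scan_batch([5, 6, 7], set()) == -1  # first block missing -> stop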
2 changes: 1 addition & 1 deletion src/graphsenselib/deltaupdate/update/utxo/update.py
@@ -631,7 +631,7 @@ def get_bookkeeping_changes(
     with LoggerScope.debug(logger, "Creating summary_statistics updates") as lg:
         lb_date = bts[last_block_processed]
         stats = base_statistics
-        no_blocks = last_block_processed - 1
+        no_blocks = last_block_processed + 1

         """ Update local stats """
         if not patch_mode:
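The sign flip matters because block heights are zero-indexed: after processing blocks 0 through b, b + 1 blocks exist, so the old subtraction under-counted by two. For example:

last_block_processed = 99
no_blocks = last_block_processed + 1  # blocks 0..99 are 100 blocks
assert no_blocks == 100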