Reviewed patch: forward-fill exchange rates in the legacy (account) updater.

Reviewer amendments to the added lines (the index hashes below are those of
the originally submitted patch and were not recomputed):
  * scripts/dev-ingest.sh: `[ ${NW}=eth ]` is a one-argument test that is true
    for any non-empty string, so the else branch was unreachable; it is now a
    real string comparison.
  * abstractupdater.py: the updater method get_forward_fill_rate keeps its old
    (block, fiat_values) return contract by delegating to the new module-level
    helper instead of returning None.
  * abstractupdater.py: dropped the commented-out copy of the replaced code in
    LegacyUpdateStrategy.process_batch (hunk header is now +250,7).

diff --git a/scripts/dev-ingest.sh b/scripts/dev-ingest.sh
index ac133aa..5bb8fde 100644
--- a/scripts/dev-ingest.sh
+++ b/scripts/dev-ingest.sh
@@ -1,8 +1,22 @@
 #!/usr/bin/env bash
 NW=${1:-bch}
 EB=${2:-100000}
+UV=${3:-2}
+WB=${4:-3}
+RB=${5:-15}
 # --end-date 2011-11-20
-graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size 15 --create-schema --mode='utxo_with_tx_graph' && \
-graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
-graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
-graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size 3 --updater-version 2 --create-schema --pedantic --forward-fill-rates
+
+echo "Import on ${NW} till ${EB} with delta updater version ${UV}"
+
+if [ "${NW}" = "eth" ]
+then
+    graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size ${RB} --create-schema && \
+    graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
+    graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
+    graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size ${WB} --updater-version ${UV} --create-schema --pedantic --forward-fill-rates
+else
+    graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size ${RB} --create-schema --mode='utxo_with_tx_graph' && \
+    graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
+    graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
+    graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size ${WB} --updater-version ${UV} --create-schema --pedantic --forward-fill-rates
+fi
diff --git a/src/graphsenselib/deltaupdate/update/abstractupdater.py b/src/graphsenselib/deltaupdate/update/abstractupdater.py
index 876e8f0..46e2199 100644
--- a/src/graphsenselib/deltaupdate/update/abstractupdater.py
+++ b/src/graphsenselib/deltaupdate/update/abstractupdater.py
@@ -45,7 +45,9 @@ def write_dirty_addresses(
     )
 
 
-def forward_fill_rates(rates, fill_from_block, fill_values) -> Tuple[List[Dict], bool]:
+def forward_fill_rates_with_fill_value(
+    rates, fill_from_block, fill_values
+) -> Tuple[List[Dict], bool]:
     return (
         [
             (
@@ -61,6 +63,30 @@ def forward_fill_rates(rates, fill_from_block, fill_values) -> Tuple[List[Dict],
     )
 
 
+def fill_and_store_rates(db, batch, forward_fill_rates: bool):
+    rates = db.raw.get_exchange_rates_for_block_batch(batch)
+    if forward_fill_rates:
+        hbe, fill_values = get_forward_fill_rate(db, forward_fill_rates)
+        rates, had_to_fill = forward_fill_rates_with_fill_value(rates, hbe, fill_values)
+        if had_to_fill:
+            logger.warning(
+                "Missing exchange rates forward filled with "
+                f"last good data from block {hbe} {fill_values}"
+            )
+    db.transformed.ingest("exchange_rates", rates)
+
+
+def get_forward_fill_rate(db, forward_fill_rates: bool):
+    if forward_fill_rates:
+        hbe = db.raw.find_highest_block_with_exchange_rates()
+        return (
+            hbe,
+            db.raw.get_exchange_rates_for_block_batch([hbe])[0]["fiat_values"],
+        )
+    else:
+        return (None, None)
+
+
 class AbstractUpdateStrategy(ABC):
     def __init__(self):
         self._time_last_batch = 0
@@ -113,15 +139,7 @@ def __init__(self, db, currency, forward_fill_rates=False):
         self.forward_fill_rates = forward_fill_rates
 
     def get_forward_fill_rate(self):
-        if self.forward_fill_rates:
-            db = self._db
-            hbe = db.raw.find_highest_block_with_exchange_rates()
-            return (
-                hbe,
-                db.raw.get_exchange_rates_for_block_batch([hbe])[0]["fiat_values"],
-            )
-        else:
-            return (None, None)
+        return get_forward_fill_rate(self._db, self.forward_fill_rates)
 
     @property
     def currency(self):
@@ -162,16 +180,7 @@ def process_batch_impl_hook(self, batch):
         pass
 
     def import_exchange_rates(self, batch: List[int]):
-        rates = self._db.raw.get_exchange_rates_for_block_batch(batch)
-        if self.forward_fill_rates:
-            hbe, fill_values = self.get_forward_fill_rate()
-            rates, had_to_fill = forward_fill_rates(rates, hbe, fill_values)
-            if had_to_fill:
-                logger.warning(
-                    "Missing exchange rates forward filled with "
-                    f"last good data from block {hbe} {fill_values}"
-                )
-        self._db.transformed.ingest("exchange_rates", rates)
+        fill_and_store_rates(self._db, batch, self.forward_fill_rates)
 
     def process_batch(self, batch: Iterable[int]):
         self._batch_start_time = time.time()
@@ -188,7 +197,7 @@ def process_batch(self, batch: Iterable[int]):
 
 
 class LegacyUpdateStrategy(AbstractUpdateStrategy):
-    def __init__(self, db, currency, write_new, write_dirty):
+    def __init__(self, db, currency, write_new, write_dirty, forward_fill_rates=False):
         super().__init__()
         self._db = db
         self._write_new = write_new
@@ -196,7 +205,8 @@ def __init__(self, db, currency, write_new, write_dirty):
         self._new_addresses = {}
         self._nr_queried_addresses = 0
         self._nr_new_addresses = 0
-        self._highest_address_id = db.transformed.get_highest_address_id()
+        self._highest_address_id = db.transformed.get_highest_address_id() or 0
+        self.forward_fill_rates = forward_fill_rates
 
     def prepare_database(self):
         HISTORY_TABLE_COLUMNS = [
@@ -240,8 +250,7 @@ def process_batch(self, batch):
         start_time = time.time()
 
         logger.debug("Start - Importing Exchange Rates")
-        rates = self._db.raw.get_exchange_rates_for_block_batch(list(batch))
-        self._db.transformed.ingest("exchange_rates", rates)
+        fill_and_store_rates(self._db, list(batch), self.forward_fill_rates)
         logger.debug("End - Importing Exchange Rates")
 
         logger.debug("Start - Chain Specific Import")
diff --git a/src/graphsenselib/deltaupdate/update/account.py b/src/graphsenselib/deltaupdate/update/account.py
index 3c96382..45db0ef 100644
--- a/src/graphsenselib/deltaupdate/update/account.py
+++ b/src/graphsenselib/deltaupdate/update/account.py
@@ -12,8 +12,10 @@
 
 
 class UpdateStrategyAccount(LegacyUpdateStrategy):
-    def __init__(self, db, currency, write_new, write_dirty):
-        super().__init__(db, currency, write_new, write_dirty)
+    def __init__(self, db, currency, write_new, write_dirty, forward_fill_rates=False):
+        super().__init__(
+            db, currency, write_new, write_dirty, forward_fill_rates=forward_fill_rates
+        )
 
     def prepare_database(self):
         super().prepare_database()
diff --git a/src/graphsenselib/deltaupdate/update/factory.py b/src/graphsenselib/deltaupdate/update/factory.py
index 85fc7fb..e1fd859 100644
--- a/src/graphsenselib/deltaupdate/update/factory.py
+++ b/src/graphsenselib/deltaupdate/update/factory.py
@@ -36,6 +36,12 @@ def get_updater(
             forward_fill_rates=forward_fill_rates,
         )
     if schema_type == "account" and version == 1:
-        return UpdateStrategyAccount(db, currency, write_new, write_dirty)
+        return UpdateStrategyAccount(
+            db,
+            currency,
+            write_new,
+            write_dirty,
+            forward_fill_rates=forward_fill_rates,
+        )
     else:
         raise Exception(f"Unsupported schema type {schema_type} or {version}")
diff --git a/src/graphsenselib/rates/coindesk.py b/src/graphsenselib/rates/coindesk.py
index 9d9544a..b36fbde 100644
--- a/src/graphsenselib/rates/coindesk.py
+++ b/src/graphsenselib/rates/coindesk.py
@@ -112,7 +112,7 @@ def fetch_impl(
     if most_recent_date is not None:
         start_date = most_recent_date.strftime(DATE_FORMAT)
 
-    logger.info("*** Starting exchange rate ingest for BTC ***")
+    logger.info(f"*** Starting exchange rate ingest for {currency} ***")
     logger.info(f"Start date: {start_date}")
     logger.info(f"End date: {end_date}")
 