Skip to content

Commit

Permalink
enable forward fill for ethereum delta update
Browse files Browse the repository at this point in the history
  • Loading branch information
soad003 committed Oct 6, 2023
1 parent ca52f41 commit bc28558
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 32 deletions.
22 changes: 18 additions & 4 deletions scripts/dev-ingest.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
#!/usr/bin/env bash
NW=${1:-bch}
EB=${2:-100000}
UV=${3:-2}
WB=${4:-3}
RB=${5:-15}
# --end-date 2011-11-20
graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size 15 --create-schema --mode='utxo_with_tx_graph' && \
graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size 3 --updater-version 2 --create-schema --pedantic --forward-fill-rates

echo "Import on ${NW} till ${EB} with delta updater version ${UV}"

if [ ${NW}=eth ]
then
graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size ${RB} --create-schema && \
graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size ${WB} --updater-version ${UV} --create-schema --pedantic --forward-fill-rates
else
graphsense-cli -v ingest from-node -e dev -c ${NW} --end-block ${EB} --batch-size ${RB} --create-schema --mode='utxo_with_tx_graph' && \
graphsense-cli -v exchange-rates coindesk ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v exchange-rates coinmarketcap ingest -e dev -c ${NW} --abort-on-gaps && \
graphsense-cli -v delta-update update -e dev -c ${NW} --end-block ${EB} --write-batch-size ${WB} --updater-version ${UV} --create-schema --pedantic --forward-fill-rates
fi
59 changes: 35 additions & 24 deletions src/graphsenselib/deltaupdate/update/abstractupdater.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def write_dirty_addresses(
)


def forward_fill_rates(rates, fill_from_block, fill_values) -> Tuple[List[Dict], bool]:
def forward_fill_rates_with_fill_value(
rates, fill_from_block, fill_values
) -> Tuple[List[Dict], bool]:
return (
[
(
Expand All @@ -61,6 +63,30 @@ def forward_fill_rates(rates, fill_from_block, fill_values) -> Tuple[List[Dict],
)


def fill_and_store_rates(db, batch, forward_fill_rates: bool):
rates = db.raw.get_exchange_rates_for_block_batch(batch)
if forward_fill_rates:
hbe, fill_values = get_forward_fill_rate(db, forward_fill_rates)
rates, had_to_fill = forward_fill_rates_with_fill_value(rates, hbe, fill_values)
if had_to_fill:
logger.warning(
"Missing exchange rates forward filled with "
f"last good data from block {hbe} {fill_values}"
)
db.transformed.ingest("exchange_rates", rates)


def get_forward_fill_rate(db, forward_fill_rates: bool):
if forward_fill_rates:
hbe = db.raw.find_highest_block_with_exchange_rates()
return (
hbe,
db.raw.get_exchange_rates_for_block_batch([hbe])[0]["fiat_values"],
)
else:
return (None, None)


class AbstractUpdateStrategy(ABC):
def __init__(self):
self._time_last_batch = 0
Expand Down Expand Up @@ -113,15 +139,7 @@ def __init__(self, db, currency, forward_fill_rates=False):
self.forward_fill_rates = forward_fill_rates

def get_forward_fill_rate(self):
if self.forward_fill_rates:
db = self._db
hbe = db.raw.find_highest_block_with_exchange_rates()
return (
hbe,
db.raw.get_exchange_rates_for_block_batch([hbe])[0]["fiat_values"],
)
else:
return (None, None)
return

@property
def currency(self):
Expand Down Expand Up @@ -162,16 +180,7 @@ def process_batch_impl_hook(self, batch):
pass

def import_exchange_rates(self, batch: List[int]):
rates = self._db.raw.get_exchange_rates_for_block_batch(batch)
if self.forward_fill_rates:
hbe, fill_values = self.get_forward_fill_rate()
rates, had_to_fill = forward_fill_rates(rates, hbe, fill_values)
if had_to_fill:
logger.warning(
"Missing exchange rates forward filled with "
f"last good data from block {hbe} {fill_values}"
)
self._db.transformed.ingest("exchange_rates", rates)
fill_and_store_rates(self._db, batch, self.forward_fill_rates)

def process_batch(self, batch: Iterable[int]):
self._batch_start_time = time.time()
Expand All @@ -188,15 +197,16 @@ def process_batch(self, batch: Iterable[int]):


class LegacyUpdateStrategy(AbstractUpdateStrategy):
def __init__(self, db, currency, write_new, write_dirty):
def __init__(self, db, currency, write_new, write_dirty, forward_fill_rates=False):
super().__init__()
self._db = db
self._write_new = write_new
self._write_dirty = write_dirty
self._new_addresses = {}
self._nr_queried_addresses = 0
self._nr_new_addresses = 0
self._highest_address_id = db.transformed.get_highest_address_id()
self._highest_address_id = db.transformed.get_highest_address_id() or 0
self.forward_fill_rates = forward_fill_rates

def prepare_database(self):
HISTORY_TABLE_COLUMNS = [
Expand Down Expand Up @@ -240,8 +250,9 @@ def process_batch(self, batch):
start_time = time.time()

logger.debug("Start - Importing Exchange Rates")
rates = self._db.raw.get_exchange_rates_for_block_batch(list(batch))
self._db.transformed.ingest("exchange_rates", rates)
fill_and_store_rates(self._db, list(batch), self.forward_fill_rates)
# rates = self._db.raw.get_exchange_rates_for_block_batch(list(batch))
# self._db.transformed.ingest("exchange_rates", rates)
logger.debug("End - Importing Exchange Rates")

logger.debug("Start - Chain Specific Import")
Expand Down
6 changes: 4 additions & 2 deletions src/graphsenselib/deltaupdate/update/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@


class UpdateStrategyAccount(LegacyUpdateStrategy):
def __init__(self, db, currency, write_new, write_dirty):
super().__init__(db, currency, write_new, write_dirty)
def __init__(self, db, currency, write_new, write_dirty, forward_fill_rates=False):
super().__init__(
db, currency, write_new, write_dirty, forward_fill_rates=forward_fill_rates
)

def prepare_database(self):
super().prepare_database()
Expand Down
8 changes: 7 additions & 1 deletion src/graphsenselib/deltaupdate/update/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ def get_updater(
forward_fill_rates=forward_fill_rates,
)
if schema_type == "account" and version == 1:
return UpdateStrategyAccount(db, currency, write_new, write_dirty)
return UpdateStrategyAccount(
db,
currency,
write_new,
write_dirty,
forward_fill_rates=forward_fill_rates,
)
else:
raise Exception(f"Unsupported schema type {schema_type} or {version}")
2 changes: 1 addition & 1 deletion src/graphsenselib/rates/coindesk.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def fetch_impl(
if most_recent_date is not None:
start_date = most_recent_date.strftime(DATE_FORMAT)

logger.info("*** Starting exchange rate ingest for BTC ***")
logger.info(f"*** Starting exchange rate ingest for {currency} ***")
logger.info(f"Start date: {start_date}")
logger.info(f"End date: {end_date}")

Expand Down

0 comments on commit bc28558

Please sign in to comment.