Skip to content

Commit

Permalink
fix: Handle duplicate key violation in voter contributions | NPG-000 (#662)
Browse files Browse the repository at this point in the history

# Description

Changes how voter contributions are uploaded to the DB by keeping track of the
unique key constraint tuple and processing the network with the highest
priority first.

Duplicate records are ignored and an error is logged.

break: The help docs for the `network_ids` CLI option were updated to specify
the valid network names.

## Type of change

Please delete options that are not relevant.

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] This change requires a documentation update

## Checklist

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [x] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged and published in downstream
modules
  • Loading branch information
saibatizoku authored Jan 23, 2024
1 parent f9cce5c commit adb2c1c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def import_snapshot(
network_ids: List[str] = typer.Option(
...,
envvar="SNAPSHOT_NETWORK_IDS",
help="Network id to pass as parameter to snapshot_tool",
help=("Network id to pass as parameter to snapshot_tool. Valid values are: 'mainnet' 'preprod' 'testnet'"),
),
snapshot_tool_path: str = typer.Option(default="snapshot_tool", envvar="SNAPSHOT_TOOL_PATH", help="Path to the snapshot tool"),
catalyst_toolbox_path: str = typer.Option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import json
import os
import re
from typing import Dict, List, Tuple, Optional
from typing import Dict, List, Literal, Set, Tuple, Optional
from loguru import logger
from pydantic import BaseModel

from ideascale_importer.gvc import Client as GvcClient
import ideascale_importer.db
from ideascale_importer.db import models
from ideascale_importer.utils import run_cmd
Expand Down Expand Up @@ -59,7 +58,7 @@ class Registration(BaseModel):
class CatalystToolboxDreps(BaseModel):
"""Represents the input format of the dreps file of catalyst-toolbox."""

reps: List[str]
reps: List[str] = []


class OutputDirectoryDoesNotExist(Exception):
Expand Down Expand Up @@ -288,7 +287,7 @@ def __init__(
eventdb_url: str,
event_id: int,
output_dir: str,
network_ids: List[str],
network_ids: List[Literal["mainnet", "preprod", "testnet"]],
snapshot_tool_path: str,
catalyst_toolbox_path: str,
gvc_api_url: str,
Expand Down Expand Up @@ -321,7 +320,7 @@ def __init__(
self.dreps_json = "[]"

if not os.path.exists(output_dir):
raise OutputDirectoryDoesNotExist(output_dir)
os.makedirs(output_dir)

self.output_dir = output_dir

Expand Down Expand Up @@ -397,6 +396,12 @@ async def _run_snapshot_tool(self):

params = self.network_params[network_id]

match network_id:
case "mainnet":
snapshot_net = "mainnet"
case _:
snapshot_net = "testnet"

if self.ssh_config is None:
snapshot_tool_cmd = (
f"{self.snapshot_tool_path}"
Expand All @@ -405,7 +410,7 @@ async def _run_snapshot_tool(self):
f" --db-host {db_host}"
f" --db {db_name}"
f" --min-slot 0 --max-slot {params.registration_snapshot_slot}"
f" --network-id {network_id}"
f" --network-id {snapshot_net}"
f" --out-file {params.snapshot_tool_out_file}"
)

Expand Down Expand Up @@ -473,12 +478,16 @@ async def _run_catalyst_toolbox_snapshot(self):
)

for network_id, params in self.network_params.items():
discr = "test"
if network_id == "main" or network_id == "mainnet":
discr = "production"
with logger.contextualize(network_id=network_id):
catalyst_toolbox_cmd = (
f"{self.catalyst_toolbox_path} snapshot"
f" -s {params.snapshot_tool_out_file}"
f" -m {self.event_parameters.min_stake_threshold}"
f" -v {self.event_parameters.voting_power_cap}"
f" -d {discr}"
f" {params.catalyst_toolbox_out_file}"
)

Expand Down Expand Up @@ -681,8 +690,9 @@ async def _write_db_data(self):

voters: Dict[str, models.Voter] = {}
contributions: List[models.Contribution] = []
uniq_contrib_keys: Set[Tuple[str, str, str]] = set([])

for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
async def process_voters(network_id, network_processed_snapshot):
network_report = network_snapshot_reports[network_id]

for ctd in network_processed_snapshot.voters:
Expand All @@ -694,19 +704,19 @@ async def _write_db_data(self):
# This can be removed once it's fixed in catalyst-toolbox
if not voting_key.startswith("0x"):
voting_key = "0x" + voting_key
stake_public_key = snapshot_contribution.stake_public_key
voting_group = ctd.hir.voting_group

delegation_data = registration_delegation_data[network_id][
f"{snapshot_contribution.stake_public_key}{voting_key}"
]
delegation_data = registration_delegation_data[network_id][f"{stake_public_key}{voting_key}"]

contribution = models.Contribution(
stake_public_key=snapshot_contribution.stake_public_key,
stake_public_key=stake_public_key,
snapshot_id=0,
voting_key=voting_key,
voting_weight=delegation_data["voting_weight"],
voting_key_idx=delegation_data["voting_key_idx"],
value=snapshot_contribution.value,
voting_group=ctd.hir.voting_group,
voting_group=voting_group,
reward_address=snapshot_contribution.reward_address,
)

Expand All @@ -717,8 +727,23 @@ async def _write_db_data(self):
voting_power=ctd.hir.voting_power,
)

contributions.append(contribution)
voters[f"{voter.voting_key}{voter.voting_group}"] = voter
# uniq_key that mirrors the unique key constraint in the DB
uniq_key = (stake_public_key, voting_key, voting_group)

# Add uniq_key if not already present, and append
# contribution and voter models.
if uniq_key not in uniq_contrib_keys:
uniq_contrib_keys.add(uniq_key)
contributions.append(contribution)
voters[f"{voter.voting_key}{voter.voting_group}"] = voter
else:
logger.error(
"Duplicate unique contribution key found, ignoring voter contribution",
network_id=network_id,
uniq_key=str(uniq_key),
contribution=str(contribution),
voter=str(voter),
)

await asyncio.sleep(0)

Expand All @@ -743,6 +768,16 @@ async def _write_db_data(self):
total_unique_rewards=len(network_report.unique_rewards),
)

# Process the snapshot from the highest_priority_network first to get the
# uniq_contrib_keys.
if highest_priority_network in catalyst_toolbox_data:
network_processed_snapshot = catalyst_toolbox_data.pop(highest_priority_network)
await process_voters(highest_priority_network, network_processed_snapshot)

# Process the rest of the network data.
for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
await process_voters(network_id, network_processed_snapshot)

conn = await ideascale_importer.db.connect(self.eventdb_url)

async with conn.transaction():
Expand Down

0 comments on commit adb2c1c

Please sign in to comment.