fix: Handle duplicate key violation in voter contributions | NPG-000 #662

Merged · 1 commit · Jan 23, 2024

@@ -19,7 +19,7 @@ def import_snapshot(
network_ids: List[str] = typer.Option(
...,
envvar="SNAPSHOT_NETWORK_IDS",
help="Network id to pass as parameter to snapshot_tool",
help=("Network id to pass as parameter to snapshot_tool. Valid values are: 'mainnet' 'preprod' 'testnet'"),
),
snapshot_tool_path: str = typer.Option(default="snapshot_tool", envvar="SNAPSHOT_TOOL_PATH", help="Path to the snapshot tool"),
catalyst_toolbox_path: str = typer.Option(
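The updated help string documents the accepted network ids, but the option itself still accepts any string. A minimal sketch, not part of this PR, of how the same values could be enforced at the CLI boundary with a typer callback (`VALID_NETWORK_IDS` and `_check_network_ids` are illustrative names):

```python
from typing import List

import typer

VALID_NETWORK_IDS = {"mainnet", "preprod", "testnet"}


def _check_network_ids(value: List[str]) -> List[str]:
    # Reject any network id not listed in the new help text.
    for network_id in value:
        if network_id not in VALID_NETWORK_IDS:
            raise typer.BadParameter(f"unknown network id: {network_id!r}")
    return value


app = typer.Typer()


@app.command()
def import_snapshot(
    network_ids: List[str] = typer.Option(
        ...,
        envvar="SNAPSHOT_NETWORK_IDS",
        callback=_check_network_ids,
        help="Network id to pass as parameter to snapshot_tool. Valid values are: 'mainnet' 'preprod' 'testnet'",
    ),
):
    typer.echo(f"networks: {network_ids}")
```

With such a callback, an unsupported id in `SNAPSHOT_NETWORK_IDS` would fail fast with a parameter error instead of reaching snapshot_tool.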
@@ -7,11 +7,10 @@
import json
import os
import re
from typing import Dict, List, Tuple, Optional
from typing import Dict, List, Literal, Set, Tuple, Optional
from loguru import logger
from pydantic import BaseModel

from ideascale_importer.gvc import Client as GvcClient
import ideascale_importer.db
from ideascale_importer.db import models
from ideascale_importer.utils import run_cmd
@@ -59,7 +58,7 @@ class Registration(BaseModel):
class CatalystToolboxDreps(BaseModel):
"""Represents the input format of the dreps file of catalyst-toolbox."""

reps: List[str]
reps: List[str] = []


class OutputDirectoryDoesNotExist(Exception):
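Giving `reps` a default means a dreps file that omits the key no longer fails validation. A minimal, standalone sketch of that behaviour (using `json.loads` plus the model constructor rather than the importer's own loading code):

```python
import json
from typing import List

from pydantic import BaseModel


class CatalystToolboxDreps(BaseModel):
    """Represents the input format of the dreps file of catalyst-toolbox."""

    reps: List[str] = []


# A dreps document without a "reps" entry now parses to an empty list instead
# of raising a validation error; pydantic copies the default per instance, so
# the shared [] literal is safe.
print(CatalystToolboxDreps(**json.loads("{}")).reps)               # []
print(CatalystToolboxDreps(**json.loads('{"reps": ["a"]}')).reps)  # ['a']
```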
@@ -288,7 +287,7 @@ def __init__(
eventdb_url: str,
event_id: int,
output_dir: str,
network_ids: List[str],
network_ids: List[Literal["mainnet", "preprod", "testnet"]],
snapshot_tool_path: str,
catalyst_toolbox_path: str,
gvc_api_url: str,
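Typing `network_ids` as `List[Literal[...]]` narrows the accepted values for static analysis only; nothing is checked at runtime. A small illustration (the `NetworkId` alias and `describe` helper are hypothetical):

```python
from typing import List, Literal

NetworkId = Literal["mainnet", "preprod", "testnet"]


def describe(network_ids: List[NetworkId]) -> None:
    for network_id in network_ids:
        print(f"importing snapshot for {network_id}")


describe(["mainnet", "preprod"])  # accepted by a type checker such as mypy
# describe(["preview"])           # rejected statically, but Literal adds no
#                                 # runtime validation
```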
@@ -321,7 +320,7 @@ def __init__(
self.dreps_json = "[]"

if not os.path.exists(output_dir):
raise OutputDirectoryDoesNotExist(output_dir)
os.makedirs(output_dir)

self.output_dir = output_dir

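With this change a missing output directory is created rather than rejected. A minimal sketch of the new behaviour (the `ensure_output_dir` helper is illustrative, not code from the importer):

```python
import os


def ensure_output_dir(output_dir: str) -> str:
    # Create the directory instead of raising OutputDirectoryDoesNotExist.
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    return output_dir
```

`os.makedirs(output_dir, exist_ok=True)` would fold the check and the creation into one call; the diff keeps the explicit existence check.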
@@ -397,6 +396,12 @@ async def _run_snapshot_tool(self):

params = self.network_params[network_id]

match network_id:
case "mainnet":
snapshot_net = "mainnet"
case _:
snapshot_net = "testnet"

if self.ssh_config is None:
snapshot_tool_cmd = (
f"{self.snapshot_tool_path}"
@@ -405,7 +410,7 @@
f" --db-host {db_host}"
f" --db {db_name}"
f" --min-slot 0 --max-slot {params.registration_snapshot_slot}"
f" --network-id {network_id}"
f" --network-id {snapshot_net}"
f" --out-file {params.snapshot_tool_out_file}"
)

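The new `match` block forwards only "mainnet" unchanged; every other configured network id ("preprod", "testnet") is passed to snapshot_tool as "testnet". A standalone sketch of the mapping (the `snapshot_tool_network` helper name is illustrative):

```python
def snapshot_tool_network(network_id: str) -> str:
    # Only "mainnet" is forwarded as-is; other ids map to "testnet".
    match network_id:
        case "mainnet":
            return "mainnet"
        case _:
            return "testnet"


assert snapshot_tool_network("mainnet") == "mainnet"
assert snapshot_tool_network("preprod") == "testnet"
assert snapshot_tool_network("testnet") == "testnet"
```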
@@ -473,12 +478,16 @@ async def _run_catalyst_toolbox_snapshot(self):
)

for network_id, params in self.network_params.items():
discr = "test"
if network_id == "main" or network_id == "mainnet":
discr = "production"
with logger.contextualize(network_id=network_id):
catalyst_toolbox_cmd = (
f"{self.catalyst_toolbox_path} snapshot"
f" -s {params.snapshot_tool_out_file}"
f" -m {self.event_parameters.min_stake_threshold}"
f" -v {self.event_parameters.voting_power_cap}"
f" -d {discr}"
f" {params.catalyst_toolbox_out_file}"
)

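The discrimination value passed to `catalyst-toolbox snapshot` via `-d` is now derived from the network id: "production" for "main"/"mainnet", "test" for everything else. A standalone sketch (the `toolbox_discrimination` helper name is illustrative):

```python
def toolbox_discrimination(network_id: str) -> str:
    # Value supplied to catalyst-toolbox via the -d flag.
    if network_id in ("main", "mainnet"):
        return "production"
    return "test"


assert toolbox_discrimination("mainnet") == "production"
assert toolbox_discrimination("preprod") == "test"
```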
@@ -681,8 +690,9 @@ async def _write_db_data(self):

voters: Dict[str, models.Voter] = {}
contributions: List[models.Contribution] = []
uniq_contrib_keys: Set[Tuple[str, str, str]] = set([])

for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
async def process_voters(network_id, network_processed_snapshot):
network_report = network_snapshot_reports[network_id]

for ctd in network_processed_snapshot.voters:
@@ -694,19 +704,19 @@ async def _write_db_data(self):
# This can be removed once it's fixed in catalyst-toolbox
if not voting_key.startswith("0x"):
voting_key = "0x" + voting_key
stake_public_key = snapshot_contribution.stake_public_key
voting_group = ctd.hir.voting_group

delegation_data = registration_delegation_data[network_id][
f"{snapshot_contribution.stake_public_key}{voting_key}"
]
delegation_data = registration_delegation_data[network_id][f"{stake_public_key}{voting_key}"]

contribution = models.Contribution(
stake_public_key=snapshot_contribution.stake_public_key,
stake_public_key=stake_public_key,
snapshot_id=0,
voting_key=voting_key,
voting_weight=delegation_data["voting_weight"],
voting_key_idx=delegation_data["voting_key_idx"],
value=snapshot_contribution.value,
voting_group=ctd.hir.voting_group,
voting_group=voting_group,
reward_address=snapshot_contribution.reward_address,
)

@@ -717,8 +727,23 @@ async def _write_db_data(self):
voting_power=ctd.hir.voting_power,
)

contributions.append(contribution)
voters[f"{voter.voting_key}{voter.voting_group}"] = voter
# uniq_key that mirrors the unique key constraint in the DB
uniq_key = (stake_public_key, voting_key, voting_group)

# Add uniq_key if not already present, and append
# contribution and voter models.
if uniq_key not in uniq_contrib_keys:
uniq_contrib_keys.add(uniq_key)
contributions.append(contribution)
voters[f"{voter.voting_key}{voter.voting_group}"] = voter
else:
logger.error(
"Duplicate unique contribution key found, ignoring voter contribution",
network_id=network_id,
uniq_key=str(uniq_key),
contribution=str(contribution),
voter=str(voter),
)

await asyncio.sleep(0)

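The core of the fix: contributions are keyed by (stake_public_key, voting_key, voting_group), mirroring the database's unique key constraint, and only the first occurrence is kept while later duplicates are logged and skipped. A self-contained sketch of the same dedup rule on plain dicts (illustrative, not the importer's models):

```python
from typing import Dict, List, Set, Tuple


def dedup_contributions(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Keep the first contribution per unique key; later duplicates would
    # violate the DB constraint, so they are dropped (the PR also logs them).
    uniq_contrib_keys: Set[Tuple[str, str, str]] = set()
    kept: List[Dict[str, str]] = []
    for row in rows:
        uniq_key = (row["stake_public_key"], row["voting_key"], row["voting_group"])
        if uniq_key in uniq_contrib_keys:
            continue
        uniq_contrib_keys.add(uniq_key)
        kept.append(row)
    return kept


rows = [
    {"stake_public_key": "0xstake1", "voting_key": "0xkey1", "voting_group": "direct"},
    {"stake_public_key": "0xstake1", "voting_key": "0xkey1", "voting_group": "direct"},
    {"stake_public_key": "0xstake2", "voting_key": "0xkey1", "voting_group": "direct"},
]
assert len(dedup_contributions(rows)) == 2
```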
@@ -743,6 +768,16 @@ async def _write_db_data(self):
total_unique_rewards=len(network_report.unique_rewards),
)

# Process the snapshot from the highest_priority_network first to get the
# uniq_contrib_keys.
if highest_priority_network in catalyst_toolbox_data:
network_processed_snapshot = catalyst_toolbox_data.pop(highest_priority_network)
await process_voters(highest_priority_network, network_processed_snapshot)

# Process the rest of the network data.
for network_id, network_processed_snapshot in catalyst_toolbox_data.items():
await process_voters(network_id, network_processed_snapshot)

conn = await ideascale_importer.db.connect(self.eventdb_url)

async with conn.transaction():
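Because only the first holder of a unique key survives, processing order matters: the highest priority network is handled first so that, on a clash, its contribution wins and the duplicate from a lower priority network is the one dropped. A sketch of the ordering (the value of `highest_priority_network` is not shown in this diff; "mainnet" below is an assumption for illustration):

```python
async def process_in_priority_order(catalyst_toolbox_data, process_voters,
                                    highest_priority_network="mainnet"):
    # Let the highest priority network claim its unique contribution keys
    # first, then process the remaining networks in dict order.
    if highest_priority_network in catalyst_toolbox_data:
        snapshot = catalyst_toolbox_data.pop(highest_priority_network)
        await process_voters(highest_priority_network, snapshot)
    for network_id, snapshot in catalyst_toolbox_data.items():
        await process_voters(network_id, snapshot)
```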