pipeline modification for price
naik-ai committed Oct 28, 2024
1 parent 40e4abf commit 5ed12ba
Showing 4 changed files with 300 additions and 42 deletions.
233 changes: 233 additions & 0 deletions src/integrations/defilamma_2.py
@@ -0,0 +1,233 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import httpx
import numpy as np
import pandas as pd
import pandas_gbq as gbq
import pytz
from pydantic import BaseModel

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


@dataclass
class TokenPriceConfig:
"""Configuration class for token price fetching"""

base_url: str = "https://coins.llama.fi/chart"
chain: str = "ethereum"
search_width: int = 100
period: str = "1m"
span: int = 30


class TokenPriceClient:
"""Client for fetching token prices from DefiLlama"""

def __init__(self, config: TokenPriceConfig = TokenPriceConfig()):
self.config = config
self.client = httpx.AsyncClient(timeout=30.0)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.aclose()

def _build_url(self, token_address: str) -> str:
"""Build URL for API request"""
token_identifier = f"{self.config.chain}:{token_address}"
return f"{self.config.base_url}/{token_identifier}"

def _build_params(self, start_timestamp: int) -> Dict[str, Any]:
"""Build query parameters"""
return {
"start": start_timestamp,
"span": self.config.span,
"period": self.config.period,
"searchWidth": self.config.search_width,
}
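    # Illustrative only: with the defaults above and WETH, the two helpers
    # describe a request roughly equivalent to
    #   GET https://coins.llama.fi/chart/ethereum:0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2
    #       ?start=<unix ts>&span=30&period=1m&searchWidth=100
    # (assumed semantics of the public DefiLlama /chart endpoint: span is the
    # number of data points, period the spacing between them, searchWidth how
    # far around each point to look for a price).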

    async def get_token_prices(
        self, token_address: str, start_timestamp: int
    ) -> Dict[str, Any]:
        """
        Fetch token prices from DefiLlama.
        Args:
            token_address: Token contract address
            start_timestamp: Start timestamp in unix format
        Returns:
            Parsed JSON response containing the price data
        """
        url = self._build_url(token_address)
        params = self._build_params(start_timestamp)

        try:
            response = await self.client.get(url, params=params)
            response.raise_for_status()
            return response.json()

        except httpx.HTTPError as e:
            raise Exception(f"HTTP error occurred: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Error fetching token prices: {str(e)}") from e


class PricePoint(BaseModel):
"""Model for individual price points"""

timestamp: int
price: float


class TokenData(BaseModel):
"""Model for token information"""

symbol: str
confidence: float
decimals: int
prices: List[PricePoint]


class TokenResponse(BaseModel):
"""Root response model"""

coins: Dict[str, TokenData]


def process_token_data(json_data: dict) -> pd.DataFrame:
"""
Process token price data into a DataFrame
Args:
json_data: Raw JSON response from the API
Returns:
pd.DataFrame: Processed and validated token price data
"""
# Validate data using Pydantic
validated_data = TokenResponse(coins=json_data["coins"])

# Process into rows for DataFrame
rows = []
for contract_address, token_data in validated_data.coins.items():
# Split contract address into chain and address
chain = contract_address.split(":")[0]
address = contract_address.split(":")[1]

# Create a row for each price point
for price_data in token_data.prices:
            # minute, blockchain, contract_address, decimals, symbol, price
rows.append(
{
"minute": datetime.fromtimestamp(price_data.timestamp, tz=pytz.UTC),
"blockchain": chain,
"contract_address": address.lower(),
"decimals": token_data.decimals,
"symbol": token_data.symbol.upper(),
"price": price_data.price,
}
)

return pd.DataFrame(rows)
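
# For orientation only: each row produced above has the shape
#   {"minute": <UTC datetime>, "blockchain": "ethereum",
#    "contract_address": "0x…", "decimals": 18, "symbol": "WETH", "price": 2650.12}
# (illustrative values, matching the columns listed in the loop above).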


def get_max_token_price_timestamp_from_bq() -> int:
    """Return the latest stored price minute in BigQuery as a unix timestamp."""
    sql = "SELECT max(CAST(minute AS TIMESTAMP)) AS max_timestamp FROM `mainnet-bigq.dune.all_everclear_tokens_prices`"
    df = gbq.read_gbq(sql)
    max_timestamp = df["max_timestamp"].iloc[0]
    return int(max_timestamp.timestamp())


async def pull_defilamma_coingecko_price(start_timestamp: int) -> pd.DataFrame:
    """
    Fetch DefiLlama prices for the tracked tokens and return them as a single DataFrame
    """

token_addresses = [
"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", # WETH
"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", # USDC
"0xdac17f958d2ee523a2206206994597c13d831ec7", # USDT
"0xB8C77482E45F1F44DE1745F52C74426C631BDD52", # BNB
]

logging.info(f"Starting from {start_timestamp}")
config = TokenPriceConfig()
all_price_data = []

async with TokenPriceClient(config) as client:
for token_address in token_addresses:
try:
                logging.info(f"Fetching price data for token: {token_address}")
                response = await client.get_token_prices(token_address, start_timestamp)

# Process data for this token
token_price_data = process_token_data(response)
all_price_data.append(token_price_data)

# Add delay to avoid rate limiting
await asyncio.sleep(10)

except Exception as e:
logging.error(
f"Error fetching data for token {token_address}: {str(e)}"
)
continue

    if all_price_data:
        # Combine the per-token dataframes
        return pd.concat(all_price_data, ignore_index=True)

    # Return an empty frame so the caller's `.empty` check does not fail on None
    logging.error("No price data was collected")
    return pd.DataFrame()


def get_all_everclear_tokens_prices_pipeline():
    """Pull DefiLlama prices from the last timestamp stored in BigQuery (UTC) up to now and append them to BigQuery."""

max_date_bq = get_max_token_price_timestamp_from_bq()
from_date, to_date = max_date_bq, int(datetime.now(pytz.UTC).timestamp())
logging.info(
f"Pulling data for Everclear tokens prices from {from_date} to {to_date}"
)
df = asyncio.run(pull_defilamma_coingecko_price(start_timestamp=max_date_bq))
if not df.empty:
logging.info(df.head())

        max_timestamps_by_symbol = df.groupby("symbol")["minute"].max().reset_index()
        min_of_max_timestamps = max_timestamps_by_symbol["minute"].min()
        logging.info(
            f"Min of max timestamps: {min_of_max_timestamps}; filtering out all data after this"
        )

df_final = df[
(df["minute"] <= min_of_max_timestamps)
& (df["minute"] > pd.to_datetime(max_date_bq, unit="s", utc=True))
].reset_index(drop=True)
if not df_final.empty:
logging.info(f"Adding data to BigQuery: length {len(df_final)}")
gbq.to_gbq(
dataframe=df_final,
project_id="mainnet-bigq",
destination_table="dune.all_everclear_tokens_prices",
if_exists="append",
api_method="load_csv",
)
else:
            logging.info(
                "Data not up to date for all symbols, "
                f"from {from_date} to {to_date} with min timestamp {min_of_max_timestamps}"
            )
else:
logging.info(f"No data fetched for the period {from_date} to {to_date}")
26 changes: 26 additions & 0 deletions src/integrations/models/defilamma_2.py
@@ -0,0 +1,26 @@
from typing import List

from pydantic import BaseModel


class TokenPrice(BaseModel):
"""Individual price point for a token"""

timestamp: int
price: float


class TokenData(BaseModel):
"""Token information and price data"""

symbol: str
confidence: float
decimals: int
prices: List[TokenPrice]


class TokenResponse(BaseModel):
"""Root response model"""

coins: dict[str, TokenData]
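
As an illustration of the payload shape these models accept (values are made up; the structure mirrors what process_token_data in src/integrations/defilamma_2.py consumes):

# Hypothetical snippet, not part of this commit
sample = {
    "ethereum:0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2": {
        "symbol": "WETH",
        "confidence": 0.99,
        "decimals": 18,
        "prices": [{"timestamp": 1727654400, "price": 2650.12}],
    }
}
parsed = TokenResponse(coins=sample)
print(parsed.coins["ethereum:0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"].symbol)  # "WETH"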
2 changes: 1 addition & 1 deletion src/pipelines.py
@@ -40,7 +40,7 @@
run_bigquery_query,
)

from src.integrations.dune_txs import get_all_everclear_tokens_prices_pipeline
from src.integrations.defilamma_2 import get_all_everclear_tokens_prices_pipeline
from src.integrations.everclear_pg import everclear_pg_2_bq
from src.integrations.everclear_metrics_api import (
metrics_daily,
81 changes: 40 additions & 41 deletions src/sql/everclear/prod/new_prod_date_change.sql
@@ -152,7 +152,7 @@ netted_raw AS (
AND CAST(i.settlement_domain AS INTEGER) = tm.domain_id
)
WHERE inv.id IS NULL
AND i.status IN ('SETTLED_AND_COMPLETED', 'SETTLED_AND_MANUALLY_EXECUTED')
AND i.status = 'SETTLED_AND_COMPLETED'
AND i.hub_status != 'DISPATCHED_UNSUPPORTED'
GROUP BY 1,
2,
@@ -232,7 +232,7 @@ settled_raw AS (
LOWER(i.settlement_asset) = tm.address
AND CAST(i.settlement_domain AS INTEGER) = tm.domain_id
)
WHERE i.status IN ('SETTLED_AND_COMPLETED', 'SETTLED_AND_MANUALLY_EXECUTED')
WHERE i.status = 'SETTLED_AND_COMPLETED'
AND i.hub_status IN ('DISPATCHED', 'SETTLED')
GROUP BY 1,
2,
@@ -263,47 +263,46 @@ settled_final AS (
FROM settled_raw
),
combined AS (
SELECT -- groups
COALESCE(n.day, s.day) AS day,
COALESCE(n.from_chain_id, s.from_chain_id) AS from_chain_id,
COALESCE(n.from_asset_address, s.from_asset_address) AS from_asset_address,
COALESCE(n.from_asset_symbol, s.from_asset_symbol) AS from_asset_symbol,
COALESCE(n.to_chain_id, s.to_chain_id) AS to_chain_id,
COALESCE(n.to_asset_address, s.to_asset_address) AS to_asset_address,
COALESCE(n.to_asset_symbol, s.to_asset_symbol) AS to_asset_symbol,
-- metrics
n.netting_volume,
n.netting_avg_intent_size,
n.netting_protocol_revenue,
n.netting_total_intents,
n.netting_avg_time_in_hrs,
s.volume_settled_by_mm,
s.total_intents_by_mm,
s.discounts_by_mm,
s.avg_discounts_by_mm,
s.rewards_for_invoices,
s.avg_rewards_by_invoice,
s.avg_settlement_time_in_hrs_by_mm,
s.apy,
s.avg_discount_epoch_by_mm,
-- add the combinations of metrics here
-- clearing volume
COALESCE(n.netting_volume, 0) + COALESCE(s.volume_settled_by_mm, 0) AS total_volume,
-- intents
COALESCE(n.netting_total_intents, 0) + COALESCE(s.total_intents_by_mm, 0) AS total_intents,
-- revenue
COALESCE(n.netting_protocol_revenue, 0) + COALESCE(s.protocol_revenue_mm, 0) AS total_protocol_revenue,
-- rebalancing fee
COALESCE(n.netting_protocol_revenue, 0) + COALESCE(s.protocol_revenue_mm, 0) + COALESCE(s.discounts_by_mm, 0) AS total_rebalancing_fee
FROM netted_final n
FULL OUTER JOIN settled_final s ON n.day = s.day
AND n.from_chain_id = s.from_chain_id
AND n.to_chain_id = s.to_chain_id
AND n.from_asset_address = s.from_asset_address
SELECT -- groups
COALESCE(n.day, s.day) AS day,
COALESCE(n.from_chain_id, s.from_chain_id) AS from_chain_id,
COALESCE(n.from_asset_address, s.from_asset_address) AS from_asset_address,
COALESCE(n.from_asset_symbol, s.from_asset_symbol) AS from_asset_symbol,
COALESCE(n.to_chain_id, s.to_chain_id) AS to_chain_id,
COALESCE(n.to_asset_address, s.to_asset_address) AS to_asset_address,
COALESCE(n.to_asset_symbol, s.to_asset_symbol) AS to_asset_symbol,
-- metrics
n.netting_volume,
n.netting_avg_intent_size,
n.netting_protocol_revenue,
n.netting_total_intents,
n.netting_avg_time_in_hrs,
s.volume_settled_by_mm,
s.total_intents_by_mm,
s.discounts_by_mm,
s.avg_discounts_by_mm,
s.rewards_for_invoices,
s.avg_rewards_by_invoice,
s.avg_settlement_time_in_hrs_by_mm,
s.apy,
s.avg_discount_epoch_by_mm,
-- add the combinations of metrics here
-- clearing volume
COALESCE(n.netting_volume, 0) + COALESCE(s.volume_settled_by_mm, 0) AS total_volume,
-- intents
COALESCE(n.netting_total_intents, 0) + COALESCE(s.total_intents_by_mm, 0) AS total_intents,
-- revenue
COALESCE(n.netting_protocol_revenue, 0) + COALESCE(s.protocol_revenue_mm, 0) AS total_protocol_revenue,
-- rebalancing fee
COALESCE(n.netting_protocol_revenue, 0) + COALESCE(s.protocol_revenue_mm, 0) + COALESCE(s.discounts_by_mm, 0) AS total_rebalancing_fee
FROM netted_final n
FULL OUTER JOIN settled_final s ON n.day = s.day
AND n.from_chain_id = s.from_chain_id
AND n.to_chain_id = s.to_chain_id
AND n.from_asset_address = s.from_asset_address
AND n.to_asset_address = s.to_asset_address
)
SELECT *
FROM combined
SELECT * FROM combined
WHERE day = '2024-09-30 00:00:00+00'
AND from_asset_symbol = 'USDC'
AND (to_chain_id = 56 OR to_chain_id = 8453)
