From db79574d8e254f96734dc4e30dd74821f9329035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Wed, 24 Jul 2024 10:19:12 -0700 Subject: [PATCH] pindexer: implement dex value circuit breaker indexing (#4753) ## Describe your changes This implements a basic dex indexing component for the value circuit breaker; more importantly, this adds some basic scaffolding that other dex related indexing can latch onto. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > client code only, no event changes --- Cargo.lock | 16 +++ crates/bin/pindexer/Cargo.toml | 3 +- crates/bin/pindexer/src/dex/dex.sql | 20 +++ crates/bin/pindexer/src/dex/mod.rs | 169 +++++++++++++++++++++++++ crates/bin/pindexer/src/indexer_ext.rs | 2 +- crates/bin/pindexer/src/lib.rs | 2 + crates/bin/pindexer/src/sql.rs | 114 +++++++++++++++++ 7 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 crates/bin/pindexer/src/dex/dex.sql create mode 100644 crates/bin/pindexer/src/dex/mod.rs create mode 100644 crates/bin/pindexer/src/sql.rs diff --git a/Cargo.lock b/Cargo.lock index 22c91257f6..65451b53d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -761,6 +761,17 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" +[[package]] +name = "bigdecimal" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6773ddc0eafc0e509fb60e48dff7f450f8e674a0686ae8605e8d9901bd5eefa" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bincode" version = "1.3.3" @@ -5810,6 +5821,7 @@ dependencies = [ "anyhow", "clap", "cometindex", + "num-bigint", "penumbra-app", "penumbra-asset", "penumbra-num", @@ -7349,6 +7361,7 @@ checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d" dependencies = [ "ahash", "atoi", + "bigdecimal", "byteorder", "bytes", "chrono", @@ -7432,6 +7445,7 @@ checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" dependencies = [ "atoi", "base64 0.21.7", + "bigdecimal", "bitflags 2.6.0", "byteorder", "bytes", @@ -7475,6 +7489,7 @@ checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" dependencies = [ "atoi", "base64 0.21.7", + "bigdecimal", "bitflags 2.6.0", "byteorder", "chrono", @@ -7493,6 +7508,7 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand", "serde", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 64cf3204bc..92a3d75445 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -14,6 +14,7 @@ publish = false anyhow = {workspace = true} clap = {workspace = true} cometindex = {workspace = true} +num-bigint = { version = "0.4" } penumbra-shielded-pool = {workspace = true, default-features = false} penumbra-stake = {workspace = true, default-features = false} penumbra-app = {workspace = true, default-features = false} @@ -22,5 +23,5 @@ penumbra-asset = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} tokio = {workspace = true, features = ["full"]} serde_json = {workspace = true} -sqlx = { workspace = true, features = ["chrono"] } +sqlx = { workspace = true, features = ["bigdecimal", "chrono", "postgres"] } 
 tracing = {workspace = true}
diff --git a/crates/bin/pindexer/src/dex/dex.sql b/crates/bin/pindexer/src/dex/dex.sql
new file mode 100644
index 0000000000..e1e93b11ee
--- /dev/null
+++ b/crates/bin/pindexer/src/dex/dex.sql
@@ -0,0 +1,20 @@
+-- This component is responsible for processing events related to the DEX.
+
+-- # Design Choices
+--
+-- ## Asset IDs
+--
+-- We represent asset IDs as raw bytes (i.e. BYTEA), rather than using a 1:1
+-- lookup table. This is probably more efficient, and it makes our lives much
+-- easier: given a `penumbra_asset::asset::Id`, we always know exactly how to
+-- filter tables, without needing a join against another table.
+
+-- Keeps track of changes to the dex's value circuit breaker.
+CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change (
+    -- The asset being moved into or out of the dex.
+    asset_id BYTEA NOT NULL,
+    -- The flow, either positive or negative, into the dex via this particular asset.
+    --
+    -- Because we're dealing with arbitrary assets, we need a type that can store a u128.
+    flow NUMERIC(39, 0) NOT NULL
+);
diff --git a/crates/bin/pindexer/src/dex/mod.rs b/crates/bin/pindexer/src/dex/mod.rs
new file mode 100644
index 0000000000..e854130375
--- /dev/null
+++ b/crates/bin/pindexer/src/dex/mod.rs
@@ -0,0 +1,169 @@
+use std::collections::HashSet;
+
+use anyhow::anyhow;
+use cometindex::async_trait;
+use penumbra_asset::asset::Id as AssetId;
+use penumbra_num::Amount;
+use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
+use sqlx::{PgPool, Postgres, Transaction};
+
+use crate::sql::Sql;
+use crate::{AppView, ContextualizedEvent, PgTransaction};
+
+/// One of the possible events that we care about.
+#[derive(Clone, Copy, Debug)]
+enum Event {
+    /// A parsed version of [pb::EventValueCircuitBreakerCredit].
+    CircuitBreakerCredit {
+        asset_id: AssetId,
+        previous_balance: Amount,
+        new_balance: Amount,
+    },
+    /// A parsed version of [pb::EventValueCircuitBreakerDebit].
+    CircuitBreakerDebit {
+        asset_id: AssetId,
+        previous_balance: Amount,
+        new_balance: Amount,
+    },
+}
+
+impl Event {
+    const NAMES: [&'static str; 2] = [
+        "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
+        "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
+    ];
+
+    /// Index this event, using the handle to the Postgres transaction.
+    async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
+        match *self {
+            Event::CircuitBreakerCredit {
+                asset_id,
+                previous_balance,
+                new_balance,
+            } => {
+                let amount = new_balance.checked_sub(&previous_balance).ok_or(anyhow!(
+                    "balance decreased after dex credit: previous: {}, new: {}",
+                    previous_balance,
+                    new_balance
+                ))?;
+                sqlx::query(
+                    r#"
+                INSERT INTO dex_value_circuit_breaker_change
+                VALUES ($1, $2);
+                "#,
+                )
+                .bind(Sql::from(asset_id))
+                .bind(Sql::from(amount))
+                .execute(dbtx.as_mut())
+                .await?;
+                Ok(())
+            }
+            Event::CircuitBreakerDebit {
+                asset_id,
+                previous_balance,
+                new_balance,
+            } => {
+                let amount = previous_balance.checked_sub(&new_balance).ok_or(anyhow!(
+                    "balance increased after dex debit: previous: {}, new: {}",
+                    previous_balance,
+                    new_balance
+                ))?;
+                sqlx::query(
+                    r#"
+                INSERT INTO dex_value_circuit_breaker_change
+                VALUES ($1, -$2);
+                "#,
+                )
+                .bind(Sql::from(asset_id))
+                .bind(Sql::from(amount))
+                .execute(dbtx.as_mut())
+                .await?;
+                Ok(())
+            }
+        }
+    }
+}
+
+impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
+    type Error = anyhow::Error;
+
+    fn try_from(event: &'a ContextualizedEvent) -> Result<Self, Self::Error> {
+        match event.event.kind.as_str() {
+            // Credit
+            x if x == Event::NAMES[0] => {
+                let pe = pb::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
+                let asset_id =
+                    AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?;
+                let previous_balance = Amount::try_from(
+                    pe.previous_balance
+                        .ok_or(anyhow!("event missing previous_balance"))?,
+                )?;
+                let new_balance =
+                    Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?;
+                Ok(Self::CircuitBreakerCredit {
+                    asset_id,
+                    previous_balance,
+                    new_balance,
+                })
+            }
+            // Debit
+            x if x == Event::NAMES[1] => {
+                let pe = pb::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
+                let asset_id =
+                    AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?;
+                let previous_balance = Amount::try_from(
+                    pe.previous_balance
+                        .ok_or(anyhow!("event missing previous_balance"))?,
+                )?;
+                let new_balance =
+                    Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?;
+                Ok(Self::CircuitBreakerDebit {
+                    asset_id,
+                    previous_balance,
+                    new_balance,
+                })
+            }
+            x => Err(anyhow!("unrecognized event kind: {x}")),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Component {
+    event_strings: HashSet<&'static str>,
+}
+
+impl Component {
+    pub fn new() -> Self {
+        let event_strings = Event::NAMES.into_iter().collect();
+        Self { event_strings }
+    }
+}
+
+#[async_trait]
+impl AppView for Component {
+    async fn init_chain(
+        &self,
+        dbtx: &mut PgTransaction,
+        _app_state: &serde_json::Value,
+    ) -> anyhow::Result<()> {
+        sqlx::query(include_str!("dex.sql"))
+            .execute(dbtx.as_mut())
+            .await?;
+        Ok(())
+    }
+
+    fn is_relevant(&self, type_str: &str) -> bool {
+        self.event_strings.contains(type_str)
+    }
+
+    #[tracing::instrument(skip_all, fields(height = event.block_height))]
+    async fn index_event(
+        &self,
+        dbtx: &mut PgTransaction,
+        event: &ContextualizedEvent,
+        _src_db: &PgPool,
+    ) -> anyhow::Result<()> {
+        Event::try_from(event)?.index(dbtx).await
+    }
+}
diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs
index 693924626f..0bc3b0e511 100644
--- a/crates/bin/pindexer/src/indexer_ext.rs
+++ b/crates/bin/pindexer/src/indexer_ext.rs
@@ -7,8 +7,8 @@ impl IndexerExt for cometindex::Indexer {
         self.with_index(crate::shielded_pool::fmd::ClueSet {})
             .with_index(crate::stake::ValidatorSet {})
             .with_index(crate::stake::Slashings {})
-            .with_index(crate::stake::MissedBlocks {})
             .with_index(crate::stake::DelegationTxs {})
             .with_index(crate::stake::UndelegationTxs {})
+            .with_index(crate::dex::Component::new())
     }
 }
diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs
index 0f942bc374..9c443724d8 100644
--- a/crates/bin/pindexer/src/lib.rs
+++ b/crates/bin/pindexer/src/lib.rs
@@ -3,5 +3,7 @@ pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool
 mod indexer_ext;
 pub use indexer_ext::IndexerExt;
 pub mod block;
+pub mod dex;
 pub mod shielded_pool;
+mod sql;
 pub mod stake;
diff --git a/crates/bin/pindexer/src/sql.rs b/crates/bin/pindexer/src/sql.rs
new file mode 100644
index 0000000000..d36d566ab6
--- /dev/null
+++ b/crates/bin/pindexer/src/sql.rs
@@ -0,0 +1,114 @@
+use std::error::Error;
+
+use anyhow::anyhow;
+use num_bigint::{BigInt, Sign};
+use penumbra_asset::asset::Id as AssetId;
+use penumbra_num::Amount;
+use sqlx::{types::BigDecimal, Decode, Encode, Postgres, Type};
+
+/// An extension trait to make it easier to implement serialization for existing Penumbra types.
+///
+/// Types that implement this trait can then be shoved into [Sql] and passed along
+/// to the various sqlx functions.
+pub trait SqlExt: Clone + Sized {
+    type SqlT;
+
+    fn to_sql_type(&self) -> Self::SqlT;
+    fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self>;
+}
+
+/// A wrapper over `T` allowing for SQL serialization and deserialization.
+///
+/// When `T` implements [SqlExt] then this type will be encodable and decodable
+/// from a Postgres database.
+pub struct Sql<T>(T);
+
+impl<T> Sql<T> {
+    #[allow(dead_code)]
+    pub fn into(self) -> T {
+        self.0
+    }
+}
+
+impl<T> From<T> for Sql<T> {
+    fn from(value: T) -> Self {
+        Self(value)
+    }
+}
+
+impl<'q, T> Encode<'q, Postgres> for Sql<T>
+where
+    T: SqlExt,
+    T::SqlT: Encode<'q, Postgres>,
+{
+    fn encode_by_ref(
+        &self,
+        buf: &mut <Postgres as sqlx::database::HasArguments<'q>>::ArgumentBuffer,
+    ) -> sqlx::encode::IsNull {
+        <T as SqlExt>::to_sql_type(&self.0).encode_by_ref(buf)
+    }
+}
+
+impl<'q, T> Decode<'q, Postgres> for Sql<T>
+where
+    T: SqlExt,
+    T::SqlT: Decode<'q, Postgres>,
+{
+    fn decode(
+        value: <Postgres as sqlx::database::HasValueRef<'q>>::ValueRef,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        let sql_t = <T as SqlExt>::SqlT::decode(value)?;
+        let t = T::from_sql_type(sql_t)
+            .map_err(|e| Box::<dyn Error + Send + Sync>::from(e))?;
+        Ok(Sql(t))
+    }
+}
+
+impl<T> Type<Postgres> for Sql<T>
+where
+    T: SqlExt,
+    T::SqlT: Type<Postgres>,
+{
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <T::SqlT as Type<Postgres>>::type_info()
+    }
+}
+
+impl SqlExt for Amount {
+    type SqlT = BigDecimal;
+
+    fn to_sql_type(&self) -> Self::SqlT {
+        BigDecimal::from(BigInt::from_bytes_le(
+            Sign::Plus,
+            self.to_le_bytes().as_slice(),
+        ))
+    }
+
+    fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self> {
+        if !value.is_integer() {
+            return Err(anyhow!("database value is not an integer"));
+        }
+        let big_int = value.as_bigint_and_exponent().0;
+        // Get the raw little-endian bytes, ignoring the sign.
+        let (_sign, mut bytes) = big_int.to_bytes_le();
+        if bytes.len() > 16 {
+            return Err(anyhow!("value does not fit in a u128"));
+        }
+        // `to_bytes_le` is minimal-length, so pad out to the full 16 bytes of a u128.
+        bytes.resize(16, 0);
+        let bytes: [u8; 16] = bytes
+            .try_into()
+            .map_err(|_| anyhow!("failed to convert slice to 16 bytes"))?;
+        Ok(Amount::from_le_bytes(bytes))
+    }
+}
+
+impl SqlExt for AssetId {
+    type SqlT = [u8; 32];
+
+    fn to_sql_type(&self) -> Self::SqlT {
+        self.to_bytes()
+    }
+
+    fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self> {
+        Ok(AssetId::try_from(value.as_slice())?)
+    }
+}
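## Usage sketches (not part of the diff)

Once the component has populated `dex_value_circuit_breaker_change`, a downstream consumer can recover the net value circuit breaker balance per asset by summing flows: credits insert positive rows, debits insert negated rows. The sketch below assumes a `sqlx::PgPool` pointed at the pindexer database; the function name is illustrative.

```rust
use anyhow::Result;
use sqlx::{types::BigDecimal, PgPool, Row};

/// Sum the recorded flows for one asset, yielding the net amount the value
/// circuit breaker currently accounts for in the dex.
async fn net_flow_for_asset(pool: &PgPool, asset_id: [u8; 32]) -> Result<BigDecimal> {
    let row = sqlx::query(
        r#"
        SELECT COALESCE(SUM(flow), 0) AS net
        FROM dex_value_circuit_breaker_change
        WHERE asset_id = $1
        "#,
    )
    .bind(asset_id.to_vec())
    .fetch_one(pool)
    .await?;
    Ok(row.try_get("net")?)
}
```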
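The `NUMERIC(39, 0)` width for `flow` is tied to the width of `Amount`: `u128::MAX` is 340282366920938463463374607431768211455, which has exactly 39 decimal digits, so the column can hold any `Amount` (and its negation). A small test, shown here only as a sketch, could pin that invariant down:

```rust
#[cfg(test)]
mod tests {
    /// NUMERIC(39, 0) must be wide enough to hold any `Amount`, i.e. any u128.
    #[test]
    fn u128_max_fits_in_numeric_39_0() {
        // u128::MAX has exactly 39 decimal digits.
        assert_eq!(u128::MAX.to_string().len(), 39);
    }
}
```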
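The `SqlExt`/`Sql` pair in `sql.rs` is the scaffolding that other dex-related indexing can latch onto: supporting a new domain type only takes one small impl, after which values of that type can be bound directly in queries. A sketch with a made-up `PoolId` newtype (a stand-in for illustration, not a real penumbra type):

```rust
use crate::sql::{Sql, SqlExt};

/// A made-up example type, standing in for whatever a future dex view indexes.
#[derive(Clone)]
struct PoolId([u8; 32]);

impl SqlExt for PoolId {
    // Stored as raw bytes, matching the convention used for asset IDs.
    type SqlT = [u8; 32];

    fn to_sql_type(&self) -> Self::SqlT {
        self.0
    }

    fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self> {
        Ok(PoolId(value))
    }
}

// With the impl in place, a value can be bound like any other sqlx argument:
//
//     sqlx::query("INSERT INTO some_pool_table VALUES ($1)")
//         .bind(Sql::from(pool_id))
```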