Skip to content

Commit

Permalink
pindexer: implement dex value circuit breaker indexing (#4753)
Browse files Browse the repository at this point in the history
## Describe your changes

This implements a basic dex indexing component for the value circuit
breaker; more importantly, this adds some basic scaffolding that other
dex related indexing can latch onto.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > client code only, no event changes
  • Loading branch information
cronokirby committed Jul 24, 2024
1 parent a2ffd8a commit db79574
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 2 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ publish = false
anyhow = {workspace = true}
clap = {workspace = true}
cometindex = {workspace = true}
num-bigint = { version = "0.4" }
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
Expand All @@ -22,5 +23,5 @@ penumbra-asset = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
tokio = {workspace = true, features = ["full"]}
serde_json = {workspace = true}
sqlx = { workspace = true, features = ["chrono"] }
sqlx = { workspace = true, features = ["bigdecimal", "chrono", "postgres"] }
tracing = {workspace = true}
20 changes: 20 additions & 0 deletions crates/bin/pindexer/src/dex/dex.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This component is responsible for processing events related to the DEX.

-- # Design Choices
--
-- ## Asset IDs
--
-- We represent them as raw bytes---i.e. BYTEA---, rather than using a 1:1 table.
-- This is probably more efficient, and makes our lives much easier by the fact
-- that given an `penumbra_asset::asset::Id`, we always know exactly how to filter
-- tables, rather than needing to do a join with another table.

-- Keeps track of changes to the dex's value circuit breaker.
CREATE TABLE IF NOT EXISTS dex_value_circuit_breaker_change (
-- The asset being moved into or out of the dex.
asset_id BYTEA NOT NULL,
-- The flow, either positive, or negative, into the dex via this particular asset.
--
-- Because we're dealing with arbitrary assets, we need to use something which can store u128
flow NUMERIC(39, 0) NOT NULL
);
169 changes: 169 additions & 0 deletions crates/bin/pindexer/src/dex/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use std::collections::HashSet;

use anyhow::anyhow;
use cometindex::async_trait;
use penumbra_asset::asset::Id as AssetId;
use penumbra_num::Amount;
use penumbra_proto::{event::ProtoEvent, penumbra::core::component::dex::v1 as pb};
use sqlx::{PgPool, Postgres, Transaction};

use crate::sql::Sql;
use crate::{AppView, ContextualizedEvent, PgTransaction};

/// One of the possible events that we care about.
#[derive(Clone, Copy, Debug)]
enum Event {
/// A parsed version of [pb::EventValueCircuitBreakerCredit].
CircuitBreakerCredit {
asset_id: AssetId,
previous_balance: Amount,
new_balance: Amount,
},
/// A parsed version of [pb::EventValueCircuitBreakerDebit]
CircuitBreakerDebit {
asset_id: AssetId,
previous_balance: Amount,
new_balance: Amount,
},
}

impl Event {
const NAMES: [&'static str; 2] = [
"penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
"penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
];

/// Index this event, using the handle to the postgres transaction.
async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
match *self {
Event::CircuitBreakerCredit {
asset_id,
previous_balance,
new_balance,
} => {
let amount = new_balance.checked_sub(&previous_balance).ok_or(anyhow!(
"balance decreased after dex credit: previous: {}, new: {}",
previous_balance,
new_balance
))?;
sqlx::query(
r#"
INSERT INTO dex_value_circuit_breaker_change
VALUES ($1, $2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
Event::CircuitBreakerDebit {
asset_id,
previous_balance,
new_balance,
} => {
let amount = previous_balance.checked_sub(&new_balance).ok_or(anyhow!(
"balance increased after dex credit: previous: {}, new: {}",
previous_balance,
new_balance
))?;
sqlx::query(
r#"
INSERT INTO dex_value_circuit_breaker_change
VALUES ($1, -$2);
"#,
)
.bind(Sql::from(asset_id))
.bind(Sql::from(amount))
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}
}
}

impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
type Error = anyhow::Error;

fn try_from(event: &'a ContextualizedEvent) -> Result<Self, Self::Error> {
match event.event.kind.as_str() {
// Credit
x if x == Event::NAMES[0] => {
let pe = pb::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
let asset_id =
AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?;
let previous_balance = Amount::try_from(
pe.previous_balance
.ok_or(anyhow!("event missing previous_balance"))?,
)?;
let new_balance =
Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?;
Ok(Self::CircuitBreakerCredit {
asset_id,
previous_balance,
new_balance,
})
}
// Debit
x if x == Event::NAMES[1] => {
let pe = pb::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
let asset_id =
AssetId::try_from(pe.asset_id.ok_or(anyhow!("event missing asset_id"))?)?;
let previous_balance = Amount::try_from(
pe.previous_balance
.ok_or(anyhow!("event missing previous_balance"))?,
)?;
let new_balance =
Amount::try_from(pe.new_balance.ok_or(anyhow!("event missing new_balance"))?)?;
Ok(Self::CircuitBreakerDebit {
asset_id,
previous_balance,
new_balance,
})
}
x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
}
}
}

#[derive(Debug)]
pub struct Component {
event_strings: HashSet<&'static str>,
}

impl Component {
pub fn new() -> Self {
let event_strings = Event::NAMES.into_iter().collect();
Self { event_strings }
}
}

#[async_trait]
impl AppView for Component {
async fn init_chain(
&self,
dbtx: &mut PgTransaction,
_app_state: &serde_json::Value,
) -> anyhow::Result<()> {
sqlx::query(include_str!("dex.sql"))
.execute(dbtx.as_mut())
.await?;
Ok(())
}

fn is_relevant(&self, type_str: &str) -> bool {
self.event_strings.contains(type_str)
}

#[tracing::instrument(skip_all, fields(height = event.block_height))]
async fn index_event(
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> anyhow::Result<()> {
Event::try_from(event)?.index(dbtx).await
}
}
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/indexer_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ impl IndexerExt for cometindex::Indexer {
self.with_index(crate::shielded_pool::fmd::ClueSet {})
.with_index(crate::stake::ValidatorSet {})
.with_index(crate::stake::Slashings {})
.with_index(crate::stake::MissedBlocks {})
.with_index(crate::stake::DelegationTxs {})
.with_index(crate::stake::UndelegationTxs {})
.with_index(crate::dex::Component::new())
}
}
2 changes: 2 additions & 0 deletions crates/bin/pindexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool
mod indexer_ext;
pub use indexer_ext::IndexerExt;
pub mod block;
pub mod dex;
pub mod shielded_pool;
mod sql;
pub mod stake;
114 changes: 114 additions & 0 deletions crates/bin/pindexer/src/sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::error::Error;

use anyhow::anyhow;
use num_bigint::{BigInt, Sign};
use penumbra_asset::asset::Id as AssetId;
use penumbra_num::Amount;
use sqlx::{types::BigDecimal, Decode, Encode, Postgres, Type};

/// An extension trait to make it easier to implement serialization for existing Penumbra types.
///
/// Types that implement this trait can then be shoved into [Sql] and passed along
/// to the various sqlx functions.
pub trait SqlExt: Clone + Sized {
type SqlT;

fn to_sql_type(&self) -> Self::SqlT;
fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self>;
}

/// A wrapper over `T` allowing for SQL serialization and deserialization.
///
/// When `T` implements [SqlExt] then this type will be encodeable and decodeable
/// from a Postgres database.
pub struct Sql<T>(T);

impl<T> Sql<T> {
#[allow(dead_code)]
pub fn into(self) -> T {
self.0
}
}

impl<T> From<T> for Sql<T> {
fn from(value: T) -> Self {
Self(value)
}
}

impl<'q, T> Encode<'q, Postgres> for Sql<T>
where
T: SqlExt,
T::SqlT: Encode<'q, Postgres>,
{
fn encode_by_ref(
&self,
buf: &mut <Postgres as sqlx::database::HasArguments<'q>>::ArgumentBuffer,
) -> sqlx::encode::IsNull {
<T as SqlExt>::to_sql_type(&self.0).encode_by_ref(buf)
}
}

impl<'q, T> Decode<'q, Postgres> for Sql<T>
where
T: SqlExt,
T::SqlT: Decode<'q, Postgres>,
{
fn decode(
value: <Postgres as sqlx::database::HasValueRef<'q>>::ValueRef,
) -> Result<Self, sqlx::error::BoxDynError> {
let sql_t = <T as SqlExt>::SqlT::decode(value)?;
let t = T::from_sql_type(sql_t)
.map_err(|e| Box::<dyn Error + Send + Sync + 'static>::from(e))?;
Ok(Sql(t))
}
}

impl<T> Type<Postgres> for Sql<T>
where
T: SqlExt,
T::SqlT: Type<Postgres>,
{
fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
<[u8; 32]>::type_info()
}
}

impl SqlExt for Amount {
type SqlT = BigDecimal;

fn to_sql_type(&self) -> Self::SqlT {
BigDecimal::from(BigInt::from_bytes_le(
Sign::Plus,
self.to_le_bytes().as_slice(),
))
}

fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self> {
if !value.is_integer() {
return Err(anyhow!("database value is not an integer").into());
}
let big_int = value.as_bigint_and_exponent().0;
// Get the bytes only from a positive BigInt
let bytes = match big_int.to_bytes_le() {
(Sign::Plus | Sign::NoSign, bytes) => bytes,
(Sign::Minus, bytes) => bytes,
};
let bytes: [u8; 16] = bytes
.try_into()
.map_err(|_| anyhow!("failed to convert slice to 16 bytes"))?;
Ok(Amount::from_le_bytes(bytes))
}
}

impl SqlExt for AssetId {
type SqlT = [u8; 32];

fn to_sql_type(&self) -> Self::SqlT {
self.to_bytes()
}

fn from_sql_type(value: Self::SqlT) -> anyhow::Result<Self> {
Ok(AssetId::try_from(value.as_slice())?)
}
}

0 comments on commit db79574

Please sign in to comment.