Skip to content

Commit

Permalink
dex: augment price indexes with position data
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Jun 13, 2024
1 parent 8296243 commit 67f8546
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 86 deletions.
4 changes: 2 additions & 2 deletions crates/bin/pd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use cnidarium::Storage;
use metrics_exporter_prometheus::PrometheusBuilder;
use pd::{
cli::{Opt, RootCommand, TestnetCommand},
migrate::Migration::{ReadyToStart, Testnet77},
migrate::Migration::{ReadyToStart, Testnet78},
testnet::{
config::{get_testnet_dir, parse_tm_address, url_has_necessary_parts},
generate::TestnetConfig,
Expand Down Expand Up @@ -458,7 +458,7 @@ async fn main() -> anyhow::Result<()> {

let genesis_start = pd::migrate::last_block_timestamp(pd_home.clone()).await?;
tracing::info!(?genesis_start, "last block timestamp");
Testnet77
Testnet78
.migrate(pd_home.clone(), comet_home, Some(genesis_start), force)
.instrument(pd_migrate_span)
.await
Expand Down
8 changes: 7 additions & 1 deletion crates/bin/pd/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod testnet72;
mod testnet74;
mod testnet76;
mod testnet77;
mod testnet78;

use anyhow::{ensure, Context};
use penumbra_governance::StateReadExt;
Expand Down Expand Up @@ -47,6 +48,9 @@ pub enum Migration {
/// Testnet-77 migration:
/// - Reset the halt bit
Testnet77,
/// Testnet-78 migration:
/// - Populate the DEX NV price idnexes with position data
Testnet78,
}

impl Migration {
Expand Down Expand Up @@ -95,10 +99,12 @@ impl Migration {
Migration::Testnet77 => {
testnet77::migrate(storage, pd_home.clone(), genesis_start).await?
}
Migration::Testnet78 => {
testnet78::migrate(storage, pd_home.clone(), genesis_start).await?
}
};

if let Some(comet_home) = comet_home {
// TODO avoid this when refactoring to clean up migrations
let genesis_path = pd_home.join("genesis.json");
migrate_comet_data(comet_home, genesis_path).await?;
}
Expand Down
121 changes: 121 additions & 0 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//! Contains functions related to the migration script for Testnet78.
use anyhow::{Context, Result};
use cnidarium::{Snapshot, StateWrite};
use cnidarium::{StateDelta, Storage};
use futures::{pin_mut, StreamExt};
use jmt::RootHash;
use penumbra_app::app::StateReadExt as _;
use penumbra_dex::lp::position;
use penumbra_dex::lp::position::Position;
use penumbra_governance::StateWriteExt;
use penumbra_proto::StateReadProto;
use penumbra_sct::component::clock::EpochManager;
use penumbra_sct::component::clock::EpochRead;
use std::path::PathBuf;
use tracing::instrument;

use crate::testnet::generate::TestnetConfig;

/// Run the full migration, given an export path and a start time for genesis.
///
/// Menu:
/// - Close and re-open all *open* positions so that they are re-indexed.
#[instrument]
pub async fn migrate(
storage: Storage,
pd_home: PathBuf,
genesis_start: Option<tendermint::time::Time>,
) -> anyhow::Result<()> {
let initial_state = storage.latest_snapshot();
let chain_id = initial_state.get_chain_id().await?;
let root_hash = initial_state
.root_hash()
.await
.expect("chain state has a root hash");
let pre_upgrade_root_hash: RootHash = root_hash.into();
let pre_upgrade_height = initial_state
.get_block_height()
.await
.expect("chain state has a block height");
let post_upgrade_height = pre_upgrade_height.wrapping_add(1);
let mut delta = StateDelta::new(initial_state);

/* Migration */
reindex_dex_positions(&mut delta).await?;
/* ********* */

// Reset the halt flag, and application height.
delta.ready_to_start();
delta.put_block_height(0u64);
let _ = storage
.commit_in_place(delta)
.await
.context("failed to reset halt bit")?;
storage.release().await;

// The migration is complete, now we need to generate a genesis file. To do this, we need
// to lookup a validator view from the chain, and specify the post-upgrade app hash and
// initial height.
let app_state = penumbra_app::genesis::Content {
chain_id,
..Default::default()
};
let mut genesis = TestnetConfig::make_genesis(app_state.clone()).expect("can make genesis");
genesis.app_hash = pre_upgrade_root_hash
.0
.to_vec()
.try_into()
.expect("infallible conversion");

genesis.initial_height = post_upgrade_height as i64;
genesis.genesis_time = genesis_start.unwrap_or_else(|| {
let now = tendermint::time::Time::now();
tracing::info!(%now, "no genesis time provided, detecting a testing setup");
now
});
let checkpoint = pre_upgrade_root_hash.0.to_vec();
let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));
let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
tracing::info!("genesis: {}", genesis_json);
let genesis_path = pd_home.join("genesis.json");
std::fs::write(genesis_path, genesis_json).expect("can write genesis");

let validator_state_path = pd_home.join("priv_validator_state.json");
let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state();
std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state");

tracing::info!(
pre_upgrade_height,
?pre_upgrade_root_hash,
"successful migration!"
);

Ok(())
}

async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> Result<()> {
use penumbra_dex::component::PositionManager;
tracing::info!("running dex re-indexing migration");
let prefix_key_lp = penumbra_dex::state_key::all_positions();
let stream_all_lp = delta.prefix::<Position>(&prefix_key_lp);
let stream_open_lp = stream_all_lp.filter_map(|entry| async {
match entry {
Ok((_, lp)) if lp.state == position::State::Opened => Some(lp),
_ => None,
}
});
pin_mut!(stream_open_lp);

while let Some(lp) = stream_open_lp.next().await {
// Re-hash the position, since the key is a bech32 string.
let id = lp.id();
// Close the position, adjusting all its index entries.
delta.close_position_by_id(&id).await?;
// Erase the position from the state, so that we circumvent the `update_position` guard.
delta.delete(penumbra_dex::state_key::position_by_id(&id));
// Open a position with the adjusted indexing logic.
delta.open_position(lp).await?;
}
tracing::info!("completed dex migration");
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ mod tests {

let position = buy_1;
state_tx
.update_position_by_price_index(&None, &position, &position.id())
.update_position_by_price_index(&position.id(), &None, &position)
.expect("can update price index");
state_tx.put(state_key::position_by_id(&id), position);

Expand Down
39 changes: 20 additions & 19 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ pub trait PositionRead: StateRead {
fn positions_by_price(
&self,
pair: &DirectedTradingPair,
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
{
let prefix = engine::price_index::prefix(pair);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
self.nonverifiable_prefix_raw(&prefix)
self.nonverifiable_prefix(&prefix)
.map(|entry| match entry {
Ok((k, _)) => {
Ok((k, lp)) => {
let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
Ok(position::Id(raw_id))
Ok((position::Id(raw_id), lp))
}
Err(e) => Err(e),
})
Expand All @@ -90,12 +91,9 @@ pub trait PositionRead: StateRead {
async fn best_position(
&self,
pair: &DirectedTradingPair,
) -> Result<Option<position::Position>> {
) -> Result<Option<(position::Id, position::Position)>> {
let mut positions_by_price = self.positions_by_price(pair);
match positions_by_price.next().await.transpose()? {
Some(id) => self.position_by_id(&id).await,
None => Ok(None),
}
positions_by_price.next().await.transpose()
}

/// Fetch the list of pending position closures.
Expand Down Expand Up @@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(id, Some(prev_state), new_state)
.await?;

Ok(())
}
Expand Down Expand Up @@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead {

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;
self.update_position(&id, None, position).await?;

Ok(())
}
Expand Down Expand Up @@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead {
.map_err(|e| tracing::warn!(?e, "failed to record position execution"))
.ok();

self.update_position(Some(prev_state), new_state).await
self.update_position(&position_id, Some(prev_state), new_state)
.await
}

/// Withdraw from a closed position, incrementing its sequence number.
Expand Down Expand Up @@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(&position_id, Some(prev_state), new_state)
.await?;

Ok(reserves)
}
Expand All @@ -462,28 +463,28 @@ impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
trait Inner: StateWrite {
/// Writes a position to the state, updating all necessary indexes.
///
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
/// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
/// All other position changes exposed by the `PositionManager` should run through here.
#[instrument(level = "debug", skip_all)]
async fn update_position(
&mut self,
id: &position::Id,
prev_state: Option<Position>,
new_state: Position,
) -> Result<Position> {
let id = new_state.id();
tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");

// Assert `update_position` state transitions invariants:
Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;

// Update the DEX engine indices:
self.update_position_by_price_index(&prev_state, &new_state, &id)?;
self.update_position_by_inventory_index(&prev_state, &new_state, &id)?;
self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id)
self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
.await?;
self.update_trading_pair_position_counter(&prev_state, &new_state, &id)
self.update_trading_pair_position_counter(&prev_state, &new_state)
.await?;
self.update_position_by_price_index(&id, &prev_state, &new_state)?;

self.put(state_key::position_by_id(&id), new_state.clone());
Ok(new_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
/// │ └──┘
async fn update_asset_by_base_liquidity_index(
&mut self,
id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
id: &position::Id,
) -> Result<()> {
// We need to reconstruct the position's previous contribution and compute
// its new contribution to the index. We do this for each asset in the pair
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite {
&mut self,
prev_state: &Option<Position>,
new_state: &Position,
_id: &position::Id,
) -> Result<()> {
use position::State::*;
let trading_pair = new_state.phi.pair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use position::State::*;
pub(super) trait PositionByInventoryIndex: StateWrite {
fn update_position_by_inventory_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record of the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cnidarium::StateWrite;
use penumbra_proto::StateWriteProto;

use crate::{
lp::position::{self, Position},
Expand All @@ -12,9 +13,9 @@ use position::State::*;
pub(crate) trait PositionByPriceIndex: StateWrite {
fn update_position_by_price_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record for the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down Expand Up @@ -57,7 +58,10 @@ trait Inner: StateWrite {
end: pair.asset_2(),
};
let phi12 = phi.component.clone();
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair12, &phi12, &id),
position.clone(),
);
tracing::debug!("indexing position for 1=>2 trades");
}

Expand All @@ -68,7 +72,10 @@ trait Inner: StateWrite {
end: pair.asset_1(),
};
let phi21 = phi.component.flip();
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair21, &phi21, &id),
position.clone(),
);
tracing::debug!("indexing position for 2=>1 trades");
}
}
Expand Down
Loading

0 comments on commit 67f8546

Please sign in to comment.