Skip to content

Commit

Permalink
dex: augment price indexes with position data
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor authored and conorsch committed Jun 13, 2024
1 parent 6f47537 commit b584c26
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 113 deletions.
6 changes: 4 additions & 2 deletions crates/bin/pd/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub enum Migration {
Testnet77,
/// Testnet-78 migration:
/// - Truncate various user-supplied `String` fields to a maximum length.
/// - Populate the DEX NV price idnexes with position data
Testnet78,
}

Expand Down Expand Up @@ -90,11 +91,12 @@ impl Migration {
Migration::Testnet78 => {
testnet78::migrate(storage, pd_home.clone(), genesis_start).await?
}
_ => unreachable!(),
// We keep historical migrations around for now, this will help inform an abstracted
// design. Feel free to remove it if it's causing you trouble.
_ => unimplemented!("the specified migration is unimplemented"),
}

if let Some(comet_home) = comet_home {
// TODO avoid this when refactoring to clean up migrations
let genesis_path = pd_home.join("genesis.json");
migrate_comet_data(comet_home, genesis_path).await?;
}
Expand Down
89 changes: 61 additions & 28 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Contains functions related to the migration script of Testnet78.
use anyhow::Context;
use cnidarium::{Snapshot, StateDelta, Storage};
use cnidarium::{Snapshot, StateDelta, StateWrite, Storage};
use futures::TryStreamExt as _;
use futures::{pin_mut, StreamExt};
use jmt::RootHash;
use penumbra_app::app::StateReadExt as _;
use penumbra_dex::component::PositionManager;
use penumbra_dex::lp::position;
use penumbra_dex::lp::position::Position;
use penumbra_governance::proposal_state::State as ProposalState;
use penumbra_governance::Proposal;
use penumbra_governance::StateReadExt as _;
use penumbra_governance::StateWriteExt;
use penumbra_governance::StateWriteExt as _;
use penumbra_proto::core::component::governance::v1 as pb_governance;
use penumbra_proto::{StateReadProto as _, StateWriteProto as _};
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_sct::component::clock::EpochManager;
use penumbra_sct::component::clock::EpochRead as _;
use penumbra_sct::component::clock::EpochRead;
use penumbra_stake::validator::Validator;
use std::path::PathBuf;
use tracing::instrument;
Expand Down Expand Up @@ -41,32 +44,36 @@ use crate::testnet::generate::TestnetConfig;
/// - `client_id` (128 bytes)
/// * Governance Signaling Proposals:
/// - `commit hash` (255 bytes)
/// - Close and re-open all *open* positions so that they are re-indexed.
#[instrument]
pub async fn migrate(
storage: Storage,
pd_home: PathBuf,
genesis_start: Option<tendermint::time::Time>,
) -> anyhow::Result<()> {
// Setup:
/* `Migration::prepare`: collect basic migration data, logging, initialize alt-storage if needed */
let initial_state = storage.latest_snapshot();

let chain_id = initial_state.get_chain_id().await?;
let root_hash = initial_state
.root_hash()
.await
.expect("chain state has a root hash");
let pre_upgrade_root_hash: RootHash = root_hash.into();

let pre_upgrade_height = initial_state
.get_block_height()
.await
.expect("chain state has a block height");
let post_upgrade_height = pre_upgrade_height.wrapping_add(1);

// We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the
// swap execution prefix. Then, we write each entry to the nv-storage.
let pre_upgrade_root_hash: RootHash = root_hash.into();

/* `Migration::migrate`: reach into the chain state and perform an offline state transition */
let mut delta = StateDelta::new(initial_state);
tracing::info!("beginning migration steps");

let (migration_duration, post_upgrade_root_hash) = {
let start_time = std::time::SystemTime::now();

// Adjust the length of `Validator` fields.
truncate_validator_fields(&mut delta).await?;

Expand All @@ -76,30 +83,26 @@ pub async fn migrate(
// Adjust the length of governance proposal outcome fields.
truncate_proposal_outcome_fields(&mut delta).await?;

// Re-index all open positions.
reindex_dex_positions(&mut delta).await?;

// Reset the application height and halt flag.
delta.ready_to_start();
delta.put_block_height(0u64);

// Finally, commit the changes to the chain state.
let post_upgrade_root_hash = storage.commit_in_place(delta).await?;
tracing::info!(?post_upgrade_root_hash, "post-migration root hash");

(
start_time.elapsed().expect("start time not set"),
start_time.elapsed().expect("start is set"),
post_upgrade_root_hash,
)
};
tracing::info!("completed migration steps");

// Set halt bit to 0, so chain can start again.
let migrated_state = storage.latest_snapshot();
let mut delta = StateDelta::new(migrated_state);
delta.ready_to_start();
delta.put_block_height(0u64);
let _ = storage
.commit_in_place(delta)
.await
.context("failed to reset halt bit")?;
storage.release().await;

// The migration is complete, now we need to generate a genesis file. To do this, we need
// to lookup a validator view from the chain, and specify the post-upgrade app hash and
// initial height.
tracing::info!("migration completed, generating genesis and signing state...");

/* `Migration::complete`: the state transition has been performed, we prepare the checkpointed genesis and signing state */
let app_state = penumbra_app::genesis::Content {
chain_id,
..Default::default()
Expand All @@ -110,22 +113,26 @@ pub async fn migrate(
.to_vec()
.try_into()
.expect("infaillible conversion");

genesis.initial_height = post_upgrade_height as i64;
genesis.genesis_time = genesis_start.unwrap_or_else(|| {
let now = tendermint::time::Time::now();
tracing::info!(%now, "no genesis time provided, detecting a testing setup");
now
});

tracing::info!("generating checkpointed genesis");
let checkpoint = post_upgrade_root_hash.0.to_vec();
let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));

tracing::info!("writing genesis to disk");
let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
tracing::info!("genesis: {}", genesis_json);
let genesis_path = pd_home.join("genesis.json");
std::fs::write(genesis_path, genesis_json).expect("can write genesis");

tracing::info!("updating signing state");
let validator_state_path = pd_home.join("priv_validator_state.json");

let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state();
std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state");

Expand All @@ -135,12 +142,38 @@ pub async fn migrate(
?pre_upgrade_root_hash,
?post_upgrade_root_hash,
duration = migration_duration.as_secs(),
"successful migration!"
"migration fully complete"
);

Ok(())
}

async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
tracing::info!("running dex re-indexing migration");
let prefix_key_lp = penumbra_dex::state_key::all_positions();
let stream_all_lp = delta.prefix::<Position>(&prefix_key_lp);
let stream_open_lp = stream_all_lp.filter_map(|entry| async {
match entry {
Ok((_, lp)) if lp.state == position::State::Opened => Some(lp),
_ => None,
}
});
pin_mut!(stream_open_lp);

while let Some(lp) = stream_open_lp.next().await {
// Re-hash the position, since the key is a bech32 string.
let id = lp.id();
// Close the position, adjusting all its index entries.
delta.close_position_by_id(&id).await?;
// Erase the position from the state, so that we circumvent the `update_position` guard.
delta.delete(penumbra_dex::state_key::position_by_id(&id));
// Open a position with the adjusted indexing logic.
delta.open_position(lp).await?;
}
tracing::info!("completed dex migration");
Ok(())
}

/// * Validators:
/// - `name` (140 bytes)
/// - `website` (70 bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ mod tests {

let position = buy_1;
state_tx
.update_position_by_price_index(&None, &position, &position.id())
.update_position_by_price_index(&position.id(), &None, &position)
.expect("can update price index");
state_tx.put(state_key::position_by_id(&id), position);

Expand Down
39 changes: 20 additions & 19 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ pub trait PositionRead: StateRead {
fn positions_by_price(
&self,
pair: &DirectedTradingPair,
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
{
let prefix = engine::price_index::prefix(pair);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
self.nonverifiable_prefix_raw(&prefix)
self.nonverifiable_prefix(&prefix)
.map(|entry| match entry {
Ok((k, _)) => {
Ok((k, lp)) => {
let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
Ok(position::Id(raw_id))
Ok((position::Id(raw_id), lp))
}
Err(e) => Err(e),
})
Expand All @@ -90,12 +91,9 @@ pub trait PositionRead: StateRead {
async fn best_position(
&self,
pair: &DirectedTradingPair,
) -> Result<Option<position::Position>> {
) -> Result<Option<(position::Id, position::Position)>> {
let mut positions_by_price = self.positions_by_price(pair);
match positions_by_price.next().await.transpose()? {
Some(id) => self.position_by_id(&id).await,
None => Ok(None),
}
positions_by_price.next().await.transpose()
}

/// Fetch the list of pending position closures.
Expand Down Expand Up @@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(id, Some(prev_state), new_state)
.await?;

Ok(())
}
Expand Down Expand Up @@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead {

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;
self.update_position(&id, None, position).await?;

Ok(())
}
Expand Down Expand Up @@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead {
.map_err(|e| tracing::warn!(?e, "failed to record position execution"))
.ok();

self.update_position(Some(prev_state), new_state).await
self.update_position(&position_id, Some(prev_state), new_state)
.await
}

/// Withdraw from a closed position, incrementing its sequence number.
Expand Down Expand Up @@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(&position_id, Some(prev_state), new_state)
.await?;

Ok(reserves)
}
Expand All @@ -462,28 +463,28 @@ impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
trait Inner: StateWrite {
/// Writes a position to the state, updating all necessary indexes.
///
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
/// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
/// All other position changes exposed by the `PositionManager` should run through here.
#[instrument(level = "debug", skip_all)]
async fn update_position(
&mut self,
id: &position::Id,
prev_state: Option<Position>,
new_state: Position,
) -> Result<Position> {
let id = new_state.id();
tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");

// Assert `update_position` state transitions invariants:
Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;

// Update the DEX engine indices:
self.update_position_by_price_index(&prev_state, &new_state, &id)?;
self.update_position_by_inventory_index(&prev_state, &new_state, &id)?;
self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id)
self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
.await?;
self.update_trading_pair_position_counter(&prev_state, &new_state, &id)
self.update_trading_pair_position_counter(&prev_state, &new_state)
.await?;
self.update_position_by_price_index(&id, &prev_state, &new_state)?;

self.put(state_key::position_by_id(&id), new_state.clone());
Ok(new_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
/// │ └──┘
async fn update_asset_by_base_liquidity_index(
&mut self,
id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
id: &position::Id,
) -> Result<()> {
// We need to reconstruct the position's previous contribution and compute
// its new contribution to the index. We do this for each asset in the pair
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite {
&mut self,
prev_state: &Option<Position>,
new_state: &Position,
_id: &position::Id,
) -> Result<()> {
use position::State::*;
let trading_pair = new_state.phi.pair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use position::State::*;
pub(super) trait PositionByInventoryIndex: StateWrite {
fn update_position_by_inventory_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record of the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cnidarium::StateWrite;
use penumbra_proto::StateWriteProto;

use crate::{
lp::position::{self, Position},
Expand All @@ -12,9 +13,9 @@ use position::State::*;
pub(crate) trait PositionByPriceIndex: StateWrite {
fn update_position_by_price_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record for the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down Expand Up @@ -57,7 +58,10 @@ trait Inner: StateWrite {
end: pair.asset_2(),
};
let phi12 = phi.component.clone();
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair12, &phi12, &id),
position.clone(),
);
tracing::debug!("indexing position for 1=>2 trades");
}

Expand All @@ -68,7 +72,10 @@ trait Inner: StateWrite {
end: pair.asset_1(),
};
let phi21 = phi.component.flip();
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair21, &phi21, &id),
position.clone(),
);
tracing::debug!("indexing position for 2=>1 trades");
}
}
Expand Down
Loading

0 comments on commit b584c26

Please sign in to comment.