Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: fold Positions into the price indexes #4564

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/bin/pd/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub enum Migration {
Testnet77,
/// Testnet-78 migration:
/// - Truncate various user-supplied `String` fields to a maximum length.
/// - Populate the DEX NV price idnexes with position data
Testnet78,
}

Expand Down Expand Up @@ -90,11 +91,12 @@ impl Migration {
Migration::Testnet78 => {
testnet78::migrate(storage, pd_home.clone(), genesis_start).await?
}
_ => unreachable!(),
// We keep historical migrations around for now, this will help inform an abstracted
// design. Feel free to remove it if it's causing you trouble.
_ => unimplemented!("the specified migration is unimplemented"),
}

if let Some(comet_home) = comet_home {
// TODO avoid this when refactoring to clean up migrations
let genesis_path = pd_home.join("genesis.json");
migrate_comet_data(comet_home, genesis_path).await?;
}
Expand Down
89 changes: 61 additions & 28 deletions crates/bin/pd/src/migrate/testnet78.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
//! Contains functions related to the migration script of Testnet78.
use anyhow::Context;
use cnidarium::{Snapshot, StateDelta, Storage};
use cnidarium::{Snapshot, StateDelta, StateWrite, Storage};
use futures::TryStreamExt as _;
use futures::{pin_mut, StreamExt};
use jmt::RootHash;
use penumbra_app::app::StateReadExt as _;
use penumbra_dex::component::PositionManager;
use penumbra_dex::lp::position;
use penumbra_dex::lp::position::Position;
use penumbra_governance::proposal_state::State as ProposalState;
use penumbra_governance::Proposal;
use penumbra_governance::StateReadExt as _;
use penumbra_governance::StateWriteExt;
use penumbra_governance::StateWriteExt as _;
use penumbra_proto::core::component::governance::v1 as pb_governance;
use penumbra_proto::{StateReadProto as _, StateWriteProto as _};
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_sct::component::clock::EpochManager;
use penumbra_sct::component::clock::EpochRead as _;
use penumbra_sct::component::clock::EpochRead;
use penumbra_stake::validator::Validator;
use std::path::PathBuf;
use tracing::instrument;
Expand Down Expand Up @@ -41,32 +44,36 @@ use crate::testnet::generate::TestnetConfig;
/// - `client_id` (128 bytes)
/// * Governance Signaling Proposals:
/// - `commit hash` (255 bytes)
/// - Close and re-open all *open* positions so that they are re-indexed.
#[instrument]
pub async fn migrate(
storage: Storage,
pd_home: PathBuf,
genesis_start: Option<tendermint::time::Time>,
) -> anyhow::Result<()> {
// Setup:
/* `Migration::prepare`: collect basic migration data, logging, initialize alt-storage if needed */
let initial_state = storage.latest_snapshot();

let chain_id = initial_state.get_chain_id().await?;
let root_hash = initial_state
.root_hash()
.await
.expect("chain state has a root hash");
let pre_upgrade_root_hash: RootHash = root_hash.into();

let pre_upgrade_height = initial_state
.get_block_height()
.await
.expect("chain state has a block height");
let post_upgrade_height = pre_upgrade_height.wrapping_add(1);

// We initialize a `StateDelta` and start by reaching into the JMT for all entries matching the
// swap execution prefix. Then, we write each entry to the nv-storage.
let pre_upgrade_root_hash: RootHash = root_hash.into();

/* `Migration::migrate`: reach into the chain state and perform an offline state transition */
let mut delta = StateDelta::new(initial_state);
tracing::info!("beginning migration steps");

let (migration_duration, post_upgrade_root_hash) = {
let start_time = std::time::SystemTime::now();

// Adjust the length of `Validator` fields.
truncate_validator_fields(&mut delta).await?;

Expand All @@ -76,30 +83,26 @@ pub async fn migrate(
// Adjust the length of governance proposal outcome fields.
truncate_proposal_outcome_fields(&mut delta).await?;

// Re-index all open positions.
reindex_dex_positions(&mut delta).await?;

// Reset the application height and halt flag.
delta.ready_to_start();
delta.put_block_height(0u64);

// Finally, commit the changes to the chain state.
let post_upgrade_root_hash = storage.commit_in_place(delta).await?;
tracing::info!(?post_upgrade_root_hash, "post-migration root hash");

(
start_time.elapsed().expect("start time not set"),
start_time.elapsed().expect("start is set"),
post_upgrade_root_hash,
)
};
tracing::info!("completed migration steps");

// Set halt bit to 0, so chain can start again.
let migrated_state = storage.latest_snapshot();
let mut delta = StateDelta::new(migrated_state);
delta.ready_to_start();
delta.put_block_height(0u64);
let _ = storage
.commit_in_place(delta)
.await
.context("failed to reset halt bit")?;
storage.release().await;

// The migration is complete, now we need to generate a genesis file. To do this, we need
// to lookup a validator view from the chain, and specify the post-upgrade app hash and
// initial height.
tracing::info!("migration completed, generating genesis and signing state...");

/* `Migration::complete`: the state transition has been performed, we prepare the checkpointed genesis and signing state */
let app_state = penumbra_app::genesis::Content {
chain_id,
..Default::default()
Expand All @@ -110,22 +113,26 @@ pub async fn migrate(
.to_vec()
.try_into()
.expect("infaillible conversion");

genesis.initial_height = post_upgrade_height as i64;
genesis.genesis_time = genesis_start.unwrap_or_else(|| {
let now = tendermint::time::Time::now();
tracing::info!(%now, "no genesis time provided, detecting a testing setup");
now
});

tracing::info!("generating checkpointed genesis");
let checkpoint = post_upgrade_root_hash.0.to_vec();
let genesis = TestnetConfig::make_checkpoint(genesis, Some(checkpoint));

tracing::info!("writing genesis to disk");
let genesis_json = serde_json::to_string(&genesis).expect("can serialize genesis");
tracing::info!("genesis: {}", genesis_json);
let genesis_path = pd_home.join("genesis.json");
std::fs::write(genesis_path, genesis_json).expect("can write genesis");

tracing::info!("updating signing state");
let validator_state_path = pd_home.join("priv_validator_state.json");

let fresh_validator_state = crate::testnet::generate::TestnetValidator::initial_state();
std::fs::write(validator_state_path, fresh_validator_state).expect("can write validator state");

Expand All @@ -135,12 +142,38 @@ pub async fn migrate(
?pre_upgrade_root_hash,
?post_upgrade_root_hash,
duration = migration_duration.as_secs(),
"successful migration!"
"migration fully complete"
);

Ok(())
}

async fn reindex_dex_positions(delta: &mut StateDelta<Snapshot>) -> anyhow::Result<()> {
tracing::info!("running dex re-indexing migration");
let prefix_key_lp = penumbra_dex::state_key::all_positions();
let stream_all_lp = delta.prefix::<Position>(&prefix_key_lp);
let stream_open_lp = stream_all_lp.filter_map(|entry| async {
match entry {
Ok((_, lp)) if lp.state == position::State::Opened => Some(lp),
_ => None,
}
});
pin_mut!(stream_open_lp);

while let Some(lp) = stream_open_lp.next().await {
// Re-hash the position, since the key is a bech32 string.
let id = lp.id();
// Close the position, adjusting all its index entries.
delta.close_position_by_id(&id).await?;
// Erase the position from the state, so that we circumvent the `update_position` guard.
delta.delete(penumbra_dex::state_key::position_by_id(&id));
// Open a position with the adjusted indexing logic.
delta.open_position(lp).await?;
}
tracing::info!("completed dex migration");
Ok(())
}

/// * Validators:
/// - `name` (140 bytes)
/// - `website` (70 bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ mod tests {

let position = buy_1;
state_tx
.update_position_by_price_index(&None, &position, &position.id())
.update_position_by_price_index(&position.id(), &None, &position)
.expect("can update price index");
state_tx.put(state_key::position_by_id(&id), position);

Expand Down
39 changes: 20 additions & 19 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ pub trait PositionRead: StateRead {
fn positions_by_price(
&self,
pair: &DirectedTradingPair,
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
) -> Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send + 'static>>
{
let prefix = engine::price_index::prefix(pair);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
self.nonverifiable_prefix_raw(&prefix)
self.nonverifiable_prefix(&prefix)
.map(|entry| match entry {
Ok((k, _)) => {
Ok((k, lp)) => {
let raw_id = <&[u8; 32]>::try_from(&k[103..135])?.to_owned();
Ok(position::Id(raw_id))
Ok((position::Id(raw_id), lp))
}
Err(e) => Err(e),
})
Expand All @@ -90,12 +91,9 @@ pub trait PositionRead: StateRead {
async fn best_position(
&self,
pair: &DirectedTradingPair,
) -> Result<Option<position::Position>> {
) -> Result<Option<(position::Id, position::Position)>> {
let mut positions_by_price = self.positions_by_price(pair);
match positions_by_price.next().await.transpose()? {
Some(id) => self.position_by_id(&id).await,
None => Ok(None),
}
positions_by_price.next().await.transpose()
}

/// Fetch the list of pending position closures.
Expand Down Expand Up @@ -205,7 +203,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(id, Some(prev_state), new_state)
.await?;

Ok(())
}
Expand Down Expand Up @@ -280,7 +279,7 @@ pub trait PositionManager: StateWrite + PositionRead {

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;
self.update_position(&id, None, position).await?;

Ok(())
}
Expand Down Expand Up @@ -374,7 +373,8 @@ pub trait PositionManager: StateWrite + PositionRead {
.map_err(|e| tracing::warn!(?e, "failed to record position execution"))
.ok();

self.update_position(Some(prev_state), new_state).await
self.update_position(&position_id, Some(prev_state), new_state)
.await
}

/// Withdraw from a closed position, incrementing its sequence number.
Expand Down Expand Up @@ -450,7 +450,8 @@ pub trait PositionManager: StateWrite + PositionRead {
new_state
};

self.update_position(Some(prev_state), new_state).await?;
self.update_position(&position_id, Some(prev_state), new_state)
.await?;

Ok(reserves)
}
Expand All @@ -462,28 +463,28 @@ impl<T: StateWrite + ?Sized + Chandelier> PositionManager for T {}
trait Inner: StateWrite {
/// Writes a position to the state, updating all necessary indexes.
///
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
/// This should be the **SOLE ENTRYPOINT** for writing positions to the state.
/// All other position changes exposed by the `PositionManager` should run through here.
#[instrument(level = "debug", skip_all)]
async fn update_position(
&mut self,
id: &position::Id,
erwanor marked this conversation as resolved.
Show resolved Hide resolved
prev_state: Option<Position>,
new_state: Position,
) -> Result<Position> {
let id = new_state.id();
tracing::debug!(?id, prev_position_state = ?prev_state.as_ref().map(|p| &p.state), new_position_state = ?new_state.state, "updating position state");
tracing::trace!(?id, ?prev_state, ?new_state, "updating position state");

// Assert `update_position` state transitions invariants:
Self::guard_invalid_transitions(&prev_state, &new_state, &id)?;

// Update the DEX engine indices:
self.update_position_by_price_index(&prev_state, &new_state, &id)?;
self.update_position_by_inventory_index(&prev_state, &new_state, &id)?;
self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id)
self.update_position_by_inventory_index(&id, &prev_state, &new_state)?;
self.update_asset_by_base_liquidity_index(&id, &prev_state, &new_state)
.await?;
self.update_trading_pair_position_counter(&prev_state, &new_state, &id)
self.update_trading_pair_position_counter(&prev_state, &new_state)
.await?;
self.update_position_by_price_index(&id, &prev_state, &new_state)?;

self.put(state_key::position_by_id(&id), new_state.clone());
Ok(new_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
/// │ └──┘
async fn update_asset_by_base_liquidity_index(
&mut self,
id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
id: &position::Id,
) -> Result<()> {
// We need to reconstruct the position's previous contribution and compute
// its new contribution to the index. We do this for each asset in the pair
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub(crate) trait PositionCounter: StateWrite {
&mut self,
prev_state: &Option<Position>,
new_state: &Position,
_id: &position::Id,
) -> Result<()> {
use position::State::*;
let trading_pair = new_state.phi.pair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use position::State::*;
pub(super) trait PositionByInventoryIndex: StateWrite {
fn update_position_by_inventory_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record of the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cnidarium::StateWrite;
use penumbra_proto::StateWriteProto;

use crate::{
lp::position::{self, Position},
Expand All @@ -12,9 +13,9 @@ use position::State::*;
pub(crate) trait PositionByPriceIndex: StateWrite {
fn update_position_by_price_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
position_id: &position::Id,
) -> Result<()> {
// Clear an existing record for the position, since changes to the
// reserves or the position state might have invalidated it.
Expand Down Expand Up @@ -57,7 +58,10 @@ trait Inner: StateWrite {
end: pair.asset_2(),
};
let phi12 = phi.component.clone();
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair12, &phi12, &id),
position.clone(),
);
tracing::debug!("indexing position for 1=>2 trades");
}

Expand All @@ -68,7 +72,10 @@ trait Inner: StateWrite {
end: pair.asset_1(),
};
let phi21 = phi.component.flip();
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
self.nonverifiable_put(
engine::price_index::key(&pair21, &phi21, &id),
position.clone(),
);
tracing::debug!("indexing position for 2=>1 trades");
}
}
Expand Down
Loading
Loading