Skip to content

Commit

Permalink
temporarly disable indexing of nomination_pools for specVersion < 100…
Browse files Browse the repository at this point in the history
…2000
  • Loading branch information
paulormart committed Apr 18, 2024
1 parent 5663a2f commit 2562836
Showing 1 changed file with 177 additions and 138 deletions.
315 changes: 177 additions & 138 deletions src/runtimes/kusama.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2549,74 +2549,95 @@ pub async fn cache_nomination_pools_stats(
None => return Err("Current session index not defined".into()),
};

let mut valid_pool = Some(1);
while let Some(pool_id) = valid_pool {
if pool_id > last_pool_id {
valid_pool = None;
} else {
let mut pool_stats = PoolStats::new();
pool_stats.block_number = block_number;
// ***********************************************************************
// TODO: verify how to decode with the metadata available at the block_hash being queried
// SpecVersion > 1002000 changed nomination_pools.bonded_pools resopnse
// ***********************************************************************
let last_runtime_upgrade_addr = node_runtime::storage().system().last_runtime_upgrade();

let bonded_pools_addr = node_runtime::storage()
.nomination_pools()
.bonded_pools(&pool_id);
if let Some(bonded) = api
.storage()
.at(block_hash)
.fetch(&bonded_pools_addr)
.await?
{
pool_stats.points = bonded.points;
pool_stats.member_counter = bonded.member_counter;

// fetch pool stash account staked amount
// let stash_account = nomination_pool_account(AccountType::Bonded, pool_id);
// let account_addr = node_runtime::storage().system().account(&stash_account);
// if let Some(account_info) =
// api.storage().fetch(&account_addr, block_hash).await?
// {
// pool_stats.staked = account_info.data.fee_frozen;
// }

// fetch pool stash account staked amount from staking pallet
let stash_account = nomination_pool_account(AccountType::Bonded, pool_id);
let ledger_addr = node_runtime::storage().staking().ledger(&stash_account);
if let Some(data) = api.storage().at(block_hash).fetch(&ledger_addr).await? {
pool_stats.staked = data.active;
pool_stats.unbonding = data.total - data.active;
}
if let Some(info) = api
.storage()
.at(block_hash)
.fetch(&last_runtime_upgrade_addr)
.await?
{
if info.spec_version >= 1002000 {
let mut valid_pool = Some(1);
while let Some(pool_id) = valid_pool {
if pool_id > last_pool_id {
valid_pool = None;
} else {
let mut pool_stats = PoolStats::new();
pool_stats.block_number = block_number;

// fetch pool reward account free amount
let stash_account = nomination_pool_account(AccountType::Reward, pool_id);
let account_addr = node_runtime::storage().system().account(&stash_account);
if let Some(account_info) =
api.storage().at(block_hash).fetch(&account_addr).await?
{
pool_stats.reward = account_info.data.free;
}
let bonded_pools_addr = node_runtime::storage()
.nomination_pools()
.bonded_pools(&pool_id);
if let Some(bonded) = api
.storage()
.at(block_hash)
.fetch(&bonded_pools_addr)
.await?
{
pool_stats.points = bonded.points;
pool_stats.member_counter = bonded.member_counter;

// fetch pool stash account staked amount
// let stash_account = nomination_pool_account(AccountType::Bonded, pool_id);
// let account_addr = node_runtime::storage().system().account(&stash_account);
// if let Some(account_info) =
// api.storage().fetch(&account_addr, block_hash).await?
// {
// pool_stats.staked = account_info.data.fee_frozen;
// }

// serialize and cache pool
let serialized = serde_json::to_string(&pool_stats)?;
redis::cmd("SET")
.arg(CacheKey::NominationPoolStatsByPoolAndSession(
pool_id,
epoch_index,
))
.arg(serialized)
.query_async(&mut cache as &mut Connection)
.await
.map_err(CacheError::RedisCMDError)?;
}
// fetch pool stash account staked amount from staking pallet
let stash_account =
nomination_pool_account(AccountType::Bonded, pool_id);
let ledger_addr =
node_runtime::storage().staking().ledger(&stash_account);
if let Some(data) =
api.storage().at(block_hash).fetch(&ledger_addr).await?
{
pool_stats.staked = data.active;
pool_stats.unbonding = data.total - data.active;
}

valid_pool = Some(pool_id + 1);
// fetch pool reward account free amount
let stash_account =
nomination_pool_account(AccountType::Reward, pool_id);
let account_addr =
node_runtime::storage().system().account(&stash_account);
if let Some(account_info) =
api.storage().at(block_hash).fetch(&account_addr).await?
{
pool_stats.reward = account_info.data.free;
}

// serialize and cache pool
let serialized = serde_json::to_string(&pool_stats)?;
redis::cmd("SET")
.arg(CacheKey::NominationPoolStatsByPoolAndSession(
pool_id,
epoch_index,
))
.arg(serialized)
.query_async(&mut cache as &mut Connection)
.await
.map_err(CacheError::RedisCMDError)?;
}

valid_pool = Some(pool_id + 1);
}
}
// Log cache processed duration time
info!(
"Pools stats #{} cached ({:?})",
epoch_index,
start.elapsed()
);
}
}
// Log cache processed duration time
info!(
"Pools stats #{} cached ({:?})",
epoch_index,
start.elapsed()
);
}
Ok(())
}
Expand Down Expand Up @@ -2649,90 +2670,108 @@ pub async fn cache_nomination_pools(
None => return Err("Current session index not defined".into()),
};

let mut valid_pool = Some(1);
while let Some(pool_id) = valid_pool {
if pool_id > last_pool_id {
valid_pool = None;
} else {
// Load chain data
let metadata_addr = node_runtime::storage()
.nomination_pools()
.metadata(&pool_id);
if let Some(BoundedVec(metadata)) =
api.storage().at(block_hash).fetch(&metadata_addr).await?
{
let metadata = str(metadata);
let mut pool = Pool::with_id_and_metadata(pool_id, metadata);

let bonded_pools_addr = node_runtime::storage()
.nomination_pools()
.bonded_pools(&pool_id);
if let Some(bonded) = api
.storage()
.at(block_hash)
.fetch(&bonded_pools_addr)
.await?
{
let state = match bonded.state {
PoolState::Blocked => pools::PoolState::Blocked,
PoolState::Destroying => pools::PoolState::Destroying,
_ => pools::PoolState::Open,
};
pool.state = state;

// assign roles
let mut depositor = Account::with_address(bonded.roles.depositor.clone());
depositor.identity =
get_identity(&onet, &bonded.roles.depositor, None).await?;
let root = if let Some(root) = bonded.roles.root {
let mut root_acc = Account::with_address(root.clone());
root_acc.identity = get_identity(&onet, &root, None).await?;
Some(root_acc)
} else {
None
};
let nominator = if let Some(acc) = bonded.roles.nominator {
let mut nominator = Account::with_address(acc.clone());
nominator.identity = get_identity(&onet, &acc, None).await?;
Some(nominator)
} else {
None
};
let state_toggler = if let Some(acc) = bonded.roles.bouncer {
let mut state_toggler = Account::with_address(acc.clone());
state_toggler.identity = get_identity(&onet, &acc, None).await?;
Some(state_toggler)
} else {
None
};

pool.roles = Some(Roles::with(depositor, root, nominator, state_toggler));
pool.block_number = block_number;
// ***********************************************************************
// TODO: verify how to decode with the metadata available at the block_hash being queried
// SpecVersion > 1002000 changed nomination_pools.bonded_pools resopnse
// ***********************************************************************
let last_runtime_upgrade_addr = node_runtime::storage().system().last_runtime_upgrade();

// serialize and cache pool
let serialized = serde_json::to_string(&pool)?;
redis::cmd("SET")
.arg(CacheKey::NominationPoolRecord(pool_id))
.arg(serialized)
if let Some(info) = api
.storage()
.at(block_hash)
.fetch(&last_runtime_upgrade_addr)
.await?
{
if info.spec_version >= 1002000 {
let mut valid_pool = Some(1);
while let Some(pool_id) = valid_pool {
if pool_id > last_pool_id {
valid_pool = None;
} else {
// Load chain data
let metadata_addr = node_runtime::storage()
.nomination_pools()
.metadata(&pool_id);
if let Some(BoundedVec(metadata)) =
api.storage().at(block_hash).fetch(&metadata_addr).await?
{
let metadata = str(metadata);
let mut pool = Pool::with_id_and_metadata(pool_id, metadata.clone());

let bonded_pools_addr = node_runtime::storage()
.nomination_pools()
.bonded_pools(&pool_id);
if let Some(bonded) = api
.storage()
.at(block_hash)
.fetch(&bonded_pools_addr)
.await?
{
let state = match bonded.state {
PoolState::Blocked => pools::PoolState::Blocked,
PoolState::Destroying => pools::PoolState::Destroying,
_ => pools::PoolState::Open,
};
pool.state = state;

// assign roles
let mut depositor =
Account::with_address(bonded.roles.depositor.clone());
depositor.identity =
get_identity(&onet, &bonded.roles.depositor, None).await?;
let root = if let Some(root) = bonded.roles.root {
let mut root_acc = Account::with_address(root.clone());
root_acc.identity = get_identity(&onet, &root, None).await?;
Some(root_acc)
} else {
None
};
let nominator = if let Some(acc) = bonded.roles.nominator {
let mut nominator = Account::with_address(acc.clone());
nominator.identity = get_identity(&onet, &acc, None).await?;
Some(nominator)
} else {
None
};
let state_toggler = if let Some(acc) = bonded.roles.bouncer {
let mut state_toggler = Account::with_address(acc.clone());
state_toggler.identity =
get_identity(&onet, &acc, None).await?;
Some(state_toggler)
} else {
None
};

pool.roles =
Some(Roles::with(depositor, root, nominator, state_toggler));
pool.block_number = block_number;

// serialize and cache pool
let serialized = serde_json::to_string(&pool)?;
redis::cmd("SET")
.arg(CacheKey::NominationPoolRecord(pool_id))
.arg(serialized)
.query_async(&mut cache as &mut Connection)
.await
.map_err(CacheError::RedisCMDError)?;
}
}
// cache pool_id into a sorted set by session
redis::cmd("ZADD")
.arg(CacheKey::NominationPoolIdsBySession(epoch_index))
.arg(0)
.arg(pool_id)
.query_async(&mut cache as &mut Connection)
.await
.map_err(CacheError::RedisCMDError)?;

valid_pool = Some(pool_id + 1);
}
}
// cache pool_id into a sorted set by session
redis::cmd("ZADD")
.arg(CacheKey::NominationPoolIdsBySession(epoch_index))
.arg(0)
.arg(pool_id)
.query_async(&mut cache as &mut Connection)
.await
.map_err(CacheError::RedisCMDError)?;

valid_pool = Some(pool_id + 1);
// Log cache processed duration time
info!("Pools #{} cached ({:?})", epoch_index, start.elapsed());
}
}
// Log cache processed duration time
info!("Pools #{} cached ({:?})", epoch_index, start.elapsed());
}
Ok(())
}
Expand Down

0 comments on commit 2562836

Please sign in to comment.