Skip to content

Commit

Permalink
feat(synchronizer): query bootstrap nodes on highest epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-wright committed Feb 13, 2024
1 parent e0c3a61 commit 5c92662
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6192,6 +6192,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"fleek-crypto",
"futures",
"lightning-interfaces",
"lightning-metrics",
"lightning-utils",
Expand Down
3 changes: 2 additions & 1 deletion core/e2e/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::{Duration, SystemTime};
use anyhow::Result;
use lightning_e2e::swarm::Swarm;
use lightning_e2e::utils::{logging, rpc};
use lightning_interfaces::types::Epoch;
use resolved_pathbuf::ResolvedPathBuf;
use serde_json::json;
use serial_test::serial;
Expand Down Expand Up @@ -66,7 +67,7 @@ async fn e2e_checkpoint() -> Result<()> {
.await
.unwrap();

let epoch_hash = rpc::parse_response::<[u8; 32]>(response)
let (epoch_hash, _) = rpc::parse_response::<([u8; 32], Epoch)>(response)
.await
.expect("Failed to parse response.");
if target_hash.is_none() {
Expand Down
2 changes: 1 addition & 1 deletion core/rpc/src/api/flk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub trait FleekApi {
) -> RpcResult<Vec<((NodePublicKey, NodePublicKey), Duration)>>;

#[method(name = "get_last_epoch_hash")]
async fn get_last_epoch_hash(&self) -> RpcResult<[u8; 32]>;
async fn get_last_epoch_hash(&self) -> RpcResult<([u8; 32], Epoch)>;

#[method(name = "send_txn")]
async fn send_txn(&self, tx: TransactionRequest) -> RpcResult<()>;
Expand Down
7 changes: 5 additions & 2 deletions core/rpc/src/logic/flk_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl<C: Collection> FleekApiServer for FleekApi<C> {
.collect())
}

async fn get_last_epoch_hash(&self) -> RpcResult<[u8; 32]> {
async fn get_last_epoch_hash(&self) -> RpcResult<([u8; 32], Epoch)> {
let last_epoch_hash = match self
.data
.query_runner
Expand All @@ -371,7 +371,10 @@ impl<C: Collection> FleekApiServer for FleekApi<C> {
Some(Value::Hash(hash)) => hash,
_ => [0; 32],
};
Ok(last_epoch_hash)
Ok((
last_epoch_hash,
self.data.query_runner.get_epoch_info().epoch,
))
}

async fn send_txn(&self, tx: TransactionRequest) -> RpcResult<()> {
Expand Down
1 change: 1 addition & 0 deletions core/syncronizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
lightning-interfaces = { path = "../interfaces" }
lightning-utils = { path = "../utils" }
lightning-metrics = { path = "../metrics" }
futures.workspace = true
anyhow.workspace = true
tracing.workspace = true
reqwest.workspace = true
Expand Down
92 changes: 81 additions & 11 deletions core/syncronizer/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::IpAddr;

use anyhow::{anyhow, Result};
use fleek_crypto::NodePublicKey;
use lightning_interfaces::types::{EpochInfo, NodeIndex, NodeInfo};
use lightning_interfaces::types::{Epoch, EpochInfo, NodeIndex, NodeInfo};
use serde::de::DeserializeOwned;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -33,18 +33,53 @@ pub async fn rpc_request<T: DeserializeOwned>(
}

pub async fn ask_nodes<T: DeserializeOwned>(
req: String,
nodes: &[(NodeIndex, NodeInfo)],
rpc_client: &reqwest::Client,
) -> Result<Vec<T>> {
let mut futs = Vec::new();
for (_, node) in nodes {
let req_clone = req.clone();
let fut = async move {
rpc_request::<T>(rpc_client, node.domain, node.ports.rpc, req_clone)
.await
.ok()
};
futs.push(fut);
}

let results: Vec<T> = futures::future::join_all(futs)
.await
.into_iter()
.flatten()
.map(|x| x.result)
.collect();

if results.is_empty() {
Err(anyhow!("Unable to get a responce from nodes"))
} else {
Ok(results)
}
}

pub async fn ask_nodes_old<T: DeserializeOwned>(
req: String,
nodes: &Vec<(NodeIndex, NodeInfo)>,
rpc_client: &reqwest::Client,
) -> Result<T> {
) -> Result<Vec<T>> {
let mut results = Vec::with_capacity(nodes.len());
for (_, node) in nodes {
if let Ok(res) =
rpc_request::<T>(rpc_client, node.domain, node.ports.rpc, req.clone()).await
{
return Ok(res.result);
results.push(res.result);
}
}
Err(anyhow!("Unable to get a responce from nodes"))
if results.is_empty() {
Err(anyhow!("Unable to get a responce from nodes"))
} else {
Ok(results)
}
}

/// Runs the given future to completion on the current tokio runtime.
Expand All @@ -65,7 +100,13 @@ pub async fn get_epoch_info(
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<EpochInfo> {
ask_nodes(rpc_epoch_info().to_string(), &nodes, &rpc_client).await
let mut epochs: Vec<EpochInfo> =
ask_nodes(rpc_epoch_info().to_string(), &nodes, &rpc_client).await?;
if epochs.is_empty() {
return Err(anyhow!("Failed to get epoch info from bootstrap nodes"));
}
epochs.sort_by(|a, b| a.epoch.partial_cmp(&b.epoch).unwrap());
Ok(epochs.pop().unwrap())
}

/// Returns the node info for our node, if it's already on the state.
Expand All @@ -74,12 +115,18 @@ pub async fn get_node_info(
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<Option<NodeInfo>> {
ask_nodes(
let mut node_info: Vec<(Option<NodeInfo>, Epoch)> = ask_nodes(
rpc_node_info(&node_public_key).to_string(),
&nodes,
&rpc_client,
)
.await
.await?;

if node_info.is_empty() {
return Err(anyhow!("Failed to get node info from bootstrap nodes"));
}
node_info.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
Ok(node_info.pop().unwrap().0)
}

/// Returns the node info for our node, if it's already on the state.
Expand All @@ -88,12 +135,35 @@ pub async fn check_is_valid_node(
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<bool> {
ask_nodes(
let mut is_valid: Vec<(bool, Epoch)> = ask_nodes(
rpc_is_valid_node(&node_public_key).to_string(),
&nodes,
&rpc_client,
)
.await
.await?;

if is_valid.is_empty() {
return Err(anyhow!("Failed to get node validity from bootstrap nodes"));
}
is_valid.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
Ok(is_valid.pop().unwrap().0)
}

/// Returns the hash of the last epoch ckpt, and the current epoch.
pub async fn last_epoch_hash(
nodes: &[(NodeIndex, NodeInfo)],
rpc_client: &reqwest::Client,
) -> Result<[u8; 32]> {
let mut hash: Vec<([u8; 32], Epoch)> =
ask_nodes(rpc_last_epoch_hash().to_string(), nodes, rpc_client).await?;

if hash.is_empty() {
return Err(anyhow!(
"Failed to get last epoch hash from bootstrap nodes"
));
}
hash.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
Ok(hash.pop().unwrap().0)
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
Expand Down Expand Up @@ -134,7 +204,7 @@ pub fn rpc_epoch_info() -> serde_json::Value {
pub fn rpc_node_info(public_key: &NodePublicKey) -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_node_info",
"method":"flk_get_node_info_epoch",
"params":{"public_key": public_key},
"id":1,
})
Expand All @@ -143,7 +213,7 @@ pub fn rpc_node_info(public_key: &NodePublicKey) -> serde_json::Value {
pub fn rpc_is_valid_node(public_key: &NodePublicKey) -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_is_valid_node",
"method":"flk_is_valid_node_epoch",
"params":{"public_key": public_key},
"id":1,
})
Expand Down
25 changes: 15 additions & 10 deletions core/syncronizer/src/syncronizer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use fleek_crypto::NodePublicKey;
use lightning_interfaces::infu_collection::{c, Collection};
use lightning_interfaces::types::{
Expand Down Expand Up @@ -34,7 +34,7 @@ use tokio::task::JoinHandle;
use tracing::info;

use crate::config::Config;
use crate::rpc::{self, rpc_epoch, rpc_last_epoch_hash};
use crate::rpc::{self, rpc_epoch};
use crate::utils;

pub struct Syncronizer<C: Collection> {
Expand Down Expand Up @@ -166,13 +166,13 @@ impl<C: Collection> Syncronizer<C> {
// passed into a thread, and thus need to satisfy the 'static lifetime.

// Check if node is staked.
if !rpc::sync_call(rpc::check_is_valid_node(
let is_valid = rpc::sync_call(rpc::check_is_valid_node(
our_public_key,
genesis_committee.clone(),
rpc_client.clone(),
))
.expect("Cannot reach bootstrap nodes")
{
.expect("Cannot reach bootstrap nodes");
if is_valid {
// TODO(matthias): print this message in a loop?
println!(
"The node is not staked. Only staked nodes can participate in the network. Submit a stake transaction and try again."
Expand All @@ -185,7 +185,7 @@ impl<C: Collection> Syncronizer<C> {
rpc_client.clone(),
))
.expect("Cannot reach bootstrap nodes")
.unwrap(); // safe unwrap because we check if the node is valid above
.unwrap(); // we unwrap here because we already checked if the node is valid above

let epoch_info = rpc::sync_call(rpc::get_epoch_info(
genesis_committee.clone(),
Expand Down Expand Up @@ -316,6 +316,7 @@ impl<C: Collection> SyncronizerInner<C> {

// Get the epoch the bootstrap nodes are at
let bootstrap_epoch = self.get_current_epoch().await?;

//let bootstrap_epoch = self.ask_bootstrap_nodes(rpc_epoch().to_string()).await?;

if bootstrap_epoch <= current_epoch {
Expand All @@ -339,7 +340,7 @@ impl<C: Collection> SyncronizerInner<C> {
}

/// This function will rpc request genesis nodes in sequence and stop when one of them responds
async fn ask_bootstrap_nodes<T: DeserializeOwned>(&self, req: String) -> Result<T> {
async fn ask_bootstrap_nodes<T: DeserializeOwned>(&self, req: String) -> Result<Vec<T>> {
rpc::ask_nodes(req, &self.genesis_committee, &self.rpc_client).await
}

Expand All @@ -366,13 +367,17 @@ impl<C: Collection> SyncronizerInner<C> {
// This function will hit the bootstrap nodes(Genesis committee) to ask what epoch they are on
// who the current committee is
async fn get_latest_checkpoint_hash(&self) -> Result<[u8; 32]> {
self.ask_bootstrap_nodes(rpc_last_epoch_hash().to_string())
.await
rpc::last_epoch_hash(&self.genesis_committee, &self.rpc_client).await
}

/// Returns the epoch the bootstrap nodes are on
async fn get_current_epoch(&self) -> Result<Epoch> {
self.ask_bootstrap_nodes(rpc_epoch().to_string()).await
let epochs = self.ask_bootstrap_nodes(rpc_epoch().to_string()).await?;
let epoch = epochs
.into_iter()
.max()
.context("Failed to get epoch from bootstrap nodes")?;
Ok(epoch)
}
}

Expand Down

0 comments on commit 5c92662

Please sign in to comment.