Skip to content

Commit

Permalink
Wait for counterparty state machine update in beefy consensus stream (#…
Browse files Browse the repository at this point in the history
…33)

* submit beefy updates when corresponding state machine is emitted

* minimum update interval

* sync inside stream

* wip

* refactor messaging and consensus tasks

* fix
  • Loading branch information
Wizdave97 authored Oct 19, 2023
1 parent f060208 commit 83c239e
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 238 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 92 additions & 54 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,113 @@

//! Consensus message relay

use anyhow::anyhow;
use futures::StreamExt;
use ismp::messaging::Message;
use ismp::messaging::{ConsensusMessage, Message};
use tesseract_primitives::{reconnect_with_exponential_back_off, IsmpHost, IsmpProvider};
/// Relays [`ConsensusMessage`] updates.
pub async fn relay<A, B>(mut chain_a: A, mut chain_b: B) -> Result<(), anyhow::Error>
pub async fn relay<A, B>(chain_a: A, chain_b: B) -> Result<(), anyhow::Error>
where
A: IsmpHost + IsmpProvider + 'static,
B: IsmpHost + IsmpProvider + 'static,
{
let mut consensus_a = chain_a.consensus_notification(chain_b.clone()).await?;
let mut consensus_b = chain_b.consensus_notification(chain_a.clone()).await?;
let task_a = tokio::spawn({
let mut chain_a = chain_a.clone();
let mut chain_b = chain_b.clone();
async move {
let mut consensus_stream = chain_a
.consensus_notification(chain_b.clone())
.await
.expect("Failed to create consensus stream");
loop {
let item = consensus_stream.next().await;
let res = handle_notification(&mut chain_a, &mut chain_b, item).await;

loop {
tokio::select! {
result = consensus_a.next() => {
match result {
None => {
log::info!("RESTARTING {}-{} consensus task", chain_a.name(), chain_b.name());
reconnect_with_exponential_back_off(&mut chain_a, &chain_b, 1000).await?;
reconnect_with_exponential_back_off(&mut chain_b, &chain_a, 1000).await?;
consensus_a = chain_a.consensus_notification(chain_b.clone()).await?;
consensus_b = chain_b.consensus_notification(chain_a.clone()).await?;
log::info!("RESTARTING completed");
continue
if let Err(_) = res {
log::info!("RESTARTING {} consensus task", chain_a.name());
if let Err(_) =
reconnect_with_exponential_back_off(&mut chain_a, &mut chain_b, 1000).await
{
panic!("Fatal Error, failed to reconnect")
}
Some(Ok(consensus_message)) => {
log::info!(
target: "tesseract",
"🛰️ Transmitting consensus update message from {} to {}",
chain_a.name(), chain_b.name()
);
let _ = chain_b.submit(vec![Message::Consensus(consensus_message)]).await;
},
Some(Err(e)) => {
log::error!(
target: "tesseract",
"{} encountered an error in the consensus stream: {e}", chain_a.name()
)
if let Err(_) =
reconnect_with_exponential_back_off(&mut chain_b, &mut chain_a, 1000).await
{
panic!("Fatal Error, failed to reconnect")
}
consensus_stream = chain_a
.consensus_notification(chain_b.clone())
.await
.expect("Failed to create consensus stream");
log::info!("RESTARTING completed");
}
}
}
});

result = consensus_b.next() => {
match result {
None => {
log::info!("RESTARTING {}-{} consensus task", chain_a.name(), chain_b.name());
reconnect_with_exponential_back_off(&mut chain_b, &chain_a, 1000).await?;
reconnect_with_exponential_back_off(&mut chain_a, &chain_b, 1000).await?;
consensus_b = chain_b.consensus_notification(chain_a.clone()).await?;
consensus_a = chain_a.consensus_notification(chain_b.clone()).await?;
log::info!("RESTARTING completed");
continue
},
Some(Ok(consensus_message)) => {
log::info!(
target: "tesseract",
"🛰️ Transmitting consensus update message from {} to {}",
chain_b.name(), chain_a.name()
);
let _ = chain_a.submit(vec![Message::Consensus(consensus_message)]).await;
},
Some(Err(e)) => {
log::error!(
target: "tesseract",
"{} encountered an error in the consensus stream: {e}", chain_b.name()
)
let task_b = tokio::spawn({
let mut chain_a = chain_a.clone();
let mut chain_b = chain_b.clone();
async move {
let mut consensus_stream = chain_b
.consensus_notification(chain_a.clone())
.await
.expect("Failed to create consensus stream");
loop {
let item = consensus_stream.next().await;
let res = handle_notification(&chain_b, &chain_a, item).await;
if let Err(_) = res {
log::info!("RESTARTING {} consensus task", chain_b.name());
if let Err(_) =
reconnect_with_exponential_back_off(&mut chain_a, &mut chain_b, 1000).await
{
panic!("Fatal Error, failed to reconnect")
}
if let Err(_) =
reconnect_with_exponential_back_off(&mut chain_b, &mut chain_a, 1000).await
{
panic!("Fatal Error, failed to reconnect")
}
consensus_stream = chain_b
.consensus_notification(chain_a.clone())
.await
.expect("Failed to create consensus stream");
log::info!("RESTARTING completed");
}
}
}
}
});
let _ = futures::future::join_all(vec![task_a, task_b]).await;
Ok(())
}

async fn handle_notification<A, B>(
chain_a: &A,
chain_b: &B,
update: Option<Result<ConsensusMessage, anyhow::Error>>,
) -> Result<(), anyhow::Error>
where
A: IsmpHost + IsmpProvider,
B: IsmpHost + IsmpProvider,
{
let res = match update {
None => Err(anyhow!("Stream Returned None")),
Some(Ok(consensus_message)) => {
log::info!(
target: "tesseract",
"🛰️ Transmitting consensus update message from {} to {}",
chain_a.name(), chain_b.name()
);
let _ = chain_b.submit(vec![Message::Consensus(consensus_message)]).await;
Ok(())
},
Some(Err(e)) => {
log::error!(
target: "tesseract",
"{} encountered an error in the consensus stream: {e}", chain_a.name()
);
Err(e)
},
};
res
}
4 changes: 3 additions & 1 deletion ethereum/sync-committee/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) 2023 Polytope Labs.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -139,7 +141,7 @@ impl IsmpHost for SyncCommitteeHost {
let delay = counterparty_timestamp - last_consensus_update;
// If onchain timestamp has not progressed sleep
if delay < challenge_period {
tokio::time::sleep(delay).await;
tokio::time::sleep(delay + Duration::from_secs(12 * 10)).await;
}

let update = consensus_notification(&client, counterparty.clone(), checkpoint)
Expand Down
7 changes: 2 additions & 5 deletions ethereum/sync-committee/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use ismp::{
messaging::{ConsensusMessage, Message},
};
use primitives::{
consensus_types::Checkpoint,
constants::Root,
util::{compute_sync_committee_period, compute_sync_committee_period_at_slot},
consensus_types::Checkpoint, constants::Root, util::compute_sync_committee_period,
};
use std::collections::BTreeMap;
use tesseract_primitives::{IsmpHost, IsmpProvider};
Expand Down Expand Up @@ -39,8 +37,7 @@ where
consensus_state_id: client.consensus_state_id,
};
// Do a sync check before returning any updates
let state_period =
compute_sync_committee_period_at_slot(light_client_state.finalized_header.slot);
let state_period = light_client_state.state_period;

let checkpoint_period = compute_sync_committee_period(checkpoint.epoch);
if !(state_period..=(state_period + 1)).contains(&checkpoint_period) {
Expand Down
Loading

0 comments on commit 83c239e

Please sign in to comment.