Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification ttl #227

Merged
merged 12 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tokio-postgres = "0.7"
tonic = {version = "0.10", features = ["tls", "tls-roots"]} # Use system trust roots.
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }
chrono = "0.4.38"

[dev-dependencies]
async-trait = "0.1"
Expand Down
30 changes: 29 additions & 1 deletion notification-server/src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum_prometheus::{
};
use backoff::{future::retry, ExponentialBackoff};
use clap::Parser;
use chrono::Utc;
use concordium_rust_sdk::{
types::AbsoluteBlockHeight,
v2::{Client, Endpoint, FinalizedBlockInfo, FinalizedBlocksStream},
Expand Down Expand Up @@ -98,6 +99,13 @@ struct Args {
env = "NOTIFICATION_SERVER_PROMETHEUS_ADDRESS"
)]
listen_address: Option<std::net::SocketAddr>,
#[arg(
long = "notification-ttl-mins",
default_value_t = 60,
help = "This variable defines the maximum allowable time (in minutes) after which a notification is no longer being emitted.",
env = "NOTIFICATION_SERVER_NOTIFICATION_TTL_MIN"
)]
notification_ttl_min: u64,
}

const DATABASE_RETRY_DELAY: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -346,6 +354,26 @@ async fn traverse_chain(
processed_height
}

async fn catch_up(concordium_client: &mut Client, current_height: AbsoluteBlockHeight, notification_ttl: Duration) -> anyhow::Result<AbsoluteBlockHeight> {
let current_block_height = concordium_client
.get_consensus_info()
.await?
.last_finalized_block_height;
let lower_bound = AbsoluteBlockHeight {
lassemand marked this conversation as resolved.
Show resolved Hide resolved
// We are not fast than 2 sec per block, hence this should be conservative enough
height: current_block_height.height - notification_ttl.as_secs(),
};
let lower_bound_time: chrono::DateTime<Utc> = Utc::now() - notification_ttl;
let time_ago_block = concordium_client.find_first_finalized_block_no_earlier_than(lower_bound.., lower_bound_time).await?;

if &time_ago_block.block_height > &current_height {
lassemand marked this conversation as resolved.
Show resolved Hide resolved
info!("Skipping {} blocks", time_ago_block.block_height.height - current_height.height);
counter!("block.process_skipped").increment(time_ago_block.block_height.height - current_height.height);
return Ok(time_ago_block.block_height);
}
Ok(current_height)
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
Expand Down Expand Up @@ -414,7 +442,7 @@ async fn main() -> anyhow::Result<()> {
.await
.context("Failed to get processed block height")?
{
height
catch_up(&mut concordium_client, height, Duration::from_secs(args.notification_ttl_min * 60)).await?
lassemand marked this conversation as resolved.
Show resolved Hide resolved
} else {
concordium_client
.get_consensus_info()
Expand Down
Loading