Skip to content

Commit

Permalink
logging macros
Browse files Browse the repository at this point in the history
  • Loading branch information
makemake-kbo committed Feb 26, 2024
1 parent a4a18ce commit 3e9d280
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 63 deletions.
5 changes: 3 additions & 2 deletions src/admin/accept.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::log_info;
use http_body_util::Full;
use hyper::{
body::Bytes,
Expand Down Expand Up @@ -154,7 +155,7 @@ pub async fn accept_admin_request(
};

// Reconstruct the TX as a normal json rpc request
println!("\x1b[35mInfo:\x1b[0m JWT claims: {:?}", token);
log_info!("JWT claims: {:?}", token);

tx = json!({
"id": token.claims.id,
Expand All @@ -168,7 +169,7 @@ pub async fn accept_admin_request(
let time = Instant::now();
let response = forward_body(tx, &rpc_list_rwlock, &poverty_list_rwlock, cache, config).await;
let time = time.elapsed();
println!("\x1b[35mInfo:\x1b[0m Request time: {:?}", time);
log_info!("Request time: {:?}", time);

response
}
Expand Down
5 changes: 3 additions & 2 deletions src/admin/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::{
use sled::Db;

use crate::{
log_info,
admin::accept::accept_admin_request,
Rpc,
Settings,
Expand Down Expand Up @@ -66,11 +67,11 @@ pub async fn listen_for_admin_requests(

// Create a listener and bind to it
let listener = TcpListener::bind(address).await?;
println!("\x1b[35mInfo:\x1b[0m Bound admin to: {}", address);
log_info!("Bound admin to: {}", address);

loop {
let (stream, socketaddr) = listener.accept().await?;
println!("\x1b[35mInfo:\x1b[0m Admin connection from: {}", socketaddr);
log_info!("Admin connection from: {}", socketaddr);

// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
Expand Down
17 changes: 10 additions & 7 deletions src/balancer/accept_http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{
log_info,
log_wrn,
log_err,
balancer::{
format::{
incoming_to_value,
Expand Down Expand Up @@ -164,7 +167,7 @@ macro_rules! accept {
.with_upgrades()
.await
{
println!("\x1b[31mErr:\x1b[0m Error serving connection: {:?}", err);
log_err!("Error serving connection: {:?}", err);
}
};
}
Expand Down Expand Up @@ -207,7 +210,7 @@ macro_rules! get_response {
let mut rpc_list = $rpc_list_rwlock.write().unwrap();
(rpc, $rpc_position) = pick(&mut rpc_list);
}
println!("\x1b[35mInfo:\x1b[0m Forwarding to: {}", rpc.url);
log_info!("Forwarding to: {}", rpc.url);

// Check if we have any RPCs in the list, if not return error
if $rpc_position == None {
Expand All @@ -228,7 +231,7 @@ macro_rules! get_response {
break;
},
Err(_) => {
println!("\x1b[93mWrn:\x1b[0m An RPC request has timed out, picking new RPC and retrying.");
log_wrn!("\x1b[93mWrn:\x1b[0m An RPC request has timed out, picking new RPC and retrying.");
rpc.update_latency($ttl as f64);
retries += 1;
},
Expand Down Expand Up @@ -360,7 +363,7 @@ pub async fn accept_request(
) -> Result<hyper::Response<Full<Bytes>>, Infallible> {
// Check if the request is a websocket upgrade request.
if is_upgrade_request(&tx) {
println!("\x1b[35mInfo:\x1b[0m Received WS upgrade request");
log_info!("Received WS upgrade request");

if !connection_params.config.read().unwrap().is_ws {
return rpc_response!(
Expand All @@ -374,7 +377,7 @@ pub async fn accept_request(
let (response, websocket) = match upgrade(&mut tx, None) {
Ok((response, websocket)) => (response, websocket),
Err(e) => {
println!("\x1b[31mErr:\x1b[0m Websocket upgrade error: {e}");
log_err!("Websocket upgrade error: {}", e);
return rpc_response!(500, Full::new(Bytes::from(
"{code:-32004, message:\"error: Websocket upgrade error! Try again later...\"}"
.to_string(),
Expand All @@ -400,7 +403,7 @@ pub async fn accept_request(
)
.await
{
println!("\x1b[31mErr:\x1b[0m Websocket connection error: {e}");
log_err!("Websocket connection error: {}", e);
}
});

Expand Down Expand Up @@ -437,7 +440,7 @@ pub async fn accept_request(
)
.await;
let time = time.elapsed();
println!("\x1b[35mInfo:\x1b[0m Request time: {:?}", time);
log_info!("Request time: {:?}", time);

// `rpc_position` is an Option<> that either contains the index of the RPC
// we forwarded our request to, or is None if the result was cached.
Expand Down
2 changes: 1 addition & 1 deletion src/balancer/selection/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn pick(list: &mut Vec<Rpc>) -> (Rpc, Option<usize>) {
}

// Sorting algo
pub fn argsort(data: &Vec<Rpc>) -> Vec<usize> {
pub fn argsort(data: &[Rpc]) -> Vec<usize> {
let mut indices = (0..data.len()).collect::<Vec<usize>>();

// Use sort_by_cached_key with a closure that compares latency
Expand Down
12 changes: 8 additions & 4 deletions src/config/cache_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::config::system::{
use crate::{
config::system::{
TAGLINE,
VERSION_STR,
},
log_info,
log_err,
};
use sled::Db;
use std::sync::Arc;
Expand All @@ -11,7 +15,7 @@ pub fn setup_data(cache: Arc<Db>) {
VERSION_STR, TAGLINE
);

println!("\x1b[35mInfo:\x1b[0m Starting {}", VERSION_STR);
log_info!("Starting {}", VERSION_STR);

// Insert kv pair `blutgang_is_lb` `true` to know what we're interacting with
// `blutgang_is_lb` is cached as a blake3 cache
Expand Down Expand Up @@ -39,14 +43,14 @@ pub fn setup_data(cache: Arc<Db>) {
if cfg!(feature = "xxhash") {
let _ = cache.insert(b"xxhash", b"true");
if cache.get(b"blake3").unwrap().is_some() {
println!("\x1b[31mErr:\x1b[0m Blutgang has detected that your DB is using blake3 while we're currently using xxhash! \
log_err!("Blutgang has detected that your DB is using blake3 while we're currently using xxhash! \
Please remove all cache entries and try again.");
println!("If you believe this is an error, please open a pull request!");
}
} else {
let _ = cache.insert(b"blake3", b"true");
if cache.get(b"xxhash").unwrap().is_some() {
println!("\x1b[31mErr:\x1b[0m Blutgang has detected that your DB is using xxhash while we're currently using blake3! \
log_err!("Blutgang has detected that your DB is using xxhash while we're currently using blake3! \
Please remove all cache entries and try again.");
println!("If you believe this is an error, please open a pull request!");
}
Expand Down
11 changes: 7 additions & 4 deletions src/config/setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::config::error::ConfigError;
use crate::Rpc;
use crate::{
Rpc,
log_err,
config::error::ConfigError,
};
use std::time::Instant;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -50,7 +53,7 @@ pub async fn sort_by_latency(
) -> Result<Vec<Rpc>, ConfigError> {
// Return empty vec if we dont supply any RPCs
if rpc_list.is_empty() {
println!("\x1b[31mErr:\x1b[0m No RPCs supplied!");
log_err!("No RPCs supplied!");
return Ok(Vec::new());
}

Expand All @@ -73,7 +76,7 @@ pub async fn sort_by_latency(
let rpc = match rpc {
StartingLatencyResp::Ok(rax) => rax,
StartingLatencyResp::Error(e) => {
println!("\x1b[31mErr:\x1b[0m {}", e);
log_err!("{}", e);
continue;
}
};
Expand Down
32 changes: 31 additions & 1 deletion src/config/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,35 @@ pub const WS_SUB_MANAGER_ID: u32 = 2;
pub const MAGIC: u32 = 0xb153;

// Version consts, dont impact functionality
pub const VERSION_STR: &str = "Blutgang 0.3.0 Garreg Mach";
pub const VERSION_STR: &str = "Blutgang 0.3.1 Garreg Mach";
pub const TAGLINE: &str = "`Now there's a way forward.`";

#[macro_export]
macro_rules! log_info {
($fmt:expr, $($arg:tt)*) => {
println!(concat!("\x1b[35mInfo:\x1b[0m ", $fmt), $($arg)*)
};
($fmt:expr) => {
println!(concat!("\x1b[35mInfo:\x1b[0m ", $fmt))
};
}

#[macro_export]
macro_rules! log_wrn {
($fmt:expr, $($arg:tt)*) => {
println!(concat!("\x1b[93mWrn:\x1b[0m ", $fmt), $($arg)*)
};
($fmt:expr) => {
println!(concat!("\x1b[93mWrn:\x1b[0m ", $fmt))
};
}

#[macro_export]
macro_rules! log_err {
($fmt:expr, $($arg:tt)*) => {
println!(concat!("\x1b[31mErr:\x1b[0m ", $fmt), $($arg)*)
};
($fmt:expr) => {
println!(concat!("\x1b[31mErr:\x1b[0m ", $fmt))
};
}
14 changes: 8 additions & 6 deletions src/config/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{
log_wrn,
log_info,
config::setup::sort_by_latency,
Rpc,
};
Expand Down Expand Up @@ -101,11 +103,11 @@ impl Settings {
};

if let Some(file) = file {
println!("\x1b[35mInfo:\x1b[0m Using config file at {}", path);
log_info!("Using config file at {}", path);
return Settings::create_from_file(file).await;
}

println!("\x1b[35mInfo:\x1b[0m Using command line arguments for settings...");
log_info!("Using command line arguments for settings...");
Settings::create_from_matches(matches)
}

Expand Down Expand Up @@ -178,7 +180,7 @@ impl Settings {
.expect("\x1b[31mErr:\x1b[0m Could not parse ttl as int!")
as u64;
if expected_block_time == 0 {
println!("\x1b[93mWrn:\x1b[0m expected_block_time is 0, turning off WS and health checks!");
log_wrn!("Expected_block_time is 0, turning off WS and health checks!");
is_ws = false;
} else {
// This is to account for block propagation/execution/whatever delay
Expand Down Expand Up @@ -323,9 +325,9 @@ impl Settings {
}

if !is_ws {
println!("\x1b[93mWrn:\x1b[0m WebSocket endpoints not present for all nodes, or newHeads_ttl is 0.");
println!(
"\x1b[93mWrn:\x1b[0m Disabling WS only-features. Please check docs for more info."
log_wrn!("WebSocket endpoints not present for all nodes, or newHeads_ttl is 0.");
log_wrn!(
"Disabling WS only-features. Please check docs for more info."
)
}

Expand Down
14 changes: 8 additions & 6 deletions src/health/check.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::IncomingResponse;
use crate::SubscriptionData;
use crate::{
log_info,
IncomingResponse,
SubscriptionData,
log_wrn,
health::{
error::HealthError,
safe_block::{
Expand Down Expand Up @@ -184,8 +186,8 @@ fn make_poverty(
if head.reported_head < highest_head {
// Mark the RPC as erroring
rpc_list_guard[head.rpc_list_index].status.is_erroring = true;
println!(
"\x1b[93mWrn:\x1b[0m {} is falling behind! Removing froma active RPC pool.",
log_wrn!(
"{} is falling behind! Removing froma active RPC pool.",
rpc_list_guard[head.rpc_list_index].url
);

Expand Down Expand Up @@ -215,8 +217,8 @@ fn escape_poverty(
if head_result.reported_head >= agreed_head {
let mut rpc = poverty_list_guard[head_result.rpc_list_index].clone();
rpc.status.is_erroring = false;
println!(
"\x1b[35mInfo:\x1b[0m {} is following the head again! Added to active RPC pool.",
log_info!(
"{} is following the head again! Added to active RPC pool.",
rpc.url
);

Expand Down
11 changes: 8 additions & 3 deletions src/health/head_cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use crate::{
log_info,
log_wrn,
};

use std::{
collections::BTreeMap,
sync::{
Expand Down Expand Up @@ -32,15 +37,15 @@ pub async fn manage_cache(
// that means that the chain has experienced a reorg and that we should
// remove everything from the last block to the `new_block`
if new_block <= block_number {
println!("\x1b[93mWrn:\x1b[0m Reorg detected!\nRemoving stale entries from the cache.");
log_wrn!("Reorg detected!\nRemoving stale entries from the cache.");
handle_reorg(head_cache, block_number, new_block, cache)?;
}

// Check if finalized_stream has changed
if last_finalized != *finalized_rx.borrow() {
last_finalized = *finalized_rx.borrow();
println!(
"\x1b[35mInfo:\x1b[0m New finalized block!\nRemoving stale entries from the cache."
log_info!(
"New finalized block!\nRemoving stale entries from the cache."
);
// Remove stale entries from the head_cache
remove_stale(head_cache, last_finalized)?;
Expand Down
13 changes: 8 additions & 5 deletions src/health/safe_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{
log_info,
log_wrn,
log_err,
balancer::processing::CacheArgs,
config::system::WS_HEALTH_CHECK_USER_ID,
rpc::{
Expand Down Expand Up @@ -196,7 +199,7 @@ pub async fn subscribe_to_new_heads(
let user_id = WS_HEALTH_CHECK_USER_ID;

// Add the user to the sink map
println!("\x1b[35mInfo:\x1b[0m Adding user {} to sink map", user_id);
log_info!("Adding user {} to sink map", user_id);
let user_data = tx.clone();
sub_data.add_user(user_id, user_data);

Expand All @@ -213,7 +216,7 @@ pub async fn subscribe_to_new_heads(
let a = hex_to_decimal(sub["params"]["result"]["number"].as_str().unwrap())
.unwrap();
subscription_id = sub["params"]["subscription"].as_str().unwrap().to_owned();
println!("\x1b[35mInfo:\x1b[0m New chain head: {}", a);
log_info!("New chain head: {}", a);
let _ = blocknum_tx.send(a);
nn_rwlock.latest = a;
}
Expand All @@ -229,11 +232,11 @@ pub async fn subscribe_to_new_heads(
nn_rwlock.latest = 0;
incoming_tx.send(WsconnMessage::Reconnect()).unwrap();
}
println!("\x1b[93mWrn:\x1b[0m Timeout in newHeads subscription, possible connection failiure or missed block.");
log_wrn!("Timeout in newHeads subscription, possible connection failiure or missed block.");
let node_id = match sub_data.get_node_from_id(&subscription_id) {
Some(node_id) => node_id,
None => {
println!("\x1b[31mErr:\x1b[0m Failed to get some failed node subscription IDs! Subscriptions might be silently dropped!");
log_err!("Failed to get some failed node subscription IDs! Subscriptions might be silently dropped!");
continue;
},
};
Expand All @@ -246,7 +249,7 @@ pub async fn subscribe_to_new_heads(
.await
{
Ok(_) => {}
Err(err) => println!("{}", err),
Err(err) => log_err!("{}", err),
};
}
}
Expand Down
Loading

0 comments on commit 3e9d280

Please sign in to comment.