Skip to content

Commit

Permalink
Add more logs (#4648)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Feb 28, 2024
1 parent 8b7e828 commit cb5cbce
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 122 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "b3f109b" }
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "867207e" }

chrono = { version = "0.4", default-features = false, features = [
"clock",
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-cli/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn setup_logging_and_tracing(
let env_filter = env::var("RUST_LOG")
.map(|_| EnvFilter::from_default_env())
.or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN")))
.context("Failed to set up tracing env filter.")?;
.context("failed to set up tracing env filter")?;
global::set_text_map_propagator(TraceContextPropagator::new());
let registry = tracing_subscriber::registry().with(env_filter);
let event_format = tracing_subscriber::fmt::format()
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn setup_logging_and_tracing(
.with_trace_config(trace_config)
.with_batch_config(batch_config)
.install_batch(opentelemetry::runtime::Tokio)
.context("Failed to initialize OpenTelemetry OTLP exporter.")?;
.context("failed to initialize OpenTelemetry OTLP exporter")?;
registry
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.with(
Expand All @@ -91,7 +91,7 @@ pub fn setup_logging_and_tracing(
.with_ansi(ansi_colors),
)
.try_init()
.context("Failed to set up tracing.")?;
.context("failed to register tracing subscriber")?;
} else {
registry
.with(
Expand All @@ -100,7 +100,7 @@ pub fn setup_logging_and_tracing(
.with_ansi(ansi_colors),
)
.try_init()
.context("Failed to set up tracing.")?;
.context("failed to register tracing subscriber")?;
}
Ok(())
}
6 changes: 3 additions & 3 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ async fn main_impl() -> anyhow::Result<()> {

let command = match CliCommand::parse_cli_args(matches) {
Ok(command) => command,
Err(err) => {
eprintln!("Failed to parse command arguments: {err:?}");
Err(error) => {
eprintln!("failed to parse command line arguments: {error:?}");
std::process::exit(1);
}
};
Expand All @@ -62,7 +62,7 @@ async fn main_impl() -> anyhow::Result<()> {
let build_info = BuildInfo::get();
setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?;
let return_code: i32 = if let Err(err) = command.execute().await {
eprintln!("{} Command failed: {:?}\n", "✘".color(RED_COLOR), err);
eprintln!("{} command failed: {:?}\n", "✘".color(RED_COLOR), err);
1
} else {
0
Expand Down
125 changes: 45 additions & 80 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,8 @@ pub(crate) async fn compute_cluster_change_events(
KeyDiff::Unchanged(_chitchat_id, _previous_max_version, _new_max_version) => {}
// The node has left the cluster, i.e. it is considered dead by the failure detector.
KeyDiff::Removed(chitchat_id, _node_state) => {
let node_event_opt = compute_cluster_change_events_on_removed(
cluster_id,
self_chitchat_id,
chitchat_id,
previous_nodes,
);
let node_event_opt =
compute_cluster_change_events_on_removed(chitchat_id, previous_nodes);

if let Some(node_event) = node_event_opt {
cluster_events.push(node_event);
Expand Down Expand Up @@ -155,17 +151,16 @@ async fn compute_cluster_change_events_on_added(

if previous_node_ref.chitchat_id().generation_id > new_chitchat_id.generation_id {
warn!(
cluster_id=%cluster_id,
rogue_node_id=%new_chitchat_id.node_id,
rogue_node_ip=%new_chitchat_id.gossip_advertise_addr.ip(),
"rogue node `{}` has rejoined the cluster with a lower incarnation ID and will be ignored",
node_id=%new_chitchat_id.node_id,
generation_id=%new_chitchat_id.generation_id,
"node `{}` has rejoined the cluster with a lower generation ID and will be ignored",
new_chitchat_id.node_id
);
return events;
}
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
generation_id=%new_chitchat_id.generation_id,
"node `{}` has rejoined the cluster",
new_chitchat_id.node_id
);
Expand All @@ -174,31 +169,28 @@ async fn compute_cluster_change_events_on_added(
if previous_node.is_ready() {
events.push(ClusterChange::Remove(previous_node));
}
} else if !is_self_node {
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
"node `{}` has joined the cluster",
new_chitchat_id.node_id
);
}
let Some(new_node) =
try_new_node(cluster_id, new_chitchat_id, new_node_state, is_self_node).await
else {
return events;
};
info!(
node_id=%new_chitchat_id.node_id,
generation_id=%new_chitchat_id.generation_id,
"node `{}` has joined the cluster",
new_chitchat_id.node_id
);
let new_node_id: NodeId = new_node.node_id().into();
previous_nodes.insert(new_node_id, new_node.clone());

if new_node.is_ready() {
if !is_self_node {
info!(
cluster_id=%cluster_id,
node_id=%new_chitchat_id.node_id,
"node `{}` has transitioned to ready state",
new_chitchat_id.node_id
);
}
info!(
node_id=%new_chitchat_id.node_id,
generation_id=%new_chitchat_id.generation_id,
"node `{}` has transitioned to ready state",
new_chitchat_id.node_id
);
warmup_channel(new_node.channel()).await;
events.push(ClusterChange::Add(new_node));
}
Expand Down Expand Up @@ -228,24 +220,20 @@ async fn compute_cluster_change_events_on_updated(
if !previous_node.is_ready() && updated_node.is_ready() {
warmup_channel(updated_node.channel()).await;

if !is_self_node {
info!(
cluster_id=%cluster_id,
node_id=%updated_chitchat_id.node_id,
"node `{}` has transitioned to ready state",
updated_chitchat_id.node_id
);
}
info!(
node_id=%updated_chitchat_id.node_id,
generation_id=%updated_chitchat_id.generation_id,
"node `{}` has transitioned to ready state",
updated_chitchat_id.node_id
);
Some(ClusterChange::Add(updated_node))
} else if previous_node.is_ready() && !updated_node.is_ready() {
if !is_self_node {
info!(
cluster_id=%cluster_id,
node_id=%updated_chitchat_id.node_id,
"node `{}` has transitioned out of ready state",
updated_chitchat_id.node_id
);
}
info!(
node_id=%updated_chitchat_id.node_id,
generation_id=%updated_chitchat_id.generation_id,
"node `{}` has transitioned out of ready state",
updated_chitchat_id.node_id
);
Some(ClusterChange::Remove(updated_node))
} else if previous_node.is_ready() && updated_node.is_ready() {
Some(ClusterChange::Update(updated_node))
Expand All @@ -255,8 +243,6 @@ async fn compute_cluster_change_events_on_updated(
}

fn compute_cluster_change_events_on_removed(
cluster_id: &str,
self_chitchat_id: &ChitchatId,
removed_chitchat_id: &ChitchatId,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
) -> Option<ClusterChange> {
Expand All @@ -266,14 +252,12 @@ fn compute_cluster_change_events_on_removed(
let previous_node_ref = previous_node_entry.get();

if previous_node_ref.chitchat_id().generation_id == removed_chitchat_id.generation_id {
if self_chitchat_id != removed_chitchat_id {
info!(
cluster_id=%cluster_id,
node_id=%removed_chitchat_id.node_id,
"node `{}` has left the cluster",
removed_chitchat_id.node_id
);
}
info!(
node_id=%removed_chitchat_id.node_id,
generation_id=%removed_chitchat_id.generation_id,
"node `{}` has left the cluster",
removed_chitchat_id.node_id
);
let previous_node = previous_node_entry.remove();

if previous_node.is_ready() {
Expand Down Expand Up @@ -745,21 +729,14 @@ mod tests {

#[tokio::test]
async fn test_compute_cluster_change_events_on_removed() {
let cluster_id = "test-cluster".to_string();
let self_port = 1234;
let self_chitchat_id = ChitchatId::for_local_test(self_port);
{
// Node leaves the cluster but it's missing from the previous live nodes.
let port = 1235;
let removed_chitchat_id = ChitchatId::for_local_test(port);
let mut previous_nodes = BTreeMap::default();

let event_opt = compute_cluster_change_events_on_removed(
&cluster_id,
&self_chitchat_id,
&removed_chitchat_id,
&mut previous_nodes,
);
let event_opt =
compute_cluster_change_events_on_removed(&removed_chitchat_id, &mut previous_nodes);
assert!(event_opt.is_none());
}
{
Expand All @@ -783,12 +760,8 @@ mod tests {
.unwrap();
let mut previous_nodes = BTreeMap::from_iter([(removed_node_id, previous_node)]);

let event_opt = compute_cluster_change_events_on_removed(
&cluster_id,
&self_chitchat_id,
&removed_chitchat_id,
&mut previous_nodes,
);
let event_opt =
compute_cluster_change_events_on_removed(&removed_chitchat_id, &mut previous_nodes);
assert!(event_opt.is_none());
assert!(!previous_nodes.contains_key(&removed_chitchat_id.node_id));
}
Expand All @@ -812,13 +785,9 @@ mod tests {
.unwrap();
let mut previous_nodes = BTreeMap::from_iter([(removed_node_id.clone(), removed_node)]);

let event = compute_cluster_change_events_on_removed(
&cluster_id,
&self_chitchat_id,
&removed_chitchat_id,
&mut previous_nodes,
)
.unwrap();
let event =
compute_cluster_change_events_on_removed(&removed_chitchat_id, &mut previous_nodes)
.unwrap();

let ClusterChange::Remove(node) = event else {
panic!("expected `ClusterChange::Remove` event, got `{:?}`", event);
Expand Down Expand Up @@ -854,12 +823,8 @@ mod tests {
let mut previous_nodes =
BTreeMap::from_iter([(rejoined_node_id.clone(), rejoined_node.clone())]);

let event_opt = compute_cluster_change_events_on_removed(
&cluster_id,
&self_chitchat_id,
&removed_chitchat_id,
&mut previous_nodes,
);
let event_opt =
compute_cluster_change_events_on_removed(&removed_chitchat_id, &mut previous_nodes);
assert!(event_opt.is_none());
assert_eq!(
previous_nodes.get(&rejoined_node_id).unwrap(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ impl Cluster {
info!(
cluster_id=%cluster_id,
node_id=%self_node.node_id,
generation_id=self_node.generation_id.as_u64(),
enabled_services=?self_node.enabled_services,
gossip_listen_addr=%gossip_listen_addr,
gossip_advertise_addr=%self_node.gossip_advertise_addr,
grpc_advertise_addr=%self_node.grpc_advertise_addr,
peer_seed_addrs=%peer_seed_addrs.join(", "),
"Joining cluster."
"joining cluster"
);
// Set up catchup callback and extra liveness predicate functions.
let (catchup_callback_tx, catchup_callback_rx) = watch::channel(());
Expand Down
18 changes: 18 additions & 0 deletions quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ impl ClusterNode {
&self.inner.enabled_services
}

/// Returns `true` if the indexer service is enabled on this node.
pub fn is_indexer(&self) -> bool {
    let enabled_services = &self.inner.enabled_services;
    enabled_services.contains(&QuickwitService::Indexer)
}

/// Returns `true` if the ingester service is enabled on this node.
pub fn is_ingester(&self) -> bool {
    // Bug fix: this method previously tested for `QuickwitService::Indexer`
    // (a copy-paste from `is_indexer`), so it reported whether the node was
    // an indexer, not an ingester. It must check `QuickwitService::Ingester`.
    self.inner
        .enabled_services
        .contains(&QuickwitService::Ingester)
}

/// Returns `true` if the searcher service is enabled on this node.
pub fn is_searcher(&self) -> bool {
    let enabled_services = &self.inner.enabled_services;
    enabled_services.contains(&QuickwitService::Searcher)
}

pub fn grpc_advertise_addr(&self) -> SocketAddr {
self.inner.grpc_advertise_addr
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ fn default_data_dir_uri() -> ConfigValue<Uri, QW_DATA_DIR> {
fn default_advertise_host(listen_ip: &IpAddr) -> anyhow::Result<Host> {
if listen_ip.is_unspecified() {
if let Some((interface_name, private_ip)) = find_private_ip() {
info!(advertise_address=%private_ip, interface_name=%interface_name, "using sniffed advertise address");
info!(advertise_address=%private_ip, interface_name=%interface_name, "using sniffed advertise address `{private_ip}`");
return Ok(Host::from(private_ip));
}
bail!("listen address `{listen_ip}` is unspecified and advertise address is not set");
}
info!(advertise_address=%listen_ip, "using listen address as advertise address");
info!(advertise_address=%listen_ip, "using listen address `{listen_ip}` as advertise address");
Ok(Host::from(*listen_ip))
}

Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::time::Instant;

use anyhow::bail;
use fnv::{FnvHashMap, FnvHashSet};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::Progress;
use quickwit_config::SourceConfig;
use quickwit_ingest::ShardInfos;
Expand Down Expand Up @@ -139,11 +140,10 @@ impl ControlPlaneModel {
.insert_shards(&index_uid, &source_id, shards);
}
}
let elapsed_secs = now.elapsed().as_secs();

info!(
"synced control plane model with metastore in {elapsed_secs} seconds ({num_indexes} \
indexes, {num_sources} sources, {num_shards} shards)",
"synced control plane model with metastore in {} ({num_indexes} indexes, \
{num_sources} sources, {num_shards} shards)",
now.elapsed().pretty_display()
);
Ok(())
}
Expand Down
11 changes: 5 additions & 6 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use async_trait::async_trait;
use fnv::FnvHashMap;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, SpawnContext};
use quickwit_cluster::{Cluster, ListenerHandle};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{Event, EventBroker};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::types::{Position, ShardId, SourceUid};
Expand Down Expand Up @@ -166,15 +167,13 @@ impl Actor for ShardPositionsService {
let elapsed = now.elapsed();
if elapsed > Duration::from_millis(300) {
warn!(
"initializing shard positions took longer than expected: ({:?})ms ({} keys)",
elapsed.as_millis(),
num_keys
"initializing shard positions took longer than expected: {} ({num_keys} keys)",
elapsed.pretty_display(),
);
} else {
info!(
"initializing shard positions took ({:?})ms ({} keys)",
elapsed.as_millis(),
num_keys
"initialized shard positions in {} ({num_keys} keys)",
elapsed.pretty_display(),
);
}
Ok(())
Expand Down
Loading

0 comments on commit cb5cbce

Please sign in to comment.