Skip to content

Commit

Permalink
Added a bash script to check the log format. (#4150)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Nov 16, 2023
1 parent 7a29318 commit 26350df
Show file tree
Hide file tree
Showing 58 changed files with 164 additions and 132 deletions.
19 changes: 15 additions & 4 deletions CODE_STYLE.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,26 @@ These assert will not be part of the release binary and won't hurt the execution

**example needed**

## Errors
## Errors and log messages

Error messages should be concise, lowercase (except proper names), and without trailing punctuation.
Error and log messages follow the same format. They should be concise, lowercase (except proper names), and without trailing punctuation.

### Examples
As a loose rule, where it does not hurt readability, log messages should rely on `tracing`
structured logging instead of templating.

In other words, prefer:
`warn!(remaining=remaining_attempts, "trubulizor rpc plane retry failed")`
to
`warn!("trubulizor rpc plane retry failed ({remaining_attempts} attempts remaining)")`

### Error Examples
- "failed to start actor runtimes"
- "cannot join PostgreSQL URI {} with path {:?}"
- "could not find split metadata in Metastore {}"
- "unkown output format {:?}"
- "unknown output format {:?}"

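### Log Examples
- `warn!("peer seed list is empty")`
- `info!(config_uri=%config_uri, config=?config, "loaded node config")`
- `error!(error = ?error, "failed to fetch deletable splits")`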


## Comments

Expand Down
2 changes: 2 additions & 0 deletions quickwit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ fmt:
@(rustup toolchain list | ( ! grep -q nightly && echo "Toolchain 'nightly' is not installed. Please install using 'rustup toolchain install nightly'.") ) || cargo +nightly fmt
@echo "Checking license headers"
@bash scripts/check_license_headers.sh
@echo "Checking log format"
@bash scripts/check_log_format.sh

fix:
@echo "Running cargo clippy --fix"
Expand Down
13 changes: 6 additions & 7 deletions quickwit/quickwit-actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ pub use observation::{Observation, ObservationType};
use quickwit_common::KillSwitch;
pub use spawn_builder::SpawnContext;
use thiserror::Error;
use tracing::info;
use tracing::log::warn;
use tracing::{info, warn};
pub use universe::Universe;

pub use self::actor_context::ActorContext;
Expand Down Expand Up @@ -93,19 +92,19 @@ fn heartbeat_from_env_or_default() -> Duration {
match std::env::var("QW_ACTOR_HEARTBEAT_SECS") {
Ok(actor_hearbeat_secs_str) => {
if let Ok(actor_hearbeat_secs) = actor_hearbeat_secs_str.parse::<NonZeroU64>() {
info!("Set the actor heartbeat to {actor_hearbeat_secs} seconds.");
info!("set the actor heartbeat to {actor_hearbeat_secs} seconds");
return Duration::from_secs(actor_hearbeat_secs.get());
} else {
warn!(
"Failed to parse `QW_ACTOR_HEARTBEAT_SECS={actor_hearbeat_secs_str}` in \
seconds > 0, using default heartbeat (30 seconds)."
"failed to parse `QW_ACTOR_HEARTBEAT_SECS={actor_hearbeat_secs_str}` in \
seconds > 0, using default heartbeat (30 seconds)"
);
};
}
Err(std::env::VarError::NotUnicode(os_str)) => {
warn!(
"Failed to parse `QW_ACTOR_HEARTBEAT_SECS={os_str:?}` in a valid unicode string, \
using default heartbeat (30 seconds)."
"failed to parse `QW_ACTOR_HEARTBEAT_SECS={os_str:?}` in a valid unicode string, \
using default heartbeat (30 seconds)"
);
}
Err(std::env::VarError::NotPresent) => {}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub async fn jemalloc_metrics_loop() -> tikv_jemalloc_ctl::Result<()> {
/// Spawns a Tokio task that awaits `jemalloc_metrics_loop()` and logs an error
/// if that loop returns `Err`.
///
/// The value returned by `tokio::task::spawn` is discarded, so this function
/// returns without waiting for the spawned task.
pub fn start_jemalloc_metrics_loop() {
tokio::task::spawn(async {
if let Err(jemalloc_metrics_err) = jemalloc_metrics_loop().await {
// NOTE(review): the two `error!` calls below look like the removed/added pair of this
// diff hunk (capitalized with trailing period vs. lowercase without) — confirm that
// only the second one remains after the commit.
error!(err=?jemalloc_metrics_err, "Failed to gather metrics from jemalloc.");
error!(err=?jemalloc_metrics_err, "failed to gather metrics from jemalloc");
}
});
}
6 changes: 3 additions & 3 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async fn load_node_config(config_uri: &Uri) -> anyhow::Result<NodeConfig> {
let config = NodeConfig::load(config_format, config_content.as_slice())
.await
.with_context(|| format!("failed to parse node config `{config_uri}`"))?;
info!(config_uri=%config_uri, config=?config, "Loaded node config.");
info!(config_uri=%config_uri, config=?config, "loaded node config");
Ok(config)
}

Expand Down Expand Up @@ -402,10 +402,10 @@ pub mod busy_detector {

let suppressed = SUPPRESSED_DEBUG_COUNT.swap(0, Ordering::Relaxed);
if suppressed == 0 {
debug!("Thread wasn't parked for {delta}µs, is the runtime too busy?");
debug!("thread wasn't parked for {delta}µs, is the runtime too busy?");
} else {
debug!(
"Thread wasn't parked for {delta}µs, is the runtime too busy? ({suppressed} \
"thread wasn't parked for {delta}µs, is the runtime too busy? ({suppressed} \
similar messages suppressed)"
);
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl RunCliCommand {
crate::busy_detector::set_enabled(true);

if let Some(services) = &self.services {
tracing::info!(services = %services.iter().join(", "), "Setting services from override.");
tracing::info!(services = %services.iter().join(", "), "setting services from override");
node_config.enabled_services = services.clone();
}
let telemetry_handle_opt =
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,12 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("Merge pipeline has no more ongoing merges, Exiting.");
info!("merge pipeline has no more ongoing merges, exiting");
break;
}

if pipeline_handle.state().is_exit() {
info!("Merge pipeline has exited, Exiting.");
info!("merge pipeline has exited, exiting");
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl TestEnv {
};
tokio::spawn(async move {
if let Err(error) = run_command.execute().await {
error!(err=?error, "Failed to start a quickwit server.");
error!(err=?error, "failed to start a quickwit server");
}
});
wait_for_server_ready(([127, 0, 0, 1], self.rest_listen_port).into()).await?;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn parse_indexing_cpu_capacity(node_state: &NodeState) -> CpuCapacity {
if let Ok(indexing_capacity) = CpuCapacity::from_str(indexing_capacity_str) {
indexing_capacity
} else {
error!(indexing_capacity=?indexing_capacity_str, "Received an unparseable indexing capacity from node.");
error!(indexing_capacity=?indexing_capacity_str, "received an unparseable indexing capacity from node");
CpuCapacity::zero()
}
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl RuntimeType {
.get_or_init(|| {
#[cfg(any(test, feature = "testsuite"))]
{
tracing::warn!("Starting Tokio actor runtimes for tests.");
tracing::warn!("starting Tokio actor runtimes for tests");
start_runtimes(RuntimesConfig::light_for_tests())
}
#[cfg(not(any(test, feature = "testsuite")))]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where T: Send + 'static
Ok(Some(message)) => Some((message, streaming)),
Ok(None) => None,
Err(error) => {
warn!(error=?error, "gRPC transport error.");
warn!(error=?error, "gRPC transport error");
None
}
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl ConfigFormat {
serde_json::from_reader(StripComments::new(payload))?;
let version_value = json_value.get_mut("version").context("missing version")?;
if let Some(version_number) = version_value.as_u64() {
warn!("`version` is supposed to be a string.");
warn!(version_value=?version_value, "`version` is supposed to be a string");
*version_value = JsonValue::String(version_number.to_string());
}
serde_json::from_value(json_value).context("failed to read JSON file")
Expand All @@ -211,7 +211,7 @@ impl ConfigFormat {
toml::from_str(payload_str).context("failed to read TOML file")?;
let version_value = toml_value.get_mut("version").context("missing version")?;
if let Some(version_number) = version_value.as_integer() {
warn!("`version` is supposed to be a string.");
warn!(version_value=?version_value, "`version` is supposed to be a string");
*version_value = toml::Value::String(version_number.to_string());
let reserialized = toml::to_string(version_value)
.context("failed to reserialize toml config")?;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl NodeConfig {
for peer_seed in &self.peer_seeds {
let peer_seed_addr = HostAddr::parse_with_default_port(peer_seed, default_gossip_port)?;
if let Err(error) = peer_seed_addr.resolve().await {
warn!(peer_seed = %peer_seed_addr, error = ?error, "Failed to resolve peer seed address.");
warn!(peer_seed = %peer_seed_addr, error = ?error, "failed to resolve peer seed address");
continue;
}
peer_seed_addrs.push(peer_seed_addr.to_string())
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn default_node_id() -> ConfigValue<String, QW_NODE_ID> {
Ok(short_hostname) => short_hostname,
Err(error) => {
let node_id = new_coolid("node");
warn!(error=?error, "Failed to determine hostname or hostname was invalid, falling back to random node ID `{}`.", node_id);
warn!(error=?error, "failed to determine hostname or hostname was invalid, falling back to random node ID `{}`", node_id);
node_id
}
};
Expand Down Expand Up @@ -100,12 +100,12 @@ fn default_data_dir_uri() -> ConfigValue<Uri, QW_DATA_DIR> {
/// Chooses the host to advertise for the given listen IP.
///
/// - If `listen_ip` is unspecified and `find_private_ip()` returns an interface/IP pair,
///   that private IP is returned as the host.
/// - If `listen_ip` is unspecified and no private IP is found, an error is returned via
///   `bail!`.
/// - Otherwise, `listen_ip` itself is returned as the host.
///
/// The chosen address is logged at `info` level in the two success cases.
fn default_advertise_host(listen_ip: &IpAddr) -> anyhow::Result<Host> {
if listen_ip.is_unspecified() {
if let Some((interface_name, private_ip)) = find_private_ip() {
// NOTE(review): each adjacent pair of `info!` calls in this function looks like the
// removed/added pair of this diff hunk — confirm that only the lowercase variant
// without trailing period remains after the commit.
info!(advertise_address=%private_ip, interface_name=%interface_name, "Using sniffed advertise address.");
info!(advertise_address=%private_ip, interface_name=%interface_name, "using sniffed advertise address");
return Ok(Host::from(private_ip));
}
// Unspecified listen address and nothing was sniffed: no address can be advertised.
bail!("listen address `{listen_ip}` is unspecified and advertise address is not set");
}
info!(advertise_address=%listen_ip, "Using listen address as advertise address.");
info!(advertise_address=%listen_ip, "using listen address as advertise address");
Ok(Host::from(*listen_ip))
}

Expand Down Expand Up @@ -301,12 +301,12 @@ fn validate(node_config: &NodeConfig) -> anyhow::Result<()> {

if node_config.cluster_id == DEFAULT_CLUSTER_ID {
warn!(
"Cluster ID is not set, falling back to default value: `{}`.",
"cluster ID is not set, falling back to default value: `{}`",
DEFAULT_CLUSTER_ID
);
}
if node_config.peer_seeds.is_empty() {
warn!("Peer seed list is empty.");
warn!("peer seed list is empty");
}
Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl IndexingScheduler {
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();
let mut indexers: Vec<(String, IndexerNodeInfo)> = self.get_indexers_from_indexer_pool();
if indexers.is_empty() {
warn!("No indexer available, cannot schedule an indexing plan.");
warn!("no indexer available, cannot schedule an indexing plan");
return;
};

Expand Down Expand Up @@ -254,11 +254,11 @@ impl IndexingScheduler {
last_applied_plan.indexing_tasks_per_indexer(),
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "Running plan and last applied plan node IDs differ: schedule an indexing plan.");
info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan");
self.schedule_indexing_plan_if_needed(model);
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may not have received their tasks, so apply the last plan again.
info!(plans_diff=?indexing_plans_diff, "Running tasks and last applied tasks differ: reapply last plan.");
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
self.apply_physical_indexing_plan(&mut indexers, last_applied_plan.clone());
}
}
Expand All @@ -272,7 +272,7 @@ impl IndexingScheduler {
indexers: &mut [(String, IndexerNodeInfo)],
new_physical_plan: PhysicalIndexingPlan,
) {
debug!("Apply physical indexing plan: {:?}", new_physical_plan);
debug!(new_physical_plan=?new_physical_plan, "apply physical indexing plan");
for (node_id, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
// We don't want to block on a slow indexer so we apply this change asynchronously
// TODO not blocking is cool, but we need to make sure there is not accumulation
Expand All @@ -292,7 +292,7 @@ impl IndexingScheduler {
.apply_indexing_plan(ApplyIndexingPlanRequest { indexing_tasks })
.await
{
error!(indexer_node_id=%indexer.0, err=?error, "Error occurred when applying indexing plan to indexer.");
error!(indexer_node_id=%indexer.0, err=?error, "error occurred when applying indexing plan to indexer");
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ pub fn build_physical_indexing_plan(
// TODO it is probably a bad idea to just not overschedule, as having a single index trail
// behind will prevent the log GC.
// A better strategy would probably be to close the shard and start preventing ingestion.
error!("unable to assign all sources in the cluster.");
error!("unable to assign all sources in the cluster");
}

// Convert the new scheduling solution back to a physical plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ async fn delete_splits_marked_for_deletion(
.map(|split| split.split_metadata)
.collect(),
Err(error) => {
error!(error = ?error, "Failed to fetch deletable splits.");
error!(error = ?error, "failed to fetch deletable splits");
break;
}
};
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,11 @@ impl Handler<Spawn> for IndexingPipeline {
if let Some(MetastoreError::NotFound { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
info!(error = ?spawn_error, "could not spawn pipeline, index might have been deleted");
return Err(ActorExitStatus::Success);
}
let retry_delay = wait_duration_before_retry(spawn.retry_count + 1);
error!(error = ?spawn_error, retry_count = spawn.retry_count, retry_delay = ?retry_delay, "Error while spawning indexing pipeline, retrying after some time.");
error!(error = ?spawn_error, retry_count = spawn.retry_count, retry_delay = ?retry_delay, "error while spawning indexing pipeline, retrying after some time");
ctx.schedule_self_msg(
retry_delay,
Spawn {
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,11 @@ impl IndexingService {
)
.await
{
error!(pipeline_id=?new_pipeline_id, err=?error, "Failed to spawn pipeline.");
error!(pipeline_id=?new_pipeline_id, err=?error, "failed to spawn pipeline");
failed_spawning_pipeline_ids.push(new_pipeline_id.clone());
}
} else {
error!(pipeline_id=?new_pipeline_id, "Failed to spawn pipeline: source does not exist.");
error!(pipeline_id=?new_pipeline_id, "failed to spawn pipeline: source does not exist");
failed_spawning_pipeline_ids.push(new_pipeline_id.clone());
}
} else {
Expand Down Expand Up @@ -674,7 +674,7 @@ impl IndexingService {
.queues
.into_iter()
.collect();
debug!(queues=?queues, "List ingest API queues.");
debug!(queues=?queues, "list ingest API queues");

let indexes_metadatas = self
.metastore
Expand All @@ -686,7 +686,7 @@ impl IndexingService {
.into_iter()
.map(|index_metadata| index_metadata.index_id().to_string())
.collect();
debug!(index_ids=?index_ids, "List indexes.");
debug!(index_ids=?index_ids, "list indexes");

let queue_ids_to_delete = queues.difference(&index_ids);

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,11 @@ impl Handler<Spawn> for MergePipeline {
if let Some(MetastoreError::NotFound { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
info!(error = ?spawn_error, "could not spawn pipeline, index might have been deleted");
return Err(ActorExitStatus::Success);
}
let retry_delay = wait_duration_before_retry(spawn.retry_count);
error!(error = ?spawn_error, retry_count = spawn.retry_count, retry_delay = ?retry_delay, "Error while spawning indexing pipeline, retrying after some time.");
error!(error = ?spawn_error, retry_count = spawn.retry_count, retry_delay = ?retry_delay, "error while spawning indexing pipeline, retrying after some time");
ctx.schedule_self_msg(
retry_delay,
Spawn {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl MergePlanner {
// We run smaller merges in priority.
merge_ops.sort_by_cached_key(|merge_op| Reverse(max_merge_ops(merge_op)));
while let Some(merge_operation) = merge_ops.pop() {
info!(merge_operation=?merge_operation, "Planned merge operation.");
info!(merge_operation=?merge_operation, "planned merge operation");
let tracked_merge_operation = self
.ongoing_merge_operations_inventory
.track(merge_operation);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ fn create_packaged_split(
append_to_tag_set(&named_field.name, &terms, &mut tags);
}
Err(tag_extraction_error) => {
warn!(err=?tag_extraction_error, "No field values will be registered in the split metadata.");
warn!(err=?tag_extraction_error, "no field values will be registered in the split metadata");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
let kill_switch = ctx.kill_switch().clone();
let split_ids = batch.split_ids();
if kill_switch.is_dead() {
warn!(split_ids=?split_ids,"Kill switch was activated. Cancelling upload.");
warn!(split_ids=?split_ids,"kill switch was activated, cancelling upload");
return Err(ActorExitStatus::Killed);
}
let metastore = self.metastore.clone();
Expand All @@ -311,7 +311,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
for packaged_split in batch.splits.iter() {
if batch.publish_lock.is_dead() {
// TODO: Remove the junk right away?
info!("Splits' publish lock is dead.");
info!("splits' publish lock is dead");
split_update_sender.discard()?;
return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub async fn start_indexing_service(
storage_resolver: StorageResolver,
event_broker: EventBroker,
) -> anyhow::Result<Mailbox<IndexingService>> {
info!("Starting indexer service.");
info!("starting indexer service");

// Spawn indexing service.
let indexing_service = IndexingService::new(
Expand Down
Loading

0 comments on commit 26350df

Please sign in to comment.