Skip to content

Commit

Permalink
Address smaller review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 17, 2024
1 parent d1885af commit e4f45d1
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 52 deletions.
16 changes: 10 additions & 6 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use quickwit_config::{FileSourceMessageType, FileSourceSqs};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::SourceUid;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use ulid::Ulid;
Expand Down Expand Up @@ -96,15 +97,17 @@ impl QueueCoordinator {
source_runtime: SourceRuntime,
queue: Arc<dyn Queue>,
message_type: MessageType,
shard_max_age: Option<u32>,
shard_max_age: Option<Duration>,
shard_max_count: Option<u32>,
shard_pruning_interval: Duration,
) -> Self {
Self {
shared_state: QueueSharedState::new(
source_runtime.metastore,
source_runtime.pipeline_id.index_uid.clone(),
source_runtime.pipeline_id.source_id.clone(),
SourceUid {
index_uid: source_runtime.pipeline_id.index_uid.clone(),
source_id: source_runtime.pipeline_id.source_id.clone(),
},
Duration::from_secs(2 * source_runtime.indexing_setting.commit_timeout_secs as u64),
shard_max_age,
shard_max_count,
Expand Down Expand Up @@ -137,11 +140,12 @@ impl QueueCoordinator {
FileSourceMessageType::S3Notification => MessageType::S3Notification,
FileSourceMessageType::RawUri => MessageType::RawUri,
};
let shard_max_age = Duration::from_secs(config.deduplication_window_duration_secs as u64);
Ok(QueueCoordinator::new(
source_runtime,
Arc::new(queue),
message_type,
Some(config.deduplication_window_duration_secs),
Some(shard_max_age),
Some(config.deduplication_window_max_messages),
Duration::from_secs(config.deduplication_cleanup_interval_secs as u64),
))
Expand Down Expand Up @@ -342,8 +346,8 @@ mod tests {
) -> QueueCoordinator {
let pipeline_id = IndexingPipelineId {
node_id: NodeId::from_str("test-node").unwrap(),
index_uid: shared_state.index_uid.clone(),
source_id: shared_state.source_id.clone(),
index_uid: shared_state.source_uid.index_uid.clone(),
source_id: shared_state.source_uid.source_id.clone(),
pipeline_uid: PipelineUid::random(),
};

Expand Down
68 changes: 40 additions & 28 deletions quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_proto::metastore::{
AcquireShardsRequest, MetastoreService, MetastoreServiceClient, OpenShardSubrequest,
OpenShardsRequest, PruneShardsRequest,
};
use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId};
use quickwit_proto::types::{DocMappingUid, Position, ShardId, SourceUid};
use time::OffsetDateTime;
use tracing::{error, info};

Expand All @@ -36,8 +36,7 @@ use super::message::PreProcessedMessage;
#[derive(Clone)]
pub struct QueueSharedState {
metastore: MetastoreServiceClient,
pub index_uid: IndexUid,
pub source_id: String,
pub source_uid: SourceUid,
/// Duration after which the processing of a shard is considered stale and
/// should be reacquired.
reacquire_grace_period: Duration,
Expand All @@ -49,44 +48,45 @@ impl QueueSharedState {
/// in the background
pub fn new(
metastore: MetastoreServiceClient,
index_uid: IndexUid,
source_id: String,
source_uid: SourceUid,
reacquire_grace_period: Duration,
max_age: Option<u32>,
max_age: Option<Duration>,
max_count: Option<u32>,
pruning_interval: Duration,
) -> Self {
let cleanup_handle = Arc::new(());
tokio::spawn(Self::run_cleanup_task(
metastore.clone(),
index_uid.clone(),
source_id.clone(),
source_uid.clone(),
max_age,
max_count,
pruning_interval,
cleanup_handle.clone(),
));
Self {
metastore,
index_uid,
source_id,
source_uid,
reacquire_grace_period,
_cleanup_handle: cleanup_handle,
}
}

async fn run_cleanup_task(
metastore: MetastoreServiceClient,
index_uid: IndexUid,
source_id: String,
max_age: Option<u32>,
source_uid: SourceUid,
max_age: Option<Duration>,
max_count: Option<u32>,
pruning_interval: Duration,
owner_handle: Arc<()>,
) {
if max_count.is_none() && max_age.is_none() {
return;
}
let max_age_secs = max_age.map(|duration| duration.as_secs() as u32);
let SourceUid {
index_uid,
source_id,
} = source_uid;
tokio::spawn(async move {
let mut interval = tokio::time::interval(pruning_interval);
loop {
Expand All @@ -98,7 +98,7 @@ impl QueueSharedState {
.prune_shards(PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age,
max_age_secs,
max_count,
})
.await;
Expand All @@ -124,8 +124,8 @@ impl QueueSharedState {
.enumerate()
.map(|(idx, partition_id)| OpenShardSubrequest {
subrequest_id: idx as u32,
index_uid: Some(self.index_uid.clone()),
source_id: self.source_id.clone(),
index_uid: Some(self.source_uid.index_uid.clone()),
source_id: self.source_uid.source_id.clone(),
leader_id: String::new(),
follower_id: None,
shard_id: Some(ShardId::from(partition_id.as_str())),
Expand Down Expand Up @@ -171,8 +171,8 @@ impl QueueSharedState {
let acquire_shard_resp = self
.metastore
.acquire_shards(AcquireShardsRequest {
index_uid: Some(self.index_uid.clone()),
source_id: self.source_id.clone(),
index_uid: Some(self.source_uid.index_uid.clone()),
source_id: self.source_uid.source_id.clone(),
shard_ids: re_acquired_shards,
publish_token: publish_token.to_string(),
})
Expand Down Expand Up @@ -224,6 +224,7 @@ pub mod shared_state_for_tests {
use quickwit_proto::metastore::{
AcquireShardsResponse, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse,
};
use quickwit_proto::types::IndexUid;

use super::*;

Expand Down Expand Up @@ -357,8 +358,10 @@ pub mod shared_state_for_tests {
let metastore = mock_metastore(&metastore_state, None, None);
QueueSharedState {
metastore,
index_uid,
source_id: "test-queue-src".to_string(),
source_uid: SourceUid {
index_uid,
source_id: "test-queue-src".to_string(),
},
reacquire_grace_period: Duration::from_secs(10),
_cleanup_handle: Arc::new(()),
}
Expand All @@ -372,6 +375,7 @@ mod tests {
use std::vec;

use quickwit_common::uri::Uri;
use quickwit_proto::types::IndexUid;
use shared_state_for_tests::mock_metastore;

use super::*;
Expand Down Expand Up @@ -409,8 +413,10 @@ mod tests {

let mut shared_state = QueueSharedState {
metastore,
index_uid,
source_id: "test-sqs-source".to_string(),
source_uid: SourceUid {
index_uid,
source_id: "test-sqs-source".to_string(),
},
reacquire_grace_period: Duration::from_secs(10),
_cleanup_handle: Arc::new(()),
};
Expand Down Expand Up @@ -439,8 +445,10 @@ mod tests {

let mut shared_state = QueueSharedState {
metastore,
index_uid,
source_id: "test-sqs-source".to_string(),
source_uid: SourceUid {
index_uid,
source_id: "test-sqs-source".to_string(),
},
reacquire_grace_period: Duration::from_secs(10),
_cleanup_handle: Arc::new(()),
};
Expand Down Expand Up @@ -469,8 +477,10 @@ mod tests {

let mut shared_state = QueueSharedState {
metastore,
index_uid,
source_id: "test-sqs-source".to_string(),
source_uid: SourceUid {
index_uid,
source_id: "test-sqs-source".to_string(),
},
reacquire_grace_period: Duration::from_secs(10),
_cleanup_handle: Arc::new(()),
};
Expand Down Expand Up @@ -503,8 +513,10 @@ mod tests {
let metastore = mock_metastore(init_state, Some(1), Some(0));
let mut shared_state = QueueSharedState {
metastore,
index_uid,
source_id: "test-sqs-source".to_string(),
source_uid: SourceUid {
index_uid,
source_id: "test-sqs-source".to_string(),
},
reacquire_grace_period: Duration::from_secs(10),
_cleanup_handle: Arc::new(()),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,10 @@ impl Shards {
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
let initial_shard_count = self.shards.len();

if let Some(max_age) = request.max_age {
if let Some(max_age_secs) = request.max_age_secs {
self.shards.retain(|_, shard| {
let limit_timestamp = OffsetDateTime::now_utc().unix_timestamp() - max_age as i64;
let limit_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - max_age_secs as i64;
shard.update_timestamp >= limit_timestamp
});
};
Expand Down Expand Up @@ -643,7 +644,7 @@ mod tests {
let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: None,
max_age_secs: None,
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
Expand All @@ -655,7 +656,7 @@ mod tests {
let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(50),
max_age_secs: Some(50),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
Expand Down Expand Up @@ -693,7 +694,7 @@ mod tests {
let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(150),
max_age_secs: Some(150),
max_count: None,
};
let MutationOccurred::Yes(response) = shards.prune_shards(request).unwrap() else {
Expand All @@ -705,7 +706,7 @@ mod tests {
let request = PruneShardsRequest {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
max_age: Some(150),
max_age_secs: Some(150),
max_count: None,
};
let MutationOccurred::No(response) = shards.prune_shards(request).unwrap() else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1495,8 +1495,9 @@ impl MetastoreService for PostgresqlMetastore {
const PRUNE_AGE_SHARDS_QUERY: &str = include_str!("queries/shards/prune_age.sql");
const PRUNE_COUNT_SHARDS_QUERY: &str = include_str!("queries/shards/prune_count.sql");

if let Some(max_age) = request.max_age {
let limit_datetime = OffsetDateTime::now_utc() - Duration::from_secs(max_age as u64);
if let Some(max_age_secs) = request.max_age_secs {
let limit_datetime =
OffsetDateTime::now_utc() - Duration::from_secs(max_age_secs as u64);
sqlx::query(PRUNE_AGE_SHARDS_QUERY)
.bind(request.index_uid())
.bind(&request.source_id)
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-metastore/src/tests/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ pub async fn test_metastore_prune_shards<
let prune_index_request = PruneShardsRequest {
index_uid: Some(test_index.index_uid.clone()),
source_id: test_index.source_id.clone(),
max_age: None,
max_age_secs: None,
max_count: None,
};
let response = metastore.prune_shards(prune_index_request).await.unwrap();
Expand All @@ -669,7 +669,7 @@ pub async fn test_metastore_prune_shards<
let prune_index_request = PruneShardsRequest {
index_uid: Some(test_index.index_uid.clone()),
source_id: test_index.source_id.clone(),
max_age: Some(oldest_shard_age - 350),
max_age_secs: Some(oldest_shard_age - 350),
max_count: None,
};
let response = metastore.prune_shards(prune_index_request).await.unwrap();
Expand All @@ -689,7 +689,7 @@ pub async fn test_metastore_prune_shards<
let prune_index_request = PruneShardsRequest {
index_uid: Some(test_index.index_uid.clone()),
source_id: test_index.source_id.clone(),
max_age: None,
max_age_secs: None,
max_count: Some(90),
};
let response = metastore.prune_shards(prune_index_request).await.unwrap();
Expand All @@ -708,7 +708,7 @@ pub async fn test_metastore_prune_shards<
let prune_index_request = PruneShardsRequest {
index_uid: Some(test_index.index_uid.clone()),
source_id: test_index.source_id.clone(),
max_age: Some(oldest_shard_age - 2950),
max_age_secs: Some(oldest_shard_age - 2950),
max_count: Some(80),
};
let response = metastore.prune_shards(prune_index_request).await.unwrap();
Expand All @@ -723,7 +723,7 @@ pub async fn test_metastore_prune_shards<
let prune_index_request = PruneShardsRequest {
index_uid: Some(test_index.index_uid.clone()),
source_id: test_index.source_id.clone(),
max_age: Some(oldest_shard_age - 4000),
max_age_secs: Some(oldest_shard_age - 4000),
max_count: Some(50),
};
let response = metastore.prune_shards(prune_index_request).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-proto/protos/quickwit/metastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ message PruneShardsRequest {
quickwit.common.IndexUid index_uid = 1;
string source_id = 2;
// The maximum age of the shards to keep, in seconds.
optional uint32 max_age = 5;
optional uint32 max_age_secs = 5;
// The maximum number of shards to keep. Older shards are deleted first.
optional uint32 max_count = 6;
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ async fn get_source_shards(
.await?
.deserialize_index_metadata()?
.index_uid;
let shards = metastore
let response = metastore
.list_shards(ListShardsRequest {
subrequests: vec![ListShardsSubrequest {
index_uid: Some(index_uid),
Expand All @@ -953,8 +953,12 @@ async fn get_source_shards(
}],
})
.await?;

Ok(shards.subresponses[0].clone().shards)
let shards = response
.subresponses
.into_iter()
.flat_map(|resp| resp.shards)
.collect();
Ok(shards)
}

#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
Expand Down

0 comments on commit e4f45d1

Please sign in to comment.