Skip to content

Commit

Permalink
Exposing published position through chitchat (#4110)
Browse files Browse the repository at this point in the history
* Exposing published positions through chitchat.

The info is transmitted via an EventSubscriber at the same time as a regular truncate:

- upon fetching the starting offset when assigning a shard to an IngestSource
- upon suggest truncate.
This PR does not include any code to remove sources yet.

Refactoring:
Moving SourceUid to quickwit-proto
Making it more difficult to misuse pubsub
- Added #[must_use] for methods returning the handle
- Internalizing the `Clone` mechanics. A "normal" Clone could yield
unexpected results.

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

* Improved doc in unit test
  • Loading branch information
fulmicoton authored Nov 16, 2023
1 parent a65997a commit 7a29318
Show file tree
Hide file tree
Showing 18 changed files with 758 additions and 90 deletions.
10 changes: 10 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,16 @@ impl Cluster {
.set(key, value);
}

/// Looks up `key` in this node's own chitchat state and returns a copy of
/// the stored value.
///
/// Returns `None` when `get_versioned` yields nothing for `key`.
pub async fn get_self_key_value(&self, key: &str) -> Option<String> {
    let chitchat = self.chitchat().await;
    // The lock guard is a temporary of this statement, so it is released
    // before the function returns (including on the early `?` return).
    let value = chitchat
        .lock()
        .await
        .self_node_state()
        .get_versioned(key)?
        .value
        .clone();
    Some(value)
}

/// Waits until the predicate holds true for the set of ready members.
pub async fn wait_for_ready_members<F>(
&self,
Expand Down
112 changes: 100 additions & 12 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,54 @@

use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex as TokioMutex;

use crate::type_map::TypeMap;

pub trait Event: fmt::Debug + Clone + Send + Sync + 'static {}

#[async_trait]
pub trait EventSubscriber<E>: fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
pub trait EventSubscriber<E>: Send + Sync + 'static {
async fn handle_event(&mut self, event: E);
}

dyn_clone::clone_trait_object!(<E> EventSubscriber<E>);
/// Adapter that lets a plain callback of type `F` be registered as a
/// subscriber for events of type `E`.
struct ClosureSubscriber<E, F> {
// The wrapped callback. It sits behind an `Arc`, so cloning the subscriber
// shares the same callback instead of requiring `F: Clone`.
callback: Arc<F>,
// Zero-sized marker: `E` is not stored in any field, but it must appear in
// the struct for the type parameter to be accepted.
_phantom: PhantomData<E>,
}

impl<E, F> Clone for ClosureSubscriber<E, F> {
fn clone(&self) -> Self {
ClosureSubscriber {
callback: self.callback.clone(),
_phantom: self._phantom,
}
}
}

/// `Debug` output shows the callback's type name, since the callback type
/// `F` is not required to implement `Debug` itself.
impl<E, F> fmt::Debug for ClosureSubscriber<E, F> {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let callback_type_name: &'static str = std::any::type_name::<F>();
        let mut debug_struct = f.debug_struct("ClosureSubscriber");
        debug_struct.field("callback", &callback_type_name);
        debug_struct.finish()
    }
}

/// Makes any `Fn(E)` callback usable as an `EventSubscriber<E>`: handling an
/// event simply invokes the wrapped callback with it.
#[async_trait]
impl<E, F> EventSubscriber<E> for ClosureSubscriber<E, F>
where
    E: Sync + Send + 'static,
    F: Fn(E) + Sync + Send + 'static,
{
    async fn handle_event(&mut self, event: E) {
        // Deref the `Arc` to reach the callback, then call it synchronously.
        let callback: &F = &self.callback;
        callback(event);
    }
}

type EventSubscriptions<E> = HashMap<usize, EventSubscription<E>>;

Expand All @@ -59,6 +91,7 @@ struct InnerEventBroker {

impl EventBroker {
/// Subscribes to an event type.
#[must_use]
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
let mut subscriptions = self
Expand All @@ -76,8 +109,7 @@ impl EventBroker {
.fetch_add(1, Ordering::Relaxed);

let subscription = EventSubscription {
subscription_id,
subscriber: Box::new(subscriber),
subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
};
let typed_subscriptions = subscriptions
.get_mut::<EventSubscriptions<E>>()
Expand All @@ -99,6 +131,21 @@ impl EventBroker {
}
}

/// Subscribes to an event with a callback function.
#[must_use]
pub fn subscribe_fn<E>(
&self,
callback_fn: impl Fn(E) + Sync + Send + 'static,
) -> EventSubscriptionHandle
where
E: Event,
{
self.subscribe(ClosureSubscriber {
callback: Arc::new(callback_fn),
_phantom: Default::default(),
})
}

/// Publishes an event.
pub fn publish<E>(&self, event: E)
where E: Event {
Expand All @@ -111,20 +158,18 @@ impl EventBroker {
if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
let event = event.clone();
let mut subscriber = subscription.subscriber.clone();
let subscriber_clone = subscription.subscriber.clone();
tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move {
subscriber.handle_event(event).await;
let mut subscriber_lock = subscriber_clone.lock().await;
subscriber_lock.handle_event(event).await;
}));
}
}
}
}

#[derive(Debug)]
struct EventSubscription<E> {
#[allow(dead_code)]
subscription_id: usize, // Used for the `Debug` implementation.
subscriber: Box<dyn EventSubscriber<E>>,
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
}

pub struct EventSubscriptionHandle {
Expand All @@ -135,6 +180,12 @@ pub struct EventSubscriptionHandle {

impl EventSubscriptionHandle {
/// Cancels the subscription by consuming the handle.
///
/// The body is intentionally empty: taking `self` by value means the handle
/// is dropped when this call returns.
pub fn cancel(self) {}

/// Keeps the subscription alive without having to hold on to the handle.
///
/// By default, dropping the handle cancels the subscription. `forever`
/// consumes the handle and replaces its broker reference with an empty
/// `Weak`, presumably so that the `Drop` impl can no longer reach the broker
/// to unsubscribe (NOTE(review): the `Drop` body is not visible here; confirm).
pub fn forever(mut self) {
self.broker = Weak::new();
}
}

impl Drop for EventSubscriptionHandle {
Expand Down Expand Up @@ -184,15 +235,52 @@ mod tests {
let event = MyEvent { value: 42 };
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);

subscription_handle.cancel();

let event = MyEvent { value: 1337 };
event_broker.publish(event);

tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(counter.load(Ordering::Relaxed), 42);
}

// Checks that explicitly dropping the handle returned by `subscribe_fn`
// cancels the subscription.
#[tokio::test]
async fn test_event_broker_handle_drop() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// `tx` is moved into the callback, so the callback owns the only sender.
drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
}));
event_broker.publish(MyEvent { value: 42 });
// `recv` yields `None` only once the channel is closed and empty, i.e. this
// asserts that the event was not delivered and that the callback (together
// with its sender) no longer exists.
assert!(rx.recv().await.is_none());
}

// Checks that calling `cancel` on the handle returned by `subscribe_fn`
// cancels the subscription.
#[tokio::test]
async fn test_event_broker_handle_cancel() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// `tx` is moved into the callback, so the callback owns the only sender.
event_broker
.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
})
.cancel();
event_broker.publish(MyEvent { value: 42 });
// `recv` yields `None` only once the channel is closed and empty, i.e. this
// asserts that the event was not delivered and that the callback (together
// with its sender) no longer exists.
assert!(rx.recv().await.is_none());
}

// Checks that a subscription made permanent with `forever` still receives
// events even though the handle itself has been consumed.
#[tokio::test]
async fn test_event_broker_handle_forever() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
// `tx` is moved into the callback, which forwards each event value to `rx`.
event_broker
.subscribe_fn::<MyEvent>(move |event| {
tx.send(event.value).unwrap();
})
.forever();
event_broker.publish(MyEvent { value: 42 });
// The published value must come through: the subscription is still active.
assert_eq!(rx.recv().await, Some(42));
}
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
/// In order to amortize search with scroll, we fetch more documents than are
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to publish the shard positions.
pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
1 change: 0 additions & 1 deletion quickwit/quickwit-common/src/tower/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ mod tests {

impl Event for MyEvent {}

#[derive(Debug, Clone)]
struct MySubscriber {
counter: Arc<AtomicUsize>,
}
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ use quickwit_proto::metastore::{
self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
MetastoreService, MetastoreServiceClient, SourceType,
};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId, SourceUid};
use serde::Serialize;
use tracing::{error, info, warn};

use crate::SourceUid;

type NextShardId = ShardId;
#[derive(Debug, Eq, PartialEq)]
struct ShardTableEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,9 @@ mod tests {
use proptest::{prop_compose, proptest};
use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::types::IndexUid;
use quickwit_proto::types::{IndexUid, SourceUid};

use super::*;
use crate::SourceUid;

#[test]
fn test_indexing_plans_diff() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::num::NonZeroU32;

use fnv::FnvHashMap;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::{IndexUid, ShardId};
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
use scheduling_logic_model::{IndexerOrd, SourceOrd};
use tracing::error;
use tracing::log::warn;
Expand All @@ -34,7 +34,6 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
SchedulingProblem, SchedulingSolution,
};
use crate::SourceUid;

/// If we have several pipelines below this threshold we
/// reduce the number of pipelines.
Expand Down Expand Up @@ -530,13 +529,12 @@ mod tests {

use fnv::FnvHashMap;
use quickwit_proto::indexing::{mcpu, IndexingTask};
use quickwit_proto::types::{IndexUid, ShardId};
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};

use super::{
build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
};
use crate::SourceUid;

#[test]
fn test_spread_shard_optimally() {
Expand Down
9 changes: 0 additions & 9 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ pub(crate) mod metrics;

use quickwit_common::tower::Pool;
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
use quickwit_proto::types::{IndexUid, SourceId};

/// It can however appear only once in a given index.
/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is.
#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]
pub struct SourceUid {
pub index_uid: IndexUid,
pub source_id: SourceId,
}

/// Indexer-node specific information stored in the pool of available indexer nodes
#[derive(Debug, Clone)]
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ impl IndexingPipeline {
ingester_pool: self.params.ingester_pool.clone(),
queues_dir_path: self.params.queues_dir_path.clone(),
storage_resolver: self.params.source_storage_resolver.clone(),
event_broker: self.params.event_broker.clone(),
}),
source_checkpoint,
))
Expand Down
19 changes: 13 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use quickwit_actors::{
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
use quickwit_common::temp_dir;
use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
Expand All @@ -56,7 +56,10 @@ use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::MergePlanner;
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::models::{
DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, PublishedShardPositions,
SpawnPipeline,
};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
Expand Down Expand Up @@ -117,6 +120,7 @@ pub struct IndexingService {
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
event_broker: EventBroker,
_event_subscription_handle: EventSubscriptionHandle,
}

impl Debug for IndexingService {
Expand Down Expand Up @@ -144,6 +148,8 @@ impl IndexingService {
storage_resolver: StorageResolver,
event_broker: EventBroker,
) -> anyhow::Result<IndexingService> {
let published_shard_positions = PublishedShardPositions::new(cluster.clone());
let event_subscription_handle = event_broker.subscribe(published_shard_positions);
let split_store_space_quota = SplitStoreQuota::new(
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
Expand Down Expand Up @@ -175,6 +181,7 @@ impl IndexingService {
merge_pipeline_handles: HashMap::new(),
cooperative_indexing_permits,
event_broker,
_event_subscription_handle: event_subscription_handle,
})
}

Expand Down Expand Up @@ -418,10 +425,10 @@ impl IndexingService {
let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
.indexing_pipelines
.iter()
.flat_map(|(pipeline, (_, pipeline_handle))| {
let indexer_metrics = pipeline_handle.last_observation();
let pipeline_metrics = indexer_metrics.pipeline_metrics_opt?;
Some((pipeline, pipeline_metrics))
.filter_map(|(pipeline_id, (_, pipeline_handle))| {
let indexing_statistics = pipeline_handle.last_observation();
let pipeline_metrics = indexing_statistics.pipeline_metrics_opt?;
Some((pipeline_id, pipeline_metrics))
})
.collect();
self.cluster
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,6 @@ mod tests {
Ok(())
}

#[derive(Clone)]
struct ReportSplitListener {
report_splits_tx: flume::Sender<ReportSplitsRequest>,
}
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod processed_doc;
mod publish_lock;
mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;

pub use indexed_split::{
Expand All @@ -49,6 +50,7 @@ pub use publish_lock::{NewPublishLock, PublishLock};
pub use publisher_message::SplitsUpdate;
use quickwit_proto::types::PublishToken;
pub use raw_doc_batch::RawDocBatch;
pub use shard_positions::{PublishedShardPositions, PublishedShardPositionsUpdate};
pub use split_attrs::{create_split_metadata, SplitAttrs};

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 7a29318

Please sign in to comment.