Skip to content

Commit

Permalink
Make the ShardPositions model into a ShardPositionsService actor
Browse files Browse the repository at this point in the history
Its purpose is to encapsulate all of the logic used to maintain a
distributed eventually consistent view of the published shard positions
over the cluster.

From the user point of view, after instantiation
- indexing pipelines need to feed it with updates. (this happens on
  suggest_truncate). This is done by publishing
  `LocalShardPositionsUpdate` to the event broker.
- clients interested in updates can just subscribe to the
  `ShardPositionsUpdate` object in the event broker. The event
  received can come from a local indexing pipeline or anywhere in the
  cluster.

The service takes care of deduping/ignoring updates when necessary.

The two objects (Local and not) are very similar, but different in
semantics.

Preliminary step for #4056
  • Loading branch information
fulmicoton committed Nov 17, 2023
1 parent ac983a7 commit b3b7469
Show file tree
Hide file tree
Showing 15 changed files with 457 additions and 122 deletions.
5 changes: 2 additions & 3 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = "0.7"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "4a74399" }
chrono = { version = "0.4.23", default-features = false, features = [
"clock",
"std",
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ mod tests {
}

pub(crate) fn build(self) -> NodeState {
let mut node_state = NodeState::default();
let mut node_state = NodeState::for_test();

node_state.set(
ENABLED_SERVICES_KEY,
Expand Down
16 changes: 13 additions & 3 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use anyhow::Context;
use chitchat::transport::Transport;
use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, ClusterStateSnapshot,
FailureDetectorConfig, NodeState,
FailureDetectorConfig, ListenerHandle, NodeState,
};
use futures::Stream;
use itertools::Itertools;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct Cluster {
cluster_id: String,
self_chitchat_id: ChitchatId,
/// Socket address (UDP) the node listens on for receiving gossip messages.
gossip_listen_addr: SocketAddr,
pub gossip_listen_addr: SocketAddr,
inner: Arc<RwLock<InnerCluster>>,
}

Expand Down Expand Up @@ -105,6 +105,16 @@ impl Cluster {
self.self_chitchat_id.gossip_advertise_addr
}

pub async fn subscribe(
&self,
key_prefix: &str,
callback: impl Fn(&str, &str) + 'static + Send + Sync,
) -> ListenerHandle {
let chitchat = self.chitchat().await;
let chitchat_lock = chitchat.lock().await;
chitchat_lock.subscribe_event(key_prefix, callback)
}

pub async fn join(
cluster_id: String,
self_node: ClusterMember,
Expand Down Expand Up @@ -366,7 +376,7 @@ impl Cluster {
Ok(())
}

async fn chitchat(&self) -> Arc<Mutex<Chitchat>> {
pub async fn chitchat(&self) -> Arc<Mutex<Chitchat>> {
self.inner.read().await.chitchat_handle.chitchat()
}
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ClusterNode {
let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr);
let channel = make_channel(grpc_advertise_addr).await;
let mut node_state = NodeState::default();
let mut node_state = NodeState::for_test();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());

Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,3 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
/// In order to amortize search with scroll, we fetch more documents than are
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to publish the shard positions.
pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
14 changes: 14 additions & 0 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use quickwit_proto::control_plane::{
ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, MetastoreError,
Expand Down Expand Up @@ -135,6 +136,19 @@ impl Actor for ControlPlane {
}
}

#[async_trait]
impl Handler<ShardPositionsUpdate> for ControlPlane {
type Reply = ();

async fn handle(
&mut self,
shard_positions: ShardPositionsUpdate,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
Ok(())
}
}

#[async_trait]
impl Handler<ControlPlanLoop> for ControlPlane {
type Reply = ();
Expand Down
13 changes: 3 additions & 10 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use quickwit_actors::{
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir;
use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
Expand All @@ -56,10 +56,7 @@ use tracing::{debug, error, info, warn};

use super::merge_pipeline::{MergePipeline, MergePipelineParams};
use super::MergePlanner;
use crate::models::{
DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, PublishedShardPositions,
SpawnPipeline,
};
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
use crate::source::{AssignShards, Assignment};
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
Expand Down Expand Up @@ -120,7 +117,6 @@ pub struct IndexingService {
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
event_broker: EventBroker,
_event_subscription_handle: EventSubscriptionHandle,
}

impl Debug for IndexingService {
Expand Down Expand Up @@ -148,8 +144,6 @@ impl IndexingService {
storage_resolver: StorageResolver,
event_broker: EventBroker,
) -> anyhow::Result<IndexingService> {
let published_shard_positions = PublishedShardPositions::new(cluster.clone());
let event_subscription_handle = event_broker.subscribe(published_shard_positions);
let split_store_space_quota = SplitStoreQuota::new(
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
Expand All @@ -165,7 +159,7 @@ impl IndexingService {
} else {
None
};
Ok(Self {
Ok(IndexingService {
node_id,
indexing_root_directory,
queue_dir_path,
Expand All @@ -181,7 +175,6 @@ impl IndexingService {
merge_pipeline_handles: HashMap::new(),
cooperative_indexing_permits,
event_broker,
_event_subscription_handle: event_subscription_handle,
})
}

Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use crate::actors::{
Sequencer, SplitsUpdateMailbox,
};
pub use crate::controlled_directory::ControlledDirectory;
use crate::models::IndexingStatistics;
use crate::models::{IndexingStatistics, ShardPositionsService};
pub use crate::split_store::{get_tantivy_directory_from_split_bundle, IndexingSplitStore};

pub mod actors;
Expand Down Expand Up @@ -77,6 +77,8 @@ pub async fn start_indexing_service(
) -> anyhow::Result<Mailbox<IndexingService>> {
info!("starting indexer service");

ShardPositionsService::spawn(universe.spawn_ctx(), event_broker.clone(), cluster.clone());

// Spawn indexing service.
let indexing_service = IndexingService::new(
config.node_id.clone(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ pub use publish_lock::{NewPublishLock, PublishLock};
pub use publisher_message::SplitsUpdate;
use quickwit_proto::types::PublishToken;
pub use raw_doc_batch::RawDocBatch;
pub use shard_positions::{PublishedShardPositions, PublishedShardPositionsUpdate};
pub(crate) use shard_positions::LocalShardPositionsUpdate;
pub use shard_positions::ShardPositionsService;
pub use split_attrs::{create_split_metadata, SplitAttrs};

#[derive(Debug)]
Expand Down
Loading

0 comments on commit b3b7469

Please sign in to comment.