Skip to content

Commit

Permalink
Close shards with EOF record (#4021)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 25, 2023
1 parent fd1547b commit fdc60f8
Show file tree
Hide file tree
Showing 61 changed files with 1,771 additions and 3,135 deletions.
10 changes: 10 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ backoff = { version = "0.4", features = ["tokio"] }
base64 = "0.21"
byte-unit = { version = "4", default-features = false, features = ["serde", "std"] }
bytes = { version = "1", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat", rev = "bc29598" }
chrono = { version = "0.4.23", default-features = false, features = ["clock", "std"] }
clap = { version = "4.4.1", features = ["env", "string"] }
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ mod tests {
use std::str::FromStr;

use quickwit_config::{SourceInputFormat, SourceParams};
use quickwit_metastore::checkpoint::{PartitionId, Position};
use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::types::Position;
use serde_json::json;

use super::*;
Expand Down
52 changes: 4 additions & 48 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use quickwit_proto::control_plane::{
GetOrCreateOpenShardsResponse,
};
use quickwit_proto::metastore::{
serde_utils as metastore_serde_utils, AddSourceRequest, CloseShardsRequest, CreateIndexRequest,
CreateIndexResponse, DeleteIndexRequest, DeleteShardsRequest, DeleteSourceRequest,
EmptyResponse, MetastoreError, MetastoreService, MetastoreServiceClient, ToggleSourceRequest,
serde_utils as metastore_serde_utils, AddSourceRequest, CreateIndexRequest,
CreateIndexResponse, DeleteIndexRequest, DeleteSourceRequest, EmptyResponse, MetastoreError,
MetastoreService, MetastoreServiceClient, ToggleSourceRequest,
};
use quickwit_proto::{IndexUid, NodeId};
use quickwit_proto::types::{IndexUid, NodeId};
use serde::Serialize;
use tracing::error;

Expand Down Expand Up @@ -365,50 +365,6 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
}
}

// This is a metastore callback. Ingesters call the metastore to close shards directly, then the
// metastore notifies the control plane of the event.
#[async_trait]
impl Handler<CloseShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<EmptyResponse>;

async fn handle(
&mut self,
request: CloseShardsRequest,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
for close_shards_subrequest in request.subrequests {
let index_uid: IndexUid = close_shards_subrequest.index_uid.into();
let source_id = close_shards_subrequest.source_id;
// TODO: Group by (index_uid, source_id) first, or change schema of
// `CloseShardsSubrequest`.
let shard_ids = [close_shards_subrequest.shard_id];
self.model.close_shards(&index_uid, &source_id, &shard_ids)
}
Ok(Ok(EmptyResponse {}))
}
}

// This is a metastore callback. Ingesters call the metastore to delete shards directly, then the
// metastore notifies the control plane of the event.
#[async_trait]
impl Handler<DeleteShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<EmptyResponse>;

async fn handle(
&mut self,
request: DeleteShardsRequest,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
for delete_shards_subrequest in request.subrequests {
let index_uid: IndexUid = delete_shards_subrequest.index_uid.into();
let source_id = delete_shards_subrequest.source_id;
let shard_ids = delete_shards_subrequest.shard_ids;
self.model.delete_shards(&index_uid, &source_id, &shard_ids)
}
Ok(Ok(EmptyResponse {}))
}
}

#[cfg(test)]
mod tests {
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, MetastoreService,
MetastoreServiceClient,
};
use quickwit_proto::types::IndexId;
use quickwit_proto::{metastore, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use quickwit_proto::{metastore, IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use serde::Serialize;
use tracing::{error, info};

Expand Down Expand Up @@ -249,6 +248,7 @@ impl ControlPlaneModel {
}

/// Removes the shards identified by their index UID, source ID, and shard IDs.
#[allow(dead_code)] // Will remove this in a future PR.
pub fn delete_shards(
&mut self,
index_uid: &IndexUid,
Expand All @@ -261,6 +261,7 @@ impl ControlPlaneModel {

/// Sets the state of the shards identified by their index UID, source ID, and shard IDs to
/// `Closed`.
#[allow(dead_code)] // Will remove this in a future PR.
pub fn close_shards(
&mut self,
index_uid: &IndexUid,
Expand Down Expand Up @@ -425,6 +426,7 @@ impl ShardTable {

/// Sets the state of the shards identified by their index UID, source ID, and shard IDs to
/// `Closed`.
#[allow(dead_code)] // Will remove this in a future PR.
pub fn close_shards(
&mut self,
index_uid: &IndexUid,
Expand Down Expand Up @@ -517,7 +519,7 @@ mod tests {
source_id: source_id.clone(),
shard_id: 2,
leader_id: "test-leader-0".to_string(),
shard_state: ShardState::Closing as i32,
shard_state: ShardState::Unavailable as i32,
..Default::default()
};
let shard_03 = Shard {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::metastore::{
EntityKind, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::NodeId;
use quickwit_proto::{metastore, IndexUid};
use quickwit_proto::{metastore, IndexUid, NodeId};
use rand::seq::SliceRandom;
use tokio::time::timeout;

Expand Down
42 changes: 1 addition & 41 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,9 @@ pub mod indexing_plan;
pub mod indexing_scheduler;
pub mod ingest;

use async_trait::async_trait;
use quickwit_common::pubsub::EventSubscriber;
use quickwit_common::tower::Pool;
use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient};
use quickwit_proto::indexing::{IndexingServiceClient, IndexingTask};
use quickwit_proto::metastore::{CloseShardsRequest, DeleteShardsRequest};
use quickwit_proto::{IndexUid, SourceId};
use tracing::error;
use quickwit_proto::types::{IndexUid, SourceId};

/// It can however appear only once in a given index.
/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is.
Expand All @@ -49,40 +44,5 @@ pub struct IndexerNodeInfo {

pub type IndexerPool = Pool<String, IndexerNodeInfo>;

/// Subscribes to various metastore events and forwards them to the control plane using the inner
/// client. The actual subscriptions are set up in `quickwit-serve`.
#[derive(Debug, Clone)]
pub struct ControlPlaneEventSubscriber(ControlPlaneServiceClient);

impl ControlPlaneEventSubscriber {
pub fn new(control_plane: ControlPlaneServiceClient) -> Self {
Self(control_plane)
}
}

#[async_trait]
impl EventSubscriber<CloseShardsRequest> for ControlPlaneEventSubscriber {
async fn handle_event(&mut self, request: CloseShardsRequest) {
if let Err(error) = self.0.close_shards(request).await {
error!(
"failed to notify control plane of close shards event: `{}`",
error
);
}
}
}

#[async_trait]
impl EventSubscriber<DeleteShardsRequest> for ControlPlaneEventSubscriber {
async fn handle_event(&mut self, request: DeleteShardsRequest) {
if let Err(error) = self.0.delete_shards(request).await {
error!(
"failed to notify control plane of delete shards event: `{}`",
error
);
}
}
}

#[cfg(test)]
mod tests;
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ impl Handler<SplitsUpdate> for Publisher {
mod tests {
use quickwit_actors::Universe;
use quickwit_metastore::checkpoint::{
IndexCheckpointDelta, PartitionId, Position, SourceCheckpoint, SourceCheckpointDelta,
IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta,
};
use quickwit_metastore::{PublishSplitsRequestExt, SplitMetadata};
use quickwit_proto::metastore::EmptyResponse;
use quickwit_proto::IndexUid;
use quickwit_proto::{IndexUid, Position};
use tracing::Span;

use super::*;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_metastore::{SplitMetadata, StageSplitsRequestExt};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, StageSplitsRequest};
use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
use quickwit_proto::{IndexUid, PublishToken};
use quickwit_proto::types::{IndexUid, PublishToken};
use quickwit_storage::SplitPayloadBuilder;
use serde::Serialize;
use tantivy::TrackedObject;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/models/indexed_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use quickwit_common::io::IoControls;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::{IndexUid, PublishToken};
use quickwit_proto::types::{IndexUid, PublishToken};
use tantivy::directory::MmapDirectory;
use tantivy::{IndexBuilder, TrackedObject};
use tracing::{instrument, Span};
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/models/packaged_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::fmt;
use itertools::Itertools;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::{IndexUid, PublishToken, SplitId};
use quickwit_proto::types::{IndexUid, PublishToken, SplitId};
use tantivy::TrackedObject;
use tracing::Span;

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/models/publisher_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::fmt;
use itertools::Itertools;
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_metastore::SplitMetadata;
use quickwit_proto::{IndexUid, PublishToken};
use quickwit_proto::types::{IndexUid, PublishToken};
use tantivy::TrackedObject;
use tracing::Span;

Expand Down
16 changes: 10 additions & 6 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use bytes::Bytes;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::uri::Uri;
use quickwit_config::FileSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::types::Position;
use serde::Serialize;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::info;
Expand Down Expand Up @@ -140,11 +141,14 @@ impl TypedSourceFactory for FileSourceFactory {
let mut offset = 0;
let reader: Box<dyn AsyncRead + Send + Unpin> = if let Some(filepath) = &params.filepath {
let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
if let Some(Position::Offset(offset_str)) =
checkpoint.position_for_partition(&partition_id).cloned()
{
offset = offset_str.parse::<usize>()?;
}
offset = checkpoint
.position_for_partition(&partition_id)
.map(|position| {
position
.as_usize()
.expect("file offset should be stored as usize")
})
.unwrap_or(0);
let (dir_uri, file_name) = dir_and_filename(filepath)?;
let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use google_cloud_pubsub::subscription::Subscription;
use quickwit_actors::{ActorContext, ActorExitStatus, Mailbox};
use quickwit_common::rand::append_random_suffix;
use quickwit_config::GcpPubSubSourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::types::Position;
use serde_json::{json, Value as JsonValue};
use tokio::time;
use tracing::{debug, info, warn};
Expand Down
Loading

0 comments on commit fdc60f8

Please sign in to comment.