Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 26, 2023
1 parent 90c59a2 commit b43cb97
Show file tree
Hide file tree
Showing 46 changed files with 2,458 additions and 2,407 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ testsuite = [
"quickwit-actors/testsuite",
"quickwit-cluster/testsuite",
"quickwit-common/testsuite",
"quickwit-metastore/testsuite",
]

[dev-dependencies]
Expand Down
27 changes: 14 additions & 13 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject};
use quickwit_proto::IndexUid;
use serde::Serialize;
use serde_json::Value as JsonValue;
use tantivy::schema::{Field, Value};
Expand Down Expand Up @@ -107,7 +108,7 @@ impl From<FromUtf8Error> for DocProcessorError {

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct DocProcessorCounters {
index_id: String,
index_uid: IndexUid,
source_id: String,
/// Overall number of documents received, partitioned
/// into 4 categories:
Expand All @@ -129,9 +130,9 @@ pub struct DocProcessorCounters {
}

impl DocProcessorCounters {
pub fn new(index_id: String, source_id: String) -> Self {
pub fn new(index_uid: IndexUid, source_id: String) -> Self {
Self {
index_id,
index_uid,
source_id,
num_parse_errors: 0,
num_transform_errors: 0,
Expand Down Expand Up @@ -162,15 +163,15 @@ impl DocProcessorCounters {
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"parsing_error",
])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"parsing_error",
])
Expand All @@ -183,15 +184,15 @@ impl DocProcessorCounters {
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"transform_error",
])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"transform_error",
])
Expand All @@ -204,15 +205,15 @@ impl DocProcessorCounters {
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"missing_field",
])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([
self.index_id.as_str(),
self.index_uid.index_id(),
self.source_id.as_str(),
"missing_field",
])
Expand All @@ -224,11 +225,11 @@ impl DocProcessorCounters {
self.overall_num_bytes += num_bytes;
crate::metrics::INDEXER_METRICS
.processed_docs_total
.with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
.with_label_values([self.index_uid.index_id(), self.source_id.as_str(), "valid"])
.inc();
crate::metrics::INDEXER_METRICS
.processed_bytes
.with_label_values([self.index_id.as_str(), self.source_id.as_str(), "valid"])
.with_label_values([self.index_uid.index_id(), self.source_id.as_str(), "valid"])
.inc_by(num_bytes);
}
}
Expand All @@ -246,7 +247,7 @@ pub struct DocProcessor {

impl DocProcessor {
pub fn try_new(
index_id: String,
index_uid: IndexUid,
source_id: String,
doc_mapper: Arc<dyn DocMapper>,
indexer_mailbox: Mailbox<Indexer>,
Expand All @@ -261,7 +262,7 @@ impl DocProcessor {
doc_mapper,
indexer_mailbox,
timestamp_field_opt,
counters: DocProcessorCounters::new(index_id, source_id),
counters: DocProcessorCounters::new(index_uid, source_id),
publish_lock: PublishLock::default(),
#[cfg(feature = "vrl")]
transform_opt: transform_config_opt
Expand Down
18 changes: 11 additions & 7 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_config::{IndexingSettings, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
use quickwit_metastore::{IndexMetadataResponseExt, Metastore, MetastoreError};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::IndexMetadataRequest;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -254,11 +255,12 @@ impl IndexingPipeline {
.await
.expect("The semaphore should not be closed.");
self.statistics.num_spawn_attempts += 1;
let index_id = self.params.pipeline_id.index_uid.index_id();
let index_uid = self.params.pipeline_id.index_uid.clone();
let index_id = index_uid.index_id();
let source_id = self.params.pipeline_id.source_id.as_str();
self.kill_switch = ctx.kill_switch().child();
info!(
index_id=%index_id,
index_id=%index_uid.index_id(),
source_id=%source_id,
pipeline_ord=%self.params.pipeline_id.pipeline_ord,
root_dir=%self.params.indexing_directory.path().display(),
Expand Down Expand Up @@ -351,7 +353,7 @@ impl IndexingPipeline {
.spawn(indexer);

let doc_processor = DocProcessor::try_new(
index_id.to_string(),
index_uid.clone(),
source_id.to_string(),
self.params.doc_mapper.clone(),
indexer_mailbox,
Expand All @@ -369,9 +371,11 @@ impl IndexingPipeline {
.spawn(doc_processor);

// Fetch the index metadata to make sure we have the most recently updated checkpoint.
let index_metadata = ctx
.protect_future(self.params.metastore.index_metadata(index_id))
let index_metadata_request = IndexMetadataRequest::strict(index_uid.clone());
let index_metadata_response = ctx
.protect_future(self.params.metastore.index_metadata(index_metadata_request))
.await?;
let index_metadata = index_metadata_response.deserialize_index_metadata()?;
let source_checkpoint = index_metadata
.checkpoint
.source_checkpoint(source_id)
Expand All @@ -381,7 +385,7 @@ impl IndexingPipeline {
.protect_future(quickwit_supported_sources().load_source(
Arc::new(SourceExecutionContext {
metastore: self.params.metastore.clone(),
index_uid: self.params.pipeline_id.index_uid.clone(),
index_uid,
queues_dir_path: self.params.queues_dir_path.clone(),
source_config: self.params.source_config.clone(),
}),
Expand Down
Loading

0 comments on commit b43cb97

Please sign in to comment.