Skip to content

Commit

Permalink
Moved CommitType enum to proto definition.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 25, 2023
1 parent 60764ed commit a0ede73
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 42 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/ingest_api_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ mod tests {
}
IngestRequest {
doc_batches,
commit: commit_type as u32,
commit: commit_type.into(),
}
}

Expand Down
39 changes: 37 additions & 2 deletions quickwit/quickwit-ingest/src/codegen/ingest_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct DropQueueRequest {
pub struct IngestRequest {
#[prost(message, repeated, tag = "1")]
pub doc_batches: ::prost::alloc::vec::Vec<DocBatch>,
#[prost(uint32, tag = "2")]
pub commit: u32,
#[prost(enumeration = "CommitType", tag = "2")]
pub commit: i32,
}
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -114,6 +114,41 @@ pub struct ListQueuesResponse {
#[prost(string, repeated, tag = "1")]
pub queues: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// / Specifies if the ingest request should block waiting for the records to be committed.
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum CommitType {
/// / The request doesn't wait for commit
Auto = 0,
/// / The request waits for the next scheduled commit to finish.
WaitFor = 1,
/// / The request forces an immediate commit after the last document in the batch and waits for
/// / it to finish.
Force = 2,
}
impl CommitType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
CommitType::Auto => "Auto",
CommitType::WaitFor => "WaitFor",
CommitType::Force => "Force",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"Auto" => Some(Self::Auto),
"WaitFor" => Some(Self::WaitFor),
"Force" => Some(Self::Force),
_ => None,
}
}
}
/// BEGIN quickwit-codegen
use tower::{Layer, Service, ServiceExt};
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl IngestApiService {
.queues
.append_batch(&doc_batch.index_id, records_it, ctx)
.await?;
let commit = CommitType::from(request.commit);
let commit = request.commit();
if let Some(max_position) = max_position {
if commit != CommitType::Auto {
if commit == CommitType::Force {
Expand Down Expand Up @@ -429,7 +429,7 @@ mod tests {
doc_lengths: vec![1, 3, 2],
},
],
commit: CommitType::Auto as u32,
commit: CommitType::Auto.into(),
};
assert_eq!(ingest_request.cost(), 9);
}
Expand Down Expand Up @@ -458,7 +458,7 @@ mod tests {

let ingest_request = IngestRequest {
doc_batches: vec![batch.build()],
commit: CommitType::Force as u32,
commit: CommitType::Force.into(),
};
let ingest_response = ingest_api_service
.send_message(ingest_request)
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {

let ingest_request = IngestRequest {
doc_batches: vec![batch.build()],
commit: CommitType::WaitFor as u32,
commit: CommitType::WaitFor.into(),
};
let ingest_response = ingest_api_service
.send_message(ingest_request)
Expand Down
13 changes: 12 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,20 @@ message DropQueueRequest {
string queue_id = 1;
}

/// Specifies if the ingest request should block waiting for the records to be committed.
enum CommitType {
/// The request doesn't wait for commit
Auto = 0;
/// The request waits for the next scheduled commit to finish.
WaitFor = 1;
/// The request forces an immediate commit after the last document in the batch and waits for
/// it to finish.
Force = 2;
}

message IngestRequest {
repeated DocBatch doc_batches = 1;
uint32 commit = 2;
CommitType commit = 2;
}

message IngestResponse {
Expand Down
32 changes: 2 additions & 30 deletions quickwit/quickwit-ingest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub use position::Position;
pub use queue::Queues;
use quickwit_actors::{Mailbox, Universe};
use quickwit_config::IngestApiConfig;
use serde::Deserialize;
use tokio::sync::Mutex;

mod doc_batch;
Expand Down Expand Up @@ -114,33 +113,6 @@ pub async fn start_ingest_api_service(
init_ingest_api(universe, &queues_dir_path, config).await
}

/// Specifies if the ingest request should block waiting for the records to be committed.
#[repr(u32)]
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, utoipa::ToSchema)]
#[serde(rename_all(deserialize = "snake_case"))]
#[derive(Default)]
pub enum CommitType {
#[default]
/// The request doesn't wait for commit
Auto = 0,
/// The request waits for the next scheduled commit to finish.
WaitFor = 1,
/// The request forces an immediate commit after the last document in the batch and waits for
/// it to finish.
Force = 2,
}

impl From<u32> for CommitType {
fn from(value: u32) -> Self {
match value {
0 => CommitType::Auto,
1 => CommitType::WaitFor,
2 => CommitType::Force,
_ => panic!("Unknown commit type {value}"),
}
}
}

impl CommitType {
pub fn to_query_parameter(&self) -> Option<&'static [(&'static str, &'static str)]> {
match self {
Expand Down Expand Up @@ -223,7 +195,7 @@ mod tests {
doc_lengths: vec![2],
},
],
commit: CommitType::Auto as u32,
commit: CommitType::Auto.into(),
};
let ingest_result = ingest_api_service.ask_for_res(ingest_request).await;
assert!(ingest_result.is_err());
Expand Down Expand Up @@ -268,7 +240,7 @@ mod tests {
doc_buffer: vec![1; 600].into(),
doc_lengths: vec![30; 20],
}],
commit: CommitType::Auto as u32,
commit: CommitType::Auto.into(),
};

ingest_api_service
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-opentelemetry/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl OtlpGrpcLogsService {
async fn store_logs(&mut self, doc_batch: DocBatch) -> Result<(), tonic::Status> {
let ingest_request = IngestRequest {
doc_batches: vec![doc_batch],
commit: CommitType::Auto as u32,
commit: CommitType::Auto.into(),
};
self.ingest_service.ingest(ingest_request).await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-opentelemetry/src/otlp/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ impl OtlpGrpcTracesService {
async fn store_spans(&mut self, doc_batch: DocBatch) -> Result<(), tonic::Status> {
let ingest_request = IngestRequest {
doc_batches: vec![doc_batch],
commit: self.commit_type as u32,
commit: self.commit_type.into(),
};
self.ingest_service.ingest(ingest_request).await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/elastic_search_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async fn elastic_ingest_bulk(
let commit_type: CommitType = ingest_options.refresh.into();
let ingest_request = IngestRequest {
doc_batches,
commit: commit_type as u32,
commit: commit_type.into(),
};
let ingest_response = ingest_service.ingest(ingest_request).await?;
Ok(ingest_response)
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async fn ingest(
}
let ingest_req = IngestRequest {
doc_batches: vec![doc_batch_builder.build()],
commit: ingest_options.commit_type as u32,
commit: ingest_options.commit_type.into(),
};
let ingest_response = ingest_service.ingest(ingest_req).await?;
Ok(ingest_response)
Expand Down

0 comments on commit a0ede73

Please sign in to comment.