Skip to content

Commit

Permalink
Simplify JsonDocBatchV2Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 23, 2024
1 parent 9b4764f commit 2993423
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 28 deletions.
26 changes: 14 additions & 12 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,25 @@ impl DocBatchV2Builder {
};
Some(doc_batch)
}

pub fn json_writer(self) -> JsonDocBatchV2Builder {
JsonDocBatchV2Builder {
doc_uids: self.doc_uids,
doc_buffer: self.doc_buffer.writer(),
doc_lengths: self.doc_lengths,
}
}
}

/// Wrapper around the batch builder that can add Serialize structs without an extra copy
/// Batch builder that can append [`Serialize`] structs without an extra copy
pub struct JsonDocBatchV2Builder {
doc_uids: Vec<DocUid>,
doc_buffer: Writer<BytesMut>,
doc_lengths: Vec<u32>,
}

impl Default for JsonDocBatchV2Builder {
fn default() -> Self {
Self {
doc_uids: Vec::new(),
doc_buffer: BytesMut::new().writer(),
doc_lengths: Vec::new(),
}
}
}

impl JsonDocBatchV2Builder {
pub fn add_doc(&mut self, doc_uid: DocUid, payload: impl Serialize) -> serde_json::Result<()> {
let old_len = self.doc_buffer.get_ref().len();
Expand All @@ -169,10 +171,10 @@ impl JsonDocBatchV2Builder {
Ok(())
}

pub fn into_inner(self) -> DocBatchV2Builder {
DocBatchV2Builder {
pub fn build(self) -> DocBatchV2 {
DocBatchV2 {
doc_uids: self.doc_uids,
doc_buffer: self.doc_buffer.into_inner(),
doc_buffer: self.doc_buffer.into_inner().freeze(),
doc_lengths: self.doc_lengths,
}
}
Expand Down
11 changes: 3 additions & 8 deletions quickwit/quickwit-opentelemetry/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_common::uri::Uri;
use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, INGEST_V2_SOURCE_ID,
};
use quickwit_ingest::{CommitType, DocBatchV2Builder, IngestServiceError};
use quickwit_ingest::{CommitType, IngestServiceError, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest,
};
Expand Down Expand Up @@ -310,7 +310,7 @@ impl OtlpGrpcLogsService {
let num_log_records = log_records.len() as u64;
let mut error_message = String::new();

let mut doc_batch_builder = DocBatchV2Builder::default().json_writer();
let mut doc_batch_builder = JsonDocBatchV2Builder::default();
let mut doc_uid_generator = DocUidGenerator::default();
for log_record in log_records {
let doc_uid = doc_uid_generator.next_doc_uid();
Expand All @@ -320,12 +320,7 @@ impl OtlpGrpcLogsService {
num_parse_errors += 1;
}
}
let doc_batch_opt = doc_batch_builder.into_inner().build();
let doc_batch = if let Some(doc_batch) = doc_batch_opt {
doc_batch
} else {
DocBatchV2::default()
};
let doc_batch = doc_batch_builder.build();
let current_span = RuntimeSpan::current();
current_span.record("num_log_records", num_log_records);
current_span.record("num_bytes", doc_batch.num_bytes());
Expand Down
11 changes: 3 additions & 8 deletions quickwit/quickwit-opentelemetry/src/otlp/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_common::uri::Uri;
use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, INGEST_V2_SOURCE_ID,
};
use quickwit_ingest::{CommitType, DocBatchV2Builder, IngestServiceError};
use quickwit_ingest::{CommitType, IngestServiceError, JsonDocBatchV2Builder};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest,
};
Expand Down Expand Up @@ -766,7 +766,7 @@ impl OtlpGrpcTracesService {
let mut num_parse_errors = 0;
let mut error_message = String::new();

let mut doc_batch_builder = DocBatchV2Builder::default().json_writer();
let mut doc_batch_builder = JsonDocBatchV2Builder::default();
let mut doc_uid_generator = DocUidGenerator::default();
for span in spans {
let doc_uid = doc_uid_generator.next_doc_uid();
Expand All @@ -776,12 +776,7 @@ impl OtlpGrpcTracesService {
num_parse_errors += 1;
}
}
let doc_batch_opt = doc_batch_builder.into_inner().build();
let doc_batch = if let Some(doc_batch) = doc_batch_opt {
doc_batch
} else {
DocBatchV2::default()
};
let doc_batch = doc_batch_builder.build();
let current_span = RuntimeSpan::current();
current_span.record("num_spans", num_spans);
current_span.record("num_bytes", doc_batch.num_bytes());
Expand Down

0 comments on commit 2993423

Please sign in to comment.