optimize trace collector (#4194)
* initial work on incremental aggregation

* implement incremental collection for FindTraceIdsAggregation
trinity-1686a authored Nov 29, 2023
1 parent 88a3465 commit 778e7e3
Showing 4 changed files with 157 additions and 29 deletions.
4 changes: 4 additions & 0 deletions quickwit/quickwit-common/src/binary_heap.rs
@@ -152,6 +152,10 @@ where
self.heap.len() >= self.k
}

pub fn max_len(&self) -> usize {
self.k
}

/// Try to add new entries, if they are better than the current worst.
pub fn add_entries(&mut self, mut items: impl Iterator<Item = T>) {
if self.k == 0 {
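The only change to `binary_heap.rs` is the `max_len` accessor, which exposes the configured capacity `k` so callers (here, `IncrementalCollector::peek_worst_hit`) can tell "top-k disabled (`k == 0`)" apart from "top-k not yet full". A minimal sketch of the surrounding structure, using a simplified stand-in for quickwit's `TopK` (names and internals here are illustrative, not the real API):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Simplified stand-in for quickwit's TopK: a min-heap keeping the k best items.
struct TopK<T: Ord> {
    heap: BinaryHeap<Reverse<T>>,
    k: usize,
}

impl<T: Ord> TopK<T> {
    fn new(k: usize) -> Self {
        TopK { heap: BinaryHeap::with_capacity(k), k }
    }

    // The accessor added by this commit: the configured capacity,
    // not the current length.
    fn max_len(&self) -> usize {
        self.k
    }

    fn at_capacity(&self) -> bool {
        self.heap.len() >= self.k
    }

    fn add(&mut self, item: T) {
        if self.k == 0 {
            return; // top-k disabled entirely
        }
        if self.heap.len() < self.k {
            self.heap.push(Reverse(item));
        } else if let Some(mut worst) = self.heap.peek_mut() {
            // Replace the current worst item only if the new one beats it.
            if item > worst.0 {
                *worst = Reverse(item);
            }
        }
    }
}

fn main() {
    let mut top = TopK::new(3);
    for x in [5, 1, 9, 7, 3] {
        top.add(x);
    }
    // max_len() == 0 would mean "no hits requested", which callers must
    // treat differently from "heap not yet full".
    assert_eq!(top.max_len(), 3);
    assert!(top.at_capacity());
}
```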
123 changes: 109 additions & 14 deletions quickwit/quickwit-search/src/collector.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::HashSet;

@@ -458,6 +459,92 @@ impl QuickwitAggregations {
}
}
}

fn maybe_incremental_aggregator(&self) -> QuickwitIncrementalAggregations {
match self {
QuickwitAggregations::FindTraceIdsAggregation(aggreg) => {
QuickwitIncrementalAggregations::FindTraceIdsAggregation(aggreg.clone(), Vec::new())
}
QuickwitAggregations::TantivyAggregations(aggreg) => {
QuickwitIncrementalAggregations::TantivyAggregations(aggreg.clone(), Vec::new())
}
}
}
}

#[derive(Clone)]
enum QuickwitIncrementalAggregations {
FindTraceIdsAggregation(FindTraceIdsCollector, Vec<Vec<Span>>),
TantivyAggregations(Aggregations, Vec<Vec<u8>>),
NoAggregation,
}

impl QuickwitIncrementalAggregations {
fn add(&mut self, intermediate_result: Vec<u8>) -> tantivy::Result<()> {
match self {
QuickwitIncrementalAggregations::FindTraceIdsAggregation(collector, ref mut state) => {
let fruits: Vec<Span> =
postcard::from_bytes(&intermediate_result).map_err(map_error)?;
state.push(fruits);
if state.iter().map(Vec::len).sum::<usize>() >= collector.num_traces {
let new_state = collector.merge_fruits(std::mem::take(state))?;
state.push(new_state);
}
}
QuickwitIncrementalAggregations::TantivyAggregations(_, state) => {
state.push(intermediate_result);
}
QuickwitIncrementalAggregations::NoAggregation => (),
}
Ok(())
}

fn virtual_worst_hit(&self) -> Option<PartialHit> {
match self {
QuickwitIncrementalAggregations::FindTraceIdsAggregation(collector, state) => {
if let Some(first) = state.first() {
if first.len() >= collector.num_traces {
if let Some(last_elem) = first.last() {
let timestamp = last_elem.span_timestamp.into_timestamp_nanos();
return Some(PartialHit {
sort_value: Some(SortByValue {
sort_value: Some(SortValue::I64(timestamp)),
}),
sort_value2: None,
split_id: String::new(),
segment_ord: 0,
doc_id: 0,
});
}
}
}
None
}
QuickwitIncrementalAggregations::TantivyAggregations(_, _) => None,
QuickwitIncrementalAggregations::NoAggregation => None,
}
}

fn finalize(self) -> tantivy::Result<Option<Vec<u8>>> {
match self {
QuickwitIncrementalAggregations::FindTraceIdsAggregation(collector, mut state) => {
let merged_fruit = if state.len() > 1 {
collector.merge_fruits(state)?
} else {
state.pop().unwrap_or_default()
};
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
Ok(Some(serialized))
}
QuickwitIncrementalAggregations::TantivyAggregations(aggregation, state) => {
merge_intermediate_aggregation_result(
&Some(QuickwitAggregations::TantivyAggregations(aggregation)),
state.iter().map(|vec| vec.as_slice()),
)
}
QuickwitIncrementalAggregations::NoAggregation => Ok(None),
}
}
}

/// The quickwit collector is the tantivy Collector used in Quickwit.
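The core of the optimization is the `FindTraceIdsAggregation` arm of `add` above: rather than buffering every split's serialized spans until `finalize`, the collector merges batches eagerly once their combined length reaches `num_traces`, so `state.first()` always holds a single merged batch whose last span bounds what remaining splits must beat (this is what `virtual_worst_hit` reads). A hypothetical, simplified model of that eager merge; the real code deserializes postcard bytes and delegates to `FindTraceIdsCollector::merge_fruits`, which also dedups by trace id:

```rust
// Simplified model of the eager merge in QuickwitIncrementalAggregations::add.
#[derive(Clone, Copy, Debug)]
struct Span {
    timestamp_nanos: i64,
}

struct IncrementalTraceAgg {
    num_traces: usize,
    // Invariant: after any merge, `state` holds exactly one batch,
    // sorted by descending timestamp and truncated to `num_traces`.
    state: Vec<Vec<Span>>,
}

impl IncrementalTraceAgg {
    fn add(&mut self, batch: Vec<Span>) {
        self.state.push(batch);
        if self.state.iter().map(Vec::len).sum::<usize>() >= self.num_traces {
            let mut merged: Vec<Span> =
                std::mem::take(&mut self.state).into_iter().flatten().collect();
            merged.sort_unstable_by_key(|s| std::cmp::Reverse(s.timestamp_nanos));
            merged.truncate(self.num_traces);
            self.state.push(merged);
        }
    }

    // Mirrors virtual_worst_hit: once the quota is full, the last span of
    // the merged batch bounds what any remaining split must beat.
    fn worst_retained_timestamp(&self) -> Option<i64> {
        let first = self.state.first()?;
        if first.len() >= self.num_traces {
            first.last().map(|s| s.timestamp_nanos)
        } else {
            None
        }
    }
}

fn main() {
    let mut agg = IncrementalTraceAgg { num_traces: 2, state: Vec::new() };
    agg.add(vec![Span { timestamp_nanos: 10 }, Span { timestamp_nanos: 5 }]);
    agg.add(vec![Span { timestamp_nanos: 8 }]);
    // The two newest spans (10 and 8) are retained; 8 is the bar to beat.
    assert_eq!(agg.worst_retained_timestamp(), Some(8));
}
```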
@@ -932,7 +1019,7 @@ impl SortKeyMapper<SegmentPartialHit> for HitSortingMapper {
pub(crate) struct IncrementalCollector {
inner: QuickwitCollector,
top_k_hits: TopK<PartialHit, PartialHitSortingKey, HitSortingMapper>,
intermediate_aggregation_results: Vec<Vec<u8>>,
incremental_aggregation: QuickwitIncrementalAggregations,
num_hits: u64,
failed_splits: Vec<SplitSearchError>,
num_attempted_splits: u64,
@@ -941,20 +1028,25 @@ pub(crate) struct IncrementalCollector {
impl IncrementalCollector {
/// Create a new incremental collector
pub(crate) fn new(inner: QuickwitCollector) -> Self {
let incremental_aggregation = inner
.aggregation
.as_ref()
.map(QuickwitAggregations::maybe_incremental_aggregator)
.unwrap_or(QuickwitIncrementalAggregations::NoAggregation);
let (order1, order2) = inner.sort_by.sort_orders();
let sort_key_mapper = HitSortingMapper { order1, order2 };
IncrementalCollector {
top_k_hits: TopK::new(inner.max_hits + inner.start_offset, sort_key_mapper),
inner,
intermediate_aggregation_results: Vec::new(),
incremental_aggregation,
num_hits: 0,
failed_splits: Vec::new(),
num_attempted_splits: 0,
}
}

/// Merge one search result with the current state
pub(crate) fn add_split(&mut self, leaf_response: LeafSearchResponse) {
pub(crate) fn add_split(&mut self, leaf_response: LeafSearchResponse) -> tantivy::Result<()> {
let LeafSearchResponse {
num_hits,
partial_hits,
@@ -968,9 +1060,10 @@ impl IncrementalCollector {
self.failed_splits.extend(failed_splits);
self.num_attempted_splits += num_attempted_splits;
if let Some(intermediate_aggregation_result) = intermediate_aggregation_result {
self.intermediate_aggregation_results
.push(intermediate_aggregation_result);
self.incremental_aggregation
.add(intermediate_aggregation_result)?;
}
Ok(())
}

/// Add a failed split to the state
Expand All @@ -981,22 +1074,24 @@ impl IncrementalCollector {
/// Get the worst top-hit. Can be used to skip splits if they can't possibly do better.
///
/// Only returns a result if enough hits were recorded already.
pub(crate) fn peek_worst_hit(&self) -> Option<&PartialHit> {
pub(crate) fn peek_worst_hit(&self) -> Option<Cow<PartialHit>> {
if self.top_k_hits.max_len() == 0 {
return self
.incremental_aggregation
.virtual_worst_hit()
.map(Cow::Owned);
}

if self.top_k_hits.at_capacity() {
self.top_k_hits.peek_worst()
self.top_k_hits.peek_worst().map(Cow::Borrowed)
} else {
None
}
}

/// Finalize the merge, creating a LeafSearchResponse.
pub(crate) fn finalize(self) -> tantivy::Result<LeafSearchResponse> {
let intermediate_aggregation_result = merge_intermediate_aggregation_result(
&self.inner.aggregation,
self.intermediate_aggregation_results
.iter()
.map(|vec| vec.as_slice()),
)?;
let intermediate_aggregation_result = self.incremental_aggregation.finalize()?;
let mut partial_hits = self.top_k_hits.finalize();
if self.inner.start_offset != 0 {
partial_hits.drain(0..self.inner.start_offset.min(partial_hits.len()));
@@ -1548,7 +1643,7 @@ mod tests {
.unwrap();

for split_result in results {
incremental_collector.add_split(split_result);
incremental_collector.add_split(split_result).unwrap();
}

let incremental_result = incremental_collector.finalize().unwrap();
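`peek_worst_hit` now returns `Cow<PartialHit>` because the worst hit can originate in two places: borrowed from the top-k heap during a normal search, or owned when `max_hits == 0` and the hit is synthesized from the aggregation state by `virtual_worst_hit`. A minimal illustration of the pattern, with a stand-in `Hit` type:

```rust
use std::borrow::Cow;

// Stand-in for PartialHit; only the sort value matters here.
#[derive(Clone, Debug, PartialEq)]
struct Hit {
    sort_value: i64,
}

// Borrow the heap's worst entry when hits are being tracked; otherwise
// synthesize an owned hit from the aggregation's worst retained timestamp.
fn peek_worst(heap_worst: Option<&Hit>, virtual_ts: Option<i64>) -> Option<Cow<'_, Hit>> {
    heap_worst
        .map(Cow::Borrowed)
        .or_else(|| virtual_ts.map(|ts| Cow::Owned(Hit { sort_value: ts })))
}

fn main() {
    let stored = Hit { sort_value: 42 };
    // Normal search: borrow straight from the heap, no clone.
    assert_eq!(peek_worst(Some(&stored), None).unwrap().sort_value, 42);
    // max_hits == 0: the trace aggregation supplies a virtual hit.
    assert_eq!(peek_worst(None, Some(7)).unwrap().sort_value, 7);
}
```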
4 changes: 1 addition & 3 deletions quickwit/quickwit-search/src/find_trace_ids_collector.rs
@@ -186,8 +186,7 @@ fn merge_segment_fruits(mut segment_fruits: Vec<Vec<Span>>, num_traces: usize) -
let mut seen_trace_ids: FnvHashSet<TraceId> = FnvHashSet::default();

for span in segment_fruits.into_iter().kmerge() {
if !seen_trace_ids.contains(&span.trace_id) {
seen_trace_ids.insert(span.trace_id);
if seen_trace_ids.insert(span.trace_id) {
spans.push(span);

if spans.len() == num_traces {
@@ -327,7 +326,6 @@ impl SelectTraceIds {
return;
}
self.select();

for trace_id in self.select_workbench.drain(..self.num_traces) {
self.dedup_workbench
.insert(trace_id.term_ord, trace_id.span_timestamp);
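The cleanup in `merge_segment_fruits` replaces a `contains` check followed by a separate `insert` with the boolean returned by `HashSet::insert`, which is true only when the value was not already present; this drops one of the two hash lookups. The idiom in isolation (plain `std` `HashSet` instead of the `FnvHashSet` used in the real code):

```rust
use std::collections::HashSet;

fn main() {
    let ids = [3_u64, 1, 3, 2, 1];
    let mut seen: HashSet<u64> = HashSet::new();
    let mut firsts = Vec::new();
    for id in ids {
        // insert() returns true only for values not already present,
        // so one lookup replaces the contains-then-insert pair.
        if seen.insert(id) {
            firsts.push(id);
        }
    }
    assert_eq!(firsts, vec![3, 1, 2]);
}
```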
55 changes: 43 additions & 12 deletions quickwit/quickwit-search/src/leaf.rs
@@ -442,11 +442,27 @@ enum CanSplitDoBetter {
SplitIdHigher(Option<String>),
SplitTimestampHigher(Option<i64>),
SplitTimestampLower(Option<i64>),
FindTraceIdsAggregation(Option<i64>),
}

impl CanSplitDoBetter {
/// Create a CanSplitDoBetter from a SearchRequest
fn from_request(request: &SearchRequest, timestamp_field_name: Option<&str>) -> Self {
if request.max_hits == 0 {
if let Some(aggregation) = &request.aggregation_request {
if let Ok(crate::QuickwitAggregations::FindTraceIdsAggregation(
find_trace_aggregation,
)) = serde_json::from_str(aggregation)
{
if Some(find_trace_aggregation.span_timestamp_field_name.as_str())
== timestamp_field_name
{
return CanSplitDoBetter::FindTraceIdsAggregation(None);
}
}
}
}

if request.sort_fields.is_empty() {
CanSplitDoBetter::SplitIdHigher(None)
} else if let Some((sort_by, timestamp_field)) =
@@ -480,7 +496,8 @@ impl CanSplitDoBetter {
CanSplitDoBetter::SplitIdHigher(_) => {
splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
}
CanSplitDoBetter::SplitTimestampHigher(_) => {
CanSplitDoBetter::SplitTimestampHigher(_)
| CanSplitDoBetter::FindTraceIdsAggregation(_) => {
splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
}
CanSplitDoBetter::SplitTimestampLower(_) => {
@@ -495,11 +512,12 @@
fn can_be_better(&self, split: &SplitIdAndFooterOffsets) -> bool {
match self {
CanSplitDoBetter::SplitIdHigher(Some(split_id)) => split.split_id >= *split_id,
CanSplitDoBetter::SplitTimestampHigher(Some(timestamp)) => {
split.timestamp_end() > *timestamp
CanSplitDoBetter::SplitTimestampHigher(Some(timestamp))
| CanSplitDoBetter::FindTraceIdsAggregation(Some(timestamp)) => {
split.timestamp_end() >= *timestamp
}
CanSplitDoBetter::SplitTimestampLower(Some(timestamp)) => {
split.timestamp_start() < *timestamp
split.timestamp_start() <= *timestamp
}
_ => true,
}
@@ -512,7 +530,8 @@
match self {
CanSplitDoBetter::Uninformative => (),
CanSplitDoBetter::SplitIdHigher(split_id) => *split_id = Some(hit.split_id.clone()),
CanSplitDoBetter::SplitTimestampHigher(timestamp) => {
CanSplitDoBetter::SplitTimestampHigher(timestamp)
| CanSplitDoBetter::FindTraceIdsAggregation(timestamp) => {
if let Some(SortValue::I64(timestamp_ns)) = hit.sort_value() {
// if we get a timestamp of, say, 1.5s, we need to check up to 2s to make
// sure we don't throw away something like 1.2s, so we should round up while
@@ -549,14 +568,15 @@ pub async fn leaf_search(
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

// In the future this should become `request.aggregation_request.is_some() ||
// request.exact_count == true`
let run_all_splits =
request.aggregation_request.is_some() || request.count_hits() == CountHits::CountAll;

let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
split_filter.optimize_split_order(&mut splits);

// if the client wants a full count, or we are doing an aggregation, we want to run every split.
// However, if the aggregation is the tracing aggregation, we don't actually need all splits.
let run_all_splits = request.count_hits() == CountHits::CountAll
|| (request.aggregation_request.is_some()
&& !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_)));

// Creates a collector which merges responses into one
let merge_collector =
make_merge_collector(&request, &searcher_context.get_aggregation_limits())?;
@@ -663,15 +683,26 @@ async fn leaf_search_single_split_wrapper(

let mut locked_incremental_merge_collector = incremental_merge_collector.lock().unwrap();
match leaf_search_single_split_res {
Ok(split_search_res) => locked_incremental_merge_collector.add_split(split_search_res),
Ok(split_search_res) => {
if let Err(err) = locked_incremental_merge_collector.add_split(split_search_res) {
locked_incremental_merge_collector.add_failed_split(SplitSearchError {
split_id: split.split_id.clone(),
error: format!("Error parsing aggregation result: {err}"),
retryable_error: true,
});
}
}
Err(err) => locked_incremental_merge_collector.add_failed_split(SplitSearchError {
split_id: split.split_id.clone(),
error: format!("{err}"),
retryable_error: true,
}),
}
if let Some(last_hit) = locked_incremental_merge_collector.peek_worst_hit() {
split_filter.lock().unwrap().record_new_worst_hit(last_hit);
split_filter
.lock()
.unwrap()
.record_new_worst_hit(last_hit.as_ref());
}
}

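The new `CanSplitDoBetter::FindTraceIdsAggregation` variant lets a pure trace-id search (`max_hits == 0` with the tracing aggregation sorted on the doc mapper's timestamp field) prune splits like a timestamp-sorted search: visit splits newest-first, and once `num_traces` spans are retained, skip any split whose `timestamp_end` cannot beat the worst retained timestamp. Note the commit also relaxes the comparisons to `>=`/`<=`, matching the rounding applied in `record_new_worst_hit`. A sketch of the pruning test under these assumptions, where `SplitMeta` is an illustrative stand-in for `SplitIdAndFooterOffsets`:

```rust
struct SplitMeta {
    split_id: String,
    timestamp_end: i64,
}

// A split can still improve the result only if its newest document is at
// least as new as the worst retained span timestamp. `>=` (not `>`) because
// the recorded timestamp is rounded up when it is captured.
fn can_be_better(split: &SplitMeta, worst_retained: Option<i64>) -> bool {
    match worst_retained {
        Some(ts) => split.timestamp_end >= ts,
        None => true, // no bound recorded yet: every split must be searched
    }
}

fn main() {
    let mut splits = vec![
        SplitMeta { split_id: "a".into(), timestamp_end: 10 },
        SplitMeta { split_id: "b".into(), timestamp_end: 30 },
        SplitMeta { split_id: "c".into(), timestamp_end: 20 },
    ];
    // Newest-first order maximizes the chance of filling the trace quota early.
    splits.sort_unstable_by_key(|s| std::cmp::Reverse(s.timestamp_end));
    let worst = Some(15);
    let kept: Vec<&str> = splits
        .iter()
        .filter(|s| can_be_better(s, worst))
        .map(|s| s.split_id.as_str())
        .collect();
    assert_eq!(kept, vec!["b", "c"]);
}
```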
