Skip to content

Commit

Permalink
implement new sorting spec (#3799)
Browse files Browse the repository at this point in the history
* implement new sorting spec

* simplify top-k merging by reusing more code
  • Loading branch information
trinity-1686a authored Sep 7, 2023
1 parent 2409d2b commit 202348d
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 295 deletions.
4 changes: 0 additions & 4 deletions docs/internals/sorting.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ Quickwit can sort results based on fastfield values or score. This document disc
It also tries to describe optimizations that may be enabled (but are not necessarily implemented)
by this behavior.

Described below is the target behavior, which is *not* implemented right now, but will be shortly.

## Behavior

Sorting is controlled by the `sort_by` query parameter. It accepts a comma separated list of fields
Expand All @@ -32,8 +30,6 @@ TODO we could also say "it's not sorted" and add a special `_doc_id` for that. S

# Code

(The changes described here are currently part of quickwit#3545, which is an optimization PR. They
*should* be backported to a standalone PR to ease review and discussion).
A new structure TopK is introduced which is used both for in-split sorting and for merging of
results. It reduces the risks of inconsistencies between in-split and between-split behavior.
`SortOrder` gets new `compare` and `compare_opt` methods, which can be used to compare two values with
Expand Down
151 changes: 151 additions & 0 deletions quickwit/quickwit-common/src/binary_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,79 @@ impl<O: Ord, T> PartialEq for OrderItemPair<O, T> {

// Marker impl: promotes the `PartialEq` impl of `OrderItemPair` to a full `Eq`.
impl<O, T> Eq for OrderItemPair<O, T> where O: Ord {}

/// Derives the key by which values of type `Value` are ranked.
///
/// [`TopK`] holds an implementor of this trait and calls it once for every
/// candidate item to obtain the key it compares on.
pub trait SortKeyMapper<Value> {
    /// Type of the key produced for each value.
    type Key;

    /// Returns the sort key associated with `value`.
    fn get_sort_key(&self, value: &Value) -> Self::Key;
}

/// Incrementally computes the `k` best items out of everything it is fed.
///
/// Items are ranked by the key that the mapper `S` derives from them; the
/// items with the greatest keys are the ones retained.
pub struct TopK<T, O, S>
where
    O: Ord,
{
    /// Retained entries, wrapped in `Reverse` so that the top of the heap is
    /// the entry that compares lowest, i.e. the next one to be evicted.
    heap: BinaryHeap<Reverse<OrderItemPair<O, T>>>,
    /// Produces the sort key of each candidate item.
    sort_key_mapper: S,
    /// Maximum number of items retained.
    k: usize,
}

impl<T, O, S> TopK<T, O, S>
where
    O: Ord,
    S: SortKeyMapper<T, Key = O>,
{
    /// Builds an empty accumulator that retains at most `k` items, ranked by
    /// the keys produced by `sort_key_mapper`.
    pub fn new(k: usize, sort_key_mapper: S) -> Self {
        let heap = BinaryHeap::with_capacity(k);
        Self {
            heap,
            sort_key_mapper,
            k,
        }
    }

    /// Returns `true` once `k` items are retained.
    pub fn at_capacity(&self) -> bool {
        self.k <= self.heap.len()
    }

    /// Offers every item of `items` to the accumulator.
    ///
    /// While fewer than `k` items are retained, items are accepted
    /// unconditionally. Afterwards an item is only kept if its key is
    /// strictly greater than the key of the current worst entry, which it
    /// then replaces. With `k == 0` the iterator is not consumed at all.
    pub fn add_entries(&mut self, mut items: impl Iterator<Item = T>) {
        if self.k == 0 {
            return;
        }

        // Filling phase: accept items until the heap holds `k` of them.
        let free_slots = self.k.saturating_sub(self.heap.len());
        for item in items.by_ref().take(free_slots) {
            let order: O = self.sort_key_mapper.get_sort_key(&item);
            self.heap.push(Reverse(OrderItemPair { order, item }));
        }
        if !self.at_capacity() {
            // The input ran dry before the heap was full; do not poll it again.
            return;
        }

        // Replacement phase: a candidate must beat the current worst entry.
        for candidate in items {
            let order = self.sort_key_mapper.get_sort_key(&candidate);
            // The heap is non-empty here: `k > 0` and we are at capacity.
            let mut worst = self.heap.peek_mut().unwrap();
            if worst.0.order < order {
                *worst = Reverse(OrderItemPair {
                    order,
                    item: candidate,
                });
            }
        }
    }

    /// Offers a single item; shorthand for `add_entries` with one element.
    pub fn add_entry(&mut self, item: T) {
        let single = std::iter::once(item);
        self.add_entries(single)
    }

    /// Returns a reference to the worst retained item, or `None` if nothing
    /// is retained.
    pub fn peek_worst(&self) -> Option<&T> {
        let Reverse(pair) = self.heap.peek()?;
        Some(&pair.item)
    }

    /// Consumes the accumulator and returns the retained items, best first.
    pub fn finalize(self) -> Vec<T> {
        let mut sorted_items = Vec::with_capacity(self.heap.len());
        for Reverse(pair) in self.heap.into_sorted_vec() {
            sorted_items.push(pair.item);
        }
        sorted_items
    }
}

#[cfg(test)]
mod tests {

Expand All @@ -136,4 +209,82 @@ mod tests {
let top_k: Vec<u32> = super::top_k(vec![].into_iter(), 4, |n| *n);
assert!(top_k.is_empty());
}

#[test]
fn test_incremental_top_k() {
    // `Mapper(false)` ranks by the value itself; `Mapper(true)` ranks by
    // `u32::MAX - value`, so that smaller values get the greater keys.
    struct Mapper(bool);
    impl SortKeyMapper<u32> for Mapper {
        type Key = u32;
        fn get_sort_key(&self, value: &u32) -> u32 {
            if self.0 {
                u32::MAX - value
            } else {
                *value
            }
        }
    }

    // Natural keys, one batch larger than k.
    {
        let mut top_k = TopK::new(2, Mapper(false));
        top_k.add_entries([1u32, 2, 3].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&2));
        assert_eq!(&top_k.finalize(), &[3, 2]);
    }

    // Natural keys, same content fed one item at a time.
    {
        let mut top_k = TopK::new(2, Mapper(false));
        top_k.add_entries([1u32].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        top_k.add_entries([3].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        top_k.add_entries([2].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&2));
        assert_eq!(&top_k.finalize(), &[3, 2]);
    }

    // Inverted keys, one batch larger than k.
    {
        let mut top_k = TopK::new(2, Mapper(true));
        top_k.add_entries([1u32, 2, 3].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&2));
        assert_eq!(&top_k.finalize(), &[1, 2]);
    }

    // Inverted keys, fed one item at a time.
    {
        let mut top_k = TopK::new(2, Mapper(true));
        top_k.add_entries([1u32].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        top_k.add_entries([3].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&3));
        top_k.add_entries([2].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&2));
        assert_eq!(&top_k.finalize(), &[1, 2]);
    }

    // Fewer items than k, including a duplicate, in one batch.
    {
        let mut top_k = TopK::new(4, Mapper(false));
        top_k.add_entries([2u32, 1, 2].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        assert_eq!(&top_k.finalize(), &[2, 2, 1]);
    }

    // Fewer items than k, including a duplicate, fed one at a time.
    {
        let mut top_k = TopK::new(4, Mapper(false));
        top_k.add_entries([2u32].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&2));
        top_k.add_entries([1].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        top_k.add_entries([2].into_iter());
        assert!(!top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), Some(&1));
        assert_eq!(&top_k.finalize(), &[2, 2, 1]);
    }

    // Empty input.
    {
        let mut top_k = TopK::<u32, u32, _>::new(4, Mapper(false));
        top_k.add_entries([].into_iter());
        assert!(top_k.finalize().is_empty());
    }

    // k == 0: always at capacity, never retains anything.
    {
        let mut top_k = TopK::new(0, Mapper(false));
        top_k.add_entries([1u32, 2, 3].into_iter());
        assert!(top_k.at_capacity());
        assert_eq!(top_k.peek_worst(), None);
        assert!(top_k.finalize().is_empty());
    }
}
}
20 changes: 20 additions & 0 deletions quickwit/quickwit-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#![deny(clippy::disallowed_methods)]
#![allow(rustdoc::invalid_html_tags)]

use std::cmp::Ordering;
use std::fmt;

use ::opentelemetry::global;
Expand Down Expand Up @@ -227,3 +228,22 @@ impl<E: fmt::Debug + ServiceError> ServiceError for quickwit_actors::AskError<E>
}
}
}

impl search::SortOrder {
    /// Compares two values according to this sort order.
    ///
    /// For `Desc` the natural ordering of `this` relative to `other` is
    /// returned; for any other sort order the comparison is reversed.
    pub fn compare<T: Ord>(&self, this: &T, other: &T) -> Ordering {
        match self {
            search::SortOrder::Desc => this.cmp(other),
            _ => other.cmp(this),
        }
    }

    /// Compares two optional values according to this sort order.
    ///
    /// Two present values are compared with [`Self::compare`]. A missing
    /// value compares as `Less` than a present one whatever the sort order
    /// is, and two missing values are `Equal`.
    pub fn compare_opt<T: Ord>(&self, this: &Option<T>, other: &Option<T>) -> Ordering {
        match (this, other) {
            (Some(lhs), Some(rhs)) => self.compare(lhs, rhs),
            // At least one side is missing: presence alone decides, since
            // `false < true` makes `None` rank below `Some`.
            _ => this.is_some().cmp(&other.is_some()),
        }
    }
}
Loading

0 comments on commit 202348d

Please sign in to comment.