
Commit

Added Scroll API
Closes #3551
fulmicoton committed Jul 25, 2023
1 parent 3d45a87 commit 8280d2a
Showing 28 changed files with 1,538 additions and 194 deletions.
66 changes: 66 additions & 0 deletions docs/internals/scroll.md
@@ -0,0 +1,66 @@
# Scroll API

The scroll API has been implemented to offer compatibility with Elasticsearch.
Both the API and the implementation are quirky; this document details them.

## API description

Information about the scroll API is available in the Elasticsearch documentation:
https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#scroll-search-results
https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html

The user runs a regular search request with a `scroll` param.
The search result then contains the normal response, but a `_scroll_id` property is added to the response body.

That id is then meant to be sent to the scroll REST API.
Successive calls to this API then return the result pages incrementally.
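The two request shapes a client builds can be sketched as follows (the helper names are hypothetical; only the endpoint paths and field names follow the Elasticsearch convention described above):

```rust
// Sketch of the two request shapes an ES-compatible client sends.
// Helper names are hypothetical, not Quickwit's actual client code.

/// First call: a regular search request carrying a `scroll` TTL param.
fn initial_search_url(index: &str, scroll_ttl: &str) -> String {
    format!("/{index}/_search?scroll={scroll_ttl}")
}

/// Subsequent pages: the `_scroll_id` from the previous response is
/// posted to the scroll endpoint.
fn scroll_body(scroll_id: &str, scroll_ttl: &str) -> String {
    format!(r#"{{"scroll": "{scroll_ttl}", "scroll_id": "{scroll_id}"}}"#)
}
```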

## Quirks and difficulties

The scrolled results should be consistent with the state of the original index.
For this reason we need to capture the state of the index at the point of the original request.

The Elasticsearch API is needlessly brittle, as it returns the same `scroll_id` over and over.
If a network error happens between the client and the server at page N, there is no way for the client to ask for the re-emission of page N.
Page N+1 will be returned on the next call.


## Implementation

The scroll context contains:
- the details of the original query (we need to be able to re-emit paginated queries)
- the list of split metadata used for the query
- a cached list of partial docs (= not the doc content, just its address and its score) to avoid
  searching at every single scroll request
- the total number of results, in order to append that information to our response.
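A minimal sketch of what such a context could hold (field and type names are illustrative, not the actual Quickwit struct):

```rust
/// Address of a hit without its content: just enough to fetch the doc later.
#[derive(Clone)]
pub struct PartialHit {
    pub split_id: String,
    pub doc_id: u32,
    pub score: f32,
}

/// Illustrative scroll context mirroring the list above.
pub struct ScrollContext {
    /// The original query, so that paginated queries can be re-emitted.
    pub search_request: String,
    /// Split metadata captured at the time of the original request.
    pub split_ids: Vec<String>,
    /// Cached partial docs: addresses and scores, not doc contents.
    pub cached_partial_hits: Vec<PartialHit>,
    /// Offset of the first cached partial hit in the overall result set.
    pub cached_partial_hits_start_offset: u64,
    /// Total number of results, appended to every response.
    pub total_num_hits: u64,
}
```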

We use a simple leaderless KV store to keep the state required to run the scroll API.
We generate a scroll ULID and use it to get the list of servers with the best affinity according
to rendezvous hashing. We then go through them in order and attempt to put the key on up to 2 servers.
Failures for these PUTs are silent.
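The placement step can be sketched as a generic rendezvous-hashing selection. This uses the standard-library hasher and hypothetical names, not the actual `quickwit-common` `rendezvous_hasher` implementation:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Affinity score of a node for a key; higher is better.
fn affinity(key: &str, node: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    node.hash(&mut hasher);
    hasher.finish()
}

/// Rendezvous hashing: rank all nodes by affinity for this key
/// and keep the best `replicas` of them.
fn select_nodes<'a>(key: &str, nodes: &[&'a str], replicas: usize) -> Vec<&'a str> {
    let mut scored: Vec<(&'a str, u64)> =
        nodes.iter().map(|&node| (node, affinity(key, node))).collect();
    scored.sort_by(|left, right| right.1.cmp(&left.1));
    scored.into_iter().take(replicas).map(|(node, _)| node).collect()
}
```

Every node ranks the same key identically, so the scroll ULID alone is enough to find the servers most likely to hold the context.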

For each call to scroll, one of two things can happen:
- the partial docs for the requested page are in the partial doc cache: we just run the fetch_docs phase
and update the context with the new `start_offset`.
- the partial docs for the requested page are not in the partial doc cache: we then run a new search query.

We attempt to fetch `SCROLL_CACHE_CAPACITY` partial docs in order to fill the partial doc address cache for subsequent calls.
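The branch between the two cases can be sketched as follows. `SCROLL_CACHE_CAPACITY` is the constant named above; its value here, and every other name, is an assumption made for this sketch:

```rust
const SCROLL_CACHE_CAPACITY: usize = 1_000; // illustrative value

/// Stand-in scroll context: only the cache-related fields.
struct ScrollContext {
    cache_start_offset: usize,
    cached_partial_hits: Vec<u64>, // stand-in for partial doc addresses
}

enum ScrollStep {
    /// Cache hit: only the fetch_docs phase runs on these partial hits.
    FetchDocs(Vec<u64>),
    /// Cache miss: re-run the search, fetching up to `SCROLL_CACHE_CAPACITY`
    /// partial docs from this offset to refill the cache.
    ReSearch { from: usize, max_hits: usize },
}

fn next_page(ctx: &ScrollContext, start_offset: usize, page_len: usize) -> ScrollStep {
    let cache_end = ctx.cache_start_offset + ctx.cached_partial_hits.len();
    if start_offset >= ctx.cache_start_offset && start_offset + page_len <= cache_end {
        let lo = start_offset - ctx.cache_start_offset;
        ScrollStep::FetchDocs(ctx.cached_partial_hits[lo..lo + page_len].to_vec())
    } else {
        ScrollStep::ReSearch { from: start_offset, max_hits: SCROLL_CACHE_CAPACITY }
    }
}
```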

## A strange `scroll_id`

For more robustness, the scroll id is the concatenation of:
- a ULID, used as the address of the scroll context in the KV store;
- the `start_offset`.

The idea is that if the PUT request failed, we can still return the right results even if we have an obsolete version of the `ScrollContext`:
we take the max of the `start_offset` supplied in the `scroll_id` and the one present in the `ScrollContext`.
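A sketch of the concatenation and of the max-of-offsets rule; the `:` separator and the string encoding are assumptions made for illustration, not necessarily Quickwit's actual encoding:

```rust
/// Encode `<ulid>:<start_offset>` (separator is an assumption of this sketch).
fn encode_scroll_id(ulid: &str, start_offset: u64) -> String {
    format!("{ulid}:{start_offset}")
}

fn decode_scroll_id(scroll_id: &str) -> Option<(&str, u64)> {
    let (ulid, offset) = scroll_id.rsplit_once(':')?;
    Some((ulid, offset.parse().ok()?))
}

/// Even with a stale `ScrollContext` (e.g. one of the PUTs was lost),
/// taking the max of the two offsets yields the correct page start,
/// because the `scroll_id` itself always carries the latest offset.
fn effective_start_offset(offset_in_scroll_id: u64, offset_in_context: u64) -> u64 {
    offset_in_scroll_id.max(offset_in_context)
}
```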

## Quickwit-specific quirks

Our state is pretty volatile. Some scrolls may end up broken if 2 servers were removed within 30 minutes.

The scroll lifetime does not extend the life of a scroll context.
We do not do anything to prevent splits from being GCed, and rely only on the grace period to make sure this does not happen.

The ES API does not always update the `_scroll_id`; it does not seem to change across calls.
A misimplemented client might therefore appear to work correctly against a single-shard Elasticsearch index and yet fail against Quickwit.
13 changes: 13 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
@@ -174,6 +174,7 @@ tracing-subscriber = { version = "0.3.16", features = [
"std",
"env-filter",
] }
ttl_cache = "0.5"
typetag = "0.2"
ulid = "1.0"
username = "0.2"
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
@@ -32,6 +32,7 @@ pub mod pubsub;
pub mod rand;
pub mod rendezvous_hasher;
pub mod runtimes;
pub mod shared_consts;
pub mod sorted_iter;

pub mod stream_utils;
30 changes: 30 additions & 0 deletions quickwit/quickwit-common/src/shared_consts.rs
@@ -0,0 +1,30 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

/// We cannot safely delete splits right away, as:
/// - in-flight queries could actually have selected this split,
/// - scroll queries may also have a point in time on these splits.
///
/// We deal with this problem by introducing a grace period. A split is first marked as deleted,
/// and hence won't be selected for search. After a few minutes, once it is reasonably safe to assume
/// that all queries involving this split have terminated, we effectively delete the split.
/// This duration is controlled by `DELETION_GRACE_PERIOD`.
pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32 min
10 changes: 2 additions & 8 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
use futures::{stream, StreamExt};
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, Handler};
use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
use quickwit_metastore::Metastore;
use quickwit_storage::StorageResolver;
use serde::Serialize;
@@ -40,14 +41,6 @@ const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
/// the grace period strategy should do the job for the moment.
const STAGED_GRACE_PERIOD: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours

/// We cannot safely delete splits right away as a in-flight queries could actually
/// have selected this split.
/// We deal this probably by introducing a grace period. A split is first marked as delete,
/// and hence won't be selected for search. After a few minutes, once it reasonably safe to assume
/// that all queries involving this split have terminated, we effectively delete the split.
/// This duration is controlled by `DELETION_GRACE_PERIOD`.
const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(120); // 2 min

const MAX_CONCURRENT_GC_TASKS: usize = if cfg!(test) { 2 } else { 10 };

#[derive(Clone, Debug, Default, Serialize)]
@@ -210,6 +203,7 @@ mod tests {
use std::path::Path;

use quickwit_actors::Universe;
use quickwit_common::shared_consts::DELETION_GRACE_PERIOD;
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, MetastoreError, MockMetastore, Split, SplitMetadata,
SplitState,
94 changes: 70 additions & 24 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
@@ -52,13 +52,47 @@ service SearchService {
// It is also in charge of merging back the results.
rpc RootListTerms(ListTermsRequest) returns (ListTermsResponse);

// Perform a leaf list terms on a given set of splits.
// Performs a leaf list terms on a given set of splits.
//
// It is like a regular list term except that:
// - the node should perform the listing locally instead of dispatching
// it to other nodes.
// - it should be applied on the given subset of splits
rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse);

// Performs a scroll request.
rpc Scroll(ScrollRequest) returns (SearchResponse);

// Stores a key in the local storage of the targeted node.
// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
rpc PutKV(PutKVRequest) returns (PutKVResponse);

// Gets a key from the local storage of the targeted node.
// This RPC is used in the mini distributed immutable KV store embedded in quickwit.
rpc GetKV(GetKVRequest) returns (GetKVResponse);
}

// Scroll Request
message ScrollRequest {
// The `scroll_id` returned in the response of a search request that included a scroll parameter.
string scroll_id = 1;
optional uint32 scroll_ttl_secs = 2;
}

message PutKVRequest {
bytes key = 1;
bytes payload = 2;
uint32 ttl_secs = 3;
}

message PutKVResponse {}

message GetKVRequest {
bytes key = 1;
}

message GetKVResponse {
optional bytes payload = 1;
}
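The PutKV/GetKV pair maps naturally onto a per-node TTL map. A minimal single-node sketch (illustrative only; the actual store is distributed and leaderless, and the node-local cache in this commit uses the `ttl_cache` crate):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Minimal single-node TTL key-value store mirroring PutKV/GetKV.
struct TtlKv {
    entries: HashMap<Vec<u8>, (Instant, Vec<u8>)>,
}

impl TtlKv {
    fn new() -> Self {
        TtlKv { entries: HashMap::new() }
    }

    /// PutKV: store a payload under a key with a time-to-live.
    fn put(&mut self, key: Vec<u8>, payload: Vec<u8>, ttl_secs: u32) {
        let deadline = Instant::now() + Duration::from_secs(u64::from(ttl_secs));
        self.entries.insert(key, (deadline, payload));
    }

    /// GetKV: return the payload if the key is present and not expired.
    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        let (deadline, payload) = self.entries.get(key)?;
        if Instant::now() <= *deadline {
            Some(payload.as_slice())
        } else {
            None
        }
    }
}
```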

// -- Search -------------------
Expand Down Expand Up @@ -109,6 +143,11 @@ message SearchRequest {

// Optional sort by one or more fields (limited to 2 at the moment).
repeated SortField sort_fields = 14;

// If set, the search response will include a scroll id
// that will make it possible to paginate through the results
// in a consistent manner.
optional uint32 scroll_ttl_secs = 15;
}

message SortField {
@@ -117,9 +156,9 @@ message SortField {
}

enum SortOrder {
/// Ascending order.
// Ascending order.
ASC = 0;
/// Descending order.
// Descending order.
DESC = 1; //< This will be the default value;
}

@@ -138,6 +177,8 @@ message SearchResponse {
// Serialized aggregation response
optional string aggregation = 5;

// Scroll id (only set if scroll_ttl_secs was set in the request)
optional string scroll_id = 6;
}

message SplitSearchError {
@@ -183,19 +224,19 @@ message SplitIdAndFooterOffsets {
optional int64 timestamp_end = 5;
}

/// Hits returned by a FetchDocRequest.
///
/// The json that is joined is the raw tantivy json doc.
/// It is very different from a quickwit json doc.
///
/// For instance:
/// - it may contain a _source and a _dynamic field.
/// - since tantivy has no notion of cardinality,
/// all fields is are arrays.
/// - since tantivy has no notion of object, the object is
/// flattened by concatenating the path to the root.
///
/// See `quickwit_search::convert_leaf_hit`
// Hits returned by a FetchDocRequest.
//
// The json that is joined is the raw tantivy json doc.
// It is very different from a quickwit json doc.
//
// For instance:
// - it may contain a _source and a _dynamic field.
// - since tantivy has no notion of cardinality,
// all fields are arrays.
// - since tantivy has no notion of object, the object is
// flattened by concatenating the path to the root.
//
// See `quickwit_search::convert_leaf_hit`
message LeafHit {
// The actual content of the hit.
string leaf_json = 1;
@@ -280,6 +321,11 @@ message LeafSearchResponse {
optional bytes intermediate_aggregation_result = 6;
}

message SnippetRequest {
repeated string snippet_fields = 1;
string query_ast_resolved = 2;
}

message FetchDocsRequest {
// Request fetching the content of a given list of partial_hits.
repeated PartialHit partial_hits = 1;
@@ -296,12 +342,12 @@ message FetchDocsRequest {
// split files.
string index_uri = 4;

// Search request. This is a perfect copy of the original search request,
// that was sent to root apart from the start_offset & max_hits params.
SearchRequest search_request = 5;
optional SnippetRequest snippet_request = 7;

// `DocMapper` as json serialized trait.
string doc_mapper = 6;

reserved 5;
}

message FetchDocsResponse {
@@ -372,11 +418,11 @@ message LeafListTermsResponse {
// -- Stream -------------------

enum OutputFormat {
/// Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
/// The delimiter is `,`.
// Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180).
// The delimiter is `,`.
CSV = 0; //< This will be the default value
/// Format data by row in ClickHouse binary format.
/// https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary
// Format data by row in ClickHouse binary format.
// https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary
CLICK_HOUSE_ROW_BINARY = 1;
}

@@ -409,7 +455,7 @@ message SearchStreamRequest {
optional string partition_by_field = 9;

// Fields to extract snippet on.
repeated string snippet_fields = 10;
repeated string snippet_fields = 10;
}

message LeafSearchStreamRequest {

0 comments on commit 8280d2a
