Skip to content

Commit

Permalink
Merge branch 'main' into doc_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz authored Oct 2, 2024
2 parents 9759eb5 + 2dcc696 commit c168aa6
Show file tree
Hide file tree
Showing 47 changed files with 2,133 additions and 430 deletions.
3 changes: 2 additions & 1 deletion docs/reference/es_compatible_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload,
| `size` | `Integer` | Number of hits to return. | 10 |
| `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) |
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) |
| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` |

#### Supported Request Body parameters

Expand Down Expand Up @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices

Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster:
* Shard count
* Document count
* Document count
* Deleted document count
* Primary store size
* Total store size
Expand Down
8 changes: 6 additions & 2 deletions monitoring/grafana/dashboards/searchers.json
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,10 @@
"text": "All",
"value": "$__all"
},
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"definition": "label_values(quickwit_search_leaf_searches_splits_total,instance)",
"hide": 0,
"includeAll": true,
Expand All @@ -861,16 +865,16 @@
"name": "instance",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(quickwit_search_leaf_searches_splits_total,instance)",
"refId": "PrometheusVariableQueryEditor-VariableQuery"
"refId": "StandardVariableQuery"
},
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"sort": 0,
"type": "query"
}

]
},
"time": {
Expand Down
1 change: 0 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
format: BodyFormat::Json,
sort_by,
count_all: CountHits::CountAll,
allow_failed_splits: false,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
Expand Down
22 changes: 14 additions & 8 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String
let index_uid = indexing_task.index_uid();
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid());
let shard_ids_str = shard_ids.iter().sorted().join(",");
let value = format!("{index_uid}:{source_id}:{shard_ids_str}");
let fingerprint = indexing_task.params_fingerprint;
let value = format!("{index_uid}:{source_id}:{fingerprint}:{shard_ids_str}");
(key, value)
}

Expand All @@ -536,16 +537,20 @@ fn parse_shard_ids_str(shard_ids_str: &str) -> Vec<ShardId> {
fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask> {
let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?;
let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?;
let (source_uid, shards_str) = value.rsplit_once(':')?;
let (index_uid, source_id) = source_uid.rsplit_once(':')?;
let mut field_iterator = value.rsplitn(4, ':');
let shards_str = field_iterator.next()?;
let fingerprint_str = field_iterator.next()?;
let source_id = field_iterator.next()?;
let index_uid = field_iterator.next()?;
let params_fingerprint: u64 = fingerprint_str.parse().ok()?;
let index_uid = index_uid.parse().ok()?;
let shard_ids = parse_shard_ids_str(shards_str);
Some(IndexingTask {
index_uid: Some(index_uid),
source_id: source_id.to_string(),
pipeline_uid: Some(pipeline_uid),
shard_ids,
params_fingerprint: 0,
params_fingerprint,
})
}

Expand Down Expand Up @@ -1143,11 +1148,11 @@ mod tests {
let mut chitchat_guard = chitchat_handle.lock().await;
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"),
"my_index:00000000000000000000000000:my_source:1,3".to_string(),
"my_index:00000000000000000000000000:my_source:41:1,3".to_string(),
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"),
"my_index-00000000000000000000000000-my_source:3,5".to_string(),
"my_index-00000000000000000000000000-my_source:53:3,5".to_string(),
);
}
node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5))
Expand Down Expand Up @@ -1358,14 +1363,15 @@ mod tests {
#[test]
fn test_parse_chitchat_kv() {
assert!(
chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none()
chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:42:1,3").is_none()
);
let task = super::chitchat_kv_to_indexing_task(
"indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0",
"my_index:00000000000000000000000000:my_source:00000000000000000001,\
"my_index:00000000000000000000000000:my_source:42:00000000000000000001,\
00000000000000000003",
)
.unwrap();
assert_eq!(task.params_fingerprint, 42);
assert_eq!(
task.pipeline_uid(),
PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap()
Expand Down
20 changes: 10 additions & 10 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,8 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
const NUM_PERMITS: u64 = 1;

if !model
.acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap_or(false)
{
return Ok(());
Expand All @@ -698,7 +696,7 @@ impl IngestController {
if successful_source_uids.is_empty() {
// We did not manage to create the shard.
// We can release our permit.
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Up);
warn!(
index_uid=%source_uid.index_uid,
source_id=%source_uid.source_id,
Expand All @@ -722,7 +720,7 @@ impl IngestController {
source_id=%source_uid.source_id,
"scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}"
);
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Up);
Err(metastore_error)
}
}
Expand Down Expand Up @@ -860,10 +858,12 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
const NUM_PERMITS: u64 = 1;
if shard_stats.num_open_shards == 0 {
return Ok(());
}

if !model
.acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap_or(false)
{
return Ok(());
Expand All @@ -876,12 +876,12 @@ impl IngestController {
"scaling down number of shards to {new_num_open_shards}"
);
let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
info!("scaling down shard {shard_id} from {leader_id}");
let Some(ingester) = self.ingester_pool.get(&leader_id) else {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
let shard_pkeys = vec![ShardPKey {
Expand All @@ -896,7 +896,7 @@ impl IngestController {
.await
{
warn!("failed to scale down number of shards: {error}");
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
}
model.close_shards(&source_uid, &[shard_id]);
Expand Down
12 changes: 3 additions & 9 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,25 +378,19 @@ impl ControlPlaneModel {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) -> Option<bool> {
self.shard_table
.acquire_scaling_permits(source_uid, scaling_mode, num_permits)
.acquire_scaling_permits(source_uid, scaling_mode)
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
.drain_scaling_permits(source_uid, scaling_mode)
}

pub fn release_scaling_permits(
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) {
pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
.release_scaling_permits(source_uid, scaling_mode, num_permits)
.release_scaling_permits(source_uid, scaling_mode)
}
}

Expand Down
28 changes: 11 additions & 17 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,13 @@ impl ShardTable {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) -> Option<bool> {
let table_entry = self.table_entries.get_mut(source_uid)?;
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
Some(scaling_rate_limiter.acquire(num_permits))
Some(scaling_rate_limiter.acquire(1))
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
Expand All @@ -564,18 +563,13 @@ impl ShardTable {
}
}

pub fn release_scaling_permits(
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
num_permits: u64,
) {
pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
if let Some(table_entry) = self.table_entries.get_mut(source_uid) {
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
scaling_rate_limiter.release(num_permits);
scaling_rate_limiter.release(1);
}
}
}
Expand Down Expand Up @@ -1058,7 +1052,7 @@ mod tests {
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.is_none());

shard_table.add_source(&index_uid, &source_id);
Expand All @@ -1071,7 +1065,7 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

let new_available_permits = shard_table
Expand All @@ -1096,7 +1090,7 @@ mod tests {
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.is_none());

shard_table.add_source(&index_uid, &source_id);
Expand All @@ -1109,7 +1103,7 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

let new_available_permits = shard_table
Expand Down Expand Up @@ -1143,10 +1137,10 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Up);

let new_available_permits = shard_table
.table_entries
Expand Down Expand Up @@ -1179,10 +1173,10 @@ mod tests {
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Down);

let new_available_permits = shard_table
.table_entries
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-datetime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
itertools = { workspace = true }
ouroboros = "0.18.0"
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }
Expand Down
Loading

0 comments on commit c168aa6

Please sign in to comment.