Commit
Replace shard_id: u64 with generic shard_id: ShardId(ByteString) (#…
guilload authored Jan 16, 2024
1 parent 1cb5bc2 commit 9d0a223
Showing 47 changed files with 1,393 additions and 1,070 deletions.
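
The commit replaces the integer shard id with a string-backed newtype. Below is a minimal sketch of what the new type could look like, inferred from the commit title and from the tests in this diff (u64-derived ids render as 20-digit, zero-padded strings); the actual definition lives in quickwit_proto::types and may differ.

use bytestring::ByteString;

// Hypothetical sketch only; the real ShardId is defined in quickwit_proto::types.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ShardId(ByteString);

impl From<u64> for ShardId {
    fn from(shard_id: u64) -> Self {
        // The tests below expect u64-derived ids such as 12 to serialize as
        // "00000000000000000012", so lexicographic order matches numeric order.
        ShardId(ByteString::from(format!("{shard_id:020}")))
    }
}

impl From<&str> for ShardId {
    fn from(shard_id: &str) -> Self {
        ShardId(ByteString::from(shard_id.to_string()))
    }
}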
7 changes: 7 additions & 0 deletions quickwit/clippy.toml
@@ -1,3 +1,10 @@
disallowed-methods = [
"std::path::Path::exists"
]
ignore-interior-mutability = [
"bytes::Bytes",
"bytestring::ByteString",
"quickwit_ingest::ShardInfo",
"quickwit_ingest::ShardInfos",
"quickwit_proto::types::ShardId",
]
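
The new ignore-interior-mutability entries silence clippy's mutable_key_type lint for the string-backed types introduced by this change: ShardId wraps a ByteString (itself backed by bytes::Bytes), which clippy conservatively treats as interior-mutable, so using it as a map or set key would otherwise raise a false positive. A small illustration follows, with a hypothetical helper function.

use std::collections::HashSet;

use quickwit_proto::types::ShardId;

// Hypothetical helper for illustration: without the clippy.toml entries above,
// clippy's `mutable_key_type` lint would flag this HashSet<ShardId> because
// ShardId is backed by a ByteString.
fn dedup_shard_ids(shard_ids: &[ShardId]) -> HashSet<ShardId> {
    shard_ids.iter().cloned().collect()
}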
63 changes: 32 additions & 31 deletions quickwit/quickwit-cluster/src/cluster.rs
@@ -467,35 +467,31 @@ pub(crate) fn set_indexing_tasks_in_node_state(

fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String) {
let IndexingTask {
pipeline_uid: _,
index_uid,
source_id,
shard_ids,
pipeline_uid: _,
} = indexing_task;
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid());
let shards_str = shard_ids.iter().sorted().join(",");
let value = format!("{index_uid}:{source_id}:{shards_str}");
let shard_ids_str = shard_ids.iter().sorted().join(",");
let value = format!("{index_uid}:{source_id}:{shard_ids_str}");
(key, value)
}

fn parse_shards_str(shards_str: &str) -> Option<Vec<ShardId>> {
if shards_str.is_empty() {
return Some(Vec::new());
}
let mut shard_ids = Vec::new();
for shard_str in shards_str.split(',') {
let shard_id = shard_str.parse::<u64>().ok()?;
shard_ids.push(shard_id);
}
Some(shard_ids)
fn parse_shard_ids_str(shard_ids_str: &str) -> Vec<ShardId> {
shard_ids_str
.split(',')
.filter(|shard_id_str| !shard_id_str.is_empty())
.map(ShardId::from)
.collect()
}

fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask> {
let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?;
let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?;
let (source_uid, shards_str) = value.rsplit_once(':')?;
let (index_uid, source_id) = source_uid.rsplit_once(':')?;
let shard_ids = parse_shards_str(shards_str)?;
let shard_ids = parse_shard_ids_str(shards_str);
Some(IndexingTask {
index_uid: index_uid.to_string(),
source_id: source_id.to_string(),
@@ -1095,7 +1091,7 @@ mod tests {
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"),
"my_index:uid:my_source:3a,5".to_string(),
"my_index-uid-my_source:3,5".to_string(),
);
}
node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5))
@@ -1211,7 +1207,7 @@ mod tests {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
}],
&mut node_state,
);
@@ -1221,7 +1217,7 @@ mod tests {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2, 3],
shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)],
}],
&mut node_state,
);
@@ -1231,13 +1227,13 @@ mod tests {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
},
],
&mut node_state,
@@ -1249,13 +1245,13 @@ mod tests {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
index_uid: "test:test2".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![3, 4],
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
},
],
&mut node_state,
@@ -1267,26 +1263,31 @@ mod tests {
pipeline_uid: Some(PipelineUid::from_u128(1u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source1".to_string(),
shard_ids: vec![1, 2],
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
},
IndexingTask {
pipeline_uid: Some(PipelineUid::from_u128(2u128)),
index_uid: "test:test1".to_string(),
source_id: "my-source2".to_string(),
shard_ids: vec![3, 4],
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
},
],
&mut node_state,
);
}

#[test]
fn test_parse_shards_str() {
assert!(parse_shards_str("").unwrap().is_empty());
assert_eq!(parse_shards_str("12").unwrap(), vec![12]);
assert_eq!(parse_shards_str("12,23").unwrap(), vec![12, 23]);
assert!(parse_shards_str("12,23,").is_none());
assert!(parse_shards_str("12,23a,32").is_none());
fn test_parse_shard_ids_str() {
assert!(parse_shard_ids_str("").is_empty());
assert!(parse_shard_ids_str(",").is_empty());
assert_eq!(
parse_shard_ids_str("00000000000000000012,"),
[ShardId::from(12)]
);
assert_eq!(
parse_shard_ids_str("00000000000000000012,00000000000000000023,"),
[ShardId::from(12), ShardId::from(23)]
);
}

#[test]
@@ -1296,7 +1297,7 @@ mod tests {
);
let task = super::chitchat_kv_to_indexing_task(
"indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0",
"my_index:uid:my_source:1,3",
"my_index:uid:my_source:00000000000000000001,00000000000000000003",
)
.unwrap();
assert_eq!(
Expand All @@ -1305,6 +1306,6 @@ mod tests {
);
assert_eq!(&task.index_uid, "my_index:uid");
assert_eq!(&task.source_id, "my_source");
assert_eq!(&task.shard_ids, &[1, 3]);
assert_eq!(&task.shard_ids, &[ShardId::from(1), ShardId::from(3)]);
}
}
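
For reference, the chitchat key/value layout exercised by the tests above is "{index_uid}:{source_id}:{comma-separated shard ids}", keyed by the pipeline uid under the "indexer.task:" prefix. The round-trip sketch below assumes the private helpers in cluster.rs are callable from a test in the same module; the expected strings are taken from the test assertions shown in this diff.

#[test]
fn test_indexing_task_kv_round_trip_sketch() {
    let task = IndexingTask {
        index_uid: "my_index:uid".to_string(),
        source_id: "my_source".to_string(),
        shard_ids: vec![ShardId::from(1), ShardId::from(3)],
        pipeline_uid: Some(PipelineUid::from_u128(1u128)),
    };
    // The key is the pipeline uid prefixed with "indexer.task:"; the value encodes
    // the index uid, source id, and the sorted, zero-padded shard ids.
    let (key, value) = indexing_task_to_chitchat_kv(&task);
    assert_eq!(
        value,
        "my_index:uid:my_source:00000000000000000001,00000000000000000003"
    );
    assert_eq!(chitchat_kv_to_indexing_task(&key, &value), Some(task));
}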
47 changes: 23 additions & 24 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -228,7 +228,8 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
return Ok(());
};
let known_shard_ids: FnvHashSet<ShardId> = shard_entries
.map(|shard_entry| shard_entry.shard_id)
.map(|shard_entry| shard_entry.shard_id())
.cloned()
.collect();
// let's identify the shards that have reached EOF but have not yet been removed.
let shard_ids_to_close: Vec<ShardId> = shard_positions_update
@@ -286,7 +287,6 @@ fn convert_metastore_error<T>(
| MetastoreError::NotFound(_) => true,
MetastoreError::Connection { .. }
| MetastoreError::Db { .. }
| MetastoreError::InconsistentControlPlaneState { .. }
| MetastoreError::Internal { .. }
| MetastoreError::Io { .. }
| MetastoreError::Unavailable(_) => false,
@@ -942,11 +942,10 @@ mod tests {
shards: vec![Shard {
index_uid: "test-index:0".to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 1,
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
..Default::default()
}],
next_shard_id: 2,
}];
let response = ListShardsResponse { subresponses };
Ok(response)
@@ -982,7 +981,7 @@ mod tests {
assert_eq!(subresponse.index_uid, "test-index:0");
assert_eq!(subresponse.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(subresponse.open_shards.len(), 1);
assert_eq!(subresponse.open_shards[0].shard_id, 1);
assert_eq!(subresponse.open_shards[0].shard_id(), ShardId::from(1));

universe.assert_quit().await;
}
@@ -1153,15 +1152,15 @@ mod tests {
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
assert_eq!(subrequest.shard_ids, [ShardId::from(17)]);
Ok(DeleteShardsResponse {})
},
);

let mut shard = Shard {
index_uid: index_0.index_uid.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: 17,
shard_id: Some(ShardId::from(17)),
leader_id: "test_node".to_string(),
..Default::default()
};
@@ -1175,7 +1174,6 @@ mod tests {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![shard],
next_shard_id: 18,
}],
};
Ok(list_shards_resp)
@@ -1200,7 +1198,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(17, Position::offset(1_000u64))],
shard_positions: vec![(ShardId::from(17), Position::offset(1_000u64))],
})
.await
.unwrap();
@@ -1216,14 +1214,14 @@ mod tests {
.get("indexer-node-1")
.unwrap();
assert_eq!(indexing_tasks.len(), 1);
assert_eq!(&indexing_tasks[0].shard_ids, &[17]);
assert_eq!(indexing_tasks[0].shard_ids, [ShardId::from(17)]);
let _ = client_inbox.drain_for_test();

// This update should trigger the deletion of the shard and a new indexing plan.
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid,
shard_positions: vec![(17, Position::eof(1_000u64))],
shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
@@ -1288,7 +1286,7 @@ mod tests {
let subrequest = &delete_shards_request.subrequests[0];
assert_eq!(subrequest.index_uid, index_uid_clone);
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
assert_eq!(&subrequest.shard_ids[..], &[17]);
assert_eq!(subrequest.shard_ids, [ShardId::from(17)]);
Ok(DeleteShardsResponse {})
},
);
@@ -1301,7 +1299,6 @@ mod tests {
index_uid: index_uid_clone.to_string(),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shards: vec![],
next_shard_id: 18,
}],
};
Ok(list_shards_resp)
@@ -1326,7 +1323,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(17, Position::eof(1_000u64))],
shard_positions: vec![(ShardId::from(17), Position::eof(1_000u64))],
})
.await
.unwrap();
@@ -1377,14 +1374,13 @@ mod tests {
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
shard_id: 15,
shard_id: Some(ShardId::from(15)),
leader_id: "node1".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
publish_position_inclusive: None,
publish_token: None,
}],
next_shard_id: 18,
}],
};
Ok(list_shards_resp)
@@ -1394,10 +1390,12 @@ mod tests {
.expect_retain_shards()
.times(1)
.in_sequence(&mut seq)
.returning(|mut request| {
.returning(|request| {
assert_eq!(request.retain_shards_for_sources.len(), 1);
let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
assert_eq!(
request.retain_shards_for_sources[0].shard_ids,
[ShardId::from(15)]
);
Ok(RetainShardsResponse {})
});

@@ -1454,10 +1452,12 @@ mod tests {
ingester_mock
.expect_retain_shards()
.times(2)
.returning(|mut request| {
.returning(|request| {
assert_eq!(request.retain_shards_for_sources.len(), 1);
let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
assert_eq!(
request.retain_shards_for_sources[0].shard_ids,
[ShardId::from(15)]
);
Ok(RetainShardsResponse {})
});
ingester_pool.insert("node1".into(), ingester_mock.into());
@@ -1499,14 +1499,13 @@ mod tests {
shards: vec![Shard {
index_uid: index_uid_clone.to_string(),
source_id: source.source_id.to_string(),
shard_id: 15,
shard_id: Some(ShardId::from(15)),
leader_id: "node1".to_string(),
follower_id: None,
shard_state: ShardState::Open as i32,
publish_position_inclusive: None,
publish_token: None,
}],
next_shard_id: 18,
}],
};
Ok(list_shards_resp)
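
In the generated Shard message, shard_id is now an optional field, so call sites construct it with Some(ShardId::from(...)) and read it back through the shard_id() accessor, which, judging from the call sites above, returns a reference that must be cloned to obtain an owned value. A usage sketch under that assumption:

// Usage sketch only; Shard, ShardState, and ShardId come from quickwit_proto,
// and shard_id() is assumed to return &ShardId as the call sites above suggest.
fn owned_shard_id_example() -> ShardId {
    let shard = Shard {
        shard_id: Some(ShardId::from(17)),
        shard_state: ShardState::Open as i32,
        ..Default::default()
    };
    shard.shard_id().clone()
}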
14 changes: 3 additions & 11 deletions quickwit/quickwit-control-plane/src/indexing_plan.rs
@@ -69,21 +69,13 @@ impl PhysicalIndexingPlan {
for task in tasks.iter_mut() {
task.shard_ids.sort_unstable();
}
tasks.sort_by(|left, right| {
tasks.sort_unstable_by(|left, right| {
left.index_uid
.cmp(&right.index_uid)
.then_with(|| left.source_id.cmp(&right.source_id))
.then_with(|| {
left.shard_ids
.first()
.copied()
.cmp(&right.shard_ids.first().copied())
})
.then_with(|| left.pipeline_uid().cmp(&right.pipeline_uid()))
.then_with(|| left.shard_ids.first().cmp(&right.shard_ids.first()))
.then_with(|| left.pipeline_uid.cmp(&right.pipeline_uid))
});
for task in tasks {
task.shard_ids.sort();
}
}
}
}