Skip to content

Commit

Permalink
Remove inappropiate unwrap and fix typo
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 19, 2024
1 parent 52d84d1 commit 5fb88ac
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ impl QueueSharedState {
shard_ids: re_acquired_shards,
publish_token: publish_token.to_string(),
})
.await
.unwrap();
.await?;
for shard in acquire_shard_resp.acquired_shards {
let partition_id = PartitionId::from(shard.shard_id().as_str());
let position = shard.publish_position_inclusive.unwrap_or_default();
Expand All @@ -195,17 +194,19 @@ pub async fn checkpoint_messages(
publish_token: &str,
messages: Vec<PreProcessedMessage>,
) -> anyhow::Result<Vec<(PreProcessedMessage, Position)>> {
let mut message_map =
BTreeMap::from_iter(messages.into_iter().map(|msg| (msg.partition_id(), msg)));
let mut message_map: BTreeMap<PartitionId, PreProcessedMessage> = messages
.into_iter()
.map(|msg| (msg.partition_id(), msg))
.collect();
let partition_ids = message_map.keys().cloned().collect();

let shards = shared_state
.acquire_partitions(publish_token, partition_ids)
.await?;

let mut result = Vec::with_capacity(shards.len());
for (position_id, position) in shards {
let content = message_map.remove(&position_id).context("Unexpected partition ID. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.")?;
for (partition_id, position) in shards {
let content = message_map.remove(&partition_id).context("Unexpected partition ID. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.")?;
result.push((content, position));
}

Expand Down Expand Up @@ -247,14 +248,14 @@ pub mod shared_state_for_tests {
let mut subresponses = Vec::with_capacity(request.subrequests.len());
for sub_req in request.subrequests {
let partition_id: PartitionId = sub_req.shard_id().to_string().into();
let req_token = sub_req.publish_token.unwrap();
let req_token = sub_req.publish_token();
let (token, position, update_timestamp) = inner_state_ref
.lock()
.unwrap()
.get(&partition_id)
.cloned()
.unwrap_or((
req_token.clone(),
req_token.to_string(),
Position::Beginning,
OffsetDateTime::now_utc().unix_timestamp(),
));
Expand Down

0 comments on commit 5fb88ac

Please sign in to comment.