avoid unneeded copy on touching but non-overlapping byte ranges (#4219)
* avoid unneeded copy on touching but non-overlapping byte ranges
trinity-1686a authored Dec 4, 2023
1 parent 1faf703 commit 187e2b7
Showing 1 changed file with 150 additions and 10 deletions.
160 changes: 150 additions & 10 deletions quickwit/quickwit-storage/src/cache/byte_range_cache.rs
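Reviewer note: the core of this change is that put_slice no longer eagerly merges (and therefore copies) a new slice with cached blocks it merely touches; instead, get_slice gains a merge_ranges fallback that coalesces touching blocks lazily, only when a read actually spans them. Below is a minimal sketch of the resulting behavior, reusing the API exercised by the new test further down; the function itself and the import paths are illustrative assumptions, not code from this diff.

// Illustrative sketch only (not part of this diff). The import paths are
// assumptions; ByteRangeCache and OwnedBytes live in the quickwit-storage crate.
use std::path::PathBuf;
use quickwit_storage::{ByteRangeCache, OwnedBytes};

fn touching_ranges_demo(cache: &ByteRangeCache) {
    let key: PathBuf = "key".into();

    // Two touching but non-overlapping puts now stay as two separate blocks:
    // put_slice no longer merges (and copies) blocks that merely touch.
    cache.put_slice(key.clone(), 0..5, OwnedBytes::new(vec![0u8; 5]));
    cache.put_slice(key.clone(), 5..10, OwnedBytes::new(vec![1u8; 5]));

    // A read spanning the boundary is still a hit: merge_ranges lazily
    // coalesces 0..5 and 5..10 into a single 0..10 block on this first read.
    assert!(cache.get_slice(&key, 3..8).is_some());
}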
@@ -83,20 +83,23 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
let key = CacheKey::from_borrowed(tag, byte_range.start);
let (k, v) = if let Some((k, v)) = self.get_block(&key, byte_range.end) {
(k, v)
} else if let Some((k, v)) = self.merge_ranges(&key, byte_range.end) {
(k, v)
} else {
self.cache_counters.misses_num_items.inc();
return None;
};

let start = byte_range.start - k.range_start;
let end = byte_range.end - k.range_start;
let result = v.bytes.slice(start..end);

self.cache_counters.hits_num_items.inc();
self.cache_counters
.hits_num_bytes
.inc_by((end - start) as u64);

Some(v.bytes.slice(start..end))
Some(result)
}

fn put_slice(&mut self, tag: T::Owned, byte_range: Range<usize>, bytes: OwnedBytes) {
@@ -106,13 +109,13 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
return;
}

// try to find a block with which we overlap (and not just touch)
let start_key = CacheKey::from_borrowed(tag.borrow(), byte_range.start);
let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end);

let first_matching_block = self
.get_block(&start_key, byte_range.start)
.get_block(&start_key, byte_range.start + 1)
.map(|(k, _v)| k);

let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end - 1);
let last_matching_block = self.get_block(&end_key, byte_range.end).map(|(k, _v)| k);

if first_matching_block.is_some() && first_matching_block == last_matching_block {
@@ -140,15 +143,19 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
.unwrap_or(true);

let (final_range, final_bytes) = if can_drop_first && can_drop_last {
// if we are here, either there was no overlapping block, or there was, but this buffer
// entirely covers every block it overlapped with. There is no merging to do.
(byte_range, bytes)
} else {
// if we are here, we have to do some merging

// first find the final buffer start and end position.
let start = if can_drop_first {
byte_range.start
} else {
// if there is no first overlapping block, can_drop_first is true, so this unwrap can't fail
overlapping.first().unwrap().start
};

let end = if can_drop_last {
byte_range.end
} else {
@@ -158,6 +165,8 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {

let mut buffer = Vec::with_capacity(end - start);

// if this buffer overlaps with, but does not entirely contain, the first block, copy the
// non-overlapping part at the start of the final buffer.
if !can_drop_first {
let first_range = overlapping.first().unwrap();
let key = CacheKey::from_borrowed(tag.borrow(), first_range.start);
@@ -167,8 +176,11 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
buffer.extend_from_slice(&block.bytes[..len]);
}

// copy the entire current buffer
buffer.extend_from_slice(&bytes);

// if this buffer overlaps with, but does not entirely contain, the last block, copy the
// non-overlapping part at the end of the final buffer.
if !can_drop_last {
let last_range = overlapping.last().unwrap();
let key = CacheKey::from_borrowed(tag.borrow(), last_range.start);
@@ -178,6 +190,7 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
buffer.extend_from_slice(&block.bytes[start..]);
}

// sanity check, we copied as much as expected
debug_assert_eq!(end - start, buffer.len());

(start..end, OwnedBytes::new(buffer))
@@ -187,11 +200,14 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
// in the loop. It works with .get() instead of .remove() (?).
let mut key = CacheKey::from_owned(tag, 0);
for range in overlapping.into_iter() {
// remove every block we overlapped with, including the first and last, as they
// were already copied as prefix/suffix of the final block.
key.range_start = range.start;
self.cache.remove(&key);
self.update_counter_drop_item(range.end - range.start);
}

// and finally insert the newly built buffer
key.range_start = final_range.start;
let value = CacheValue {
range_end: final_range.end,
@@ -202,17 +218,90 @@ impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T> {
}

// Return a block that contains everything between query.range_start and range_end
fn get_block<'a, 'b: 'a>(
&'a self,
query: &CacheKey<'b, T>,
fn get_block<'a>(
&self,
query: &CacheKey<'a, T>,
range_end: usize,
) -> Option<(&CacheKey<'_, T>, &CacheValue)> {
) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
self.cache
.range(..=query)
.next_back()
.filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
}

/// Try to merge all blocks in the given range. Fails if some bytes were not already stored.
fn merge_ranges<'a>(
&mut self,
start: &CacheKey<'a, T>,
range_end: usize,
) -> Option<(&CacheKey<'a, T>, &CacheValue)> {
let own_key = |key: &CacheKey<T>| {
CacheKey::from_owned(T::borrow(&key.tag).to_owned(), key.range_start)
};

let first_block = self.get_block(start, start.range_start)?;

// query cache for all blocks which overlap with our query
let overlapping_blocks = self
.cache
.range(first_block.0..)
.take_while(|(k, _)| k.tag == start.tag && k.range_start <= range_end);

// verify there are no holes and that each range touches the next one. There can't be any
// overlap due to how we fill our data-structure.
let mut last_block = first_block;
for (k, v) in overlapping_blocks.clone().skip(1) {
if k.range_start != last_block.1.range_end {
return None;
}

last_block = (k, v);
}
if last_block.1.range_end < range_end {
// we got a gap at the end
return None;
}

// we have everything we need. Merge every sub-buffer into a single large buffer.
let mut buffer = Vec::with_capacity(last_block.1.range_end - first_block.0.range_start);
let mut part_count = 0i64;
for (_, v) in overlapping_blocks {
part_count += 1;
buffer.extend_from_slice(&v.bytes);
}
assert_eq!(
buffer.len(),
(last_block.1.range_end - first_block.0.range_start)
);

let new_key = own_key(first_block.0);
let new_value = CacheValue {
range_end: last_block.1.range_end,
bytes: OwnedBytes::new(buffer),
};

// cleanup is sub-optimal, we'd need a BTreeMap::drain_range or something like that
let last_key = own_key(last_block.0);

// remove previous buffers from the cache
let blocks_to_remove: Vec<_> = self
.cache
.range(&new_key..=&last_key)
.map(|(k, _)| own_key(k))
.collect();
for block in blocks_to_remove {
self.cache.remove(&block);
}

// and insert the new merged buffer
self.cache.insert(new_key, new_value);

self.num_items -= (part_count - 1) as u64;
self.cache_counters.in_cache_count.sub(part_count - 1);

self.get_block(start, range_end)
}

fn update_counter_record_item(&mut self, num_bytes: usize) {
self.num_items += 1;
self.num_bytes += num_bytes as u64;
@@ -352,7 +441,9 @@ mod tests {
count_items(tagged_state)
})
.sum();
assert_eq!(cache.inner.lock().unwrap().num_items, expected_item_count as u64);
// in some cases we have ranges touching each other; count_items counts them
// as only one, but the cache counts them as two.
assert!(cache.inner.lock().unwrap().num_items >= expected_item_count as u64);

let expected_byte_count = state.values()
.flatten()
@@ -391,4 +482,53 @@
})
.1
}

#[test]
fn test_byte_range_cache_doesnt_merge_unnecessarily() {
let cache = ByteRangeCache::with_infinite_capacity(&CACHE_METRICS_FOR_TESTS);

let key: std::path::PathBuf = "key".into();

cache.put_slice(
key.clone(),
0..5,
OwnedBytes::new((0..5).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
5..10,
OwnedBytes::new((5..10).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
10..15,
OwnedBytes::new((10..15).collect::<Vec<_>>()),
);
cache.put_slice(
key.clone(),
15..20,
OwnedBytes::new((15..20).collect::<Vec<_>>()),
);

{
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 4);
assert_eq!(mutable_cache.num_items, 4);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 4);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}

cache.get_slice(&key, 3..12).unwrap();

{
// the first three blocks should now have been merged into one; the last one is untouched
let mutable_cache = cache.inner.lock().unwrap();
assert_eq!(mutable_cache.cache.len(), 2);
assert_eq!(mutable_cache.num_items, 2);
assert_eq!(mutable_cache.cache_counters.in_cache_count.get(), 2);
assert_eq!(mutable_cache.num_bytes, 20);
assert_eq!(mutable_cache.cache_counters.in_cache_num_bytes.get(), 20);
}
}
}
