
TTL does not really work #55

Open
stalkerg opened this issue Jul 29, 2023 · 10 comments


@stalkerg

If I insert 10000 items with a 60 sec TTL, I see only ~1000 evicted items in my on_evict callback after 60 sec.
Checking len() on such a cache also shows ~9000 items remaining.
I suppose the cleaner process is not working correctly, and there is something wrong with the TTL map.

@al8n
Owner

al8n commented Jul 29, 2023

Hi, could you provide some code to help me reproduce your situation?

@stalkerg
Author

Yeah, sorry for filing the issue without a repro. I will try to make a small example.

@stalkerg
Author

stalkerg commented Aug 2, 2023

@al8n Okay, here is a repro:

use tokio::time::{sleep, Duration};
use stretto::AsyncCache;


#[tokio::main]
async fn main() {
    // 12960 counters, a max cost of 1e6, and tokio's spawner.
    let cache: AsyncCache<String, String> = AsyncCache::new(12960, 1e6 as i64, tokio::spawn).unwrap();

    // Insert 10000 items, each with cost 1 and a 60 sec TTL, pausing 1 ms
    // between inserts to simulate natural load.
    for i in 0..10000 {
        cache.insert_with_ttl(
            format!("key{}", i),
            format!("value{}", i),
            1,
            Duration::from_secs(60),
        )
        .await;
        sleep(Duration::from_millis(1)).await;
    }

    cache.wait().await.unwrap();

    println!("Current size: {}", cache.len());
    // All TTLs have expired by now, so the cache should be empty.
    sleep(Duration::from_secs(100)).await;
    println!("New size: {}", cache.len());
}

On my machine, I get something like this:

Current size: 10000
New size: 4824

If I remove the sleep(Duration::from_millis(1)).await; between inserts, it works as expected. I use this sleep to simulate natural load behavior.

Regards,
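
A quick back-of-the-envelope sketch of why the pause matters (assuming expirations are grouped into one-second buckets, which the diagnosis below supports): 10000 inserts paced 1 ms apart take at least 10 sec of wall-clock time, so the 60 sec TTLs land in at least 10 distinct buckets rather than one, and each of those buckets must be cleaned up in its own tick.

fn main() {
    // Rough arithmetic only; the one-second bucket granularity is an assumption.
    let inserts: u64 = 10_000;
    let pause_ms: u64 = 1;
    let span_secs = inserts * pause_ms / 1000; // >= 10 sec of insertion time
    println!("60 sec TTLs spread across at least {span_secs} one-second buckets");
}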

@stalkerg
Author

stalkerg commented Aug 4, 2023

I hope you can reproduce my issue.

@stalkerg
Author

@al8n, I may be able to look into it next week, but I suspect it's a bug.

@stalkerg
Author

stalkerg commented Aug 18, 2023

Okay, I found why: at https://github.com/al8n/stretto/blob/main/src/store.rs#L285C29-L285C40 there is no guarantee that try_cleanup_async will be invoked strictly every second.
I tested this by printing the current bucket on each cleanup attempt: from time to time a second gets skipped, and that bucket is never cleaned up.
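
Here is a minimal, self-contained sketch of the failure mode (not stretto's actual code; the cleanup_bucket math and one-second granularity are assumptions modeled on ttl.rs):

use std::collections::HashMap;

// Assumed one-second buckets keyed by a time-derived index.
const BUCKET_SECS: i64 = 1;

fn cleanup_bucket(now_secs: i64) -> i64 {
    now_secs / BUCKET_SECS
}

fn main() {
    let mut buckets: HashMap<i64, Vec<&str>> = HashMap::new();
    // Three keys whose TTLs land in consecutive buckets.
    buckets.insert(5, vec!["a"]);
    buckets.insert(6, vec!["b"]);
    buckets.insert(7, vec!["c"]);

    // Cleanup ticks arrive at t = 5 and t = 7; the tick that should have
    // handled t = 6 never comes (no strict once-per-second guarantee).
    for now in [5, 7] {
        if let Some(expired) = buckets.remove(&cleanup_bucket(now)) {
            println!("t = {now}: evicted {expired:?}");
        }
    }

    // Bucket 6 was skipped, so key "b" is never evicted.
    assert!(buckets.contains_key(&6));
}

Because try_cleanup only ever removes the single bucket for the current tick, any skipped second strands its bucket forever.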

@stalkerg
Author

stalkerg commented Aug 18, 2023

Okay, this one works, but I am not sure how efficient it is:

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..1b7c36a 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -3,6 +3,7 @@ use std::collections::{hash_map::RandomState, HashMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use itertools::Itertools;
 
 use crate::CacheError;
 
@@ -200,11 +201,24 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
 
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().keys().sorted().cloned().collect();
+        // println!("try_cleanup bucket_num: {} buckets:{:#?}", bucket_num, bucket_keys);
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .filter(|key| **key < bucket_num)
+            .map(|key| {
+                self
+                .buckets
+                .write()
+                .remove(key)
+                .map(|bucket| bucket.map)
+            }) {
+                if map.is_some() {
+                    ret_map.extend(map.unwrap().iter());
+                }
+        }
+        Ok(Some(ret_map))
     }
 
     pub fn hasher(&self) -> S {

It would be better to use a BTreeMap and range() for fast filtering over the index.

@stalkerg
Author

Okay, here is the option with BTreeMap and range():

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..e72ed4d 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -1,5 +1,5 @@
 use parking_lot::RwLock;
-use std::collections::{hash_map::RandomState, HashMap};
+use std::collections::{hash_map::RandomState, HashMap, BTreeMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -100,7 +100,7 @@ impl<S: BuildHasher> DerefMut for Bucket<S> {
 
 #[derive(Debug)]
 pub(crate) struct ExpirationMap<S = RandomState> {
-    buckets: RwLock<HashMap<i64, Bucket<S>, S>>,
+    buckets: RwLock<BTreeMap<i64, Bucket<S>>>,
     hasher: S,
 }
 
@@ -108,7 +108,7 @@ impl Default for ExpirationMap {
     fn default() -> Self {
         let hasher = RandomState::default();
         Self {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -123,7 +123,7 @@ impl ExpirationMap {
 impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub(crate) fn with_hasher(hasher: S) -> ExpirationMap<S> {
         ExpirationMap {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -200,11 +200,22 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
 
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().range(..bucket_num).map(|(key, _)| *key).collect();
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .map(|key| {
+                self
+                .buckets
+                .write()
+                .remove(key)
+                .map(|bucket| bucket.map)
+            }) {
+                if map.is_some() {
+                    ret_map.extend(map.unwrap().iter());
+                }
+        }
+        Ok(Some(ret_map))
     }
 
     pub fn hasher(&self) -> S {
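
As a note on the design choice: with a BTreeMap, all due buckets can also be drained under a single write-lock acquisition using split_off, instead of re-taking the write lock once per bucket as in the diff above. A minimal sketch (the BucketMap alias and drain_due helper are hypothetical, not stretto APIs):

use std::collections::{BTreeMap, HashMap};

// Hypothetical payload type: key hash -> conflict hash, as in the ttl.rs buckets.
type BucketMap = HashMap<u64, u64>;

fn drain_due(buckets: &mut BTreeMap<i64, BucketMap>, bucket_num: i64) -> BucketMap {
    // split_off keeps keys >= bucket_num in its return value; the swap leaves
    // the future buckets in place and hands us everything older.
    let keep = buckets.split_off(&bucket_num);
    let due = std::mem::replace(buckets, keep);

    // Flatten the expired buckets into one map, like ret_map in the diff.
    let mut ret = BucketMap::new();
    for (_, map) in due {
        ret.extend(map);
    }
    ret
}

fn main() {
    let mut buckets = BTreeMap::new();
    buckets.insert(5, BucketMap::from([(1, 11)]));
    buckets.insert(6, BucketMap::from([(2, 22)]));
    buckets.insert(8, BucketMap::from([(3, 33)]));

    let expired = drain_due(&mut buckets, 7); // buckets 5 and 6 are due
    assert_eq!(expired.len(), 2);
    assert!(buckets.contains_key(&8)); // the future bucket survives
}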

@al8n
Owner

al8n commented Aug 19, 2023

Sorry for the late response, and thanks! Would you mind opening a PR so we can see if we can merge it?

@stalkerg
Author

I don't think I will have time in the next few weeks.
