Skip to content

Commit

Permalink
workload: add zipfian latest distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Jul 31, 2024
1 parent 6cd96b5 commit f6ee725
Showing 1 changed file with 89 additions and 1 deletion.
90 changes: 89 additions & 1 deletion src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum KeyDistribution {
Shuffle(Vec<usize>),
Uniform(Uniform<usize>),
Zipfian(ZipfDistribution, usize),
ZipfianLatest(ZipfDistribution, usize, usize),
}

/// Key generator that takes care of synthetic keys based on a distribution. Currently it only
Expand Down Expand Up @@ -106,6 +107,16 @@ impl KeyGenerator {
Self::new(len, min, max, dist)
}

fn new_zipfian_latest(len: usize, min: usize, max: usize, theta: f64, hotspot: f64) -> Self {
let hotspot = (hotspot * (max - min - 1) as f64) as usize; // approx location for discrete keys
let dist = KeyDistribution::ZipfianLatest(
ZipfDistribution::new(max - min, theta).unwrap(),
hotspot,
0,
);
Self::new(len, min, max, dist)
}

fn next(&mut self, rng: &mut impl Rng) -> Box<[u8]> {
let k = match self.dist {
KeyDistribution::Increment => self.serial % self.keyspace,
Expand All @@ -115,6 +126,12 @@ impl KeyGenerator {
// zipf starts at 1
(dist.sample(rng) - 1 + hotspot) % self.keyspace
}
KeyDistribution::ZipfianLatest(dist, hotspot, ref mut latest) => {
// just like zipfian, but always store the latest key
let sample = dist.sample(rng) - 1;
*latest = sample;
(sample + hotspot) % self.keyspace
}
} + self.min;
self.serial += 1;
assert!(k < self.max);
Expand Down Expand Up @@ -168,6 +185,7 @@ pub struct WorkloadOpt {
/// each range/thread.
/// - "uniform": uniformly random keys from `kmin` to `kmax`.
/// - "zipfian": random keys from `kmin` to `kmax` following Zipfian distribution.
/// - "latest": just like Zipfian but the hotspot is the latest key written to the store.
pub dist: String,

/// The theta parameter for Zipfian distribution. (Optional, default 1.0)
Expand Down Expand Up @@ -243,6 +261,11 @@ impl Workload {
let hotspot = opt.zipf_hotspot.unwrap_or(0.0f64);
KeyGenerator::new_zipfian(klen, kmin, kmax, theta, hotspot)
}
"latest" => {
let theta = opt.zipf_theta.unwrap_or(1.0f64);
let hotspot = opt.zipf_hotspot.unwrap_or(0.0f64);
KeyGenerator::new_zipfian_latest(klen, kmin, kmax, theta, hotspot)
}
_ => {
panic!("invalid key distribution: {}", opt.dist);
}
Expand All @@ -269,6 +292,11 @@ impl Workload {
let key = self.kgen.next(rng);
match self.mix.next(rng) {
OperationType::Set => {
// special case for "latest" distribution: if the generated request is a `SET`,
// update the hotspot to the latest generated key.
if let KeyDistribution::ZipfianLatest(_, ref mut hotspot, latest) = self.kgen.dist {
*hotspot = latest;
}
let value = vec![0u8; self.vlen].into_boxed_slice();
Operation::Set { key, value }
}
Expand Down Expand Up @@ -402,6 +430,8 @@ mod tests {
#[test]
fn keygen_zipfian_hotspot() {
let mut rng = rand::thread_rng();

// hotspot is the middle key
let mut dist: HashMap<Box<[u8]>, u64> = HashMap::new();
let mut kgen = KeyGenerator::new_zipfian(8, 0, 9, 1.0, 0.5);
for _ in 0..1000000 {
Expand All @@ -410,11 +440,24 @@ mod tests {
}
let mut freq: Vec<(Box<[u8]>, u64)> = dist.iter().map(|e| (e.0.clone(), *e.1)).collect();
freq.sort_by_key(|c| c.0.clone());
// 0 - 9 in total 9 keys, [0, 1, 2, 3, 4, 5, 6, 7, 8], and 4 is the middle one
let p1 = freq[4].1 as f64 / freq[5].1 as f64;
assert!(p1 > 1.9 && p1 < 2.0, "zipf p1: {}", p1);
let p2 = freq[5].1 as f64 / freq[6].1 as f64;
assert!(p2 > 1.45 && p2 < 1.55, "zipf p2: {}", p2);

// hotspot is the last key
let mut dist: HashMap<Box<[u8]>, u64> = HashMap::new();
let mut kgen = KeyGenerator::new_zipfian(8, 0, 9, 1.0, 1.0);
for _ in 0..1000000 {
let k = kgen.next(&mut rng);
dist.entry(k).and_modify(|c| *c += 1).or_insert(0);
}
let mut freq: Vec<(Box<[u8]>, u64)> = dist.iter().map(|e| (e.0.clone(), *e.1)).collect();
freq.sort_by_key(|c| c.0.clone());
let p1 = freq[8].1 as f64 / freq[0].1 as f64;
assert!(p1 > 1.9 && p1 < 2.0, "zipf p1: {}", p1);
let p2 = freq[0].1 as f64 / freq[1].1 as f64;
assert!(p2 > 1.45 && p2 < 1.55, "zipf p2: {}", p2);
}

#[test]
Expand Down Expand Up @@ -599,6 +642,51 @@ mod tests {
test(&opt);
}

#[test]
fn workload_keygen_zipfian_latest() {
let opt = WorkloadOpt {
set_perc: 5,
get_perc: 95,
del_perc: 0,
klen: Some(16),
vlen: Some(100),
dist: "latest".to_string(),
kmin: Some(10000),
kmax: Some(22347),
zipf_theta: None,
zipf_hotspot: None,
};

let mut workload = Workload::new(&opt, None);
let mut rng = rand::thread_rng();
assert!(matches!(
workload.kgen.dist,
KeyDistribution::ZipfianLatest(_, 0, 0)
));
let mut dist: HashSet<usize> = HashSet::new();
let mut set_count = 0;
for _ in 0..1000000 {
let KeyDistribution::ZipfianLatest(_, this_hotspot, _) = workload.kgen.dist else {
panic!();
};
dist.insert(this_hotspot);
let op = workload.next(&mut rng);
if let Operation::Set { key, value } = op {
assert_eq!(key.len(), 16);
assert_eq!(value.len(), 100);
let KeyDistribution::ZipfianLatest(_, hotspot, latest) = workload.kgen.dist else {
panic!();
};
assert_eq!(hotspot, latest);
set_count += 1;
}
}
// approx. proportion of set should be ~5%.
assert!(set_count < 60000);
// and all the observed hotspot must be way less than that
assert!(dist.len() < set_count);
}

#[test]
fn workload_uniform_write_intensive() {
let opt = WorkloadOpt {
Expand Down

0 comments on commit f6ee725

Please sign in to comment.