Skip to content

Commit

Permalink
Review serialization of split maturity.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Jul 3, 2023
1 parent 2a498a3 commit df5b165
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 78 deletions.
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ mod tests {
partition_id,
num_merge_ops,
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(3600)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(3600),
},
..Default::default()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy {
if split_num_docs >= self.split_num_docs_target {
return SplitMaturity::Mature;
}
SplitMaturity::TimeToMaturity(self.config.maturation_period)
SplitMaturity::MatureAfterPeriod {
period: self.config.maturation_period,
}
}

#[cfg(test)]
Expand Down Expand Up @@ -213,7 +215,9 @@ mod tests {
// maturation_period is not mature.
assert_eq!(
merge_policy.split_maturity(split.num_docs, split.num_merge_ops),
SplitMaturity::TimeToMaturity(Duration::from_secs(3600))
SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(3600)
}
);
// Split with docs > max_merge_docs is mature.
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ impl MergePolicy for StableLogMergePolicy {
if split_num_docs >= self.split_num_docs_target {
return SplitMaturity::Mature;
}
SplitMaturity::TimeToMaturity(self.config.maturation_period)
SplitMaturity::MatureAfterPeriod {
period: self.config.maturation_period,
}
}

#[cfg(test)]
Expand Down Expand Up @@ -373,7 +375,9 @@ mod tests {
// Split under max_merge_docs and created before now() - maturation_period is not mature.
assert_eq!(
merge_policy.split_maturity(9_000_000, 0),
SplitMaturity::TimeToMaturity(Duration::from_secs(3600 * 48))
SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(3600 * 48)
}
);
assert_eq!(
merge_policy.split_maturity(&merge_policy.split_num_docs_target + 1, 0),
Expand All @@ -383,7 +387,9 @@ mod tests {
// mature.
assert_eq!(
merge_policy.split_maturity(9_000_000, 0),
SplitMaturity::TimeToMaturity(merge_policy.config.maturation_period)
SplitMaturity::MatureAfterPeriod {
period: merge_policy.config.maturation_period
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ mod tests {
SplitMetadata {
split_id: split_id.to_string(),
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(3600)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(3600),
},
..Default::default()
}
}
Expand Down
93 changes: 33 additions & 60 deletions quickwit/quickwit-metastore/src/split_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ impl SplitMetadata {
pub fn is_mature(&self) -> bool {
match self.maturity {
SplitMaturity::Mature => true,
SplitMaturity::TimeToMaturity(duration) => {
self.create_timestamp + duration.as_secs() as i64 <= utc_now_timestamp()
}
SplitMaturity::MatureAfterPeriod {
period: time_to_maturity,
} => self.create_timestamp + time_to_maturity.as_secs() as i64 <= utc_now_timestamp(),
}
}

Expand All @@ -175,9 +175,9 @@ impl SplitMetadata {
pub(crate) fn maturity_timestamp(&self) -> i64 {
match self.maturity {
SplitMaturity::Mature => 0,
SplitMaturity::TimeToMaturity(duration) => {
self.create_timestamp + duration.as_secs() as i64
}
SplitMaturity::MatureAfterPeriod {
period: time_to_maturity,
} => self.create_timestamp + time_to_maturity.as_secs() as i64,
}
}

Expand Down Expand Up @@ -214,7 +214,9 @@ impl quickwit_config::TestableForRegression for SplitMetadata {
uncompressed_docs_size_in_bytes: 234234,
time_range: Some(121000..=130198),
create_timestamp: 3,
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(4)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(4),
},
tags: ["234".to_string(), "aaa".to_string()].into_iter().collect(),
footer_offsets: 1000..2000,
num_merge_ops: 3,
Expand Down Expand Up @@ -277,63 +279,33 @@ impl FromStr for SplitState {
/// `MatureAfterPeriod`.
/// The maturity is determined by the `MergePolicy`.
#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum SplitMaturity {
/// Split is mature.
#[default]
#[serde(with = "serde_split_maturity::mature")]
Mature,
/// Time after which the split will be mature.
/// Note the accuracy is in seconds.
TimeToMaturity(#[serde(with = "serde_split_maturity::duration")] Duration),
/// Period after which the split is mature.
/// Period is truncated to seconds
/// on serialization.
MatureAfterPeriod {
/// Time to maturity.
#[serde(with = "serde_duration_secs")]
period: Duration,
},
}

pub mod serde_split_maturity {
// Serde untagged will serialize the empty variant as `null`.
// We want to serialize it as `mature` instead.
pub mod mature {
pub fn serialize<S>(serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_str("mature")
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<(), D::Error>
where D: serde::Deserializer<'de> {
struct V;
impl<'de> serde::de::Visitor<'de> for V {
type Value = ();
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str(concat!("\"mature\""))
}
fn visit_str<E: serde::de::Error>(self, value: &str) -> Result<Self::Value, E> {
if value == "mature" {
Ok(())
} else {
Err(E::invalid_value(serde::de::Unexpected::Str(value), &self))
}
}
}
deserializer.deserialize_str(V)
}
}

pub mod serde_duration_secs {
// Serializer/Deserializer of `Duration` in seconds.
pub mod duration {
pub fn serialize<S>(
duration: &std::time::Duration,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_u64(duration.as_secs())
}
pub fn serialize<S>(duration: &std::time::Duration, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer {
serializer.serialize_u64(duration.as_secs())
}

pub fn deserialize<'de, D>(deserializer: D) -> Result<std::time::Duration, D::Error>
where D: serde::Deserializer<'de> {
let time_to_maturity_in_seconds: u64 = serde::Deserialize::deserialize(deserializer)?;
Ok(std::time::Duration::from_secs(time_to_maturity_in_seconds))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<std::time::Duration, D::Error>
where D: serde::Deserializer<'de> {
let time_to_maturity_in_seconds: u64 = serde::Deserialize::deserialize(deserializer)?;
Ok(std::time::Duration::from_secs(time_to_maturity_in_seconds))
}
}

Expand All @@ -354,17 +326,18 @@ mod tests {
#[test]
fn test_split_maturity_serialization() {
{
let split_maturity =
super::SplitMaturity::TimeToMaturity(std::time::Duration::from_secs(10));
let split_maturity = super::SplitMaturity::MatureAfterPeriod {
period: std::time::Duration::from_secs(10),
};
let serialized = serde_json::to_string(&split_maturity).unwrap();
assert_eq!(serialized, r#"10"#);
assert_eq!(serialized, r#"{"type":"mature_after_period","period":10}"#);
let deserialized: super::SplitMaturity = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, split_maturity);
}
{
let split_maturity = super::SplitMaturity::Mature;
let serialized = serde_json::to_string(&split_maturity).unwrap();
assert_eq!(serialized, r#""mature""#);
assert_eq!(serialized, r#"{"type":"mature"}"#);
let deserialized: super::SplitMaturity = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, split_maturity);
}
Expand Down
12 changes: 9 additions & 3 deletions quickwit/quickwit-metastore/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,9 @@ pub mod test_suite {
index_uid: index_uid.clone(),
time_range: Some(0..=99),
create_timestamp: current_timestamp,
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(0)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(0),
},
tags: to_btree_set(&["tag!", "tag:foo", "tag:bar"]),
delete_opstamp: 3,
..Default::default()
Expand All @@ -1674,7 +1676,9 @@ pub mod test_suite {
index_uid: index_uid.clone(),
time_range: Some(100..=199),
create_timestamp: current_timestamp,
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(10)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(10),
},
tags: to_btree_set(&["tag!", "tag:bar"]),
delete_opstamp: 1,
..Default::default()
Expand All @@ -1686,7 +1690,9 @@ pub mod test_suite {
index_uid: index_uid.clone(),
time_range: Some(200..=299),
create_timestamp: current_timestamp,
maturity: SplitMaturity::TimeToMaturity(Duration::from_secs(20)),
maturity: SplitMaturity::MatureAfterPeriod {
period: Duration::from_secs(20),
},
tags: to_btree_set(&["tag!", "tag:foo", "tag:baz"]),
delete_opstamp: 5,
..Default::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@
"start": 1000
},
"index_uid": "my-index",
"maturity": "mature",
"maturity": {
"type": "mature"
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@
"start": 1000
},
"index_uid": "my-index",
"maturity": "mature",
"maturity": {
"type": "mature"
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@
"start": 1000
},
"index_uid": "my-index:11111111111111111111111111",
"maturity": 4,
"maturity": {
"type": "mature_after_period",
"period": 4
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@
"start": 1000
},
"index_uid": "my-index:11111111111111111111111111",
"maturity": 4,
"maturity": {
"type": "mature_after_period",
"period": 4
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"start": 1000
},
"index_uid": "my-index",
"maturity": "mature",
"maturity": {
"type": "mature"
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"start": 1000
},
"index_uid": "my-index",
"maturity": "mature",
"maturity": {
"type": "mature"
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"start": 1000
},
"index_uid": "my-index:11111111111111111111111111",
"maturity": 4,
"maturity": {
"type": "mature_after_period",
"period": 4
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"start": 1000
},
"index_uid": "my-index:11111111111111111111111111",
"maturity": 4,
"maturity": {
"type": "mature_after_period",
"period": 4
},
"node_id": "node",
"num_docs": 12303,
"num_merge_ops": 3,
Expand Down

0 comments on commit df5b165

Please sign in to comment.