diff --git a/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql b/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql index 70c2632b8..8b33bc8bd 100644 --- a/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql +++ b/lib/workload/stateless/filemanager/database/migrations/0002_add_s3_object_table.sql @@ -53,6 +53,6 @@ create table s3_object ( number_duplicate_events integer not null default 0, -- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event. - constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer), - constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer) + constraint created_sequencer_unique unique nulls not distinct (bucket, key, version_id, created_sequencer), + constraint deleted_sequencer_unique unique nulls not distinct (bucket, key, version_id, deleted_sequencer) ); diff --git a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql index 792dd1537..9e52b3569 100644 --- a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql +++ b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql @@ -51,7 +51,7 @@ current_objects as ( join input on input.bucket = s3_object.bucket and input.key = s3_object.key and - input.version_id = s3_object.version_id + input.version_id is not distinct from s3_object.version_id -- Lock this pre-emptively for the update. for update ), diff --git a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql index 68014c2d6..6c36cad39 100644 --- a/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql +++ b/lib/workload/stateless/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql @@ -36,7 +36,7 @@ current_objects as ( join input on input.bucket = s3_object.bucket and input.key = s3_object.key and - input.version_id = s3_object.version_id + input.version_id is not distinct from s3_object.version_id -- Lock this pre-emptively for the update. for update ), diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs index 2a1cabc46..431267ac6 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs @@ -290,7 +290,11 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -304,7 +308,11 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -327,7 +335,49 @@ pub(crate) mod tests { 2, s3_object_results[0].get::("number_duplicate_events") ); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_reordered_duplicates(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(test_events())) + .await + .unwrap(); + + // No reason the order should matter if they are duplicates + let events = test_events(); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_deleted(events.object_deleted), + )) + .await + .unwrap(); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_created(events.object_created), + )) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!( + 2, + s3_object_results[0].get::("number_duplicate_events") + ); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -336,19 +386,15 @@ pub(crate) mod tests { let events = test_events(); // Deleted coming before created. ingester - .ingest(EventSourceType::S3(Events { - object_created: Default::default(), - object_deleted: events.object_deleted, - other: Default::default(), - })) + .ingest(EventSourceType::S3( + Events::default().with_object_deleted(events.object_deleted), + )) .await .unwrap(); ingester - .ingest(EventSourceType::S3(Events { - object_created: events.object_created, - object_deleted: Default::default(), - other: Default::default(), - })) + .ingest(EventSourceType::S3( + Events::default().with_object_created(events.object_created), + )) .await .unwrap(); @@ -357,7 +403,11 @@ pub(crate) mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); assert_eq!(2, s3_object_results[0].get::("number_reordered")); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -393,14 +443,129 @@ pub(crate) mod tests { s3_object_results[1].get::("number_duplicate_events") ); assert_eq!(1, s3_object_results[1].get::("number_reordered")); - assert_deleted_with(&s3_object_results[1], Some(0)); + assert_deleted_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); assert_created_with( &s3_object_results[0], - EXPECTED_VERSION_ID, + Some(EXPECTED_VERSION_ID.to_string()), EXPECTED_SEQUENCER_CREATED_ONE, ); } + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + let events = remove_version_ids(test_events()); + + // Correct ordering + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_created(events.object_created), + )) + .await + .unwrap(); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_deleted(events.object_deleted), + )) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!(0, s3_object_results[0].get::("number_reordered")); + assert_deleted_with(&s3_object_results[0], Some(0), None); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_duplicates_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(remove_version_ids(test_events()))) + .await + .unwrap(); + ingester + .ingest(EventSourceType::S3(remove_version_ids(test_events()))) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!( + 2, + s3_object_results[0].get::("number_duplicate_events") + ); + assert_deleted_with(&s3_object_results[0], Some(0), None); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_reordered_duplicates_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + ingester + .ingest(EventSourceType::S3(remove_version_ids(test_events()))) + .await + .unwrap(); + + // No reason the order should matter if they are duplicates + let events = remove_version_ids(test_events()); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_deleted(events.object_deleted), + )) + .await + .unwrap(); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_created(events.object_created), + )) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!( + 2, + s3_object_results[0].get::("number_duplicate_events") + ); + assert_deleted_with(&s3_object_results[0], Some(0), None); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_reorder_without_version_id(pool: PgPool) { + let ingester = test_ingester(pool); + let events = remove_version_ids(test_events()); + + // Deleted coming before created. + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_deleted(events.object_deleted), + )) + .await + .unwrap(); + ingester + .ingest(EventSourceType::S3( + Events::default().with_object_created(events.object_created), + )) + .await + .unwrap(); + + let (object_results, s3_object_results) = fetch_results(&ingester).await; + + assert_eq!(object_results.len(), 1); + assert_eq!(s3_object_results.len(), 1); + assert_eq!(2, s3_object_results[0].get::("number_reordered")); + assert_deleted_with(&s3_object_results[0], Some(0), None); + } + #[sqlx::test(migrator = "MIGRATOR")] async fn ingest_duplicates_with_version_id(pool: PgPool) { let ingester = test_ingester(pool); @@ -434,10 +599,14 @@ pub(crate) mod tests { s3_object_results[1].get::("number_duplicate_events") ); assert_eq!(0, s3_object_results[1].get::("number_reordered")); - assert_deleted_with(&s3_object_results[1], Some(0)); + assert_deleted_with( + &s3_object_results[1], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); assert_created_with( &s3_object_results[0], - "version_id", + Some("version_id".to_string()), EXPECTED_SEQUENCER_CREATED_ONE, ); } @@ -605,6 +774,21 @@ pub(crate) mod tests { ); } + fn remove_version_ids(mut events: Events) -> Events { + events + .object_deleted + .version_ids + .iter_mut() + .for_each(|version_id| _ = version_id.take()); + events + .object_created + .version_ids + .iter_mut() + .for_each(|version_id| _ = version_id.take()); + + events + } + pub(crate) async fn fetch_results(ingester: &Ingester) -> (Vec, Vec) { ( sqlx::query("select * from object") @@ -620,7 +804,7 @@ pub(crate) mod tests { pub(crate) fn assert_created_with( s3_object_results: &PgRow, - expected_version_id: &str, + expected_version_id: Option, expected_sequencer: &str, ) { assert_eq!("bucket", s3_object_results.get::("bucket")); @@ -629,7 +813,7 @@ pub(crate) mod tests { assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); assert_eq!( expected_version_id, - s3_object_results.get::("version_id") + s3_object_results.get::, _>("version_id") ); assert_eq!( expected_sequencer, @@ -648,17 +832,21 @@ pub(crate) mod tests { pub(crate) fn assert_created(s3_object_results: &PgRow) { assert_created_with( s3_object_results, - EXPECTED_VERSION_ID, + Some(EXPECTED_VERSION_ID.to_string()), EXPECTED_SEQUENCER_CREATED_ONE, ) } - pub(crate) fn assert_deleted_with(s3_object_results: &PgRow, size: Option) { + pub(crate) fn assert_deleted_with( + s3_object_results: &PgRow, + size: Option, + version_id: Option, + ) { assert_eq!("bucket", s3_object_results.get::("bucket")); assert_eq!("key", s3_object_results.get::("key")); assert_eq!( - EXPECTED_VERSION_ID, - s3_object_results.get::("version_id") + version_id, + s3_object_results.get::, _>("version_id") ); assert_eq!(size, s3_object_results.get::, _>("size")); assert_eq!(EXPECTED_E_TAG, s3_object_results.get::("e_tag")); diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs index 747412a44..67c531b75 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/events/aws/mod.rs @@ -194,13 +194,33 @@ impl From for FlatS3EventMessages { } /// Group by event types. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Events { pub object_created: TransposedS3EventMessages, pub object_deleted: TransposedS3EventMessages, pub other: TransposedS3EventMessages, } +impl Events { + /// Set the created objects. + pub fn with_object_created(mut self, object_created: TransposedS3EventMessages) -> Self { + self.object_created = object_created; + self + } + + /// Set the deleted objects. + pub fn with_object_deleted(mut self, object_deleted: TransposedS3EventMessages) -> Self { + self.object_deleted = object_deleted; + self + } + + /// Set the other events. + pub fn with_other(mut self, other: TransposedS3EventMessages) -> Self { + self.other = other; + self + } +} + impl From for Events { fn from(messages: FlatS3EventMessages) -> Self { let mut object_created = FlatS3EventMessages::default(); diff --git a/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs index e40153c96..8e9d3e3a7 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/handlers/aws.rs @@ -97,7 +97,7 @@ mod tests { use crate::events::aws::collecter::tests::{ expected_head_object, set_s3_client_expectations, set_sqs_client_expectations, }; - use crate::events::aws::tests::expected_event_record_simple; + use crate::events::aws::tests::{expected_event_record_simple, EXPECTED_VERSION_ID}; use super::*; @@ -118,7 +118,11 @@ mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } #[sqlx::test(migrator = "MIGRATOR")] @@ -142,6 +146,10 @@ mod tests { assert_eq!(object_results.len(), 1); assert_eq!(s3_object_results.len(), 1); - assert_deleted_with(&s3_object_results[0], Some(0)); + assert_deleted_with( + &s3_object_results[0], + Some(0), + Some(EXPECTED_VERSION_ID.to_string()), + ); } }