Skip to content

Commit

Permalink
fix(filemanager): correctly deal with null version_id for duplicates …
Browse files Browse the repository at this point in the history
…and reordering
  • Loading branch information
mmalenic committed Feb 27, 2024
1 parent 919c140 commit 5d47be1
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ create table s3_object (
number_duplicate_events integer not null default 0,

-- The sequencers should be unique with the bucket, key, and its version, otherwise this is a duplicate event.
constraint created_sequencer_unique unique (bucket, key, version_id, created_sequencer),
constraint deleted_sequencer_unique unique (bucket, key, version_id, deleted_sequencer)
constraint created_sequencer_unique unique nulls not distinct (bucket, key, version_id, created_sequencer),
constraint deleted_sequencer_unique unique nulls not distinct (bucket, key, version_id, deleted_sequencer)
);
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ current_objects as (
join input on
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id
input.version_id is not distinct from s3_object.version_id
-- Lock this pre-emptively for the update.
for update
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ current_objects as (
join input on
input.bucket = s3_object.bucket and
input.key = s3_object.key and
input.version_id = s3_object.version_id
input.version_id is not distinct from s3_object.version_id
-- Lock this pre-emptively for the update.
for update
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 1);
assert_eq!(s3_object_results.len(), 1);
assert_deleted_with(&s3_object_results[0], Some(0));
assert_deleted_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
}

#[sqlx::test(migrator = "MIGRATOR")]
Expand All @@ -304,7 +308,11 @@ pub(crate) mod tests {

assert_eq!(object_results.len(), 1);
assert_eq!(s3_object_results.len(), 1);
assert_deleted_with(&s3_object_results[0], Some(0));
assert_deleted_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
}

#[sqlx::test(migrator = "MIGRATOR")]
Expand All @@ -327,7 +335,49 @@ pub(crate) mod tests {
2,
s3_object_results[0].get::<i32, _>("number_duplicate_events")
);
assert_deleted_with(&s3_object_results[0], Some(0));
assert_deleted_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_reordered_duplicates(pool: PgPool) {
    let ingester = test_ingester(pool);

    // Ingest the baseline created/deleted pair.
    ingester
        .ingest(EventSourceType::S3(test_events()))
        .await
        .unwrap();

    // Replay the same pair with deleted arriving first: duplicates should be
    // order-insensitive.
    let replayed = test_events();
    let deleted_only = Events::default().with_object_deleted(replayed.object_deleted);
    ingester
        .ingest(EventSourceType::S3(deleted_only))
        .await
        .unwrap();
    let created_only = Events::default().with_object_created(replayed.object_created);
    ingester
        .ingest(EventSourceType::S3(created_only))
        .await
        .unwrap();

    let (object_results, s3_object_results) = fetch_results(&ingester).await;

    // Still one logical object, with both replayed events counted as duplicates.
    assert_eq!(object_results.len(), 1);
    assert_eq!(s3_object_results.len(), 1);
    assert_eq!(
        s3_object_results[0].get::<i32, _>("number_duplicate_events"),
        2
    );
    assert_deleted_with(
        &s3_object_results[0],
        Some(0),
        Some(EXPECTED_VERSION_ID.to_string()),
    );
}

#[sqlx::test(migrator = "MIGRATOR")]
Expand All @@ -336,19 +386,15 @@ pub(crate) mod tests {
let events = test_events();
// Deleted coming before created.
ingester
.ingest(EventSourceType::S3(Events {
object_created: Default::default(),
object_deleted: events.object_deleted,
other: Default::default(),
}))
.ingest(EventSourceType::S3(
Events::default().with_object_deleted(events.object_deleted),
))
.await
.unwrap();
ingester
.ingest(EventSourceType::S3(Events {
object_created: events.object_created,
object_deleted: Default::default(),
other: Default::default(),
}))
.ingest(EventSourceType::S3(
Events::default().with_object_created(events.object_created),
))
.await
.unwrap();

Expand All @@ -357,7 +403,11 @@ pub(crate) mod tests {
assert_eq!(object_results.len(), 1);
assert_eq!(s3_object_results.len(), 1);
assert_eq!(2, s3_object_results[0].get::<i32, _>("number_reordered"));
assert_deleted_with(&s3_object_results[0], Some(0));
assert_deleted_with(
&s3_object_results[0],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
}

#[sqlx::test(migrator = "MIGRATOR")]
Expand Down Expand Up @@ -393,14 +443,129 @@ pub(crate) mod tests {
s3_object_results[1].get::<i32, _>("number_duplicate_events")
);
assert_eq!(1, s3_object_results[1].get::<i32, _>("number_reordered"));
assert_deleted_with(&s3_object_results[1], Some(0));
assert_deleted_with(
&s3_object_results[1],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
assert_created_with(
&s3_object_results[0],
EXPECTED_VERSION_ID,
Some(EXPECTED_VERSION_ID.to_string()),
EXPECTED_SEQUENCER_CREATED_ONE,
);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_without_version_id(pool: PgPool) {
    let ingester = test_ingester(pool);
    let events = remove_version_ids(test_events());

    // Created before deleted, i.e. the correct ordering.
    let created_only = Events::default().with_object_created(events.object_created);
    ingester
        .ingest(EventSourceType::S3(created_only))
        .await
        .unwrap();
    let deleted_only = Events::default().with_object_deleted(events.object_deleted);
    ingester
        .ingest(EventSourceType::S3(deleted_only))
        .await
        .unwrap();

    let (object_results, s3_object_results) = fetch_results(&ingester).await;

    // In-order events with a null version_id should not be flagged as reordered.
    assert_eq!(object_results.len(), 1);
    assert_eq!(s3_object_results.len(), 1);
    assert_eq!(s3_object_results[0].get::<i32, _>("number_reordered"), 0);
    assert_deleted_with(&s3_object_results[0], Some(0), None);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_duplicates_without_version_id(pool: PgPool) {
    let ingester = test_ingester(pool);

    // Ingest the same version-id-less event set twice; the second pass is a
    // pure duplicate.
    for _ in 0..2 {
        ingester
            .ingest(EventSourceType::S3(remove_version_ids(test_events())))
            .await
            .unwrap();
    }

    let (object_results, s3_object_results) = fetch_results(&ingester).await;

    // One logical object; both events of the second pass count as duplicates.
    assert_eq!(object_results.len(), 1);
    assert_eq!(s3_object_results.len(), 1);
    assert_eq!(
        s3_object_results[0].get::<i32, _>("number_duplicate_events"),
        2
    );
    assert_deleted_with(&s3_object_results[0], Some(0), None);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_reordered_duplicates_without_version_id(pool: PgPool) {
    let ingester = test_ingester(pool);

    // Baseline ingest with null version ids.
    ingester
        .ingest(EventSourceType::S3(remove_version_ids(test_events())))
        .await
        .unwrap();

    // Replay the same pair with deleted first: duplicate detection should not
    // depend on ordering, even with null version ids.
    let replayed = remove_version_ids(test_events());
    let deleted_only = Events::default().with_object_deleted(replayed.object_deleted);
    ingester
        .ingest(EventSourceType::S3(deleted_only))
        .await
        .unwrap();
    let created_only = Events::default().with_object_created(replayed.object_created);
    ingester
        .ingest(EventSourceType::S3(created_only))
        .await
        .unwrap();

    let (object_results, s3_object_results) = fetch_results(&ingester).await;

    // Still one logical object, both replayed events counted as duplicates.
    assert_eq!(object_results.len(), 1);
    assert_eq!(s3_object_results.len(), 1);
    assert_eq!(
        s3_object_results[0].get::<i32, _>("number_duplicate_events"),
        2
    );
    assert_deleted_with(&s3_object_results[0], Some(0), None);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_reorder_without_version_id(pool: PgPool) {
    let ingester = test_ingester(pool);
    let events = remove_version_ids(test_events());

    // Deleted arriving before created, i.e. out of order.
    let deleted_only = Events::default().with_object_deleted(events.object_deleted);
    ingester
        .ingest(EventSourceType::S3(deleted_only))
        .await
        .unwrap();
    let created_only = Events::default().with_object_created(events.object_created);
    ingester
        .ingest(EventSourceType::S3(created_only))
        .await
        .unwrap();

    let (object_results, s3_object_results) = fetch_results(&ingester).await;

    // A single logical object with both events recorded as reordered.
    assert_eq!(object_results.len(), 1);
    assert_eq!(s3_object_results.len(), 1);
    assert_eq!(s3_object_results[0].get::<i32, _>("number_reordered"), 2);
    assert_deleted_with(&s3_object_results[0], Some(0), None);
}

#[sqlx::test(migrator = "MIGRATOR")]
async fn ingest_duplicates_with_version_id(pool: PgPool) {
let ingester = test_ingester(pool);
Expand Down Expand Up @@ -434,10 +599,14 @@ pub(crate) mod tests {
s3_object_results[1].get::<i32, _>("number_duplicate_events")
);
assert_eq!(0, s3_object_results[1].get::<i32, _>("number_reordered"));
assert_deleted_with(&s3_object_results[1], Some(0));
assert_deleted_with(
&s3_object_results[1],
Some(0),
Some(EXPECTED_VERSION_ID.to_string()),
);
assert_created_with(
&s3_object_results[0],
"version_id",
Some("version_id".to_string()),
EXPECTED_SEQUENCER_CREATED_ONE,
);
}
Expand Down Expand Up @@ -605,6 +774,21 @@ pub(crate) mod tests {
);
}

/// Blank out every version id on both the created and deleted events,
/// leaving `None` in each slot, and return the modified events.
fn remove_version_ids(mut events: Events) -> Events {
    for version_id in events.object_deleted.version_ids.iter_mut() {
        *version_id = None;
    }
    for version_id in events.object_created.version_ids.iter_mut() {
        *version_id = None;
    }

    events
}

pub(crate) async fn fetch_results(ingester: &Ingester) -> (Vec<PgRow>, Vec<PgRow>) {
(
sqlx::query("select * from object")
Expand All @@ -620,7 +804,7 @@ pub(crate) mod tests {

pub(crate) fn assert_created_with(
s3_object_results: &PgRow,
expected_version_id: &str,
expected_version_id: Option<String>,
expected_sequencer: &str,
) {
assert_eq!("bucket", s3_object_results.get::<String, _>("bucket"));
Expand All @@ -629,7 +813,7 @@ pub(crate) mod tests {
assert_eq!(EXPECTED_E_TAG, s3_object_results.get::<String, _>("e_tag"));
assert_eq!(
expected_version_id,
s3_object_results.get::<String, _>("version_id")
s3_object_results.get::<Option<String>, _>("version_id")
);
assert_eq!(
expected_sequencer,
Expand All @@ -648,17 +832,21 @@ pub(crate) mod tests {
pub(crate) fn assert_created(s3_object_results: &PgRow) {
assert_created_with(
s3_object_results,
EXPECTED_VERSION_ID,
Some(EXPECTED_VERSION_ID.to_string()),
EXPECTED_SEQUENCER_CREATED_ONE,
)
}

pub(crate) fn assert_deleted_with(s3_object_results: &PgRow, size: Option<i32>) {
pub(crate) fn assert_deleted_with(
s3_object_results: &PgRow,
size: Option<i32>,
version_id: Option<String>,
) {
assert_eq!("bucket", s3_object_results.get::<String, _>("bucket"));
assert_eq!("key", s3_object_results.get::<String, _>("key"));
assert_eq!(
EXPECTED_VERSION_ID,
s3_object_results.get::<String, _>("version_id")
version_id,
s3_object_results.get::<Option<String>, _>("version_id")
);
assert_eq!(size, s3_object_results.get::<Option<i32>, _>("size"));
assert_eq!(EXPECTED_E_TAG, s3_object_results.get::<String, _>("e_tag"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,33 @@ impl From<TransposedS3EventMessages> for FlatS3EventMessages {
}

/// Group by event types.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct Events {
    /// Messages for object-created events.
    pub object_created: TransposedS3EventMessages,
    /// Messages for object-deleted events.
    pub object_deleted: TransposedS3EventMessages,
    /// Messages for any other event types.
    pub other: TransposedS3EventMessages,
}

impl Events {
    /// Replace the created-object messages, returning the updated events.
    pub fn with_object_created(self, object_created: TransposedS3EventMessages) -> Self {
        Self {
            object_created,
            ..self
        }
    }

    /// Replace the deleted-object messages, returning the updated events.
    pub fn with_object_deleted(self, object_deleted: TransposedS3EventMessages) -> Self {
        Self {
            object_deleted,
            ..self
        }
    }

    /// Replace the other-event messages, returning the updated events.
    pub fn with_other(self, other: TransposedS3EventMessages) -> Self {
        Self { other, ..self }
    }
}

impl From<FlatS3EventMessages> for Events {
fn from(messages: FlatS3EventMessages) -> Self {
let mut object_created = FlatS3EventMessages::default();
Expand Down
Loading

0 comments on commit 5d47be1

Please sign in to comment.