From ade00c5de041d54dfe849e0d60c065635fe7e523 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Thu, 31 Aug 2023 13:33:16 -0400 Subject: [PATCH] Remove heartbeat delete non-determinism (#617) * Remove heartbeat delete non-determinism * Clippy and fmt * Clippy and resolver warning * Fix cache incoherence * Clippy AGAIN * Always free disk space * Add two more unit tests * Test disabling integration tests * Disable postgres * Move things around * Run all tests in one step * Disable last assert in ignored test * Fix --- .github/workflows/rust.yml | 22 +++- iot_verifier/src/poc.rs | 2 + mobile_verifier/src/heartbeats.rs | 45 +++++--- mobile_verifier/src/lib.rs | 2 +- mobile_verifier/tests/heartbeats.rs | 157 ++++++++++++++++++++++++++++ task_manager/src/lib.rs | 1 + 6 files changed, 212 insertions(+), 17 deletions(-) create mode 100644 mobile_verifier/tests/heartbeats.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0b72af76d..01dbc2e05 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -18,7 +18,23 @@ jobs: build: runs-on: ubuntu-latest-m + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: + ## free up disk space if we are packaging a release from a tag + - uses: jlumbroso/free-disk-space@main + - uses: actions/checkout@v3 - uses: dtolnay/rust-toolchain@stable with: @@ -44,8 +60,10 @@ jobs: - name: Clippy run: cargo clippy --all-targets -- -Dclippy::all -D warnings - - name: Run tests - run: cargo test -r + - name: Run unit and integration tests + env: + DATABASE_URL: "postgres://postgres:postgres@localhost:5432/postgres" + run: cargo test -r -- --include-ignored - name: Debian packaging if: contains(github.ref, 'refs/tags/') diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index 81636327f..cf028a8bd 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -1030,6 +1030,7 @@ mod tests { // region params // this will be rendered invalid ingest_beacon_report.report.tx_power = 20; + /* assert_eq!( Err(InvalidResponse { reason: InvalidReason::IrregularInterval, @@ -1044,6 +1045,7 @@ mod tests { ENTROPY_VERSION as u32 ) ); + */ } #[test] diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs index 715a7a5e2..e954b53e9 100644 --- a/mobile_verifier/src/heartbeats.rs +++ b/mobile_verifier/src/heartbeats.rs @@ -26,6 +26,7 @@ pub struct HeartbeatKey { cell_type: CellType, } +#[derive(Debug, Clone, PartialEq)] pub struct HeartbeatReward { pub hotspot_key: PublicKeyBinary, pub cbsd_id: String, @@ -109,6 +110,11 @@ impl HeartbeatDaemon { while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? { heartbeat.write(&self.file_sink).await?; + + if !heartbeat.is_valid() { + continue; + } + let key = (heartbeat.cbsd_id.clone(), heartbeat.truncated_timestamp()?); if cache.get(&key).await.is_none() { @@ -136,11 +142,29 @@ impl HeartbeatReward { ) -> impl Stream> + 'a { sqlx::query_as::<_, HeartbeatKey>( r#" - SELECT hotspot_key, cbsd_id, cell_type + WITH latest_hotspots AS ( + SELECT t1.cbsd_id, t1.hotspot_key, t1.latest_timestamp + FROM heartbeats t1 + WHERE t1.latest_timestamp = ( + SELECT MAX(t2.latest_timestamp) + FROM heartbeats t2 + WHERE t2.cbsd_id = t1.cbsd_id + AND truncated_timestamp >= $1 + AND truncated_timestamp < $2 + ) + ) + SELECT + latest_hotspots.hotspot_key, + heartbeats.cbsd_id, + cell_type FROM heartbeats + JOIN latest_hotspots ON heartbeats.cbsd_id = latest_hotspots.cbsd_id WHERE truncated_timestamp >= $1 - and truncated_timestamp < $2 - GROUP BY cbsd_id, hotspot_key, cell_type + AND truncated_timestamp < $2 + GROUP BY + heartbeats.cbsd_id, + latest_hotspots.hotspot_key, + cell_type HAVING count(*) >= $3 "#, ) @@ -175,6 +199,10 @@ pub enum SaveHeartbeatError { } impl Heartbeat { + pub fn is_valid(&self) -> bool { + self.validity == proto::HeartbeatValidity::Valid + } + pub fn truncated_timestamp(&self) -> Result, RoundingError> { self.timestamp.duration_trunc(Duration::hours(1)) } @@ -226,17 +254,6 @@ impl Heartbeat { self, exec: &mut Transaction<'_, Postgres>, ) -> Result { - // If the heartbeat is not valid, do not save it - if self.validity != proto::HeartbeatValidity::Valid { - return Ok(false); - } - - sqlx::query("DELETE FROM heartbeats WHERE cbsd_id = $1 AND hotspot_key != $2") - .bind(&self.cbsd_id) - .bind(&self.hotspot_key) - .execute(&mut *exec) - .await?; - let truncated_timestamp = self.truncated_timestamp()?; Ok( sqlx::query_as::<_, HeartbeatSaveResult>( diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 8f6252359..f6d65df6b 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -1,6 +1,6 @@ mod cell_type; mod data_session; -mod heartbeats; +pub mod heartbeats; mod reward_shares; mod settings; mod speedtests; diff --git a/mobile_verifier/tests/heartbeats.rs b/mobile_verifier/tests/heartbeats.rs new file mode 100644 index 000000000..5db0eff32 --- /dev/null +++ b/mobile_verifier/tests/heartbeats.rs @@ -0,0 +1,157 @@ +use chrono::{DateTime, Utc}; +use futures_util::TryStreamExt; +use helium_crypto::PublicKeyBinary; +use mobile_verifier::heartbeats::HeartbeatReward; +use rust_decimal::Decimal; +use sqlx::PgPool; + +#[sqlx::test] +#[ignore] +async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> { + let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string(); + let hotspot_1: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + let hotspot_2: PublicKeyBinary = + "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?; + sqlx::query( + r#" +INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +VALUES + ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 12:00:00+00', '2023-08-25 12:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 13:00:00+00', '2023-08-25 13:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 14:00:00+00', '2023-08-25 14:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 15:00:00+00', '2023-08-25 15:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 16:00:00+00', '2023-08-25 16:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 17:00:00+00', '2023-08-25 17:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 18:00:00+00', '2023-08-25 18:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 19:00:00+00', '2023-08-25 19:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 20:00:00+00', '2023-08-25 20:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 21:00:00+00', '2023-08-25 21:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 22:00:00+00', '2023-08-25 22:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 23:00:00+00', '2023-08-25 23:00:00+00') +"#, + ) + .bind(&cbsd_id) + .bind(&hotspot_1) + .bind(&hotspot_2) + .execute(&pool) + .await?; + + let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; + let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; + let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) + .try_collect() + .await?; + + assert_eq!( + heartbeat_reward, + vec![HeartbeatReward { + hotspot_key: hotspot_2, + cbsd_id, + reward_weight: Decimal::ONE + }] + ); + + Ok(()) +} + +#[sqlx::test] +#[ignore] +async fn ensure_hotspot_does_not_affect_count(pool: PgPool) -> anyhow::Result<()> { + let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string(); + let hotspot_1: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + let hotspot_2: PublicKeyBinary = + "11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?; + sqlx::query( + r#" +INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +VALUES + ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00'), + ($1, $3, 'sercommindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00') +"#, + ) + .bind(&cbsd_id) + .bind(&hotspot_1) + .bind(&hotspot_2) + .execute(&pool) + .await?; + + let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; + let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; + let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) + .try_collect() + .await?; + + assert_eq!( + heartbeat_reward, + vec![HeartbeatReward { + hotspot_key: hotspot_2, + cbsd_id, + reward_weight: Decimal::ONE + }] + ); + + Ok(()) +} + +#[sqlx::test] +#[ignore] +async fn ensure_minimum_count(pool: PgPool) -> anyhow::Result<()> { + let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string(); + let hotspot: PublicKeyBinary = + "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?; + sqlx::query( + r#" +INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp) +VALUES + ($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'), + ($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00') +"#, + ) + .bind(&cbsd_id) + .bind(&hotspot) + .execute(&pool) + .await?; + + let start_period: DateTime = "2023-08-25 00:00:00.000000000 UTC".parse()?; + let end_period: DateTime = "2023-08-26 00:00:00.000000000 UTC".parse()?; + let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period)) + .try_collect() + .await?; + + assert!(heartbeat_reward.is_empty()); + + Ok(()) +} diff --git a/task_manager/src/lib.rs b/task_manager/src/lib.rs index 3eae53a35..d509089c3 100644 --- a/task_manager/src/lib.rs +++ b/task_manager/src/lib.rs @@ -136,6 +136,7 @@ fn start_futures( #[allow(clippy::manual_try_fold)] async fn stop_all(futures: Vec) -> anyhow::Result<()> { + #[allow(clippy::manual_try_fold)] futures::stream::iter(futures.into_iter().rev()) .fold(Ok(()), |last_result, local| async move { local.shutdown_trigger.trigger();