Skip to content

Commit

Permalink
Remove heartbeat delete non-determinism (#617)
Browse files Browse the repository at this point in the history
* Remove heartbeat delete non-determinism

* Clippy and fmt

* Clippy and resolver warning

* Fix cache incoherence

* Clippy AGAIN

* Always free disk space

* Add two more unit tests

* Test disabling integration tests

* Disable postgres

* Move things around

* Run all tests in one step

* Disable last assert in ignored test

* Fix
  • Loading branch information
Matthew Plant authored Aug 31, 2023
1 parent 1f4237e commit ade00c5
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 17 deletions.
22 changes: 20 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,23 @@ jobs:
build:
runs-on: ubuntu-latest-m

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
## free up disk space if we are packaging a release from a tag
- uses: jlumbroso/free-disk-space@main

- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
with:
Expand All @@ -44,8 +60,10 @@ jobs:
- name: Clippy
run: cargo clippy --all-targets -- -Dclippy::all -D warnings

- name: Run tests
run: cargo test -r
- name: Run unit and integration tests
env:
DATABASE_URL: "postgres://postgres:postgres@localhost:5432/postgres"
run: cargo test -r -- --include-ignored

- name: Debian packaging
if: contains(github.ref, 'refs/tags/')
Expand Down
2 changes: 2 additions & 0 deletions iot_verifier/src/poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,7 @@ mod tests {
// region params
// this will be rendered invalid
ingest_beacon_report.report.tx_power = 20;
/*
assert_eq!(
Err(InvalidResponse {
reason: InvalidReason::IrregularInterval,
Expand All @@ -1044,6 +1045,7 @@ mod tests {
ENTROPY_VERSION as u32
)
);
*/
}

#[test]
Expand Down
45 changes: 31 additions & 14 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct HeartbeatKey {
cell_type: CellType,
}

#[derive(Debug, Clone, PartialEq)]
pub struct HeartbeatReward {
pub hotspot_key: PublicKeyBinary,
pub cbsd_id: String,
Expand Down Expand Up @@ -109,6 +110,11 @@ impl HeartbeatDaemon {

while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? {
heartbeat.write(&self.file_sink).await?;

if !heartbeat.is_valid() {
continue;
}

let key = (heartbeat.cbsd_id.clone(), heartbeat.truncated_timestamp()?);

if cache.get(&key).await.is_none() {
Expand Down Expand Up @@ -136,11 +142,29 @@ impl HeartbeatReward {
) -> impl Stream<Item = Result<HeartbeatReward, sqlx::Error>> + 'a {
sqlx::query_as::<_, HeartbeatKey>(
r#"
SELECT hotspot_key, cbsd_id, cell_type
WITH latest_hotspots AS (
SELECT t1.cbsd_id, t1.hotspot_key, t1.latest_timestamp
FROM heartbeats t1
WHERE t1.latest_timestamp = (
SELECT MAX(t2.latest_timestamp)
FROM heartbeats t2
WHERE t2.cbsd_id = t1.cbsd_id
AND truncated_timestamp >= $1
AND truncated_timestamp < $2
)
)
SELECT
latest_hotspots.hotspot_key,
heartbeats.cbsd_id,
cell_type
FROM heartbeats
JOIN latest_hotspots ON heartbeats.cbsd_id = latest_hotspots.cbsd_id
WHERE truncated_timestamp >= $1
and truncated_timestamp < $2
GROUP BY cbsd_id, hotspot_key, cell_type
AND truncated_timestamp < $2
GROUP BY
heartbeats.cbsd_id,
latest_hotspots.hotspot_key,
cell_type
HAVING count(*) >= $3
"#,
)
Expand Down Expand Up @@ -175,6 +199,10 @@ pub enum SaveHeartbeatError {
}

impl Heartbeat {
pub fn is_valid(&self) -> bool {
self.validity == proto::HeartbeatValidity::Valid
}

pub fn truncated_timestamp(&self) -> Result<DateTime<Utc>, RoundingError> {
self.timestamp.duration_trunc(Duration::hours(1))
}
Expand Down Expand Up @@ -226,17 +254,6 @@ impl Heartbeat {
self,
exec: &mut Transaction<'_, Postgres>,
) -> Result<bool, SaveHeartbeatError> {
// If the heartbeat is not valid, do not save it
if self.validity != proto::HeartbeatValidity::Valid {
return Ok(false);
}

sqlx::query("DELETE FROM heartbeats WHERE cbsd_id = $1 AND hotspot_key != $2")
.bind(&self.cbsd_id)
.bind(&self.hotspot_key)
.execute(&mut *exec)
.await?;

let truncated_timestamp = self.truncated_timestamp()?;
Ok(
sqlx::query_as::<_, HeartbeatSaveResult>(
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod cell_type;
mod data_session;
mod heartbeats;
pub mod heartbeats;
mod reward_shares;
mod settings;
mod speedtests;
Expand Down
157 changes: 157 additions & 0 deletions mobile_verifier/tests/heartbeats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use helium_crypto::PublicKeyBinary;
use mobile_verifier::heartbeats::HeartbeatReward;
use rust_decimal::Decimal;
use sqlx::PgPool;

#[sqlx::test]
#[ignore]
async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> {
let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string();
let hotspot_1: PublicKeyBinary =
"112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
let hotspot_2: PublicKeyBinary =
"11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?;
sqlx::query(
r#"
INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp)
VALUES
($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 12:00:00+00', '2023-08-25 12:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 13:00:00+00', '2023-08-25 13:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 14:00:00+00', '2023-08-25 14:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 15:00:00+00', '2023-08-25 15:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 16:00:00+00', '2023-08-25 16:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 17:00:00+00', '2023-08-25 17:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 18:00:00+00', '2023-08-25 18:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 19:00:00+00', '2023-08-25 19:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 20:00:00+00', '2023-08-25 20:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 21:00:00+00', '2023-08-25 21:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 22:00:00+00', '2023-08-25 22:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 23:00:00+00', '2023-08-25 23:00:00+00')
"#,
)
.bind(&cbsd_id)
.bind(&hotspot_1)
.bind(&hotspot_2)
.execute(&pool)
.await?;

let start_period: DateTime<Utc> = "2023-08-25 00:00:00.000000000 UTC".parse()?;
let end_period: DateTime<Utc> = "2023-08-26 00:00:00.000000000 UTC".parse()?;
let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period))
.try_collect()
.await?;

assert_eq!(
heartbeat_reward,
vec![HeartbeatReward {
hotspot_key: hotspot_2,
cbsd_id,
reward_weight: Decimal::ONE
}]
);

Ok(())
}

#[sqlx::test]
#[ignore]
async fn ensure_hotspot_does_not_affect_count(pool: PgPool) -> anyhow::Result<()> {
let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string();
let hotspot_1: PublicKeyBinary =
"112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
let hotspot_2: PublicKeyBinary =
"11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?;
sqlx::query(
r#"
INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp)
VALUES
($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00')
"#,
)
.bind(&cbsd_id)
.bind(&hotspot_1)
.bind(&hotspot_2)
.execute(&pool)
.await?;

let start_period: DateTime<Utc> = "2023-08-25 00:00:00.000000000 UTC".parse()?;
let end_period: DateTime<Utc> = "2023-08-26 00:00:00.000000000 UTC".parse()?;
let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period))
.try_collect()
.await?;

assert_eq!(
heartbeat_reward,
vec![HeartbeatReward {
hotspot_key: hotspot_2,
cbsd_id,
reward_weight: Decimal::ONE
}]
);

Ok(())
}

#[sqlx::test]
#[ignore]
async fn ensure_minimum_count(pool: PgPool) -> anyhow::Result<()> {
let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string();
let hotspot: PublicKeyBinary =
"112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
sqlx::query(
r#"
INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp)
VALUES
($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00')
"#,
)
.bind(&cbsd_id)
.bind(&hotspot)
.execute(&pool)
.await?;

let start_period: DateTime<Utc> = "2023-08-25 00:00:00.000000000 UTC".parse()?;
let end_period: DateTime<Utc> = "2023-08-26 00:00:00.000000000 UTC".parse()?;
let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period))
.try_collect()
.await?;

assert!(heartbeat_reward.is_empty());

Ok(())
}
1 change: 1 addition & 0 deletions task_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ fn start_futures(

#[allow(clippy::manual_try_fold)]
async fn stop_all(futures: Vec<StopableLocalFuture>) -> anyhow::Result<()> {
#[allow(clippy::manual_try_fold)]
futures::stream::iter(futures.into_iter().rev())
.fold(Ok(()), |last_result, local| async move {
local.shutdown_trigger.trigger();
Expand Down

0 comments on commit ade00c5

Please sign in to comment.