Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove heartbeat delete non-determinism #617

Merged
merged 15 commits into from
Aug 31, 2023
20 changes: 19 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ jobs:
build:
runs-on: ubuntu-20.04

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
Expand All @@ -44,13 +57,18 @@ jobs:
- name: Clippy
run: cargo clippy --all-targets -- -Dclippy::all -D warnings

- name: Run tests
- name: Run unit tests
run: cargo test -r

## free up disk space if we are packaging a release from a tag
- uses: jlumbroso/free-disk-space@main
if: contains(github.ref, 'refs/tags/')

- name: Run integration tests
env:
DATABASE_URL: "postgres://postgres:postgres@localhost:5432/postgres"
run: cargo test --test '*' -- --ignored # Only run integration tests that have been ignored

- name: Debian packaging
if: contains(github.ref, 'refs/tags/')
env:
Expand Down
45 changes: 31 additions & 14 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct HeartbeatKey {
cell_type: CellType,
}

#[derive(Debug, Clone, PartialEq)]
pub struct HeartbeatReward {
pub hotspot_key: PublicKeyBinary,
pub cbsd_id: String,
Expand Down Expand Up @@ -109,6 +110,11 @@ impl HeartbeatDaemon {

while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? {
heartbeat.write(&self.file_sink).await?;

if !heartbeat.is_valid() {
continue;
}

let key = (heartbeat.cbsd_id.clone(), heartbeat.truncated_timestamp()?);

if cache.get(&key).await.is_none() {
Expand Down Expand Up @@ -136,11 +142,29 @@ impl HeartbeatReward {
) -> impl Stream<Item = Result<HeartbeatReward, sqlx::Error>> + 'a {
sqlx::query_as::<_, HeartbeatKey>(
r#"
SELECT hotspot_key, cbsd_id, cell_type
WITH latest_hotspots AS (
SELECT t1.cbsd_id, t1.hotspot_key, t1.latest_timestamp
FROM heartbeats t1
WHERE t1.latest_timestamp = (
SELECT MAX(t2.latest_timestamp)
FROM heartbeats t2
WHERE t2.cbsd_id = t1.cbsd_id
AND truncated_timestamp >= $1
AND truncated_timestamp < $2
)
)
SELECT
latest_hotspots.hotspot_key,
heartbeats.cbsd_id,
cell_type
FROM heartbeats
JOIN latest_hotspots ON heartbeats.cbsd_id = latest_hotspots.cbsd_id
WHERE truncated_timestamp >= $1
and truncated_timestamp < $2
GROUP BY cbsd_id, hotspot_key, cell_type
AND truncated_timestamp < $2
GROUP BY
heartbeats.cbsd_id,
latest_hotspots.hotspot_key,
cell_type
HAVING count(*) >= $3
"#,
)
Expand Down Expand Up @@ -175,6 +199,10 @@ pub enum SaveHeartbeatError {
}

impl Heartbeat {
pub fn is_valid(&self) -> bool {
self.validity == proto::HeartbeatValidity::Valid
}

pub fn truncated_timestamp(&self) -> Result<DateTime<Utc>, RoundingError> {
self.timestamp.duration_trunc(Duration::hours(1))
}
Expand Down Expand Up @@ -226,17 +254,6 @@ impl Heartbeat {
self,
exec: &mut Transaction<'_, Postgres>,
) -> Result<bool, SaveHeartbeatError> {
// If the heartbeat is not valid, do not save it
if self.validity != proto::HeartbeatValidity::Valid {
return Ok(false);
}

sqlx::query("DELETE FROM heartbeats WHERE cbsd_id = $1 AND hotspot_key != $2")
.bind(&self.cbsd_id)
.bind(&self.hotspot_key)
.execute(&mut *exec)
.await?;
maplant marked this conversation as resolved.
Show resolved Hide resolved

let truncated_timestamp = self.truncated_timestamp()?;
Ok(
sqlx::query_as::<_, HeartbeatSaveResult>(
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod cell_type;
mod data_session;
mod heartbeats;
pub mod heartbeats;
mod reward_shares;
mod settings;
mod speedtests;
Expand Down
68 changes: 68 additions & 0 deletions mobile_verifier/tests/heartbeats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use helium_crypto::PublicKeyBinary;
use mobile_verifier::heartbeats::HeartbeatReward;
use rust_decimal::Decimal;
use sqlx::PgPool;

#[sqlx::test]
#[ignore]
async fn only_fetch_latest_hotspot(pool: PgPool) -> anyhow::Result<()> {
let cbsd_id = "P27-SCE4255W120200039521XGB0103".to_string();
let hotspot_1: PublicKeyBinary =
"112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6".parse()?;
let hotspot_2: PublicKeyBinary =
"11sctWiP9r5wDJVuDe1Th4XSL2vaawaLLSQF8f8iokAoMAJHxqp".parse()?;
sqlx::query(
r#"
INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp)
VALUES
($1, $2, 'sercommindoor', '2023-08-25 00:00:00+00', '2023-08-25 00:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 01:00:00+00', '2023-08-25 01:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 02:00:00+00', '2023-08-25 02:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 03:00:00+00', '2023-08-25 03:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 04:00:00+00', '2023-08-25 04:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 05:00:00+00', '2023-08-25 05:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 06:00:00+00', '2023-08-25 06:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 07:00:00+00', '2023-08-25 07:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 08:00:00+00', '2023-08-25 08:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 09:00:00+00', '2023-08-25 09:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 10:00:00+00', '2023-08-25 10:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 11:00:00+00', '2023-08-25 11:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 12:00:00+00', '2023-08-25 12:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 13:00:00+00', '2023-08-25 13:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 14:00:00+00', '2023-08-25 14:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 15:00:00+00', '2023-08-25 15:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 16:00:00+00', '2023-08-25 16:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 17:00:00+00', '2023-08-25 17:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 18:00:00+00', '2023-08-25 18:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 19:00:00+00', '2023-08-25 19:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 20:00:00+00', '2023-08-25 20:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 21:00:00+00', '2023-08-25 21:00:00+00'),
($1, $2, 'sercommindoor', '2023-08-25 22:00:00+00', '2023-08-25 22:00:00+00'),
($1, $3, 'sercommindoor', '2023-08-25 23:00:00+00', '2023-08-25 23:00:00+00')
"#,
)
.bind(&cbsd_id)
.bind(&hotspot_1)
.bind(&hotspot_2)
.execute(&pool)
.await?;

let start_period: DateTime<Utc> = "2023-08-25 00:00:00.000000000 UTC".parse()?;
let end_period: DateTime<Utc> = "2023-08-26 00:00:00.000000000 UTC".parse()?;
let heartbeat_reward: Vec<_> = HeartbeatReward::validated(&pool, &(start_period..end_period))
.try_collect()
.await?;

assert_eq!(
heartbeat_reward,
vec![HeartbeatReward {
hotspot_key: hotspot_2,
cbsd_id,
reward_weight: Decimal::ONE
}]
maplant marked this conversation as resolved.
Show resolved Hide resolved
);

Ok(())
}
1 change: 1 addition & 0 deletions task_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ fn start_futures(

#[allow(clippy::manual_try_fold)]
async fn stop_all(futures: Vec<StopableLocalFuture>) -> anyhow::Result<()> {
#[allow(clippy::manual_try_fold)]
maplant marked this conversation as resolved.
Show resolved Hide resolved
futures::stream::iter(futures.into_iter().rev())
.fold(Ok(()), |last_result, local| async move {
local.shutdown_trigger.trigger();
Expand Down
Loading