Skip to content

Commit

Permalink
Add modeled coverage integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maplant committed Aug 23, 2023
1 parent 9ec0f69 commit 9b38ab4
Show file tree
Hide file tree
Showing 8 changed files with 1,316 additions and 86 deletions.
20 changes: 19 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ jobs:
build:
runs-on: ubuntu-20.04

services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432

steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
Expand All @@ -44,13 +57,18 @@ jobs:
- name: Clippy
run: cargo clippy --all-targets -- -Dclippy::all -D warnings

- name: Run tests
- name: Run unit tests
run: cargo test -r

## free up disk space if we are packaging a release from a tag
- uses: jlumbroso/free-disk-space@main
if: contains(github.ref, 'refs/tags/')

- name: Run integration tests
env:
DATABASE_URL: "postgres://postgres:postgres@localhost:5432/postgres"
run: cargo test --test '*' -- --ignored # Only run integration tests that have been ignored

- name: Debian packaging
if: contains(github.ref, 'refs/tags/')
env:
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/migrations/15_modeled_coverage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ CREATE TABLE seniority (
last_heartbeat TIMESTAMPTZ NOT NULL,
uuid UUID NOT NULL,
update_reason INT NOT NULL,
inserted_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (cbsd_id, seniority_ts)
);

Expand Down
63 changes: 34 additions & 29 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use sqlx::{FromRow, Pool, Postgres, Transaction, Type};
use tokio::sync::mpsc::Receiver;
use uuid::Uuid;

use crate::IsAuthorized;

#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Type)]
#[sqlx(type_name = "signal_level")]
#[sqlx(rename_all = "lowercase")]
Expand Down Expand Up @@ -108,7 +110,9 @@ impl CoverageDaemon {

while let Some(coverage_object) = validated_coverage_objects.next().await.transpose()? {
coverage_object.write(&self.file_sink).await?;
coverage_object.save(&mut transaction).await?;
if coverage_object.is_valid() {
coverage_object.save(&mut transaction).await?;
}
}

self.file_sink.commit().await?;
Expand All @@ -130,27 +134,27 @@ pub struct CoverageObject {
}

impl CoverageObject {
/// Reports whether this coverage object passed validation.
///
/// Callers use this to decide whether the object may be persisted
/// (invalid objects are written to the file sink but never saved).
pub fn is_valid(&self) -> bool {
    self.validity == CoverageObjectValidity::Valid
}

pub fn validate_coverage_objects<'a>(
auth_client: &'a AuthorizationClient,
auth_client: &'a impl IsAuthorized,
coverage_objects: impl Stream<Item = CoverageObjectIngestReport> + 'a,
) -> impl Stream<Item = anyhow::Result<Self>> + 'a {
coverage_objects.then(move |coverage_object_report| {
let mut auth_client = auth_client.clone();
async move {
let validity =
validate_coverage_object(&coverage_object_report, &mut auth_client).await?;

Ok(CoverageObject {
cbsd_id: coverage_object_report.report.cbsd_id,
uuid: coverage_object_report.report.uuid,
indoor: coverage_object_report.report.indoor,
coverage_claim_time: coverage_object_report.report.coverage_claim_time,
coverage: coverage_object_report.report.coverage,
pub_key: coverage_object_report.report.pub_key,
signature: coverage_object_report.report.signature,
validity,
})
}
coverage_objects.then(move |coverage_object_report| async move {
let validity = validate_coverage_object(&coverage_object_report, auth_client).await?;

Ok(CoverageObject {
cbsd_id: coverage_object_report.report.cbsd_id,
uuid: coverage_object_report.report.uuid,
indoor: coverage_object_report.report.indoor,
coverage_claim_time: coverage_object_report.report.coverage_claim_time,
coverage: coverage_object_report.report.coverage,
pub_key: coverage_object_report.report.pub_key,
signature: coverage_object_report.report.signature,
validity,
})
})
}

Expand All @@ -176,11 +180,6 @@ impl CoverageObject {
}

pub async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<bool> {
// If the coverage object is not valid, do not save it
if self.validity != CoverageObjectValidity::Valid {
return Ok(false);
}

for hex in self.coverage {
let location: u64 = hex.location.into();
sqlx::query(
Expand Down Expand Up @@ -208,10 +207,10 @@ impl CoverageObject {

async fn validate_coverage_object(
coverage_object: &CoverageObjectIngestReport,
auth_client: &mut AuthorizationClient,
auth_client: &impl IsAuthorized,
) -> anyhow::Result<CoverageObjectValidity> {
if !auth_client
.verify_authorized_key(&coverage_object.report.pub_key, NetworkKeyRole::MobilePcs)
.is_authorized(&coverage_object.report.pub_key, NetworkKeyRole::MobilePcs)
.await?
{
return Ok(CoverageObjectValidity::InvalidPubKey);
Expand Down Expand Up @@ -418,7 +417,7 @@ impl CoveredHexes {
}

/// Returns the radios that should be rewarded for giving coverage.
pub fn into_iter(self) -> impl Iterator<Item = CoverageReward> {
pub fn into_coverage_rewards(self) -> impl Iterator<Item = CoverageReward> {
self.hexes
.into_values()
.flat_map(|radios| {
Expand Down Expand Up @@ -448,6 +447,12 @@ pub struct CoverageClaimTimeCache {
cache: Arc<Cache<CoverageClaimTimeKey, DateTime<Utc>>>,
}

impl Default for CoverageClaimTimeCache {
fn default() -> Self {
Self::new()
}
}

impl CoverageClaimTimeCache {
pub fn new() -> Self {
let cache = Arc::new(Cache::new());
Expand Down Expand Up @@ -598,7 +603,7 @@ mod test {
)
.await
.unwrap();
let rewards: Vec<_> = covered_hexes.into_iter().collect();
let rewards: Vec<_> = covered_hexes.into_coverage_rewards().collect();
assert_eq!(
rewards,
vec![CoverageReward {
Expand Down Expand Up @@ -700,7 +705,7 @@ mod test {
)
.await
.unwrap();
let rewards: Vec<_> = covered_hexes.into_iter().collect();
let rewards: Vec<_> = covered_hexes.into_coverage_rewards().collect();
assert_eq!(
rewards,
vec![
Expand Down
97 changes: 48 additions & 49 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::{
cell_type::CellType,
coverage::{CoverageClaimTimeCache, CoveredHexCache, Seniority},
HasOwner,
};
use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc};
use file_store::{
Expand All @@ -17,7 +18,7 @@ use futures::{
use h3o::LatLng;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile as proto;
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
use mobile_config::GatewayClient;
use retainer::Cache;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use sqlx::{Postgres, Transaction};
Expand Down Expand Up @@ -160,9 +161,8 @@ impl HeartbeatDaemon {
self.modeled_coverage_start,
latest_seniority,
);
seniority_update
.execute(&self.seniority_sink, &mut transaction)
.await?;
seniority_update.write(&self.seniority_sink).await?;
seniority_update.execute(&mut transaction).await?;

let key = (
heartbeat.heartbeat.cbsd_id.clone(),
Expand Down Expand Up @@ -219,7 +219,7 @@ impl HeartbeatReward {
heartbeats.cbsd_id,
cell_type,
coverage_objs.coverage_object,
coverage_objs.latest_timestamp,
coverage_objs.latest_timestamp
FROM heartbeats
JOIN coverage_objs ON heartbeats.cbsd_id = coverage_objs.cbsd_id
WHERE truncated_timestamp >= $1
Expand All @@ -229,7 +229,7 @@ impl HeartbeatReward {
hotspot_key,
cell_type,
coverage_objs.coverage_object,
coverage_objs.latest_timestamp,
coverage_objs.latest_timestamp
HAVING count(*) >= $3
"#,
)
Expand Down Expand Up @@ -260,31 +260,28 @@ impl Heartbeat {
}

pub fn validate_heartbeats<'a>(
gateway_client: &'a GatewayClient,
gateway_client: &'a impl HasOwner,
covered_hex_cache: &'a CoveredHexCache,
heartbeats: impl Stream<Item = CellHeartbeatIngestReport> + 'a,
epoch: &'a Range<DateTime<Utc>>,
max_distance: f64,
) -> impl Stream<Item = anyhow::Result<Self>> + 'a {
heartbeats.then(move |heartbeat_report| {
let mut gateway_client = gateway_client.clone();
async move {
let (cell_type, validity) = validate_heartbeat(
&heartbeat_report,
&mut gateway_client,
covered_hex_cache,
epoch,
max_distance,
)
.await?;
Ok(Heartbeat {
coverage_object: heartbeat_report.report.coverage_object(),
heartbeat: heartbeat_report.report,
received_timestamp: heartbeat_report.received_timestamp,
cell_type,
validity,
})
}
heartbeats.then(move |heartbeat_report| async move {
let (cell_type, validity) = validate_heartbeat(
&heartbeat_report,
gateway_client,
covered_hex_cache,
epoch,
max_distance,
)
.await?;
Ok(Heartbeat {
coverage_object: heartbeat_report.report.coverage_object(),
heartbeat: heartbeat_report.report,
received_timestamp: heartbeat_report.received_timestamp,
cell_type,
validity,
})
})
}

Expand Down Expand Up @@ -327,7 +324,7 @@ impl Heartbeat {
INSERT INTO heartbeats (cbsd_id, hotspot_key, cell_type, latest_timestamp, truncated_timestamp, coverage_object)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (cbsd_id, truncated_timestamp) DO UPDATE SET
latest_timestamp = EXCLUDED.latest_timestamp
latest_timestamp = EXCLUDED.latest_timestamp,
coverage_object = EXCLUDED.coverage_object
RETURNING (xmax = 0) as inserted
"#
Expand All @@ -347,7 +344,7 @@ impl Heartbeat {
/// Validate a heartbeat in the given epoch.
async fn validate_heartbeat(
heartbeat: &CellHeartbeatIngestReport,
gateway_client: &mut GatewayClient,
gateway_client: &impl HasOwner,
coverage_cache: &CoveredHexCache,
epoch: &Range<DateTime<Utc>>,
max_distance: f64,
Expand All @@ -365,11 +362,7 @@ async fn validate_heartbeat(
return Ok((cell_type, proto::HeartbeatValidity::HeartbeatOutsideRange));
}

if gateway_client
.resolve_gateway_info(&heartbeat.report.pubkey)
.await?
.is_none()
{
if gateway_client.has_owner(&heartbeat.report.pubkey).await? {
return Ok((cell_type, proto::HeartbeatValidity::GatewayOwnerNotFound));
}

Expand All @@ -396,7 +389,7 @@ async fn validate_heartbeat(
Ok((cell_type, proto::HeartbeatValidity::Valid))
}

struct SeniorityUpdate<'a> {
pub struct SeniorityUpdate<'a> {
heartbeat: &'a Heartbeat,
action: SeniorityUpdateAction,
}
Expand Down Expand Up @@ -482,11 +475,27 @@ impl<'a> SeniorityUpdate<'a> {
}

impl SeniorityUpdate<'_> {
pub async fn execute(
self,
seniorities: &FileSinkClient,
exec: &mut Transaction<'_, Postgres>,
) -> anyhow::Result<()> {
/// Emits a `proto::SeniorityUpdate` report to the file sink, but only when
/// the pending action inserts a new seniority record; updates and no-ops
/// produce no report.
pub async fn write(&self, seniorities: &FileSinkClient) -> anyhow::Result<()> {
    match self.action {
        SeniorityUpdateAction::Insert {
            new_seniority,
            update_reason,
        } => {
            seniorities
                .write(
                    proto::SeniorityUpdate {
                        cbsd_id: self.heartbeat.heartbeat.cbsd_id.to_string(),
                        new_seniority_timestamp: new_seniority.timestamp() as u64,
                        reason: update_reason as i32,
                    },
                    [],
                )
                .await?;
            Ok(())
        }
        // No proto report for Update / NoAction.
        _ => Ok(()),
    }
}

pub async fn execute(self, exec: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
match self.action {
SeniorityUpdateAction::NoAction => (),
SeniorityUpdateAction::Insert {
Expand All @@ -509,16 +518,6 @@ impl SeniorityUpdate<'_> {
.bind(update_reason as i32)
.execute(&mut *exec)
.await?;
seniorities
.write(
proto::SeniorityUpdate {
cbsd_id: self.heartbeat.heartbeat.cbsd_id.to_string(),
new_seniority_timestamp: new_seniority.timestamp() as u64,
reason: update_reason as i32,
},
[],
)
.await?;
}
SeniorityUpdateAction::Update { curr_seniority } => {
sqlx::query(
Expand Down
Loading

0 comments on commit 9b38ab4

Please sign in to comment.