Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modeled Coverage - Phase 3 #559

Closed
wants to merge 51 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
0479b5f
Add ingest service for coverage object
maplant Jun 21, 2023
498685e
implement basic algorithm for computing coverage points
maplant Jun 26, 2023
bab457f
Fix bug where multiple signal levels would get mixed
maplant Jun 26, 2023
60814f7
Implement logic for modeled coverage
maplant Jun 27, 2023
e52dc60
Fmt
maplant Jun 27, 2023
e35ee73
Convert rewarding to take coverage into account
maplant Jun 28, 2023
7e04c77
Convert over most of the reward share tests, add fixes
Jul 3, 2023
54bcaed
Port over one more test
Jul 5, 2023
ca0434d
Address remaining issues
Jul 7, 2023
da1ccb2
Update file_store/src/error.rs
Jul 7, 2023
8a1e2a6
Update mobile_verifier/src/coverage.rs
Jul 10, 2023
57da60b
Fix tests, add setting, improve some of the code
maplant Jul 10, 2023
ba97a7e
Add coverage_object column to heartbeats
maplant Jul 10, 2023
74d6eb8
s/no/none/g
maplant Jul 10, 2023
64fa0b3
Correct hex_coverage table
maplant Jul 11, 2023
8c52ceb
Merge remote-tracking branch 'origin/main' into map/modeled-coverage
maplant Jul 11, 2023
a1c254d
Fmt and Fix
maplant Jul 11, 2023
6619bfd
Move coverage object around to satisfy OCD
maplant Jul 11, 2023
99505a3
Clippy
maplant Jul 12, 2023
6b5c69e
Fix CellHeartbeat to allow backwards compat w/ empty coverage_object
maplant Jul 12, 2023
035e4a9
Fix coverage claim time with first_timestamp
maplant Jul 12, 2023
242ad5a
Address comments
maplant Jul 13, 2023
349a92e
Move coverage claim time adjustment outside of stream
maplant Jul 13, 2023
567bd35
Move coverage claim time adjustment into heartbeat save
maplant Jul 13, 2023
a5c9d4d
Adjust migration
maplant Jul 17, 2023
9d1dffb
Move things around and cache
maplant Jul 17, 2023
0f5cd3d
Add fun todo :-)
maplant Jul 17, 2023
cb0ae3c
Correct seniority updates
maplant Jul 18, 2023
d886af8
Merge remote-tracking branch 'origin/main' into map/modeled-coverage
maplant Jul 18, 2023
5cd1b75
Fix coverage claim time adjustment
maplant Jul 18, 2023
fee3eb3
Fix seniority
maplant Jul 19, 2023
e35c82b
CMove to insert or update model for seniority
maplant Jul 19, 2023
985c990
Refactor coverage and heartbeats a little bit
maplant Jul 21, 2023
d066c29
Merge remote-tracking branch 'origin/main' into map/modeled-coverage
maplant Jul 24, 2023
a71a601
Refactor into_rewards a bit
maplant Jul 24, 2023
1deea42
Clippy
maplant Jul 24, 2023
9c88a27
Delete old hex coverages
maplant Jul 24, 2023
45638d5
Fix deletion and validated heartbeats stream
maplant Jul 25, 2023
8201e1b
Unnet into_rewards
maplant Jul 26, 2023
9301ad7
Clippy
maplant Jul 27, 2023
b8b719b
Embed heartbeat object
maplant Aug 2, 2023
eccb3f5
Add cbsd_id to seniority deletion
maplant Aug 16, 2023
c2827a8
Address issues with seniority
maplant Aug 16, 2023
83f0ce6
Merge remote-tracking branch 'origin/main' into map/modeled-coverage
maplant Aug 22, 2023
12b9c96
Use timestamp instead of datetime
maplant Aug 22, 2023
e288a10
Refactor seniority updates to make them more easily testable
maplant Aug 22, 2023
cf76f30
First seniority unit test
maplant Aug 22, 2023
25d787a
Add tests for seniority updates
maplant Aug 22, 2023
34057e7
Merge remote-tracking branch 'origin/main' into map/modeled-coverage
maplant Aug 22, 2023
9ec0f69
Add initial coverage object tests
maplant Aug 22, 2023
9b38ab4
Add modeled coverage integration tests
maplant Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mobile_verifier/migrations/15_modeled_coverage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ CREATE TABLE seniority (
seniority_ts TIMESTAMPTZ NOT NULL,
last_heartbeat TIMESTAMPTZ NOT NULL,
uuid UUID NOT NULL,
update_reason INT NOT NULL,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is update_reason an int and not an enum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because then we can have it be the same type as the proto enum

PRIMARY KEY (cbsd_id, seniority_ts)
);

Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Cmd {
valid_heartbeats,
seniority_updates,
settings.max_heartbeat_distance_from_coverage_km,
settings.modeled_coverage_start_timestamp,
);

// Speedtests
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ pub struct Seniority {
pub seniority_ts: DateTime<Utc>,
pub last_heartbeat: DateTime<Utc>,
pub inserted_at: DateTime<Utc>,
pub update_reason: i32,
}

#[async_trait::async_trait]
Expand Down
60 changes: 43 additions & 17 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
};
use chrono::{DateTime, Duration, DurationRound, RoundingError, Utc};
use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
heartbeat::{CellHeartbeatIngestReport, CellHeartbeat},
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
heartbeat::{CellHeartbeat, CellHeartbeatIngestReport},
};
use futures::{
stream::{Stream, StreamExt, TryStreamExt},
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct HeartbeatDaemon {
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
max_distance: f64,
modeled_coverage_start_timestamp: DateTime<Utc>,
}

impl HeartbeatDaemon {
Expand All @@ -62,6 +64,7 @@ impl HeartbeatDaemon {
heartbeat_sink: FileSinkClient,
seniority_sink: FileSinkClient,
max_distance: f64,
modeled_coverage_start_timestamp: DateTime<Utc>,
) -> Self {
Self {
pool,
Expand All @@ -70,6 +73,7 @@ impl HeartbeatDaemon {
heartbeat_sink,
seniority_sink,
max_distance,
modeled_coverage_start_timestamp,
}
}

Expand Down Expand Up @@ -145,7 +149,12 @@ impl HeartbeatDaemon {

heartbeat.write(&self.heartbeat_sink).await?;
heartbeat
.update_seniority(coverage_claim_time, &self.seniority_sink, &mut transaction)
.update_seniority(
coverage_claim_time,
self.modeled_coverage_start_timestamp,
&self.seniority_sink,
&mut transaction,
)
.await?;

let key = (
Expand Down Expand Up @@ -230,12 +239,13 @@ pub struct Heartbeat {
pub heartbeat: CellHeartbeat,
pub cell_type: Option<CellType>,
pub coverage_object: Option<Uuid>,
pub received_timestamp: DateTime<Utc>,
pub validity: proto::HeartbeatValidity,
}

impl Heartbeat {
pub fn truncated_timestamp(&self) -> Result<DateTime<Utc>, RoundingError> {
self.heartbeat.timestamp.duration_trunc(Duration::hours(1))
self.received_timestamp.duration_trunc(Duration::hours(1))
}

pub fn validate_heartbeats<'a>(
Expand All @@ -259,6 +269,7 @@ impl Heartbeat {
Ok(Heartbeat {
coverage_object: heartbeat_report.report.coverage_object(),
heartbeat: heartbeat_report.report,
received_timestamp: heartbeat_report.received_timestamp,
cell_type,
validity,
})
Expand All @@ -277,7 +288,7 @@ impl Heartbeat {
.map_or(0.0, |ct| ct.reward_weight().to_f32().unwrap_or(0.0)),
cell_type: self.cell_type.unwrap_or(CellType::Neutrino430) as i32, // Is this the right default?
validity: self.validity as i32,
timestamp: self.heartbeat.timestamp.timestamp() as u64,
timestamp: self.received_timestamp.timestamp() as u64,
lat: self.heartbeat.lat,
lon: self.heartbeat.lon,
coverage_object: self
Expand All @@ -294,9 +305,12 @@ impl Heartbeat {
pub async fn update_seniority(
&self,
coverage_claim_time: DateTime<Utc>,
modeled_coverage_start_timestamp: DateTime<Utc>,
seniorities: &FileSinkClient,
exec: &mut Transaction<'_, Postgres>,
) -> anyhow::Result<()> {
use proto::SeniorityUpdateReason::*;

enum InsertOrUpdate {
Insert(proto::SeniorityUpdateReason),
Update(DateTime<Utc>),
Expand All @@ -311,27 +325,38 @@ impl Heartbeat {
.await?
{
if self.coverage_object != Some(prev_seniority.uuid) {
if prev_seniority.update_reason == HeartbeatNotSeen as i32
&& coverage_claim_time < prev_seniority.seniority_ts
{
return Ok(());
}
(
coverage_claim_time,
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime),
InsertOrUpdate::Insert(NewCoverageClaimTime),
)
} else if self.heartbeat.timestamp - prev_seniority.last_heartbeat > Duration::days(3)
&& coverage_claim_time < self.heartbeat.timestamp
} else if self.received_timestamp - prev_seniority.last_heartbeat > Duration::days(3)
&& coverage_claim_time < self.received_timestamp
{
(
self.heartbeat.timestamp,
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::HeartbeatNotSeen),
self.received_timestamp,
InsertOrUpdate::Insert(HeartbeatNotSeen),
)
} else {
(
coverage_claim_time,
InsertOrUpdate::Update(prev_seniority.seniority_ts),
)
}
} else if self.received_timestamp - modeled_coverage_start_timestamp > Duration::days(3) {
// This will become the default case 72 hours after we launch modeled coverage
(
self.received_timestamp,
InsertOrUpdate::Insert(HeartbeatNotSeen),
)
} else {
(
coverage_claim_time,
InsertOrUpdate::Insert(proto::SeniorityUpdateReason::NewCoverageClaimTime),
InsertOrUpdate::Insert(NewCoverageClaimTime),
)
};

Expand All @@ -340,16 +365,17 @@ impl Heartbeat {
sqlx::query(
r#"
INSERT INTO seniority
(cbsd_id, last_heartbeat, uuid, seniority_ts, inserted_at)
(cbsd_id, last_heartbeat, uuid, seniority_ts, inserted_at, update_reason)
VALUES
($1, $2, $3, $4, $5)
($1, $2, $3, $4, $5, $6)
"#,
)
.bind(&self.heartbeat.cbsd_id)
.bind(self.heartbeat.timestamp)
.bind(self.received_timestamp)
.bind(self.coverage_object)
.bind(seniority_ts)
.bind(self.heartbeat.timestamp)
.bind(self.received_timestamp)
.bind(update_reason as i32)
.execute(&mut *exec)
.await?;
seniorities
Expand All @@ -373,7 +399,7 @@ impl Heartbeat {
seniority_ts = $3
"#,
)
.bind(self.heartbeat.timestamp)
.bind(self.received_timestamp)
.bind(&self.heartbeat.cbsd_id)
.bind(seniority_ts)
.execute(&mut *exec)
Expand Down Expand Up @@ -411,7 +437,7 @@ impl Heartbeat {
.bind(self.heartbeat.cbsd_id)
.bind(self.heartbeat.pubkey)
.bind(self.cell_type.unwrap())
.bind(self.heartbeat.timestamp)
.bind(self.received_timestamp)
.bind(truncated_timestamp)
.bind(self.coverage_object)
.fetch_one(&mut *exec)
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Settings {
pub max_heartbeat_distance_from_coverage_km: f64,
#[serde(default = "default_disable_discovery_loc_rewards_to_s3")]
pub disable_discovery_loc_rewards_to_s3: bool,
pub modeled_coverage_start_timestamp: DateTime<Utc>,
}

pub fn default_disable_discovery_loc_rewards_to_s3() -> bool {
Expand Down