Skip to content

Commit

Permalink
Abstract data set processing to a trait (#826)
Browse files Browse the repository at this point in the history
* Abstract data set processing to a trait

* Remove unused functions
  • Loading branch information
Matthew Plant authored Jun 7, 2024
1 parent 08cd5aa commit fa80db4
Showing 1 changed file with 83 additions and 48 deletions.
131 changes: 83 additions & 48 deletions mobile_verifier/src/boosting_oracles/data_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ where
h.urbanization.is_ready() && h.footfall.is_ready() && h.landtype.is_ready()
}

pub struct DataSetDownloaderDaemon<A, B, C> {
pub struct DataSetDownloaderDaemon<A, B, C, T> {
pool: PgPool,
data_sets: HexBoostData<A, B, C>,
store: FileStore,
oracle_boosting_sink: FileSinkClient,
data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
Expand Down Expand Up @@ -217,11 +217,12 @@ impl DataSetStatus {
}
}

impl<A, B, C> ManagedTask for DataSetDownloaderDaemon<A, B, C>
impl<A, B, C, T> ManagedTask for DataSetDownloaderDaemon<A, B, C, T>
where
A: DataSet,
B: DataSet,
C: DataSet,
T: DataSetProcessor,
{
fn start_task(
self: Box<Self>,
Expand All @@ -243,7 +244,7 @@ where
}
}

impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization> {
impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization, FileSinkClient> {
pub async fn create_managed_task(
pool: PgPool,
settings: &Settings,
Expand Down Expand Up @@ -288,17 +289,18 @@ impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization> {
}
}

impl<A, B, C> DataSetDownloaderDaemon<A, B, C>
impl<A, B, C, T> DataSetDownloaderDaemon<A, B, C, T>
where
A: DataSet,
B: DataSet,
C: DataSet,
T: DataSetProcessor,
{
pub fn new(
pool: PgPool,
data_sets: HexBoostData<A, B, C>,
store: FileStore,
oracle_boosting_sink: FileSinkClient,
data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
Expand All @@ -307,7 +309,7 @@ where
pool,
data_sets,
store,
oracle_boosting_sink,
data_set_processor,
data_set_directory,
new_coverage_object_notification,
poll_duration,
Expand Down Expand Up @@ -337,12 +339,9 @@ where
new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some();
if is_hex_boost_data_ready(&self.data_sets) && new_data_set {
tracing::info!("Processing new data sets");
set_all_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.data_set_processor
.set_all_oracle_boosting_assignments(&self.pool, &self.data_sets)
.await?;
}

// Mark the new data sets as processed and delete the old ones
Expand Down Expand Up @@ -394,12 +393,9 @@ where
// Attempt to fill in any unassigned hexes. This is for the edge case in
// which we shutdown before a coverage object updates.
if is_hex_boost_data_ready(&self.data_sets) {
set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.data_set_processor
.set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets)
.await?;
}

let mut wakeup = Instant::now() + self.poll_duration;
Expand All @@ -410,10 +406,9 @@ where
// If we see a new coverage object, we want to assign only those hexes
// that don't have an assignment
if is_hex_boost_data_ready(&self.data_sets) {
set_unassigned_oracle_boosting_assignments(
self.data_set_processor.set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink
).await?;
}
},
Expand Down Expand Up @@ -507,6 +502,73 @@ impl DataSetType {
}
}

#[async_trait::async_trait]
pub trait DataSetProcessor: Send + Sync + 'static {
async fn set_all_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()>;

async fn set_unassigned_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()>;
}

#[async_trait::async_trait]
impl DataSetProcessor for FileSinkClient {
async fn set_all_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
let assigned_coverage_objs =
AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets)
.await?;
assigned_coverage_objs.write(self).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

async fn set_unassigned_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream(
db::fetch_hexes_with_null_assignments(pool),
data_sets,
)
.await?;
assigned_coverage_objs.write(self).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}
}

pub struct NopDataSetProcessor;

#[async_trait::async_trait]
impl DataSetProcessor for NopDataSetProcessor {
async fn set_all_oracle_boosting_assignments(
&self,
_pool: &PgPool,
_data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
Ok(())
}

async fn set_unassigned_oracle_boosting_assignments(
&self,
_pool: &PgPool,
_data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
Ok(())
}
}

pub mod db {
use super::*;

Expand Down Expand Up @@ -771,30 +833,3 @@ pub struct AssignedHex {
pub signal_power: i32,
pub assignments: HexAssignments,
}

pub async fn set_all_oracle_boosting_assignments(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
file_sink: &FileSinkClient,
) -> anyhow::Result<()> {
let assigned_coverage_objs =
AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets).await?;
assigned_coverage_objs.write(file_sink).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

pub async fn set_unassigned_oracle_boosting_assignments(
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
file_sink: &FileSinkClient,
) -> anyhow::Result<()> {
let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream(
db::fetch_hexes_with_null_assignments(pool),
data_sets,
)
.await?;
assigned_coverage_objs.write(file_sink).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

0 comments on commit fa80db4

Please sign in to comment.