Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract data set processing to a trait #826

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 83 additions & 21 deletions mobile_verifier/src/boosting_oracles/data_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ where
h.urbanization.is_ready() && h.footfall.is_ready() && h.landtype.is_ready()
}

pub struct DataSetDownloaderDaemon<A, B, C> {
pub struct DataSetDownloaderDaemon<A, B, C, T> {
pool: PgPool,
data_sets: HexBoostData<A, B, C>,
store: FileStore,
oracle_boosting_sink: FileSinkClient,
data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
Expand Down Expand Up @@ -217,11 +217,12 @@ impl DataSetStatus {
}
}

impl<A, B, C> ManagedTask for DataSetDownloaderDaemon<A, B, C>
impl<A, B, C, T> ManagedTask for DataSetDownloaderDaemon<A, B, C, T>
where
A: DataSet,
B: DataSet,
C: DataSet,
T: DataSetProcessor,
{
fn start_task(
self: Box<Self>,
Expand All @@ -243,7 +244,7 @@ where
}
}

impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization> {
impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization, FileSinkClient> {
pub async fn create_managed_task(
pool: PgPool,
settings: &Settings,
Expand Down Expand Up @@ -288,17 +289,18 @@ impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization> {
}
}

impl<A, B, C> DataSetDownloaderDaemon<A, B, C>
impl<A, B, C, T> DataSetDownloaderDaemon<A, B, C, T>
where
A: DataSet,
B: DataSet,
C: DataSet,
T: DataSetProcessor,
{
pub fn new(
pool: PgPool,
data_sets: HexBoostData<A, B, C>,
store: FileStore,
oracle_boosting_sink: FileSinkClient,
data_set_processor: T,
data_set_directory: PathBuf,
new_coverage_object_notification: NewCoverageObjectNotification,
poll_duration: Duration,
Expand All @@ -307,7 +309,7 @@ where
pool,
data_sets,
store,
oracle_boosting_sink,
data_set_processor,
data_set_directory,
new_coverage_object_notification,
poll_duration,
Expand Down Expand Up @@ -337,12 +339,9 @@ where
new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some();
if is_hex_boost_data_ready(&self.data_sets) && new_data_set {
tracing::info!("Processing new data sets");
set_all_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.data_set_processor
.set_all_oracle_boosting_assignments(&self.pool, &self.data_sets)
.await?;
}

// Mark the new data sets as processed and delete the old ones
Expand Down Expand Up @@ -394,12 +393,9 @@ where
// Attempt to fill in any unassigned hexes. This is for the edge case in
// which we shutdown before a coverage object updates.
if is_hex_boost_data_ready(&self.data_sets) {
set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink,
)
.await?;
self.data_set_processor
.set_unassigned_oracle_boosting_assignments(&self.pool, &self.data_sets)
.await?;
}

let mut wakeup = Instant::now() + self.poll_duration;
Expand All @@ -410,10 +406,9 @@ where
// If we see a new coverage object, we want to assign only those hexes
// that don't have an assignment
if is_hex_boost_data_ready(&self.data_sets) {
set_unassigned_oracle_boosting_assignments(
self.data_set_processor.set_unassigned_oracle_boosting_assignments(
&self.pool,
&self.data_sets,
&self.oracle_boosting_sink
).await?;
}
},
Expand Down Expand Up @@ -507,6 +502,73 @@ impl DataSetType {
}
}

#[async_trait::async_trait]
pub trait DataSetProcessor: Send + Sync + 'static {
async fn set_all_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()>;

async fn set_unassigned_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()>;
}

#[async_trait::async_trait]
impl DataSetProcessor for FileSinkClient {
async fn set_all_oracle_boosting_assignments(
maplant marked this conversation as resolved.
Show resolved Hide resolved
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
let assigned_coverage_objs =
AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets)
.await?;
assigned_coverage_objs.write(self).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}

async fn set_unassigned_oracle_boosting_assignments(
&self,
pool: &PgPool,
data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream(
db::fetch_hexes_with_null_assignments(pool),
data_sets,
)
.await?;
assigned_coverage_objs.write(self).await?;
assigned_coverage_objs.save(pool).await?;
Ok(())
}
}

pub struct NopDataSetProcessor;

#[async_trait::async_trait]
impl DataSetProcessor for NopDataSetProcessor {
async fn set_all_oracle_boosting_assignments(
&self,
_pool: &PgPool,
_data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
Ok(())
}

async fn set_unassigned_oracle_boosting_assignments(
&self,
_pool: &PgPool,
_data_sets: &HexBoostData<impl HexAssignment, impl HexAssignment, impl HexAssignment>,
) -> anyhow::Result<()> {
Ok(())
}
}

pub mod db {
use super::*;

Expand Down