Skip to content

Commit

Permalink
remove expired boosted hexes when streaming from db
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldjeffrey committed May 7, 2024
1 parent 5568963 commit 7822f79
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
48 changes: 46 additions & 2 deletions mobile_config/src/boosted_hex_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ impl BoostedHexInfo {
fn matches_device_type(&self, device_type: &BoostedHexDeviceType) -> bool {
self.device_type == *device_type || self.device_type == BoostedHexDeviceType::All
}

fn is_expired(&self, ts: &DateTime<Utc>) -> bool {
self.end_ts.is_some_and(|end| &end < ts)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -298,6 +302,7 @@ impl BoostedHexes {
pub(crate) mod db {
use super::{to_end_ts, to_start_ts, BoostedHexInfo};
use chrono::{DateTime, Duration, Utc};
use futures::future;
use futures::stream::{Stream, StreamExt};
use hextree::Cell;
use solana_sdk::pubkey::Pubkey;
Expand Down Expand Up @@ -336,12 +341,20 @@ pub(crate) mod db {
where hexes.refreshed_at > $1
"#;

pub fn all_info_stream<'a>(
pub fn all_info_stream_with_time_now<'a>(
db: impl PgExecutor<'a> + 'a,
) -> impl Stream<Item = BoostedHexInfo> + 'a {
all_info_stream(db, Utc::now())
}

fn all_info_stream<'a>(
db: impl PgExecutor<'a> + 'a,
now: DateTime<Utc>,
) -> impl Stream<Item = BoostedHexInfo> + 'a {
sqlx::query_as::<_, BoostedHexInfo>(GET_BOOSTED_HEX_INFO_SQL)
.fetch(db)
.filter_map(|info| async move { info.ok() })
.filter(move |info| future::ready(!info.is_expired(&now)))
.boxed()
}

Expand Down Expand Up @@ -645,6 +658,37 @@ mod tests {
Ok(())
}

#[sqlx::test]
async fn filter_expired_boosted_hexes(pool: PgPool) -> anyhow::Result<()> {
let boost_config_address = Pubkey::new_unique();
let now = Utc::now();

create_tables(&pool).await?;
insert_boost_config(&pool, &boost_config_address, now).await?;

let times = vec![
None, // unstarted
Some(now), // still boosting
Some(now - Duration::days(400)), // expired
];

for time in times {
insert_boosted_hex(
&pool,
&boost_config_address,
now,
Some(serde_json::json!("cbrsIndoor")),
time,
)
.await?;
}

assert_eq!(3, boosted_hexes_count(&pool).await?);
assert_eq!(2, streamed_hexes_count(&pool).await?);

Ok(())
}

async fn create_tables(pool: &PgPool) -> anyhow::Result<()> {
const CREATE_BOOSTED_HEXES_TABLE: &str = r#"
CREATE TABLE
Expand Down Expand Up @@ -776,7 +820,7 @@ mod tests {

async fn streamed_hexes_count(pool: &PgPool) -> anyhow::Result<usize> {
// If a row cannot be parsed, it is dropped from the stream with no erros.
let mut infos = super::db::all_info_stream(pool);
let mut infos = super::db::all_info_stream_with_time_now(pool);
let mut count = 0;
while let Some(_info) = infos.next().await {
// println!("info: {_info:?}");
Expand Down
2 changes: 1 addition & 1 deletion mobile_config/src/hex_boosting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl mobile_config::HexBoosting for HexBoostingService {
let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
let stream = boosted_hex_info::db::all_info_stream(&pool);
let stream = boosted_hex_info::db::all_info_stream_with_time_now(&pool);
stream_multi_info(stream, tx.clone(), signing_key.clone(), batch_size).await
});

Expand Down

0 comments on commit 7822f79

Please sign in to comment.