Skip to content

Commit

Permalink
optimize by computing end_ts in db query
Browse files Browse the repository at this point in the history
Thanks for the query help Brian!

By precomputing the end timestamp of a boosted hex, we can not have to
stream all the hexes out of the db just to throw them away.
  • Loading branch information
michaeldjeffrey committed May 1, 2024
1 parent f1ac715 commit 703e94a
Showing 1 changed file with 47 additions and 27 deletions.
74 changes: 47 additions & 27 deletions mobile_config/src/boosted_hex_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl TryFrom<BoostedHexInfoProto> for BoostedHexInfo {
.map(NonZeroU32::new)
.collect::<Option<Vec<_>>>()
.ok_or_else(|| anyhow::anyhow!("multipliers cannot contain values of 0"))?;
let start_ts = to_start_ts(v.start_ts);
let start_ts = to_timestamp(v.start_ts);
let end_ts = to_end_ts(start_ts, period_length, multipliers.len());
let boosted_hex_pubkey: Pubkey = Pubkey::try_from(v.boosted_hex_pubkey.as_slice())?;
let boost_config_pubkey: Pubkey = Pubkey::try_from(v.boost_config_pubkey.as_slice())?;
Expand Down Expand Up @@ -111,10 +111,6 @@ impl BoostedHexInfo {
fn matches_device_type(&self, device_type: &BoostedHexDeviceType) -> bool {
self.device_type == *device_type || self.device_type == BoostedHexDeviceType::All
}

fn is_expired(&self, ts: &DateTime<Utc>) -> bool {
self.end_ts.is_some_and(|end| &end < ts)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -280,7 +276,7 @@ impl BoostedHexes {

pub fn insert(&mut self, info: BoostedHexInfo) {
#[cfg(test)]
if info.is_expired(&Utc::now()) {
if info.end_ts.is_some_and(|end| end < Utc::now()) {
// mobile-config does not deliver expired boosts from the database.
// Tests using this struct to mimic mobile-config should uphold the
// same contract.
Expand All @@ -292,9 +288,9 @@ impl BoostedHexes {
}

pub(crate) mod db {
use super::{to_end_ts, to_start_ts, BoostedHexInfo};

use super::{to_timestamp, BoostedHexInfo};
use chrono::{DateTime, Duration, Utc};
use futures::future;
use futures::stream::{Stream, StreamExt};
use hextree::Cell;
use solana_sdk::pubkey::Pubkey;
Expand All @@ -304,33 +300,57 @@ pub(crate) mod db {
use std::str::FromStr;

const GET_BOOSTED_HEX_INFO_SQL: &str = r#"
select
CAST(hexes.location as bigint),
CAST(hexes.start_ts as bigint),
WITH boosted_hexes_replacement AS (
SELECT
h.*,
CASE
WHEN start_ts = 0 THEN 0
ELSE h.start_ts + (c.period_length * length(h.boosts_by_period))
END AS end_ts
FROM boost_configs c
INNER JOIN boosted_hexes h ON c.address = h.boost_config
)
SELECT
CAST(hexes.location as bigint),
CAST(hexes.start_ts as bigint),
CAST(hexes.end_ts as bigint),
config.period_length,
hexes.boosts_by_period as multipliers,
hexes.address as boosted_hex_pubkey,
hexes.address as boosted_hex_pubkey,
config.address as boost_config_pubkey,
hexes.version,
hexes.device_type
from boosted_hexes hexes
from boosted_hexes_replacement hexes
join boost_configs config on hexes.boost_config = config.address
WHERE
hexes.start_ts = 0
OR hexes.end_ts > date_part('epoch', $1)
"#;

// TODO: reuse with string above
// NOTE(mj): modified hexes should be returned regardless of expiration status
const GET_MODIFIED_BOOSTED_HEX_INFO_SQL: &str = r#"
select
CAST(hexes.location as bigint),
CAST(hexes.start_ts as bigint),
WITH boosted_hexes_replacement AS (
SELECT
h.*,
CASE
WHEN start_ts = 0 THEN 0
ELSE h.start_ts + (c.period_length * length(h.boosts_by_period))
END AS end_ts
FROM boost_configs c
INNER JOIN boosted_hexes h ON c.address = h.boost_config
)
SELECT
CAST(hexes.location AS bigint),
CAST(hexes.start_ts AS bigint),
config.period_length,
hexes.boosts_by_period as multipliers,
hexes.address as boosted_hex_pubkey,
config.address as boost_config_pubkey,
hexes.boosts_by_period AS multipliers,
hexes.address AS boosted_hex_pubkey,
config.address AS boost_config_pubkey,
hexes.version,
hexes.device_type
from boosted_hexes hexes
join boost_configs config on hexes.boost_config = config.address
where hexes.refreshed_at > $1
FROM boosted_hexes_replacement hexes
JOIN boost_configs config ON hexes.boost_config = config.address
WHERE hexes.refreshed_at > $1
"#;

pub fn all_info_stream_with_time_now<'a>(
Expand All @@ -344,9 +364,9 @@ pub(crate) mod db {
now: DateTime<Utc>,
) -> impl Stream<Item = BoostedHexInfo> + 'a {
sqlx::query_as::<_, BoostedHexInfo>(GET_BOOSTED_HEX_INFO_SQL)
.bind(now)
.fetch(db)
.filter_map(|info| async move { info.ok() })
.filter(move |info| future::ready(!info.is_expired(&now)))
.boxed()
}

Expand All @@ -364,7 +384,7 @@ pub(crate) mod db {
impl sqlx::FromRow<'_, sqlx::postgres::PgRow> for BoostedHexInfo {
fn from_row(row: &sqlx::postgres::PgRow) -> sqlx::Result<Self> {
let period_length = Duration::seconds(row.get::<i32, &str>("period_length") as i64);
let start_ts = to_start_ts(row.get::<i64, &str>("start_ts") as u64);
let start_ts = to_timestamp(row.get::<i64, &str>("start_ts") as u64);
let multipliers = row
.get::<Vec<u8>, &str>("multipliers")
.into_iter()
Expand All @@ -373,7 +393,7 @@ pub(crate) mod db {
.ok_or_else(|| {
sqlx::Error::Decode(Box::from("multipliers cannot contain values of 0"))
})?;
let end_ts = to_end_ts(start_ts, period_length, multipliers.len());
let end_ts = to_timestamp(row.get::<i64, &str>("end_ts") as u64);
let boost_config_pubkey =
Pubkey::from_str(row.get::<&str, &str>("boost_config_pubkey"))
.map_err(|e| sqlx::Error::Decode(Box::new(e)))?;
Expand Down Expand Up @@ -407,7 +427,7 @@ pub(crate) mod db {
}
}

fn to_start_ts(timestamp: u64) -> Option<DateTime<Utc>> {
fn to_timestamp(timestamp: u64) -> Option<DateTime<Utc>> {
if timestamp == 0 {
None
} else {
Expand Down

0 comments on commit 703e94a

Please sign in to comment.