Skip to content

Commit

Permalink
fixup after rebase
Browse files Browse the repository at this point in the history
- include new verified mapper code
- update deps
- fix botched rebase conflicts
  • Loading branch information
michaeldjeffrey committed Aug 19, 2024
1 parent 68d9a1c commit b5bbad4
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 76 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ impl_file_sink!(
FileType::SubscriberLocationIngestReport,
"subscriber_location_report"
);
impl_file_sink!(
poc_mobile::SubscriberVerifiedMappingEventIngestReportV1,
FileType::SubscriberVerifiedMappingEventIngestReport,
"subscriber_verified_mapping_event_ingest_report"
);
impl_file_sink!(
poc_mobile::VerifiedInvalidatedRadioThresholdIngestReportV1,
FileType::VerifiedInvalidatedRadioThresholdIngestReport,
Expand All @@ -212,6 +217,11 @@ impl_file_sink!(
FileType::VerifiedSubscriberLocationIngestReport,
"verified_subscriber_location"
);
impl_file_sink!(
poc_mobile::VerifiedSubscriberVerifiedMappingEventIngestReportV1,
FileType::VerifiedSubscriberVerifiedMappingEventIngestReport,
"verified_subscriber_verified_mapping_event_ingest_report"
);
impl_file_sink!(
poc_mobile::WifiHeartbeatIngestReportV1,
FileType::WifiHeartbeatIngestReport,
Expand Down
22 changes: 10 additions & 12 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ pub struct GrpcServer {
coverage_object_report_sink: FileSinkClient<CoverageObjectIngestReportV1>,
sp_boosted_rewards_ban_sink:
FileSinkClient<ServiceProviderBoostedRewardsBannedRadioIngestReportV1>,
subscriber_mapping_event_sink:
FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -78,9 +77,13 @@ impl GrpcServer {
data_transfer_session_sink: FileSinkClient<DataTransferSessionIngestReportV1>,
subscriber_location_report_sink: FileSinkClient<SubscriberLocationIngestReportV1>,
radio_threshold_report_sink: FileSinkClient<RadioThresholdIngestReportV1>,
invalidated_radio_threshold_report_sink: FileSinkClient<InvalidatedRadioThresholdIngestReportV1>,
invalidated_radio_threshold_report_sink: FileSinkClient<
InvalidatedRadioThresholdIngestReportV1,
>,
coverage_object_report_sink: FileSinkClient<CoverageObjectIngestReportV1>,
sp_boosted_rewards_ban_sink: FileSinkClient<ServiceProviderBoostedRewardsBannedRadioIngestReportV1>,
sp_boosted_rewards_ban_sink: FileSinkClient<
ServiceProviderBoostedRewardsBannedRadioIngestReportV1,
>,
subscriber_mapping_event_sink: FileSinkClient<SubscriberVerifiedMappingEventIngestReportV1>,
required_network: Network,
address: SocketAddr,
Expand Down Expand Up @@ -525,17 +528,12 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.await?;

let (subscriber_mapping_event_sink, subscriber_mapping_event_server) =
file_sink::FileSinkBuilder::new(
FileType::SubscriberVerifiedMappingEventIngestReport,
SubscriberVerifiedMappingEventIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
concat!(
env!("CARGO_PKG_NAME"),
"_subscriber_verified_mapping_event_ingest_report"
),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.roll_time(settings.roll_time)
.create()
.await?;

let Some(api_token) = settings
Expand Down
55 changes: 36 additions & 19 deletions ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use backon::{ExponentialBuilder, Retryable};
use file_store::file_sink::FileSinkClient;
use helium_crypto::{KeyTag, Keypair, Network, Sign};
use helium_proto::services::poc_mobile::{
Client as PocMobileClient, SubscriberVerifiedMappingEventReqV1,
SubscriberVerifiedMappingEventResV1,
Client as PocMobileClient, SubscriberVerifiedMappingEventIngestReportV1,
SubscriberVerifiedMappingEventReqV1, SubscriberVerifiedMappingEventResV1,
};
use ingest::server_mobile::GrpcServer;
use prost::Message;
Expand All @@ -21,9 +21,6 @@ use triggered::Trigger;
pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let key_pair = generate_keypair();

let (file_sink_tx, file_sink_rx) = tokio::sync::mpsc::channel(10);
let file_sink = FileSinkClient::new(file_sink_tx, "test_file_sync");

let socket_addr = {
let tcp_listener = TcpListener::bind("127.0.0.1:0").await?;
tcp_listener.local_addr()?
Expand All @@ -37,18 +34,29 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {

let (trigger, listener) = triggered::trigger();

let (cbrs_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (wifi_heartbeat_tx, _rx) = tokio::sync::mpsc::channel(10);
let (speedtest_tx, _rx) = tokio::sync::mpsc::channel(10);
let (data_transfer_tx, _rx) = tokio::sync::mpsc::channel(10);
let (subscriber_location_tx, _rx) = tokio::sync::mpsc::channel(10);
let (radio_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
let (invalidated_threshold_tx, _rx) = tokio::sync::mpsc::channel(10);
let (coverage_obj_tx, _rx) = tokio::sync::mpsc::channel(10);
let (sp_boosted_tx, _rx) = tokio::sync::mpsc::channel(10);
let (subscriber_mapping_tx, subscriber_mapping_rx) = tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let grpc_server = GrpcServer::new(
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
file_sink.clone(),
FileSinkClient::new(cbrs_heartbeat_tx, "noop"),
FileSinkClient::new(wifi_heartbeat_tx, "noop"),
FileSinkClient::new(speedtest_tx, "noop"),
FileSinkClient::new(data_transfer_tx, "noop"),
FileSinkClient::new(subscriber_location_tx, "noop"),
FileSinkClient::new(radio_threshold_tx, "noop"),
FileSinkClient::new(invalidated_threshold_tx, "noop"),
FileSinkClient::new(coverage_obj_tx, "noop"),
FileSinkClient::new(sp_boosted_tx, "noop"),
FileSinkClient::new(subscriber_mapping_tx, "test_file_sink"),
Network::MainNet,
socket_addr,
api_token,
Expand All @@ -57,7 +65,13 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
grpc_server.run(listener).await
});

let client = TestClient::new(socket_addr, key_pair, token.to_string(), file_sink_rx).await;
let client = TestClient::new(
socket_addr,
key_pair,
token.to_string(),
subscriber_mapping_rx,
)
.await;

Ok((client, trigger))
}
Expand All @@ -66,15 +80,18 @@ pub struct TestClient {
client: PocMobileClient<Channel>,
key_pair: Arc<Keypair>,
authorization: MetadataValue<Ascii>,
file_sink_rx: Receiver<file_store::file_sink::Message>,
file_sink_rx:
Receiver<file_store::file_sink::Message<SubscriberVerifiedMappingEventIngestReportV1>>,
}

impl TestClient {
pub async fn new(
socket_addr: SocketAddr,
key_pair: Keypair,
api_token: String,
file_sink_rx: Receiver<file_store::file_sink::Message>,
file_sink_rx: Receiver<
file_store::file_sink::Message<SubscriberVerifiedMappingEventIngestReportV1>,
>,
) -> TestClient {
let client = (|| PocMobileClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
Expand All @@ -89,7 +106,7 @@ impl TestClient {
}
}

pub async fn recv(mut self) -> anyhow::Result<Vec<u8>> {
pub async fn recv(mut self) -> anyhow::Result<SubscriberVerifiedMappingEventIngestReportV1> {
match timeout(Duration::from_secs(2), self.file_sink_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
Expand Down
8 changes: 1 addition & 7 deletions ingest/tests/mobile_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use helium_proto::services::poc_mobile::SubscriberVerifiedMappingEventIngestReportV1;
use prost::Message;

mod common;

#[tokio::test]
Expand All @@ -19,10 +16,7 @@ async fn submit_verified_subscriber_mapping_event() -> anyhow::Result<()> {
let timestamp: String = res.unwrap().id;

match client.recv().await {
Ok(data) => {
let report = SubscriberVerifiedMappingEventIngestReportV1::decode(data.as_slice())
.expect("unable to decode into SubscriberVerifiedMappingEventIngestReportV1");

Ok(report) => {
assert_eq!(timestamp, report.received_timestamp.to_string());

match report.report {
Expand Down
25 changes: 11 additions & 14 deletions mobile_verifier/src/subscriber_verified_mapping_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use crate::Settings;
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::{self, FileSinkClient},
file_sink::FileSinkClient,
file_source,
file_upload::FileUpload,
subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent,
subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport,
FileStore, FileType,
};
Expand All @@ -32,7 +33,7 @@ pub struct SubscriberVerifiedMappingEventDaemon<AV, EV> {
authorization_verifier: AV,
entity_verifier: EV,
reports_receiver: Receiver<FileInfoStream<SubscriberVerifiedMappingEventIngestReport>>,
verified_report_sink: FileSinkClient,
verified_report_sink: FileSinkClient<VerifiedSubscriberVerifiedMappingEventIngestReportV1>,
}

impl<AV, EV> SubscriberVerifiedMappingEventDaemon<AV, EV>
Expand All @@ -45,7 +46,7 @@ where
authorization_verifier: AV,
entity_verifier: EV,
reports_receiver: Receiver<FileInfoStream<SubscriberVerifiedMappingEventIngestReport>>,
verified_report_sink: FileSinkClient,
verified_report_sink: FileSinkClient<VerifiedSubscriberVerifiedMappingEventIngestReportV1>,
) -> Self {
Self {
pool,
Expand Down Expand Up @@ -73,18 +74,14 @@ where
.create()
.await?;

let (verified_report_sink, verified_report_sink_server) = file_sink::FileSinkBuilder::new(
FileType::VerifiedSubscriberVerifiedMappingEventIngestReport,
settings.store_base_path(),
file_upload.clone(),
concat!(
let (verified_report_sink, verified_report_sink_server) =
VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink(
settings.store_base_path(),
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
"_verified_subscriber_verified_mapping_event_ingest_report"
),
)
.auto_commit(false)
.create()
.await?;
)
.await?;

let task = Self::new(
pool,
Expand Down
16 changes: 6 additions & 10 deletions mobile_verifier/tests/integrations/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ use file_store::{
};
use futures::{stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use helium_proto::{
services::{
mobile_config::NetworkKeyRole,
poc_mobile::{
mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward,
MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward,
RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward,
UnallocatedReward,
},
use helium_proto::services::{
mobile_config::NetworkKeyRole,
poc_mobile::{
mobile_reward_share::Reward as MobileReward, radio_reward_v2, GatewayReward,
MobileRewardShare, OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward,
RadioRewardV2, ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward,
},
Message,
};
use hex_assignments::{Assignment, HexAssignment, HexBoostData};
use mobile_config::{
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/tests/integrations/hex_boosting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use helium_crypto::PublicKeyBinary;
use helium_proto::services::{
poc_lora::UnallocatedRewardType,
poc_mobile::{
CoverageObjectValidity, HeartbeatValidity, LocationSource, RadioRewardV2,
SeniorityUpdateReason, SignalLevel, UnallocatedReward,
CoverageObjectValidity, HeartbeatValidity, LocationSource, MobileRewardShare,
RadioRewardV2, SeniorityUpdateReason, SignalLevel, UnallocatedReward,
},
};
use hextree::Cell;
Expand Down
4 changes: 2 additions & 2 deletions mobile_verifier/tests/integrations/rewarder_poc_dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use file_store::{
};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, RadioRewardV2,
SeniorityUpdateReason, SignalLevel,
CoverageObjectValidity, GatewayReward, HeartbeatValidity, LocationSource, MobileRewardShare,
RadioRewardV2, SeniorityUpdateReason, SignalLevel,
};
use mobile_verifier::{
cell_type::CellType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ use file_store::{
FileInfo,
};
use helium_crypto::{KeyTag, Keypair, PublicKeyBinary};
use helium_proto::services::poc_mobile::VerifiedSubscriberVerifiedMappingEventIngestReportV1;
use mobile_verifier::subscriber_verified_mapping_event::{
aggregate_verified_mapping_events, SubscriberVerifiedMappingEventDaemon,
VerifiedSubscriberVerifiedMappingEventShare, VerifiedSubscriberVerifiedMappingEventShares,
};
use prost::Message;
use rand::rngs::OsRng;
use sqlx::{PgPool, Pool, Postgres, Row};
use std::{collections::HashMap, ops::Range};
Expand Down Expand Up @@ -86,10 +84,7 @@ async fn main_test(pool: PgPool) -> anyhow::Result<()> {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Commit(_) => panic!("got Commit"),
file_store::file_sink::Message::Rollback(_) => panic!("got Rollback"),
file_store::file_sink::Message::Data(_, data) => {
let proto_verified_report = VerifiedSubscriberVerifiedMappingEventIngestReportV1::decode(data.as_slice())
.expect("unable to decode into VerifiedSubscriberVerifiedMappingEventIngestReportV1");

file_store::file_sink::Message::Data(_, proto_verified_report) => {
let rcv_report: SubscriberVerifiedMappingEventIngestReport =
proto_verified_report.report.unwrap().try_into()?;

Expand Down

0 comments on commit b5bbad4

Please sign in to comment.