Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File Sink Client Typing #849

Merged
merged 28 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
420648a
make file sink parts declare their type
michaeldjeffrey Jul 19, 2024
dc66e79
ingest: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
48f37f1
boost_manager: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
8d0399a
iot_packet_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
be57669
mobile_packet_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
91418ff
price: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
ea033cf
poc_entropy: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
ada8975
iot_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
edcb89c
mobile_verifier: type all FileSinkClients
michaeldjeffrey Jul 19, 2024
8b1294c
Rename trait FileStoreAsBytes -> MsgBytes to follow convention for pr…
michaeldjeffrey Jul 22, 2024
990fd46
create file sinks directly from the message
michaeldjeffrey Jul 24, 2024
2b916a6
rename file sink trait to writer
michaeldjeffrey Jul 30, 2024
4e90746
file sink metric from static str to String
michaeldjeffrey Jul 30, 2024
ae81c48
ingest iot/mobile servers to use convenience file_sink method
michaeldjeffrey Jul 30, 2024
02f422c
iot_verifier to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
da58d2a
boost manager to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
94c5d28
mobile packet verifier use file_sink convenience methods
michaeldjeffrey Jul 30, 2024
61bd7a1
mobile_verifier to use file_sink convenience methods
michaeldjeffrey Jul 30, 2024
a18218c
poc_entropy to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
758bb18
price to use file_sink convenience method
michaeldjeffrey Jul 30, 2024
8fbd2f5
move MsgBytes trait back out of FileSinkWrite
michaeldjeffrey Jul 30, 2024
e17ed62
autocommit=false is already the default
michaeldjeffrey Jul 30, 2024
a59351f
msg is no longer bytes in test
michaeldjeffrey Jul 30, 2024
9457e98
stick with single style of reference
michaeldjeffrey Jul 30, 2024
5e216b6
combine auto_commit and roll_time into 1 argument
michaeldjeffrey Jul 30, 2024
68d9a1c
fixup after rebase
michaeldjeffrey Aug 2, 2024
b5bbad4
fixup after rebase
michaeldjeffrey Aug 19, 2024
a5fe861
uncouple FileSinkWriteExt from FileType enum
michaeldjeffrey Aug 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use boost_manager::{
};
use clap::Parser;
use file_store::{
file_info_poller::LookbackBehavior, file_sink, file_source, file_upload,
reward_manifest::RewardManifest, FileStore, FileType,
file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest,
traits::FileSinkWriteExt, FileStore, FileType,
};
use helium_proto::BoostedHexUpdateV1;
use mobile_config::client::hex_boosting_client::HexBoostingClient;
use solana::start_boost::SolanaRpc;
use std::{
Expand Down Expand Up @@ -99,14 +100,12 @@ impl Server {
.await?;

// setup the writer for our updated hexes
let (updated_hexes_sink, updated_hexes_sink_server) = file_sink::FileSinkBuilder::new(
FileType::BoostedHexUpdate,
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
store_base_path,
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_boosted_hex_update"),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

// The server to monitor rewards and activate any newly seen boosted hexes
Expand Down
4 changes: 2 additions & 2 deletions boost_manager/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const LAST_PROCESSED_TIMESTAMP_KEY: &str = "last_processed_hex_boosting_info";
pub struct Watcher<A> {
pub pool: Pool<Postgres>,
pub hex_boosting_client: A,
pub file_sink: FileSinkClient,
pub file_sink: FileSinkClient<BoostedHexUpdateProto>,
}

impl<A> ManagedTask for Watcher<A>
Expand All @@ -45,7 +45,7 @@ where
{
pub async fn new(
pool: Pool<Postgres>,
file_sink: FileSinkClient,
file_sink: FileSinkClient<BoostedHexUpdateProto>,
hex_boosting_client: A,
) -> Result<Self> {
Ok(Self {
Expand Down
8 changes: 4 additions & 4 deletions boost_manager/tests/integrations/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ impl HexBoostingInfoResolver for MockHexBoostingClient {
}

pub struct MockFileSinkReceiver {
pub receiver: tokio::sync::mpsc::Receiver<SinkMessage>,
pub receiver: tokio::sync::mpsc::Receiver<SinkMessage<BoostedHexUpdateProto>>,
}

impl MockFileSinkReceiver {
pub async fn receive(&mut self) -> Option<Vec<u8>> {
match timeout(seconds(2), self.receiver.recv()).await {
Ok(Some(SinkMessage::Data(on_write_tx, msg))) => {
let _ = on_write_tx.send(Ok(()));
Some(msg)
Some(msg.encode_to_vec())
}
Ok(None) => None,
Err(e) => panic!("timeout while waiting for message1 {:?}", e),
Expand Down Expand Up @@ -81,12 +81,12 @@ impl MockFileSinkReceiver {
}
}

pub fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) {
pub fn create_file_sink() -> (FileSinkClient<BoostedHexUpdateProto>, MockFileSinkReceiver) {
let (tx, rx) = tokio::sync::mpsc::channel(20);
(
FileSinkClient {
sender: tx,
metric: "metric",
metric: "metric".into(),
},
MockFileSinkReceiver { receiver: rx },
)
Expand Down
2 changes: 1 addition & 1 deletion file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl fmt::Display for FileType {
}

impl FileType {
pub fn to_str(&self) -> &'static str {
pub const fn to_str(&self) -> &'static str {
match self {
Self::InvalidatedRadioThresholdReq => INVALIDATED_RADIO_THRESHOLD_REQ,
Self::InvalidatedRadioThresholdIngestReport => {
Expand Down
63 changes: 34 additions & 29 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{file_upload::FileUpload, Error, Result};
use crate::{file_upload::FileUpload, traits::MsgBytes, Error, Result};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -45,16 +45,16 @@ fn transport_sink(transport: &mut Transport) -> &mut Sink {
}

#[derive(Debug)]
pub enum Message {
Data(oneshot::Sender<Result>, Vec<u8>),
pub enum Message<T> {
Data(oneshot::Sender<Result>, T),
Commit(oneshot::Sender<Result<FileManifest>>),
Rollback(oneshot::Sender<Result<FileManifest>>),
}

pub type MessageSender = mpsc::Sender<Message>;
pub type MessageReceiver = mpsc::Receiver<Message>;
pub type MessageSender<T> = mpsc::Sender<Message<T>>;
pub type MessageReceiver<T> = mpsc::Receiver<Message<T>>;

fn message_channel(size: usize) -> (MessageSender, MessageReceiver) {
fn message_channel<T>(size: usize) -> (MessageSender<T>, MessageReceiver<T>) {
mpsc::channel(size)
}

Expand All @@ -66,15 +66,15 @@ pub struct FileSinkBuilder {
roll_time: Duration,
file_upload: FileUpload,
auto_commit: bool,
metric: &'static str,
metric: String,
}

impl FileSinkBuilder {
pub fn new(
prefix: impl ToString,
target_path: &Path,
file_upload: FileUpload,
metric: &'static str,
metric: impl Into<String>,
) -> Self {
Self {
prefix: prefix.to_string(),
Expand All @@ -84,7 +84,7 @@ impl FileSinkBuilder {
roll_time: Duration::from_secs(DEFAULT_SINK_ROLL_SECS),
file_upload,
auto_commit: true,
metric,
metric: metric.into(),
}
}

Expand Down Expand Up @@ -120,15 +120,18 @@ impl FileSinkBuilder {
}
}

pub async fn create(self) -> Result<(FileSinkClient, FileSink)> {
pub async fn create<T>(self) -> Result<(FileSinkClient<T>, FileSink<T>)>
where
T: MsgBytes,
{
let (tx, rx) = message_channel(50);

let client = FileSinkClient {
sender: tx,
metric: self.metric,
};

metrics::counter!(client.metric, vec![OK_LABEL]);
metrics::counter!(client.metric.clone(), vec![OK_LABEL]);

let mut sink = FileSink {
target_path: self.target_path,
Expand All @@ -148,33 +151,35 @@ impl FileSinkBuilder {
}

#[derive(Debug, Clone)]
pub struct FileSinkClient {
pub sender: MessageSender,
pub metric: &'static str,
pub struct FileSinkClient<T> {
pub sender: MessageSender<T>,
pub metric: String,
}

const OK_LABEL: Label = Label::from_static_parts("status", "ok");
const ERROR_LABEL: Label = Label::from_static_parts("status", "error");
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

impl FileSinkClient {
pub fn new(sender: MessageSender, metric: &'static str) -> Self {
Self { sender, metric }
impl<T> FileSinkClient<T> {
pub fn new(sender: MessageSender<T>, metric: impl Into<String>) -> Self {
Self {
sender,
metric: metric.into(),
}
}

pub async fn write<T: prost::Message>(
pub async fn write(
&self,
item: T,
labels: impl IntoIterator<Item = &(&'static str, &'static str)>,
) -> Result<oneshot::Receiver<Result>> {
let (on_write_tx, on_write_rx) = oneshot::channel();
let bytes = item.encode_to_vec();
let labels = labels.into_iter().map(Label::from);
tokio::select! {
result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result {
result = self.sender.send_timeout(Message::Data(on_write_tx, item), SEND_TIMEOUT) => match result {
Ok(_) => {
metrics::counter!(
self.metric,
self.metric.clone(),
labels
.chain(std::iter::once(OK_LABEL))
.collect::<Vec<Label>>()
Expand All @@ -184,7 +189,7 @@ impl FileSinkClient {
}
Err(SendTimeoutError::Closed(_)) => {
metrics::counter!(
self.metric,
self.metric.clone(),
labels
.chain(std::iter::once(ERROR_LABEL))
.collect::<Vec<Label>>()
Expand All @@ -203,7 +208,7 @@ impl FileSinkClient {
/// Writes all messages to the file sink, return the last oneshot
pub async fn write_all(
&self,
items: impl IntoIterator<Item = impl prost::Message>,
items: impl IntoIterator<Item = T>,
) -> Result<Option<oneshot::Receiver<Result>>> {
let mut last_oneshot = None;
for item in items {
Expand Down Expand Up @@ -244,14 +249,14 @@ impl FileSinkClient {
}

#[derive(Debug)]
pub struct FileSink {
pub struct FileSink<T> {
target_path: PathBuf,
tmp_path: PathBuf,
prefix: String,
max_size: usize,
roll_time: Duration,

messages: MessageReceiver,
messages: MessageReceiver<T>,
file_upload: FileUpload,
staged_files: Vec<PathBuf>,
auto_commit: bool,
Expand All @@ -273,7 +278,7 @@ impl ActiveSink {
}
}

impl ManagedTask for FileSink {
impl<T: MsgBytes + Send + Sync + 'static> ManagedTask for FileSink<T> {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
Expand All @@ -288,7 +293,7 @@ impl ManagedTask for FileSink {
}
}

impl FileSink {
impl<T: MsgBytes> FileSink<T> {
async fn init(&mut self) -> Result {
fs::create_dir_all(&self.target_path).await?;
fs::create_dir_all(&self.tmp_path).await?;
Expand Down Expand Up @@ -350,8 +355,8 @@ impl FileSink {
_ = shutdown.clone() => break,
_ = rollover_timer.tick() => self.maybe_roll().await?,
msg = self.messages.recv() => match msg {
Some(Message::Data(on_write_tx, bytes)) => {
let res = match self.write(Bytes::from(bytes)).await {
Some(Message::Data(on_write_tx, item)) => {
let res = match self.write(item.as_bytes()).await {
Ok(_) => Ok(()),
Err(err) => {
tracing::error!("failed to store {}: {err:?}", &self.prefix);
Expand Down
Loading