Skip to content

Commit

Permalink
Add a throttle for fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
iovxw committed Apr 10, 2020
1 parent 2291ecb commit c448753
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::cmp;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
};

use futures::{future::FutureExt, select_biased};
use tbot::{
Expand All @@ -11,7 +14,7 @@ use tokio::{
self,
stream::StreamExt,
sync::Notify,
time::{self, delay_queue::DelayQueue, Duration, Instant},
time::{self, delay_for, delay_queue::DelayQueue, Duration, Instant},
};

use crate::client::pull_feed;
Expand All @@ -28,14 +31,17 @@ pub fn start(
// TODO: Don't use interval, it can accumulate ticks
// replace it with delay_until
let mut interval = time::interval_at(Instant::now(), Duration::from_secs(min_interval as u64));
let throttle = Throttle::new(min_interval as usize);
tokio::spawn(async move {
loop {
select_biased! {
feed = queue.next().fuse() => {
let feed = feed.expect("unreachable");
let bot = bot.clone();
let db = db.clone();
let opportunity = throttle.acquire();
tokio::spawn(async move {
opportunity.wait().await;
if let Err(e) = fetch_and_push_updates(bot, db, feed).await {
dbg!(e);
}
Expand Down Expand Up @@ -213,3 +219,42 @@ impl FetchQueue {
}
}
}

struct Throttle {
pieces: usize,
counter: Arc<AtomicUsize>,
}

impl Throttle {
fn new(pieces: usize) -> Self {
Throttle {
pieces,
counter: Arc::new(AtomicUsize::new(0)),
}
}

fn acquire(&self) -> Opportunity {
Opportunity {
n: self.counter.fetch_add(1, Ordering::AcqRel) % self.pieces,
counter: self.counter.clone(),
}
}
}

#[must_use = "Don't lose your opportunity"]
struct Opportunity {
n: usize,
counter: Arc<AtomicUsize>,
}

impl Opportunity {
async fn wait(&self) {
delay_for(Duration::from_secs(self.n as u64)).await
}
}

impl Drop for Opportunity {
fn drop(&mut self) {
self.counter.fetch_sub(1, Ordering::SeqCst);
}
}

0 comments on commit c448753

Please sign in to comment.