Skip to content

Commit

Permalink
custom number threads for requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sequal32 committed Sep 18, 2020
1 parent 1a93a59 commit 4d0f58c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
4 changes: 3 additions & 1 deletion src/flightaware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ fn get_flightplan_from_json(data: &Value) -> Option<FlightPlan> {
});
}

#[derive(Debug)]
struct FlightPlanRequest {
id: String,
callsign: String
}

#[derive(Debug)]
pub struct FlightPlanResult {
pub id: String,
pub callsign: String,
Expand All @@ -85,7 +87,7 @@ impl FlightAware {
pub fn new() -> Self {
Self {
client: Arc::new(Mutex::new(Client::new())),
flightplans: Request::new()
flightplans: Request::new(5)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/noaa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct NoaaWeather {
impl NoaaWeather {
pub fn new() -> Self {
Self {
weather_request: Request::new(),
weather_request: Request::new(1),
client: Arc::new(Mutex::new(Client::new()))
}
}
Expand Down
45 changes: 27 additions & 18 deletions src/request.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
use std::thread;
use std::{thread, sync::Arc};

use crossbeam_channel::{unbounded, Sender, Receiver};
use crossbeam_deque::{Worker, Steal};

pub struct Request<T, J> {
tx: Sender<T>,
rx: Receiver<T>,
worker: Worker<J>
worker: Worker<J>,
num_threads: u32
}


impl<T, J> Request<T, J> where T: Send + 'static, J: Send + 'static {
pub fn new() -> Self {
pub fn new(num_threads: u32) -> Self {
let (tx, rx) = unbounded();
Self {
rx, tx,
worker: Worker::new_fifo()
worker: Worker::new_fifo(),
num_threads
}
}

pub fn run<F>(&self, worker: F) where
F: Fn(J) -> T + Send + 'static {
let result_transmitter = self.tx.clone();
let s = self.worker.stealer();

thread::spawn(move || {
loop {
match s.steal() {
Steal::Success(job) => {
result_transmitter.send(worker(job)).ok();
},
_ => ()
}
thread::sleep(std::time::Duration::from_millis(10));
F: Fn(J) -> T + Send + Sync + 'static {
let worker = Arc::new(worker);

// Spawn worker threads to read from queue
(0..self.num_threads).for_each(|_| {
let s = self.worker.stealer();
let result_transmitter = self.tx.clone();
let worker = worker.clone();
// Process tasks
thread::spawn(move || {
loop {
match s.steal() {
Steal::Success(job) => {
result_transmitter.send(worker(job)).ok();
},
_ => ()
}
thread::sleep(std::time::Duration::from_millis(10));
}
});
}
});
);
}

pub fn get_next(&self) -> Option<T> {
Expand Down

0 comments on commit 4d0f58c

Please sign in to comment.