Skip to content

Commit

Permalink
More refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Virv12 committed Sep 30, 2024
1 parent 3a25f3b commit 01596ee
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 47 deletions.
3 changes: 2 additions & 1 deletion pixie-server/src/dnsmasq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ async fn write_config(state: &State) -> Result<()> {
),
};

let storage_str = state.storage_dir.to_str().unwrap();

write!(
dnsmasq_conf,
r#"
Expand Down Expand Up @@ -78,7 +80,6 @@ dhcp-vendorclass=set:netboot,PXEClient:Arch:00007
dhcp-vendorclass=set:netboot,PXEClient:Arch:00009
dhcp-vendorclass=set:netboot,pixie
"#,
storage_str = state.storage_dir.to_str().unwrap()
)?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion pixie-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn image(
}

async fn gc(extract::State(state): extract::State<Arc<State>>) -> String {
state.gc_chunks().await.unwrap();
state.gc_chunks().unwrap();
"".to_owned()
}

Expand Down
20 changes: 13 additions & 7 deletions pixie-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use clap::Parser;
use interfaces::Interface;
use ipnet::Ipv4Net;
use macaddr::MacAddr6;
use tokio::task::JoinHandle;

use crate::state::State;

Expand Down Expand Up @@ -126,13 +127,18 @@ async fn main() -> Result<()> {

let state = Arc::new(State::load(options.storage_dir)?);

tokio::try_join!(
dnsmasq::main(state.clone()),
http::main(state.clone()),
udp::main(&state),
tcp::main(state.clone()),
ping::main(&state),
)?;
async fn flatten(task: JoinHandle<Result<()>>) -> Result<()> {
task.await??;
Ok(())
}

let dnsmasq_task = flatten(tokio::spawn(dnsmasq::main(state.clone())));
let http_task = flatten(tokio::spawn(http::main(state.clone())));
let udp_task = flatten(tokio::spawn(udp::main(state.clone())));
let tcp_task = flatten(tokio::spawn(tcp::main(state.clone())));
let ping_task = flatten(tokio::spawn(ping::main(state.clone())));

tokio::try_join!(dnsmasq_task, http_task, udp_task, tcp_task, ping_task)?;

Ok(())
}
31 changes: 9 additions & 22 deletions pixie-server/src/ping.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use std::{
net::{IpAddr, Ipv4Addr},
time::SystemTime,
};
use std::{net::IpAddr, sync::Arc, time::SystemTime};

use anyhow::{bail, Result};
use pixie_shared::PACKET_LEN;
use tokio::net::UdpSocket;

use crate::{find_mac, state::State};
use pixie_shared::{PACKET_LEN, PING_PORT};

pub async fn main(state: &State) -> Result<()> {
// TODO: do we like port 4043?
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 4043)).await?;
pub async fn main(state: Arc<State>) -> Result<()> {
let socket = UdpSocket::bind((state.config.hosts.listen_on, PING_PORT)).await?;
log::info!("Listening on {}", socket.local_addr()?);

let mut buf = [0; PACKET_LEN];
Expand All @@ -28,19 +24,10 @@ pub async fn main(state: &State) -> Result<()> {
}
};

state.units.send_if_modified(|units| {
let Some(unit) = units.iter_mut().find(|unit| unit.mac == peer_mac) else {
log::warn!("Got ping from unknown unit");
return false;
};

unit.last_ping_timestamp = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
unit.last_ping_msg = buf[..len].to_vec();

true
});
let time = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
state.set_ping(peer_mac, time, buf[..len].to_owned());
}
}
21 changes: 18 additions & 3 deletions pixie-server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use anyhow::{anyhow, bail, Context, Result};
use macaddr::MacAddr6;
use mktemp::Temp;
use tokio::sync::watch;

Expand Down Expand Up @@ -151,7 +152,7 @@ impl State {
})
}

pub async fn gc_chunks(&self) -> Result<()> {
pub fn gc_chunks(&self) -> Result<()> {
self.image_stats.send_modify(|image_stats| {
let mut chunk_stats = self.chunk_stats.lock().unwrap();
let mut cnt = 0;
Expand All @@ -171,7 +172,7 @@ impl State {
Ok(())
}

pub async fn add_chunk(&self, hash: ChunkHash, data: &[u8]) -> Result<()> {
pub fn add_chunk(&self, hash: ChunkHash, data: &[u8]) -> Result<()> {
let path = self.storage_dir.join("chunks").join(hex::encode(hash));

self.image_stats.send_modify(|image_stats| {
Expand All @@ -192,7 +193,7 @@ impl State {
Ok(())
}

pub async fn add_image(&self, name: String, image: Image) -> Result<()> {
pub fn add_image(&self, name: String, image: Image) -> Result<()> {
if !self.config.images.contains(&name) {
bail!("Unknown image: {}", name);
}
Expand Down Expand Up @@ -231,4 +232,18 @@ impl State {

Ok(())
}

pub fn set_ping(&self, peer_mac: MacAddr6, time: u64, message: Vec<u8>) {
self.units.send_if_modified(|units| {
let Some(unit) = units.iter_mut().find(|unit| unit.mac == peer_mac) else {
log::warn!("Got ping from unknown unit");
return false;
};

unit.last_ping_timestamp = time;
unit.last_ping_msg = message;

true
});
}
}
13 changes: 4 additions & 9 deletions pixie-server/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
use std::{
io::ErrorKind,
net::SocketAddr,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};
use std::{io::ErrorKind, net::IpAddr, net::SocketAddr, sync::Arc};

use tokio::{
fs,
Expand Down Expand Up @@ -77,14 +72,14 @@ async fn handle_request(state: &State, req: TcpRequest, peer_mac: MacAddr6) -> R
Vec::new()
}
TcpRequest::UploadChunk(hash, data) => {
state.add_chunk(hash, &data).await?;
state.add_chunk(hash, &data)?;
Vec::new()
}
TcpRequest::UploadImage(name, image) => {
if !state.config.images.contains(&name) {
return Ok(format!("Unknown image: {}", name).into_bytes());
}
state.add_image(name, image).await?;
state.add_image(name, image)?;

Vec::new()
}
Expand Down Expand Up @@ -190,7 +185,7 @@ async fn handle_connection(
}

pub async fn main(state: Arc<State>) -> Result<()> {
let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ACTION_PORT)).await?;
let listener = TcpListener::bind((state.config.hosts.listen_on, ACTION_PORT)).await?;
log::info!("Listening on {}", listener.local_addr()?);
loop {
let (stream, addr) = listener.accept().await?;
Expand Down
9 changes: 5 additions & 4 deletions pixie-server/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
fs,
net::{IpAddr, Ipv4Addr, SocketAddrV4},
ops::Bound,
sync::Arc,
};

use tokio::{
Expand Down Expand Up @@ -203,7 +204,7 @@ async fn handle_requests(state: &State, socket: &UdpSocket, tx: Sender<[u8; 32]>
}
}

pub async fn main(state: &State) -> Result<()> {
pub async fn main(state: Arc<State>) -> Result<()> {
let (_, network) = find_network(state.config.hosts.listen_on)?;

let (tx, rx) = mpsc::channel(128);
Expand All @@ -212,9 +213,9 @@ pub async fn main(state: &State) -> Result<()> {
socket.set_broadcast(true)?;

tokio::try_join!(
broadcast_chunks(state, &socket, network.broadcast(), rx),
broadcast_hint(state, &socket, network.broadcast()),
handle_requests(state, &socket, tx),
broadcast_chunks(&state, &socket, network.broadcast(), rx),
broadcast_hint(&state, &socket, network.broadcast()),
handle_requests(&state, &socket, tx),
)?;

Ok(())
Expand Down

0 comments on commit 01596ee

Please sign in to comment.