From 01596ee479cac2f5b9934c042a54266195b06adf Mon Sep 17 00:00:00 2001 From: Filippo Casarin Date: Mon, 30 Sep 2024 15:27:08 +0200 Subject: [PATCH] More refactoring --- pixie-server/src/dnsmasq.rs | 3 ++- pixie-server/src/http.rs | 2 +- pixie-server/src/main.rs | 20 +++++++++++++------- pixie-server/src/ping.rs | 31 +++++++++---------------------- pixie-server/src/state.rs | 21 ++++++++++++++++++--- pixie-server/src/tcp.rs | 13 ++++--------- pixie-server/src/udp.rs | 9 +++++---- 7 files changed, 52 insertions(+), 47 deletions(-) diff --git a/pixie-server/src/dnsmasq.rs b/pixie-server/src/dnsmasq.rs index 9f5cb85..950149f 100644 --- a/pixie-server/src/dnsmasq.rs +++ b/pixie-server/src/dnsmasq.rs @@ -47,6 +47,8 @@ async fn write_config(state: &State) -> Result<()> { ), }; + let storage_str = state.storage_dir.to_str().unwrap(); + write!( dnsmasq_conf, r#" @@ -78,7 +80,6 @@ dhcp-vendorclass=set:netboot,PXEClient:Arch:00007 dhcp-vendorclass=set:netboot,PXEClient:Arch:00009 dhcp-vendorclass=set:netboot,pixie "#, - storage_str = state.storage_dir.to_str().unwrap() )?; Ok(()) diff --git a/pixie-server/src/http.rs b/pixie-server/src/http.rs index fef0772..e734b57 100644 --- a/pixie-server/src/http.rs +++ b/pixie-server/src/http.rs @@ -140,7 +140,7 @@ async fn image( } async fn gc(extract::State(state): extract::State>) -> String { - state.gc_chunks().await.unwrap(); + state.gc_chunks().unwrap(); "".to_owned() } diff --git a/pixie-server/src/main.rs b/pixie-server/src/main.rs index fb05be9..c28d663 100644 --- a/pixie-server/src/main.rs +++ b/pixie-server/src/main.rs @@ -19,6 +19,7 @@ use clap::Parser; use interfaces::Interface; use ipnet::Ipv4Net; use macaddr::MacAddr6; +use tokio::task::JoinHandle; use crate::state::State; @@ -126,13 +127,18 @@ async fn main() -> Result<()> { let state = Arc::new(State::load(options.storage_dir)?); - tokio::try_join!( - dnsmasq::main(state.clone()), - http::main(state.clone()), - udp::main(&state), - tcp::main(state.clone()), - ping::main(&state), - )?; + async fn flatten(task: JoinHandle>) -> Result<()> { + task.await??; + Ok(()) + } + + let dnsmasq_task = flatten(tokio::spawn(dnsmasq::main(state.clone()))); + let http_task = flatten(tokio::spawn(http::main(state.clone()))); + let udp_task = flatten(tokio::spawn(udp::main(state.clone()))); + let tcp_task = flatten(tokio::spawn(tcp::main(state.clone()))); + let ping_task = flatten(tokio::spawn(ping::main(state.clone()))); + + tokio::try_join!(dnsmasq_task, http_task, udp_task, tcp_task, ping_task)?; Ok(()) } diff --git a/pixie-server/src/ping.rs b/pixie-server/src/ping.rs index c0570f2..8abcacc 100644 --- a/pixie-server/src/ping.rs +++ b/pixie-server/src/ping.rs @@ -1,17 +1,13 @@ -use std::{ - net::{IpAddr, Ipv4Addr}, - time::SystemTime, -}; +use std::{net::IpAddr, sync::Arc, time::SystemTime}; use anyhow::{bail, Result}; -use pixie_shared::PACKET_LEN; use tokio::net::UdpSocket; use crate::{find_mac, state::State}; +use pixie_shared::{PACKET_LEN, PING_PORT}; -pub async fn main(state: &State) -> Result<()> { - // TODO: do we like port 4043? - let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 4043)).await?; +pub async fn main(state: Arc) -> Result<()> { + let socket = UdpSocket::bind((state.config.hosts.listen_on, PING_PORT)).await?; log::info!("Listening on {}", socket.local_addr()?); let mut buf = [0; PACKET_LEN]; @@ -28,19 +24,10 @@ pub async fn main(state: &State) -> Result<()> { } }; - state.units.send_if_modified(|units| { - let Some(unit) = units.iter_mut().find(|unit| unit.mac == peer_mac) else { - log::warn!("Got ping from unknown unit"); - return false; - }; - - unit.last_ping_timestamp = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(); - unit.last_ping_msg = buf[..len].to_vec(); - - true - }); + let time = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + state.set_ping(peer_mac, time, buf[..len].to_owned()); } } diff --git a/pixie-server/src/state.rs b/pixie-server/src/state.rs index db5ea0e..17e592d 100644 --- a/pixie-server/src/state.rs +++ b/pixie-server/src/state.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::{anyhow, bail, Context, Result}; +use macaddr::MacAddr6; use mktemp::Temp; use tokio::sync::watch; @@ -151,7 +152,7 @@ impl State { }) } - pub async fn gc_chunks(&self) -> Result<()> { + pub fn gc_chunks(&self) -> Result<()> { self.image_stats.send_modify(|image_stats| { let mut chunk_stats = self.chunk_stats.lock().unwrap(); let mut cnt = 0; @@ -171,7 +172,7 @@ impl State { Ok(()) } - pub async fn add_chunk(&self, hash: ChunkHash, data: &[u8]) -> Result<()> { + pub fn add_chunk(&self, hash: ChunkHash, data: &[u8]) -> Result<()> { let path = self.storage_dir.join("chunks").join(hex::encode(hash)); self.image_stats.send_modify(|image_stats| { @@ -192,7 +193,7 @@ impl State { Ok(()) } - pub async fn add_image(&self, name: String, image: Image) -> Result<()> { + pub fn add_image(&self, name: String, image: Image) -> Result<()> { if !self.config.images.contains(&name) { bail!("Unknown image: {}", name); } @@ -231,4 +232,18 @@ impl State { Ok(()) } + + pub fn set_ping(&self, peer_mac: MacAddr6, time: u64, message: Vec) { + self.units.send_if_modified(|units| { + let Some(unit) = units.iter_mut().find(|unit| unit.mac == peer_mac) else { + log::warn!("Got ping from unknown unit"); + return false; + }; + + unit.last_ping_timestamp = time; + unit.last_ping_msg = message; + + true + }); + } } diff --git a/pixie-server/src/tcp.rs b/pixie-server/src/tcp.rs index 8dc6214..7c82571 100644 --- a/pixie-server/src/tcp.rs +++ b/pixie-server/src/tcp.rs @@ -1,9 +1,4 @@ -use std::{ - io::ErrorKind, - net::SocketAddr, - net::{IpAddr, Ipv4Addr}, - sync::Arc, -}; +use std::{io::ErrorKind, net::IpAddr, net::SocketAddr, sync::Arc}; use tokio::{ fs, @@ -77,14 +72,14 @@ async fn handle_request(state: &State, req: TcpRequest, peer_mac: MacAddr6) -> R Vec::new() } TcpRequest::UploadChunk(hash, data) => { - state.add_chunk(hash, &data).await?; + state.add_chunk(hash, &data)?; Vec::new() } TcpRequest::UploadImage(name, image) => { if !state.config.images.contains(&name) { return Ok(format!("Unknown image: {}", name).into_bytes()); } - state.add_image(name, image).await?; + state.add_image(name, image)?; Vec::new() } @@ -190,7 +185,7 @@ async fn handle_connection( } pub async fn main(state: Arc) -> Result<()> { - let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, ACTION_PORT)).await?; + let listener = TcpListener::bind((state.config.hosts.listen_on, ACTION_PORT)).await?; log::info!("Listening on {}", listener.local_addr()?); loop { let (stream, addr) = listener.accept().await?; diff --git a/pixie-server/src/udp.rs b/pixie-server/src/udp.rs index fba3aca..afa0914 100644 --- a/pixie-server/src/udp.rs +++ b/pixie-server/src/udp.rs @@ -3,6 +3,7 @@ use std::{ fs, net::{IpAddr, Ipv4Addr, SocketAddrV4}, ops::Bound, + sync::Arc, }; use tokio::{ @@ -203,7 +204,7 @@ async fn handle_requests(state: &State, socket: &UdpSocket, tx: Sender<[u8; 32]> } } -pub async fn main(state: &State) -> Result<()> { +pub async fn main(state: Arc) -> Result<()> { let (_, network) = find_network(state.config.hosts.listen_on)?; let (tx, rx) = mpsc::channel(128); @@ -212,9 +213,9 @@ pub async fn main(state: &State) -> Result<()> { socket.set_broadcast(true)?; tokio::try_join!( - broadcast_chunks(state, &socket, network.broadcast(), rx), - broadcast_hint(state, &socket, network.broadcast()), - handle_requests(state, &socket, tx), + broadcast_chunks(&state, &socket, network.broadcast(), rx), + broadcast_hint(&state, &socket, network.broadcast()), + handle_requests(&state, &socket, tx), )?; Ok(())