From 021c427a6bc03d9f93973fe788a154808f4b2ef1 Mon Sep 17 00:00:00 2001 From: Jett Chen Date: Sun, 25 Feb 2024 03:42:40 +0800 Subject: [PATCH] Starting after Stop fix --- burrow/src/daemon/instance.rs | 18 +++++++------- burrow/src/wireguard/iface.rs | 47 +++++++++++++++++++++-------------- burrow/src/wireguard/pcb.rs | 6 ++--- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/burrow/src/daemon/instance.rs b/burrow/src/daemon/instance.rs index dcd88681..0d3e7265 100644 --- a/burrow/src/daemon/instance.rs +++ b/burrow/src/daemon/instance.rs @@ -21,7 +21,7 @@ enum RunState { pub struct DaemonInstance { rx: async_channel::Receiver, sx: async_channel::Sender, - tun_interface: Option>>, + tun_interface: Arc>>, wg_interface: Arc>, wg_state: RunState, } @@ -36,7 +36,7 @@ impl DaemonInstance { rx, sx, wg_interface, - tun_interface: None, + tun_interface: Arc::new(RwLock::new(None)), wg_state: RunState::Idle, } } @@ -50,15 +50,15 @@ impl DaemonInstance { warn!("Got start, but tun interface already up."); } RunState::Idle => { - let tun_if = Arc::new(RwLock::new(st.tun.open()?)); + let tun_if = st.tun.open()?; + debug!("Setting tun on wg_interface"); + self.wg_interface.read().await.set_tun(tun_if).await; + debug!("tun set on wg_interface"); debug!("Setting tun_interface"); - self.tun_interface = Some(tun_if.clone()); + self.tun_interface = self.wg_interface.read().await.get_tun(); debug!("tun_interface set: {:?}", self.tun_interface); - debug!("Setting tun on wg_interface"); - self.wg_interface.write().await.set_tun(tun_if).await; - debug!("tun set on wg_interface"); debug!("Cloning wg_interface"); let tmp_wg = self.wg_interface.clone(); @@ -82,12 +82,12 @@ impl DaemonInstance { } Ok(DaemonResponseData::None) } - DaemonCommand::ServerInfo => match &self.tun_interface { + DaemonCommand::ServerInfo => match &self.tun_interface.read().await.as_ref() { None => Ok(DaemonResponseData::None), Some(ti) => { info!("{:?}", ti); Ok(DaemonResponseData::ServerInfo(ServerInfo::try_from( - ti.read().await.inner.get_ref(), + ti.inner.get_ref(), )?)) } }, diff --git a/burrow/src/wireguard/iface.rs b/burrow/src/wireguard/iface.rs index 9e5e94a8..60970825 100755 --- a/burrow/src/wireguard/iface.rs +++ b/burrow/src/wireguard/iface.rs @@ -5,7 +5,7 @@ use anyhow::Error; use fehler::throws; use futures::future::join_all; use ip_network_table::IpNetworkTable; -use tokio::sync::{RwLock, RwLockReadGuard}; +use tokio::sync::{RwLock, Notify}; use tracing::{debug, error}; use tun::tokio::TunInterface; @@ -53,9 +53,10 @@ enum IfaceStatus { } pub struct Interface { - tun: Option>>, + tun: Arc>>, pcbs: Arc, - status: Arc> + status: Arc>, + stop_notifier: Arc, } async fn is_running(status: Arc>) -> bool { @@ -72,17 +73,23 @@ impl Interface { .collect::>()?; let pcbs = Arc::new(pcbs); - Self { pcbs, tun: None, status: Arc::new(RwLock::new(IfaceStatus::Idle)) } + Self { pcbs, tun: Arc::new(RwLock::new(None)), status: Arc::new(RwLock::new(IfaceStatus::Idle)), stop_notifier: Arc::new(Notify::new()) } } - pub async fn set_tun(&mut self, tun: Arc>) { - self.tun = Some(tun); + pub async fn set_tun(&self, tun: TunInterface) { + debug!("Setting tun interface"); + self.tun.write().await.replace(tun); let mut st = self.status.write().await; *st = IfaceStatus::Running; } + pub fn get_tun(&self) -> Arc>> { + self.tun.clone() + } + pub async fn remove_tun(&self){ let mut st = self.status.write().await; + self.stop_notifier.notify_waiters(); *st = IfaceStatus::Idle; } @@ -90,9 +97,9 @@ impl Interface { let pcbs = self.pcbs.clone(); let tun = self .tun - .clone() - .ok_or(anyhow::anyhow!("tun interface does not exist"))?; + .clone(); let status = self.status.clone(); + let stop_notifier = self.stop_notifier.clone(); log::info!("Starting interface"); let outgoing = async move { @@ -100,15 +107,20 @@ impl Interface { let mut buf = [0u8; 3000]; let src = { - let src = match tun.read().await.recv(&mut buf[..]).await { - Ok(len) => &buf[..len], - Err(e) => { - error!("Failed to read from interface: {}", e); - continue - } + let t = tun.read().await; + let Some(_tun) = t.as_ref() else { + continue; }; - debug!("Read {} bytes from interface", src.len()); - src + tokio::select! { + _ = stop_notifier.notified() => continue, + pkg = _tun.recv(&mut buf[..]) => match pkg { + Ok(len) => &buf[..len], + Err(e) => { + error!("Failed to read from interface: {}", e); + continue + } + }, + } }; let dst_addr = match Tunnel::dst_address(src) { @@ -143,8 +155,7 @@ impl Interface { let mut tsks = vec![]; let tun = self .tun - .clone() - .ok_or(anyhow::anyhow!("tun interface does not exist"))?; + .clone(); let outgoing = tokio::task::spawn(outgoing); tsks.push(outgoing); debug!("preparing to spawn read tasks"); diff --git a/burrow/src/wireguard/pcb.rs b/burrow/src/wireguard/pcb.rs index db57968a..974d84e7 100755 --- a/burrow/src/wireguard/pcb.rs +++ b/burrow/src/wireguard/pcb.rs @@ -54,7 +54,7 @@ impl PeerPcb { Ok(()) } - pub async fn run(&self, tun_interface: Arc>) -> Result<(), Error> { + pub async fn run(&self, tun_interface: Arc>>) -> Result<(), Error> { tracing::debug!("starting read loop for pcb... for {:?}", &self); let rid: i32 = random(); let mut buf: [u8; 3000] = [0u8; 3000]; @@ -106,12 +106,12 @@ impl PeerPcb { } TunnResult::WriteToTunnelV4(packet, addr) => { tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr); - tun_interface.read().await.send(packet).await?; + tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?; break } TunnResult::WriteToTunnelV6(packet, addr) => { tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr); - tun_interface.read().await.send(packet).await?; + tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?; break } }