Skip to content

Commit

Permalink
Starting after Stop fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JettChenT committed Feb 24, 2024
1 parent 36c90fe commit 021c427
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 30 deletions.
18 changes: 9 additions & 9 deletions burrow/src/daemon/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum RunState {
pub struct DaemonInstance {
rx: async_channel::Receiver<DaemonCommand>,
sx: async_channel::Sender<DaemonResponse>,
tun_interface: Option<Arc<RwLock<TunInterface>>>,
tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>,
wg_state: RunState,
}
Expand All @@ -36,7 +36,7 @@ impl DaemonInstance {
rx,
sx,
wg_interface,
tun_interface: None,
tun_interface: Arc::new(RwLock::new(None)),
wg_state: RunState::Idle,
}
}
Expand All @@ -50,15 +50,15 @@ impl DaemonInstance {
warn!("Got start, but tun interface already up.");
}
RunState::Idle => {
let tun_if = Arc::new(RwLock::new(st.tun.open()?));
let tun_if = st.tun.open()?;
debug!("Setting tun on wg_interface");
self.wg_interface.read().await.set_tun(tun_if).await;
debug!("tun set on wg_interface");

debug!("Setting tun_interface");
self.tun_interface = Some(tun_if.clone());
self.tun_interface = self.wg_interface.read().await.get_tun();
debug!("tun_interface set: {:?}", self.tun_interface);

debug!("Setting tun on wg_interface");
self.wg_interface.write().await.set_tun(tun_if).await;
debug!("tun set on wg_interface");

debug!("Cloning wg_interface");
let tmp_wg = self.wg_interface.clone();
Expand All @@ -82,12 +82,12 @@ impl DaemonInstance {
}
Ok(DaemonResponseData::None)
}
DaemonCommand::ServerInfo => match &self.tun_interface {
DaemonCommand::ServerInfo => match &self.tun_interface.read().await.as_ref() {
None => Ok(DaemonResponseData::None),
Some(ti) => {
info!("{:?}", ti);
Ok(DaemonResponseData::ServerInfo(ServerInfo::try_from(
ti.read().await.inner.get_ref(),
ti.inner.get_ref(),
)?))
}
},
Expand Down
47 changes: 29 additions & 18 deletions burrow/src/wireguard/iface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Error;
use fehler::throws;
use futures::future::join_all;
use ip_network_table::IpNetworkTable;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::sync::{RwLock, Notify};
use tracing::{debug, error};
use tun::tokio::TunInterface;

Expand Down Expand Up @@ -53,9 +53,10 @@ enum IfaceStatus {
}

pub struct Interface {
tun: Option<Arc<RwLock<TunInterface>>>,
tun: Arc<RwLock<Option<TunInterface>>>,
pcbs: Arc<IndexedPcbs>,
status: Arc<RwLock<IfaceStatus>>
status: Arc<RwLock<IfaceStatus>>,
stop_notifier: Arc<Notify>,
}

async fn is_running(status: Arc<RwLock<IfaceStatus>>) -> bool {
Expand All @@ -72,43 +73,54 @@ impl Interface {
.collect::<Result<_, _>>()?;

let pcbs = Arc::new(pcbs);
Self { pcbs, tun: None, status: Arc::new(RwLock::new(IfaceStatus::Idle)) }
Self { pcbs, tun: Arc::new(RwLock::new(None)), status: Arc::new(RwLock::new(IfaceStatus::Idle)), stop_notifier: Arc::new(Notify::new()) }
}

pub async fn set_tun(&mut self, tun: Arc<RwLock<TunInterface>>) {
self.tun = Some(tun);
pub async fn set_tun(&self, tun: TunInterface) {
debug!("Setting tun interface");
self.tun.write().await.replace(tun);
let mut st = self.status.write().await;
*st = IfaceStatus::Running;
}

pub fn get_tun(&self) -> Arc<RwLock<Option<TunInterface>>> {
self.tun.clone()
}

pub async fn remove_tun(&self){
let mut st = self.status.write().await;
self.stop_notifier.notify_waiters();
*st = IfaceStatus::Idle;
}

pub async fn run(&self) -> anyhow::Result<()> {
let pcbs = self.pcbs.clone();
let tun = self
.tun
.clone()
.ok_or(anyhow::anyhow!("tun interface does not exist"))?;
.clone();
let status = self.status.clone();
let stop_notifier = self.stop_notifier.clone();
log::info!("Starting interface");

let outgoing = async move {
while is_running(status.clone()).await {
let mut buf = [0u8; 3000];

let src = {
let src = match tun.read().await.recv(&mut buf[..]).await {
Ok(len) => &buf[..len],
Err(e) => {
error!("Failed to read from interface: {}", e);
continue
}
let t = tun.read().await;
let Some(_tun) = t.as_ref() else {
continue;
};
debug!("Read {} bytes from interface", src.len());
src
tokio::select! {
_ = stop_notifier.notified() => continue,
pkg = _tun.recv(&mut buf[..]) => match pkg {
Ok(len) => &buf[..len],
Err(e) => {
error!("Failed to read from interface: {}", e);
continue
}
},
}
};

let dst_addr = match Tunnel::dst_address(src) {
Expand Down Expand Up @@ -143,8 +155,7 @@ impl Interface {
let mut tsks = vec![];
let tun = self
.tun
.clone()
.ok_or(anyhow::anyhow!("tun interface does not exist"))?;
.clone();
let outgoing = tokio::task::spawn(outgoing);
tsks.push(outgoing);
debug!("preparing to spawn read tasks");
Expand Down
6 changes: 3 additions & 3 deletions burrow/src/wireguard/pcb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl PeerPcb {
Ok(())
}

pub async fn run(&self, tun_interface: Arc<RwLock<TunInterface>>) -> Result<(), Error> {
pub async fn run(&self, tun_interface: Arc<RwLock<Option<TunInterface>>>) -> Result<(), Error> {
tracing::debug!("starting read loop for pcb... for {:?}", &self);
let rid: i32 = random();
let mut buf: [u8; 3000] = [0u8; 3000];
Expand Down Expand Up @@ -106,12 +106,12 @@ impl PeerPcb {
}
TunnResult::WriteToTunnelV4(packet, addr) => {
tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?;
tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break
}
TunnResult::WriteToTunnelV6(packet, addr) => {
tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?;
tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break
}
}
Expand Down

0 comments on commit 021c427

Please sign in to comment.