diff --git a/onvif/Cargo.toml b/onvif/Cargo.toml index f03f615..cdee42e 100644 --- a/onvif/Cargo.toml +++ b/onvif/Cargo.toml @@ -14,6 +14,7 @@ base64 = "0.13.0" bigdecimal = "0.3.0" chrono = "0.4.19" digest_auth = "0.3.0" +futures = "0.3.30" futures-core = "0.3.8" num-bigint = "0.4.2" reqwest = { version = "0.11.20", default-features = false } diff --git a/onvif/src/discovery/mod.rs b/onvif/src/discovery/mod.rs index fe871c0..28301a9 100644 --- a/onvif/src/discovery/mod.rs +++ b/onvif/src/discovery/mod.rs @@ -1,3 +1,5 @@ +mod network_enumeration; + use futures_core::stream::Stream; use schema::ws_discovery::{probe, probe_matches}; use std::{ @@ -31,6 +33,17 @@ pub enum Error { Unsupported(String), } +// TODO +#[derive(Debug, Clone)] +pub enum DiscoveryMode { + /// The normal WS-Discovery Mode + Multicast, + Unicast { + network: Ipv4Addr, + network_mask: Ipv4Addr, + }, +} + #[derive(Clone, Eq, Hash, PartialEq)] pub struct Device { /// The WS-Discovery UUID / address reference @@ -55,6 +68,7 @@ impl Debug for Device { pub struct DiscoveryBuilder { duration: Duration, listen_address: IpAddr, + discovery_mode: DiscoveryMode, } impl Default for DiscoveryBuilder { @@ -62,11 +76,16 @@ impl Default for DiscoveryBuilder { Self { duration: Duration::from_secs(5), listen_address: IpAddr::V4(Ipv4Addr::UNSPECIFIED), + discovery_mode: DiscoveryMode::Multicast, } } } impl DiscoveryBuilder { + const LOCAL_PORT: u16 = 0; + const MULTI_PORT: u16 = 3702; + const WS_DISCOVERY_BROADCAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); + /// How long to listen for the responses from the network. pub fn duration(&mut self, duration: Duration) -> &mut Self { self.duration = duration; @@ -83,79 +102,104 @@ impl DiscoveryBuilder { self } - /// Discovers devices on a local network asynchronously using WS-discovery. - /// - /// Internally it sends a multicast probe and waits for responses for a specified amount of time. - /// The result is a stream of discovered devices. - /// The stream is terminated after provided amount of time. - /// - /// There are many different ways to iterate over and process the values in a `Stream` - /// https://rust-lang.github.io/async-book/05_streams/02_iteration_and_concurrency.html - /// - /// # Examples - /// - /// You can access each element on the stream concurrently as soon as devices respond: - /// - /// ``` - /// use onvif::discovery; - /// use futures_util::stream::StreamExt; // to use for_each_concurrent - /// - /// const MAX_CONCURRENT_JUMPERS: usize = 100; - /// - /// async { - /// discovery::DiscoveryBuilder::default().run() - /// .await - /// .unwrap() - /// .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |addr| { - /// async move { - /// println!("Device found: {:?}", addr); - /// } - /// }) - /// .await; - /// }; - /// ``` - /// - /// Or you can await on a collection of unique devices found in one second: - /// - /// ``` - /// use onvif::discovery; - /// use futures_util::stream::StreamExt; // to use collect - /// use std::collections::HashSet; - /// - /// async { - /// let devices = discovery::DiscoveryBuilder::default().run() - /// .await - /// .unwrap() - /// .collect::>() - /// .await; - /// - /// println!("Devices found: {:?}", devices); - /// }; - /// ``` - pub async fn run(&self) -> Result, Error> { - let Self { - duration, - listen_address, - } = self; + /// Set the discovery mode. See [DiscoveryMode] for a description of how this works. + pub fn discovery_mode(&mut self, discovery_mode: DiscoveryMode) -> &mut Self { + self.discovery_mode = discovery_mode; + self + } + async fn run_unicast( + &self, + duration: &Duration, + listen_address: &IpAddr, + network: &Ipv4Addr, + network_mask: &Ipv4Addr, + ) -> Result, Error> { let probe = Arc::new(build_probe()); let probe_xml = yaserde::ser::to_string(probe.as_ref()).map_err(Error::Serde)?; debug!("Probe XML: {}", probe_xml); - let socket = { - const LOCAL_PORT: u16 = 0; + let message_id = Arc::new(probe.header.message_id.clone()); + let payload = Arc::new(probe_xml.as_bytes().to_vec()); - const MULTI_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250); - const MULTI_PORT: u16 = 3702; + let (device_sender, device_receiver) = channel(32); + let device_receiver = ReceiverStream::new(device_receiver); - let local_socket_addr = SocketAddr::new(*listen_address, LOCAL_PORT); - let multi_socket_addr = SocketAddr::new(IpAddr::V4(MULTI_IPV4_ADDR), MULTI_PORT); + let mut unicast_requests = vec![]; + + for target_address in enumerate_network_v4(*network, *network_mask) { + let local_socket_addr = SocketAddr::new(*listen_address, Self::LOCAL_PORT); + let target_sock_addr = SocketAddr::new(IpAddr::V4(target_address), Self::MULTI_PORT); + + let socket = UdpSocket::bind(local_socket_addr).await?; + + unicast_requests.push(( + socket, + target_sock_addr, + device_sender.clone(), + payload.clone(), + message_id.clone(), + )); + } + + let produce_devices = async move { + futures::future::join_all(unicast_requests.iter().map( + |(sock, addr, device_sender, payload, message_id)| async move { + sock.send_to(payload, addr).await.ok()?; + let (xml, _) = recv_string(&sock).await.ok()?; + + debug!("Probe match XML: {}", xml); + + let envelope = match yaserde::de::from_str::(&xml) { + Ok(envelope) => envelope, + Err(e) => { + debug!("Deserialization failed: {e}"); + return None; + } + }; + + if envelope.header.relates_to != **message_id { + debug!("Unrelated message"); + return None; + } + + if let Some(device) = device_from_envelope(envelope) { + debug!("Found device {device:?}"); + // It's ok to ignore the sending error as user can drop the receiver soon + // (for example, after the first device discovered). + Some(device_sender.send(device).await) + } else { + None + } + }, + )) + .await + }; + + tokio::spawn(timeout(*duration, produce_devices)); + + Ok(device_receiver) + } + + async fn run_multicast( + &self, + duration: &Duration, + listen_address: &IpAddr, + ) -> Result, Error> { + let probe = Arc::new(build_probe()); + let probe_xml = yaserde::ser::to_string(probe.as_ref()).map_err(Error::Serde)?; + + debug!("Probe XML: {}", probe_xml); + + let socket = { + let local_socket_addr = SocketAddr::new(*listen_address, Self::LOCAL_PORT); + let multi_socket_addr = SocketAddr::new(IpAddr::V4(Self::WS_DISCOVERY_BROADCAST_ADDR), Self::MULTI_PORT); let socket = UdpSocket::bind(local_socket_addr).await?; match listen_address { - IpAddr::V4(addr) => socket.join_multicast_v4(MULTI_IPV4_ADDR, *addr)?, + IpAddr::V4(addr) => socket.join_multicast_v4(Self::WS_DISCOVERY_BROADCAST_ADDR, *addr)?, IpAddr::V6(_) => return Err(Error::Unsupported("Discovery with IPv6".to_owned())), } @@ -208,6 +252,78 @@ impl DiscoveryBuilder { Ok(device_receiver) } + + /// Discovers devices on a local network asynchronously using WS-discovery. + /// + /// Internally it sends a multicast probe and waits for responses for a specified amount of time. + /// You alternatively have the choice to send multiple unicast probes. See [DiscoveryMode]. This + /// is to allow the discovery process to operate within a Docker container or an environment where + /// the hosts network might be different than the target network. + /// + /// The result is a stream of discovered devices. + /// The stream is terminated after provided amount of time. + /// + /// There are many different ways to iterate over and process the values in a `Stream` + /// https://rust-lang.github.io/async-book/05_streams/02_iteration_and_concurrency.html + /// + /// # Examples + /// + /// You can access each element on the stream concurrently as soon as devices respond: + /// + /// ``` + /// use onvif::discovery; + /// use futures_util::stream::StreamExt; // to use for_each_concurrent + /// + /// const MAX_CONCURRENT_JUMPERS: usize = 100; + /// + /// async { + /// discovery::DiscoveryBuilder::default().run() + /// .await + /// .unwrap() + /// .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |addr| { + /// async move { + /// println!("Device found: {:?}", addr); + /// } + /// }) + /// .await; + /// }; + /// ``` + /// + /// Or you can await on a collection of unique devices found in one second: + /// + /// ``` + /// use onvif::discovery; + /// use futures_util::stream::StreamExt; // to use collect + /// use std::collections::HashSet; + /// + /// async { + /// let devices = discovery::DiscoveryBuilder::default().run() + /// .await + /// .unwrap() + /// .collect::>() + /// .await; + /// + /// println!("Devices found: {:?}", devices); + /// }; + /// ``` + pub async fn run(&self) -> Result, Error> { + let Self { + duration, + listen_address, + discovery_mode, + } = self; + + match discovery_mode { + DiscoveryMode::Multicast => self.run_multicast(duration, listen_address).await, + DiscoveryMode::Unicast { + network, + network_mask, + } => { + self.run_unicast(duration, listen_address, network, network_mask) + .await + } + } + } } async fn recv_string(s: &UdpSocket) -> io::Result<(String, SocketAddr)> { @@ -261,6 +377,27 @@ fn build_probe() -> probe::Envelope { } } +use std::iter::Iterator; +use std::net::Ipv6Addr; +use tokio_stream::StreamExt; +use crate::discovery::network_enumeration::enumerate_network_v4; + +#[tokio::test] +async fn test_unicast() { + let devices = DiscoveryBuilder::default() + .discovery_mode(DiscoveryMode::Unicast { + network: Ipv4Addr::new(192, 168, 1, 0), + network_mask: Ipv4Addr::new(255, 255, 255, 0), + }) + .run() + .await + .unwrap() + .collect::>() + .await; + + println!("Devices found: {:?}", devices); +} + #[test] fn test_xaddrs_extraction() { const DEVICE_ADDRESS: &str = "an address"; @@ -327,7 +464,7 @@ fn test_xaddrs_extraction() { name: Some("MyCamera2000".to_string()), hardware: None, address: DEVICE_ADDRESS.to_string(), - types: vec![] + types: vec![], }] ); } diff --git a/onvif/src/discovery/network_enumeration.rs b/onvif/src/discovery/network_enumeration.rs new file mode 100644 index 0000000..5ef01dd --- /dev/null +++ b/onvif/src/discovery/network_enumeration.rs @@ -0,0 +1,54 @@ +use std::net::Ipv4Addr; + +#[inline] +fn octets_to_u32(octets: [u8; 4]) -> u32 { + (octets[0] as u32) << (3 * 8) + | (octets[1] as u32) << (2 * 8) + | (octets[2] as u32) << (1 * 8) + | (octets[3] as u32) +} + +/// Enumerate the list of IPs on the network given the network address and the mask. +pub fn enumerate_network_v4(network: Ipv4Addr, mask: Ipv4Addr) -> Vec { + let network = octets_to_u32(network.octets()); + let mask = octets_to_u32(mask.octets()); + + let mask = !mask; + + let mut ips = Vec::with_capacity(mask as usize); + + for value in 1..mask { + let addr = network | value; + ips.push(Ipv4Addr::from(addr)) + } + + ips +} + +/// Tests the enumeration method. See http://jodies.de/ipcalc for examples. +#[cfg(test)] +mod test_enumerate_v4 { + use super::*; + + #[test] + pub fn test_basic_home_network() { + let home_net = Ipv4Addr::new(192, 168, 0, 0); + let net_mask = Ipv4Addr::new(255, 255, 255, 0); + + let ips = enumerate_network_v4(home_net, net_mask); + + assert_eq!(254, ips.len()) + } + + #[test] + pub fn test_more_complex_net() { + let home_net = Ipv4Addr::new(192, 168, 0, 0); + let net_mask = Ipv4Addr::new(255, 255, 254, 0); + + let ips = enumerate_network_v4(home_net, net_mask); + + dbg!(&ips); + + assert_eq!(510, ips.len()) + } +}