Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelYvon committed Mar 1, 2024
1 parent c71e8cb commit 35d62eb
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 62 deletions.
1 change: 1 addition & 0 deletions onvif/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ base64 = "0.13.0"
bigdecimal = "0.3.0"
chrono = "0.4.19"
digest_auth = "0.3.0"
futures = "0.3.30"
futures-core = "0.3.8"
num-bigint = "0.4.2"
reqwest = { version = "0.11.20", default-features = false }
Expand Down
261 changes: 199 additions & 62 deletions onvif/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod network_enumeration;

use futures_core::stream::Stream;
use schema::ws_discovery::{probe, probe_matches};
use std::{
Expand Down Expand Up @@ -31,6 +33,17 @@ pub enum Error {
Unsupported(String),
}

// TODO
#[derive(Debug, Clone)]
pub enum DiscoveryMode {
/// The normal WS-Discovery Mode
Multicast,
Unicast {
network: Ipv4Addr,
network_mask: Ipv4Addr,
},
}

#[derive(Clone, Eq, Hash, PartialEq)]
pub struct Device {
/// The WS-Discovery UUID / address reference
Expand All @@ -55,18 +68,24 @@ impl Debug for Device {
pub struct DiscoveryBuilder {
duration: Duration,
listen_address: IpAddr,
discovery_mode: DiscoveryMode,
}

impl Default for DiscoveryBuilder {
fn default() -> Self {
Self {
duration: Duration::from_secs(5),
listen_address: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
discovery_mode: DiscoveryMode::Multicast,
}
}
}

impl DiscoveryBuilder {
const LOCAL_PORT: u16 = 0;
const MULTI_PORT: u16 = 3702;
const WS_DISCOVERY_BROADCAST_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);

/// How long to listen for the responses from the network.
pub fn duration(&mut self, duration: Duration) -> &mut Self {
self.duration = duration;
Expand All @@ -83,79 +102,104 @@ impl DiscoveryBuilder {
self
}

/// Discovers devices on a local network asynchronously using WS-discovery.
///
/// Internally it sends a multicast probe and waits for responses for a specified amount of time.
/// The result is a stream of discovered devices.
/// The stream is terminated after provided amount of time.
///
/// There are many different ways to iterate over and process the values in a `Stream`
/// https://rust-lang.github.io/async-book/05_streams/02_iteration_and_concurrency.html
///
/// # Examples
///
/// You can access each element on the stream concurrently as soon as devices respond:
///
/// ```
/// use onvif::discovery;
/// use futures_util::stream::StreamExt; // to use for_each_concurrent
///
/// const MAX_CONCURRENT_JUMPERS: usize = 100;
///
/// async {
/// discovery::DiscoveryBuilder::default().run()
/// .await
/// .unwrap()
/// .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |addr| {
/// async move {
/// println!("Device found: {:?}", addr);
/// }
/// })
/// .await;
/// };
/// ```
///
/// Or you can await on a collection of unique devices found in one second:
///
/// ```
/// use onvif::discovery;
/// use futures_util::stream::StreamExt; // to use collect
/// use std::collections::HashSet;
///
/// async {
/// let devices = discovery::DiscoveryBuilder::default().run()
/// .await
/// .unwrap()
/// .collect::<HashSet<_>>()
/// .await;
///
/// println!("Devices found: {:?}", devices);
/// };
/// ```
pub async fn run(&self) -> Result<impl Stream<Item = Device>, Error> {
let Self {
duration,
listen_address,
} = self;
/// Set the discovery mode. See [DiscoveryMode] for a description of how this works.
pub fn discovery_mode(&mut self, discovery_mode: DiscoveryMode) -> &mut Self {
self.discovery_mode = discovery_mode;
self
}

async fn run_unicast(
&self,
duration: &Duration,
listen_address: &IpAddr,
network: &Ipv4Addr,
network_mask: &Ipv4Addr,
) -> Result<ReceiverStream<Device>, Error> {
let probe = Arc::new(build_probe());
let probe_xml = yaserde::ser::to_string(probe.as_ref()).map_err(Error::Serde)?;

debug!("Probe XML: {}", probe_xml);

let socket = {
const LOCAL_PORT: u16 = 0;
let message_id = Arc::new(probe.header.message_id.clone());
let payload = Arc::new(probe_xml.as_bytes().to_vec());

const MULTI_IPV4_ADDR: Ipv4Addr = Ipv4Addr::new(239, 255, 255, 250);
const MULTI_PORT: u16 = 3702;
let (device_sender, device_receiver) = channel(32);
let device_receiver = ReceiverStream::new(device_receiver);

let local_socket_addr = SocketAddr::new(*listen_address, LOCAL_PORT);
let multi_socket_addr = SocketAddr::new(IpAddr::V4(MULTI_IPV4_ADDR), MULTI_PORT);
let mut unicast_requests = vec![];

for target_address in enumerate_network_v4(*network, *network_mask) {
let local_socket_addr = SocketAddr::new(*listen_address, Self::LOCAL_PORT);
let target_sock_addr = SocketAddr::new(IpAddr::V4(target_address), Self::MULTI_PORT);

let socket = UdpSocket::bind(local_socket_addr).await?;

unicast_requests.push((
socket,
target_sock_addr,
device_sender.clone(),
payload.clone(),
message_id.clone(),
));
}

let produce_devices = async move {
futures::future::join_all(unicast_requests.iter().map(
|(sock, addr, device_sender, payload, message_id)| async move {
sock.send_to(payload, addr).await.ok()?;
let (xml, _) = recv_string(&sock).await.ok()?;

debug!("Probe match XML: {}", xml);

let envelope = match yaserde::de::from_str::<probe_matches::Envelope>(&xml) {
Ok(envelope) => envelope,
Err(e) => {
debug!("Deserialization failed: {e}");
return None;
}
};

if envelope.header.relates_to != **message_id {
debug!("Unrelated message");
return None;
}

if let Some(device) = device_from_envelope(envelope) {
debug!("Found device {device:?}");
// It's ok to ignore the sending error as user can drop the receiver soon
// (for example, after the first device discovered).
Some(device_sender.send(device).await)
} else {
None
}
},
))
.await
};

tokio::spawn(timeout(*duration, produce_devices));

Ok(device_receiver)
}

async fn run_multicast(
&self,
duration: &Duration,
listen_address: &IpAddr,
) -> Result<ReceiverStream<Device>, Error> {
let probe = Arc::new(build_probe());
let probe_xml = yaserde::ser::to_string(probe.as_ref()).map_err(Error::Serde)?;

debug!("Probe XML: {}", probe_xml);

let socket = {
let local_socket_addr = SocketAddr::new(*listen_address, Self::LOCAL_PORT);
let multi_socket_addr = SocketAddr::new(IpAddr::V4(Self::WS_DISCOVERY_BROADCAST_ADDR), Self::MULTI_PORT);

let socket = UdpSocket::bind(local_socket_addr).await?;

match listen_address {
IpAddr::V4(addr) => socket.join_multicast_v4(MULTI_IPV4_ADDR, *addr)?,
IpAddr::V4(addr) => socket.join_multicast_v4(Self::WS_DISCOVERY_BROADCAST_ADDR, *addr)?,
IpAddr::V6(_) => return Err(Error::Unsupported("Discovery with IPv6".to_owned())),
}

Expand Down Expand Up @@ -208,6 +252,78 @@ impl DiscoveryBuilder {

Ok(device_receiver)
}

/// Discovers devices on a local network asynchronously using WS-discovery.
///
/// Internally it sends a multicast probe and waits for responses for a specified amount of time.
/// You alternatively have the choice to send multiple unicast probes. See [DiscoveryMode]. This
/// is to allow the discovery process to operate within a Docker container or an environment where
/// the hosts network might be different than the target network.
///
/// The result is a stream of discovered devices.
/// The stream is terminated after provided amount of time.
///
/// There are many different ways to iterate over and process the values in a `Stream`
/// https://rust-lang.github.io/async-book/05_streams/02_iteration_and_concurrency.html
///
/// # Examples
///
/// You can access each element on the stream concurrently as soon as devices respond:
///
/// ```
/// use onvif::discovery;
/// use futures_util::stream::StreamExt; // to use for_each_concurrent
///
/// const MAX_CONCURRENT_JUMPERS: usize = 100;
///
/// async {
/// discovery::DiscoveryBuilder::default().run()
/// .await
/// .unwrap()
/// .for_each_concurrent(MAX_CONCURRENT_JUMPERS, |addr| {
/// async move {
/// println!("Device found: {:?}", addr);
/// }
/// })
/// .await;
/// };
/// ```
///
/// Or you can await on a collection of unique devices found in one second:
///
/// ```
/// use onvif::discovery;
/// use futures_util::stream::StreamExt; // to use collect
/// use std::collections::HashSet;
///
/// async {
/// let devices = discovery::DiscoveryBuilder::default().run()
/// .await
/// .unwrap()
/// .collect::<HashSet<_>>()
/// .await;
///
/// println!("Devices found: {:?}", devices);
/// };
/// ```
pub async fn run(&self) -> Result<impl Stream<Item=Device>, Error> {
let Self {
duration,
listen_address,
discovery_mode,
} = self;

match discovery_mode {
DiscoveryMode::Multicast => self.run_multicast(duration, listen_address).await,
DiscoveryMode::Unicast {
network,
network_mask,
} => {
self.run_unicast(duration, listen_address, network, network_mask)
.await
}
}
}
}

async fn recv_string(s: &UdpSocket) -> io::Result<(String, SocketAddr)> {
Expand Down Expand Up @@ -261,6 +377,27 @@ fn build_probe() -> probe::Envelope {
}
}

use std::iter::Iterator;
use std::net::Ipv6Addr;
use tokio_stream::StreamExt;
use crate::discovery::network_enumeration::enumerate_network_v4;

#[tokio::test]
async fn test_unicast() {
let devices = DiscoveryBuilder::default()
.discovery_mode(DiscoveryMode::Unicast {
network: Ipv4Addr::new(192, 168, 1, 0),
network_mask: Ipv4Addr::new(255, 255, 255, 0),
})
.run()
.await
.unwrap()
.collect::<Vec<_>>()
.await;

println!("Devices found: {:?}", devices);
}

#[test]
fn test_xaddrs_extraction() {
const DEVICE_ADDRESS: &str = "an address";
Expand Down Expand Up @@ -327,7 +464,7 @@ fn test_xaddrs_extraction() {
name: Some("MyCamera2000".to_string()),
hardware: None,
address: DEVICE_ADDRESS.to_string(),
types: vec![]
types: vec![],
}]
);
}
54 changes: 54 additions & 0 deletions onvif/src/discovery/network_enumeration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::net::Ipv4Addr;

#[inline]
fn octets_to_u32(octets: [u8; 4]) -> u32 {
(octets[0] as u32) << (3 * 8)
| (octets[1] as u32) << (2 * 8)
| (octets[2] as u32) << (1 * 8)
| (octets[3] as u32)
}

/// Enumerate the list of IPs on the network given the network address and the mask.
pub fn enumerate_network_v4(network: Ipv4Addr, mask: Ipv4Addr) -> Vec<Ipv4Addr> {
let network = octets_to_u32(network.octets());
let mask = octets_to_u32(mask.octets());

let mask = !mask;

let mut ips = Vec::with_capacity(mask as usize);

for value in 1..mask {
let addr = network | value;
ips.push(Ipv4Addr::from(addr))
}

ips
}

/// Tests the enumeration method. See http://jodies.de/ipcalc for examples.
#[cfg(test)]
mod test_enumerate_v4 {
use super::*;

#[test]
pub fn test_basic_home_network() {
let home_net = Ipv4Addr::new(192, 168, 0, 0);
let net_mask = Ipv4Addr::new(255, 255, 255, 0);

let ips = enumerate_network_v4(home_net, net_mask);

assert_eq!(254, ips.len())
}

#[test]
pub fn test_more_complex_net() {
let home_net = Ipv4Addr::new(192, 168, 0, 0);
let net_mask = Ipv4Addr::new(255, 255, 254, 0);

let ips = enumerate_network_v4(home_net, net_mask);

dbg!(&ips);

assert_eq!(510, ips.len())
}
}

0 comments on commit 35d62eb

Please sign in to comment.