Skip to content

Commit

Permalink
Implement plugin support for resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
handicraftsman committed Oct 28, 2024
1 parent 429ecde commit ffa1fe9
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ description = "mDNS Service Discovery library with no async runtime dependency"
async = ["flume/async"]
logging = ["log"]
default = ["async", "logging"]
plugins = []

[dependencies]
flume = { version = "0.11", default-features = false } # channel between threads
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ mod dns_parser;
mod error;
mod service_daemon;
mod service_info;
#[cfg(feature = "plugins")]
mod plugin;

pub use error::{Error, Result};
pub use service_daemon::{
Expand All @@ -158,5 +160,8 @@ pub use service_daemon::{
};
pub use service_info::{AsIpAddrs, IntoTxtProperties, ServiceInfo, TxtProperties, TxtProperty};

#[cfg(feature = "plugins")]
pub use plugin::PluginCommand;

/// A handler to receive messages from [ServiceDaemon]. Re-export from `flume` crate.
pub use flume::Receiver;
12 changes: 12 additions & 0 deletions src/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::collections::HashMap;
use crate::ServiceInfo;
use flume::Sender;

/// Commands to be implemented by plugins
#[derive(Debug)]
pub enum PluginCommand {
/// Command to fetch services that are currently provided by the plugin
ListServices(Sender<HashMap<String, ServiceInfo>>),

Exit(Sender<()>),
}
148 changes: 145 additions & 3 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
// corresponds to a set of DNS Resource Records.
#[cfg(feature = "logging")]
use crate::log::{debug, error, warn};
#[cfg(feature = "plugins")]
use crate::plugin::PluginCommand;
use crate::{
dns_cache::DnsCache,
dns_parser::{
Expand Down Expand Up @@ -278,6 +280,22 @@ impl ServiceDaemon {
self.send_cmd(Command::StopResolveHostname(hostname.to_string()))
}

/// Registers a plugin provided by the library consumer, to support dynamic mDNS resolution.
///
/// Please be aware that this resolution should be relatively consistent, e.g. configured
/// externally.
///
/// If feature `plugins` is enabled, the daemon will send requests to the plugins
/// using the flume channel that will be sent to `pc_recv`.
///
/// Please note that enabling the feature enables fetching the plugin-provided services
/// on *every* request, so this is disabled by default due to extra overhead.
#[cfg(feature = "plugins")]
pub fn register_plugin(&self, name: String, pc_recv: Sender<Receiver<PluginCommand>>) -> Result<()> {
self.send_cmd(Command::RegisterPlugin(name, pc_recv))
}


/// Registers a service provided by this host.
///
/// If `service_info` has no addresses yet and its `addr_auto` is enabled,
Expand All @@ -292,6 +310,7 @@ impl ServiceDaemon {
self.send_cmd(Command::Register(service_info))
}


/// Unregisters a service. This is a graceful shutdown of a service.
///
/// Returns a channel receiver that is used to receive the status code
Expand Down Expand Up @@ -401,9 +420,27 @@ impl ServiceDaemon {
fn daemon_thread(signal_sock: UdpSocket, poller: Poller, receiver: Receiver<Command>) {
let zc = Zeroconf::new(signal_sock, poller);

#[cfg(feature = "plugins")]
let plugin_senders = zc.plugin_senders.clone();

if let Some(cmd) = Self::run(zc, receiver) {
match cmd {
Command::Exit(resp_s) => {
#[cfg(feature = "plugins")]
for (plugin, sender) in plugin_senders.clone() {
let (p_send, p_recv) = bounded(1);

match sender.send(PluginCommand::Exit(p_send)) {
Ok(()) => {},
Err(e) => error!("failed to send plugin exit command: {}, {}", plugin, e)
};

match p_recv.recv() {
Ok(()) => debug!("plugin {} exited successfully", plugin),
Err(e) => error!("plugin {} failed to exit: {}", plugin, e)
}
}

// It is guaranteed that the receiver already dropped,
// i.e. the daemon command channel closed.
if let Err(e) = resp_s.send(DaemonStatus::Shutdown) {
Expand Down Expand Up @@ -658,6 +695,11 @@ impl ServiceDaemon {
zc.monitors.push(resp_s);
}

#[cfg(feature = "plugins")]
Command::RegisterPlugin(name, resp_s) => {
zc.register_plugin(name, resp_s);
}

Command::SetOption(daemon_opt) => {
zc.process_set_option(daemon_opt);
}
Expand Down Expand Up @@ -930,6 +972,9 @@ struct Zeroconf {

/// Service instances that are pending for resolving SRV and TXT.
pending_resolves: HashSet<String>,

#[cfg(feature = "plugins")]
plugin_senders: HashMap<String, Sender<PluginCommand>>,
}

impl Zeroconf {
Expand Down Expand Up @@ -979,6 +1024,8 @@ impl Zeroconf {
timers,
status,
pending_resolves: HashSet::new(),
#[cfg(feature = "plugins")]
plugin_senders: HashMap::new(),
}
}

Expand Down Expand Up @@ -1875,12 +1922,26 @@ impl Zeroconf {
// See https://datatracker.ietf.org/doc/html/rfc6763#section-9
const META_QUERY: &str = "_services._dns-sd._udp.local.";

let mut all_services = HashMap::new();

for (k, v) in &self.my_services {
all_services.insert(k, v);
}

let services_by_plugins = self.list_plugin_services();

for (_plugin, services) in &services_by_plugins {
for (k, v) in services {
all_services.insert(k, v);
}
}

for question in msg.questions.iter() {
debug!("query question: {:?}", &question);
let qtype = question.entry.ty;

if qtype == TYPE_PTR {
for service in self.my_services.values() {
for service in all_services.values() {
if question.entry.name == service.get_type()
|| service
.get_subtype()
Expand All @@ -1906,7 +1967,7 @@ impl Zeroconf {
}
} else {
if qtype == TYPE_A || qtype == TYPE_AAAA || qtype == TYPE_ANY {
for service in self.my_services.values() {
for service in all_services.values() {
if service.get_hostname().to_lowercase()
== question.entry.name.to_lowercase()
{
Expand Down Expand Up @@ -1940,7 +2001,7 @@ impl Zeroconf {
}

let name_to_find = question.entry.name.to_lowercase();
let service = match self.my_services.get(&name_to_find) {
let service = match all_services.get(&name_to_find) {
Some(s) => s,
None => continue,
};
Expand Down Expand Up @@ -2315,6 +2376,82 @@ impl Zeroconf {
self.increase_counter(Counter::CacheRefreshSRV, query_srv_count);
self.increase_counter(Counter::CacheRefreshAddr, query_addr_count);
}

// Returns (Plugin, Map<Service, Info>)
#[cfg(feature = "plugins")]
fn list_plugin_services(&self) -> Vec<(String, HashMap<String, ServiceInfo>)> {
let mut output = vec![];

for key in self.plugin_senders.keys() {
output.push((key.clone(), self.list_plugin_services_for(key)));
}

output
}

#[cfg(not(feature = "plugins"))]
fn list_plugin_services(&self) -> Vec<(String, HashMap<String, ServiceInfo>)> {
vec![]
}

#[cfg(feature = "plugins")]
fn list_plugin_services_for(&self, plugin: &str) -> HashMap<String, ServiceInfo> {
let (r_send, r_recv) = bounded(1);

let p_send = match self.plugin_senders.get(plugin) {
None => {
warn!("Could not find plugin {}", plugin);

return HashMap::new();
},
Some(p_send) => p_send,
};

match p_send.send(PluginCommand::ListServices(r_send)) {
Ok(()) => {},
Err(e) => warn!("Failed to send ListServices command: {}", e),
}

r_recv.recv().unwrap_or_else(|e| {
warn!("Could not receive service list: {}", e);

HashMap::new()
})
}

#[cfg(feature = "plugins")]
fn register_plugin(&mut self, name: String, resp_s: Sender<Receiver<PluginCommand>>) {
let (send, recv) = bounded(100);

match resp_s.send(recv) {
Ok(()) => debug!("Registered a plugin"),
Err(e) => {
error!("Failed to send registered plugin's receive handle: {}", e);

return;
}
}

if self.plugin_senders.contains_key(&name) {
let old_send = self.plugin_senders.get(&name).unwrap();

let (exit_send, exit_recv) = bounded(1);

match old_send.send(PluginCommand::Exit(exit_send)) {
Ok(()) => debug!("Requested old plugin exit"),
Err(e) => warn!("Failed to send exit command to a plugin: {}", e)
}

match exit_recv.recv_timeout(Duration::from_secs(1)) {
Ok(()) => debug!("The old plugin exited"),
Err(e) => warn!("Old plugin's exit timed out: {}", e)
}
}

debug!("Registered a new plugin: {}", name);

self.plugin_senders.insert(name, send);
}
}

/// All possible events sent to the client from the daemon
Expand Down Expand Up @@ -2410,6 +2547,9 @@ enum Command {

SetOption(DaemonOption),

#[cfg(feature = "plugins")]
RegisterPlugin(String, Sender<Receiver<PluginCommand>>),

Exit(Sender<DaemonStatus>),
}

Expand All @@ -2429,6 +2569,8 @@ impl fmt::Display for Command {
Self::StopResolveHostname(_) => write!(f, "Command StopResolveHostname"),
Self::Unregister(_, _) => write!(f, "Command Unregister"),
Self::UnregisterResend(_, _) => write!(f, "Command UnregisterResend"),
#[cfg(feature = "plugins")]
Self::RegisterPlugin(name, _) => write!(f, "Command RegisterPlugin: {}", name),
Self::Resolve(_, _) => write!(f, "Command Resolve"),
}
}
Expand Down

0 comments on commit ffa1fe9

Please sign in to comment.