Skip to content

Commit

Permalink
add support for monitors
Browse files Browse the repository at this point in the history
  • Loading branch information
dlesl committed Aug 21, 2021
1 parent c37d101 commit 140571a
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 16 deletions.
2 changes: 1 addition & 1 deletion rustler/src/lib.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use crate::types::{
Atom, Binary, Decoder, Encoder, ListIterator, LocalPid, MapIterator, OwnedBinary,
};
pub mod resource;
pub use crate::resource::ResourceArc;
pub use crate::resource::{ResourceArc, ResourceArcMonitor, Monitor, MonitorResource};

#[doc(hidden)]
pub mod dynamic;
Expand Down
98 changes: 95 additions & 3 deletions rustler/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
//! NIF calls. The struct will be automatically dropped when the BEAM GC decides that there are no
//! more references to the resource.

use std::marker::PhantomData;
use std::{marker::PhantomData, mem::MaybeUninit};
use std::mem;
use std::ops::Deref;
use std::ptr;

use rustler_sys::{ErlNifMonitor, ErlNifPid, ErlNifResourceDown};

use super::{Decoder, Encoder, Env, Error, NifResult, Term};
use crate::wrapper::{
use crate::{LocalPid, wrapper::{
c_void, NifResourceFlags, MUTABLE_NIF_RESOURCE_HANDLE, NIF_ENV, NIF_RESOURCE_TYPE,
};
}};

/// Re-export a type used by the `resource!` macro.
#[doc(hidden)]
Expand All @@ -38,6 +40,21 @@ pub trait ResourceTypeProvider: Sized + Send + Sync + 'static {
fn get_type() -> &'static ResourceType<Self>;
}

#[derive(Clone)]
pub struct Monitor {
inner: ErlNifMonitor,
}

impl PartialEq for Monitor {
fn eq(&self, other: &Self) -> bool {
unsafe { rustler_sys::enif_compare_monitors(&self.inner, &other.inner) == 0 }
}
}

pub trait MonitorResource: ResourceTypeProvider {
fn down(resource: &ResourceArc<Self>, pid: LocalPid, mon: Monitor);
}

impl<T> Encoder for ResourceArc<T>
where
T: ResourceTypeProvider,
Expand Down Expand Up @@ -65,6 +82,24 @@ extern "C" fn resource_destructor<T>(_env: NIF_ENV, handle: MUTABLE_NIF_RESOURCE
}
}

pub extern "C" fn resource_down<T: MonitorResource>(
_env: NIF_ENV,
handle: MUTABLE_NIF_RESOURCE_HANDLE,
pid: *const ErlNifPid,
mon: *const ErlNifMonitor
) {
unsafe {
let pid = LocalPid::new((&*pid).clone());
let mon = Monitor { inner: (&*mon).clone() };
crate::wrapper::resource::keep_resource(handle);
let resource = ResourceArc {
inner: align_alloced_mem_for_struct::<T>(handle) as *mut T,
raw: handle
};
T::down(&resource, pid, mon);
}
}

/// This is the function that gets called from resource! in on_load to create a new
/// resource type.
///
Expand All @@ -75,13 +110,15 @@ extern "C" fn resource_destructor<T>(_env: NIF_ENV, handle: MUTABLE_NIF_RESOURCE
pub fn open_struct_resource_type<T: ResourceTypeProvider>(
env: Env,
name: &str,
down: Option<ErlNifResourceDown>,
flags: NifResourceFlags,
) -> Option<ResourceType<T>> {
let res: Option<NIF_RESOURCE_TYPE> = unsafe {
crate::wrapper::resource::open_resource_type(
env.as_c_arg(),
name.as_bytes(),
Some(resource_destructor::<T>),
down,
flags,
)
};
Expand Down Expand Up @@ -227,6 +264,50 @@ where
}
}

pub trait ResourceArcMonitor {
fn monitor(&self, caller_env: Option<&Env>, pid: &LocalPid) -> Option<Monitor>;
fn demonitor(&self, caller_env: Option<&Env>, mon: &Monitor) -> bool;
}

impl<T: MonitorResource> ResourceArcMonitor for ResourceArc<T> {
fn monitor(&self, caller_env: Option<&Env>, pid: &LocalPid) -> Option<Monitor> {
let env = maybe_env(caller_env);
let mut mon = MaybeUninit::uninit();
let res = unsafe { rustler_sys::enif_monitor_process(env, self.raw, pid.as_c_arg(), mon.as_mut_ptr()) == 0 };
if res {
Some(Monitor {
inner: unsafe { mon.assume_init() }
})
} else {
None
}
}

fn demonitor(&self, caller_env: Option<&Env>, mon: &Monitor) -> bool {
let env = maybe_env(caller_env);
unsafe { rustler_sys::enif_demonitor_process(env, self.raw, &mon.inner) == 0 }
}
}

fn maybe_env(caller_env: Option<&Env>) -> NIF_ENV {
let thread_type = unsafe { rustler_sys::enif_thread_type() };
if thread_type == rustler_sys::ERL_NIF_THR_UNDEFINED {
assert!(caller_env.is_none());
ptr::null_mut()
} else if thread_type == rustler_sys::ERL_NIF_THR_NORMAL_SCHEDULER
|| thread_type == rustler_sys::ERL_NIF_THR_DIRTY_CPU_SCHEDULER
|| thread_type == rustler_sys::ERL_NIF_THR_DIRTY_IO_SCHEDULER
{
let caller_env = caller_env.unwrap();
// Panic if `caller_env` is not the environment of the calling process.
caller_env.pid();

caller_env.as_c_arg()
} else {
panic!("Unrecognized calling thread type");
}
}

#[macro_export]
#[deprecated(since = "0.22.0", note = "Please use `resource!` instead.")]
macro_rules! resource_struct_init {
Expand All @@ -238,13 +319,17 @@ macro_rules! resource_struct_init {
#[macro_export]
macro_rules! resource {
($struct_name:ty, $env: ident) => {
$crate::resource!($struct_name, $env, None)
};
($struct_name:ty, $env: ident, $down: expr) => {
{
static mut STRUCT_TYPE: Option<$crate::resource::ResourceType<$struct_name>> = None;

let temp_struct_type =
match $crate::resource::open_struct_resource_type::<$struct_name>(
$env,
concat!(stringify!($struct_name), "\x00"),
$down,
$crate::resource::NIF_RESOURCE_FLAGS::ERL_NIF_RT_CREATE
) {
Some(inner) => inner,
Expand All @@ -264,3 +349,10 @@ macro_rules! resource {
}
}
}

#[macro_export]
macro_rules! monitor_resource {
($struct_name:ty, $env: ident) => {
$crate::resource!($struct_name, $env, Some($crate::resource::resource_down::<$struct_name>))
}
}
3 changes: 3 additions & 0 deletions rustler/src/types/local_pid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub struct LocalPid {
}

impl LocalPid {
pub unsafe fn new(c: ErlNifPid) -> LocalPid {
LocalPid { c }
}
pub fn as_c_arg(&self) -> &ErlNifPid {
&self.c
}
Expand Down
16 changes: 11 additions & 5 deletions rustler/src/wrapper/resource.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::wrapper::{
NifResourceDtor, NifResourceFlags, NIF_ENV, NIF_RESOURCE_HANDLE, NIF_RESOURCE_TYPE, NIF_TERM,
NifResourceDtor, NifResourceFlags, NIF_ENV, NIF_RESOURCE_HANDLE, NIF_RESOURCE_TYPE, NIF_TERM,
};

use rustler_sys::{ErlNifResourceTypeInit, ErlNifResourceDown};
pub use rustler_sys::{
enif_alloc_resource as alloc_resource, enif_keep_resource as keep_resource,
enif_make_resource as make_resource,
Expand All @@ -10,23 +11,28 @@ pub use rustler_sys::{
pub use rustler_sys::enif_release_resource as release_resource;

use std::mem::MaybeUninit;
use std::ptr;

pub unsafe fn open_resource_type(
env: NIF_ENV,
name: &[u8],
dtor: Option<NifResourceDtor>,
down: Option<ErlNifResourceDown>,
flags: NifResourceFlags,
) -> Option<NIF_RESOURCE_TYPE> {
// Panic if name is not null-terminated.
assert_eq!(name.last().cloned(), Some(0u8));

// Currently unused as per erlang nif documentation
let module_p: *const u8 = ptr::null();
let name_p = name.as_ptr();
let init = ErlNifResourceTypeInit {
dtor,
stop: None,
down,
members: 4,
dyncall: None
};
let res = {
let mut tried = MaybeUninit::uninit();
rustler_sys::enif_open_resource_type(env, module_p, name_p, dtor, flags, tried.as_mut_ptr())
rustler_sys::enif_open_resource_type_x(env, name_p, &init, flags, tried.as_mut_ptr())
};

if res.is_null() {
Expand Down
10 changes: 5 additions & 5 deletions rustler_sys/src/rustler_sys_api.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ pub type ErlNifResourceDynCall =
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct ErlNifResourceTypeInit {
dtor: *const ErlNifResourceDtor,
stop: *const ErlNifResourceStop, // at ERL_NIF_SELECT_STOP event
down: *const ErlNifResourceDown, // enif_monitor_process
members: c_int,
dyncall: *const ErlNifResourceDynCall,
pub dtor: Option<ErlNifResourceDtor>,
pub stop: Option<ErlNifResourceStop>, // at ERL_NIF_SELECT_STOP event
pub down: Option<ErlNifResourceDown>, // enif_monitor_process
pub members: c_int,
pub dyncall: Option<ErlNifResourceDynCall>,
}

/// See [ErlNifSelectFlags](http://erlang.org/doc/man/erl_nif.html#ErlNifSelectFlags) in the Erlang docs.
Expand Down
4 changes: 4 additions & 0 deletions rustler_tests/lib/rustler_test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ defmodule RustlerTest do
def resource_get_integer_field(_), do: err()
def resource_make_immutable(_), do: err()
def resource_immutable_count(), do: err()
def monitor_resource_make(), do: err()
def monitor_resource_monitor(_, _), do: err()
def monitor_resource_down_called(_), do: err()
def monitor_resource_demonitor(_), do: err()

def make_shorter_subbinary(_), do: err()
def parse_integer(_), do: err()
Expand Down
4 changes: 4 additions & 0 deletions rustler_tests/native/rustler_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ rustler::init!(
test_resource::resource_get_integer_field,
test_resource::resource_make_immutable,
test_resource::resource_immutable_count,
test_resource::monitor_resource_make,
test_resource::monitor_resource_monitor,
test_resource::monitor_resource_down_called,
test_resource::monitor_resource_demonitor,
test_atom::atom_to_string,
test_atom::atom_equals_ok,
test_atom::binary_to_atom,
Expand Down
51 changes: 49 additions & 2 deletions rustler_tests/native/rustler_test/src/test_resource.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rustler::{Env, ResourceArc};
use std::sync::RwLock;
use rustler::{Env, ResourceArc, ResourceArcMonitor, Monitor, MonitorResource, LocalPid};
use std::sync::{RwLock, Mutex};

pub struct TestResource {
test_field: RwLock<i32>,
Expand All @@ -12,9 +12,27 @@ pub struct ImmutableResource {
b: u32,
}

struct TestMonitorResourceInner {
mon: Option<Monitor>,
down_called: bool
}

pub struct TestMonitorResource {
inner: Mutex<TestMonitorResourceInner>
}

impl MonitorResource for TestMonitorResource {
fn down(resource: &ResourceArc<TestMonitorResource>, _pid: LocalPid, mon: Monitor) {
let mut inner = resource.inner.lock().unwrap();
assert!(Some(mon) == inner.mon);
inner.down_called = true;
}
}

pub fn on_load(env: Env) -> bool {
rustler::resource!(TestResource, env);
rustler::resource!(ImmutableResource, env);
rustler::monitor_resource!(TestMonitorResource, env);
true
}

Expand Down Expand Up @@ -69,3 +87,32 @@ pub fn resource_make_immutable(u: u32) -> ResourceArc<ImmutableResource> {
pub fn resource_immutable_count() -> u32 {
COUNT.load(Ordering::SeqCst) as u32
}

#[rustler::nif]
pub fn monitor_resource_make() -> ResourceArc<TestMonitorResource> {
ResourceArc::new(TestMonitorResource {
inner: Mutex::new(TestMonitorResourceInner {
mon: None,
down_called: false
})
})
}

#[rustler::nif]
pub fn monitor_resource_monitor(env: Env, resource: ResourceArc<TestMonitorResource>, pid: LocalPid) {
let mut inner = resource.inner.lock().unwrap();
inner.mon = resource.monitor(Some(&env), &pid);
assert!(inner.mon.is_some());
inner.down_called = false;
}

#[rustler::nif]
pub fn monitor_resource_down_called(resource: ResourceArc<TestMonitorResource>) -> bool {
resource.inner.lock().unwrap().down_called
}

#[rustler::nif]
pub fn monitor_resource_demonitor(env: Env, resource: ResourceArc<TestMonitorResource>) -> bool {
let inner = resource.inner.lock().unwrap();
resource.demonitor(Some(&env), inner.mon.as_ref().unwrap())
}
29 changes: 29 additions & 0 deletions rustler_tests/test/resource_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,33 @@ defmodule RustlerTest.ResourceTest do
# Erlang's exact GC should have cleaned all that up.
assert RustlerTest.resource_immutable_count() == 0
end

test "monitor resource" do
resource = RustlerTest.monitor_resource_make()
parent = self()
spawn(fn ->
RustlerTest.monitor_resource_monitor(resource, self())
send(parent, :done)
end)
receive do
:done -> :ok
end
:timer.sleep(10)
assert RustlerTest.monitor_resource_down_called(resource) == true
end

test "monitor resource demonitor" do
resource = RustlerTest.monitor_resource_make()
parent = self()
spawn(fn ->
RustlerTest.monitor_resource_monitor(resource, self())
RustlerTest.monitor_resource_demonitor(resource)
send(parent, :done)
end)
receive do
:done -> :ok
end
:timer.sleep(10)
assert RustlerTest.monitor_resource_down_called(resource) == false
end
end

0 comments on commit 140571a

Please sign in to comment.