Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] add support for monitors #376

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions rustler/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,13 @@ impl<'a> Env<'a> {
/// an `OwnedEnv` on a thread that's managed by the Erlang VM).
///
pub fn send(self, pid: &LocalPid, message: impl Encoder) -> Result<(), SendError> {
let thread_type = unsafe { rustler_sys::enif_thread_type() };
let env = if thread_type == rustler_sys::ERL_NIF_THR_UNDEFINED {
ptr::null_mut()
} else if thread_type == rustler_sys::ERL_NIF_THR_NORMAL_SCHEDULER
|| thread_type == rustler_sys::ERL_NIF_THR_DIRTY_CPU_SCHEDULER
|| thread_type == rustler_sys::ERL_NIF_THR_DIRTY_IO_SCHEDULER
{
let env = if is_scheduler_thread() {
// Panic if `self` is not the environment of the calling process.
self.pid();

self.as_c_arg()
} else {
panic!("Env::send(): unrecognized calling thread type");
ptr::null_mut()
};

let message = message.encode(self);
Expand Down Expand Up @@ -345,3 +339,10 @@ impl Default for OwnedEnv {
Self::new()
}
}

/// Is the current thread an Erlang scheduler thread?
pub fn is_scheduler_thread() -> bool {
// From `enif_thread_type` docs: A positive value indicates a scheduler
// thread while a negative value or zero indicates another type of thread.
unsafe { rustler_sys::enif_thread_type() > 0 }
}
2 changes: 1 addition & 1 deletion rustler/src/lib.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use crate::types::{
OwnedBinary,
};
pub mod resource;
pub use crate::resource::ResourceArc;
pub use crate::resource::{Monitor, MonitorResource, ResourceArc, ResourceArcMonitor};

#[doc(hidden)]
pub mod dynamic;
Expand Down
124 changes: 122 additions & 2 deletions rustler/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
//! NIF calls. The struct will be automatically dropped when the BEAM GC decides that there are no
//! more references to the resource.

use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::ptr;
use std::{marker::PhantomData, mem::MaybeUninit};

use super::{Binary, Decoder, Encoder, Env, Error, NifResult, Term};
use rustler_sys::{ErlNifMonitor, ErlNifPid, ErlNifResourceDown};

use super::{Binary, Decoder, Encoder, Env, Error, LocalPid, Term};
use crate::wrapper::{
c_void, resource, NifResourceFlags, MUTABLE_NIF_RESOURCE_HANDLE, NIF_ENV, NIF_RESOURCE_TYPE,
};
use crate::NifResult;

/// Re-export a type used by the `resource!` macro.
#[doc(hidden)]
Expand All @@ -38,6 +41,27 @@ pub trait ResourceTypeProvider: Sized + Send + Sync + 'static {
fn get_type() -> &'static ResourceType<Self>;
}

#[derive(Clone)]
pub struct Monitor {
inner: ErlNifMonitor,
}

impl Monitor {
fn from_raw(inner: ErlNifMonitor) -> Monitor {
Monitor { inner }
}
}

impl PartialEq for Monitor {
fn eq(&self, other: &Self) -> bool {
unsafe { rustler_sys::enif_compare_monitors(&self.inner, &other.inner) == 0 }
}
}

pub trait MonitorResource: ResourceTypeProvider {
fn down(resource: ResourceArc<Self>, pid: LocalPid, mon: Monitor);
}

impl<T> Encoder for ResourceArc<T>
where
T: ResourceTypeProvider,
Expand Down Expand Up @@ -65,6 +89,27 @@ extern "C" fn resource_destructor<T>(_env: NIF_ENV, handle: MUTABLE_NIF_RESOURCE
}
}

/// Notity that resource that a monitored object is down (erlang_nif-sys
/// requires us to declare this function safe, but it is of course thoroughly
/// unsafe!)
extern "C" fn resource_down<T: MonitorResource>(
_env: NIF_ENV,
handle: MUTABLE_NIF_RESOURCE_HANDLE,
pid: *const ErlNifPid,
mon: *const ErlNifMonitor,
) {
unsafe {
let pid = LocalPid::from_raw(*pid);
let mon = Monitor::from_raw(*mon);
crate::wrapper::resource::keep_resource(handle);
evnu marked this conversation as resolved.
Show resolved Hide resolved
let resource = ResourceArc {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this to be a helper function in impl<T> ResourceArc<T>.

inner: align_alloced_mem_for_struct::<T>(handle) as *mut T,
evnu marked this conversation as resolved.
Show resolved Hide resolved
raw: handle,
};
T::down(resource, pid, mon);
}
}

/// This is the function that gets called from resource! in on_load to create a new
/// resource type.
///
Expand All @@ -75,13 +120,15 @@ extern "C" fn resource_destructor<T>(_env: NIF_ENV, handle: MUTABLE_NIF_RESOURCE
pub fn open_struct_resource_type<T: ResourceTypeProvider>(
env: Env,
name: &str,
down: Option<ErlNifResourceDown>,
flags: NifResourceFlags,
) -> Option<ResourceType<T>> {
let res: Option<NIF_RESOURCE_TYPE> = unsafe {
resource::open_resource_type(
env.as_c_arg(),
name.as_bytes(),
Some(resource_destructor::<T>),
down,
flags,
)
};
Expand Down Expand Up @@ -261,6 +308,48 @@ where
}
}

pub trait ResourceArcMonitor {
fn monitor(&self, caller_env: Option<&Env>, pid: &LocalPid) -> Option<Monitor>;
fn demonitor(&self, caller_env: Option<&Env>, mon: &Monitor) -> bool;
}

impl<T: MonitorResource> ResourceArcMonitor for ResourceArc<T> {
fn monitor(&self, caller_env: Option<&Env>, pid: &LocalPid) -> Option<Monitor> {
let env = maybe_env(caller_env);
let mut mon = MaybeUninit::uninit();
let res = unsafe {
rustler_sys::enif_monitor_process(env, self.raw, pid.as_c_arg(), mon.as_mut_ptr()) == 0
};
if res {
Some(Monitor {
inner: unsafe { mon.assume_init() },
})
} else {
None
}
}

fn demonitor(&self, caller_env: Option<&Env>, mon: &Monitor) -> bool {
let env = maybe_env(caller_env);
unsafe { rustler_sys::enif_demonitor_process(env, self.raw, &mon.inner) == 0 }
}
}

fn maybe_env(env: Option<&Env>) -> NIF_ENV {
if crate::env::is_scheduler_thread() {
let env = env.expect("Env required when calling from a scheduler thread");
// Panic if `env` is not the environment of the calling process.
env.pid();
env.as_c_arg()
} else {
assert!(
env.is_none(),
"Env provided when not calling from a scheduler thread"
);
ptr::null_mut()
}
}

#[macro_export]
#[deprecated(since = "0.22.0", note = "Please use `resource!` instead.")]
macro_rules! resource_struct_init {
Expand All @@ -269,16 +358,40 @@ macro_rules! resource_struct_init {
};
}

/// Used by the resource! macro to pass the unsafe `resource_down` callback in a
/// safe way (because `resource_down` cannot be accessed outside of this module)
#[doc(hidden)]
pub trait ResourceDownProvider {
fn down_callback() -> Option<ErlNifResourceDown>;
}

impl ResourceDownProvider for () {
fn down_callback() -> Option<ErlNifResourceDown> {
None
}
}

impl<T: MonitorResource> ResourceDownProvider for T {
fn down_callback() -> Option<ErlNifResourceDown> {
Some(resource_down::<T>)
}
}

#[macro_export]
macro_rules! resource {
($struct_name:ty, $env: ident) => {
$crate::resource!($struct_name, $env, ())
};
($struct_name:ty, $env: ident, $down: ty) => {
{
use $crate::resource::ResourceDownProvider;
static mut STRUCT_TYPE: Option<$crate::resource::ResourceType<$struct_name>> = None;

let temp_struct_type =
match $crate::resource::open_struct_resource_type::<$struct_name>(
$env,
concat!(stringify!($struct_name), "\x00"),
<$down>::down_callback(),
$crate::resource::NIF_RESOURCE_FLAGS::ERL_NIF_RT_CREATE
) {
Some(inner) => inner,
Expand All @@ -298,3 +411,10 @@ macro_rules! resource {
}
}
}

#[macro_export]
macro_rules! monitor_resource {
($struct_name:ty, $env: ident) => {
$crate::resource!($struct_name, $env, $struct_name)
};
}
3 changes: 3 additions & 0 deletions rustler/src/types/local_pid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub struct LocalPid {
}

impl LocalPid {
pub unsafe fn from_raw(c: ErlNifPid) -> LocalPid {
LocalPid { c }
}
pub fn as_c_arg(&self) -> &ErlNifPid {
&self.c
}
Expand Down
12 changes: 8 additions & 4 deletions rustler/src/wrapper/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,29 @@ pub use rustler_sys::{
enif_alloc_resource as alloc_resource, enif_keep_resource as keep_resource,
enif_make_resource as make_resource, enif_release_resource as release_resource,
};
use rustler_sys::{ErlNifResourceDown, ErlNifResourceTypeInit};

use std::mem::MaybeUninit;
use std::ptr;

pub unsafe fn open_resource_type(
env: NIF_ENV,
name: &[u8],
dtor: Option<NifResourceDtor>,
down: Option<ErlNifResourceDown>,
flags: NifResourceFlags,
) -> Option<NIF_RESOURCE_TYPE> {
// Panic if name is not null-terminated.
assert_eq!(name.last().cloned(), Some(0u8));

// Currently unused as per erlang nif documentation
let module_p: *const u8 = ptr::null();
let name_p = name.as_ptr();
let init = ErlNifResourceTypeInit {
dtor,
stop: None,
down,
};
let res = {
let mut tried = MaybeUninit::uninit();
rustler_sys::enif_open_resource_type(env, module_p, name_p, dtor, flags, tried.as_mut_ptr())
rustler_sys::enif_open_resource_type_x(env, name_p, &init, flags, tried.as_mut_ptr())
};

if res.is_null() {
Expand Down
8 changes: 3 additions & 5 deletions rustler_sys/src/rustler_sys_api.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,9 @@ pub type ErlNifResourceDynCall =
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct ErlNifResourceTypeInit {
dtor: *const ErlNifResourceDtor,
stop: *const ErlNifResourceStop, // at ERL_NIF_SELECT_STOP event
down: *const ErlNifResourceDown, // enif_monitor_process
members: c_int,
dyncall: *const ErlNifResourceDynCall,
pub dtor: Option<ErlNifResourceDtor>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is pub(crate) enough?

pub stop: Option<ErlNifResourceStop>, // at ERL_NIF_SELECT_STOP event
pub down: Option<ErlNifResourceDown>, // enif_monitor_process
}

/// See [ErlNifSelectFlags](http://erlang.org/doc/man/erl_nif.html#ErlNifSelectFlags) in the Erlang docs.
Expand Down
4 changes: 4 additions & 0 deletions rustler_tests/lib/rustler_test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ defmodule RustlerTest do
def resource_get_integer_field(_), do: err()
def resource_make_immutable(_), do: err()
def resource_immutable_count(), do: err()
def monitor_resource_make(), do: err()
def monitor_resource_monitor(_, _), do: err()
def monitor_resource_down_called(_), do: err()
def monitor_resource_demonitor(_), do: err()

def resource_make_with_binaries(), do: err()
def resource_make_binaries(_), do: err()
Expand Down
4 changes: 4 additions & 0 deletions rustler_tests/native/rustler_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ rustler::init!(
test_resource::resource_immutable_count,
test_resource::resource_make_with_binaries,
test_resource::resource_make_binaries,
test_resource::monitor_resource_make,
test_resource::monitor_resource_monitor,
test_resource::monitor_resource_down_called,
test_resource::monitor_resource_demonitor,
test_atom::atom_to_string,
test_atom::atom_equals_ok,
test_atom::binary_to_atom,
Expand Down
55 changes: 53 additions & 2 deletions rustler_tests/native/rustler_test/src/test_resource.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rustler::{Binary, Env, ResourceArc};
use std::sync::RwLock;
use rustler::{Binary, Env, LocalPid, Monitor, MonitorResource, ResourceArc, ResourceArcMonitor};
use std::sync::{Mutex, RwLock};

pub struct TestResource {
test_field: RwLock<i32>,
Expand All @@ -17,10 +17,28 @@ pub struct WithBinaries {
b: Vec<u8>,
}

struct TestMonitorResourceInner {
mon: Option<Monitor>,
down_called: bool,
}

pub struct TestMonitorResource {
inner: Mutex<TestMonitorResourceInner>,
}

impl MonitorResource for TestMonitorResource {
fn down(resource: ResourceArc<TestMonitorResource>, _pid: LocalPid, mon: Monitor) {
let mut inner = resource.inner.lock().unwrap();
assert!(Some(mon) == inner.mon);
inner.down_called = true;
}
}

pub fn on_load(env: Env) -> bool {
rustler::resource!(TestResource, env);
rustler::resource!(ImmutableResource, env);
rustler::resource!(WithBinaries, env);
rustler::monitor_resource!(TestMonitorResource, env);
true
}

Expand Down Expand Up @@ -85,6 +103,16 @@ pub fn resource_make_with_binaries() -> ResourceArc<WithBinaries> {
})
}

#[rustler::nif]
pub fn monitor_resource_make() -> ResourceArc<TestMonitorResource> {
ResourceArc::new(TestMonitorResource {
inner: Mutex::new(TestMonitorResourceInner {
mon: None,
down_called: false,
}),
})
}

#[rustler::nif]
pub fn resource_make_binaries(
env: Env,
Expand All @@ -108,3 +136,26 @@ pub fn resource_make_binaries(
pub fn resource_make_binary_from_vec(env: Env, resource: ResourceArc<WithBinaries>) -> Binary {
resource.make_binary(env, |w| &w.b)
}

#[rustler::nif]
pub fn monitor_resource_monitor(
env: Env,
resource: ResourceArc<TestMonitorResource>,
pid: LocalPid,
) {
let mut inner = resource.inner.lock().unwrap();
inner.mon = resource.monitor(Some(&env), &pid);
assert!(inner.mon.is_some());
inner.down_called = false;
}

#[rustler::nif]
pub fn monitor_resource_down_called(resource: ResourceArc<TestMonitorResource>) -> bool {
resource.inner.lock().unwrap().down_called
}

#[rustler::nif]
pub fn monitor_resource_demonitor(env: Env, resource: ResourceArc<TestMonitorResource>) -> bool {
let inner = resource.inner.lock().unwrap();
resource.demonitor(Some(&env), inner.mon.as_ref().unwrap())
}
Loading
Loading