Skip to content

Commit

Permalink
Add spin_until_complete and spin_for
Browse files Browse the repository at this point in the history
Co-authored-by: Hubert Liberacki <[email protected]>
Signed-off-by: Hubert Liberacki <[email protected]>
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard and hliberacki committed Apr 10, 2024
1 parent c8d3481 commit 9b94372
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 11 deletions.
47 changes: 47 additions & 0 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
This will invalidate all entities derived from the context.
"""

from typing import Callable
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -242,6 +243,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None:
executor.remove_node(node)


def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
"""
Execute work for some time.
Callbacks will be executed by the provided executor until the context associated with the
executor is shut down or the given time duration passes.
:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait (blocking).
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_for(duration_sec)
finally:
executor.remove_node(node)


def spin_until_complete(
node: 'Node',
condition: Callable[[], bool],
executor: Optional['Executor'] = None,
timeout_sec: Optional[float] = None,
) -> None:
"""
Execute work until the condition is complete.
Callbacks and other work will be executed by the provided executor until ``condition()``
returns ``True`` or the context associated with the executor is shutdown.
:param node: A node to add to the executor to check for work.
:param condition: The callable condition to wait on. If this condition is not related to what
the executor is waiting on and the timeout is infinite, this could block forever.
:param executor: The executor to use, or the global executor if ``None``.
:param timeout_sec: Seconds to wait. Block until the condition is complete
if ``None`` or negative. Don't wait if 0.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
executor.spin_until_complete(condition, timeout_sec)
finally:
executor.remove_node(node)


def spin_until_future_complete(
node: 'Node',
future: Future,
Expand Down
65 changes: 54 additions & 11 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,32 +307,44 @@ def spin(self) -> None:
while self._context.ok() and not self._is_shutdown:
self.spin_once()

def spin_until_future_complete(
def spin_for(self, duration_sec: Optional[float] = None) -> None:
"""Execute callbacks until shutdown or timeout."""
self.spin_until_complete(lambda: False, duration_sec)

def spin_until_complete(
self,
future: Future,
timeout_sec: Optional[float] = None
condition: Callable[[], bool],
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
# Make sure the future wakes this executor when it is done
future.add_done_callback(lambda x: self.wake())

"""Execute callbacks until a given condition is complete or a timeout occurs."""
if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
while self._context.ok() and not condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
while self._context.ok() and not condition() and not self._is_shutdown:
self.spin_once_until_complete(condition, timeout_left)
now = time.monotonic()

if now >= end:
return

timeout_left.timeout = end - now

def spin_until_future_complete(
self,
future: Future,
timeout_sec: Optional[float] = None,
) -> None:
"""Execute callbacks until a given future is done or a timeout occurs."""
future.add_done_callback(lambda x: self.wake())
def condition() -> bool:
return future.done()
self.spin_until_complete(condition, timeout_sec)

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
Wait for and execute a single callback.
Expand All @@ -346,6 +358,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None:
"""
raise NotImplementedError()

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
"""
Wait for and execute a single callback.
This should behave in the same way as :meth:`spin_once`.
If needed by the implementation, it should awake other threads waiting.
:param condition: The callable condition to wait on.
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
Don't wait if 0.
"""
raise NotImplementedError()

def spin_once_until_future_complete(
self,
future: Future,
Expand Down Expand Up @@ -826,6 +855,13 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
Expand Down Expand Up @@ -898,6 +934,13 @@ def _spin_once_impl(
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_complete(
self,
condition: Callable[[], bool],
timeout_sec: float = None,
) -> None:
self._spin_once_impl(timeout_sec, condition)

def spin_once_until_future_complete(
self,
future: Future,
Expand Down
82 changes: 82 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,22 @@ def test_executor_add_node(self):
assert not executor.add_node(self.node)
assert id(executor) == id(self.node.executor)

def test_executor_spin_for(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(1.0, timer_callback)

start = time.monotonic()
executor.spin_for(duration_sec=10.0)
end = time.monotonic()
self.assertGreaterEqual(end - start, 10.0)

timer.cancel()

def test_executor_spin_until_future_complete_timeout(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand All @@ -402,6 +418,72 @@ def timer_callback():

timer.cancel()

def test_executor_spin_until_complete_condition_done(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(0.1, timer_callback)

condition_var = False

def set_condition():
nonlocal condition_var
condition_var = True

def condition():
nonlocal condition_var
return condition_var

# Condition complete timeout_sec > 0
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=1.0)
self.assertTrue(condition())

# timeout_sec = None
condition_var = False
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=None)
self.assertTrue(condition())

# Condition complete timeout < 0
condition_var = False
self.assertFalse(condition())
t = threading.Thread(target=lambda: set_condition())
t.start()
executor.spin_until_complete(condition, timeout_sec=-1)
self.assertTrue(condition())

timer.cancel()

def test_executor_spin_until_complete_do_not_wait(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

def timer_callback():
pass
timer = self.node.create_timer(0.1, timer_callback)

condition_var = False

def condition():
nonlocal condition_var
return condition_var

# Do not wait timeout_sec = 0
self.assertFalse(condition())
executor.spin_until_complete(condition, timeout_sec=0)
self.assertFalse(condition())

timer.cancel()

def test_executor_spin_until_future_complete_future_done(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand Down

0 comments on commit 9b94372

Please sign in to comment.