Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arcor2_runtime): provide on pause and on resume callbacks #862

Closed
wants to merge 6 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/python/arcor2_runtime/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,79 @@ class Globals:

lock: threading.Lock = field(default_factory=threading.Lock)

disable_action_wrapper: bool = False


g = Globals()


class PackageStateHandler(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(object) is not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would implement this as a special OT or mixin (plugin for OTs) - that would probably be a cleaner solution and definitely more flexible for you (easier changes in that code). The code can react to states of already existing events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object inheritance removed.

As for the singleton vs OT (mixin): We have a requirement, that if resume conditions aren't met, the script cannot continue. As I understand suggested implementation, there would be daemon thread inside OT handling pause/resume. So it reacts independently on pause/resume.

What we want to achieve is to lock the doors to the workplace on script resume, so operator is safe. This might fail (doors remains open - cannot be locked) - we would like to notify user about this (via blocking dialog service), user will close the door, check passes, resume callback ends, script (thread) continues.

In OT (mixin) implementation this would require additional synchronization between OTs, leading to less readable code (maybe) for integrators. It is probably easy to implement but singleton solution seems easier for me.

"""Singleton class that manages callbacks for PAUSE and RESUME events."""

_instance = None
_on_pause_callbacks: list[Callable[..., None]] = []
_on_resume_callbacks: list[Callable[..., None]] = []
_instance_lock = threading.Lock()
_execution_lock = threading.Lock()

def __init__(self):
"""Forbidden initializer."""
raise RuntimeError("Call get_instance() instead")

@classmethod
def get_instance(cls):
"""Returns the singleton instance of the class."""
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = cls.__new__(cls)
return cls._instance

def add_on_pause_callback(self, on_pause_callback: Callable[..., None]):
"""Adds a callback to be executed when the script is paused."""
self._on_pause_callbacks.append(on_pause_callback)

def remove_on_pause_callback(self, on_pause_callback: Callable[..., None]):
"""Removes a callback to be executed when the script is paused."""
self._on_pause_callbacks.remove(on_pause_callback)

def add_on_resume_callback(self, on_resume_callback: Callable[..., None]):
"""Adds a callback to be executed when the script is resumed."""
self._on_resume_callbacks.append(on_resume_callback)

def remove_on_resume_callback(self, on_resume_callback: Callable[..., None]):
"""Removes a callback to be executed when the script is resumed."""
self._on_resume_callbacks.remove(on_resume_callback)

def execute_on_pause(self):
"""Executes all pause callbacks."""

with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback.
g.disable_action_wrapper = True

try:
for callback in self._on_pause_callbacks:
callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper = False

def execute_on_resume(self):
"""Executes all resume callbacks."""

with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback.
g.disable_action_wrapper = True

try:
for callback in self._on_resume_callbacks:
callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper = False


def patch_aps(project: CachedProject) -> None:
"""orientations / joints have to be monkey-patched with AP's ID in order to
make breakpoints work in @action."""
Expand Down Expand Up @@ -162,11 +231,18 @@ def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None:
g.pause_on_next_action.clear()
g.resume.clear()
g.pause.set()

# Execute on pause callbacks, prevent transfer to PAUSED state if callback causes exception.
PackageStateHandler.get_instance().execute_on_pause()

print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

if g.pause.is_set():
g.resume.wait()

# Execute on resume callbacks, if callback causes exception, it is in RUNNING state.
PackageStateHandler.get_instance().execute_on_resume()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to call this before "resume" actually happens (before calling g.resume.set())? Like this, the action will start being executed, and at the same time, the action wrapper will be disabled.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand the code, handle_stdin_commands is called on thread(s) executing action, so in the current implementation it would execute pause callbacks and equally resume callbacks. In case that resume callbacks execution would be fired before g.resume.set() - from daemon thread handling _get_commands - it would lead to situation where pause callbacks are fired for each thread, but resume callbacks would be fired only once.

Current handling of commands is little confusing for me, because print_event PAUSE is fired from handle_stdin_commands thread and _get_commands thread, but print_event RUNNING is fired only from _get_commands thread.

I adjusted the behavior of pause/resume handling, so print_event for PAUSE and RUNNING is fired only from handle_stdin_commands thread. Is it better, or I overlooked something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, handle_stdin_commands should be renamed... It is not processing stdin commands (_get_commands does that) but does the actual waiting if the script is paused and additionally it can set pause event when there is a breakpoint (that the case when the "command" to pause does not come from the Execution service). It was not renamed properly when doing changes in the code, although the docstring was (somehow) updated. Sorry for possible confusion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted changes made in _get_commands and handle_stdin_commands and left a simple pause and resume callback execution in handle_stdin_commands. This way will callbacks be executed on thread handling action I suppose.



F = TypeVar("F", bound=Callable[..., Any])

Expand Down Expand Up @@ -215,6 +291,20 @@ def wrapper(obj: Generic, *action_args: Any, an: None | str = None, **kwargs: An
if thread_id not in g.depth:
g.depth[thread_id] = 0

# Execute action without wrapping in case that action wrapper is disabled.
if g.disable_action_wrapper:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use "g.pause.is_set()" instead - it is (probably) the same information.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the code correctly, g.pause.is_set() is global flag for all threads. So if one thread sets it, other threads would read the flag here, executing action directly and continue execution - without reaching handle_stdin_commands function and effectively pause.

I leaved disable_action_wrapper field and switched it from bool to dict[int, bool] (thread_id as a key) so this if clause is executed only if actions are disabled for current thread.

Unfortunately I don't have infrastructure to test the code, so I am not sure if it is correct solution, please review.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...I will try to go through it and provide feedback tomorrow. Regarding testing - if you manage to get past the linter and mypy, we will see if the tests will pass or not. In the final state, the new functionality should be at least partially covered by tests. The CI currently stops on some formatting issues (have you ran pants fmt ::?) Also, there should be more type annotations in action.py, see https://github.com/robofit/arcor2/actions/runs/10303220550/job/28518715396?pr=862#step:9:33.

g.depth[thread_id] += 1

try:
res = f(obj, *action_args, an=an, **kwargs)
except Arcor2Exception:
g.depth[thread_id] = 0
g.ea[thread_id] = None
raise

g.depth[thread_id] -= 1
return res

try:
action_id: None | str = None
action_mapping_provided = hasattr(obj, ACTION_NAME_ID_MAPPING_ATTR)
Expand Down
Loading