Skip to content

Commit

Permalink
feat(arcor2_runtime): fix CI, simplify callbacks, revert pause, resum…
Browse files Browse the repository at this point in the history
…e event changes
  • Loading branch information
Kinali CI committed Aug 9, 2024
1 parent d52db1c commit 0ccca41
Showing 1 changed file with 32 additions and 90 deletions.
122 changes: 32 additions & 90 deletions src/python/arcor2_runtime/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import threading
from dataclasses import dataclass, field
from functools import wraps
from typing import Any, Callable, TypeVar, cast
from typing import Any, Callable, Optional, TypeVar, cast

from arcor2.cached import CachedProject, CachedScene
from arcor2.data.common import Pose, ProjectRobotJoints, StrEnum
Expand Down Expand Up @@ -43,87 +43,37 @@ class Globals:

disable_action_wrapper: dict[int, bool] = field(default_factory=dict)

paused_callback: Optional[Callable[..., None]] = None
resumed_callback: Optional[Callable[..., None]] = None

g = Globals()
def execute_paused_callback(self) -> None:
"""Executes pause callback."""
if self.paused_callback is not None:
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True

try:
self.paused_callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False

def execute_resumed_callback(self) -> None:
"""Executes resume callback."""
if self.resumed_callback is not None:
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True

class PackageStateHandler:
"""Singleton class that manages callbacks for PAUSE and RESUME events."""

_instance = None
_on_pause_callbacks: list[Callable[..., None]] = []
_on_resume_callbacks: list[Callable[..., None]] = []
_instance_lock = threading.Lock()
_execution_lock = threading.Lock()
_pause_callbacks_executed = threading.Event() # regular pausing
_resume_callbacks_executed = threading.Event()

def __init__(self):
"""Forbidden initializer."""
raise RuntimeError("Call get_instance() instead")

@classmethod
def get_instance(cls):
"""Returns the singleton instance of the class."""
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = cls.__new__(cls)
return cls._instance

def add_on_pause_callback(self, on_pause_callback: Callable[..., None]) -> None:
"""Adds a callback to be executed when the script is paused."""
self._on_pause_callbacks.append(on_pause_callback)

def remove_on_pause_callback(self, on_pause_callback: Callable[..., None]) -> None:
"""Removes a callback to be executed when the script is paused."""
self._on_pause_callbacks.remove(on_pause_callback)

def add_on_resume_callback(self, on_resume_callback: Callable[..., None]) -> None:
"""Adds a callback to be executed when the script is resumed."""
self._on_resume_callbacks.append(on_resume_callback)

def remove_on_resume_callback(self, on_resume_callback: Callable[..., None]) -> None:
"""Removes a callback to be executed when the script is resumed."""
self._on_resume_callbacks.remove(on_resume_callback)

def execute_on_pause(self) -> None:
"""Executes all pause callbacks."""
if threading.current_thread() is threading.main_thread():
with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from PAUSE callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True
try:
self.resumed_callback()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False

try:
for callback in self._on_pause_callbacks:
callback()
self._pause_callbacks_executed.set()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[threading.get_ident()] = False
else:
self._pause_callbacks_executed.wait()
self._resume_callbacks_executed.clear()

def execute_on_resume(self) -> None:
"""Executes all resume callbacks."""
if threading.current_thread() is threading.main_thread():
with self._execution_lock:
# Disable action wrapper to prevent stack overflow when action is called from RESUME callback.
thread_id = threading.get_ident()
g.disable_action_wrapper[thread_id] = True

try:
for callback in self._on_resume_callbacks:
callback()
self._resume_callbacks_executed.set()
finally:
# Enable action wrapper back.
g.disable_action_wrapper[thread_id] = False
else:
self._resume_callbacks_executed.wait()
self._pause_callbacks_executed.clear()
g = Globals()


def patch_aps(project: CachedProject) -> None:
Expand Down Expand Up @@ -217,6 +167,7 @@ def _get_commands():
if g.pause.is_set():
if cmd in (Commands.STEP, Commands.RESUME):
g.pause.clear()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING)))

if cmd == Commands.STEP:
g.pause_on_next_action.set()
Expand All @@ -226,6 +177,7 @@ def _get_commands():
if cmd == Commands.PAUSE:
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))


_cmd_thread = threading.Thread(target=_get_commands)
Expand All @@ -241,22 +193,12 @@ def handle_stdin_commands(*, before: bool, breakpoint: bool = False) -> None:
g.pause_on_next_action.clear()
g.resume.clear()
g.pause.set()
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

if g.pause.is_set():
# Execute on pause callbacks, prevent transfer to PAUSED state if callback causes exception.
PackageStateHandler.get_instance().execute_on_pause()

# Signal that thread is paused.
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.PAUSED)))

# Wait for resume.
g.execute_paused_callback()
g.resume.wait()

# Signal that thread is running.
print_event(PackageState(PackageState.Data(PackageState.Data.StateEnum.RUNNING)))

# Execute on resume callbacks, if callback causes exception, it is in RUNNING state.
PackageStateHandler.get_instance().execute_on_resume()
g.execute_resumed_callback()


F = TypeVar("F", bound=Callable[..., Any])
Expand Down

0 comments on commit 0ccca41

Please sign in to comment.