Skip to content

Commit

Permalink
Add test_machine.py
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiSakuma committed Sep 11, 2024
1 parent 3492f7c commit e40a610
Showing 1 changed file with 140 additions and 0 deletions.
140 changes: 140 additions & 0 deletions tests/fsm/test_machine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any
from unittest.mock import AsyncMock, call

from hypothesis import given, settings
from hypothesis import strategies as st
from transitions import MachineError

from nextline.fsm import Callback, StateMachine
from nextline.types import ResetOptions


class StatefulTest:
def __init__(self, data: st.DataObject) -> None:
self._callback = AsyncMock(spec=Callback)
self._machine = StateMachine(callback=self._callback)
pass

@asynccontextmanager
async def context(self) -> AsyncIterator[None]:
self._prev = self._machine.state
self._callback.reset_mock()
try:
yield
except MachineError:
assert self._machine.state == self._prev
repr(self._machine)

async def initialize(self) -> None:
await self._machine.initialize()
self.assert_after_initialize()

def assert_after_initialize(self) -> None:
assert self._callback.mock_calls == [
call.start(),
call.initialize_run(),
call.on_change_state('initialized'),
]
self._callback.start.assert_awaited_once()
self._callback.initialize_run.assert_awaited_once()

async def run(self) -> None:
await self._machine.run()
assert self._callback.mock_calls == [
call.start_run(),
call.on_change_state('running'),
]
self._callback.start_run.assert_awaited_once()

async def finish(self) -> None:
await self._machine.finish()
assert self._callback.mock_calls == [
call.finish(),
call.on_change_state('finished'),
]

async def reset(self) -> None:
options = ResetOptions()
await self._machine.reset(reset_options=options)
if self._prev == 'finished':
assert self._callback.mock_calls == [
call.reset(reset_options=options),
call.on_exit_finished(),
call.initialize_run(),
call.on_change_state('initialized'),
]
self._callback.on_exit_finished.assert_awaited_once()
else:
assert self._callback.mock_calls == [
call.reset(reset_options=options),
call.initialize_run(),
call.on_change_state('initialized'),
]
self._callback.reset.assert_awaited_once()
self._callback.initialize_run.assert_awaited_once()

async def close(self) -> None:
await self._machine.close()
self.assert_after_close()

def assert_after_close(self) -> None:
if self._prev == 'running':
assert self._callback.mock_calls == [
call.wait_for_run_finish(),
call.close(),
call.on_change_state('closed'),
]
self._callback.wait_for_run_finish.assert_awaited_once()
self._callback.close.assert_awaited_once()
elif self._prev == 'finished':
assert self._callback.mock_calls == [
call.on_exit_finished(),
call.close(),
call.on_change_state('closed'),
]
self._callback.on_exit_finished.assert_awaited_once()
self._callback.close.assert_awaited_once()
elif self._prev == 'closed':
assert self._callback.mock_calls == []
else:
assert self._callback.mock_calls == [
call.close(),
call.on_change_state('closed'),
]
self._callback.close.assert_awaited_once()


async def __aenter__(self) -> 'StatefulTest':
async with self.context():
await self._machine.__aenter__()
self.assert_after_initialize()
return self

async def __aexit__(self, *args: Any, **kwargs: Any) -> bool:
async with self.context():
handled = await self._machine.__aexit__(*args, **kwargs) # type: ignore
self.assert_after_close()
return bool(handled)


@settings(max_examples=200)
@given(data=st.data())
async def test_property(data: st.DataObject) -> None:
test = StatefulTest(data)

TRIGGERS = [
test.initialize,
test.run,
test.finish,
test.reset,
test.close,
]

triggers = data.draw(st.lists(st.sampled_from(TRIGGERS)))

async with test:
for trigger in triggers:
async with test.context():
await trigger()

0 comments on commit e40a610

Please sign in to comment.