Commit

Merge branch 'release/0.5.1'

s3rius committed Oct 20, 2023
2 parents cc5005c + 6ae6c18 commit b3fe337
Showing 15 changed files with 719 additions and 891 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -13,7 +13,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: "3.9"
python-version: "3.11"
- name: Install deps
uses: knowsuchagency/poetry-install@v1
env:
13 changes: 5 additions & 8 deletions .github/workflows/test.yml
@@ -13,14 +13,11 @@ jobs:
matrix:
cmd:
- black
- flake8
- isort
# mypy is disabled because redis>=5.0 has errors in typing
# - mypy
- autoflake
- ruff
- mypy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Install poetry
run: pipx install poetry
- name: Set up Python
@@ -47,10 +44,10 @@ jobs:
- 6379:6379
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11"]
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v2
with:
32 changes: 8 additions & 24 deletions .pre-commit-config.yaml
@@ -22,35 +22,19 @@ repos:
language: system
types: [python]

- id: autoflake
name: autoflake
entry: autoflake
language: system
types: [ python ]
args: [ --in-place, --remove-all-unused-imports, --remove-duplicate-keys ]

- id: isort
name: isort
entry: isort
language: system
types: [ python ]

- id: flake8
name: Check with Flake8
entry: flake8
- id: ruff
name: Run ruff lints
entry: poetry run ruff
language: system
pass_filenames: false
types: [ python ]
args: [--count, .]
types: [python]
args:
- "--fix"
- "taskiq_redis"
- "tests"

- id: mypy
name: Validate types with MyPy
entry: mypy
language: system
types: [ python ]

- id: yesqa
name: Remove useless noqa
entry: yesqa
language: system
types: [ python ]
1,259 changes: 426 additions & 833 deletions poetry.lock

Large diffs are not rendered by default.

84 changes: 72 additions & 12 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq-redis"
version = "0.5.0"
version = "0.5.1"
description = "Redis integration for taskiq"
authors = ["taskiq-team <[email protected]>"]
readme = "README.md"
@@ -26,24 +26,21 @@ keywords = [

[tool.poetry.dependencies]
python = "^3.8.1"
taskiq = "^0"
taskiq = ">=0.10.1,<1"
redis = "^5"

[tool.poetry.dev-dependencies]
[tool.poetry.group.dev.dependencies]
pytest = "^7.0"
flake8 = "^6"
mypy = "^1"
isort = "^5.10.1"
yesqa = "^1.3.0"
wemake-python-styleguide = "^0.18"
black = "^22.3.0"
autoflake = "^1.4"
pytest-cov = "^3.0.0"
anyio = "^3.6.1"
pytest-env = "^0.6.2"
fakeredis = "^2"
pre-commit = "^2.20.0"
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
ruff = "^0.1.0"
types-redis = "^4.6.0.7"

[tool.mypy]
strict = true
@@ -61,10 +58,73 @@ module = ['redis']
ignore_missing_imports = true
strict = false

[tool.isort]
profile = "black"
multi_line_output = 3

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.ruff]
# List of enabled rule sets.
# See https://docs.astral.sh/ruff/rules/ for more information.
select = [
"E", # Error
"F", # Pyflakes
"W", # Pycodestyle
"C90", # McCabe complexity
"I", # Isort
"N", # pep8-naming
"D", # Pydocstyle
"ANN", # Pytype annotations
"S", # Bandit
"B", # Bugbear
"COM", # Commas
"C4", # Comprehensions
"ISC", # Implicit string concat
"PIE", # Unnecessary code
"T20", # Catch prints
"PYI", # validate pyi files
"Q", # Checks for quotes
"RSE", # Checks raise statements
"RET", # Checks return statements
"SLF", # Self checks
"SIM", # Simplificator
"PTH", # Pathlib checks
"ERA", # Checks for commented out code
"PL", # PyLint checks
"RUF", # Specific to Ruff checks
]
ignore = [
"D105", # Missing docstring in magic method
"D107", # Missing docstring in __init__
"D212", # Multi-line docstring summary should start at the first line
"D401", # First line should be in imperative mood
"D104", # Missing docstring in public package
"D100", # Missing docstring in public module
"ANN102", # Missing type annotation for self in method
"ANN101", # Missing type annotation for argument
"ANN401", # typing.Any are disallowed in `**kwargs
"PLR0913", # Too many arguments for function call
"D106", # Missing docstring in public nested class
]
exclude = [".venv/"]
mccabe = { max-complexity = 10 }
line-length = 88

[tool.ruff.per-file-ignores]
"tests/*" = [
"S101", # Use of assert detected
"S301", # Use of pickle detected
"D103", # Missing docstring in public function
"SLF001", # Private member accessed
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
"D101", # Missing docstring in public class
]

[tool.ruff.pydocstyle]
convention = "pep257"
ignore-decorators = ["typing.overload"]

[tool.ruff.pylint]
allow-magic-value-types = ["int", "str", "float"]

[tool.ruff.flake8-bugbear]
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]
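The extend-immutable-calls entry above keeps bugbear's B008 rule (function call in a default argument) from flagging taskiq dependency defaults. A minimal sketch of the pattern it allows, assuming a hypothetical get_redis_url dependency that is not part of this diff:

# Without "taskiq.TaskiqDepends" in extend-immutable-calls, ruff would report
# B008 for the call used as a default value below.
from taskiq import TaskiqDepends


def get_redis_url() -> str:
    """Hypothetical dependency that resolves a Redis URL."""
    return "redis://localhost:6379"


async def my_task(redis_url: str = TaskiqDepends(get_redis_url)) -> None:
    """The dependency default passes the lint thanks to the config above."""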
2 changes: 2 additions & 0 deletions taskiq_redis/__init__.py
@@ -1,9 +1,11 @@
"""Package for redis integration."""
from taskiq_redis.redis_backend import RedisAsyncResultBackend
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
from taskiq_redis.schedule_source import RedisScheduleSource

__all__ = [
"RedisAsyncResultBackend",
"ListQueueBroker",
"PubSubBroker",
"RedisScheduleSource",
]
Empty file added taskiq_redis/py.typed
10 changes: 6 additions & 4 deletions taskiq_redis/redis_backend.py
@@ -23,7 +23,7 @@ def __init__(
keep_results: bool = True,
result_ex_time: Optional[int] = None,
result_px_time: Optional[int] = None,
):
) -> None:
"""
Constructs a new result backend.
@@ -87,7 +87,7 @@ async def set_result(
redis_set_params["px"] = self.result_px_time

async with Redis(connection_pool=self.redis_pool) as redis:
await redis.set(**redis_set_params)
await redis.set(**redis_set_params) # type: ignore

async def is_result_ready(self, task_id: str) -> bool:
"""
@@ -124,9 +124,11 @@ async def get_result(
)

if result_value is None:
raise ResultIsMissingError()
raise ResultIsMissingError

taskiq_result: TaskiqResult[_ReturnType] = pickle.loads(result_value)
taskiq_result: TaskiqResult[_ReturnType] = pickle.loads( # noqa: S301
result_value,
)

if not with_logs:
taskiq_result.log = None
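For context, a minimal construction sketch for the result backend whose signature is touched above, assuming a local Redis URL; result_ex_time and result_px_time set an expiration in seconds and milliseconds respectively:

from taskiq_redis import RedisAsyncResultBackend

# Results are written with a 600-second expiration, so Redis drops them ten
# minutes after set_result stores them; keep_results=True leaves them in
# place after they are read.
result_backend = RedisAsyncResultBackend(
    "redis://localhost:6379",
    keep_results=True,
    result_ex_time=600,
)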
4 changes: 2 additions & 2 deletions taskiq_redis/redis_broker.py
@@ -6,7 +6,7 @@
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage

_T = TypeVar("_T") # noqa: WPS111
_T = TypeVar("_T")

logger = getLogger("taskiq.redis_broker")

@@ -109,7 +109,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
"""
redis_brpop_data_position = 1
async with Redis(connection_pool=self.connection_pool) as redis_conn:
while True: # noqa: WPS457
while True:
yield (await redis_conn.brpop(self.queue_name))[
redis_brpop_data_position
]
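The listen loop above is what a worker uses to pull messages off the Redis list via BRPOP. A small wiring sketch, assuming a local Redis; the add task is purely illustrative:

from taskiq_redis import ListQueueBroker

broker = ListQueueBroker("redis://localhost:6379")


@broker.task
async def add(a: int, b: int) -> int:
    """Toy task: its messages are pushed to the list and popped by listen()."""
    return a + b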
97 changes: 97 additions & 0 deletions taskiq_redis/schedule_source.py
@@ -0,0 +1,97 @@
import dataclasses
from typing import Any, List, Optional

from redis.asyncio import ConnectionPool, Redis
from taskiq import ScheduleSource
from taskiq.abc.serializer import TaskiqSerializer
from taskiq.scheduler.scheduled_task import ScheduledTask

from taskiq_redis.serializer import PickleSerializer


class RedisScheduleSource(ScheduleSource):
"""
Source of schedules for redis.

This class allows you to store schedules in redis.
Also it supports dynamic schedules.

:param url: url to redis.
:param prefix: prefix for redis schedule keys.
:param buffer_size: buffer size for redis scan.
This is how many keys will be fetched at once.
:param max_connection_pool_size: maximum number of connections in pool.
:param serializer: serializer for data.
:param connection_kwargs: additional arguments for aio-redis ConnectionPool.
"""

def __init__(
self,
url: str,
prefix: str = "schedule",
buffer_size: int = 50,
max_connection_pool_size: Optional[int] = None,
serializer: Optional[TaskiqSerializer] = None,
**connection_kwargs: Any,
) -> None:
self.prefix = prefix
self.connection_pool: ConnectionPool = ConnectionPool.from_url(
url=url,
max_connections=max_connection_pool_size,
**connection_kwargs,
)
self.buffer_size = buffer_size
if serializer is None:
serializer = PickleSerializer()
self.serializer = serializer

async def delete_schedule(self, schedule_id: str) -> None:
"""Remove schedule by id."""
async with Redis(connection_pool=self.connection_pool) as redis:
await redis.delete(f"{self.prefix}:{schedule_id}")

async def add_schedule(self, schedule: ScheduledTask) -> None:
"""
Add schedule to redis.

:param schedule: schedule to add.
"""
async with Redis(connection_pool=self.connection_pool) as redis:
await redis.set(
f"{self.prefix}:{schedule.schedule_id}",
self.serializer.dumpb(dataclasses.asdict(schedule)),
)

async def get_schedules(self) -> List[ScheduledTask]:
"""
Get all schedules from redis.

This method is used by the scheduler to get all schedules.

:return: list of schedules.
"""
schedules = []
async with Redis(connection_pool=self.connection_pool) as redis:
buffer = []
async for key in redis.scan_iter(f"{self.prefix}:*"):
buffer.append(key)
if len(buffer) >= self.buffer_size:
schedules.extend(await redis.mget(buffer))
buffer = []
if buffer:
schedules.extend(await redis.mget(buffer))
return [
ScheduledTask(**self.serializer.loadb(schedule))
for schedule in schedules
if schedule
]

async def post_send(self, task: ScheduledTask) -> None:
"""Delete a task after it's completed."""
if task.time is not None:
await self.delete_schedule(task.schedule_id)

async def shutdown(self) -> None:
"""Shut down the schedule source."""
await self.connection_pool.disconnect()
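A usage sketch for the new RedisScheduleSource, assuming a local Redis; the scheduler reads ScheduledTask entries stored under the given prefix and sends them through the broker:

from taskiq import TaskiqScheduler

from taskiq_redis import ListQueueBroker, RedisScheduleSource

broker = ListQueueBroker("redis://localhost:6379")
source = RedisScheduleSource("redis://localhost:6379", prefix="schedule")

# The scheduler polls the source for schedules and kicks matching tasks.
scheduler = TaskiqScheduler(broker=broker, sources=[source])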
16 changes: 16 additions & 0 deletions taskiq_redis/serializer.py
@@ -0,0 +1,16 @@
import pickle
from typing import Any

from taskiq.abc.serializer import TaskiqSerializer


class PickleSerializer(TaskiqSerializer):
"""Serializer that uses pickle."""

def dumpb(self, value: Any) -> bytes:
"""Dumps value to bytes."""
return pickle.dumps(value)

def loadb(self, value: bytes) -> Any:
"""Loads value from bytes."""
return pickle.loads(value) # noqa: S301
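A quick round-trip sketch of the serializer: dumpb produces the bytes that the schedule source stores in Redis, and loadb restores the original object (pickle should only ever be fed trusted data):

from taskiq_redis.serializer import PickleSerializer

serializer = PickleSerializer()
raw = serializer.dumpb({"schedule_id": "demo", "cron": "* * * * *"})
assert serializer.loadb(raw) == {"schedule_id": "demo", "cron": "* * * * *"}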