Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a separate queue for querying PACS #503

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pixl_core/src/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ class PixlDiscardError(RuntimeError):

class PixlRequeueMessageError(RuntimeError):
"""Requeue PIXL message."""


class PixlOutOfHoursError(Exception):
"""Nack and requeue PIXL message."""


class PixlStudyNotInPrimaryArchiveError(Exception):
"""Study not in primary archive."""
35 changes: 33 additions & 2 deletions pixl_core/src/core/patient_queue/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@
import aio_pika
from decouple import config

from core.exceptions import PixlDiscardError, PixlRequeueMessageError
from core.exceptions import (
PixlDiscardError,
PixlOutOfHoursError,
PixlRequeueMessageError,
PixlStudyNotInPrimaryArchiveError,
)
from core.patient_queue._base import PixlQueueInterface
from core.patient_queue.message import deserialise
from core.patient_queue.producer import PixlProducer

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
Expand All @@ -45,6 +51,7 @@
self,
queue_name: str,
token_bucket: TokenBucket,
token_bucket_key: str,
callback: Callable[[Message], Awaitable[None]],
) -> None:
"""
Expand All @@ -53,6 +60,7 @@
"""
super().__init__(queue_name=queue_name)
self.token_bucket = token_bucket
self.token_bucket_key = token_bucket_key
self._callback = callback

@property
Expand All @@ -75,7 +83,7 @@
return self

async def _process_message(self, message: AbstractIncomingMessage) -> None:
if not self.token_bucket.has_token:
if not self.token_bucket.has_token(key=self.token_bucket_key):
await asyncio.sleep(1)
await message.reject(requeue=True)
return
Expand All @@ -88,6 +96,29 @@
logger.trace("Requeue message: {} from {}", pixl_message.identifier, requeue)
await asyncio.sleep(1)
await message.reject(requeue=True)
except PixlStudyNotInPrimaryArchiveError as discard:
logger.trace(

Check warning on line 100 in pixl_core/src/core/patient_queue/subscriber.py

View check run for this annotation

Codecov / codecov/patch

pixl_core/src/core/patient_queue/subscriber.py#L99-L100

Added lines #L99 - L100 were not covered by tests
p-j-smith marked this conversation as resolved.
Show resolved Hide resolved
"Discard message: {} from {}. Sending to secondary imaging queue with priority {}.",
pixl_message.identifier,
discard,
message.priority,
)
await asyncio.sleep(1)
await message.reject(requeue=False)
with PixlProducer(

Check warning on line 108 in pixl_core/src/core/patient_queue/subscriber.py

View check run for this annotation

Codecov / codecov/patch

pixl_core/src/core/patient_queue/subscriber.py#L106-L108

Added lines #L106 - L108 were not covered by tests
queue_name="_imaging_secondary",
host=config("RABBITMQ_HOST"),
port=config("RABBITMQ_PORT", cast=int),
username=config("RABBITMQ_USERNAME"),
password=config("RABBITMQ_PASSWORD"),
) as producer:
producer.publish([pixl_message], priority=message.priority)
except PixlOutOfHoursError as nack_requeue:
logger.trace(

Check warning on line 117 in pixl_core/src/core/patient_queue/subscriber.py

View check run for this annotation

Codecov / codecov/patch

pixl_core/src/core/patient_queue/subscriber.py#L115-L117

Added lines #L115 - L117 were not covered by tests
"Nack and requeue message: {} from {}", pixl_message.identifier, nack_requeue
)
await asyncio.sleep(10)
await message.nack(requeue=True)

Check warning on line 121 in pixl_core/src/core/patient_queue/subscriber.py

View check run for this annotation

Codecov / codecov/patch

pixl_core/src/core/patient_queue/subscriber.py#L120-L121

Added lines #L120 - L121 were not covered by tests
except PixlDiscardError as exception:
logger.warning("Failed message {}: {}", pixl_message.identifier, exception)
await (
Expand Down
26 changes: 15 additions & 11 deletions pixl_core/src/core/token_buffer/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
from __future__ import annotations

import typing

import token_bucket as tb


Expand All @@ -22,19 +24,20 @@
Rate limitation is governed by the existence of tokens in a bucket, whereby the
bucket is refilled every second. As long as a token can be retrieved, an item can
be downloaded. Should there be no more tokens inside the bucket, the request is
added back into the queue. Note that the Limiter object can operate the rate on
added back into the queue.

Note that the Limiter object can operate the rate on
different "streams", which are specified by a string object, also called key. This
key has been hard coded here to "pixl" as we do not expect the token bucket to be
responsible for more than one stream at this point in time.
key has been hard coded here to accept one of two values: 'primary' or 'secondary',
representing two different streams.
"""

key = b"pixl"
_keys: typing.ClassVar = ["primary", "secondary"]

def __init__(
self,
rate: float = 5,
capacity: int = 5,
storage: tb.StorageBase = None,
) -> None:
"""
Uses the token bucket implementation from `Falconry`
Expand All @@ -46,18 +49,19 @@
:param storage: Type of storage used to hold the tokens
"""
self._zero_rate = False
storage = tb.MemoryStorage()

if rate == 0:
rate = 1 # tb.Limiter does not allow zero rates, so keep track...
self._zero_rate = True

super().__init__(rate=rate, capacity=capacity, storage=storage)
super().__init__(rate=rate, capacity=capacity, storage=tb.MemoryStorage())

@property
def has_token(self) -> bool:
"""Does this token bucket have a token?"""
return not self._zero_rate and bool(self.consume(self.key))
def has_token(self, key: str) -> bool:
"""Does this token bucket have a token for the given key?"""
if key not in self._keys:
message = f"Key must be one of '{self._keys}', not '{key}'"
raise ValueError(message)

Check warning on line 63 in pixl_core/src/core/token_buffer/tokens.py

View check run for this annotation

Codecov / codecov/patch

pixl_core/src/core/token_buffer/tokens.py#L62-L63

Added lines #L62 - L63 were not covered by tests
p-j-smith marked this conversation as resolved.
Show resolved Hide resolved
return not self._zero_rate and bool(self.consume(key))

@property
def rate(self) -> float:
Expand Down
5 changes: 4 additions & 1 deletion pixl_core/tests/patient_queue/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ async def test_create(mock_message) -> None:

consume = AsyncMock()
async with PixlConsumer(
queue_name=TEST_QUEUE, token_bucket=TokenBucket(), callback=consume
queue_name=TEST_QUEUE,
token_bucket=TokenBucket(),
token_bucket_key="primary", # noqa: S106
callback=consume,
) as consumer:
# Create a Task to run pc.run in the background
task = asyncio.create_task(consumer.run())
Expand Down
9 changes: 5 additions & 4 deletions pixl_core/tests/token_buffer/test_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,21 @@
def test_retrieve_token() -> None:
"""Checks whether token can be retrieved from created token bucket."""
bucket = TokenBucket()
assert bucket.has_token
assert bucket.has_token(key="primary")
assert bucket.has_token(key="secondary")


def test_refill_tokens() -> None:
"""Checks whether the refill happens after one second for a bucket size of 1."""
bucket = TokenBucket(rate=1, capacity=1)

assert bucket.has_token
assert bucket.has_token(key="primary")
# Interrogating the bucket within 1 second we find that it's empty
assert bucket.has_token is False
assert bucket.has_token(key="primary") is False

# but will be refilled after 1 second
time.sleep(1)
assert bucket.has_token
assert bucket.has_token(key="primary")


def test_zero_rate() -> None:
Expand Down
3 changes: 2 additions & 1 deletion pixl_export/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ def test_heartbeat_response_is_200() -> None:


def test_initial_state_has_no_token() -> None:
assert not AppState().token_bucket.has_token
assert not AppState().token_bucket.has_token(key="primary")
assert not AppState().token_bucket.has_token(key="secondary")
Loading