Skip to content

Commit

Permalink
feat(workflow): major refactor of core work to support multiple projects
Browse files Browse the repository at this point in the history
  • Loading branch information
shinybrar committed Aug 18, 2023
1 parent 0d949db commit 1fdd115
Show file tree
Hide file tree
Showing 29 changed files with 1,583 additions and 614 deletions.
9 changes: 5 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ repos:
- --py36-plus
id: pyupgrade
repo: https://github.com/asottile/pyupgrade
rev: v3.8.0
rev: v3.9.0
- hooks:
- id: black
repo: https://github.com/psf/black
rev: 23.3.0
rev: 23.7.0
- hooks:
- additional_dependencies:
- types-attrs
- types-requests
- types-setuptools
- types-PyYAML
- types-toml
args:
- --ignore-missing-imports
- --no-implicit-optional
Expand All @@ -49,7 +50,7 @@ repos:
- hooks:
- id: blacken-docs
repo: https://github.com/asottile/blacken-docs
rev: 1.14.0
rev: 1.15.0
- hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand Down Expand Up @@ -94,4 +95,4 @@ repos:
stages:
- commit-msg
repo: https://github.com/commitizen-tools/commitizen
rev: 3.5.2
rev: 3.5.3
669 changes: 428 additions & 241 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "workflow"
version = "0.1.0"
description = "Workflow Core"
authors = ["Shiny Brar <[email protected]>"]
authors = ["Shiny Brar <[email protected]>"]
license = "MIT"
readme = "README.md"

Expand All @@ -12,6 +12,10 @@ tenacity = "^8.2"
pydantic = "^2.0"
requests = "^2.31"
pyyaml = "^6.0"
click = "^8.1"
toml = "^0.10"
rich = "^13.4"
mergedeep = "^1.3"
pydantic-settings = "^2.0"

[tool.poetry.group.dev.dependencies]
Expand All @@ -20,6 +24,9 @@ pytest = "^7.4"
pytest-cov = "^4.1"
tomli = "^2.0"
pre-commit = "^3.3"
types-requests = "^2.31"
types-toml = "^0.10"
types-setuptools = "^68.0"

[tool.poetry.scripts]
workflow = "workflow.cli.main:cli"
Expand Down
2 changes: 0 additions & 2 deletions workflow.env

This file was deleted.

25 changes: 24 additions & 1 deletion workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
"""Top-level imports for Tasks API."""
from .work.work import Work # noqa: F401s
from logging import getLogger
from pathlib import Path

import toml
from pkg_resources import DistributionNotFound, get_distribution

# Root path to the Skaha Project
BASE_PATH: Path = Path(__file__).absolute().parent.parent
__version__: str = "unknown"

logger = getLogger(__name__)

try:
__version__ = get_distribution("workflow").version
except DistributionNotFound as error: # pragma: no cover
logger.warning(error)
pyproject = toml.load(BASE_PATH / "pyproject.toml")
__version__ = pyproject["tool"]["poetry"]["version"]
except Exception as error: # pragma: no cover
logger.warning(error)
logger.warning("unable to find workflow client version")

from workflow.utils.logger import get_logger # noqa: F401, E402 isort:skip
from workflow.definitions.work import Work # noqa: F401, E402
21 changes: 0 additions & 21 deletions workflow/configs/chime-frb.yaml

This file was deleted.

5 changes: 0 additions & 5 deletions workflow/configs/chime-sps.yaml

This file was deleted.

45 changes: 35 additions & 10 deletions workflow/daemons/audit.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,56 @@
"""Audit Daemon."""
import time
from typing import Any, Dict
from typing import Any, Dict, Optional

import click
from chime_frb_api.modules.buckets import Buckets

from workflow.http.buckets import Buckets


@click.command()
@click.option("--sleep", "-s", default=5, help="Time to sleep between audits")
@click.option(
"--base-url",
"--sleep",
"-s",
default=5,
type=click.INT,
help="Number of seconds to sleep between audits.",
)
@click.option(
"--baseurl",
"-b",
type=click.STRING,
default="http://frb-vsop.chime:8004",
help="Location of the Buckets backend.",
help="Buckets backend.",
)
@click.option(
"--test-mode", default=False, help="Enable test mode to avoid while True loop"
"--token",
"-t",
default=None,
type=click.STRING,
help="Authentication token.",
)
def workflow(sleep: int, base_url: str, test_mode: bool) -> Dict[str, Any]:
"""Audit the Buckets DB for work that is failed, expired, or stale work.
@click.option(
"--test-mode",
default=False,
is_flag=True,
type=click.BOOL,
help="Enable test mode to avoid while True loop",
)
def workflow(
sleep: int, baseurl: str, token: Optional[str], test_mode: bool
) -> Dict[str, Any]:
"""Audit for Buckets Database to find failed, expired, or stale work.
Args:
sleep (int): number of seconds to sleep between audits
base_url (str): location of the Buckets backend
baseurl (str): location of the Buckets backend
token (Optional[str]): authentication token
test_mode (bool): enable test mode to avoid while True loop
Returns:
Dict[str, Any]: Audit results.
"""
buckets: Buckets = Buckets(base_url=base_url, debug=test_mode)
buckets: Buckets = Buckets(baseurl=baseurl, token=token) # type: ignore
if test_mode:
return buckets.audit()
while True:
Expand Down
15 changes: 6 additions & 9 deletions workflow/daemons/transfer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
"""Transfer Daemon."""
import logging
"""Workflow Transfer Daemon."""
import time
from typing import Any, Dict, List

import click
from chime_frb_api.modules.buckets import Buckets
from chime_frb_api.modules.results import Results

# Setup logging and formatter
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
log = logging.getLogger(__name__)
from workflow import get_logger
from workflow.http.buckets import Buckets
from workflow.http.results import Results

log = get_logger("workflow.daemons.transfer")


def deposit_work_to_results(
Expand Down
File renamed without changes.
120 changes: 120 additions & 0 deletions workflow/definitions/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Work Object Configuration."""
from typing import List, Optional

from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class Archive(BaseSettings):
"""Archive Configuration.
This class is used to configure the archive strategy for the work.
Args:
BaseModel (BaseModel): Pydantic BaseModel.
Attributes:
results (bool): Archive results for the work.
products (str): Archive strategy for the products.
plots (str): Archive strategy for the plots.
logs (str): Archive strategy for the logs.
"""

model_config = SettingsConfigDict(
title="Workflow Archive Object",
validate_default=True,
validate_assignment=True,
validate_return=True,
revalidate_instances="always",
env_prefix="WORKFLOW_CONFIG_ARCHIVE_",
secrets_dir="/run/secrets",
extra="ignore",
)

results: bool = Field(
default=True,
description="Archive results for the work.",
examples=[True],
)
products: str = Field(
default="copy",
description="Archive strategy for the products.",
examples=["copy"],
)
plots: str = Field(
default="copy",
description="Archive strategy for the plots.",
examples=["move"],
)
logs: str = Field(
default="move",
description="Archive strategy for the logs.",
examples=["delete"],
)

@field_validator("products", "plots", "logs")
def validate_archive(cls, value: str) -> str:
"""Validate the archive strategy.
Args:
value (str): Archive strategy.
Raises:
ValueError: If the archive strategy is not valid.
Returns:
str: The archive strategy.
"""
strategy = ["pass", "copy", "move", "delete", "upload"]
if value not in strategy:
raise ValueError(f"archive strategy must be one of {strategy}")
return value


class Config(BaseSettings):
"""Workflow Configuration Object.
Args:
BaseSettings (BaseSettings): Pydantic BaseModel with settings.
Attributes:
archive (Archive): Archive Configuration.
metrics (bool): Generate metrics from work lifecycle.
parent (Optional[str]): Parent Pipeline ID. None implies no parent.
orgs (List[str]): GitHub organization[s] the work belongs to.
teams (Optional[List[str]]): GitHub Team[s] with access to the work.
"""

model_config = SettingsConfigDict(
title="Workflow Config Object",
validate_default=True,
validate_assignment=True,
validate_return=True,
revalidate_instances="always",
env_prefix="WORKFLOW_CONFIG_",
secrets_dir="/run/secrets",
extra="ignore",
)

archive: Archive = Archive()
metrics: bool = Field(
default=False,
description="Generate metrics from work lifecycle.",
)
parent: Optional[str] = Field(
default=None,
description="Parent Pipeline ID. None implies no parent.",
examples=["5f9b5c5d7b54b5a9c5e5b5c5"],
)
orgs: List[str] = Field(
default=["chimefrb"],
description="GitHub organization[s] the work belongs to.",
examples=[["octocat", "chimefrb"]],
)
teams: Optional[List[str]] = Field(
default=None,
description="""
GitHub Team[s] with access to the work. None implies all org members.
""",
examples=[["developers", "admins"]],
)
19 changes: 14 additions & 5 deletions workflow/work/notify.py → workflow/definitions/notify.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Notification Configuration."""
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field, StrictStr
from pydantic import BaseModel, Field, StrictStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Slack(BaseModel):
class Slack(BaseSettings):
"""Slack Configuration.
This class is used to configure the slack notification strategy for the work.
Expand All @@ -23,10 +24,14 @@ class Slack(BaseModel):
reply (Dict[str, Any]): Status of the slack notification.
"""

model_config = ConfigDict(
title="Slack Configuration",
model_config = SettingsConfigDict(
title="Workflow Notify Object",
validate_default=True,
validate_assignment=True,
validate_return=True,
revalidate_instances="always",
env_prefix="WORKFLOW_NOTIFY_SLACK_",
secrets_dir="/run/secrets",
extra="forbid",
)

Expand Down Expand Up @@ -78,4 +83,8 @@ class Notify(BaseModel):
slack (Slack): Send slack notifications for the work.
"""

slack: Slack = Slack()
slack: Slack = Field(
default_factory=Slack,
description="Send slack notifications for the work.",
examples=[Slack()],
)
Loading

0 comments on commit 1fdd115

Please sign in to comment.