From b31c0d6d22f17ae75f30784085a93e0037a7a1f8 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Mon, 9 Sep 2024 18:01:16 -0300 Subject: [PATCH 1/8] Add test project Signed-off-by: Laura Couto --- performance-test/.gitignore | 151 ++++++++++++++++++ performance-test/.viz/stats.json | 1 + performance-test/README.md | 98 ++++++++++++ performance-test/conf/README.md | 20 +++ performance-test/conf/base/catalog.yml | 7 + performance-test/conf/base/parameters.yml | 0 .../conf/base/parameters_expense_analysis.yml | 5 + performance-test/conf/base/spark.yml | 8 + performance-test/conf/local/.gitkeep | 0 performance-test/notebooks/.gitkeep | 0 performance-test/pyproject.toml | 43 +++++ performance-test/requirements.txt | 11 ++ .../src/performance_test/__init__.py | 4 + .../src/performance_test/__main__.py | 24 +++ .../src/performance_test/hooks.py | 38 +++++ .../src/performance_test/pipeline_registry.py | 16 ++ .../performance_test/pipelines/__init__.py | 0 .../pipelines/expense_analysis/__init__.py | 10 ++ .../pipelines/expense_analysis/nodes.py | 13 ++ .../pipelines/expense_analysis/pipeline.py | 21 +++ .../src/performance_test/settings.py | 44 +++++ .../pipelines/expense_analysis/__init__.py | 0 .../expense_analysis/test_pipeline.py | 9 ++ 23 files changed, 523 insertions(+) create mode 100644 performance-test/.gitignore create mode 100644 performance-test/.viz/stats.json create mode 100644 performance-test/README.md create mode 100644 performance-test/conf/README.md create mode 100644 performance-test/conf/base/catalog.yml create mode 100644 performance-test/conf/base/parameters.yml create mode 100644 performance-test/conf/base/parameters_expense_analysis.yml create mode 100644 performance-test/conf/base/spark.yml create mode 100644 performance-test/conf/local/.gitkeep create mode 100644 performance-test/notebooks/.gitkeep create mode 100644 performance-test/pyproject.toml create mode 100644 performance-test/requirements.txt create mode 100644 
performance-test/src/performance_test/__init__.py create mode 100644 performance-test/src/performance_test/__main__.py create mode 100644 performance-test/src/performance_test/hooks.py create mode 100644 performance-test/src/performance_test/pipeline_registry.py create mode 100644 performance-test/src/performance_test/pipelines/__init__.py create mode 100644 performance-test/src/performance_test/pipelines/expense_analysis/__init__.py create mode 100644 performance-test/src/performance_test/pipelines/expense_analysis/nodes.py create mode 100644 performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py create mode 100644 performance-test/src/performance_test/settings.py create mode 100644 performance-test/tests/pipelines/expense_analysis/__init__.py create mode 100644 performance-test/tests/pipelines/expense_analysis/test_pipeline.py diff --git a/performance-test/.gitignore b/performance-test/.gitignore new file mode 100644 index 0000000000..51a4444c61 --- /dev/null +++ b/performance-test/.gitignore @@ -0,0 +1,151 @@ +########################## +# KEDRO PROJECT + +# ignore all local configuration +conf/local/** +!conf/local/.gitkeep + +# ignore potentially sensitive credentials files +conf/**/*credentials* + +# ignore everything in the following folders +data/** + +# except their sub-folders +!data/**/ + +# also keep all .gitkeep files +!.gitkeep + +# keep also the example dataset +!data/01_raw/* + + +########################## +# Common files + +# IntelliJ +.idea/ +*.iml +out/ +.idea_modules/ + +### macOS +*.DS_Store +.AppleDouble +.LSOverride +.Trashes + +# Vim +*~ +.*.swo +.*.swp + +# emacs +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc + +# JIRA plugin +atlassian-ide-plugin.xml + +# C extensions +*.so + +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ 
+wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/performance-test/.viz/stats.json b/performance-test/.viz/stats.json new file mode 100644 index 0000000000..0967ef424b --- /dev/null +++ b/performance-test/.viz/stats.json @@ -0,0 +1 @@ +{} diff --git a/performance-test/README.md b/performance-test/README.md new file mode 100644 index 0000000000..64ac211c6f --- /dev/null +++ b/performance-test/README.md @@ -0,0 +1,98 @@ +# performance-test + +## Overview + +This is your new Kedro project with PySpark setup, which was generated using `kedro 0.19.8`. + +Take a look at the [Kedro documentation](https://docs.kedro.org) to get started. 
+ +## Rules and guidelines + +In order to get the best out of the template: + +* Don't remove any lines from the `.gitignore` file we provide +* Make sure your results can be reproduced by following a [data engineering convention](https://docs.kedro.org/en/stable/faq/faq.html#what-is-data-engineering-convention) +* Don't commit data to your repository +* Don't commit any credentials or your local configuration to your repository. Keep all your credentials and local configuration in `conf/local/` + +## How to install dependencies + +Declare any dependencies in `requirements.txt` for `pip` installation. + +To install them, run: + +``` +pip install -r requirements.txt +``` + +## How to run your Kedro pipeline + +You can run your Kedro project with: + +``` +kedro run +``` + +## How to test your Kedro project + +Have a look at the file `tests/pipelines/expense_analysis/test_pipeline.py` for instructions on how to write your tests. Run the tests as follows: + +``` +pytest +``` + +To configure the coverage threshold, look at the `.coveragerc` file. + +## Project dependencies + +To see and update the dependency requirements for your project use `requirements.txt`. Install the project requirements with `pip install -r requirements.txt`. + +[Further information about project dependencies](https://docs.kedro.org/en/stable/kedro_project_setup/dependencies.html#project-specific-dependencies) + +## How to work with Kedro and notebooks + +> Note: Using `kedro jupyter` or `kedro ipython` to run your notebook provides these variables in scope: `catalog`, `context`, `pipelines` and `session`. +> +> Jupyter, JupyterLab, and IPython are already included in the project requirements by default, so once you have run `pip install -r requirements.txt` you will not need to take any extra steps before you use them. 
+ +### Jupyter +To use Jupyter notebooks in your Kedro project, you need to install Jupyter: + +``` +pip install jupyter +``` + +After installing Jupyter, you can start a local notebook server: + +``` +kedro jupyter notebook +``` + +### JupyterLab +To use JupyterLab, you need to install it: + +``` +pip install jupyterlab +``` + +You can also start JupyterLab: + +``` +kedro jupyter lab +``` + +### IPython +And if you want to run an IPython session: + +``` +kedro ipython +``` + +### How to ignore notebook output cells in `git` +To automatically strip out all output cell contents before committing to `git`, you can use tools like [`nbstripout`](https://github.com/kynan/nbstripout). For example, you can add a hook in `.git/config` with `nbstripout --install`. This will run `nbstripout` before anything is committed to `git`. + +> *Note:* Your output cells will be retained locally. + +## Package your Kedro project + +[Further information about building project documentation and packaging your project](https://docs.kedro.org/en/stable/tutorial/package_a_project.html) diff --git a/performance-test/conf/README.md b/performance-test/conf/README.md new file mode 100644 index 0000000000..b135e80c2c --- /dev/null +++ b/performance-test/conf/README.md @@ -0,0 +1,20 @@ +# What is this for? + +This folder should be used to store configuration files used by Kedro or by separate tools. + +This file can be used to provide users with instructions for how to reproduce local configuration with their own credentials. You can edit the file however you like, but you may wish to retain the information below and add your own section in the section titled **Instructions**. + +## Local configuration + +The `local` folder should be used for configuration that is either user-specific (e.g. IDE configuration) or protected (e.g. security keys). + +> *Note:* Please do not check in any local configuration to version control. 
+ +## Base configuration + +The `base` folder is for shared configuration, such as non-sensitive and project-related configuration that may be shared across team members. + +WARNING: Please do not put access credentials in the base configuration folder. + +## Find out more +You can find out more about configuration from the [user guide documentation](https://docs.kedro.org/en/stable/configuration/configuration_basics.html). diff --git a/performance-test/conf/base/catalog.yml b/performance-test/conf/base/catalog.yml new file mode 100644 index 0000000000..dbe458951a --- /dev/null +++ b/performance-test/conf/base/catalog.yml @@ -0,0 +1,7 @@ +congress_expenses: + type: spark.SparkDataset + filepath: data/gastos-deputados.csv + file_format: csv + load_args: + header: True + inferSchema: True diff --git a/performance-test/conf/base/parameters.yml b/performance-test/conf/base/parameters.yml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/performance-test/conf/base/parameters_expense_analysis.yml b/performance-test/conf/base/parameters_expense_analysis.yml new file mode 100644 index 0000000000..d3755629b2 --- /dev/null +++ b/performance-test/conf/base/parameters_expense_analysis.yml @@ -0,0 +1,5 @@ +# This is a boilerplate parameters config generated for pipeline 'expense_analysis' +# using Kedro 0.19.8. +# +# Documentation for this file format can be found in "Parameters" +# Link: https://docs.kedro.org/en/0.19.8/configuration/parameters.html diff --git a/performance-test/conf/base/spark.yml b/performance-test/conf/base/spark.yml new file mode 100644 index 0000000000..ab831b62ac --- /dev/null +++ b/performance-test/conf/base/spark.yml @@ -0,0 +1,8 @@ +# You can define spark specific configuration here. 
+ +spark.driver.maxResultSize: 3g +spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem +spark.sql.execution.arrow.pyspark.enabled: true + +# https://docs.kedro.org/en/stable/integrations/pyspark_integration.html#tips-for-maximising-concurrency-using-threadrunner +spark.scheduler.mode: FAIR diff --git a/performance-test/conf/local/.gitkeep b/performance-test/conf/local/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/performance-test/notebooks/.gitkeep b/performance-test/notebooks/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/performance-test/pyproject.toml b/performance-test/pyproject.toml new file mode 100644 index 0000000000..76cf0e81fa --- /dev/null +++ b/performance-test/pyproject.toml @@ -0,0 +1,43 @@ +[build-system] +requires = [ "setuptools",] +build-backend = "setuptools.build_meta" + +[project] +name = "performance_test" +readme = "README.md" +dynamic = [ "dependencies", "version",] + +[project.scripts] +performance-test = "performance_test.__main__:main" + +[tool.kedro] +package_name = "performance_test" +project_name = "performance-test" +kedro_init_version = "0.19.8" +tools = [ "PySpark", "Linting",] +example_pipeline = "False" +source_dir = "src" + +[tool.ruff] +line-length = 88 +show-fixes = true +select = [ "F", "W", "E", "I", "UP", "PL", "T201",] +ignore = [ "E501",] + +[project.entry-points."kedro.hooks"] + +[tool.ruff.format] +docstring-code-format = true + +[tool.setuptools.dynamic.dependencies] +file = "requirements.txt" + +[tool.setuptools.dynamic.version] +attr = "performance_test.__version__" + +[tool.setuptools.packages.find] +where = [ "src",] +namespaces = false + +[tool.kedro_telemetry] +project_id = "f041db02285d4cbba64cc480ca13ebed" diff --git a/performance-test/requirements.txt b/performance-test/requirements.txt new file mode 100644 index 0000000000..224b9f5f94 --- /dev/null +++ b/performance-test/requirements.txt @@ -0,0 +1,11 @@ +ipython>=8.10 +jupyterlab>=3.0 
+kedro~=0.19.8 +kedro-datasets>=3.0; python_version >= "3.9" +kedro-datasets>=1.0; python_version < "3.9" +kedro-viz>=6.7.0 +kedro[jupyter] +notebook +ruff~=0.1.8 +scikit-learn~=1.5.1; python_version >= "3.9" +scikit-learn<=1.4.0,>=1.0; python_version < "3.9" diff --git a/performance-test/src/performance_test/__init__.py b/performance-test/src/performance_test/__init__.py new file mode 100644 index 0000000000..2659636cd7 --- /dev/null +++ b/performance-test/src/performance_test/__init__.py @@ -0,0 +1,4 @@ +"""performance-test +""" + +__version__ = "0.1" diff --git a/performance-test/src/performance_test/__main__.py b/performance-test/src/performance_test/__main__.py new file mode 100644 index 0000000000..79d0fb2316 --- /dev/null +++ b/performance-test/src/performance_test/__main__.py @@ -0,0 +1,24 @@ +"""performance-test file for ensuring the package is executable +as `performance-test` and `python -m performance_test` +""" +import sys +from pathlib import Path +from typing import Any + +from kedro.framework.cli.utils import find_run_command +from kedro.framework.project import configure_project + + +def main(*args, **kwargs) -> Any: + package_name = Path(__file__).parent.name + configure_project(package_name) + + interactive = hasattr(sys, 'ps1') + kwargs["standalone_mode"] = not interactive + + run = find_run_command(package_name) + return run(*args, **kwargs) + + +if __name__ == "__main__": + main() diff --git a/performance-test/src/performance_test/hooks.py b/performance-test/src/performance_test/hooks.py new file mode 100644 index 0000000000..c7a92e5bfe --- /dev/null +++ b/performance-test/src/performance_test/hooks.py @@ -0,0 +1,38 @@ +from typing import Dict + +from kedro.framework.hooks import hook_impl +from kedro.pipeline import Pipeline +from pyspark import SparkConf +from pyspark.sql import SparkSession + + +class SparkHooks: + @hook_impl + def after_context_created(self, context) -> None: + """Initialises a SparkSession using the config + defined in 
project's conf folder. + """ + + # Load the spark configuration in spark.yaml using the config loader + parameters = context.config_loader["spark"] + spark_conf = SparkConf().setAll(parameters.items()) + + # Initialise the spark session + spark_session_conf = ( + SparkSession.builder.appName(context.project_path.name) + .enableHiveSupport() + .config(conf=spark_conf) + ) + _spark_session = spark_session_conf.getOrCreate() + _spark_session.sparkContext.setLogLevel("WARN") + + +def register_pipelines(self) -> Dict[str, Pipeline]: + from performance_test.pipelines.expense_analysis import ( + pipeline as expense_analysis_pipeline, + ) + + return { + "__default__": expense_analysis_pipeline.create_pipeline(), + "expense_analysis": expense_analysis_pipeline.create_pipeline(), + } diff --git a/performance-test/src/performance_test/pipeline_registry.py b/performance-test/src/performance_test/pipeline_registry.py new file mode 100644 index 0000000000..2d4272e312 --- /dev/null +++ b/performance-test/src/performance_test/pipeline_registry.py @@ -0,0 +1,16 @@ +"""Project pipelines.""" +from typing import Dict + +from kedro.framework.project import find_pipelines +from kedro.pipeline import Pipeline + + +def register_pipelines() -> Dict[str, Pipeline]: + """Register the project's pipelines. + + Returns: + A mapping from pipeline names to ``Pipeline`` objects. 
+ """ + pipelines = find_pipelines() + pipelines["__default__"] = sum(pipelines.values()) + return pipelines diff --git a/performance-test/src/performance_test/pipelines/__init__.py b/performance-test/src/performance_test/pipelines/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/__init__.py b/performance-test/src/performance_test/pipelines/expense_analysis/__init__.py new file mode 100644 index 0000000000..398aa8844b --- /dev/null +++ b/performance-test/src/performance_test/pipelines/expense_analysis/__init__.py @@ -0,0 +1,10 @@ +""" +This is a boilerplate pipeline 'expense_analysis' +generated using Kedro 0.19.8 +""" + +from .pipeline import create_pipeline + +__all__ = ["create_pipeline"] + +__version__ = "0.1" diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py new file mode 100644 index 0000000000..a58bf280c8 --- /dev/null +++ b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py @@ -0,0 +1,13 @@ +import pyspark.sql.functions as F + + +def analyze_expenses(congress_expenses): + expenses_per_party = congress_expenses.groupBy("sgpartido").agg( + F.sum("vlrliquido").alias("total_expense") + ).orderBy(F.desc("total_expense")) + + largest_expense_source = congress_expenses.groupBy("txtdescricao").agg( + F.sum("vlrliquido").alias("total_expense") + ).orderBy(F.desc("total_expense")).limit(1) + + return expenses_per_party, largest_expense_source diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py new file mode 100644 index 0000000000..6b3cf3397f --- /dev/null +++ b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py @@ -0,0 +1,21 @@ +""" +This is a boilerplate pipeline 'expense_analysis' +generated using 
Kedro 0.19.8 +""" + +from kedro.pipeline import Pipeline, node + +from .nodes import analyze_expenses + + +def create_pipeline(**kwargs): + return Pipeline( + [ + node( + func=analyze_expenses, + inputs="congress_expenses", + outputs=["expenses_per_party", "largest_expense_source"], + name="analyze_expenses_node", + ) + ] + ) diff --git a/performance-test/src/performance_test/settings.py b/performance-test/src/performance_test/settings.py new file mode 100644 index 0000000000..837b8ef29d --- /dev/null +++ b/performance-test/src/performance_test/settings.py @@ -0,0 +1,44 @@ +"""Project settings. There is no need to edit this file unless you want to change values +from the Kedro defaults. For further information, including these default values, see +https://docs.kedro.org/en/stable/kedro_project_setup/settings.html.""" + +# Instantiated project hooks. +from performance_test.hooks import SparkHooks # noqa: E402 + +# Hooks are executed in a Last-In-First-Out (LIFO) order. +HOOKS = (SparkHooks(),) + +# Installed plugins for which to disable hook auto-registration. +# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",) + +# Class that manages storing KedroSession data. +# from kedro.framework.session.store import BaseSessionStore +# SESSION_STORE_CLASS = BaseSessionStore +# Keyword arguments to pass to the `SESSION_STORE_CLASS` constructor. +# SESSION_STORE_ARGS = { +# "path": "./sessions" +# } + +# Directory that holds configuration. +# CONF_SOURCE = "conf" + +# Class that manages how configuration is loaded. +from kedro.config import OmegaConfigLoader # noqa: E402 + +CONFIG_LOADER_CLASS = OmegaConfigLoader +# Keyword arguments to pass to the `CONFIG_LOADER_CLASS` constructor. +CONFIG_LOADER_ARGS = { + "base_env": "base", + "default_run_env": "local", + "config_patterns": { + "spark": ["spark*", "spark*/**"], + } +} + +# Class that manages Kedro's library components. 
+# from kedro.framework.context import KedroContext +# CONTEXT_CLASS = KedroContext + +# Class that manages the Data Catalog. +# from kedro.io import DataCatalog +# DATA_CATALOG_CLASS = DataCatalog diff --git a/performance-test/tests/pipelines/expense_analysis/__init__.py b/performance-test/tests/pipelines/expense_analysis/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/performance-test/tests/pipelines/expense_analysis/test_pipeline.py b/performance-test/tests/pipelines/expense_analysis/test_pipeline.py new file mode 100644 index 0000000000..9da160d217 --- /dev/null +++ b/performance-test/tests/pipelines/expense_analysis/test_pipeline.py @@ -0,0 +1,9 @@ +""" +This is a boilerplate test file for pipeline 'expense_analysis' +generated using Kedro 0.19.8. +Please add your pipeline tests here. + +Kedro recommends using `pytest` framework, more info about it can be found +in the official documentation: +https://docs.pytest.org/en/latest/getting-started.html +""" From 43b757164016e11f13e195c3b6e099edfdaa9163 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Mon, 9 Sep 2024 19:39:30 -0300 Subject: [PATCH 2/8] Add delays Signed-off-by: Laura Couto --- performance-test/conf/base/catalog.yml | 27 ++++++++++++++++++ performance-test/pyproject.toml | 2 +- .../src/performance_test/hooks.py | 3 ++ .../pipelines/expense_analysis/nodes.py | 28 ++++++++++++++++--- .../pipelines/expense_analysis/pipeline.py | 28 +++++++++++++++---- 5 files changed, 77 insertions(+), 11 deletions(-) diff --git a/performance-test/conf/base/catalog.yml b/performance-test/conf/base/catalog.yml index dbe458951a..d1bc6a7aac 100644 --- a/performance-test/conf/base/catalog.yml +++ b/performance-test/conf/base/catalog.yml @@ -5,3 +5,30 @@ congress_expenses: load_args: header: True inferSchema: True + +expenses_per_party: + type: spark.SparkDataset + filepath: data/output/expenses_per_party.parquet + file_format: parquet + save_args: + sep: ',' + header: True + mode: overwrite + 
+largest_expense_source: + type: spark.SparkDataset + filepath: data/output/largest_expense_source.parquet + file_format: parquet + save_args: + sep: ',' + header: True + mode: overwrite + +top_spender_per_party: + type: spark.SparkDataset + filepath: data/output/top_spender_per_party.parquet + file_format: parquet + save_args: + sep: ',' + header: True + mode: overwrite diff --git a/performance-test/pyproject.toml b/performance-test/pyproject.toml index 76cf0e81fa..e059ee1f0d 100644 --- a/performance-test/pyproject.toml +++ b/performance-test/pyproject.toml @@ -40,4 +40,4 @@ where = [ "src",] namespaces = false [tool.kedro_telemetry] -project_id = "f041db02285d4cbba64cc480ca13ebed" +project_id = "" diff --git a/performance-test/src/performance_test/hooks.py b/performance-test/src/performance_test/hooks.py index c7a92e5bfe..40393323aa 100644 --- a/performance-test/src/performance_test/hooks.py +++ b/performance-test/src/performance_test/hooks.py @@ -1,3 +1,4 @@ +from time import sleep from typing import Dict from kedro.framework.hooks import hook_impl @@ -5,6 +6,7 @@ from pyspark import SparkConf from pyspark.sql import SparkSession +HOOK_DELAY = 0 class SparkHooks: @hook_impl @@ -23,6 +25,7 @@ def after_context_created(self, context) -> None: .enableHiveSupport() .config(conf=spark_conf) ) + sleep(HOOK_DELAY) _spark_session = spark_session_conf.getOrCreate() _spark_session.sparkContext.setLogLevel("WARN") diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py index a58bf280c8..b3bc259723 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py @@ -1,13 +1,33 @@ +from time import sleep + import pyspark.sql.functions as F +from pyspark.sql.window import Window +DATASET_LOAD_DELAY = 0 +FILE_SAVE_DELAY = 0 -def analyze_expenses(congress_expenses): - 
expenses_per_party = congress_expenses.groupBy("sgpartido").agg( +def analyze_expenses_per_party(congress_expenses): + """Calculate total expense per party.""" + sleep(DATASET_LOAD_DELAY) + sleep(FILE_SAVE_DELAY) + return congress_expenses.groupBy("sgpartido").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")) - largest_expense_source = congress_expenses.groupBy("txtdescricao").agg( +def find_largest_expense_source(congress_expenses): + """Find the largest source of expense.""" + sleep(DATASET_LOAD_DELAY) + sleep(FILE_SAVE_DELAY) + return congress_expenses.groupBy("txtdescricao").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")).limit(1) - return expenses_per_party, largest_expense_source +def find_top_spender_per_party(congress_expenses): + """Find the top-spending congressman for each party.""" + sleep(DATASET_LOAD_DELAY) + sleep(FILE_SAVE_DELAY) + return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg( + F.sum("vlrliquido").alias("total_spent") + ).withColumn( + "rank", F.row_number().over(Window.partitionBy("sgpartido").orderBy(F.desc("total_spent"))) + ).filter(F.col("rank") == 1).drop("rank") diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py index 6b3cf3397f..87fa69e607 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py @@ -5,17 +5,33 @@ from kedro.pipeline import Pipeline, node -from .nodes import analyze_expenses +from .nodes import ( + analyze_expenses_per_party, + find_largest_expense_source, + find_top_spender_per_party, +) -def create_pipeline(**kwargs): +def create_pipeline(**kwargs) -> Pipeline: return Pipeline( [ node( - func=analyze_expenses, + func=analyze_expenses_per_party, inputs="congress_expenses", - 
outputs=["expenses_per_party", "largest_expense_source"], - name="analyze_expenses_node", - ) + outputs="expenses_per_party", + name="analyze_expenses_per_party_node", + ), + node( + func=find_largest_expense_source, + inputs="congress_expenses", + outputs="largest_expense_source", + name="find_largest_expense_source_node", + ), + node( + func=find_top_spender_per_party, + inputs="congress_expenses", + outputs="top_spender_per_party", + name="find_top_spender_per_party_node", + ), ] ) From f505a7fcc1a5950298d2ad2066a93b00cdf60cd4 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Mon, 9 Sep 2024 19:54:31 -0300 Subject: [PATCH 3/8] Use env vars to determine delay Signed-off-by: Laura Couto --- performance-test/src/performance_test/hooks.py | 3 ++- .../src/performance_test/pipelines/expense_analysis/nodes.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/performance-test/src/performance_test/hooks.py b/performance-test/src/performance_test/hooks.py index 40393323aa..ddd857627e 100644 --- a/performance-test/src/performance_test/hooks.py +++ b/performance-test/src/performance_test/hooks.py @@ -1,3 +1,4 @@ +import os from time import sleep from typing import Dict @@ -6,7 +7,7 @@ from pyspark import SparkConf from pyspark.sql import SparkSession -HOOK_DELAY = 0 +HOOK_DELAY = int(os.getenv("HOOKS_DELAY", 0)) class SparkHooks: @hook_impl diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py index b3bc259723..4ac2258068 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py @@ -1,10 +1,11 @@ +import os from time import sleep import pyspark.sql.functions as F from pyspark.sql.window import Window -DATASET_LOAD_DELAY = 0 -FILE_SAVE_DELAY = 0 +DATASET_LOAD_DELAY = int(os.getenv("DATASET_LOAD_DELAY", 0)) +FILE_SAVE_DELAY = 
int(os.getenv("FILE_SAVE_DELAY", 0)) def analyze_expenses_per_party(congress_expenses): """Calculate total expense per party.""" From eafe4c538d1225c6dedc7972b8939c59cc9f2a3b Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Wed, 11 Sep 2024 16:01:18 -0300 Subject: [PATCH 4/8] Use kedro run --params to determine delays Signed-off-by: Laura Couto --- performance-test/conf/base/parameters.yml | 3 +++ .../src/performance_test/hooks.py | 4 +--- .../pipelines/expense_analysis/nodes.py | 24 +++++++++---------- .../pipelines/expense_analysis/pipeline.py | 6 ++--- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/performance-test/conf/base/parameters.yml b/performance-test/conf/base/parameters.yml index e69de29bb2..f13a955864 100644 --- a/performance-test/conf/base/parameters.yml +++ b/performance-test/conf/base/parameters.yml @@ -0,0 +1,3 @@ +hook_delay: 0 +dataset_load_delay: 0 +file_save_delay: 0 diff --git a/performance-test/src/performance_test/hooks.py b/performance-test/src/performance_test/hooks.py index ddd857627e..611e8373ac 100644 --- a/performance-test/src/performance_test/hooks.py +++ b/performance-test/src/performance_test/hooks.py @@ -1,4 +1,3 @@ -import os from time import sleep from typing import Dict @@ -7,7 +6,6 @@ from pyspark import SparkConf from pyspark.sql import SparkSession -HOOK_DELAY = int(os.getenv("HOOKS_DELAY", 0)) class SparkHooks: @hook_impl @@ -26,7 +24,7 @@ def after_context_created(self, context) -> None: .enableHiveSupport() .config(conf=spark_conf) ) - sleep(HOOK_DELAY) + sleep(context.params['hook_delay']) _spark_session = spark_session_conf.getOrCreate() _spark_session.sparkContext.setLogLevel("WARN") diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py index 4ac2258068..c84a4409b5 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py +++ 
b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py @@ -1,32 +1,32 @@ -import os from time import sleep import pyspark.sql.functions as F from pyspark.sql.window import Window -DATASET_LOAD_DELAY = int(os.getenv("DATASET_LOAD_DELAY", 0)) -FILE_SAVE_DELAY = int(os.getenv("FILE_SAVE_DELAY", 0)) -def analyze_expenses_per_party(congress_expenses): +def analyze_expenses_per_party(congress_expenses, parameters): """Calculate total expense per party.""" - sleep(DATASET_LOAD_DELAY) - sleep(FILE_SAVE_DELAY) + sleep( parameters["dataset_load_delay"]) + sleep(parameters["file_save_delay"]) + return congress_expenses.groupBy("sgpartido").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")) -def find_largest_expense_source(congress_expenses): +def find_largest_expense_source(congress_expenses, parameters): """Find the largest source of expense.""" - sleep(DATASET_LOAD_DELAY) - sleep(FILE_SAVE_DELAY) + sleep( parameters["dataset_load_delay"]) + sleep(parameters["file_save_delay"]) + return congress_expenses.groupBy("txtdescricao").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")).limit(1) -def find_top_spender_per_party(congress_expenses): +def find_top_spender_per_party(congress_expenses, parameters): """Find the top-spending congressman for each party.""" - sleep(DATASET_LOAD_DELAY) - sleep(FILE_SAVE_DELAY) + sleep( parameters["dataset_load_delay"]) + sleep(parameters["file_save_delay"]) + return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg( F.sum("vlrliquido").alias("total_spent") ).withColumn( diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py index 87fa69e607..d48aeb04fa 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py @@ -17,19 
+17,19 @@ def create_pipeline(**kwargs) -> Pipeline: [ node( func=analyze_expenses_per_party, - inputs="congress_expenses", + inputs=["congress_expenses", "parameters"], outputs="expenses_per_party", name="analyze_expenses_per_party_node", ), node( func=find_largest_expense_source, - inputs="congress_expenses", + inputs=["congress_expenses", "parameters"], outputs="largest_expense_source", name="find_largest_expense_source_node", ), node( func=find_top_spender_per_party, - inputs="congress_expenses", + inputs=["congress_expenses", "parameters"], outputs="top_spender_per_party", name="find_top_spender_per_party_node", ), From c5a1ac3382b7bab7cc21ae18d7aa19418227fdcc Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Wed, 11 Sep 2024 19:51:50 -0300 Subject: [PATCH 5/8] Add extra nodes Signed-off-by: Laura Couto --- performance-test/conf/base/catalog.yml | 30 +++++++++++++++++-- .../pipelines/expense_analysis/nodes.py | 30 ++++++++++++++----- .../pipelines/expense_analysis/pipeline.py | 16 +++++++++- 3 files changed, 65 insertions(+), 11 deletions(-) diff --git a/performance-test/conf/base/catalog.yml b/performance-test/conf/base/catalog.yml index d1bc6a7aac..492970a867 100644 --- a/performance-test/conf/base/catalog.yml +++ b/performance-test/conf/base/catalog.yml @@ -8,12 +8,15 @@ congress_expenses: expenses_per_party: type: spark.SparkDataset - filepath: data/output/expenses_per_party.parquet - file_format: parquet + filepath: data/output/expenses_per_party.csv + file_format: csv save_args: sep: ',' header: True mode: overwrite + load_args: + header: True + inferSchema: True largest_expense_source: type: spark.SparkDataset @@ -26,7 +29,28 @@ largest_expense_source: top_spender_per_party: type: spark.SparkDataset - filepath: data/output/top_spender_per_party.parquet + filepath: data/output/top_spender_per_party.csv + file_format: csv + save_args: + sep: ',' + header: True + mode: overwrite + load_args: + header: True + inferSchema: True + +top_overall_spender: + type: 
spark.SparkDataset + filepath: data/output/top_overall_spender.parquet + file_format: parquet + save_args: + sep: ',' + header: True + mode: overwrite + +top_spending_party: + type: spark.SparkDataset + filepath: data/output/top_spending_party.parquet file_format: parquet save_args: sep: ',' diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py index c84a4409b5..d801fa6696 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/nodes.py @@ -6,29 +6,45 @@ def analyze_expenses_per_party(congress_expenses, parameters): """Calculate total expense per party.""" - sleep( parameters["dataset_load_delay"]) + sleep(parameters["dataset_load_delay"]) sleep(parameters["file_save_delay"]) - return congress_expenses.groupBy("sgpartido").agg( + return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")) def find_largest_expense_source(congress_expenses, parameters): """Find the largest source of expense.""" - sleep( parameters["dataset_load_delay"]) + sleep(parameters["dataset_load_delay"]) sleep(parameters["file_save_delay"]) return congress_expenses.groupBy("txtdescricao").agg( F.sum("vlrliquido").alias("total_expense") ).orderBy(F.desc("total_expense")).limit(1) -def find_top_spender_per_party(congress_expenses, parameters): +def find_top_spender_per_party(expenses_per_party, parameters): """Find the top-spending congressman for each party.""" - sleep( parameters["dataset_load_delay"]) + sleep(parameters["dataset_load_delay"]) sleep(parameters["file_save_delay"]) - return congress_expenses.groupBy("sgpartido", "txnomeparlamentar").agg( - F.sum("vlrliquido").alias("total_spent") + return expenses_per_party.groupBy("sgpartido", "txnomeparlamentar").agg( + 
F.sum("total_expense").alias("total_spent") ).withColumn( "rank", F.row_number().over(Window.partitionBy("sgpartido").orderBy(F.desc("total_spent"))) ).filter(F.col("rank") == 1).drop("rank") + +def find_top_overall_spender(top_spender_per_party, parameters): + """Find the overall top spender across all parties.""" + sleep(parameters["dataset_load_delay"]) + sleep(parameters["file_save_delay"]) + + return top_spender_per_party.orderBy(F.desc("total_spent")).limit(1) + +def find_top_spending_party(expenses_per_party, parameters): + """Find the party with the highest total expense.""" + sleep(parameters["dataset_load_delay"]) + sleep(parameters["file_save_delay"]) + + return expenses_per_party.groupBy("sgpartido").agg( + F.sum("total_expense").alias("total_party_expense") + ).orderBy(F.desc("total_party_expense")).limit(1) diff --git a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py index d48aeb04fa..88c574df43 100644 --- a/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py +++ b/performance-test/src/performance_test/pipelines/expense_analysis/pipeline.py @@ -8,7 +8,9 @@ from .nodes import ( analyze_expenses_per_party, find_largest_expense_source, + find_top_overall_spender, find_top_spender_per_party, + find_top_spending_party, ) @@ -29,9 +31,21 @@ def create_pipeline(**kwargs) -> Pipeline: ), node( func=find_top_spender_per_party, - inputs=["congress_expenses", "parameters"], + inputs=["expenses_per_party", "parameters"], outputs="top_spender_per_party", name="find_top_spender_per_party_node", ), + node( + func=find_top_overall_spender, + inputs=["top_spender_per_party", "parameters"], + outputs="top_overall_spender", + name="find_top_overall_spender_node", + ), + node( + func=find_top_spending_party, + inputs=["expenses_per_party", "parameters"], + outputs="top_spending_party", + name="find_top_spending_party_node", + ), ] ) From 
6c5ac734607b918976dc365a4776a2fa42bd8edc Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Thu, 19 Sep 2024 12:47:44 -0300 Subject: [PATCH 6/8] Remove redundant function from hooks Signed-off-by: Laura Couto --- performance-test/src/performance_test/hooks.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/performance-test/src/performance_test/hooks.py b/performance-test/src/performance_test/hooks.py index 611e8373ac..69aeb47cd5 100644 --- a/performance-test/src/performance_test/hooks.py +++ b/performance-test/src/performance_test/hooks.py @@ -1,8 +1,6 @@ from time import sleep -from typing import Dict from kedro.framework.hooks import hook_impl -from kedro.pipeline import Pipeline from pyspark import SparkConf from pyspark.sql import SparkSession @@ -27,14 +25,3 @@ def after_context_created(self, context) -> None: sleep(context.params['hook_delay']) _spark_session = spark_session_conf.getOrCreate() _spark_session.sparkContext.setLogLevel("WARN") - - -def register_pipelines(self) -> Dict[str, Pipeline]: - from performance_test.pipelines.expense_analysis import ( - pipeline as expense_analysis_pipeline, - ) - - return { - "__default__": expense_analysis_pipeline.create_pipeline(), - "expense_analysis": expense_analysis_pipeline.create_pipeline(), - } From 60f06ad118a0a37eeadfbec5119a64c0bf48b087 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Mon, 23 Sep 2024 01:06:53 -0300 Subject: [PATCH 7/8] Add usage instructions to readme Signed-off-by: Laura Couto --- performance-test/README.md | 95 +++------------------------------ performance-test/conf/README.md | 20 ------- 2 files changed, 8 insertions(+), 107 deletions(-) delete mode 100644 performance-test/conf/README.md diff --git a/performance-test/README.md b/performance-test/README.md index 64ac211c6f..604f134a1b 100644 --- a/performance-test/README.md +++ b/performance-test/README.md @@ -2,97 +2,18 @@ ## Overview -This is your new Kedro project with PySpark setup, which was generated using `kedro 
0.19.8`. +This is a test project meant to simulate delays in specific parts of a Kedro pipeline. It is intended as a tool to gauge pipeline performance and to compare in-development changes to Kedro against an already stable release version. -Take a look at the [Kedro documentation](https://docs.kedro.org) to get started. +## Usage -## Rules and guidelines +There are three delay parameters that can be set in this project: -In order to get the best out of the template: +**hook_delay** - Simulates slow-loading hooks, such as hooks that perform complex operations or access external services that can suffer from latency. -* Don't remove any lines from the `.gitignore` file we provide -* Make sure your results can be reproduced by following a [data engineering convention](https://docs.kedro.org/en/stable/faq/faq.html#what-is-data-engineering-convention) -* Don't commit data to your repository -* Don't commit any credentials or your local configuration to your repository. Keep all your credentials and local configuration in `conf/local/` +**dataset_load_delay** - Simulates a delay in loading a dataset, because of a large size or connection latency, for example. -## How to install dependencies +**file_save_delay** - Simulates a delay in saving an output file, because of, for example, connection delay in accessing remote storage. -Declare any dependencies in `requirements.txt` for `pip` installation. +When invoking the `kedro run` command, you can pass the desired value in seconds for each delay as a parameter using the `--params` flag. For example: -To install them, run: - -``` -pip install -r requirements.txt -``` - -## How to run your Kedro pipeline - -You can run your Kedro project with: - -``` -kedro run -``` - -## How to test your Kedro project - -Have a look at the files `src/tests/test_run.py` and `src/tests/pipelines/data_science/test_pipeline.py` for instructions on how to write your tests.
Run the tests as follows: - -``` -pytest -``` - -To configure the coverage threshold, look at the `.coveragerc` file. - -## Project dependencies - -To see and update the dependency requirements for your project use `requirements.txt`. Install the project requirements with `pip install -r requirements.txt`. - -[Further information about project dependencies](https://docs.kedro.org/en/stable/kedro_project_setup/dependencies.html#project-specific-dependencies) - -## How to work with Kedro and notebooks - -> Note: Using `kedro jupyter` or `kedro ipython` to run your notebook provides these variables in scope: `catalog`, `context`, `pipelines` and `session`. -> -> Jupyter, JupyterLab, and IPython are already included in the project requirements by default, so once you have run `pip install -r requirements.txt` you will not need to take any extra steps before you use them. - -### Jupyter -To use Jupyter notebooks in your Kedro project, you need to install Jupyter: - -``` -pip install jupyter -``` - -After installing Jupyter, you can start a local notebook server: - -``` -kedro jupyter notebook -``` - -### JupyterLab -To use JupyterLab, you need to install it: - -``` -pip install jupyterlab -``` - -You can also start JupyterLab: - -``` -kedro jupyter lab -``` - -### IPython -And if you want to run an IPython session: - -``` -kedro ipython -``` - -### How to ignore notebook output cells in `git` -To automatically strip out all output cell contents before committing to `git`, you can use tools like [`nbstripout`](https://github.com/kynan/nbstripout). For example, you can add a hook in `.git/config` with `nbstripout --install`. This will run `nbstripout` before anything is committed to `git`. - -> *Note:* Your output cells will be retained locally. 
- -## Package your Kedro project - -[Further information about building project documentation and packaging your project](https://docs.kedro.org/en/stable/tutorial/package_a_project.html) +`kedro run --params=hook_delay=5,dataset_load_delay=5,file_save_delay=5` diff --git a/performance-test/conf/README.md b/performance-test/conf/README.md deleted file mode 100644 index b135e80c2c..0000000000 --- a/performance-test/conf/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# What is this for? - -This folder should be used to store configuration files used by Kedro or by separate tools. - -This file can be used to provide users with instructions for how to reproduce local configuration with their own credentials. You can edit the file however you like, but you may wish to retain the information below and add your own section in the section titled **Instructions**. - -## Local configuration - -The `local` folder should be used for configuration that is either user-specific (e.g. IDE configuration) or protected (e.g. security keys). - -> *Note:* Please do not check in any local configuration to version control. - -## Base configuration - -The `base` folder is for shared configuration, such as non-sensitive and project-related configuration that may be shared across team members. - -WARNING: Please do not put access credentials in the base configuration folder. - -## Find out more -You can find out more about configuration from the [user guide documentation](https://docs.kedro.org/en/stable/configuration/configuration_basics.html). 
From 6f24fe0630b2bda66c97d2321534a81fdd11e4f2 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Mon, 23 Sep 2024 16:28:25 -0300 Subject: [PATCH 8/8] Add pyspark to project requirements Signed-off-by: Laura Couto --- performance-test/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/performance-test/requirements.txt b/performance-test/requirements.txt index 224b9f5f94..28a3580693 100644 --- a/performance-test/requirements.txt +++ b/performance-test/requirements.txt @@ -6,6 +6,7 @@ kedro-datasets>=1.0; python_version < "3.9" kedro-viz>=6.7.0 kedro[jupyter] notebook +pyspark ruff~=0.1.8 scikit-learn~=1.5.1; python_version >= "3.9" scikit-learn<=1.4.0,>=1.0; python_version < "3.9"