From f0a470abf84717a022d152f07b79ad1fefecba7d Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 24 Nov 2023 19:35:40 +0000 Subject: [PATCH 01/20] feat(dev): adds validation extension support; - Add config option for resources to require validation. - Only submit to xloader if validation is successful. --- ckanext/xloader/config_declaration.yaml | 8 ++++++ ckanext/xloader/plugin.py | 38 ++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index b31f12e2..06258e4c 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -112,5 +112,13 @@ groups: to True. type: bool required: false + - key: ckanext.xloader.requires_validation + default: False + example: True + description: | + Resources are required to pass validation from the ckanext-validation + plugin to be able to get xloadered. + type: bool + required: false diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index dbde8ed5..2837332d 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -5,6 +5,9 @@ from ckan import plugins from ckan.plugins import toolkit +from ckan.model.domain_object import DomainObjectOperation +from ckan.model.resource import Resource + from . 
import action, auth, helpers as xloader_helpers, utils from .loader import fulltext_function_exists, get_write_engine @@ -53,7 +56,7 @@ def is_it_an_xloader_format(cls, format_): class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IConfigurer) plugins.implements(plugins.IConfigurable) - plugins.implements(plugins.IResourceUrlChange) + plugins.implements(plugins.IDomainObjectModification) plugins.implements(plugins.IActions) plugins.implements(plugins.IAuthFunctions) plugins.implements(plugins.ITemplateHelpers) @@ -94,16 +97,39 @@ def configure(self, config_): ) ) - # IResourceUrlChange + # IDomainObjectModification + + def notify(self, entity, operation): + # type: (ckan.model.Package|ckan.model.Resource, DomainObjectOperation) -> None + """ + Runs before_commit to database for Packages and Resources. + We only want to check for changed Resources for this. + We want to check if values have changed, namely the url. + See: ckan/model/modification.py.DomainObjectModificationExtension + """ + if operation != DomainObjectOperation.changed \ + or not isinstance(entity, Resource): + return + + # If the resource requires validation, stop here if validation + # has not been performed or did not succeed. The Validation + # extension will call resource_patch and this method should + # be called again. However, url_changed will not be in the entity + # once Validation does the patch. 
+ if toolkit.h.plugin_loaded('validation') and \ + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): + if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': + return + elif not getattr(entity, 'url_changed', False): + return - def notify(self, resource): context = { "ignore_auth": True, } resource_dict = toolkit.get_action("resource_show")( context, { - "id": resource.id, + "id": entity.id, }, ) self._submit_to_xloader(resource_dict) @@ -111,6 +137,10 @@ def notify(self, resource): # IResourceController def after_resource_create(self, context, resource_dict): + if toolkit.h.plugin_loaded('validation') and \ + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ + resource_dict.get('validation_status', None) != 'success': + return self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): From a97f4adb2417bc4754ca035d7063191eaf1ed1c0 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 24 Nov 2023 19:45:19 +0000 Subject: [PATCH 02/20] feat(dev): logging; - Added debug logging. 
--- ckanext/xloader/plugin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 2837332d..232554b4 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -119,6 +119,8 @@ def notify(self, entity, operation): if toolkit.h.plugin_loaded('validation') and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': + log.debug("Skipping xloading resource %s because " + "resource did not pass validation yet.", entity.id) return elif not getattr(entity, 'url_changed', False): return @@ -140,6 +142,8 @@ def after_resource_create(self, context, resource_dict): if toolkit.h.plugin_loaded('validation') and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ resource_dict.get('validation_status', None) != 'success': + log.debug("Skipping xloading resource %s because " + "resource did not pass validation yet.", resource_dict.get('id')) return self._submit_to_xloader(resource_dict) From 028e42bf9222f798668059fa0fea0b1973636d01 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 24 Nov 2023 20:00:35 +0000 Subject: [PATCH 03/20] fix(dev): 2.9 support; - Do not use 2.10+ helper. --- ckanext/xloader/plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 232554b4..41e6f58e 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -116,7 +116,7 @@ def notify(self, entity, operation): # extension will call resource_patch and this method should # be called again. However, url_changed will not be in the entity # once Validation does the patch. 
- if toolkit.h.plugin_loaded('validation') and \ + if 'validation' in toolkit.config.get('ckan.plugins', []) and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': log.debug("Skipping xloading resource %s because " @@ -139,7 +139,7 @@ def notify(self, entity, operation): # IResourceController def after_resource_create(self, context, resource_dict): - if toolkit.h.plugin_loaded('validation') and \ + if 'validation' in toolkit.config.get('ckan.plugins', []) and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ resource_dict.get('validation_status', None) != 'success': log.debug("Skipping xloading resource %s because " From 77d762ee6934b09afcd5ef3c87aa23daeee7d2e6 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 27 Nov 2023 16:00:11 +0000 Subject: [PATCH 04/20] fix(dev): better conditioning; - Check toolkit action instead of plugin name. --- ckanext/xloader/plugin.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 41e6f58e..982ff01c 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -116,7 +116,7 @@ def notify(self, entity, operation): # extension will call resource_patch and this method should # be called again. However, url_changed will not be in the entity # once Validation does the patch. 
- if 'validation' in toolkit.config.get('ckan.plugins', []) and \ + if _is_validation_plugin_loaded() and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': log.debug("Skipping xloading resource %s because " @@ -139,7 +139,7 @@ def notify(self, entity, operation): # IResourceController def after_resource_create(self, context, resource_dict): - if 'validation' in toolkit.config.get('ckan.plugins', []) and \ + if _is_validation_plugin_loaded() and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ resource_dict.get('validation_status', None) != 'success': log.debug("Skipping xloading resource %s because " @@ -246,3 +246,11 @@ def get_helpers(self): "xloader_status": xloader_helpers.xloader_status, "xloader_status_description": xloader_helpers.xloader_status_description, } + + +def _is_validation_plugin_loaded(): + try: + toolkit.get_action('resource_validation_show') + except KeyError: + return False + return True From aec22b9d407bff52d01f1be5efc9ac4b7b43bbac Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 27 Nov 2023 16:05:44 +0000 Subject: [PATCH 05/20] feat(comments): added comments; - Added method docs to explain new method. --- ckanext/xloader/plugin.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 982ff01c..b7505743 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -249,6 +249,10 @@ def get_helpers(self): def _is_validation_plugin_loaded(): + """ + Checks the existance of a logic action from the ckanext-validation + plugin, thus supporting any extending of the Validation Plugin class. 
+ """ try: toolkit.get_action('resource_validation_show') except KeyError: From 7e99aa7d08a17ad97095b6e7bdc9ede37c69e669 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 29 Jan 2024 20:11:05 +0000 Subject: [PATCH 06/20] fix(dev): misc feedback; - Fixed typo. - Fixed try/catch. --- ckanext/xloader/plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 00c4552d..d15afeca 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -249,11 +249,11 @@ def get_helpers(self): def _is_validation_plugin_loaded(): """ - Checks the existance of a logic action from the ckanext-validation + Checks the existence of a logic action from the ckanext-validation plugin, thus supporting any extending of the Validation Plugin class. """ try: toolkit.get_action('resource_validation_show') + return True except KeyError: return False - return True From 25ea76ebcd451502339724f64c54d08da71e771a Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 29 Jan 2024 21:02:37 +0000 Subject: [PATCH 07/20] fix(dev): misc fixes; - Prevent POST to upload to DS if validation is required. - flake8 if wrap indents. --- ckanext/xloader/plugin.py | 26 +++++++------------------- ckanext/xloader/utils.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index d15afeca..edaa34bd 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -115,10 +115,10 @@ def notify(self, entity, operation): # extension will call resource_patch and this method should # be called again. However, url_changed will not be in the entity # once Validation does the patch. 
- if _is_validation_plugin_loaded() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): + if utils.is_validation_plugin_loaded() and \ + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': - log.debug("Skipping xloading resource %s because " + log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return elif not getattr(entity, 'url_changed', False): @@ -138,10 +138,10 @@ def notify(self, entity, operation): # IResourceController def after_resource_create(self, context, resource_dict): - if _is_validation_plugin_loaded() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ - resource_dict.get('validation_status', None) != 'success': - log.debug("Skipping xloading resource %s because " + if utils.is_validation_plugin_loaded() and \ + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ + resource_dict.get('validation_status', None) != 'success': + log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", resource_dict.get('id')) return self._submit_to_xloader(resource_dict) @@ -245,15 +245,3 @@ def get_helpers(self): "xloader_status": xloader_helpers.xloader_status, "xloader_status_description": xloader_helpers.xloader_status_description, } - - -def _is_validation_plugin_loaded(): - """ - Checks the existence of a logic action from the ckanext-validation - plugin, thus supporting any extending of the Validation Plugin class. 
- """ - try: - toolkit.get_action('resource_validation_show') - return True - except KeyError: - return False diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index ec8e4bbd..3bb20a98 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -9,11 +9,29 @@ from decimal import Decimal import ckan.plugins as p +from ckan.plugins.toolkit import config, h, _ def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": + if is_validation_plugin_loaded() and \ + p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.requires_validation')): + context = { + "ignore_auth": True, + } + resource_dict = p.toolkit.get_action("resource_show")( + context, + { + "id": resource_id, + }, + ) + if resource_dict.get('validation_status', None) != 'success': + h.flash_error(_("Cannot upload resource %s to the DataStore " + "because the resource did not pass validation yet.") % resource_id) + return p.toolkit.redirect_to( + "xloader.resource_data", id=id, resource_id=resource_id + ) try: p.toolkit.get_action("xloader_submit")( None, @@ -215,3 +233,15 @@ def type_guess(rows, types=TYPES, strict=False): guesses_tuples = [(t, guess[t]) for t in types if t in guess] _columns.append(max(guesses_tuples, key=lambda t_n: t_n[1])[0]) return _columns + + +def is_validation_plugin_loaded(): + """ + Checks the existence of a logic action from the ckanext-validation + plugin, thus supporting any extending of the Validation Plugin class. + """ + try: + p.toolkit.get_action('resource_validation_show') + return True + except KeyError: + return False From d2720fcf8ec35fb9082a4e48bb12759133dfa903 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 31 Jan 2024 16:37:10 +0000 Subject: [PATCH 08/20] fix(syntax): flake8; - Syntax fixes from flake8. 
--- ckanext/xloader/plugin.py | 17 +++++++++++------ ckanext/xloader/utils.py | 11 ++++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index edaa34bd..19e88b9b 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -7,6 +7,7 @@ from ckan.model.domain_object import DomainObjectOperation from ckan.model.resource import Resource +from ckan.model.package import Package from . import action, auth, helpers as xloader_helpers, utils @@ -99,7 +100,7 @@ def configure(self, config_): # IDomainObjectModification def notify(self, entity, operation): - # type: (ckan.model.Package|ckan.model.Resource, DomainObjectOperation) -> None + # type: (Package|Resource, DomainObjectOperation) -> None """ Runs before_commit to database for Packages and Resources. We only want to check for changed Resources for this. @@ -107,7 +108,7 @@ def notify(self, entity, operation): See: ckan/model/modification.py.DomainObjectModificationExtension """ if operation != DomainObjectOperation.changed \ - or not isinstance(entity, Resource): + or not isinstance(entity, Resource): return # If the resource requires validation, stop here if validation @@ -116,11 +117,13 @@ def notify(self, entity, operation): # be called again. However, url_changed will not be in the entity # once Validation does the patch. 
if utils.is_validation_plugin_loaded() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): + if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return + elif not getattr(entity, 'url_changed', False): return @@ -139,11 +142,13 @@ def notify(self, entity, operation): def after_resource_create(self, context, resource_dict): if utils.is_validation_plugin_loaded() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ - resource_dict.get('validation_status', None) != 'success': + toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ + resource_dict.get('validation_status', None) != 'success': + log.debug("Skipping xloading resource %s because the " - "resource did not pass validation yet.", resource_dict.get('id')) + "resource did not pass validation yet.", resource_dict.get('id')) return + self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 3bb20a98..7a0fdb87 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -9,14 +9,15 @@ from decimal import Decimal import ckan.plugins as p -from ckan.plugins.toolkit import config, h, _ +from ckan.plugins.toolkit import h, _ def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": if is_validation_plugin_loaded() and \ - p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.requires_validation')): + p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.requires_validation')): + context = { "ignore_auth": True, } @@ -28,7 +29,7 @@ def resource_data(id, resource_id, rows=None): ) if resource_dict.get('validation_status', None) != 'success': h.flash_error(_("Cannot upload resource 
%s to the DataStore " - "because the resource did not pass validation yet.") % resource_id) + "because the resource did not pass validation yet.") % resource_id) return p.toolkit.redirect_to( "xloader.resource_data", id=id, resource_id=resource_id ) @@ -187,7 +188,7 @@ def type_guess(rows, types=TYPES, strict=False): at_least_one_value = [] for ri, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): typesdict = {} for type in types: typesdict[type] = 0 @@ -213,7 +214,7 @@ def type_guess(rows, types=TYPES, strict=False): else: for i, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): guesses.append(defaultdict(int)) for i, cell in enumerate(row): # add string guess so that we have at least one guess From e8881537ef763db66d182200dbc7a95afcd3f18e Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 2 Feb 2024 17:15:33 +0000 Subject: [PATCH 09/20] feat(dev): logic and schema config option; - Moved logic into util method. - Renamed config options. - Added new config option to enforce validation schema existence. --- ckanext/xloader/config_declaration.yaml | 19 +++++- ckanext/xloader/plugin.py | 21 ++++-- ckanext/xloader/utils.py | 91 +++++++++++++++++-------- 3 files changed, 94 insertions(+), 37 deletions(-) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 06258e4c..2c619bfe 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -112,12 +112,25 @@ groups: to True. type: bool required: false - - key: ckanext.xloader.requires_validation + - key: ckanext.xloader.validation.requires_successful_report default: False example: True description: | - Resources are required to pass Validation from the ckanext-validation - plugin to be able to get xloadered. + Resources are required to pass Validation from the ckanext-validation + plugin to be able to get XLoadered. 
+ type: bool + required: false + - key: ckanext.xloader.validation.enforce_schema + default: True + example: False + description: | + Resources are expected to have a Validation Schema, or use the default ones if not. + + If this option is set to `False`, Resources that do not have + a Validation Schema will be treated like they do not require Validation. + + See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema + for more details. type: bool required: false diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 19e88b9b..fb379feb 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -116,7 +116,7 @@ def notify(self, entity, operation): # extension will call resource_patch and this method should # be called again. However, url_changed will not be in the entity # once Validation does the patch. - if utils.is_validation_plugin_loaded() and \ + if utils.awaiting_validation() and \ toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': @@ -136,15 +136,26 @@ def notify(self, entity, operation): "id": entity.id, }, ) + + if utils.awaiting_validation(resource_dict): + # If the resource requires validation, stop here if validation + # has not been performed or did not succeed. The Validation + # extension will call resource_patch and this method should + # be called again. However, url_changed will not be in the entity + # once Validation does the patch. + log.debug("Skipping xloading resource %s because the " + "resource did not pass validation yet.", entity.id) + return + elif not getattr(entity, 'url_changed', False): + # do not submit to xloader if the url has not changed. 
+ return + self._submit_to_xloader(resource_dict) # IResourceController def after_resource_create(self, context, resource_dict): - if utils.is_validation_plugin_loaded() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')) and \ - resource_dict.get('validation_status', None) != 'success': - + if utils.awaiting_validation(resource_dict): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", resource_dict.get('id')) return diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 7a0fdb87..fed161ab 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -11,28 +11,73 @@ import ckan.plugins as p from ckan.plugins.toolkit import h, _ +from logging import getLogger + + +log = getLogger(__name__) + + +def awaiting_validation(res_dict): + """ + Checks the existence of a logic action from the ckanext-validation + plugin, thus supporting any extending of the Validation Plugin class. + + Checks ckanext.xloader.validation.requires_successful_report config + option value. + + Checks ckanext.xloader.validation.enforce_schema config + option value. Then checks the Resource's validation_status. + """ + if not p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.validation.requires_successful_report', False)): + # validation.requires_successful_report is turned off, return right away + return False + + try: + # check for one of the main actions from ckanext-validation + # in the case that users extend the Validation plugin class + # and rename the plugin entry-point. 
+ p.toolkit.get_action('resource_validation_show') + is_validation_plugin_loaded = True + except KeyError: + is_validation_plugin_loaded = False + + if not is_validation_plugin_loaded: + # the validation plugin is not loaded but required, log a warning + log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') + return False + + if p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)): + # validation.enforce_schema is turned on, so we will always look for `validation_status` + if res_dict.get('validation_status', None) != 'success': + return True + + # validation.enforce_schema is turned off, so if the Resource + # does not have a Validation Schema, we will treat it like + # it does not require Validation. + return False + def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": - if is_validation_plugin_loaded() and \ - p.toolkit.asbool(p.toolkit.config.get('ckanext.xloader.requires_validation')): - - context = { - "ignore_auth": True, - } - resource_dict = p.toolkit.get_action("resource_show")( - context, - { - "id": resource_id, - }, + + context = { + "ignore_auth": True, + } + resource_dict = p.toolkit.get_action("resource_show")( + context, + { + "id": resource_id, + }, + ) + + if awaiting_validation(resource_dict): + h.flash_error(_("Cannot upload resource %s to the DataStore " + "because the resource did not pass validation yet.") % resource_id) + return p.toolkit.redirect_to( + "xloader.resource_data", id=id, resource_id=resource_id ) - if resource_dict.get('validation_status', None) != 'success': - h.flash_error(_("Cannot upload resource %s to the DataStore " - "because the resource did not pass validation yet.") % resource_id) - return p.toolkit.redirect_to( - "xloader.resource_data", id=id, resource_id=resource_id - ) + try: p.toolkit.get_action("xloader_submit")( None, @@ -234,15 +279,3 @@ def type_guess(rows, types=TYPES, 
strict=False): guesses_tuples = [(t, guess[t]) for t in types if t in guess] _columns.append(max(guesses_tuples, key=lambda t_n: t_n[1])[0]) return _columns - - -def is_validation_plugin_loaded(): - """ - Checks the existence of a logic action from the ckanext-validation - plugin, thus supporting any extending of the Validation Plugin class. - """ - try: - p.toolkit.get_action('resource_validation_show') - return True - except KeyError: - return False From 4612484aaa43a0fdf98b273b3f041fd19f8e8545 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 2 Feb 2024 18:16:29 +0000 Subject: [PATCH 10/20] feat(dev): better logic and tests; - Added automated tests. - Changed the logic to be better and more clear. --- ckanext/xloader/plugin.py | 16 ------ ckanext/xloader/tests/test_plugin.py | 82 ++++++++++++++++++++++++++++ ckanext/xloader/utils.py | 19 ++++--- 3 files changed, 93 insertions(+), 24 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 30510684..1fc10d12 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -82,22 +82,6 @@ def notify(self, entity, operation): or not isinstance(entity, Resource): return - # If the resource requires validation, stop here if validation - # has not been performed or did not succeed. The Validation - # extension will call resource_patch and this method should - # be called again. However, url_changed will not be in the entity - # once Validation does the patch. 
- if utils.awaiting_validation() and \ - toolkit.asbool(toolkit.config.get('ckanext.xloader.requires_validation')): - - if entity.__dict__.get('extras', {}).get('validation_status', None) != 'success': - log.debug("Skipping xloading resource %s because the " - "resource did not pass validation yet.", entity.id) - return - - elif not getattr(entity, 'url_changed', False): - return - context = { "ignore_auth": True, } diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index 05b83b5b..7b215c78 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -58,6 +58,88 @@ def test_submit_when_url_changes(self, monkeypatch): assert func.called + @pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True) + def test_require_validation(self, monkeypatch): + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + mock_resource_validation_show = mock.Mock() + monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show) + + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + format="CSV", + validation_status='failure', + ) + + assert not func.called # because of the validation_status not being `success` + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + id=resource["id"], + package_id=dataset["id"], + url="http://example.com/file2.csv", + format="CSV", + validation_status='success', + ) + + assert func.called # because of the validation_status is `success` + + @pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True) + @pytest.mark.ckan_config("ckanext.xloader.validation.enforce_schema", False) + def test_enforce_validation_schema(self, monkeypatch): + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + mock_resource_validation_show = 
mock.Mock() + monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show) + + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + schema='', + validation_status='', + ) + + assert func.called # because of the schema being empty + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + id=resource["id"], + package_id=dataset["id"], + url="http://example.com/file2.csv", + schema='https://example.com/schema.json', + validation_status='failure', + ) + + assert not func.called # because of the validation_status not being `success` and there is a schema + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + package_id=dataset["id"], + id=resource["id"], + url="http://example.com/file3.csv", + schema='https://example.com/schema.json', + validation_status='success', + ) + + assert func.called # because of the validation_status is `success` and there is a schema + def _pending_task(self, resource_id): return { "entity_id": resource_id, diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 7a1ed524..c73ce323 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -76,14 +76,17 @@ def awaiting_validation(res_dict): log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') return False - if p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)): - # validation.enforce_schema is turned on, so we will always look for `validation_status` - if res_dict.get('validation_status', None) != 'success': - return True - - # validation.enforce_schema is turned off, so if the Resource - # does not have a Validation Schema, we will treat it like - # it does not require Validation. 
+ if p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) \ + and res_dict.get('validation_status', None) != 'success': + # validation.enforce_schema is turned on, and there is no successful report. + return True + + elif res_dict.get('schema', None) and res_dict.get('validation_status', None) != 'success': + # validation.enforce_schema is turned off, and there is a Validation Schema and no successful report. + return True + + # at this point, we can assume that the Resource is not waiting for Validation. + # or that the Resource does not have a Validation Schema and we are not enforcing schemas. return False From 8ac8db506783e95454adad0eb511aa792eebd5fa Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 2 Feb 2024 18:21:44 +0000 Subject: [PATCH 11/20] fix(logic): fixed some logic; - Fixed a logic case. --- ckanext/xloader/utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index c73ce323..109559ea 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -76,10 +76,12 @@ def awaiting_validation(res_dict): log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') return False - if p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) \ - and res_dict.get('validation_status', None) != 'success': - # validation.enforce_schema is turned on, and there is no successful report. - return True + if p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)): + # validation.enforce_schema is turned on, explicitly check for the `validation_status` + if res_dict.get('validation_status', None) != 'success': + return True + else: + return False elif res_dict.get('schema', None) and res_dict.get('validation_status', None) != 'success': # validation.enforce_schema is turned off, and there is a Validation Schema and no successful report. 
From 1ac809063a13c09fe0c715be7f7f36360a957ad8 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 5 Feb 2024 15:04:00 +0000 Subject: [PATCH 12/20] fix(syntax): made better; - Better conditional syntax. --- ckanext/xloader/utils.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 109559ea..cb3f247e 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -76,15 +76,11 @@ def awaiting_validation(res_dict): log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') return False - if p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)): - # validation.enforce_schema is turned on, explicitly check for the `validation_status` - if res_dict.get('validation_status', None) != 'success': - return True - else: - return False + if (p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) + or res_dict.get('schema', None)) and res_dict.get('validation_status', None) != 'success': - elif res_dict.get('schema', None) and res_dict.get('validation_status', None) != 'success': - # validation.enforce_schema is turned off, and there is a Validation Schema and no successful report. + # either validation.enforce_schema is turned on or it is off and there is not schema to enfroce, + # we then explicitly check for the `validation_status` report to be `success`` return True # at this point, we can assume that the Resource is not waiting for Validation. From 1761ed5d09a179886899e4d2a5aec278bb9b77d5 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 5 Feb 2024 20:18:17 +0000 Subject: [PATCH 13/20] fix(comments): fixed inline comments; - Fixed inline comments to make more sense. 
--- ckanext/xloader/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index cb3f247e..eaad84ac 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -79,7 +79,7 @@ def awaiting_validation(res_dict): if (p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) or res_dict.get('schema', None)) and res_dict.get('validation_status', None) != 'success': - # either validation.enforce_schema is turned on or it is off and there is not schema to enfroce, + # either validation.enforce_schema is turned on or it is off and there is a schema, # we then explicitly check for the `validation_status` report to be `success`` return True From e182eb702d02cbef96f7e61d91bb4957b17a0c49 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 6 Feb 2024 18:05:43 +0000 Subject: [PATCH 14/20] feat(dev): started doing sync mode; - Started doing sync mode for xloader right after validation. --- ckanext/xloader/action.py | 7 +++++ ckanext/xloader/config_declaration.yaml | 8 ++++++ ckanext/xloader/plugin.py | 13 ++++++++- ckanext/xloader/schema.py | 1 + ckanext/xloader/utils.py | 35 +++++++++++++++++++++++++ 5 files changed, 63 insertions(+), 1 deletion(-) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index aabc8148..2c62c824 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -40,6 +40,10 @@ def xloader_submit(context, data_dict): :param ignore_hash: If set to True, the xloader will reload the file even if it haven't changed. (optional, default: False) :type ignore_hash: bool + :param sync_mode: If set to True, the xloader callback will be executed right + away, instead of a job being enqueued. It will also delete any existing jobs + for the given resource. (optional, default: False) + :type sync_mode: bool Returns ``True`` if the job has been submitted and ``False`` if the job has not been submitted, i.e. 
when ckanext-xloader is not configured. @@ -53,6 +57,9 @@ def xloader_submit(context, data_dict): p.toolkit.check_access('xloader_submit', context, data_dict) + sync_mode = data_dict.pop('sync_mode', False) + #TODO: implement the sync_mode logic + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 89114783..cd702e81 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -138,6 +138,14 @@ groups: See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. + - key: ckanext.xloader.validation.chain_xloader + default: True + example: False + description: | + Resources that pass Validation will immediately get XLoadered instead of having + a job enqueued for it. + + If this option is set to `False`, jobs will be enqueued like normal. - key: ckanext.xloader.clean_datastore_tables default: False example: True diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 801c25c2..b939b76b 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -104,6 +104,12 @@ def notify(self, entity, operation): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return + elif utils.do_chain_after_validation(resource_dict.get('id')): + # At this point, the Resource has passed validation requirements, + # and chainging is turned on. We will execute XLoader right away, + # instead of enqueueing a job. + self._submit_to_xloader(resource_dict, sync_mode=True) + return elif not getattr(entity, 'url_changed', False): # do not submit to xloader if the url has not changed. 
return @@ -118,6 +124,10 @@ def after_resource_create(self, context, resource_dict): "resource did not pass validation yet.", resource_dict.get('id')) return + if utils.do_chain_after_validation(resource_dict.get('id')): + self._submit_to_xloader(resource_dict, sync_mode=True) + return + self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): @@ -160,7 +170,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict): + def _submit_to_xloader(self, resource_dict, sync_mode=False): context = {"ignore_auth": True, "defer_commit": True} if not XLoaderFormats.is_it_an_xloader_format(resource_dict["format"]): log.debug( @@ -187,6 +197,7 @@ def _submit_to_xloader(self, resource_dict): { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, + "sync_mode": sync_mode, }, ) except toolkit.ValidationError as e: diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..2eac0baf 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -29,6 +29,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'sync_mode': [ignore_missing, boolean_validator], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index eaad84ac..e7ccf923 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +from rq import get_current_job from ckan import model from ckan.lib import search @@ -48,6 +49,7 @@ def is_it_an_xloader_format(cls, format_): def awaiting_validation(res_dict): + # type: (dict) -> bool """ Checks the existence of a logic action from the ckanext-validation plugin, thus supporting any extending of the Validation Plugin class. 
@@ -88,6 +90,39 @@ def awaiting_validation(res_dict): return False +def do_chain_after_validation(resource_id): + # type: (str) -> bool + if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)): + # we are not requiring resources to pass validation + return False + + if not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)): + # we are not chaining validation to xloader + return False + + current_job = get_current_job() + + if not current_job: + # we are outside of the job context, thus not running a job + return False + + if current_job.func_name != 'ckanext.validation.jobs.run_validation_job': + # the current running job is not the ckanext-validation validate job + #FIXME: how to do a better check for the caller in the stack?? + return False + + try: + job_rid = current_job.args[0].get('id', None) + except (KeyError): + job_rid = None + if resource_id != job_rid: + # the current running job's Resource ID is not + # the same as the passed Resource ID + return False + + return True + + def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": From b386e0e4248c55eb062c430491e2e48073ac9436 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 7 Feb 2024 15:25:16 +0000 Subject: [PATCH 15/20] feat(dev): sync mode cont.; - Continued sync mode chaining from validation. --- ckanext/xloader/action.py | 34 +++++++++++++++++++++------------- ckanext/xloader/plugin.py | 27 ++++++++++++++++++--------- ckanext/xloader/schema.py | 2 +- ckanext/xloader/utils.py | 22 ++++++++++------------ 4 files changed, 50 insertions(+), 35 deletions(-) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index 2c62c824..dd9daef0 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -40,10 +40,10 @@ def xloader_submit(context, data_dict): :param ignore_hash: If set to True, the xloader will reload the file even if it haven't changed. 
(optional, default: False) :type ignore_hash: bool - :param sync_mode: If set to True, the xloader callback will be executed right + :param sync: If set to True, the xloader callback will be executed right away, instead of a job being enqueued. It will also delete any existing jobs for the given resource. (optional, default: False) - :type sync_mode: bool + :type sync: bool Returns ``True`` if the job has been submitted and ``False`` if the job has not been submitted, i.e. when ckanext-xloader is not configured. @@ -57,8 +57,7 @@ def xloader_submit(context, data_dict): p.toolkit.check_access('xloader_submit', context, data_dict) - sync_mode = data_dict.pop('sync_mode', False) - #TODO: implement the sync_mode logic + sync = data_dict.pop('sync', False) res_id = data_dict['resource_id'] try: @@ -166,17 +165,26 @@ def xloader_submit(context, data_dict): log.debug("Timeout for XLoading resource %s is %s", res_id, timeout) try: - job = enqueue_job( - jobs.xloader_data_into_datastore, [data], - title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), - rq_kwargs=dict(timeout=timeout) - ) + if sync: + jobs.xloader_data_into_datastore(data) + else: + job = enqueue_job( + jobs.xloader_data_into_datastore, [data], + title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), + rq_kwargs=dict(timeout=timeout) + ) except Exception: - log.exception('Unable to enqueued xloader res_id=%s', res_id) + if sync: + log.exception('Unable to xloader res_id=%s', res_id) + else: + log.exception('Unable to enqueued xloader res_id=%s', res_id) return False - log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) - - value = json.dumps({'job_id': job.id}) + if sync: + log.debug('Ran xloader in sync mode res_id=%s', res_id) + value = json.dumps({'job_id': 'sync.%s' % res_id}) + else: + log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) + value = json.dumps({'job_id': job.id}) task['value'] = 
value task['state'] = 'pending' diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index b939b76b..715bb9e9 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -104,11 +104,15 @@ def notify(self, entity, operation): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return - elif utils.do_chain_after_validation(resource_dict.get('id')): + elif utils.do_chain_after_validation(resource_dict): + #FIXME: when validation does resource_patch, the url_changed will + # always be false. But if we do the sync submit here, we will always + # be in a loop... + # At this point, the Resource has passed validation requirements, # and chainging is turned on. We will execute XLoader right away, # instead of enqueueing a job. - self._submit_to_xloader(resource_dict, sync_mode=True) + self._submit_to_xloader(resource_dict, sync=True) return elif not getattr(entity, 'url_changed', False): # do not submit to xloader if the url has not changed. 
@@ -124,8 +128,8 @@ def after_resource_create(self, context, resource_dict): "resource did not pass validation yet.", resource_dict.get('id')) return - if utils.do_chain_after_validation(resource_dict.get('id')): - self._submit_to_xloader(resource_dict, sync_mode=True) + if utils.do_chain_after_validation(resource_dict): + self._submit_to_xloader(resource_dict, sync=True) return self._submit_to_xloader(resource_dict) @@ -170,7 +174,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict, sync_mode=False): + def _submit_to_xloader(self, resource_dict, sync=False): context = {"ignore_auth": True, "defer_commit": True} if not XLoaderFormats.is_it_an_xloader_format(resource_dict["format"]): log.debug( @@ -189,15 +193,20 @@ def _submit_to_xloader(self, resource_dict, sync_mode=False): return try: - log.debug( - "Submitting resource %s to be xloadered", resource_dict["id"] - ) + if sync: + log.debug( + "xloadering resource %s in sync mode", resource_dict["id"] + ) + else: + log.debug( + "Submitting resource %s to be xloadered", resource_dict["id"] + ) toolkit.get_action("xloader_submit")( context, { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, - "sync_mode": sync_mode, + "sync": sync, }, ) except toolkit.ValidationError as e: diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index 2eac0baf..dce9db63 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -29,7 +29,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], - 'sync_mode': [ignore_missing, boolean_validator], + 'sync': [ignore_missing, boolean_validator], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index e7ccf923..8730f5b3 
100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -90,20 +90,18 @@ def awaiting_validation(res_dict): return False -def do_chain_after_validation(resource_id): - # type: (str) -> bool - if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)): - # we are not requiring resources to pass validation - return False - - if not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)): - # we are not chaining validation to xloader - return False +def do_chain_after_validation(res_dict): + # type: (dict) -> bool current_job = get_current_job() - if not current_job: - # we are outside of the job context, thus not running a job + if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)) \ + or not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)) \ + or not current_job: + + # we are not requiring resources to pass validation + # OR we are not chaining validation to xloader + # OR we are outside of the job context, thus not running a job return False if current_job.func_name != 'ckanext.validation.jobs.run_validation_job': @@ -115,7 +113,7 @@ def do_chain_after_validation(resource_id): job_rid = current_job.args[0].get('id', None) except (KeyError): job_rid = None - if resource_id != job_rid: + if res_dict.get('id', None) != job_rid: # the current running job's Resource ID is not # the same as the passed Resource ID return False From 3200483d2c4942643b816f2b4e961eb5a5abd916 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Wed, 7 Feb 2024 19:38:51 +0000 Subject: [PATCH 16/20] feat(dev): sync mode cont.; - Continued sync mode chaining from validation. 
--- ckanext/xloader/action.py | 22 +++++++++------------- ckanext/xloader/plugin.py | 8 ++------ ckanext/xloader/utils.py | 2 +- 3 files changed, 12 insertions(+), 20 deletions(-) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index dd9daef0..f47aac48 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -165,26 +165,22 @@ def xloader_submit(context, data_dict): log.debug("Timeout for XLoading resource %s is %s", res_id, timeout) try: - if sync: - jobs.xloader_data_into_datastore(data) - else: - job = enqueue_job( - jobs.xloader_data_into_datastore, [data], - title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), - rq_kwargs=dict(timeout=timeout) - ) + job = enqueue_job( + jobs.xloader_data_into_datastore, [data], + title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), + rq_kwargs=dict(timeout=timeout, at_front=sync) + ) except Exception: if sync: log.exception('Unable to xloader res_id=%s', res_id) else: log.exception('Unable to enqueued xloader res_id=%s', res_id) return False + log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) + value = json.dumps({'job_id': job.id}) + if sync: - log.debug('Ran xloader in sync mode res_id=%s', res_id) - value = json.dumps({'job_id': 'sync.%s' % res_id}) - else: - log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) - value = json.dumps({'job_id': job.id}) + log.debug('Pushed xloader sync mode job=%s res_id=%s to front of queue', job.id, res_id) task['value'] = value task['state'] = 'pending' diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 715bb9e9..09e1cbc4 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -105,13 +105,9 @@ def notify(self, entity, operation): "resource did not pass validation yet.", entity.id) return elif utils.do_chain_after_validation(resource_dict): - #FIXME: when validation does resource_patch, the url_changed 
will - # always be false. But if we do the sync submit here, we will always - # be in a loop... - # At this point, the Resource has passed validation requirements, - # and chainging is turned on. We will execute XLoader right away, - # instead of enqueueing a job. + # and chaining is turned on. We will execute XLoader right away, + # inside of the Validation job, instead of enqueueing a job. self._submit_to_xloader(resource_dict, sync=True) return elif not getattr(entity, 'url_changed', False): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 8730f5b3..89518a62 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -115,7 +115,7 @@ def do_chain_after_validation(res_dict): job_rid = None if res_dict.get('id', None) != job_rid: # the current running job's Resource ID is not - # the same as the passed Resource ID + # the same as the passed Resource's ID return False return True From 4fbdb0d1c1de307b52ac40c8f330f999a79db9dd Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 16 May 2024 19:15:11 +0000 Subject: [PATCH 17/20] feat(dev): IPipeValidation implementation; - Implement experimental `IPipeValidation` interface. --- ckanext/xloader/config_declaration.yaml | 8 ----- ckanext/xloader/plugin.py | 44 +++++++++++++++---------- ckanext/xloader/utils.py | 37 +++------------------ 3 files changed, 32 insertions(+), 57 deletions(-) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index cd702e81..89114783 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -138,14 +138,6 @@ groups: See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema for more details. - - key: ckanext.xloader.validation.chain_xloader - default: True - example: False - description: | - Resources that pass Validation will immediately get XLoadered instead of having - a job enqueued for it.
- - If this option is set to `False`, jobs will be enqueued like normal. - key: ckanext.xloader.clean_datastore_tables default: False example: True diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 09e1cbc4..60f0a97e 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -12,6 +12,12 @@ from . import action, auth, helpers as xloader_helpers, utils from ckanext.xloader.utils import XLoaderFormats +try: + from ckanext.validation.interfaces import IPipeValidation + HAS_IPIPE_VALIDATION = True +except ImportError: + HAS_IPIPE_VALIDATION = False + try: config_declarations = toolkit.blanket.config_declarations except AttributeError: @@ -34,6 +40,8 @@ class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IResourceController, inherit=True) plugins.implements(plugins.IClick) plugins.implements(plugins.IBlueprint) + if HAS_IPIPE_VALIDATION: + plugins.implements(IPipeValidation) # IClick def get_commands(self): @@ -68,6 +76,23 @@ def configure(self, config_): ) ) + # IPipeValidation + + def receive_validation_report(self, validation_report): + if utils.requires_successful_validation_report(): + res_dict = toolkit.get_action('resource_show')({'ignore_auth': True}, + {'id': validation_report.get('resource_id')}) + if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)) + or res_dict.get('schema', None)) and validation_report.get('status') != 'success': + # either validation.enforce_schema is turned on or it is off and there is a schema, + # we then explicitly check for the `validation_status` report to be `success` + return + # if validation is running in async mode, it is running from the redis workers. + # thus we need to do sync=True to have Xloader put the job at the front of the queue. 
+ sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True)) + self._submit_to_xloader(res_dict, sync=sync) + + # IDomainObjectModification def notify(self, entity, operation): @@ -95,21 +120,10 @@ def notify(self, entity, operation): if _should_remove_unsupported_resource_from_datastore(resource_dict): toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id]) - if utils.awaiting_validation(resource_dict): - # If the resource requires validation, stop here if validation - # has not been performed or did not succeed. The Validation - # extension will call resource_patch and this method should - # be called again. However, url_changed will not be in the entity - # once Validation does the patch. + if utils.requires_successful_validation_report(): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", entity.id) return - elif utils.do_chain_after_validation(resource_dict): - # At this point, the Resource has passed validation requirements, - # and chaining is turned on. We will execute XLoader right away, - # inside of the Validation job, instead of enqueueing a job. - self._submit_to_xloader(resource_dict, sync=True) - return elif not getattr(entity, 'url_changed', False): # do not submit to xloader if the url has not changed. 
return @@ -119,15 +133,11 @@ def notify(self, entity, operation): # IResourceController def after_resource_create(self, context, resource_dict): - if utils.awaiting_validation(resource_dict): + if utils.requires_successful_validation_report(): log.debug("Skipping xloading resource %s because the " "resource did not pass validation yet.", resource_dict.get('id')) return - if utils.do_chain_after_validation(resource_dict): - self._submit_to_xloader(resource_dict, sync=True) - return - self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 89518a62..3ed75055 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -48,6 +48,10 @@ def is_it_an_xloader_format(cls, format_): return format_.lower() in cls._formats +def requires_successful_validation_report(): + return p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)) + + def awaiting_validation(res_dict): # type: (dict) -> bool """ @@ -60,7 +64,7 @@ def awaiting_validation(res_dict): Checks ckanext.xloader.validation.enforce_schema config option value. Then checks the Resource's validation_status. 
""" - if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)): + if not requires_successful_validation_report(): # validation.requires_successful_report is turned off, return right away return False @@ -90,37 +94,6 @@ def awaiting_validation(res_dict): return False -def do_chain_after_validation(res_dict): - # type: (dict) -> bool - - current_job = get_current_job() - - if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)) \ - or not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)) \ - or not current_job: - - # we are not requiring resources to pass validation - # OR we are not chaining validation to xloader - # OR we are outside of the job context, thus not running a job - return False - - if current_job.func_name != 'ckanext.validation.jobs.run_validation_job': - # the current running job is not the ckanext-validation validate job - #FIXME: how to do a better check for the caller in the stack?? - return False - - try: - job_rid = current_job.args[0].get('id', None) - except (KeyError): - job_rid = None - if res_dict.get('id', None) != job_rid: - # the current running job's Resource ID is not - # the same as the passed Resource's ID - return False - - return True - - def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": From 27d98cff2efb52fccd0845f8717ab905f11becc0 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 16 May 2024 19:29:29 +0000 Subject: [PATCH 18/20] fix(tests): validation req tests; - Cannot do tests without `IPipeValidation`. 
--- ckanext/xloader/tests/test_plugin.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index ec62e1e7..f22dafbd 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -79,6 +79,7 @@ def test_require_validation(self, monkeypatch): validation_status='failure', ) + # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` func.called = None # reset @@ -92,7 +93,8 @@ def test_require_validation(self, monkeypatch): validation_status='success', ) - assert func.called # because of the validation_status is `success` + # TODO: test IPipeValidation + assert not func.called # because of the validation_status is `success` @pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True) @pytest.mark.ckan_config("ckanext.xloader.validation.enforce_schema", False) @@ -114,7 +116,8 @@ def test_enforce_validation_schema(self, monkeypatch): validation_status='', ) - assert func.called # because of the schema being empty + # TODO: test IPipeValidation + assert not func.called # because of the schema being empty func.called = None # reset helpers.call_action( @@ -127,6 +130,7 @@ def test_enforce_validation_schema(self, monkeypatch): validation_status='failure', ) + # TODO: test IPipeValidation assert not func.called # because of the validation_status not being `success` and there is a schema func.called = None # reset @@ -140,7 +144,8 @@ def test_enforce_validation_schema(self, monkeypatch): validation_status='success', ) - assert func.called # because of the validation_status is `success` and there is a schema + # TODO: test IPipeValidation + assert not func.called # because of the validation_status is `success` and there is a schema @pytest.mark.parametrize("toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result", [ # Test1: Should 
pass as it is an upload with an active datastore entry but an unsupported format From 378f69f054c122cbb2fa15e0762b1f910578e4eb Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 12 Jul 2024 20:17:42 +0000 Subject: [PATCH 19/20] fix(misc): comments and messages; - Clearer comments. - Clearer log messages. --- ckanext/xloader/plugin.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 60f0a97e..09d138a7 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -84,8 +84,7 @@ def receive_validation_report(self, validation_report): {'id': validation_report.get('resource_id')}) if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)) or res_dict.get('schema', None)) and validation_report.get('status') != 'success': - # either validation.enforce_schema is turned on or it is off and there is a schema, - # we then explicitly check for the `validation_status` report to be `success` + # A schema is present, or required to be present return # if validation is running in async mode, it is running from the redis workers. # thus we need to do sync=True to have Xloader put the job at the front of the queue. 
@@ -121,7 +120,7 @@ def notify(self, entity, operation): toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id]) if utils.requires_successful_validation_report(): - log.debug("Skipping xloading resource %s because the " + log.debug("Deferring xloading resource %s because the " "resource did not pass validation yet.", entity.id) return elif not getattr(entity, 'url_changed', False): @@ -134,7 +133,7 @@ def notify(self, entity, operation): def after_resource_create(self, context, resource_dict): if utils.requires_successful_validation_report(): - log.debug("Skipping xloading resource %s because the " + log.debug("Deferring xloading resource %s because the " "resource did not pass validation yet.", resource_dict.get('id')) return From 6070740ea65245bbb0e40938b10cd063d5c63bb9 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 9 Aug 2024 14:07:35 +0000 Subject: [PATCH 20/20] fix(logic): ignore not sysadmin; - Added `ignore_not_sysadmin` validator to the sync key. --- ckanext/xloader/action.py | 8 ++++---- ckanext/xloader/schema.py | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index aa7b94e3..e92c2fde 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -40,10 +40,6 @@ def xloader_submit(context, data_dict): :param ignore_hash: If set to True, the xloader will reload the file even if it haven't changed. (optional, default: False) :type ignore_hash: bool - :param sync: If set to True, the xloader callback will be executed right - away, instead of a job being enqueued. It will also delete any existing jobs - for the given resource. (optional, default: False) - :type sync: bool Returns ``True`` if the job has been submitted and ``False`` if the job has not been submitted, i.e. when ckanext-xloader is not configured. 
@@ -59,7 +55,11 @@ def xloader_submit(context, data_dict): p.toolkit.check_access('xloader_submit', context, data_dict) + # If sync is set to True, the xloader callback will be executed right + # away, instead of a job being enqueued. It will also delete any existing jobs + # for the given resource. This is only controlled by sysadmins or the system. sync = data_dict.pop('sync', False) + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index dce9db63..47ae65a5 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -16,6 +16,7 @@ boolean_validator = get_validator('boolean_validator') int_validator = get_validator('int_validator') OneOf = get_validator('OneOf') +ignore_not_sysadmin = get_validator('ignore_not_sysadmin') if p.toolkit.check_ckan_version('2.9'): unicode_safe = get_validator('unicode_safe') @@ -29,7 +30,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], - 'sync': [ignore_missing, boolean_validator], + 'sync': [ignore_missing, boolean_validator, ignore_not_sysadmin], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] }