diff --git a/CHANGELOG b/CHANGELOG index a9e2b33..ce78ef7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,5 @@ +0.8.0 + - feat: add job that uploads preview images to S3 0.7.1 - fix: make bg jobs work with dcor_schemas 0.18.2 - ref: cleanup diff --git a/README.rst b/README.rst index c13f79e..818a677 100644 --- a/README.rst +++ b/README.rst @@ -13,6 +13,7 @@ This extensione implements: - A default view for RT-DC (mimetype) resources - A background job that generates the preview image +- A background job that uploads preview images to the S3 object store - A route that makes the preview image available via "/dataset/{id}/resource/{resource_id}/preview.jpg" @@ -40,14 +41,7 @@ Add this extension to the plugins and defaul_views in ckan.ini: Testing ------- -If CKAN/DCOR is installed and setup for testing, this extension can -be tested with pytest: - -:: - - pytest ckanext - -Testing can also be done via vagrant in a virtualmachine using the +Testing can be done via vagrant in a virtual machine using the `dcor-test ` image. 
Make sure that `vagrant` and `virtualbox` are installed and run the following commands in the root of this repository: diff --git a/ckanext/dc_view/jobs.py b/ckanext/dc_view/jobs.py index 87a04b8..ed7d660 100644 --- a/ckanext/dc_view/jobs.py +++ b/ckanext/dc_view/jobs.py @@ -4,8 +4,11 @@ import shutil import tempfile +import ckan.plugins.toolkit as toolkit import dclab -from dcor_shared import DC_MIME_TYPES, get_resource_path, wait_for_resource +from dcor_shared import ( + DC_MIME_TYPES, s3, sha256sum, get_ckan_config_option, get_resource_path, + wait_for_resource) import numpy as np # Create a temporary matplotlib config directory which is removed on exit @@ -19,6 +22,10 @@ import matplotlib.pylab as plt # noqa: E402 +def admin_context(): + return {'ignore_auth': True, 'user': 'default'} + + def create_preview_job(resource, override=False): """Generate a *_preview.png file for a DC resource""" path = get_resource_path(resource["id"]) @@ -33,6 +40,28 @@ def create_preview_job(resource, override=False): return False +def migrate_preview_to_s3_job(resource): + """Migrate a preview image to the S3 object store""" + path = get_resource_path(resource["id"]) + path_prev = path.with_name(path.name + "_preview.jpg") + ds_dict = toolkit.get_action('package_show')( + admin_context(), + {'id': resource["package_id"]}) + # Perform the upload + bucket_name = get_ckan_config_option( + "dcor_object_store.bucket_name").format( + organization_id=ds_dict["organization"]["id"]) + rid = resource["id"] + sha256 = sha256sum(path_prev) + s3.upload_file( + bucket_name=bucket_name, + object_name=f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}", + path=path_prev, + sha256=sha256, + private=ds_dict["private"]) + # TODO: delete the local resource after successful upload? + + def generate_preview(path_rtdc, path_jpg): # Check whether we have a condensed version of the dataset. # If so, also pass that to overview_plot. 
diff --git a/ckanext/dc_view/plugin.py b/ckanext/dc_view/plugin.py index 3348542..6488348 100644 --- a/ckanext/dc_view/plugin.py +++ b/ckanext/dc_view/plugin.py @@ -7,11 +7,11 @@ import ckan.plugins.toolkit as toolkit import ckan.plugins as plugins -from dcor_shared import DC_MIME_TYPES +from dcor_shared import DC_MIME_TYPES, s3 from rq.job import Job from .cli import get_commands -from .jobs import create_preview_job +from .jobs import create_preview_job, migrate_preview_to_s3_job from .meta import render_metadata_html from .route_funcs import dcpreview @@ -33,7 +33,7 @@ def get_blueprint(self): # Add plugin url rules to Blueprint object rules = [ - ('/dataset/<uuid:id>/resource/<uuid:resource_id>/preview.jpg', + ('/dataset/<uuid:ds_id>/resource/<uuid:res_id>/preview.jpg', 'dcpreview', dcpreview), ] @@ -74,6 +74,19 @@ def after_resource_create(self, context, resource): "job_id": jid_preview, "depends_on": copy.copy(depends_on)}) + # Upload the preview image to S3 + if s3.is_available(): + jid_preview_s3 = pkg_job_id + "previews3" + toolkit.enqueue_job( + migrate_preview_to_s3_job, + [resource], + title="Migrate preview image to S3 object store", + queue="dcor-normal", + rq_kwargs={"timeout": 1000, + "job_id": jid_preview_s3, + "depends_on": [jid_preview]} + ) + # IResourceView def info(self): return {'name': 'dc_view', diff --git a/ckanext/dc_view/route_funcs.py b/ckanext/dc_view/route_funcs.py index daf1dbb..deba414 100644 --- a/ckanext/dc_view/route_funcs.py +++ b/ckanext/dc_view/route_funcs.py @@ -7,35 +7,71 @@ import ckan.model as model import ckan.plugins.toolkit as toolkit +import botocore.exceptions +from dcor_shared import get_ckan_config_option, s3 -def dcpreview(id, resource_id): + +def dcpreview(ds_id, res_id): """Serve a preview image on disk Parameters ---------- - id: str + ds_id: str dataset ID - resource_id: str + res_id: str resource ID for which to return the preview image """ # Code borrowed from ckan/controllers/package.py:resource_download context = {'model': model, 'session': 
model.Session, 'user': c.user, 'auth_user_obj': c.userobj} - id = str(id) - resource_id = str(resource_id) + did = str(ds_id) + rid = str(res_id) try: - rsc = toolkit.get_action('resource_show')(context, {'id': resource_id}) - toolkit.get_action('package_show')(context, {'id': id}) + res_dict = toolkit.get_action('resource_show')(context, {'id': rid}) + ds_dict = toolkit.get_action('package_show')(context, {'id': did}) except (logic.NotFound, logic.NotAuthorized): - toolkit.abort(404, toolkit._('Resource not found')) - else: - if rsc.get('url_type') == 'upload': - upload = uploader.get_resource_uploader(rsc) - filepath = pathlib.Path(upload.get_path(rsc['id'])) - jpg_file = filepath.with_name(filepath.name + "_preview.jpg") - if not jpg_file.exists(): - toolkit.abort(404, toolkit._('Preview not found')) - return flask.send_from_directory(jpg_file.parent, jpg_file.name) - elif 'url' not in rsc: - toolkit.abort(404, toolkit._('No download is available')) - toolkit.redirect_to(rsc['url']) + # Treat not found and not authorized equally, to not leak information + # to unprivileged users. + return toolkit.abort(404, toolkit._('Resource not found')) + + res_stem, _ = res_dict["name"].rsplit(".", 1) + prev_name = f"{res_stem}_preview.jpg" + + if s3 is not None and res_dict.get('s3_available'): + # check if the corresponding S3 object exists + bucket_name = get_ckan_config_option( + "dcor_object_store.bucket_name").format( + organization_id=ds_dict["organization"]["id"]) + object_name = f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}" + s3_client, _, _ = s3.get_s3() + try: + s3_client.head_object(Bucket=bucket_name, + Key=object_name) + except botocore.exceptions.ClientError: + pass + else: + # We have an S3 object that we can redirect to. We are making use + # of presigned URLs to be able to specify a filename for download + # (otherwise, users that download via the web interface will + # just get a hash as a file name without any suffix or human- + # readable identifier). 
+ if ds_dict["private"]: + expiration = 3600 + else: + expiration = 86400 + ps_url = s3.create_presigned_url( + bucket_name=bucket_name, + object_name=object_name, + filename=prev_name, + expiration=expiration) + return toolkit.redirect_to(ps_url) + + if res_dict.get('url_type') == 'upload': + upload = uploader.get_resource_uploader(res_dict) + filepath = pathlib.Path(upload.get_path(res_dict['id'])) + jpg_file = filepath.with_name(filepath.name + "_preview.jpg") + if not jpg_file.exists(): + return toolkit.abort(404, toolkit._('Preview not found')) + return flask.send_from_directory(jpg_file.parent, jpg_file.name, + attachment_filename=prev_name) + return toolkit.abort(404, toolkit._('No preview available')) diff --git a/ckanext/dc_view/tests/helper_methods.py b/ckanext/dc_view/tests/helper_methods.py index b22ed8b..0b78b7c 100644 --- a/ckanext/dc_view/tests/helper_methods.py +++ b/ckanext/dc_view/tests/helper_methods.py @@ -1,4 +1,3 @@ -import cgi import pathlib import ckan.tests.helpers as helpers @@ -7,7 +6,8 @@ data_path = pathlib.Path(__file__).parent / "data" -def make_dataset(create_context, owner_org, with_resource=False, +def make_dataset(create_context, owner_org, create_with_upload=None, + test_file_name="calibration_beads_47.rtdc", activate=False, **kwargs): if "title" not in kwargs: kwargs["title"] = "test-dataset" @@ -23,8 +23,10 @@ def make_dataset(create_context, owner_org, with_resource=False, state="draft", **kwargs ) - if with_resource: - rs = make_resource(create_context, dataset_id=ds["id"]) + + if create_with_upload is not None: + rs = make_resource(create_with_upload, create_context, ds["id"], + test_file_name=test_file_name) if activate: helpers.call_action("package_patch", create_context, @@ -33,23 +35,45 @@ def make_dataset(create_context, owner_org, with_resource=False, dataset = helpers.call_action("package_show", id=ds["id"]) - if with_resource: - resource = helpers.call_action("resource_show", id=rs["id"]) - return dataset, resource + 
if create_with_upload is not None: + return dataset, rs else: return dataset -def make_resource(create_context, dataset_id): - path = data_path / "calibration_beads_47.rtdc" - with path.open('rb') as fd: - upload = cgi.FieldStorage() - upload.filename = path.name - upload.file = fd - res = helpers.call_action("resource_create", create_context, - package_id=dataset_id, - upload=upload, - url="upload", - name=path.name, - ) - return res +def make_resource(create_with_upload, create_context, dataset_id, + test_file_name="calibration_beads_47.rtdc"): + content = (data_path / test_file_name).read_bytes() + rs = create_with_upload( + data=content, + filename=test_file_name, + context=create_context, + package_id=dataset_id, + url="upload", + ) + resource = helpers.call_action("resource_show", id=rs["id"]) + return resource + + +def synchronous_enqueue_job(job_func, args=None, kwargs=None, title=None, + queue=None, rq_kwargs=None): + """ + Synchronous mock for ``ckan.plugins.toolkit.enqueue_job``. + + + Due to the asynchronous nature of background jobs, code that uses them + needs to be handled specially when writing tests. + + A common approach is to use the mock package to replace the + ckan.plugins.toolkit.enqueue_job function with a mock that executes jobs + synchronously instead of asynchronously + + Also, since we are running the tests as root on a ckan instance that + is run by www-data, modifying files on disk in background jobs + (which were started by supervisor as www-data) does not work. 
+ """ + if rq_kwargs is None: + rq_kwargs = {} + args = args or [] + kwargs = kwargs or {} + job_func(*args, **kwargs) diff --git a/ckanext/dc_view/tests/test_jobs.py b/ckanext/dc_view/tests/test_jobs.py index 913e447..37f2806 100644 --- a/ckanext/dc_view/tests/test_jobs.py +++ b/ckanext/dc_view/tests/test_jobs.py @@ -12,6 +12,7 @@ import time import pytest +import requests import ckan.lib import ckan.tests.factories as factories @@ -63,7 +64,8 @@ def test_create_preview_job(enqueue_job_mock, create_with_upload, monkeypatch, # create 1st dataset create_context = {'ignore_auth': False, 'user': user['name'], 'api_version': 3} - dataset = make_dataset(create_context, owner_org, with_resource=False, + dataset = make_dataset(create_context, + owner_org, activate=False) path = data_dir / "calibration_beads_47.rtdc" content = path.read_bytes() @@ -85,3 +87,55 @@ def test_create_preview_job(enqueue_job_mock, create_with_upload, monkeypatch, break else: raise ValueError("Preview generation timed out after 10s!") + + +# We need the dcor_depot extension to make sure that the symbolic- +# linking pipeline is used. +@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_view dcor_schemas') +@pytest.mark.usefixtures('clean_db', 'with_request_context') +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_upload_preview_dataset_to_s3_job( + enqueue_job_mock, create_with_upload, monkeypatch, ckan_config, + tmpdir): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.User() + owner_org = factories.Organization(users=[{ + 'name': user['id'], + 'capacity': 'admin' + }]) + # Note: `call_action` bypasses authorization! 
+ # create 1st dataset + create_context = {'ignore_auth': False, + 'user': user['name'], + 'api_version': 3} + ds_dict, res_dict = make_dataset(create_context, + owner_org, + create_with_upload=create_with_upload, + activate=True) + bucket_name = dcor_shared.get_ckan_config_option( + "dcor_object_store.bucket_name").format( + organization_id=ds_dict["organization"]["id"]) + rid = res_dict["id"] + object_name = f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}" + endpoint = dcor_shared.get_ckan_config_option( + "dcor_object_store.endpoint_url") + prev_url = f"{endpoint}/{bucket_name}/{object_name}" + response = requests.get(prev_url) + assert response.ok, "resource is public" + assert response.status_code == 200 + # Verify SHA256sum + path = dcor_shared.get_resource_path(res_dict["id"]) + path_prev = path.with_name(path.name + "_preview.jpg") + dl_path = tmpdir / "prev_image.jpg" + with dl_path.open("wb") as fd: + fd.write(response.content) + assert dcor_shared.sha256sum(dl_path) == dcor_shared.sha256sum(path_prev) diff --git a/ckanext/dc_view/tests/test_route.py b/ckanext/dc_view/tests/test_route.py new file mode 100644 index 0000000..9ececc3 --- /dev/null +++ b/ckanext/dc_view/tests/test_route.py @@ -0,0 +1,153 @@ +import mock + +import ckan.common +import ckan.model +import ckan.tests.factories as factories +import ckan.tests.helpers as helpers +import ckanext.dcor_schemas.plugin +import dcor_shared + +import pytest + +from .helper_methods import make_dataset, synchronous_enqueue_job + + +@pytest.mark.ckan_config('ckan.plugins', + 'dcor_depot dcor_schemas dc_serve dc_view') +@pytest.mark.usefixtures('clean_db', 'with_request_context') +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_route_redirect_preview_to_s3_private( + enqueue_job_mock, app, tmpdir, create_with_upload, monkeypatch, + ckan_config): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, 
'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.User() + user_obj = ckan.model.User.by_name(user["name"]) + monkeypatch.setattr(ckan.common, + 'current_user', + user_obj) + owner_org = factories.Organization(users=[{ + 'name': user['id'], + 'capacity': 'admin' + }]) + # Note: `call_action` bypasses authorization! + create_context = {'ignore_auth': False, + 'user': user['name'], + 'api_version': 3} + # create a dataset + ds_dict, res_dict = make_dataset(create_context, owner_org, + create_with_upload=create_with_upload, + activate=True, + private=True + ) + rid = res_dict["id"] + assert "s3_available" in res_dict + assert "s3_url" in res_dict + + # Remove the local resource to make sure CKAN serves the S3 URL + path = dcor_shared.get_resource_path(rid) + path_prev = path.with_name(path.name + "_preview.jpg") + assert path_prev.exists() + path_prev.unlink() + + did = ds_dict["id"] + # We should not be authorized to access the resource without API token + resp0 = app.get( + f"/dataset/{did}/resource/{rid}/preview.jpg", + status=404 + ) + assert len(resp0.history) == 0 + + # Try again with token + data = helpers.call_action( + u"api_token_create", + context={u"model": ckan.model, u"user": user[u"name"]}, + user=user[u"name"], + name=u"token-name", + ) + + resp = app.get( + f"/dataset/{did}/resource/{rid}/preview.jpg", + headers={u"authorization": data["token"]}, + ) + + endpoint = dcor_shared.get_ckan_config_option( + "dcor_object_store.endpoint_url") + bucket_name = dcor_shared.get_ckan_config_option( + "dcor_object_store.bucket_name").format( + organization_id=ds_dict["organization"]["id"]) + redirect = resp.history[0] + assert redirect.status_code == 302 + redirect_stem = (f"{endpoint}/{bucket_name}/preview/" + f"{rid[:3]}/{rid[3:6]}/{rid[6:]}") + # Since we have a presigned URL, it is longer than the normal S3 URL. 
+ assert redirect.location.startswith(redirect_stem) + assert len(redirect.location) > len(redirect_stem) + + +@pytest.mark.ckan_config('ckan.plugins', + 'dcor_depot dcor_schemas dc_serve dc_view') +@pytest.mark.usefixtures('clean_db', 'with_request_context') +@mock.patch('ckan.plugins.toolkit.enqueue_job', + side_effect=synchronous_enqueue_job) +def test_route_preview_to_s3_public( + enqueue_job_mock, app, tmpdir, create_with_upload, monkeypatch, + ckan_config): + monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir)) + monkeypatch.setattr(ckan.lib.uploader, + 'get_storage_path', + lambda: str(tmpdir)) + monkeypatch.setattr( + ckanext.dcor_schemas.plugin, + 'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS', + True) + + user = factories.User() + user_obj = ckan.model.User.by_name(user["name"]) + monkeypatch.setattr(ckan.common, + 'current_user', + user_obj) + owner_org = factories.Organization(users=[{ + 'name': user['id'], + 'capacity': 'admin' + }]) + # Note: `call_action` bypasses authorization! 
+ create_context = {'ignore_auth': False, + 'user': user['name'], + 'api_version': 3} + # create a dataset + ds_dict, res_dict = make_dataset(create_context, owner_org, + create_with_upload=create_with_upload, + activate=True) + rid = res_dict["id"] + assert "s3_available" in res_dict + assert "s3_url" in res_dict + + # Remove the local resource to make sure CKAN serves the S3 URL + path = dcor_shared.get_resource_path(rid) + path_prev = path.with_name(path.name + "_preview.jpg") + assert path_prev.exists() + path_prev.unlink() + + did = ds_dict["id"] + resp = app.get( + f"/dataset/{did}/resource/{rid}/preview.jpg", + ) + + endpoint = dcor_shared.get_ckan_config_option( + "dcor_object_store.endpoint_url") + bucket_name = dcor_shared.get_ckan_config_option( + "dcor_object_store.bucket_name").format( + organization_id=ds_dict["organization"]["id"]) + redirect = resp.history[0] + assert redirect.status_code == 302 + assert redirect.location.startswith(f"{endpoint}/{bucket_name}/preview/" + f"{rid[:3]}/{rid[3:6]}/{rid[6:]}")