feat: add job that uploads preview images to S3
paulmueller committed Oct 31, 2023
1 parent a302956 commit 1df3cdc
Showing 8 changed files with 357 additions and 52 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
@@ -1,3 +1,5 @@
+0.8.0
+ - feat: add job that uploads preview images to S3
0.7.1
 - fix: make bg jobs work with dcor_schemas 0.18.2
 - ref: cleanup
10 changes: 2 additions & 8 deletions README.rst
@@ -13,6 +13,7 @@ This extension implements:

 - A default view for RT-DC (mimetype) resources
 - A background job that generates the preview image
+- A background job that uploads preview images to the S3 object store
 - A route that makes the preview image available via
   "/dataset/{id}/resource/{resource_id}/preview.jpg"

@@ -40,14 +41,7 @@ Add this extension to the plugins and default_views in ckan.ini:

Testing
-------
-If CKAN/DCOR is installed and setup for testing, this extension can
-be tested with pytest:
-
-::
-
-    pytest ckanext
-
-Testing can also be done via vagrant in a virtualmachine using the
+Testing can be done via vagrant in a virtual machine using the
`dcor-test <https://app.vagrantup.com/paulmueller/boxes/dcor-test/>`_ image.
Make sure that `vagrant` and `virtualbox` are installed and run the
following commands in the root of this repository:
31 changes: 30 additions & 1 deletion ckanext/dc_view/jobs.py
@@ -4,8 +4,11 @@
import shutil
import tempfile

+import ckan.plugins.toolkit as toolkit
import dclab
-from dcor_shared import DC_MIME_TYPES, get_resource_path, wait_for_resource
+from dcor_shared import (
+    DC_MIME_TYPES, s3, sha256sum, get_ckan_config_option, get_resource_path,
+    wait_for_resource)
import numpy as np

# Create a temporary matplotlib config directory which is removed on exit
@@ -19,6 +22,10 @@
import matplotlib.pylab as plt # noqa: E402


+def admin_context():
+    return {'ignore_auth': True, 'user': 'default'}


def create_preview_job(resource, override=False):
    """Generate a *_preview.jpg file for a DC resource"""
    path = get_resource_path(resource["id"])
@@ -33,6 +40,28 @@ def create_preview_job(resource, override=False):
    return False


+def migrate_preview_to_s3_job(resource):
+    """Migrate a preview image to the S3 object store"""
+    path = get_resource_path(resource["id"])
+    path_prev = path.with_name(path.name + "_preview.jpg")
+    ds_dict = toolkit.get_action('package_show')(
+        admin_context(),
+        {'id': resource["package_id"]})
+    # Perform the upload
+    bucket_name = get_ckan_config_option(
+        "dcor_object_store.bucket_name").format(
+        organization_id=ds_dict["organization"]["id"])
+    rid = resource["id"]
+    sha256 = sha256sum(path_prev)
+    s3.upload_file(
+        bucket_name=bucket_name,
+        object_name=f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}",
+        path=path_prev,
+        sha256=sha256,
+        private=ds_dict["private"])
+    # TODO: delete the local resource after successful upload?
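As an aside, the object name above shards the resource UUID into 3-, 3-, and 30-character path segments so that objects spread across key prefixes instead of piling up under a single one. A quick illustration (the UUID is made up):

    # Illustration of the object-key layout used by migrate_preview_to_s3_job
    rid = "1df3cdc0-aaaa-bbbb-cccc-0123456789ab"
    object_name = f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}"
    print(object_name)
    # preview/1df/3cd/c0-aaaa-bbbb-cccc-0123456789ab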


def generate_preview(path_rtdc, path_jpg):
    # Check whether we have a condensed version of the dataset.
    # If so, also pass that to overview_plot.
19 changes: 16 additions & 3 deletions ckanext/dc_view/plugin.py
@@ -7,11 +7,11 @@
import ckan.plugins.toolkit as toolkit
import ckan.plugins as plugins

-from dcor_shared import DC_MIME_TYPES
+from dcor_shared import DC_MIME_TYPES, s3
from rq.job import Job

from .cli import get_commands
-from .jobs import create_preview_job
+from .jobs import create_preview_job, migrate_preview_to_s3_job
from .meta import render_metadata_html
from .route_funcs import dcpreview

@@ -33,7 +33,7 @@ def get_blueprint(self):

        # Add plugin url rules to Blueprint object
        rules = [
-            ('/dataset/<uuid:id>/resource/<uuid:resource_id>/preview.jpg',
+            ('/dataset/<uuid:ds_id>/resource/<uuid:res_id>/preview.jpg',
             'dcpreview',
             dcpreview),
        ]
@@ -74,6 +74,19 @@ def after_resource_create(self, context, resource):
"job_id": jid_preview,
"depends_on": copy.copy(depends_on)})

+        # Upload the preview image to S3
+        if s3.is_available():
+            jid_preview_s3 = pkg_job_id + "previews3"
+            toolkit.enqueue_job(
+                migrate_preview_to_s3_job,
+                [resource],
+                title="Migrate preview image to S3 object store",
+                queue="dcor-normal",
+                rq_kwargs={"timeout": 1000,
+                           "job_id": jid_preview_s3,
+                           "depends_on": [jid_preview]}
+            )
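The `depends_on` wiring above makes the S3 upload wait for the preview-generation job, since the image must exist on disk before it can be uploaded. The same pattern in standalone rq, as a sketch (the job functions are stand-ins and a local Redis is assumed):

    from redis import Redis
    from rq import Queue


    def generate_preview_image():  # stand-in for the real preview job
        pass


    def upload_preview_image():  # stand-in for the real S3 upload job
        pass


    queue = Queue("dcor-normal", connection=Redis())
    job_a = queue.enqueue(generate_preview_image, job_id="xyz_preview")
    # job_b stays deferred until job_a has finished successfully
    job_b = queue.enqueue(upload_preview_image,
                          depends_on=job_a,
                          job_id="xyz_previews3")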

    # IResourceView
    def info(self):
        return {'name': 'dc_view',
74 changes: 55 additions & 19 deletions ckanext/dc_view/route_funcs.py
@@ -7,35 +7,71 @@
import ckan.model as model
import ckan.plugins.toolkit as toolkit

+import botocore.exceptions
+from dcor_shared import get_ckan_config_option, s3


-def dcpreview(id, resource_id):
+def dcpreview(ds_id, res_id):
"""Serve a preview image on disk
Parameters
----------
id: str
ds_id: str
dataset ID
resource_id: str
res_id: str
resource ID for which to return the preview image
"""
    # Code borrowed from ckan/controllers/package.py:resource_download
    context = {'model': model, 'session': model.Session,
               'user': c.user, 'auth_user_obj': c.userobj}
-    id = str(id)
-    resource_id = str(resource_id)
+    did = str(ds_id)
+    rid = str(res_id)
    try:
-        rsc = toolkit.get_action('resource_show')(context, {'id': resource_id})
-        toolkit.get_action('package_show')(context, {'id': id})
+        res_dict = toolkit.get_action('resource_show')(context, {'id': rid})
+        ds_dict = toolkit.get_action('package_show')(context, {'id': did})
    except (logic.NotFound, logic.NotAuthorized):
-        toolkit.abort(404, toolkit._('Resource not found'))
-    else:
-        if rsc.get('url_type') == 'upload':
-            upload = uploader.get_resource_uploader(rsc)
-            filepath = pathlib.Path(upload.get_path(rsc['id']))
-            jpg_file = filepath.with_name(filepath.name + "_preview.jpg")
-            if not jpg_file.exists():
-                toolkit.abort(404, toolkit._('Preview not found'))
-            return flask.send_from_directory(jpg_file.parent, jpg_file.name)
-        elif 'url' not in rsc:
-            toolkit.abort(404, toolkit._('No download is available'))
-        toolkit.redirect_to(rsc['url'])
+        # Treat not found and not authorized equally, to not leak information
+        # to unprivileged users.
+        return toolkit.abort(404, toolkit._('Resource not found'))

+    res_stem, _ = res_dict["name"].rsplit(".", 1)
+    prev_name = f"{res_stem}_preview.jpg"

+    if s3 is not None and res_dict.get('s3_available'):
+        # Check whether the corresponding S3 object exists
+        bucket_name = get_ckan_config_option(
+            "dcor_object_store.bucket_name").format(
+            organization_id=ds_dict["organization"]["id"])
+        object_name = f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}"
+        s3_client, _, _ = s3.get_s3()
+        try:
+            s3_client.head_object(Bucket=bucket_name,
+                                  Key=object_name)
+        except botocore.exceptions.ClientError:
+            pass
+        else:
+            # We have an S3 object that we can redirect to. We make use of
+            # presigned URLs so that we can specify a filename for the
+            # download (otherwise, users downloading via the web interface
+            # would just get a hash as a file name, without any suffix or
+            # human-readable identifier).
+            if ds_dict["private"]:
+                expiration = 3600
+            else:
+                expiration = 86400
+            ps_url = s3.create_presigned_url(
+                bucket_name=bucket_name,
+                object_name=object_name,
+                filename=prev_name,
+                expiration=expiration)
+            return toolkit.redirect_to(ps_url)

+    if res_dict.get('url_type') == 'upload':
+        upload = uploader.get_resource_uploader(res_dict)
+        filepath = pathlib.Path(upload.get_path(res_dict['id']))
+        jpg_file = filepath.with_name(filepath.name + "_preview.jpg")
+        if not jpg_file.exists():
+            return toolkit.abort(404, toolkit._('Preview not found'))
+        return flask.send_from_directory(jpg_file.parent, jpg_file.name,
+                                         attachment_filename=prev_name)
+    return toolkit.abort(404, toolkit._('No preview available'))
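For reference, `s3.create_presigned_url` is a dcor_shared helper; with plain boto3, a presigned GET that also pins the download filename would look roughly like this (bucket, key, and filename are placeholders):

    import boto3

    s3_client = boto3.client("s3")  # endpoint/credentials come from config
    url = s3_client.generate_presigned_url(
        "get_object",
        Params={
            "Bucket": "<bucket_name>",
            "Key": "preview/1df/3cd/c0-aaaa-bbbb-cccc-0123456789ab",
            # This response header makes browsers save the file under a
            # readable name instead of the hashed object key.
            "ResponseContentDisposition":
                'attachment; filename="data_preview.jpg"',
        },
        ExpiresIn=3600,  # seconds
    )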
64 changes: 44 additions & 20 deletions ckanext/dc_view/tests/helper_methods.py
@@ -1,4 +1,3 @@
-import cgi
import pathlib

import ckan.tests.helpers as helpers
@@ -7,7 +6,8 @@
data_path = pathlib.Path(__file__).parent / "data"


-def make_dataset(create_context, owner_org, with_resource=False,
+def make_dataset(create_context, owner_org, create_with_upload=None,
+                 test_file_name="calibration_beads_47.rtdc",
                  activate=False, **kwargs):
if "title" not in kwargs:
kwargs["title"] = "test-dataset"
@@ -23,8 +23,10 @@ def make_dataset(create_context, owner_org, with_resource=False,
state="draft",
**kwargs
)
-    if with_resource:
-        rs = make_resource(create_context, dataset_id=ds["id"])

+    if create_with_upload is not None:
+        rs = make_resource(create_with_upload, create_context, ds["id"],
+                           test_file_name=test_file_name)

    if activate:
        helpers.call_action("package_patch", create_context,
@@ -33,23 +35,45 @@ def make_dataset(create_context, owner_org, with_resource=False,

    dataset = helpers.call_action("package_show", id=ds["id"])

-    if with_resource:
-        resource = helpers.call_action("resource_show", id=rs["id"])
-        return dataset, resource
+    if create_with_upload is not None:
+        return dataset, rs
    else:
        return dataset


-def make_resource(create_context, dataset_id):
-    path = data_path / "calibration_beads_47.rtdc"
-    with path.open('rb') as fd:
-        upload = cgi.FieldStorage()
-        upload.filename = path.name
-        upload.file = fd
-        res = helpers.call_action("resource_create", create_context,
-                                  package_id=dataset_id,
-                                  upload=upload,
-                                  url="upload",
-                                  name=path.name,
-                                  )
-    return res
+def make_resource(create_with_upload, create_context, dataset_id,
+                  test_file_name="calibration_beads_47.rtdc"):
+    content = (data_path / test_file_name).read_bytes()
+    rs = create_with_upload(
+        data=content,
+        filename=test_file_name,
+        context=create_context,
+        package_id=dataset_id,
+        url="upload",
+    )
+    resource = helpers.call_action("resource_show", id=rs["id"])
+    return resource


+def synchronous_enqueue_job(job_func, args=None, kwargs=None, title=None,
+                            queue=None, rq_kwargs=None):
+    """Synchronous mock for ``ckan.plugins.toolkit.enqueue_job``
+
+    Due to the asynchronous nature of background jobs, code that uses
+    them needs special treatment in tests. A common approach is to use
+    the ``mock`` package to replace ``ckan.plugins.toolkit.enqueue_job``
+    with a function that executes jobs synchronously instead of
+    asynchronously.
+
+    Also, since we are running the tests as root on a CKAN instance that
+    is run by www-data, modifying files on disk in background jobs
+    (which were started by supervisor as www-data) does not work.
+    """
+    if rq_kwargs is None:
+        rq_kwargs = {}
+    args = args or []
+    kwargs = kwargs or {}
+    job_func(*args, **kwargs)
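In a test module, the mock is applied with `mock.patch`, which is exactly what the tests below do; a condensed usage sketch:

    from unittest import mock


    @mock.patch("ckan.plugins.toolkit.enqueue_job",
                side_effect=synchronous_enqueue_job)
    def test_example(enqueue_job_mock):
        # Code that enqueues background jobs now runs them inline, so
        # their side effects can be asserted immediately afterwards.
        ...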
56 changes: 55 additions & 1 deletion ckanext/dc_view/tests/test_jobs.py
@@ -12,6 +12,7 @@
import time

import pytest
+import requests

import ckan.lib
import ckan.tests.factories as factories
@@ -63,7 +64,8 @@ def test_create_preview_job(enqueue_job_mock, create_with_upload, monkeypatch,
    # create 1st dataset
    create_context = {'ignore_auth': False, 'user': user['name'],
                      'api_version': 3}
-    dataset = make_dataset(create_context, owner_org, with_resource=False,
+    dataset = make_dataset(create_context,
+                           owner_org,
                           activate=False)
    path = data_dir / "calibration_beads_47.rtdc"
    content = path.read_bytes()
@@ -85,3 +87,55 @@
            break
    else:
        raise ValueError("Preview generation timed out after 10s!")


+# We need the dcor_depot extension to make sure that the
+# symbolic-linking pipeline is used.
+@pytest.mark.ckan_config('ckan.plugins', 'dcor_depot dc_view dcor_schemas')
+@pytest.mark.usefixtures('clean_db', 'with_request_context')
+@mock.patch('ckan.plugins.toolkit.enqueue_job',
+            side_effect=synchronous_enqueue_job)
+def test_upload_preview_dataset_to_s3_job(
+        enqueue_job_mock, create_with_upload, monkeypatch, ckan_config,
+        tmpdir):
+    monkeypatch.setitem(ckan_config, 'ckan.storage_path', str(tmpdir))
+    monkeypatch.setattr(ckan.lib.uploader,
+                        'get_storage_path',
+                        lambda: str(tmpdir))
+    monkeypatch.setattr(
+        ckanext.dcor_schemas.plugin,
+        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
+        True)
+
+    user = factories.User()
+    owner_org = factories.Organization(users=[{
+        'name': user['id'],
+        'capacity': 'admin'
+    }])
+    # Note: `call_action` bypasses authorization!
+    # Create a dataset with one resource
+    create_context = {'ignore_auth': False,
+                      'user': user['name'],
+                      'api_version': 3}
+    ds_dict, res_dict = make_dataset(create_context,
+                                     owner_org,
+                                     create_with_upload=create_with_upload,
+                                     activate=True)
+    bucket_name = dcor_shared.get_ckan_config_option(
+        "dcor_object_store.bucket_name").format(
+        organization_id=ds_dict["organization"]["id"])
+    rid = res_dict["id"]
+    object_name = f"preview/{rid[:3]}/{rid[3:6]}/{rid[6:]}"
+    endpoint = dcor_shared.get_ckan_config_option(
+        "dcor_object_store.endpoint_url")
+    prev_url = f"{endpoint}/{bucket_name}/{object_name}"
+    response = requests.get(prev_url)
+    assert response.ok, "the resource is public, so this should work"
+    assert response.status_code == 200
+    # Verify the SHA256 sum of the downloaded preview
+    path = dcor_shared.get_resource_path(res_dict["id"])
+    path_prev = path.with_name(path.name + "_preview.jpg")
+    dl_path = tmpdir / "prev_image.jpg"
+    with dl_path.open("wb") as fd:
+        fd.write(response.content)
+    assert dcor_shared.sha256sum(dl_path) == dcor_shared.sha256sum(path_prev)
