Skip to content

Commit

Permalink
feat: support new DCOR API for uploading data directly to S3
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Mar 6, 2024
1 parent 5d5cf4f commit 1a54508
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
0.14.0
- feat: support new DCOR API for uploading data directly to S3
- enh: add api.errors.APIBadRequest error class
0.13.3
- setup: bump dclab from 0.55.7 to 0.57.0
0.13.2
Expand Down
16 changes: 11 additions & 5 deletions dcoraid/api/ckan_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from .._version import version

from .errors import (APIConflictError, APINotFoundError, NoAPIKeyError,
APIBadGatewayError, APIGatewayTimeoutError,
APIBadGatewayError, APIBadRequest, APIGatewayTimeoutError,
APIAuthorizationError, APIOutdatedError)

#: Minimum required CKAN version on the server side
Expand Down Expand Up @@ -171,20 +171,26 @@ def handle_response(self, req, api_call):
except BaseException:
self.logger.error(traceback.format_exc())
rdata = {}

if isinstance(rdata, str):
raise ValueError(
"Command did not succeed, output: '{}'".format(rdata))
# Something went wrong on a lower level.
if rdata.startswith("Bad request"):
raise APIBadRequest(rdata)
else:
raise ValueError(
"Command did not succeed, output: '{}'".format(rdata))

if not req.ok:
error = rdata.get("error", {})
etype = error.get("__type", req.reason)
e_type = error.get("__type", req.reason)
etext = ""
for key in error:
if not key.startswith("_"):
etext += "{}: {}".format(key, error[key])
if not etext and len(req.reason) < 100:
# Skip large html output, only use small error messages
etext = req.content.decode("utf-8")
msg = "{}: {} (for '{}')".format(etype, etext, api_call)
msg = "{}: {} (for '{}')".format(e_type, etext, api_call)
if req.reason == "NOT FOUND":
raise APINotFoundError(msg)
elif req.reason == "CONFLICT":
Expand Down
124 changes: 112 additions & 12 deletions dcoraid/api/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

from .errors import APIConflictError, APINotFoundError
from .errors import APIBadRequest, APIConflictError, APINotFoundError
from .ckan_api import CKANAPI


class NoS3UploadAvailableError(Exception):
"""Used for identifying DCOR servers that don't support direct S3 upload"""
pass


def dataset_activate(dataset_id, api):
"""Change the state of a dataset to "active"
Expand Down Expand Up @@ -206,16 +211,32 @@ def resource_add(dataset_id, path, api, resource_name=None,
if not exist_ok or not resource_exists(dataset_id=dataset_id,
resource_name=resource_name,
api=api):
# perform upload
resource_add_upload_legacy_indirect_ckan(
api=api,
resource_path=path,
dataset_id=dataset_id,
resource_name=resource_name,
monitor_callback=monitor_callback,
logger=logger,
timeout=27.9
)

# Perform the upload
try:
# Uploading directly to S3 is the preferred option.
resource_add_upload_direct_s3(
api=api,
resource_path=path,
dataset_id=dataset_id,
resource_name=resource_name,
monitor_callback=monitor_callback,
logger=logger,
timeout=27.9
)
except NoS3UploadAvailableError as e:
# This is the fall-back option that causes a performance hit
# on the DCOR server and will unsupported in the future.
logger.warning(str(e))
resource_add_upload_legacy_indirect_ckan(
api=api,
resource_path=path,
dataset_id=dataset_id,
resource_name=resource_name,
monitor_callback=monitor_callback,
logger=logger,
timeout=27.9
)

# If we are here, then the upload was successful
logger.info(f"Finished upload {dataset_id}/{resource_name}")
Expand All @@ -234,6 +255,85 @@ def resource_add(dataset_id, path, api, resource_name=None,
return srv_time


def resource_add_upload_direct_s3(
api: CKANAPI,
dataset_id: str,
resource_path: str | pathlib.Path,
resource_name: str,
monitor_callback: Callable = None,
logger: logging.Logger = None,
timeout: float = 27.9):
"""Direct resource upload to S3
This is an efficient method for uploading resources. The files are
uploaded directly to S3, without the detour via the CKAN API.
Parameters
----------
api: CKANAPI
instance of :class:`CKANAPI`
dataset_id: str
dataset ID to which the resource belongs (used to identify the
S3 bucket to which the resource is uploaded)
resource_path: str or pathlib.Path
path to the file that will be uploaded
resource_name: str
name of the resource (as displayed by DCOR)
monitor_callback: callable
callback method for tracking the progress
logger: logging.Logger
logger instance
timeout: float
timeout for the requests.post command for the upload
"""
upload_id = f"{dataset_id}/{resource_name}"
if logger is not None:
logger.info(f"Commencing S3 upload of {upload_id}")

# retrieve the organization ID
org_id = get_organization_id_for_dataset(api=api, dataset_id=dataset_id)

# retrieve the upload URL and the data fields
try:
upload_info = api.get("resource_upload_s3_url",
organization_id=org_id)
except APIBadRequest:
raise NoS3UploadAvailableError(f"Server {api.server} does not yet "
f"support direct upload to S3")

# preshared URL for the upload
ps_url = upload_info["url"]
# special fields for S3
fields = upload_info["fields"]

# Perform the upload to S3
with resource_path.open("rb") as fd:
fields["file"] = (fields["key"], fd)
e = MultipartEncoder(fields=fields)
m = MultipartEncoderMonitor(e, monitor_callback)
# Increase the read size to speed-up upload (the default chunk
# size for uploads in urllib is 8k which results in a lot of
# Python code being involved in uploading a 20GB file; Setting
# the chunk size to 4MB should increase the upload speed):
# https://github.com/requests/toolbelt/issues/75
# #issuecomment-237189952
m._read = m.read
m.read = lambda size: m._read(4 * 1024 * 1024)

# perform the actual upload
hrep = requests.post(
ps_url,
data=m,
headers={'Content-Type': m.content_type},
verify=True, # verify SSL connection
timeout=timeout, # timeout to avoid freezing
)

if hrep.status_code != 204:
raise ValueError(f"Upload {upload_id} failed with "
f"{hrep.status_code}: {hrep.reason}")


def resource_add_upload_legacy_indirect_ckan(
api: CKANAPI,
resource_path: str | pathlib.Path,
Expand All @@ -250,7 +350,7 @@ def resource_add_upload_legacy_indirect_ckan(
like this. A better approach is to upload resources directly to S3.
DCOR supports uploading resources directly to S3, which is facilitated
via the `resource_upload_s3_url` API action providing a presigned
upload link.
upload link. (see :func:`resource_add_upload_direct_s3`)
"""
upload_id = f"{dataset_id}/{resource_name}"
if logger is not None:
Expand Down
5 changes: 5 additions & 0 deletions dcoraid/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class APIBadGatewayError(APIError):
pass


class APIBadRequest(APIError):
"""An API command cannot be found"""
pass


class APIConflictError(APIError):
"""Invalid payload to DCOR server"""
pass
Expand Down

0 comments on commit 1a54508

Please sign in to comment.