RQ Job Timeout Handling #223

Open · wants to merge 2 commits into master
22 changes: 20 additions & 2 deletions ckanext/xloader/jobs.py
@@ -14,7 +14,7 @@
 from psycopg2 import errors
 from six.moves.urllib.parse import urlsplit
 import requests
-from rq import get_current_job
+from rq import get_current_job, timeouts as rq_timeouts
 import sqlalchemy as sa
 
 from ckan import model
@@ -245,10 +245,21 @@ def tabulator_load():
     logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
     try:
         if use_type_guessing:
-            tabulator_load()
+            try:
+                tabulator_load()
+            except rq_timeouts.JobTimeoutException as e:
Collaborator:
This isn't catching timeouts from the tabulator_load on line 266. Perhaps move this block to the outer catch, line 267?
(A sketch of the suggested restructure follows this hunk.)
+                tmp_file.close()
+                timeout = config.get('ckanext.xloader.job_timeout', '3600')
+                logger.warning('Job time out after %ss', timeout)
+                raise JobError('Job timed out after {}s'.format(timeout))
         else:
             try:
                 direct_load()
+            except rq_timeouts.JobTimeoutException as e:
+                tmp_file.close()
+                timeout = config.get('ckanext.xloader.job_timeout', '3600')
+                logger.warning('Job time out after %ss', timeout)
+                raise JobError('Job timed out after {}s'.format(timeout))
             except JobError as e:
                 logger.warning('Load using COPY failed: %s', e)
                 logger.info('Trying again with tabulator')
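
A minimal sketch of the restructure the reviewer suggests above — an assumption about their intent, not code from this PR. It catches the RQ timeout once, on the outer try, so the fallback tabulator_load() inside the JobError handler is covered as well. It reuses the names from the diff (tabulator_load, direct_load, tmp_file, config, logger, JobError) and is a fragment of xloader_data_into_datastore_, not a drop-in patch:

    # Sketch only: the outer except would need to coexist with the function's
    # existing outer handlers (e.g. FileCouldNotBeLoadedError).
    try:
        if use_type_guessing:
            tabulator_load()
        else:
            try:
                direct_load()
            except JobError as e:
                logger.warning('Load using COPY failed: %s', e)
                logger.info('Trying again with tabulator')
                tabulator_load()
    except rq_timeouts.JobTimeoutException:
        # one cleanup path for both the direct load and either tabulator load
        tmp_file.close()
        timeout = config.get('ckanext.xloader.job_timeout', '3600')
        logger.warning('Job timed out after %ss', timeout)
        raise JobError('Job timed out after {}s'.format(timeout))
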
@@ -351,6 +362,7 @@ def _download_resource_data(resource, data, api_key, logger):
         response.close()
         data['datastore_contains_all_records_of_source_file'] = False
     except requests.exceptions.HTTPError as error:
+        tmp_file.close()
         # status code error
         logger.debug('HTTP error: %s', error)
         raise HTTPError(
@@ -362,6 +374,7 @@ def _download_resource_data(resource, data, api_key, logger):
         raise JobError('Connection timed out after {}s'.format(
             DOWNLOAD_TIMEOUT))
     except requests.exceptions.RequestException as e:
+        tmp_file.close()
         try:
             err_message = str(e.reason)
         except AttributeError:
@@ -370,6 +383,11 @@ def _download_resource_data(resource, data, api_key, logger):
         raise HTTPError(
             message=err_message, status_code=None,
             request_url=url, response=None)
+    except rq_timeouts.JobTimeoutException as e:
+        tmp_file.close()
+        timeout = config.get('ckanext.xloader.job_timeout', '3600')
+        logger.warning('Job time out after %ss', timeout)
+        raise JobError('Job timed out after {}s'.format(timeout))
 
     logger.info('Downloaded ok - %s', printable_file_size(length))
     file_hash = m.hexdigest()
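
For background (not part of this PR): when a job exceeds its timeout, the default RQ worker on Unix raises rq.timeouts.JobTimeoutException inside the running job function, which is why the handlers above can catch it and close the temp file before the job fails. A minimal standalone sketch, assuming a hypothetical myjobs module and a local Redis:

    # myjobs.py -- hypothetical module; job functions must be importable by the worker
    import time

    from rq import timeouts as rq_timeouts


    def slow_job():
        try:
            time.sleep(60)  # stand-in for a long COPY / tabulator load
        except rq_timeouts.JobTimeoutException:
            # clean up here (e.g. close temp files) before the job is marked failed
            raise


    # Elsewhere: enqueue with a timeout and run `rq worker` to process it.
    #     from redis import Redis
    #     from rq import Queue
    #     Queue(connection=Redis()).enqueue('myjobs.slow_job', job_timeout=5)
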
27 changes: 26 additions & 1 deletion ckanext/xloader/tests/test_jobs.py
@@ -1,5 +1,6 @@
 import pytest
 import io
+import os
 
 from datetime import datetime
 
@@ -16,6 +17,7 @@
 
 
 _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10"
+_TEST_LARGE_FILE_CONTENT = "\n1,2\n2,4\n3,6\n4,8\n5,10"
 
 
 def get_response(download_url, headers):
@@ -25,14 +27,24 @@ def get_response(download_url, headers):
     resp.headers = headers
     return resp
 
 
 def get_large_response(download_url, headers):
     """Mock jobs.get_response() method to fake a large file."""
     resp = Response()
     resp.raw = io.BytesIO(_TEST_FILE_CONTENT.encode())
     resp.headers = {'content-length': 2000000000}
     return resp
 
+def get_large_data_response(download_url, headers):
+    """Mock jobs.get_response() method."""
+    resp = Response()
+    f_content = _TEST_FILE_CONTENT + (_TEST_LARGE_FILE_CONTENT * 500000)
+    resp.raw = io.BytesIO(f_content.encode())
+    resp.headers = headers
+    return resp
+
+def _get_temp_files():
+    return [os.path.join('/tmp', f) for f in os.listdir('/tmp') if os.path.isfile(os.path.join('/tmp', f))]
 
 
 @pytest.fixture
 def apikey():
@@ -74,6 +86,8 @@ def data(create_with_upload, apikey):
 
 
 @pytest.mark.usefixtures("clean_db", "with_plugins")
+@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 15)
+@pytest.mark.ckan_config("ckan.jobs.timeout", 15)
 class TestXLoaderJobs(helpers.FunctionalRQTestBase):
 
     def test_xloader_data_into_datastore(self, cli, data):
@@ -123,6 +137,17 @@ def test_data_max_excerpt_lines_config(self, cli, data):
         resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
         assert resource["datastore_contains_all_records_of_source_file"] is False
 
+    def test_data_with_rq_job_timeout(self, cli, data):
+        for f in _get_temp_files():
+            os.remove(f)
+        assert len(_get_temp_files()) == 0
+        self.enqueue(jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=15))
+        with mock.patch("ckanext.xloader.jobs.get_response", get_large_data_response):
+            stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
+            assert "Job timed out after" in stdout
+        assert len(_get_temp_files()) == 0
+
+
 
 @pytest.mark.usefixtures("clean_db")
 class TestSetResourceMetadata(object):