diff --git a/docker-compose.apps.yml b/docker-compose.apps.yml
index 338bab8b4..5699c7659 100644
--- a/docker-compose.apps.yml
+++ b/docker-compose.apps.yml
@@ -56,7 +56,7 @@ services:
     command: >
       /bin/bash -c '
       sleep 3;
-      celery -A main.celery:app worker -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}'
+      celery -A main.celery:app worker -B -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}'
     depends_on:
       db:
         condition: service_healthy
diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py
index 670d8f0ef..408a32620 100644
--- a/learning_resources_search/tasks.py
+++ b/learning_resources_search/tasks.py
@@ -3,6 +3,7 @@
 import datetime
 import itertools
 import logging
+import time
 from collections import OrderedDict
 from contextlib import contextmanager
 from itertools import groupby
@@ -275,14 +276,8 @@ def send_subscription_emails(self, subscription_type, period="daily"):
     return self.replace(email_tasks)
 
 
-@app.task(
-    autoretry_for=(RetryError,),
-    retry_backoff=True,
-    rate_limit="600/m",
-    acks_late=True,
-    reject_on_worker_lost=True,
-)
-def index_learning_resources(ids, resource_type, index_types):
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
+def index_learning_resources(self, ids, resource_type, index_types):
     """
     Index courses
 
@@ -294,14 +289,10 @@ def index_learning_resources(ids, resource_type, index_types):
 
     """
     try:
-        with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS):
-            api.index_learning_resources(ids, resource_type, index_types)
-    except (RetryError, Ignore, SystemExit):
-        raise
-    except:  # noqa: E722
-        error = "index_courses threw an error"
-        log.exception(error)
-        return error
+        api.index_learning_resources(ids, resource_type, index_types)
+    except Exception as exc:
+        log.error(f"Task {self.request.id} failed: {exc}")
+        raise self.retry(exc=exc, countdown=5)
 
 
 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@@ -354,14 +345,8 @@ def bulk_deindex_percolators(ids):
     return error
 
 
-@app.task(
-    autoretry_for=(RetryError,),
-    retry_backoff=True,
-    rate_limit="600/m",
-    acks_late=True,
-    reject_on_worker_lost=True,
-)
-def bulk_index_percolate_queries(percolate_ids, index_types):
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
+def bulk_index_percolate_queries(self, percolate_ids, index_types):
     """
     Bulk index percolate queries for provided percolate query Ids
 
@@ -378,12 +363,10 @@ def bulk_index_percolate_queries(percolate_ids, index_types):
             PERCOLATE_INDEX_TYPE,
             index_types,
         )
-    except (RetryError, Ignore, SystemExit):
-        raise
-    except:  # noqa: E722
-        error = "bulk_index_percolate_queries threw an error"
-        log.exception(error)
-        return error
+        time.sleep(1)
+    except Exception as exc:
+        log.error(f"Task {self.request.id} failed: {exc}")
+        raise self.retry(exc=exc, countdown=5)
 
 
 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@@ -409,14 +392,9 @@ def index_course_content_files(course_ids, index_types):
     return error
 
 
-@app.task(
-    autoretry_for=(RetryError,),
-    retry_backoff=True,
-    rate_limit="600/m",
-    acks_late=True,
-    reject_on_worker_lost=True,
-)
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
 def index_content_files(
+    self,
     content_file_ids,
     learning_resource_id,
     index_types=IndexestoUpdate.all_indexes.value,
@@ -432,16 +410,13 @@ def index_content_files(
 
     """
     try:
-        with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS):
-            api.index_content_files(
-                content_file_ids, learning_resource_id, index_types=index_types
-            )
-    except (RetryError, Ignore, SystemExit):
-        raise
-    except:  # noqa: E722
-        error = "index_content_files threw an error"
-        log.exception(error)
-        return error
+        api.index_content_files(
+            content_file_ids, learning_resource_id, index_types=index_types
+        )
+        time.sleep(1)
+    except Exception as exc:
+        log.error(f"Task {self.request.id} failed: {exc}")
+        raise self.retry(exc=exc, countdown=5)
 
 
 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@@ -533,113 +508,109 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags):
     """
     Wipe and recreate index and mapping, and index all items.
     """
-    try:
-        if not remove_existing_reindexing_tags:
-            existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes)
-            if existing_reindexing_indexes:
-                error = (
-                    f"Reindexing in progress. Reindexing indexes already exist: "
-                    f"{', '.join(existing_reindexing_indexes)}"
-                )
-                log.exception(error)
-                return error
+    if not remove_existing_reindexing_tags:
+        existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes)
 
-        api.delete_orphaned_indexes(
-            indexes, delete_reindexing_tags=remove_existing_reindexing_tags
-        )
+        if existing_reindexing_indexes:
+            error = (
+                f"Reindexing in progress. Reindexing indexes already exist: "
+                f"{', '.join(existing_reindexing_indexes)}"
+            )
+            log.exception(error)
+            return error
 
-        new_backing_indices = {
-            obj_type: api.create_backing_index(obj_type) for obj_type in indexes
-        }
+    api.delete_orphaned_indexes(
+        indexes, delete_reindexing_tags=remove_existing_reindexing_tags
+    )
 
-        # Do the indexing on the temp index
-        log.info("starting to index %s objects...", ", ".join(indexes))
+    new_backing_indices = {
+        obj_type: api.create_backing_index(obj_type) for obj_type in indexes
+    }
 
-        index_tasks = []
+    # Do the indexing on the temp index
+    log.info("starting to index %s objects...", ", ".join(indexes))
 
-        if PERCOLATE_INDEX_TYPE in indexes:
+    index_tasks = []
+
+    if PERCOLATE_INDEX_TYPE in indexes:
+        index_tasks = index_tasks + [
+            bulk_index_percolate_queries.s(
+                percolate_ids, IndexestoUpdate.reindexing_index.value
+            )
+            for percolate_ids in chunks(
+                PercolateQuery.objects.order_by("id").values_list("id", flat=True),
+                chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
+            )
+        ]
+
+    if COURSE_TYPE in indexes:
+        blocklisted_ids = load_course_blocklist()
+        index_tasks = index_tasks + [
+            index_learning_resources.si(
+                ids,
+                COURSE_TYPE,
+                index_types=IndexestoUpdate.reindexing_index.value,
+            )
+            for ids in chunks(
+                Course.objects.filter(learning_resource__published=True)
+                .exclude(learning_resource__readable_id=blocklisted_ids)
+                .order_by("learning_resource_id")
+                .values_list("learning_resource_id", flat=True),
+                chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
+            )
+        ]
+
+    for course in (
+        Course.objects.filter(learning_resource__published=True)
+        .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES)
+        .exclude(learning_resource__readable_id=blocklisted_ids)
+        .order_by("learning_resource_id")
+    ):
         index_tasks = index_tasks + [
-            bulk_index_percolate_queries.si(
-                percolate_ids, IndexestoUpdate.reindexing_index.value
+            index_content_files.si(
+                ids,
+                course.learning_resource_id,
+                index_types=IndexestoUpdate.reindexing_index.value,
             )
-            for percolate_ids in chunks(
-                PercolateQuery.objects.order_by("id").values_list("id", flat=True),
-                chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
+            for ids in chunks(
+                ContentFile.objects.filter(
+                    run__learning_resource_id=course.learning_resource_id,
+                    published=True,
+                    run__published=True,
+                )
+                .order_by("id")
+                .values_list("id", flat=True),
+                chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE,
             )
         ]
 
-    if COURSE_TYPE in indexes:
-        blocklisted_ids = load_course_blocklist()
+    for resource_type in [
+        PROGRAM_TYPE,
+        PODCAST_TYPE,
+        PODCAST_EPISODE_TYPE,
+        LEARNING_PATH_TYPE,
+        VIDEO_TYPE,
+        VIDEO_PLAYLIST_TYPE,
+    ]:
+        if resource_type in indexes:
             index_tasks = index_tasks + [
                 index_learning_resources.si(
                     ids,
-                    COURSE_TYPE,
+                    resource_type,
                     index_types=IndexestoUpdate.reindexing_index.value,
                 )
                 for ids in chunks(
-                    Course.objects.filter(learning_resource__published=True)
-                    .exclude(learning_resource__readable_id=blocklisted_ids)
-                    .order_by("learning_resource_id")
-                    .values_list("learning_resource_id", flat=True),
+                    LearningResource.objects.filter(
+                        published=True, resource_type=resource_type
+                    )
+                    .order_by("id")
+                    .values_list("id", flat=True),
                     chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
                 )
             ]
 
-        for course in (
-            Course.objects.filter(learning_resource__published=True)
-            .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES)
-            .exclude(learning_resource__readable_id=blocklisted_ids)
-            .order_by("learning_resource_id")
-        ):
-            index_tasks = index_tasks + [
-                index_content_files.si(
-                    ids,
-                    course.learning_resource_id,
-                    index_types=IndexestoUpdate.reindexing_index.value,
-                )
-                for ids in chunks(
-                    ContentFile.objects.filter(
-                        run__learning_resource_id=course.learning_resource_id,
-                        published=True,
-                        run__published=True,
-                    )
-                    .order_by("id")
-                    .values_list("id", flat=True),
-                    chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE,
-                )
-            ]
-
-        for resource_type in [
-            PROGRAM_TYPE,
-            PODCAST_TYPE,
-            PODCAST_EPISODE_TYPE,
-            LEARNING_PATH_TYPE,
-            VIDEO_TYPE,
-            VIDEO_PLAYLIST_TYPE,
-        ]:
-            if resource_type in indexes:
-                index_tasks = index_tasks + [
-                    index_learning_resources.si(
-                        ids,
-                        resource_type,
-                        index_types=IndexestoUpdate.reindexing_index.value,
-                    )
-                    for ids in chunks(
-                        LearningResource.objects.filter(
-                            published=True, resource_type=resource_type
-                        )
-                        .order_by("id")
-                        .values_list("id", flat=True),
-                        chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
-                    )
-                ]
-
-        index_tasks = celery.group(index_tasks)
-    except:  # noqa: E722
-        error = "start_recreate_index threw an error"
-        log.exception(error)
-        return error
+    index_tasks = celery.group(index_tasks)
 
     # Use self.replace so that code waiting on this task will also wait on the indexing
     # and finish tasks
@@ -858,14 +829,8 @@ def get_update_learning_resource_tasks(resource_type):
     ]
 
 
-@app.task(
-    acks_late=True,
-    reject_on_worker_lost=True,
-    autoretry_for=(RetryError,),
-    retry_backoff=True,
-    rate_limit="600/m",
-)
-def finish_recreate_index(results, backing_indices):
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
+def finish_recreate_index(self, results, backing_indices):
     """
     Swap reindex backing index with default backing index
 
@@ -873,27 +838,31 @@ def finish_recreate_index(results, backing_indices):
     results (list or bool): Results saying whether the error exists
     backing_indices (dict): The backing OpenSearch indices keyed by object type
     """
-    errors = merge_strings(results)
-    if errors:
-        try:
-            api.delete_orphaned_indexes(
-                list(backing_indices.keys()), delete_reindexing_tags=True
-            )
-        except RequestError as ex:
-            raise RetryError(str(ex)) from ex
-        msg = f"Errors occurred during recreate_index: {errors}"
-        raise ReindexError(msg)
+    try:
+        errors = merge_strings(results)
+        if errors:
+            try:
+                api.delete_orphaned_indexes(
+                    list(backing_indices.keys()), delete_reindexing_tags=True
+                )
+            except RequestError as ex:
+                raise RetryError(str(ex)) from ex
+            msg = f"Errors occurred during recreate_index: {errors}"
+            raise ReindexError(msg)
 
-    log.info(
-        "Done with temporary index. Pointing default aliases to newly created backing indexes..."  # noqa: E501
-    )
-    for obj_type, backing_index in backing_indices.items():
-        try:
-            api.switch_indices(backing_index, obj_type)
-        except RequestError as ex:
-            raise RetryError(str(ex)) from ex
-    log.info("recreate_index has finished successfully!")
-    clear_search_cache()
+        log.info(
+            "Done with temporary index. Pointing default aliases to newly created backing indexes..."  # noqa: E501
+        )
+        for obj_type, backing_index in backing_indices.items():
+            try:
+                api.switch_indices(backing_index, obj_type)
+            except RequestError as ex:
+                raise RetryError(str(ex)) from ex
+        log.info("recreate_index has finished successfully!")
+        clear_search_cache()
+    except Exception as exc:
+        log.error(f"Task {self.request.id} failed: {exc}")
+        raise self.retry(exc=exc, countdown=5)
 
 
 def _generate_subscription_digest_subject(
@@ -965,3 +934,27 @@ def attempt_send_digest_email_batch(user_template_items):
             "short_subject": short_subject,
         },
     )
+
+
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
+def finish_long_running_task(self):
+    time.sleep(10)
+    log.info("Finished")
+
+
+@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
+def long_running_subtask(self, seconds):
+    try:
+        log.info(f"Starting task {self.request.id}, sleeping for {seconds} seconds")
+        time.sleep(seconds)
+        log.info(f"Task {self.request.id} completed")
+        return f"Task completed after {seconds} seconds"
+    except Exception as exc:
+        log.error(f"Task {self.request.id} failed: {exc}")
+        raise self.retry(exc=exc, countdown=5)
+
+
+@app.task(bind=True)
+def long_running_task(self, seconds):
+    grouped_tasks = celery.group(long_running_subtask.si(seconds) for _ in range(100))
+    return self.replace(celery.chain(grouped_tasks, finish_long_running_task.si()))
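
Note on the pattern this diff adopts: every converted task uses the same bound-task retry recipe (bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True, with raise self.retry(exc=exc, countdown=5) in the exception handler), and long_running_task shows the fan-out shape, a celery.group chained into a finishing task via self.replace. Below is a minimal, self-contained sketch of those two pieces in isolation; the app name, broker URL, and the flaky_index_call, index_chunk, finish, and index_all names are hypothetical stand-ins for illustration, not part of this diff:

    import logging

    import celery
    from celery import Celery

    log = logging.getLogger(__name__)

    # Hypothetical app and broker, standing in for main.celery:app above.
    app = Celery("sketch", broker="redis://localhost:6379/0")


    def flaky_index_call(ids):
        """Hypothetical stand-in for an OpenSearch indexing call that may fail."""


    @app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
    def index_chunk(self, ids):
        try:
            flaky_index_call(ids)
        except Exception as exc:
            log.exception("Task %s failed", self.request.id)
            # self.retry() raises celery.exceptions.Retry so the worker
            # re-queues this task; once max_retries is exhausted it re-raises
            # the exception passed as exc.
            raise self.retry(exc=exc, countdown=5)


    @app.task
    def finish():
        log.info("all chunks indexed")


    @app.task(bind=True)
    def index_all(self, id_chunks):
        # Fan out one immutable signature per chunk, then run the finisher
        # once every chunk task has completed (a chain of group + task is
        # executed as a chord). self.replace makes anything waiting on this
        # task wait on the whole workflow instead.
        fanout = celery.group(index_chunk.si(ids) for ids in id_chunks)
        return self.replace(celery.chain(fanout, finish.si()))

Because acks_late=True defers the broker acknowledgement until the task finishes, and reject_on_worker_lost=True re-queues the message if a worker dies mid-run, a task under this recipe can execute more than once; the indexing calls it wraps therefore need to stay idempotent.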