From e6e1e226294e007ca8675abd631af8418bf6e40b Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Fri, 27 Sep 2024 08:57:52 -0400 Subject: [PATCH 1/7] retry_on_celery_worker_lost for all recreate_index subtasks --- learning_resources_search/tasks.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index b3a07cf71..d4a30e08b 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -275,7 +275,13 @@ def send_subscription_emails(self, subscription_type, period="daily"): return self.replace(email_tasks) -@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") +@app.task( + autoretry_for=(RetryError,), + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def index_learning_resources(ids, resource_type, index_types): """ Index courses @@ -348,7 +354,13 @@ def bulk_deindex_percolators(ids): return error -@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") +@app.task( + autoretry_for=(RetryError,), + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def bulk_index_percolate_queries(percolate_ids, index_types): """ Bulk index percolate queries for provided percolate query Ids @@ -397,7 +409,13 @@ def index_course_content_files(course_ids, index_types): return error -@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") +@app.task( + autoretry_for=(RetryError,), + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def index_content_files( content_file_ids, learning_resource_id, From 95f3b067a2c1eb5f2feeab6fc1ff1c7e5ac05753 Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Fri, 27 Sep 2024 09:08:12 -0400 Subject: [PATCH 2/7] Add SystemExit retry --- learning_resources_search/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index d4a30e08b..5493e41aa 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -276,7 +276,7 @@ def send_subscription_emails(self, subscription_type, period="daily"): @app.task( - autoretry_for=(RetryError,), + autoretry_for=(RetryError, SystemExit), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -355,7 +355,7 @@ def bulk_deindex_percolators(ids): @app.task( - autoretry_for=(RetryError,), + autoretry_for=(RetryError, SystemExit), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -410,7 +410,7 @@ def index_course_content_files(course_ids, index_types): @app.task( - autoretry_for=(RetryError,), + autoretry_for=(RetryError, SystemExit), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -861,7 +861,7 @@ def get_update_learning_resource_tasks(resource_type): @app.task( acks_late=True, reject_on_worker_lost=True, - autoretry_for=(RetryError,), + autoretry_for=(RetryError, SystemExit), retry_backoff=True, rate_limit="600/m", ) From 974643277dc38cccae9f98bd4448af14bf1899ff Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Mon, 30 Sep 2024 08:02:46 -0400 Subject: [PATCH 3/7] experiment --- docker-compose.apps.yml | 5 +++-- docker-compose.services.yml | 1 + learning_resources_search/tasks.py | 8 ++++---- main/settings_celery.py | 2 -- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docker-compose.apps.yml b/docker-compose.apps.yml index 06049cfe6..1e1bb2958 
100644 --- a/docker-compose.apps.yml +++ b/docker-compose.apps.yml @@ -56,8 +56,7 @@ services: command: > /bin/bash -c ' sleep 3; - celery -A main.celery:app worker -Q default -B -l ${MITOL_LOG_LEVEL:-INFO} & - celery -A main.celery:app worker -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}' + celery -A main.celery:app worker -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}' depends_on: db: condition: service_healthy @@ -66,3 +65,5 @@ volumes: - .:/src - django_media:/var/media + environment: + REMAP_SIGTERM: "SIGQUIT" diff --git a/docker-compose.services.yml b/docker-compose.services.yml index f43ce554e..37ff0a8cf 100644 --- a/docker-compose.services.yml +++ b/docker-compose.services.yml @@ -24,6 +24,7 @@ services: profiles: - backend image: redis:7.4.0 + command: ["redis-server", "--save", "", "--appendonly", "no"] healthcheck: test: ["CMD", "redis-cli", "ping", "|", "grep", "PONG"] interval: 3s diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index 5493e41aa..3ddc5bee4 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -276,7 +276,7 @@ def send_subscription_emails(self, subscription_type, period="daily"): return self.replace(email_tasks) @app.task( - autoretry_for=(RetryError, SystemExit), + autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -355,7 +355,7 @@ def bulk_deindex_percolators(ids): @app.task( - autoretry_for=(RetryError, SystemExit), + autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -410,7 +410,7 @@ def index_course_content_files(course_ids, index_types): @app.task( - autoretry_for=(RetryError, SystemExit), + autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m", acks_late=True, @@ -643,7 +643,7 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags): # Use self.replace so that code waiting on this task will also wait on the indexing # and finish tasks - return self.replace( + raise self.replace( celery.chain(index_tasks, finish_recreate_index.s(new_backing_indices)) ) diff --git a/main/settings_celery.py b/main/settings_celery.py index 7fc24e41d..efa5a74cd 100644 --- a/main/settings_celery.py +++ b/main/settings_celery.py @@ -141,5 +141,3 @@ CELERY_RESULT_SERIALIZER = "json" CELERY_ACCEPT_CONTENT = ["json"] CELERY_TIMEZONE = "UTC" -CELERY_TASK_TASK_TRACK_STARTED = True -CELERY_TASK_SEND_SENT_EVENT = True From 38fcc2cba8b92aae6fe6134f98339b334e2b2f6b Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Mon, 30 Sep 2024 10:26:05 -0400 Subject: [PATCH 4/7] Working --- docker-compose.apps.yml | 2 +- docker-compose.yml | 2 ++ learning_resources_search/tasks.py | 10 +++++----- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docker-compose.apps.yml b/docker-compose.apps.yml index 1e1bb2958..338bab8b4 100644 --- a/docker-compose.apps.yml +++ b/docker-compose.apps.yml @@ -66,4 +66,4 @@ services: - .:/src - django_media:/var/media environment: - REMAP_SIGTERM: "SIGQUIT" + REMAP_SIGTERM: SIGQUIT diff --git a/docker-compose.yml b/docker-compose.yml index 584fe7c06..8d930f635 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,3 +46,5 @@ services: # DEPRECATED: legacy .env file at the repo root - path: .env required: false + environment: + REMAP_SIGTERM: SIGQUIT diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index 3ddc5bee4..670d8f0ef 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -296,7 +296,7 @@ def
index_learning_resources(ids, resource_type, index_types): try: with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS): api.index_learning_resources(ids, resource_type, index_types) - except (RetryError, Ignore): + except (RetryError, Ignore, SystemExit): raise except: # noqa: E722 error = "index_courses threw an error" @@ -378,7 +378,7 @@ def bulk_index_percolate_queries(percolate_ids, index_types): PERCOLATE_INDEX_TYPE, index_types, ) - except (RetryError, Ignore): + except (RetryError, Ignore, SystemExit): raise except: # noqa: E722 error = "bulk_index_percolate_queries threw an error" @@ -436,7 +436,7 @@ def index_content_files( api.index_content_files( content_file_ids, learning_resource_id, index_types=index_types ) - except (RetryError, Ignore): + except (RetryError, Ignore, SystemExit): raise except: # noqa: E722 error = "index_content_files threw an error" @@ -643,7 +643,7 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags): # Use self.replace so that code waiting on this task will also wait on the indexing # and finish tasks - raise self.replace( + return self.replace( celery.chain(index_tasks, finish_recreate_index.s(new_backing_indices)) ) @@ -861,7 +861,7 @@ def get_update_learning_resource_tasks(resource_type): @app.task( acks_late=True, reject_on_worker_lost=True, - autoretry_for=(RetryError, SystemExit), + autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m", ) From 4c3c4487744a0ffe432c58b59633ee63afe9731e Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Mon, 30 Sep 2024 13:17:42 -0400 Subject: [PATCH 5/7] working2 --- docker-compose.apps.yml | 2 +- learning_resources_search/tasks.py | 315 ++++++++++++++--------------- 2 files changed, 155 insertions(+), 162 deletions(-) diff --git a/docker-compose.apps.yml b/docker-compose.apps.yml index 338bab8b4..5699c7659 100644 --- a/docker-compose.apps.yml +++ b/docker-compose.apps.yml @@ -56,7 +56,7 @@ services: command: > /bin/bash -c ' sleep 3; - celery -A main.celery:app worker -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}' + celery -A main.celery:app worker -B -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}' depends_on: db: condition: service_healthy diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index 670d8f0ef..408a32620 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -3,6 +3,7 @@ import datetime import itertools import logging +import time from collections import OrderedDict from contextlib import contextmanager from itertools import groupby @@ -275,14 +276,8 @@ def send_subscription_emails(self, subscription_type, period="daily"): return self.replace(email_tasks) -@app.task( - autoretry_for=(RetryError,), - retry_backoff=True, - rate_limit="600/m", - acks_late=True, - reject_on_worker_lost=True, -) -def index_learning_resources(ids, resource_type, index_types): +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +def index_learning_resources(self, ids, resource_type, index_types): """ Index courses @@ -294,14 +289,10 @@ def index_learning_resources(ids, resource_type, index_types): """ try: - with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS): - api.index_learning_resources(ids, resource_type, index_types) - except (RetryError, Ignore, SystemExit): - raise - except: # noqa: E722 - error = "index_courses threw an error" - log.exception(error) - return error + api.index_learning_resources(ids, resource_type, index_types) + except Exception as exc: + log.error(f"Task 
{self.request.id} failed: {exc}") + raise self.retry(exc=exc, countdown=5) @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -354,14 +345,8 @@ def bulk_deindex_percolators(ids): return error -@app.task( - autoretry_for=(RetryError,), - retry_backoff=True, - rate_limit="600/m", - acks_late=True, - reject_on_worker_lost=True, -) -def bulk_index_percolate_queries(percolate_ids, index_types): +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +def bulk_index_percolate_queries(self, percolate_ids, index_types): """ Bulk index percolate queries for provided percolate query Ids @@ -378,12 +363,10 @@ def bulk_index_percolate_queries(percolate_ids, index_types): PERCOLATE_INDEX_TYPE, index_types, ) - except (RetryError, Ignore, SystemExit): - raise - except: # noqa: E722 - error = "bulk_index_percolate_queries threw an error" - log.exception(error) - return error + time.sleep(1) + except Exception as exc: + log.error(f"Task {self.request.id} failed: {exc}") + raise self.retry(exc=exc, countdown=5) @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -409,14 +392,9 @@ def index_course_content_files(course_ids, index_types): return error -@app.task( - autoretry_for=(RetryError,), - retry_backoff=True, - rate_limit="600/m", - acks_late=True, - reject_on_worker_lost=True, -) +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) def index_content_files( + self, content_file_ids, learning_resource_id, index_types=IndexestoUpdate.all_indexes.value, @@ -432,16 +410,13 @@ def index_content_files( """ try: - with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS): - api.index_content_files( - content_file_ids, learning_resource_id, index_types=index_types - ) - except (RetryError, Ignore, SystemExit): - raise - except: # noqa: E722 - error = "index_content_files threw an error" - log.exception(error) - return error + api.index_content_files( + content_file_ids, learning_resource_id, index_types=index_types + ) + time.sleep(1) + except Exception as exc: + log.error(f"Task {self.request.id} failed: {exc}") + raise self.retry(exc=exc, countdown=5) @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -533,113 +508,109 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags): """ Wipe and recreate index and mapping, and index all items. """ - try: - if not remove_existing_reindexing_tags: - existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes) - if existing_reindexing_indexes: - error = ( - f"Reindexing in progress. Reindexing indexes already exist: " - f"{', '.join(existing_reindexing_indexes)}" - ) - log.exception(error) - return error + if not remove_existing_reindexing_tags: + existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes) - api.delete_orphaned_indexes( - indexes, delete_reindexing_tags=remove_existing_reindexing_tags - ) + if existing_reindexing_indexes: + error = ( + f"Reindexing in progress. 
Reindexing indexes already exist: " + f"{', '.join(existing_reindexing_indexes)}" + ) + log.exception(error) + return error - new_backing_indices = { - obj_type: api.create_backing_index(obj_type) for obj_type in indexes - } + api.delete_orphaned_indexes( + indexes, delete_reindexing_tags=remove_existing_reindexing_tags + ) - # Do the indexing on the temp index - log.info("starting to index %s objects...", ", ".join(indexes)) + new_backing_indices = { + obj_type: api.create_backing_index(obj_type) for obj_type in indexes + } - index_tasks = [] + # Do the indexing on the temp index + log.info("starting to index %s objects...", ", ".join(indexes)) - if PERCOLATE_INDEX_TYPE in indexes: + index_tasks = [] + + if PERCOLATE_INDEX_TYPE in indexes: + index_tasks = index_tasks + [ + bulk_index_percolate_queries.s( + percolate_ids, IndexestoUpdate.reindexing_index.value + ) + for percolate_ids in chunks( + PercolateQuery.objects.order_by("id").values_list("id", flat=True), + chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, + ) + ] + + if COURSE_TYPE in indexes: + blocklisted_ids = load_course_blocklist() + index_tasks = index_tasks + [ + index_learning_resources.si( + ids, + COURSE_TYPE, + index_types=IndexestoUpdate.reindexing_index.value, + ) + for ids in chunks( + Course.objects.filter(learning_resource__published=True) + .exclude(learning_resource__readable_id=blocklisted_ids) + .order_by("learning_resource_id") + .values_list("learning_resource_id", flat=True), + chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, + ) + ] + + for course in ( + Course.objects.filter(learning_resource__published=True) + .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES) + .exclude(learning_resource__readable_id=blocklisted_ids) + .order_by("learning_resource_id") + ): index_tasks = index_tasks + [ - bulk_index_percolate_queries.si( - percolate_ids, IndexestoUpdate.reindexing_index.value + index_content_files.si( + ids, + course.learning_resource_id, + index_types=IndexestoUpdate.reindexing_index.value, ) - for percolate_ids in chunks( - PercolateQuery.objects.order_by("id").values_list("id", flat=True), - chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, + for ids in chunks( + ContentFile.objects.filter( + run__learning_resource_id=course.learning_resource_id, + published=True, + run__published=True, + ) + .order_by("id") + .values_list("id", flat=True), + chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE, ) ] - if COURSE_TYPE in indexes: - blocklisted_ids = load_course_blocklist() + for resource_type in [ + PROGRAM_TYPE, + PODCAST_TYPE, + PODCAST_EPISODE_TYPE, + LEARNING_PATH_TYPE, + VIDEO_TYPE, + VIDEO_PLAYLIST_TYPE, + ]: + if resource_type in indexes: index_tasks = index_tasks + [ index_learning_resources.si( ids, - COURSE_TYPE, + resource_type, index_types=IndexestoUpdate.reindexing_index.value, ) for ids in chunks( - Course.objects.filter(learning_resource__published=True) - .exclude(learning_resource__readable_id=blocklisted_ids) - .order_by("learning_resource_id") - .values_list("learning_resource_id", flat=True), + LearningResource.objects.filter( + published=True, resource_type=resource_type + ) + .order_by("id") + .values_list("id", flat=True), chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, ) ] - for course in ( - Course.objects.filter(learning_resource__published=True) - .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES) - .exclude(learning_resource__readable_id=blocklisted_ids) - .order_by("learning_resource_id") - ): - index_tasks = index_tasks 
+ [ - index_content_files.si( - ids, - course.learning_resource_id, - index_types=IndexestoUpdate.reindexing_index.value, - ) - for ids in chunks( - ContentFile.objects.filter( - run__learning_resource_id=course.learning_resource_id, - published=True, - run__published=True, - ) - .order_by("id") - .values_list("id", flat=True), - chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE, - ) - ] - - for resource_type in [ - PROGRAM_TYPE, - PODCAST_TYPE, - PODCAST_EPISODE_TYPE, - LEARNING_PATH_TYPE, - VIDEO_TYPE, - VIDEO_PLAYLIST_TYPE, - ]: - if resource_type in indexes: - index_tasks = index_tasks + [ - index_learning_resources.si( - ids, - resource_type, - index_types=IndexestoUpdate.reindexing_index.value, - ) - for ids in chunks( - LearningResource.objects.filter( - published=True, resource_type=resource_type - ) - .order_by("id") - .values_list("id", flat=True), - chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, - ) - ] - - index_tasks = celery.group(index_tasks) - except: # noqa: E722 - error = "start_recreate_index threw an error" - log.exception(error) - return error + index_tasks = celery.group(index_tasks) # Use self.replace so that code waiting on this task will also wait on the indexing # and finish tasks @@ -858,14 +829,8 @@ def get_update_learning_resource_tasks(resource_type): ] -@app.task( - acks_late=True, - reject_on_worker_lost=True, - autoretry_for=(RetryError,), - retry_backoff=True, - rate_limit="600/m", -) -def finish_recreate_index(results, backing_indices): +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +def finish_recreate_index(self, results, backing_indices): """ Swap reindex backing index with default backing index @@ -873,27 +838,31 @@ def finish_recreate_index(results, backing_indices): results (list or bool): Results saying whether the error exists backing_indices (dict): The backing OpenSearch indices keyed by object type """ - errors = merge_strings(results) - if errors: - try: - api.delete_orphaned_indexes( - list(backing_indices.keys()), delete_reindexing_tags=True - ) - except RequestError as ex: - raise RetryError(str(ex)) from ex - msg = f"Errors occurred during recreate_index: {errors}" - raise ReindexError(msg) + try: + errors = merge_strings(results) + if errors: + try: + api.delete_orphaned_indexes( + list(backing_indices.keys()), delete_reindexing_tags=True + ) + except RequestError as ex: + raise RetryError(str(ex)) from ex + msg = f"Errors occurred during recreate_index: {errors}" + raise ReindexError(msg) - log.info( - "Done with temporary index. Pointing default aliases to newly created backing indexes..." # noqa: E501 - ) - for obj_type, backing_index in backing_indices.items(): - try: - api.switch_indices(backing_index, obj_type) - except RequestError as ex: - raise RetryError(str(ex)) from ex - log.info("recreate_index has finished successfully!") - clear_search_cache() + log.info( + "Done with temporary index. Pointing default aliases to newly created backing indexes..." 
# noqa: E501 + ) + for obj_type, backing_index in backing_indices.items(): + try: + api.switch_indices(backing_index, obj_type) + except RequestError as ex: + raise RetryError(str(ex)) from ex + log.info("recreate_index has finished successfully!") + clear_search_cache() + except Exception as exc: + log.error(f"Task {self.request.id} failed: {exc}") + raise self.retry(exc=exc, countdown=5) def _generate_subscription_digest_subject( @@ -965,3 +934,27 @@ def attempt_send_digest_email_batch(user_template_items): "short_subject": short_subject, }, ) + + +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +def finish_long_running_task(self): + time.sleep(10) + log.error("Finished") + + +@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +def long_running_subtask(self, seconds): + try: + log.info(f"Starting task {self.request.id}, sleeping for {seconds} seconds") + time.sleep(seconds) + log.info(f"Task {self.request.id} completed") + return f"Task completed after {seconds} seconds" + except Exception as exc: + log.error(f"Task {self.request.id} failed: {exc}") + raise self.retry(exc=exc, countdown=5) + + +@app.task(bind=True) +def long_running_task(self, seconds): + grouped_tasks = celery.group(long_running_subtask.si(seconds) for _ in range(100)) + return self.replace(celery.chain(grouped_tasks, finish_long_running_task.si())) From 320df9230f206ea448922ba09aa3a135ebf48f04 Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Mon, 30 Sep 2024 17:10:21 -0400 Subject: [PATCH 6/7] Working but only with cold shutdowns and short sleep intervals in each subtask --- docker-compose.yml | 2 - learning_resources_search/tasks.py | 255 +++++++++++++++-------------- main/settings_celery.py | 2 + 3 files changed, 132 insertions(+), 127 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8d930f635..584fe7c06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,5 +46,3 @@ services: # DEPRECATED: legacy .env file at the repo root - path: .env required: false - environment: - REMAP_SIGTERM: SIGQUIT diff --git a/learning_resources_search/tasks.py b/learning_resources_search/tasks.py index 408a32620..6c884ae31 100644 --- a/learning_resources_search/tasks.py +++ b/learning_resources_search/tasks.py @@ -276,7 +276,14 @@ def send_subscription_emails(self, subscription_type, period="daily"): return self.replace(email_tasks) -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +@app.task( + bind=True, + max_retries=3, + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def index_learning_resources(self, ids, resource_type, index_types): """ Index courses @@ -290,9 +297,9 @@ def index_learning_resources(self, ids, resource_type, index_types): """ try: api.index_learning_resources(ids, resource_type, index_types) - except Exception as exc: - log.error(f"Task {self.request.id} failed: {exc}") - raise self.retry(exc=exc, countdown=5) + time.sleep(5) + except Exception as exc: # noqa: BLE001 + raise self.retry(exc=exc, countdown=5) # noqa: B904 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -345,7 +352,14 @@ def bulk_deindex_percolators(ids): return error -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +@app.task( + bind=True, + max_retries=3, + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def bulk_index_percolate_queries(self, percolate_ids, index_types):
""" Bulk index percolate queries for provided percolate query Ids @@ -363,10 +377,9 @@ def bulk_index_percolate_queries(self, percolate_ids, index_types): PERCOLATE_INDEX_TYPE, index_types, ) - time.sleep(1) - except Exception as exc: - log.error(f"Task {self.request.id} failed: {exc}") - raise self.retry(exc=exc, countdown=5) + time.sleep(5) + except Exception as exc: # noqa: BLE001 + raise self.retry(exc=exc, countdown=5) # noqa: B904 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -392,7 +405,14 @@ def index_course_content_files(course_ids, index_types): return error -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +@app.task( + bind=True, + max_retries=3, + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def index_content_files( self, content_file_ids, @@ -413,10 +433,9 @@ def index_content_files( api.index_content_files( content_file_ids, learning_resource_id, index_types=index_types ) - time.sleep(1) - except Exception as exc: - log.error(f"Task {self.request.id} failed: {exc}") - raise self.retry(exc=exc, countdown=5) + time.sleep(5) + except Exception as exc: # noqa: BLE001 + raise self.retry(exc=exc, countdown=5) # noqa: B904 @app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m") @@ -508,109 +527,113 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags): """ Wipe and recreate index and mapping, and index all items. """ + try: + if not remove_existing_reindexing_tags: + existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes) - if not remove_existing_reindexing_tags: - existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes) - - if existing_reindexing_indexes: - error = ( - f"Reindexing in progress. Reindexing indexes already exist: " - f"{', '.join(existing_reindexing_indexes)}" - ) - log.exception(error) - return error - - api.delete_orphaned_indexes( - indexes, delete_reindexing_tags=remove_existing_reindexing_tags - ) - - new_backing_indices = { - obj_type: api.create_backing_index(obj_type) for obj_type in indexes - } + if existing_reindexing_indexes: + error = ( + f"Reindexing in progress. 
Reindexing indexes already exist: " + f"{', '.join(existing_reindexing_indexes)}" + ) + log.exception(error) + return error - # Do the indexing on the temp index - log.info("starting to index %s objects...", ", ".join(indexes)) + api.delete_orphaned_indexes( + indexes, delete_reindexing_tags=remove_existing_reindexing_tags + ) - index_tasks = [] + new_backing_indices = { + obj_type: api.create_backing_index(obj_type) for obj_type in indexes + } - if PERCOLATE_INDEX_TYPE in indexes: - index_tasks = index_tasks + [ - bulk_index_percolate_queries.s( - percolate_ids, IndexestoUpdate.reindexing_index.value - ) - for percolate_ids in chunks( - PercolateQuery.objects.order_by("id").values_list("id", flat=True), - chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, - ) - ] + # Do the indexing on the temp index + log.info("starting to index %s objects...", ", ".join(indexes)) - if COURSE_TYPE in indexes: - blocklisted_ids = load_course_blocklist() - index_tasks = index_tasks + [ - index_learning_resources.si( - ids, - COURSE_TYPE, - index_types=IndexestoUpdate.reindexing_index.value, - ) - for ids in chunks( - Course.objects.filter(learning_resource__published=True) - .exclude(learning_resource__readable_id=blocklisted_ids) - .order_by("learning_resource_id") - .values_list("learning_resource_id", flat=True), - chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, - ) - ] + index_tasks = [] - for course in ( - Course.objects.filter(learning_resource__published=True) - .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES) - .exclude(learning_resource__readable_id=blocklisted_ids) - .order_by("learning_resource_id") - ): + if PERCOLATE_INDEX_TYPE in indexes: index_tasks = index_tasks + [ - index_content_files.si( - ids, - course.learning_resource_id, - index_types=IndexestoUpdate.reindexing_index.value, + bulk_index_percolate_queries.si( + percolate_ids, IndexestoUpdate.reindexing_index.value ) - for ids in chunks( - ContentFile.objects.filter( - run__learning_resource_id=course.learning_resource_id, - published=True, - run__published=True, - ) - .order_by("id") - .values_list("id", flat=True), - chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE, + for percolate_ids in chunks( + PercolateQuery.objects.order_by("id").values_list("id", flat=True), + chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, ) ] - for resource_type in [ - PROGRAM_TYPE, - PODCAST_TYPE, - PODCAST_EPISODE_TYPE, - LEARNING_PATH_TYPE, - VIDEO_TYPE, - VIDEO_PLAYLIST_TYPE, - ]: - if resource_type in indexes: + if COURSE_TYPE in indexes: + blocklisted_ids = load_course_blocklist() index_tasks = index_tasks + [ index_learning_resources.si( ids, - resource_type, + COURSE_TYPE, index_types=IndexestoUpdate.reindexing_index.value, ) for ids in chunks( - LearningResource.objects.filter( - published=True, resource_type=resource_type - ) - .order_by("id") - .values_list("id", flat=True), + Course.objects.filter(learning_resource__published=True) + .exclude(learning_resource__readable_id=blocklisted_ids) + .order_by("learning_resource_id") + .values_list("learning_resource_id", flat=True), chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, ) ] - index_tasks = celery.group(index_tasks) + for course in ( + Course.objects.filter(learning_resource__published=True) + .filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES) + .exclude(learning_resource__readable_id=blocklisted_ids) + .order_by("learning_resource_id") + ): + index_tasks = index_tasks + [ + index_content_files.si( + ids, + 
course.learning_resource_id, + index_types=IndexestoUpdate.reindexing_index.value, + ) + for ids in chunks( + ContentFile.objects.filter( + run__learning_resource_id=course.learning_resource_id, + published=True, + run__published=True, + ) + .order_by("id") + .values_list("id", flat=True), + chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE, + ) + ] + + for resource_type in [ + PROGRAM_TYPE, + PODCAST_TYPE, + PODCAST_EPISODE_TYPE, + LEARNING_PATH_TYPE, + VIDEO_TYPE, + VIDEO_PLAYLIST_TYPE, + ]: + if resource_type in indexes: + index_tasks = index_tasks + [ + index_learning_resources.si( + ids, + resource_type, + index_types=IndexestoUpdate.reindexing_index.value, + ) + for ids in chunks( + LearningResource.objects.filter( + published=True, resource_type=resource_type + ) + .order_by("id") + .values_list("id", flat=True), + chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE, + ) + ] + + index_tasks = celery.group(index_tasks) + except: # noqa: E722 + error = "start_recreate_index threw an error" + log.exception(error) + return error # Use self.replace so that code waiting on this task will also wait on the indexing # and finish tasks @@ -829,7 +852,14 @@ def get_update_learning_resource_tasks(resource_type): ] -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) +@app.task( + bind=True, + max_retries=3, + retry_backoff=True, + rate_limit="600/m", + acks_late=True, + reject_on_worker_lost=True, +) def finish_recreate_index(self, results, backing_indices): """ Swap reindex backing index with default backing index @@ -848,7 +878,7 @@ def finish_recreate_index(self, results, backing_indices): except RequestError as ex: raise RetryError(str(ex)) from ex msg = f"Errors occurred during recreate_index: {errors}" - raise ReindexError(msg) + raise ReindexError(msg) # noqa: TRY301 log.info( "Done with temporary index. Pointing default aliases to newly created backing indexes..." 
# noqa: E501 @@ -860,9 +890,8 @@ def finish_recreate_index(self, results, backing_indices): raise RetryError(str(ex)) from ex log.info("recreate_index has finished successfully!") clear_search_cache() - except Exception as exc: - log.error(f"Task {self.request.id} failed: {exc}") - raise self.retry(exc=exc, countdown=5) + except Exception as exc: # noqa: BLE001 + raise self.retry(exc=exc, countdown=5) # noqa: B904 def _generate_subscription_digest_subject( @@ -934,27 +963,3 @@ def attempt_send_digest_email_batch(user_template_items): "short_subject": short_subject, }, ) - - -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) -def finish_long_running_task(self): - time.sleep(10) - log.error("Finished") - - -@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True) -def long_running_subtask(self, seconds): - try: - log.info(f"Starting task {self.request.id}, sleeping for {seconds} seconds") - time.sleep(seconds) - log.info(f"Task {self.request.id} completed") - return f"Task completed after {seconds} seconds" - except Exception as exc: - log.error(f"Task {self.request.id} failed: {exc}") - raise self.retry(exc=exc, countdown=5) - - -@app.task(bind=True) -def long_running_task(self, seconds): - grouped_tasks = celery.group(long_running_subtask.si(seconds) for _ in range(100)) - return self.replace(celery.chain(grouped_tasks, finish_long_running_task.si())) diff --git a/main/settings_celery.py b/main/settings_celery.py index efa5a74cd..7fc24e41d 100644 --- a/main/settings_celery.py +++ b/main/settings_celery.py @@ -141,3 +141,5 @@ CELERY_RESULT_SERIALIZER = "json" CELERY_ACCEPT_CONTENT = ["json"] CELERY_TIMEZONE = "UTC" +CELERY_TASK_TASK_TRACK_STARTED = True +CELERY_TASK_SEND_SENT_EVENT = True From 1927749f084c69932fa2255f626e525558d918a7 Mon Sep 17 00:00:00 2001 From: Matt Bertrand Date: Mon, 30 Sep 2024 17:33:37 -0400 Subject: [PATCH 7/7] Get rid of custom redis command, still works --- docker-compose.services.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker-compose.services.yml b/docker-compose.services.yml index 37ff0a8cf..f43ce554e 100644 --- a/docker-compose.services.yml +++ b/docker-compose.services.yml @@ -24,7 +24,6 @@ services: profiles: - backend image: redis:7.4.0 - command: ["redis-server", "--save", "", "--appendonly", "no"] healthcheck: test: ["CMD", "redis-cli", "ping", "|", "grep", "PONG"] interval: 3s
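For reference, a minimal standalone sketch of the retry pattern the later patches settle on: acks_late together with reject_on_worker_lost, so a subtask whose worker is killed mid-run is re-queued rather than silently dropped, plus a bound task that retries explicitly on failure. The app name, broker URL, task name, and task body below are illustrative assumptions only; they are not code from this patch series.

    import logging

    from celery import Celery

    log = logging.getLogger(__name__)

    # Hypothetical app and broker; only the decorator options mirror the patches above.
    app = Celery("sketch", broker="redis://localhost:6379/0")


    @app.task(
        bind=True,
        max_retries=3,
        retry_backoff=True,
        acks_late=True,              # acknowledge the message only after the task body finishes
        reject_on_worker_lost=True,  # re-queue the task if the worker process dies mid-run
    )
    def index_chunk(self, ids):
        """Index one chunk of ids, retrying a few times on any failure."""
        try:
            log.info("indexing %d ids", len(ids))
            # ... a real task would call the search indexing API here ...
        except Exception as exc:
            raise self.retry(exc=exc, countdown=5)

The intent is that a cold worker shutdown (or the SIGTERM-to-SIGQUIT remapping experimented with above) leaves the un-acked message on the queue for another worker to pick up, instead of stalling the recreate_index chain.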