Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry recreate_index subtasks on worker exit #1615

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docker-compose.apps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ services:
command: >
/bin/bash -c '
sleep 3;
celery -A main.celery:app worker -Q default -B -l ${MITOL_LOG_LEVEL:-INFO} &
celery -A main.celery:app worker -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}'
celery -A main.celery:app worker -B -E -Q edx_content,default -l ${MITOL_LOG_LEVEL:-INFO}'
depends_on:
db:
condition: service_healthy
Expand All @@ -66,3 +65,5 @@ services:
volumes:
- .:/src
- django_media:/var/media
environment:
REMAP_SIGTERM: SIGQUIT
122 changes: 69 additions & 53 deletions learning_resources_search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime
import itertools
import logging
import time
from collections import OrderedDict
from contextlib import contextmanager
from itertools import groupby
Expand Down Expand Up @@ -275,8 +276,15 @@ def send_subscription_emails(self, subscription_type, period="daily"):
return self.replace(email_tasks)


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
def index_learning_resources(ids, resource_type, index_types):
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def index_learning_resources(self, ids, resource_type, index_types):
"""
Index courses

Expand All @@ -288,14 +296,10 @@ def index_learning_resources(ids, resource_type, index_types):

"""
try:
with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS):
api.index_learning_resources(ids, resource_type, index_types)
except (RetryError, Ignore):
raise
except: # noqa: E722
error = "index_courses threw an error"
log.exception(error)
return error
api.index_learning_resources(ids, resource_type, index_types)
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
Expand Down Expand Up @@ -348,8 +352,15 @@ def bulk_deindex_percolators(ids):
return error


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
def bulk_index_percolate_queries(percolate_ids, index_types):
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def bulk_index_percolate_queries(self, percolate_ids, index_types):
"""
Bulk index percolate queries for provided percolate query Ids

Expand All @@ -366,12 +377,9 @@ def bulk_index_percolate_queries(percolate_ids, index_types):
PERCOLATE_INDEX_TYPE,
index_types,
)
except (RetryError, Ignore):
raise
except: # noqa: E722
error = "bulk_index_percolate_queries threw an error"
log.exception(error)
return error
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
Expand All @@ -397,8 +405,16 @@ def index_course_content_files(course_ids, index_types):
return error


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def index_content_files(
self,
content_file_ids,
learning_resource_id,
index_types=IndexestoUpdate.all_indexes.value,
Expand All @@ -414,16 +430,12 @@ def index_content_files(

"""
try:
with wrap_retry_exception(*SEARCH_CONN_EXCEPTIONS):
api.index_content_files(
content_file_ids, learning_resource_id, index_types=index_types
)
except (RetryError, Ignore):
raise
except: # noqa: E722
error = "index_content_files threw an error"
log.exception(error)
return error
api.index_content_files(
content_file_ids, learning_resource_id, index_types=index_types
)
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
Expand Down Expand Up @@ -841,41 +853,45 @@ def get_update_learning_resource_tasks(resource_type):


@app.task(
acks_late=True,
reject_on_worker_lost=True,
autoretry_for=(RetryError,),
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def finish_recreate_index(results, backing_indices):
def finish_recreate_index(self, results, backing_indices):
"""
Swap reindex backing index with default backing index

Args:
results (list or bool): Results saying whether the error exists
backing_indices (dict): The backing OpenSearch indices keyed by object type
"""
errors = merge_strings(results)
if errors:
try:
api.delete_orphaned_indexes(
list(backing_indices.keys()), delete_reindexing_tags=True
)
except RequestError as ex:
raise RetryError(str(ex)) from ex
msg = f"Errors occurred during recreate_index: {errors}"
raise ReindexError(msg)
try:
errors = merge_strings(results)
if errors:
try:
api.delete_orphaned_indexes(
list(backing_indices.keys()), delete_reindexing_tags=True
)
except RequestError as ex:
raise RetryError(str(ex)) from ex
msg = f"Errors occurred during recreate_index: {errors}"
raise ReindexError(msg) # noqa: TRY301

log.info(
"Done with temporary index. Pointing default aliases to newly created backing indexes..." # noqa: E501
)
for obj_type, backing_index in backing_indices.items():
try:
api.switch_indices(backing_index, obj_type)
except RequestError as ex:
raise RetryError(str(ex)) from ex
log.info("recreate_index has finished successfully!")
clear_search_cache()
log.info(
"Done with temporary index. Pointing default aliases to newly created backing indexes..." # noqa: E501
)
for obj_type, backing_index in backing_indices.items():
try:
api.switch_indices(backing_index, obj_type)
except RequestError as ex:
raise RetryError(str(ex)) from ex
log.info("recreate_index has finished successfully!")
clear_search_cache()
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


def _generate_subscription_digest_subject(
Expand Down
Loading