Commit
Working, but only with cold shutdowns and short sleep intervals in each subtask
mbertrand committed Sep 30, 2024
1 parent 4c3c448 commit 320df92
Showing 3 changed files with 132 additions and 127 deletions.
2 changes: 0 additions & 2 deletions docker-compose.yml
@@ -46,5 +46,3 @@ services:
# DEPRECATED: legacy .env file at the repo root
- path: .env
required: false
environment:
REMAP_SIGTERM: SIGQUIT
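
A minimal sketch (not part of this commit; the broker URL and task body are placeholders) of why the acks_late / reject_on_worker_lost flags added in tasks.py matter once the SIGQUIT remap is gone: SIGTERM now triggers Celery's default warm shutdown, and only a cold shutdown kills a task mid-flight, in which case the unacknowledged message is redelivered.

# Sketch only: broker URL and task body are illustrative, not repo code.
from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")

@app.task(bind=True, acks_late=True, reject_on_worker_lost=True, max_retries=3)
def reindex_chunk(self, ids):
    # acks_late: the message is acknowledged only after the task returns.
    # reject_on_worker_lost: if the worker process dies mid-task (for example
    # during a SIGQUIT cold shutdown), the still-unacknowledged message is
    # requeued and redelivered rather than silently dropped.
    return len(ids)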
255 changes: 130 additions & 125 deletions learning_resources_search/tasks.py
@@ -276,7 +276,14 @@ def send_subscription_emails(self, subscription_type, period="daily"):
return self.replace(email_tasks)


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def index_learning_resources(self, ids, resource_type, index_types):
"""
Index courses
@@ -290,9 +297,9 @@ def index_learning_resources(self, ids, resource_type, index_types):
"""
try:
api.index_learning_resources(ids, resource_type, index_types)
except Exception as exc:
log.error(f"Task {self.request.id} failed: {exc}")
raise self.retry(exc=exc, countdown=5)
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
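
The same decorator-plus-retry shape recurs in bulk_index_percolate_queries and index_content_files below. A standalone sketch of the pattern, with do_indexing and the broker URL as placeholders for the repo's real pieces:

# Sketch only: do_indexing and the broker URL stand in for repo code.
import time

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")

def do_indexing(ids):
    """Stand-in for the real api.index_* call."""

@app.task(
    bind=True,
    max_retries=3,
    retry_backoff=True,  # exponential backoff for automatic retries
    rate_limit="600/m",  # per-worker throttle: at most 600 executions per minute
    acks_late=True,
    reject_on_worker_lost=True,
)
def index_chunk(self, ids):
    try:
        do_indexing(ids)
        time.sleep(5)  # brief pause so the search cluster can absorb the writes
    except Exception as exc:
        # Manual retry; the explicit countdown (5 s) sets the delay on this path.
        raise self.retry(exc=exc, countdown=5)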
@@ -345,7 +352,14 @@ def bulk_deindex_percolators(ids):
return error


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def bulk_index_percolate_queries(self, percolate_ids, index_types):
"""
Bulk index percolate queries for provided percolate query Ids
@@ -363,10 +377,9 @@ def bulk_index_percolate_queries(self, percolate_ids, index_types):
PERCOLATE_INDEX_TYPE,
index_types,
)
time.sleep(1)
except Exception as exc:
log.error(f"Task {self.request.id} failed: {exc}")
raise self.retry(exc=exc, countdown=5)
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@@ -392,7 +405,14 @@ def index_course_content_files(course_ids, index_types):
return error


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def index_content_files(
self,
content_file_ids,
@@ -413,10 +433,9 @@ def index_content_files(
api.index_content_files(
content_file_ids, learning_resource_id, index_types=index_types
)
time.sleep(1)
except Exception as exc:
log.error(f"Task {self.request.id} failed: {exc}")
raise self.retry(exc=exc, countdown=5)
time.sleep(5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904


@app.task(autoretry_for=(RetryError,), retry_backoff=True, rate_limit="600/m")
@@ -508,109 +527,113 @@ def start_recreate_index(self, indexes, remove_existing_reindexing_tags):
"""
Wipe and recreate index and mapping, and index all items.
"""
try:
if not remove_existing_reindexing_tags:
existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes)

if not remove_existing_reindexing_tags:
existing_reindexing_indexes = api.get_existing_reindexing_indexes(indexes)

if existing_reindexing_indexes:
error = (
f"Reindexing in progress. Reindexing indexes already exist: "
f"{', '.join(existing_reindexing_indexes)}"
)
log.exception(error)
return error

api.delete_orphaned_indexes(
indexes, delete_reindexing_tags=remove_existing_reindexing_tags
)

new_backing_indices = {
obj_type: api.create_backing_index(obj_type) for obj_type in indexes
}
if existing_reindexing_indexes:
error = (
f"Reindexing in progress. Reindexing indexes already exist: "
f"{', '.join(existing_reindexing_indexes)}"
)
log.exception(error)
return error

# Do the indexing on the temp index
log.info("starting to index %s objects...", ", ".join(indexes))
api.delete_orphaned_indexes(
indexes, delete_reindexing_tags=remove_existing_reindexing_tags
)

index_tasks = []
new_backing_indices = {
obj_type: api.create_backing_index(obj_type) for obj_type in indexes
}

if PERCOLATE_INDEX_TYPE in indexes:
index_tasks = index_tasks + [
bulk_index_percolate_queries.s(
percolate_ids, IndexestoUpdate.reindexing_index.value
)
for percolate_ids in chunks(
PercolateQuery.objects.order_by("id").values_list("id", flat=True),
chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
)
]
# Do the indexing on the temp index
log.info("starting to index %s objects...", ", ".join(indexes))

if COURSE_TYPE in indexes:
blocklisted_ids = load_course_blocklist()
index_tasks = index_tasks + [
index_learning_resources.si(
ids,
COURSE_TYPE,
index_types=IndexestoUpdate.reindexing_index.value,
)
for ids in chunks(
Course.objects.filter(learning_resource__published=True)
.exclude(learning_resource__readable_id=blocklisted_ids)
.order_by("learning_resource_id")
.values_list("learning_resource_id", flat=True),
chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
)
]
index_tasks = []

for course in (
Course.objects.filter(learning_resource__published=True)
.filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES)
.exclude(learning_resource__readable_id=blocklisted_ids)
.order_by("learning_resource_id")
):
if PERCOLATE_INDEX_TYPE in indexes:
index_tasks = index_tasks + [
index_content_files.si(
ids,
course.learning_resource_id,
index_types=IndexestoUpdate.reindexing_index.value,
bulk_index_percolate_queries.si(
percolate_ids, IndexestoUpdate.reindexing_index.value
)
for ids in chunks(
ContentFile.objects.filter(
run__learning_resource_id=course.learning_resource_id,
published=True,
run__published=True,
)
.order_by("id")
.values_list("id", flat=True),
chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE,
for percolate_ids in chunks(
PercolateQuery.objects.order_by("id").values_list("id", flat=True),
chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
)
]

for resource_type in [
PROGRAM_TYPE,
PODCAST_TYPE,
PODCAST_EPISODE_TYPE,
LEARNING_PATH_TYPE,
VIDEO_TYPE,
VIDEO_PLAYLIST_TYPE,
]:
if resource_type in indexes:
if COURSE_TYPE in indexes:
blocklisted_ids = load_course_blocklist()
index_tasks = index_tasks + [
index_learning_resources.si(
ids,
resource_type,
COURSE_TYPE,
index_types=IndexestoUpdate.reindexing_index.value,
)
for ids in chunks(
LearningResource.objects.filter(
published=True, resource_type=resource_type
)
.order_by("id")
.values_list("id", flat=True),
Course.objects.filter(learning_resource__published=True)
.exclude(learning_resource__readable_id=blocklisted_ids)
.order_by("learning_resource_id")
.values_list("learning_resource_id", flat=True),
chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
)
]

index_tasks = celery.group(index_tasks)
for course in (
Course.objects.filter(learning_resource__published=True)
.filter(learning_resource__etl_source__in=RESOURCE_FILE_ETL_SOURCES)
.exclude(learning_resource__readable_id=blocklisted_ids)
.order_by("learning_resource_id")
):
index_tasks = index_tasks + [
index_content_files.si(
ids,
course.learning_resource_id,
index_types=IndexestoUpdate.reindexing_index.value,
)
for ids in chunks(
ContentFile.objects.filter(
run__learning_resource_id=course.learning_resource_id,
published=True,
run__published=True,
)
.order_by("id")
.values_list("id", flat=True),
chunk_size=settings.OPENSEARCH_DOCUMENT_INDEXING_CHUNK_SIZE,
)
]

for resource_type in [
PROGRAM_TYPE,
PODCAST_TYPE,
PODCAST_EPISODE_TYPE,
LEARNING_PATH_TYPE,
VIDEO_TYPE,
VIDEO_PLAYLIST_TYPE,
]:
if resource_type in indexes:
index_tasks = index_tasks + [
index_learning_resources.si(
ids,
resource_type,
index_types=IndexestoUpdate.reindexing_index.value,
)
for ids in chunks(
LearningResource.objects.filter(
published=True, resource_type=resource_type
)
.order_by("id")
.values_list("id", flat=True),
chunk_size=settings.OPENSEARCH_INDEXING_CHUNK_SIZE,
)
]

index_tasks = celery.group(index_tasks)
except: # noqa: E722
error = "start_recreate_index threw an error"
log.exception(error)
return error

# Use self.replace so that code waiting on this task will also wait on the indexing
# and finish tasks
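
The self.replace mentioned in the comment above swaps the running task for the chained group of indexing subtasks plus the finish step, so anything waiting on start_recreate_index also waits on the whole pipeline. A stripped-down sketch of that pattern (task names here are illustrative):

# Sketch only: index_chunk and finish_up stand in for the repo's real tasks.
import celery
from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")

@app.task
def index_chunk(ids):
    return len(ids)

@app.task
def finish_up(results):
    return results

@app.task(bind=True)
def start_pipeline(self, id_chunks):
    index_group = celery.group(index_chunk.si(ids) for ids in id_chunks)
    # replace() substitutes this task with the chain, so waiting on
    # start_pipeline's result waits on every subtask and the finish step.
    return self.replace(celery.chain(index_group, finish_up.s()))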
@@ -829,7 +852,14 @@ def get_update_learning_resource_tasks(resource_type):
]


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
@app.task(
bind=True,
max_retries=3,
retry_backoff=True,
rate_limit="600/m",
acks_late=True,
reject_on_worker_lost=True,
)
def finish_recreate_index(self, results, backing_indices):
"""
Swap reindex backing index with default backing index
@@ -848,7 +878,7 @@ def finish_recreate_index(self, results, backing_indices):
except RequestError as ex:
raise RetryError(str(ex)) from ex
msg = f"Errors occurred during recreate_index: {errors}"
raise ReindexError(msg)
raise ReindexError(msg) # noqa: TRY301

log.info(
"Done with temporary index. Pointing default aliases to newly created backing indexes..." # noqa: E501
@@ -860,9 +890,8 @@ def finish_recreate_index(self, results, backing_indices):
raise RetryError(str(ex)) from ex
log.info("recreate_index has finished successfully!")
clear_search_cache()
except Exception as exc:
log.error(f"Task {self.request.id} failed: {exc}")
raise self.retry(exc=exc, countdown=5)
except Exception as exc: # noqa: BLE001
raise self.retry(exc=exc, countdown=5) # noqa: B904
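
The alias switch logged at "Pointing default aliases to newly created backing indexes..." happens in code elided from this hunk; a hedged sketch of what such a swap can look like with opensearch-py (names are made up, and the repo's implementation may differ):

# Sketch only: an atomic alias move; names and client setup are illustrative.
from opensearchpy import OpenSearch

def point_alias_to_new_backing_index(client: OpenSearch, alias, old_index, new_index):
    client.indices.update_aliases(
        body={
            "actions": [
                {"remove": {"index": old_index, "alias": alias}},
                {"add": {"index": new_index, "alias": alias}},
            ]
        }
    )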


def _generate_subscription_digest_subject(
Expand Down Expand Up @@ -934,27 +963,3 @@ def attempt_send_digest_email_batch(user_template_items):
"short_subject": short_subject,
},
)


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
def finish_long_running_task(self):
time.sleep(10)
log.error("Finished")


@app.task(bind=True, max_retries=3, acks_late=True, reject_on_worker_lost=True)
def long_running_subtask(self, seconds):
try:
log.info(f"Starting task {self.request.id}, sleeping for {seconds} seconds")
time.sleep(seconds)
log.info(f"Task {self.request.id} completed")
return f"Task completed after {seconds} seconds"
except Exception as exc:
log.error(f"Task {self.request.id} failed: {exc}")
raise self.retry(exc=exc, countdown=5)


@app.task(bind=True)
def long_running_task(self, seconds):
grouped_tasks = celery.group(long_running_subtask.si(seconds) for _ in range(100))
return self.replace(celery.chain(grouped_tasks, finish_long_running_task.si()))
2 changes: 2 additions & 0 deletions main/settings_celery.py
@@ -141,3 +141,5 @@
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_SEND_SENT_EVENT = True
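
With Celery's CELERY_ settings namespace these map to task_track_started and task_send_sent_event: the first makes an executing task report a STARTED state instead of sitting at PENDING, the second emits task-sent events that monitors such as Flower can consume. A small sketch (assumes a Celery app wired to these settings) of what the first one changes when polling a result:

# Sketch only: assumes a Celery `app` configured with the settings above.
from celery.result import AsyncResult

def describe(task_id, app):
    result = AsyncResult(task_id, app=app)
    # With task_track_started=True a task that has begun executing reports
    # "STARTED"; without it, the state stays "PENDING" until it finishes.
    return result.state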
