recreate_index breakdown in celery tasks (#4968)
* Run recreate_index through a set of celery tasks
arslanashraf7 committed Jul 20, 2021
1 parent 9453e39 commit 5abb2ad
Showing 17 changed files with 444 additions and 121 deletions.
1 change: 1 addition & 0 deletions .env.example
@@ -21,3 +21,4 @@ MAILGUN_RECIPIENT_OVERRIDE=
MAILGUN_FROM_EMAIL=
MAILGUN_URL=
ELASTICSEARCH_INDEX=
ELASTICSEARCH_INDEXING_CHUNK_SIZE=100
6 changes: 5 additions & 1 deletion app.json
@@ -78,6 +78,10 @@
"ELASTICSEARCH_SHARD_COUNT": {
"description": "Configurable shard cound for Elasticsearch"
},
"ELASTICSEARCH_INDEXING_CHUNK_SIZE": {
"description": "Chunk size to use for Elasticsearch indexing tasks",
"required": false
},
"ELASTICSEARCH_INDEX": {
"description": "Index to use on Elasticsearch"
},
@@ -261,7 +265,7 @@
"stack": "heroku-20",
"repository": "https://github.com/mitodl/micromasters",
"scripts": {
"postdeploy": "./manage.py migrate --noinput && ./manage.py recreate_index"
"postdeploy": "./manage.py migrate --noinput"
},
"success_url": "/",
"website": "https://github.com/mitodl/micromasters"
1 change: 1 addition & 0 deletions micromasters/settings.py
@@ -538,6 +538,7 @@
if not ELASTICSEARCH_INDEX:
raise ImproperlyConfigured("Missing ELASTICSEARCH_INDEX")
ELASTICSEARCH_HTTP_AUTH = get_string("ELASTICSEARCH_HTTP_AUTH", None)
ELASTICSEARCH_INDEXING_CHUNK_SIZE = get_int("ELASTICSEARCH_INDEXING_CHUNK_SIZE", 100)
ELASTICSEARCH_SHARD_COUNT = get_int('ELASTICSEARCH_SHARD_COUNT', 5)

# django-role-permissions
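Note: the new ELASTICSEARCH_INDEXING_CHUNK_SIZE setting caps how many documents each celery indexing task handles in one go. Below is a minimal sketch of how such a setting can be used to split ids into per-task batches; the batch_ids helper is illustrative only and is not part of this commit (micromasters.utils already exposes a chunks helper that indexing_api.py imports):

from django.conf import settings


def batch_ids(ids):
    """Yield lists of ids no larger than ELASTICSEARCH_INDEXING_CHUNK_SIZE (illustrative helper)."""
    size = settings.ELASTICSEARCH_INDEXING_CHUNK_SIZE
    batch = []
    for id_ in ids:
        batch.append(id_)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch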
31 changes: 31 additions & 0 deletions micromasters/utils.py
@@ -281,3 +281,34 @@ def generate_md5(unicode):
hasher = hashlib.md5()
hasher.update(unicode)
return hasher.hexdigest()


def merge_strings(list_or_str):
"""
Recursively go through nested lists of strings and merge into a flattened list.
Args:
list_or_str (any): A list of strings or a string
Returns:
list of str: A list of strings
"""

list_to_return = []
_merge_strings(list_or_str, list_to_return)
return list_to_return


def _merge_strings(list_or_str, list_to_return):
"""
Recursively go through nested lists of strings and merge into a flattened list.
Args:
list_or_str (any): A list of strings or a string
list_to_return (list of str): The list the strings will be added to
"""
if isinstance(list_or_str, list):
for item in list_or_str:
_merge_strings(item, list_to_return)
elif list_or_str is not None:
list_to_return.append(list_or_str)
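
For reference, merge_strings flattens like this (the same cases are exercised by the new test added below):

from micromasters.utils import merge_strings

merge_strings("str")                          # ["str"]
merge_strings(["str", None, [None]])          # ["str"]
merge_strings([["a"], "b", ["c", "d"], "e"])  # ["a", "b", "c", "d", "e"]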
18 changes: 17 additions & 1 deletion micromasters/utils_test.py
@@ -16,6 +16,7 @@
RequestFactory,
)
import pytz
import pytest
from rest_framework import status
from rest_framework.exceptions import ValidationError

@@ -43,7 +44,7 @@
serialize_model_object,
pop_keys_from_dict,
pop_matching_keys_from_dict,
generate_md5,
generate_md5, merge_strings,
)
from search.base import MockedESTestCase

@@ -347,3 +348,18 @@ def test_generate_md5():
assert len(md5_hash) == 32
repeat_md5_hash = generate_md5(bytes_to_hash)
assert md5_hash == repeat_md5_hash


@pytest.mark.parametrize(
"list_or_string,output",
[
["str", ["str"]],
[["str", None, [None]], ["str"]],
[[["a"], "b", ["c", "d"], "e"], ["a", "b", "c", "d", "e"]],
],
)
def test_merge_strings(list_or_string, output):
"""
merge_strings should flatten a nested list of strings
"""
assert merge_strings(list_or_string) == output
20 changes: 17 additions & 3 deletions search/base.py
@@ -7,8 +7,10 @@
TestCase,
)

from dashboard.models import ProgramEnrollment
from search import tasks
from search.indexing_api import recreate_index, delete_indices
from search.indexing_api import delete_indices, create_backing_indices
from search.models import PercolateQuery


class ESTestCase(TestCase):
@@ -19,7 +21,7 @@ class ESTestCase(TestCase):
@classmethod
def setUpClass(cls):
# Make sure index exists when signals are run.
recreate_index()
reindex_test_es_data()
super().setUpClass()

def setUp(self):
@@ -28,7 +30,7 @@ def setUp(self):
# because the test data is contained in a transaction
# which is reverted after each test runs, so signals don't get run
# that keep ES up to date.
recreate_index()
reindex_test_es_data()
super().setUp()

@classmethod
@@ -73,3 +75,15 @@ def tearDownClass(cls):
patcher.stop()

super().tearDownClass()


def reindex_test_es_data():
"""
Recreates the ElasticSearch indices for the live data used in tests
"""
backing_indices = create_backing_indices()
tasks.bulk_index_program_enrollments(ProgramEnrollment.objects.order_by("id").values_list("id", flat=True),
backing_indices[0][0], backing_indices[1][0])
tasks.bulk_index_percolate_queries(PercolateQuery.objects.order_by("id").values_list("id", flat=True),
backing_indices[2][0])
tasks.finish_recreate_index(results=[], backing_indices=backing_indices)
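
reindex_test_es_data calls the new tasks synchronously because tests need the indices populated immediately. search/tasks.py itself is among the 17 changed files but is not shown on this page; below is a hedged sketch of what the asynchronous dispatch might look like, using only the task names and argument shapes visible above. The chord wiring, the .si() calls, and the chunking are assumptions, not the commit's actual code:

from celery import chord
from django.conf import settings

from dashboard.models import ProgramEnrollment
from search import tasks
from search.indexing_api import create_backing_indices
from search.models import PercolateQuery


def start_recreate_index():
    """Hypothetical dispatcher: fan indexing out to celery tasks, then let finish_recreate_index swap aliases."""
    backing_indices = create_backing_indices()
    public_index, private_index = backing_indices[0][0], backing_indices[1][0]
    percolate_index = backing_indices[2][0]
    size = settings.ELASTICSEARCH_INDEXING_CHUNK_SIZE

    enrollment_ids = list(ProgramEnrollment.objects.order_by("id").values_list("id", flat=True))
    percolate_ids = list(
        PercolateQuery.objects.exclude(is_deleted=True).order_by("id").values_list("id", flat=True)
    )

    index_tasks = [
        tasks.bulk_index_program_enrollments.si(enrollment_ids[i:i + size], public_index, private_index)
        for i in range(0, len(enrollment_ids), size)
    ] + [
        tasks.bulk_index_percolate_queries.si(percolate_ids[i:i + size], percolate_index)
        for i in range(0, len(percolate_ids), size)
    ]
    chord(index_tasks)(tasks.finish_recreate_index.s(backing_indices=backing_indices))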
6 changes: 6 additions & 0 deletions search/exceptions.py
@@ -20,3 +20,9 @@ class PercolateException(Exception):
"""
Exception for percolate failures
"""


class RetryException(Exception):
"""
A special exception used to signal that celery can retry this task
"""
130 changes: 43 additions & 87 deletions search/indexing_api.py
@@ -9,27 +9,23 @@

from profiles.models import Profile
from profiles.serializers import ProfileSerializer
from dashboard.models import ProgramEnrollment
from dashboard.serializers import UserProgramSearchSerializer
from micromasters.utils import (
chunks,
dict_with_keys,
now_in_utc,
)
from search.connection import (
ALL_INDEX_TYPES,
get_aliases,
get_default_alias,
get_conn,
make_alias_name,
make_backing_index_name,
GLOBAL_DOC_TYPE,
PERCOLATE_INDEX_TYPE,
PRIVATE_ENROLLMENT_INDEX_TYPE,
PUBLIC_ENROLLMENT_INDEX_TYPE,
PUBLIC_ENROLLMENT_INDEX_TYPE, make_backing_index_name,
)
from search.exceptions import ReindexException
from search.models import PercolateQuery
from search.util import (
fix_nested_filter,
open_json_stream,
@@ -571,88 +567,6 @@ def delete_indices():
conn.indices.delete_alias(index=INDEX_WILDCARD, name=alias)


# pylint: disable=too-many-locals
def recreate_index():
"""
Wipe and recreate index and mapping, and index all items.
"""
conn = get_conn(verify=False)

# Create new backing index for reindex
new_backing_public_index = make_backing_index_name()
new_backing_private_index = make_backing_index_name()
new_backing_percolate_index = make_backing_index_name()
backing_index_tuples = [
(new_backing_public_index, PUBLIC_ENROLLMENT_INDEX_TYPE),
(new_backing_private_index, PRIVATE_ENROLLMENT_INDEX_TYPE),
(new_backing_percolate_index, PERCOLATE_INDEX_TYPE),
]
for backing_index, index_type in backing_index_tuples:
# Clear away temp alias so we can reuse it, and create mappings
clear_and_create_index(backing_index, index_type=index_type)
temp_alias = make_alias_name(index_type, is_reindexing=True)
if conn.indices.exists_alias(name=temp_alias):
# Deletes both alias and backing indexes
conn.indices.delete_alias(index=INDEX_WILDCARD, name=temp_alias)

# Point temp_alias toward new backing index
conn.indices.put_alias(index=backing_index, name=temp_alias)

# Do the indexing on the temp index
start = now_in_utc()
try:
enrollment_count = ProgramEnrollment.objects.count()
log.info("Indexing %d program enrollments...", enrollment_count)
index_program_enrolled_users(
ProgramEnrollment.objects.iterator(),
public_indices=[new_backing_public_index],
private_indices=[new_backing_private_index],
)

log.info("Indexing %d percolator queries...", PercolateQuery.objects.exclude(is_deleted=True).count())
_index_chunks(
_get_percolate_documents(PercolateQuery.objects.exclude(is_deleted=True).iterator()),
index=new_backing_percolate_index,
)

# Point default alias to new index and delete the old backing index, if any
log.info("Done with temporary index. Pointing default aliases to newly created backing indexes...")

for new_backing_index, index_type in backing_index_tuples:
actions = []
old_backing_indexes = []
default_alias = make_alias_name(index_type, is_reindexing=False)
if conn.indices.exists_alias(name=default_alias):
# Should only be one backing index in normal circumstances
old_backing_indexes = list(conn.indices.get_alias(name=default_alias).keys())
for index in old_backing_indexes:
actions.append({
"remove": {
"index": index,
"alias": default_alias,
}
})
actions.append({
"add": {
"index": new_backing_index,
"alias": default_alias,
},
})
conn.indices.update_aliases({
"actions": actions
})

refresh_index(new_backing_index)
for index in old_backing_indexes:
conn.indices.delete(index)
finally:
for new_backing_index, index_type in backing_index_tuples:
temp_alias = make_alias_name(index_type, is_reindexing=True)
conn.indices.delete_alias(name=temp_alias, index=new_backing_index)
end = now_in_utc()
log.info("recreate_index took %d seconds", (end - start).total_seconds())


def _serialize_percolate_query(query):
"""
Serialize PercolateQuery for Elasticsearch indexing
@@ -707,3 +621,45 @@ def delete_percolate_query(percolate_query_id):

for index in aliases:
_delete_item(percolate_query_id, index=index)


def delete_backing_indices(backing_indices):
"""
Remove the temporary backing indices
Args:
backing_indices (list of tuples):
A list of tuples containing the created backing indices for reindexing
"""

conn = get_conn(verify=False)
for new_backing_index, index_type in backing_indices:
temp_alias = make_alias_name(index_type, is_reindexing=True)
conn.indices.delete_alias(name=temp_alias, index=new_backing_index)


def create_backing_indices():
"""
Creates a list of tuples containing the backing index names and types used for reindexing
"""
conn = get_conn(verify=False)
new_backing_public_index = make_backing_index_name()
new_backing_private_index = make_backing_index_name()
new_backing_percolate_index = make_backing_index_name()
backing_index_tuples = [
(new_backing_public_index, PUBLIC_ENROLLMENT_INDEX_TYPE),
(new_backing_private_index, PRIVATE_ENROLLMENT_INDEX_TYPE),
(new_backing_percolate_index, PERCOLATE_INDEX_TYPE),
]
for backing_index, index_type in backing_index_tuples:
# Clear away temp alias so we can reuse it, and create mappings
clear_and_create_index(backing_index, index_type=index_type)
temp_alias = make_alias_name(index_type, is_reindexing=True)
if conn.indices.exists_alias(name=temp_alias):
# Deletes both alias and backing indexes
conn.indices.delete_alias(index=INDEX_WILDCARD, name=temp_alias)

# Point temp_alias toward new backing index
conn.indices.put_alias(index=backing_index, name=temp_alias)

return backing_index_tuples
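
Taken together, create_backing_indices and delete_backing_indices bracket the reindex lifecycle that the old monolithic recreate_index handled inline. A minimal sketch of that lifecycle, assuming the alias swap in the middle is done by finish_recreate_index in the tasks module (not shown on this page):

from search.indexing_api import create_backing_indices, delete_backing_indices

backing_indices = create_backing_indices()  # create new backing indices and attach the temp reindexing aliases
try:
    # index documents into backing_indices[0][0], [1][0] and [2][0], then point the
    # default aliases at them (presumably finish_recreate_index's job)
    pass
finally:
    delete_backing_indices(backing_indices)  # drop the temp reindexing aliases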