diff --git a/tests/integration/backup_tests/helpers.py b/tests/integration/backup_tests/helpers.py index 66cb17f9..4911ee7f 100644 --- a/tests/integration/backup_tests/helpers.py +++ b/tests/integration/backup_tests/helpers.py @@ -5,7 +5,12 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed -from ..ha_tests.helpers import get_direct_mongo_client, get_replica_set_primary +from ..ha_tests.helpers import ( + TEST_COLLECTION, + TEST_DB, + get_direct_mongo_client, + get_replica_set_primary, +) S3_APP_NAME = "s3-integrator" TIMEOUT = 10 * 60 @@ -107,8 +112,8 @@ async def insert_unwanted_data(ops_test: OpsTest, app_name: str) -> None: """Inserts the data into the MongoDB cluster via primary replica.""" primary_unit = await get_replica_set_primary(ops_test, application_name=app_name) with await get_direct_mongo_client(ops_test, unit=primary_unit.name) as client: - db = client["continuous_writes_database"] - test_collection = db["test_collection"] + db = client[TEST_DB] + test_collection = db[TEST_COLLECTION] test_collection.insert_one({"unwanted_data": "bad data 1"}) test_collection.insert_one({"unwanted_data": "bad data 2"}) test_collection.insert_one({"unwanted_data": "bad data 3"}) diff --git a/tests/integration/backup_tests/test_backups.py b/tests/integration/backup_tests/test_backups.py index 2f711408..fb0c56b4 100644 --- a/tests/integration/backup_tests/test_backups.py +++ b/tests/integration/backup_tests/test_backups.py @@ -58,20 +58,32 @@ async def add_writes_to_db(ops_test: OpsTest): """Adds writes to DB before test starts and clears writes at the end of the test.""" application_unit = ops_test.model.applications[WRITE_APP].units[0] - clear_writes_action = await application_unit.run_action("clear-continuous-writes") + clear_writes_action = await application_unit.run_action( + "clear-continuous-writes", + **{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION}, + ) await clear_writes_action.wait() - start_writes_action = await application_unit.run_action("start-continuous-writes") + start_writes_action = await application_unit.run_action( + "start-continuous-writes", + **{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION}, + ) await start_writes_action.wait() time.sleep(20) - stop_writes_action = await application_unit.run_action("stop-continuous-writes") + stop_writes_action = await application_unit.run_action( + "stop-continuous-writes", + **{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION}, + ) await stop_writes_action.wait() yield - clear_writes_action = await application_unit.run_action("clear-continuous-writes") + clear_writes_action = await application_unit.run_action( + "clear-continuous-writes", + **{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION}, + ) await clear_writes_action.wait() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8f831472..96762983 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -12,6 +12,9 @@ get_application_name, ) +TEST_DB = "continuous_writes_database" +TEST_COLL = "continuous_writes_collection" + @pytest_asyncio.fixture async def continuous_writes(ops_test: OpsTest) -> None: @@ -20,15 +23,21 @@ async def continuous_writes(ops_test: OpsTest) -> None: application_unit = ops_test.model.applications[application_name].units[0] - clear_writes_action = await application_unit.run_action("clear-continuous-writes") + clear_writes_action = await 
application_unit.run_action( + "clear-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL} + ) await clear_writes_action.wait() - start_writes_action = await application_unit.run_action("start-continuous-writes") + start_writes_action = await application_unit.run_action( + "start-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL} + ) await start_writes_action.wait() yield - clear_writes_action = await application_unit.run_action("clear-continuous-writes") + clear_writes_action = await application_unit.run_action( + "clear-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL} + ) await clear_writes_action.wait() diff --git a/tests/integration/ha_tests/application_charm/actions.yaml b/tests/integration/ha_tests/application_charm/actions.yaml index 3d77916c..614d0fbe 100644 --- a/tests/integration/ha_tests/application_charm/actions.yaml +++ b/tests/integration/ha_tests/application_charm/actions.yaml @@ -3,9 +3,36 @@ clear-continuous-writes: description: Clear the written data. + params: + db-name: + type: string + description: name of the database to write to + default: continuous_writes_database + coll-name: + type: string + description: name of the collection to write to + default: continuous_writes_collection start-continuous-writes: description: Start continuous writes. + params: + db-name: + type: string + description: name of the database to write to + default: continuous_writes_database + coll-name: + type: string + description: name of the collection to write to + default: continuous_writes_collection stop-continuous-writes: description: Stop continuous writes. + params: + db-name: + type: string + description: name of the database to write to + default: continuous_writes_database + coll-name: + type: string + description: name of the collection to write to + default: continuous_writes_collection diff --git a/tests/integration/ha_tests/application_charm/src/charm.py b/tests/integration/ha_tests/application_charm/src/charm.py index 36db9551..27ebd047 100755 --- a/tests/integration/ha_tests/application_charm/src/charm.py +++ b/tests/integration/ha_tests/application_charm/src/charm.py @@ -24,9 +24,10 @@ logger = logging.getLogger(__name__) DATABASE_NAME = "continuous_writes_database" +COLLECTION_NAME = "continuous_writes_collection" PEER = "application-peers" -LAST_WRITTEN_FILE = "last_written_value" PROC_PID_KEY = "proc-pid" +LAST_WRITTEN_FILE = "last_written_value" class ContinuousWritesApplication(CharmBase): @@ -101,12 +102,12 @@ def _database_config(self): # Helpers # ============== - def _start_continuous_writes(self, starting_number: int) -> None: + def _start_continuous_writes(self, starting_number: int, db_name: str, coll_name: str) -> None: """Start continuous writes to the MongoDB cluster.""" if not self._database_config: return - self._stop_continuous_writes() + self._stop_continuous_writes(db_name, coll_name) # Run continuous writes in the background proc = subprocess.Popen( @@ -115,41 +116,53 @@ def _start_continuous_writes(self, starting_number: int) -> None: "src/continuous_writes.py", self._database_config["uris"], str(starting_number), + db_name, + coll_name, ] ) # Store the continuous writes process id in stored state to be able to stop it later - self.app_peer_data[PROC_PID_KEY] = str(proc.pid) + self.app_peer_data[self.proc_id_key(db_name, coll_name)] = str(proc.pid) - def _stop_continuous_writes(self) -> Optional[int]: + def _stop_continuous_writes(self, db_name: str, coll_name: str) -> Optional[int]: """Stop continuous 
writes to the MongoDB cluster and return the last written value."""
         if not self._database_config:
             return None
 
-        if not self.app_peer_data.get(PROC_PID_KEY):
+        if not self.app_peer_data.get(self.proc_id_key(db_name, coll_name)):
             return None
 
         # Send a SIGTERM to the process and wait for the process to exit
         try:
-            os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGTERM)
+            os.kill(int(self.app_peer_data[self.proc_id_key(db_name, coll_name)]), signal.SIGTERM)
         except ProcessLookupError:
-            logger.info(f"Process {PROC_PID_KEY} was killed already (or never existed)")
+            logger.info(
+                f"Process {self.proc_id_key(db_name, coll_name)} was killed already (or never existed)"
+            )
 
-        del self.app_peer_data[PROC_PID_KEY]
+        del self.app_peer_data[self.proc_id_key(db_name, coll_name)]
 
         # read the last written_value
         try:
             for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
                 with attempt:
-                    with open(LAST_WRITTEN_FILE, "r") as fd:
+                    with open(self.last_written_filename(db_name, coll_name), "r") as fd:
                         last_written_value = int(fd.read())
         except RetryError as e:
             logger.exception("Unable to query the database", exc_info=e)
             return -1
 
-        os.remove(LAST_WRITTEN_FILE)
+        os.remove(self.last_written_filename(db_name, coll_name))
 
         return last_written_value
 
+    def proc_id_key(self, db_name: str, coll_name: str) -> str:
+        """Returns a process id key for the continuous writes process to a given db and coll."""
+        return f"{PROC_PID_KEY}-{db_name}-{coll_name}"
+
+    def last_written_filename(self, db_name: str, coll_name: str) -> str:
+        """Returns the last written value filename for the continuous writes process to a given db and coll."""
+        return f"{LAST_WRITTEN_FILE}-{db_name}-{coll_name}"
+
     # ==============
     # Handlers
     # ==============
@@ -158,39 +171,46 @@ def _on_start(self, _) -> None:
         """Handle the start event."""
         self.unit.status = WaitingStatus()
 
-    def _on_clear_continuous_writes_action(self, _) -> None:
+    def _on_clear_continuous_writes_action(self, event) -> None:
         """Handle the clear continuous writes action event."""
         if not self._database_config:
             return
 
-        self._stop_continuous_writes()
+        db_name = event.params.get("db-name") or DATABASE_NAME
+        coll_name = event.params.get("coll-name") or COLLECTION_NAME
+
+        self._stop_continuous_writes(db_name, coll_name)
 
         client = MongoClient(self._database_config["uris"])
-        db = client[DATABASE_NAME]
+        db = client[db_name]
 
         # collection for continuous writes
-        test_collection = db["test_collection"]
+        test_collection = db[coll_name]
         test_collection.drop()
 
         # collection for replication tests
         test_collection = db["test_ubuntu_collection"]
         test_collection.drop()
 
         client.close()
 
-    def _on_start_continuous_writes_action(self, _) -> None:
+    def _on_start_continuous_writes_action(self, event) -> None:
         """Handle the start continuous writes action event."""
         if not self._database_config:
             return
 
-        self._start_continuous_writes(1)
+        db_name = event.params.get("db-name") or DATABASE_NAME
+        coll_name = event.params.get("coll-name") or COLLECTION_NAME
+        self._start_continuous_writes(1, db_name, coll_name)
 
     def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None:
         """Handle the stop continuous writes action event."""
         if not self._database_config:
             return event.set_results({"writes": -1})
 
-        writes = self._stop_continuous_writes()
+        db_name = event.params.get("db-name") or DATABASE_NAME
+        coll_name = event.params.get("coll-name") or COLLECTION_NAME
+        writes = self._stop_continuous_writes(db_name, coll_name)
         event.set_results({"writes": writes or -1})
 
     def 
_on_database_created(self, _) -> None:
diff --git a/tests/integration/ha_tests/application_charm/src/continuous_writes.py b/tests/integration/ha_tests/application_charm/src/continuous_writes.py
index f8aff76f..a249fe28 100644
--- a/tests/integration/ha_tests/application_charm/src/continuous_writes.py
+++ b/tests/integration/ha_tests/application_charm/src/continuous_writes.py
@@ -9,6 +9,9 @@
 from pymongo.errors import PyMongoError
 from pymongo.write_concern import WriteConcern
 
+DEFAULT_DB_NAME = "continuous_writes_database"
+DEFAULT_COLL_NAME = "continuous_writes_collection"
+
 run = True
 
 
@@ -17,7 +20,12 @@ def sigterm_handler(_signo, _stack_frame):
     run = False
 
 
-def continous_writes(connection_string: str, starting_number: int):
+def continuous_writes(
+    connection_string: str,
+    starting_number: int,
+    db_name: str,
+    coll_name: str,
+):
     write_value = starting_number
 
     while run:
@@ -25,8 +33,8 @@
             connection_string,
             socketTimeoutMS=5000,
         )
-        db = client["continuous_writes_database"]
-        test_collection = db["test_collection"]
+        db = client[db_name]
+        test_collection = db[coll_name]
         try:
             # insert item into collection if it doesn't already exist
             test_collection.with_options(
@@ -48,14 +56,16 @@
 
         write_value += 1
 
-    with open("last_written_value", "w") as fd:
+    with open(f"last_written_value-{db_name}-{coll_name}", "w") as fd:
         fd.write(str(write_value - 1))
 
 
 def main():
     connection_string = sys.argv[1]
     starting_number = int(sys.argv[2])
-    continous_writes(connection_string, starting_number)
+    db_name = DEFAULT_DB_NAME if len(sys.argv) < 4 else sys.argv[3]
+    coll_name = DEFAULT_COLL_NAME if len(sys.argv) < 5 else sys.argv[4]
+    continuous_writes(connection_string, starting_number, db_name, coll_name)
 
 
 if __name__ == "__main__":
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 23721255..af2dfe18 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -49,7 +49,7 @@
 APPLICATION_DEFAULT_APP_NAME = "application"
 TIMEOUT = 15 * 60
 TEST_DB = "continuous_writes_database"
-TEST_COLLECTION = "test_collection"
+TEST_COLLECTION = "continuous_writes_collection"
 ANOTHER_DATABASE_APP_NAME = "another-database"
 EXCLUDED_APPS = [ANOTHER_DATABASE_APP_NAME]
 
diff --git a/tests/integration/upgrades/test_sharding_upgrades.py b/tests/integration/upgrades/test_sharding_upgrades.py
new file mode 100644
index 00000000..e6cd986e
--- /dev/null
+++ b/tests/integration/upgrades/test_sharding_upgrades.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python3
+# Copyright 2024 Canonical Ltd.
+# See LICENSE file for licensing details.
+import pytest
+from pytest_operator.plugin import OpsTest
+
+from ..ha_tests.helpers import (
+    deploy_and_scale_application,
+    get_direct_mongo_client,
+    isolate_instance_from_cluster,
+    remove_instance_isolation,
+    wait_until_unit_in_status,
+)
+from ..helpers import MONGOS_PORT, mongodb_uri
+from ..sharding_tests import writes_helpers
+from ..sharding_tests.helpers import deploy_cluster_components, integrate_cluster
+from .helpers import assert_successful_run_upgrade_sequence, backup_helpers
+
+SHARD_ONE_DB_NAME = "shard_one_db"
+SHARD_ONE_COLL_NAME = "test_collection"
+SHARD_TWO_DB_NAME = "shard_two_db"
+SHARD_TWO_COLL_NAME = "test_collection"
+SHARD_ONE_APP_NAME = "shard-one"
+SHARD_TWO_APP_NAME = "shard-two"
+CONFIG_SERVER_APP_NAME = "config-server"
+CLUSTER_COMPONENTS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME, CONFIG_SERVER_APP_NAME]
+SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME]
+WRITE_APP = "application"
+TIMEOUT = 15 * 60
+
+
+@pytest.fixture()
+async def add_writes_to_shards(ops_test: OpsTest):
+    """Adds writes to each shard before test starts and clears writes at the end of the test."""
+    application_unit = ops_test.model.applications[WRITE_APP].units[0]
+
+    start_writes_action = await application_unit.run_action(
+        "start-continuous-writes",
+        **{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
+    )
+    await start_writes_action.wait()
+
+    start_writes_action = await application_unit.run_action(
+        "start-continuous-writes",
+        **{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_COLL_NAME},
+    )
+    await start_writes_action.wait()
+
+    # move continuous writes so they are present on each shard
+    mongos_client = await get_direct_mongo_client(
+        ops_test, app_name=CONFIG_SERVER_APP_NAME, mongos=True
+    )
+    mongos_client.admin.command("movePrimary", SHARD_ONE_DB_NAME, to=SHARD_ONE_APP_NAME)
+    mongos_client.admin.command("movePrimary", SHARD_TWO_DB_NAME, to=SHARD_TWO_APP_NAME)
+
+    yield
+    clear_writes_action = await application_unit.run_action(
+        "clear-continuous-writes",
+        **{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
+    )
+    await clear_writes_action.wait()
+
+    clear_writes_action = await application_unit.run_action(
+        "clear-continuous-writes",
+        **{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_COLL_NAME},
+    )
+    await clear_writes_action.wait()
+
+
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_build_and_deploy(ops_test: OpsTest) -> None:
+    """Build, deploy, and integrate a sharded cluster."""
+    await deploy_and_scale_application(ops_test)
+
+    await deploy_cluster_components(ops_test)
+
+    await ops_test.model.wait_for_idle(
+        apps=CLUSTER_COMPONENTS,
+        idle_period=20,
+        raise_on_blocked=False,
+        raise_on_error=False,
+    )
+
+    await integrate_cluster(ops_test)
+    await ops_test.model.wait_for_idle(
+        apps=CLUSTER_COMPONENTS,
+        idle_period=20,
+        timeout=TIMEOUT,
+    )
+
+    # configure write app to use mongos uri
+    mongos_uri = await mongodb_uri(ops_test, app_name=CONFIG_SERVER_APP_NAME, port=MONGOS_PORT)
+    await ops_test.model.applications[WRITE_APP].set_config({"mongos-uri": mongos_uri})
+
+
+@pytest.mark.skip()
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_upgrade(ops_test: OpsTest, add_writes_to_shards) -> None:
+    """Verify that the sharded cluster can be safely upgraded without losing writes."""
+    new_charm = await ops_test.build_charm(".")
+
+    for sharding_component in CLUSTER_COMPONENTS:
+        await assert_successful_run_upgrade_sequence(
+            ops_test, sharding_component, new_charm=new_charm
+        )
+
+    application_unit = ops_test.model.applications[WRITE_APP].units[0]
+    stop_writes_action = await application_unit.run_action(
+        "stop-continuous-writes",
+        **{"db-name": SHARD_ONE_DB_NAME, "coll-name": SHARD_ONE_COLL_NAME},
+    )
+    await stop_writes_action.wait()
+    shard_one_expected_writes = int(stop_writes_action.results["writes"])
+    stop_writes_action = await application_unit.run_action(
+        "stop-continuous-writes",
+        **{"db-name": SHARD_TWO_DB_NAME, "coll-name": SHARD_TWO_COLL_NAME},
+    )
+    await stop_writes_action.wait()
+    shard_two_total_expected_writes = int(stop_writes_action.results["writes"])
+
+    actual_shard_one_writes = await writes_helpers.count_shard_writes(
+        ops_test,
+        config_server_name=CONFIG_SERVER_APP_NAME,
+        db_name=SHARD_ONE_DB_NAME,
+    )
+    actual_shard_two_writes = await writes_helpers.count_shard_writes(
+        ops_test,
+        config_server_name=CONFIG_SERVER_APP_NAME,
+        db_name=SHARD_TWO_DB_NAME,
+    )
+
+    assert (
+        actual_shard_one_writes == shard_one_expected_writes
+    ), "missed writes during upgrade procedure."
+    assert (
+        actual_shard_two_writes == shard_two_total_expected_writes
+    ), "missed writes during upgrade procedure."
+
+
+@pytest.mark.skip()
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_pre_upgrade_check_success(ops_test: OpsTest) -> None:
+    """Verify that the pre-upgrade check succeeds in the happy path."""
+    for sharding_component in CLUSTER_COMPONENTS:
+        leader_unit = await backup_helpers.get_leader_unit(ops_test, sharding_component)
+        action = await leader_unit.run_action("pre-upgrade-check")
+        await action.wait()
+        assert action.status == "completed", "pre-upgrade-check failed, expected to succeed."
+
+
+@pytest.mark.skip()
+@pytest.mark.group(1)
+@pytest.mark.abort_on_fail
+async def test_pre_upgrade_check_failure(ops_test: OpsTest, chaos_mesh) -> None:
+    """Verify that the pre-upgrade check fails if there is a problem with one of the shards."""
+    leader_unit = await backup_helpers.get_leader_unit(ops_test, SHARD_TWO_APP_NAME)
+
+    non_leader_unit = None
+    for unit in ops_test.model.applications[SHARD_TWO_APP_NAME].units:
+        if unit != leader_unit:
+            non_leader_unit = unit
+            break
+
+    isolate_instance_from_cluster(ops_test, non_leader_unit.name)
+    await wait_until_unit_in_status(
+        ops_test, non_leader_unit, leader_unit, "(not reachable/healthy)"
+    )
+
+    for sharding_component in CLUSTER_COMPONENTS:
+        leader_unit = await backup_helpers.get_leader_unit(ops_test, sharding_component)
+        action = await leader_unit.run_action("pre-upgrade-check")
+        await action.wait()
+        assert action.status == "failed", "pre-upgrade-check succeeded, expected to fail."
+
+    # restore network after test
+    remove_instance_isolation(ops_test)
+    await ops_test.model.wait_for_idle(
+        apps=[SHARD_TWO_APP_NAME], status="active", timeout=1000, idle_period=30
+    )
diff --git a/tests/integration/upgrades/test_upgrades.py b/tests/integration/upgrades/test_upgrades.py
index 42ec405a..8a6a7d59 100644
--- a/tests/integration/upgrades/test_upgrades.py
+++ b/tests/integration/upgrades/test_upgrades.py
@@ -3,6 +3,7 @@
 # See LICENSE file for licensing details.
import logging +from pathlib import Path import pytest from pytest_operator.plugin import OpsTest @@ -53,7 +54,7 @@ async def test_build_and_deploy(ops_test: OpsTest): @pytest.mark.group(1) @pytest.mark.abort_on_fail async def test_successful_upgrade(ops_test: OpsTest, continuous_writes) -> None: - new_charm = await ops_test.build_charm(".") + new_charm: Path = await ops_test.build_charm(".") db_app_name = await get_app_name(ops_test) await assert_successful_run_upgrade_sequence(ops_test, db_app_name, new_charm=new_charm)