Skip to content

Commit

Permalink
[DPE-5369] sc int tests for upgrades (#333)
Browse files Browse the repository at this point in the history
* copy over infra from vm

* update set status lib

* add integration tests

* fmt + lint

* remove unused bits

* tests for upgrades

* wip with adding upgrade tests

* wip continous writes to both shards

* infrastructure for sharding upgrade test is functional

* update write charm + add additional cases to upgrade code

* fmt + lint

* add fixtures to conftest

* fmt + lint

* update unit tests

* personal nits

* personal nits

* fix: skip all tests

* Update tests/integration/ha_tests/application_charm/actions.yaml

Co-authored-by: Mykola Marzhan <[email protected]>

* fix: post-rebase

* fix: default values

* fix: improve default values

* fix: writing in the same DB always

* fix: typo

---------

Co-authored-by: Neha Oudin <[email protected]>
Co-authored-by: Neha Oudin <[email protected]>
Co-authored-by: Mykola Marzhan <[email protected]>
  • Loading branch information
4 people authored Oct 2, 2024
1 parent 1991e9e commit ad87f00
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 35 deletions.
11 changes: 8 additions & 3 deletions tests/integration/backup_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
from pytest_operator.plugin import OpsTest
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from ..ha_tests.helpers import get_direct_mongo_client, get_replica_set_primary
from ..ha_tests.helpers import (
TEST_COLLECTION,
TEST_DB,
get_direct_mongo_client,
get_replica_set_primary,
)

S3_APP_NAME = "s3-integrator"
TIMEOUT = 10 * 60
Expand Down Expand Up @@ -107,8 +112,8 @@ async def insert_unwanted_data(ops_test: OpsTest, app_name: str) -> None:
"""Inserts the data into the MongoDB cluster via primary replica."""
primary_unit = await get_replica_set_primary(ops_test, application_name=app_name)
with await get_direct_mongo_client(ops_test, unit=primary_unit.name) as client:
db = client["continuous_writes_database"]
test_collection = db["test_collection"]
db = client[TEST_DB]
test_collection = db[TEST_COLLECTION]
test_collection.insert_one({"unwanted_data": "bad data 1"})
test_collection.insert_one({"unwanted_data": "bad data 2"})
test_collection.insert_one({"unwanted_data": "bad data 3"})
20 changes: 16 additions & 4 deletions tests/integration/backup_tests/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,32 @@ async def add_writes_to_db(ops_test: OpsTest):
"""Adds writes to DB before test starts and clears writes at the end of the test."""
application_unit = ops_test.model.applications[WRITE_APP].units[0]

clear_writes_action = await application_unit.run_action("clear-continuous-writes")
clear_writes_action = await application_unit.run_action(
"clear-continuous-writes",
**{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION},
)
await clear_writes_action.wait()

start_writes_action = await application_unit.run_action("start-continuous-writes")
start_writes_action = await application_unit.run_action(
"start-continuous-writes",
**{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION},
)
await start_writes_action.wait()

time.sleep(20)

stop_writes_action = await application_unit.run_action("stop-continuous-writes")
stop_writes_action = await application_unit.run_action(
"stop-continuous-writes",
**{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION},
)
await stop_writes_action.wait()

yield

clear_writes_action = await application_unit.run_action("clear-continuous-writes")
clear_writes_action = await application_unit.run_action(
"clear-continuous-writes",
**{"db-name": ha_helpers.TEST_DB, "coll-name": ha_helpers.TEST_COLLECTION},
)
await clear_writes_action.wait()


Expand Down
15 changes: 12 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
get_application_name,
)

TEST_DB = "continuous_writes_database"
TEST_COLL = "continuous_writes_collection"


@pytest_asyncio.fixture
async def continuous_writes(ops_test: OpsTest) -> None:
Expand All @@ -20,15 +23,21 @@ async def continuous_writes(ops_test: OpsTest) -> None:

application_unit = ops_test.model.applications[application_name].units[0]

clear_writes_action = await application_unit.run_action("clear-continuous-writes")
clear_writes_action = await application_unit.run_action(
"clear-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL}
)
await clear_writes_action.wait()

start_writes_action = await application_unit.run_action("start-continuous-writes")
start_writes_action = await application_unit.run_action(
"start-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL}
)
await start_writes_action.wait()

yield

clear_writes_action = await application_unit.run_action("clear-continuous-writes")
clear_writes_action = await application_unit.run_action(
"clear-continuous-writes", **{"db-name": TEST_DB, "coll-name": TEST_COLL}
)
await clear_writes_action.wait()


Expand Down
27 changes: 27 additions & 0 deletions tests/integration/ha_tests/application_charm/actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,36 @@

clear-continuous-writes:
description: Clear the written data.
params:
db-name:
type: string
description: name of the database to write to
default: continuous_writes_database
coll-name:
type: string
description: name of the collection to write to
default: continuous_writes_collection

start-continuous-writes:
description: Start continuous writes.
params:
db-name:
type: string
description: name of the database to write to
default: continuous_writes_database
coll-name:
type: string
description: name of the collection to write to
default: continuous_writes_collection

stop-continuous-writes:
description: Stop continuous writes.
params:
db-name:
type: string
description: name of the database to write to
default: continuous_writes_database
coll-name:
type: string
description: name of the collection to write to
default: continuous_writes_collection
56 changes: 38 additions & 18 deletions tests/integration/ha_tests/application_charm/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
logger = logging.getLogger(__name__)

DATABASE_NAME = "continuous_writes_database"
COLLECTION_NAME = "continuous_writes_collection"
PEER = "application-peers"
LAST_WRITTEN_FILE = "last_written_value"
PROC_PID_KEY = "proc-pid"
LAST_WRITTEN_FILE = "last_written_value"


class ContinuousWritesApplication(CharmBase):
Expand Down Expand Up @@ -101,12 +102,12 @@ def _database_config(self):
# Helpers
# ==============

def _start_continuous_writes(self, starting_number: int) -> None:
def _start_continuous_writes(self, starting_number: int, db_name: str, coll_name: str) -> None:
"""Start continuous writes to the MongoDB cluster."""
if not self._database_config:
return

self._stop_continuous_writes()
self._stop_continuous_writes(db_name, coll_name)

# Run continuous writes in the background
proc = subprocess.Popen(
Expand All @@ -115,41 +116,53 @@ def _start_continuous_writes(self, starting_number: int) -> None:
"src/continuous_writes.py",
self._database_config["uris"],
str(starting_number),
db_name,
coll_name,
]
)

# Store the continuous writes process id in stored state to be able to stop it later
self.app_peer_data[PROC_PID_KEY] = str(proc.pid)
self.app_peer_data[self.proc_id_key(db_name, coll_name)] = str(proc.pid)

def _stop_continuous_writes(self) -> Optional[int]:
def _stop_continuous_writes(self, db_name: str, coll_name: str) -> Optional[int]:
"""Stop continuous writes to the MongoDB cluster and return the last written value."""
if not self._database_config:
return None

if not self.app_peer_data.get(PROC_PID_KEY):
if not self.app_peer_data.get(self.proc_id_key(db_name, coll_name)):
return None

# Send a SIGTERM to the process and wait for the process to exit
try:
os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGTERM)
os.kill(int(self.app_peer_data[self.proc_id_key(db_name, coll_name)]), signal.SIGTERM)
except ProcessLookupError:
logger.info(f"Process {PROC_PID_KEY} was killed already (or never existed)")
logger.info(
f"Process {self.proc_id_key(db_name, coll_name)} was killed already (or never existed)"
)

del self.app_peer_data[PROC_PID_KEY]
del self.app_peer_data[self.proc_id_key(db_name, coll_name)]

# read the last written_value
try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
with attempt:
with open(LAST_WRITTEN_FILE, "r") as fd:
with open(self.last_written_filename(db_name, coll_name), "r") as fd:
last_written_value = int(fd.read())
except RetryError as e:
logger.exception("Unable to query the database", exc_info=e)
return -1

os.remove(LAST_WRITTEN_FILE)
os.remove(self.last_written_filename(db_name, coll_name))
return last_written_value

def proc_id_key(self, db_name: str, coll_name: str) -> str:
"""Returns a process id key for the continuous writes process to a given db and coll."""
return f"{PROC_PID_KEY}-{db_name}-{coll_name}"

def last_written_filename(self, db_name: str, coll_name: str) -> str:
"""Returns a process id key for the continuous writes process to a given db and coll."""
return f"{LAST_WRITTEN_FILE}-{db_name}-{coll_name}"

# ==============
# Handlers
# ==============
Expand All @@ -158,39 +171,46 @@ def _on_start(self, _) -> None:
"""Handle the start event."""
self.unit.status = WaitingStatus()

def _on_clear_continuous_writes_action(self, _) -> None:
def _on_clear_continuous_writes_action(self, event) -> None:
"""Handle the clear continuous writes action event."""
if not self._database_config:
return

self._stop_continuous_writes()
db_name = event.params.get("db-name") or DATABASE_NAME
coll_name = event.params.get("coll-name") or COLLECTION_NAME

self._stop_continuous_writes(db_name, coll_name)

client = MongoClient(self._database_config["uris"])
db = client[DATABASE_NAME]

# collection for continuous writes
test_collection = db["test_collection"]
test_collection = db[coll_name]
test_collection.drop()

# collection for replication tests
test_collection = db["test_ubuntu_collection"]
test_collection = db[db_name]
test_collection.drop()

client.close()

def _on_start_continuous_writes_action(self, _) -> None:
def _on_start_continuous_writes_action(self, event) -> None:
"""Handle the start continuous writes action event."""
if not self._database_config:
return

self._start_continuous_writes(1)
db_name = event.params.get("db-name") or DATABASE_NAME
coll_name = event.params.get("coll-name") or COLLECTION_NAME
self._start_continuous_writes(1, db_name, coll_name)

def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None:
"""Handle the stop continuous writes action event."""
if not self._database_config:
return event.set_results({"writes": -1})

writes = self._stop_continuous_writes()
db_name = event.params.get("db-name") or DATABASE_NAME
coll_name = event.params.get("coll-name") or COLLECTION_NAME
writes = self._stop_continuous_writes(db_name, coll_name)
event.set_results({"writes": writes or -1})

def _on_database_created(self, _) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from pymongo.errors import PyMongoError
from pymongo.write_concern import WriteConcern

DEFAULT_DB_NAME = "continuous_writes_database"
DEFAULT_COLL_NAME = "continuous_writes_collection"

run = True


Expand All @@ -17,16 +20,21 @@ def sigterm_handler(_signo, _stack_frame):
run = False


def continous_writes(connection_string: str, starting_number: int):
def continous_writes(
connection_string: str,
starting_number: int,
db_name: str,
coll_name: str,
):
write_value = starting_number

while run:
client = MongoClient(
connection_string,
socketTimeoutMS=5000,
)
db = client["continuous_writes_database"]
test_collection = db["test_collection"]
db = client[db_name]
test_collection = db[coll_name]
try:
# insert item into collection if it doesn't already exist
test_collection.with_options(
Expand All @@ -48,14 +56,16 @@ def continous_writes(connection_string: str, starting_number: int):

write_value += 1

with open("last_written_value", "w") as fd:
with open(f"last_written_value-{db_name}-{coll_name}", "w") as fd:
fd.write(str(write_value - 1))


def main():
connection_string = sys.argv[1]
starting_number = int(sys.argv[2])
continous_writes(connection_string, starting_number)
db_name = DEFAULT_DB_NAME if len(sys.argv) < 4 else sys.argv[3]
coll_name = DEFAULT_COLL_NAME if len(sys.argv) < 5 else sys.argv[4]
continous_writes(connection_string, starting_number, db_name, coll_name)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
APPLICATION_DEFAULT_APP_NAME = "application"
TIMEOUT = 15 * 60
TEST_DB = "continuous_writes_database"
TEST_COLLECTION = "test_collection"
TEST_COLLECTION = "continuous_writes_collection"
ANOTHER_DATABASE_APP_NAME = "another-database"
EXCLUDED_APPS = [ANOTHER_DATABASE_APP_NAME]

Expand Down
Loading

0 comments on commit ad87f00

Please sign in to comment.