diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py index d3b83b470..aa02e7397 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py @@ -1,17 +1,24 @@ import logging -from typing import Optional +from typing import Optional, Dict from console_link.models.cluster import Cluster from console_link.models.metrics_source import MetricsSource from console_link.logic.metrics import get_metrics_source from console_link.logic.backfill import get_backfill from console_link.models.backfill_base import Backfill -from console_link.models.snapshot import Snapshot, S3Snapshot +from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot import yaml from cerberus import Validator logger = logging.getLogger(__name__) + +def get_snapshot(config: Dict, source_cluster: Cluster, target_cluster: Cluster): + if 'fs' in config: + return FileSystemSnapshot(config, source_cluster, target_cluster) + return S3Snapshot(config, source_cluster, target_cluster) + + SCHEMA = { "source_cluster": {"type": "dict", "required": False}, "target_cluster": {"type": "dict", "required": True}, @@ -66,9 +73,9 @@ def __init__(self, config_file: str): logger.info("No backfill provided") if 'snapshot' in self.config: - self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"], - source_cluster=self.source_cluster, - target_cluster=self.target_cluster) + self.snapshot: Snapshot = get_snapshot(self.config["snapshot"], + source_cluster=self.source_cluster, + target_cluster=self.target_cluster) logger.info(f"Snapshot initialized: {self.snapshot}") else: logger.info("No snapshot provided") diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py index 690ad7f7b..9b986cd3b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/snapshot.py @@ -3,10 +3,12 @@ import logging import subprocess from typing import Dict, Optional -from console_link.models.cluster import Cluster +from console_link.models.cluster import AuthMethod, Cluster from console_link.models.command_result import CommandResult from cerberus import Validator +from console_link.models.schema_tools import contains_one_of + logger = logging.getLogger(__name__) SnapshotStatus = Enum( @@ -17,6 +19,30 @@ "FAILED"]) +SNAPSHOT_SCHEMA = { + 'snapshot': { + 'type': 'dict', + 'schema': { + 'snapshot_name': {'type': 'string', 'required': True}, + 's3': { + 'type': 'dict', + 'schema': { + 'repo_uri': {'type': 'string', 'required': True}, + 'aws_region': {'type': 'string', 'required': True}, + } + }, + 'fs': { + 'type': 'dict', + 'schema': { + 'repo_path': {'type': 'string', 'required': True}, + } + } + }, + 'check_with': contains_one_of({'s3', 'fs'}) + } +} + + class Snapshot(ABC): """ Interface for creating and managing snapshots. @@ -25,6 +51,9 @@ def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Option self.config = config self.source_cluster = source_cluster self.target_cluster = target_cluster + v = Validator(SNAPSHOT_SCHEMA) + if not v.validate({'snapshot': config}): + raise ValueError("Invalid config file for snapshot", v.errors) @abstractmethod def create(self, *args, **kwargs) -> CommandResult: @@ -37,40 +66,69 @@ def status(self, *args, **kwargs) -> CommandResult: pass -S3_SNAPSHOT_SCHEMA = { - 'snapshot_name': { - 'type': 'string', - 'required': True - }, - 's3_repo_uri': { - 'type': 'string', - 'required': True - }, - 's3_region': { - 'type': 'string', - 'required': True - } -} - - class S3Snapshot(Snapshot): - def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None: + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: super().__init__(config, source_cluster, target_cluster) - v = Validator(S3_SNAPSHOT_SCHEMA) - if not v.validate(config): - raise ValueError("Invalid config file for snapshot", v.errors) + self.snapshot_name = config['snapshot_name'] - self.s3_repo_uri = config['s3_repo_uri'] - self.s3_region = config['s3_region'] + self.s3_repo_uri = config['s3']['repo_uri'] + self.s3_region = config['s3']['aws_region'] def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + if self.source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + if self.target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + command = [ "/root/createSnapshot/bin/CreateSnapshot", "--snapshot-name", self.snapshot_name, "--s3-repo-uri", self.s3_repo_uri, "--s3-region", self.s3_region, "--source-host", self.source_cluster.endpoint, - "--target-host", self.target_cluster.endpoint, + ] + + if self.source_cluster.allow_insecure: + command.append("--source-insecure") + if self.target_cluster.allow_insecure: + command.append("--target-insecure") + + logger.info(f"Creating snapshot with command: {' '.join(command)}") + try: + # Pass None to stdout and stderr to not capture output and show in terminal + subprocess.run(command, stdout=None, stderr=None, text=True, check=True) + logger.info(f"Snapshot {self.config['snapshot_name']} created successfully") + return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to create snapshot: {str(e)}") + return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)}") + + def status(self, *args, **kwargs) -> CommandResult: + return CommandResult(success=False, value="Command not implemented") + + +class FileSystemSnapshot(Snapshot): + def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Cluster) -> None: + super().__init__(config, source_cluster, target_cluster) + self.snapshot_name = config['snapshot_name'] + self.repo_path = config['fs']['repo_path'] + + def create(self, *args, **kwargs) -> CommandResult: + assert isinstance(self.target_cluster, Cluster) + + if self.source_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Source cluster authentication is not supported for creating snapshots") + + if self.target_cluster.auth_type != AuthMethod.NO_AUTH: + raise NotImplementedError("Target cluster authentication is not supported for creating snapshots") + + command = [ + "/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", self.snapshot_name, + "--file-system-repo-path", self.repo_path, + "--source-host", self.source_cluster.endpoint, ] if self.source_cluster.allow_insecure: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml index 521992379..e9e901d3f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/services.yaml @@ -2,21 +2,21 @@ source_cluster: endpoint: "https://capture-proxy:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: - snapshot_name: "snapshot_2023_01_01" - s3_repo_uri: "s3://my-snapshot-bucket" - s3_region: "us-east-2" + snapshot_name: "snapshot_2024_06_21" + fs: + repo_path: "/snapshot/test-console" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml index abca9d89a..caa08cafe 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/data/services.yaml @@ -2,21 +2,22 @@ source_cluster: endpoint: "https://capture-proxy-es:9200" allow_insecure: true basic_auth: - username: "admin" - password: "admin" + username: "admin" + password: "admin" target_cluster: endpoint: "https://opensearchtarget:9200" allow_insecure: true basic_auth: - username: "admin" - password: "myStrongPassword123!" + username: "admin" + password: "myStrongPassword123!" metrics_source: prometheus: endpoint: "http://prometheus:9090" backfill: - reindex_from_snapshot: - docker: + reindex_from_snapshot: + docker: snapshot: - snapshot_name: "test_snapshot" - s3_repo_uri: "s3://test-bucket" - s3_region: "us-east-2" + snapshot_name: "snapshot_2023_01_01" + s3: + repo_uri: "s3://my-snapshot-bucket" + aws_region: "us-east-2" diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py new file mode 100644 index 000000000..ddaec0a7e --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/test_snapshot.py @@ -0,0 +1,179 @@ +from console_link.models.snapshot import S3Snapshot, FileSystemSnapshot, Snapshot +from console_link.environment import get_snapshot +from console_link.models.cluster import AuthMethod +from tests.utils import create_valid_cluster +import pytest + + +def test_s3_snapshot_init_succeeds(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = S3Snapshot(config['snapshot'], create_valid_cluster(), None) + assert isinstance(snapshot, Snapshot) + + +def test_fs_snapshot_init_succeeds(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + snapshot = FileSystemSnapshot(config["snapshot"], create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + assert isinstance(snapshot, Snapshot) + + +def test_get_snapshot_for_s3_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = get_snapshot(config["snapshot"], create_valid_cluster(), None) + assert isinstance(snapshot, S3Snapshot) + + +def test_get_snapshot_for_fs_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + snapshot = get_snapshot(config["snapshot"], create_valid_cluster(auth_type=AuthMethod.NO_AUTH), + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + assert isinstance(snapshot, FileSystemSnapshot) + + +def test_get_snapshot_fails_for_invalid_config(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "invalid": { + "key": "value" + }, + } + } + with pytest.raises(ValueError) as excinfo: + get_snapshot(config["snapshot"], create_valid_cluster(), create_valid_cluster()) + assert "Invalid config file for snapshot" in str(excinfo.value.args[0]) + + +def test_get_snpashot_fails_for_config_with_fs_and_s3(): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + with pytest.raises(ValueError) as excinfo: + get_snapshot(config["snapshot"], create_valid_cluster(), create_valid_cluster()) + assert "Invalid config file for snapshot" in str(excinfo.value.args[0]) + + +def test_fs_snapshot_create_calls_subprocess_run_with_correct_args(mocker): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/for/snapshot/repo" + }, + } + } + source = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + snapshot = FileSystemSnapshot(config["snapshot"], source, + create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + mock = mocker.patch("subprocess.run") + snapshot.create() + + mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", config["snapshot"]["snapshot_name"], + "--file-system-repo-path", config["snapshot"]["fs"]["repo_path"], + "--source-host", source.endpoint, + "--source-insecure", "--target-insecure"], + stdout=None, stderr=None, text=True, check=True) + + +def test_s3_snapshot_create_calls_subprocess_run_with_correct_args(mocker): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + source = create_valid_cluster(auth_type=AuthMethod.NO_AUTH) + snapshot = S3Snapshot(config["snapshot"], source, create_valid_cluster(auth_type=AuthMethod.NO_AUTH)) + + mock = mocker.patch("subprocess.run") + snapshot.create() + + mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot", + "--snapshot-name", config["snapshot"]["snapshot_name"], + "--s3-repo-uri", config["snapshot"]["s3"]["repo_uri"], + "--s3-region", config["snapshot"]["s3"]["aws_region"], + "--source-host", source.endpoint, + "--source-insecure", "--target-insecure"], + stdout=None, stderr=None, text=True, check=True) + + +@pytest.mark.parametrize("source_auth,target_auth", [(AuthMethod.NO_AUTH, AuthMethod.BASIC_AUTH), + (AuthMethod.BASIC_AUTH, AuthMethod.NO_AUTH)]) +def test_s3_snapshot_create_fails_for_clusters_with_auth(source_auth, target_auth): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "s3": { + "repo_uri": "s3://my-bucket", + "aws_region": "us-east-1" + }, + } + } + snapshot = S3Snapshot(config["snapshot"], create_valid_cluster(auth_type=source_auth), + create_valid_cluster(auth_type=target_auth)) + with pytest.raises(NotImplementedError) as excinfo: + snapshot.create() + assert "authentication is not supported" in str(excinfo.value.args[0]) + + +@pytest.mark.parametrize("source_auth,target_auth", [(AuthMethod.NO_AUTH, AuthMethod.BASIC_AUTH), + (AuthMethod.BASIC_AUTH, AuthMethod.NO_AUTH)]) +def test_fs_snapshot_create_fails_for_clusters_with_auth(source_auth, target_auth): + config = { + "snapshot": { + "snapshot_name": "reindex_from_snapshot", + "fs": { + "repo_path": "/path/to/repo" + }, + } + } + with pytest.raises(NotImplementedError) as excinfo: + snapshot = FileSystemSnapshot(config["snapshot"], create_valid_cluster(auth_type=source_auth), + create_valid_cluster(auth_type=target_auth)) + snapshot.create() + assert "authentication is not supported" in str(excinfo.value.args[0]) diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts index 514834ddd..4d01c91e6 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-services-yaml.ts @@ -42,16 +42,27 @@ export class OSIBackfillYaml { } } +export class FileSystemSnapshotYaml { + repo_path: string = ''; +} + export class S3SnapshotYaml { + repo_uri: string = ''; + aws_region: string = ''; +} + +export class SnapshotYaml { snapshot_name: string = ''; - s3_repo_uri: string = ''; - s3_region: string = ''; + s3?: S3SnapshotYaml; + fs?: FileSystemSnapshotYaml; toDict() { return { snapshot_name: this.snapshot_name, - s3_repo_uri: this.s3_repo_uri, - s3_region: this.s3_region + // This conditinally includes the s3 and fs parameters if they're defined, + // but does not add the keys otherwise + ...(this.s3 && { s3: this.s3 }), + ...(this.fs && { fs: this.fs }) }; } } @@ -61,7 +72,7 @@ export class ServicesYaml { target_cluster: ClusterYaml; metrics_source: MetricsSourceYaml = new MetricsSourceYaml(); backfill: RFSBackfillYaml | OSIBackfillYaml; - snapshot?: S3SnapshotYaml; + snapshot?: SnapshotYaml; stringify(): string { return yaml.stringify({ diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts index 5b3627ac1..e144825fc 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/reindex-from-snapshot-stack.ts @@ -11,7 +11,7 @@ import { createOpenSearchServerlessIAMAccessPolicy, getMigrationStringParameterValue } from "../common-utilities"; -import { RFSBackfillYaml, S3SnapshotYaml } from "../migration-services-yaml"; +import { RFSBackfillYaml, SnapshotYaml } from "../migration-services-yaml"; export interface ReindexFromSnapshotProps extends StackPropsExt { @@ -23,7 +23,7 @@ export interface ReindexFromSnapshotProps extends StackPropsExt { export class ReindexFromSnapshotStack extends MigrationServiceCore { rfsBackfillYaml: RFSBackfillYaml; - rfsSnapshotYaml: S3SnapshotYaml; + rfsSnapshotYaml: SnapshotYaml; constructor(scope: Construct, id: string, props: ReindexFromSnapshotProps) { super(scope, id, props) @@ -85,9 +85,8 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore { this.rfsBackfillYaml = new RFSBackfillYaml(); this.rfsBackfillYaml.ecs.cluster_name = `migration-${props.stage}-ecs-cluster`; this.rfsBackfillYaml.ecs.service_name = `migration-${props.stage}-reindex-from-snapshot`; - this.rfsSnapshotYaml = new S3SnapshotYaml(); - this.rfsSnapshotYaml.s3_repo_uri = s3Uri; - this.rfsSnapshotYaml.s3_region = this.region; + this.rfsSnapshotYaml = new SnapshotYaml(); + this.rfsSnapshotYaml.s3 = {repo_uri: s3Uri, aws_region: this.region}; this.rfsSnapshotYaml.snapshot_name = "rfs-snapshot"; } diff --git a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts index 2f7a38f91..0eb70d25f 100644 --- a/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts +++ b/deployment/cdk/opensearch-service-migration/test/migration-services-yaml.test.ts @@ -1,4 +1,4 @@ -import { ClusterYaml, RFSBackfillYaml, ServicesYaml } from "../lib/migration-services-yaml" +import { ClusterYaml, RFSBackfillYaml, ServicesYaml, SnapshotYaml } from "../lib/migration-services-yaml" test('Test default servicesYaml can be stringified', () => { const servicesYaml = new ServicesYaml(); @@ -57,4 +57,24 @@ test('Test servicesYaml without backfill does not include backend section', () = let servicesYaml = new ServicesYaml(); const yaml = servicesYaml.stringify(); expect(yaml).toBe(`metrics_source:\n cloudwatch:\n`); -}) \ No newline at end of file +}) + +test('Test SnapshotYaml for filesystem only includes fs', () => { + let fsSnapshot = new SnapshotYaml(); + fsSnapshot.fs = {"repo_path": "/path/to/shared/volume"} + const fsSnapshotDict = fsSnapshot.toDict() + expect(fsSnapshotDict).toBeDefined(); + expect(fsSnapshotDict).toHaveProperty("fs"); + expect(fsSnapshotDict["fs"]).toHaveProperty("repo_path"); + expect(fsSnapshotDict).not.toHaveProperty("s3"); +}) + +test('Test SnapshotYaml for s3 only includes s3', () => { + let s3Snapshot = new SnapshotYaml(); + s3Snapshot.s3 = {"repo_uri": "s3://repo/path", "aws_region": "us-east-1"} + const s3SnapshotDict = s3Snapshot.toDict() + expect(s3SnapshotDict).toBeDefined(); + expect(s3SnapshotDict).toHaveProperty("s3"); + expect(s3SnapshotDict["s3"]).toHaveProperty("repo_uri"); + expect(s3SnapshotDict).not.toHaveProperty("fs"); +})