Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Console] Scale for ECS services; replayer start/stop #765

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import console_link.logic.backfill as logic_backfill
import console_link.logic.snapshot as logic_snapshot
import console_link.logic.metadata as logic_metadata
import console_link.logic.replay as logic_replay

from console_link.models.utils import ExitCode
from console_link.environment import Environment
Expand Down Expand Up @@ -236,7 +237,61 @@
raise click.ClickException(message)
click.echo(message)

# ##################### METRICS ###################

# ##################### REPLAY ###################

@cli.group(name="replay")
@click.pass_obj
def replay_group(ctx):
"""All actions related to replaying data"""
if ctx.env.replay is None:
raise click.UsageError("Replay is not set")

Check warning on line 248 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L247-L248

Added lines #L247 - L248 were not covered by tests


@replay_group.command(name="describe")
@click.pass_obj
def describe_replay_cmd(ctx):
click.echo(logic_replay.describe(ctx.env.replay, as_json=ctx.json))

Check warning on line 254 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L254

Added line #L254 was not covered by tests


@replay_group.command(name="start")
@click.pass_obj
def start_replay_cmd(ctx):
exitcode, message = logic_replay.start(ctx.env.replay)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Check warning on line 263 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L260-L263

Added lines #L260 - L263 were not covered by tests


@replay_group.command(name="stop")
@click.pass_obj
def stop_replay_cmd(ctx):
exitcode, message = logic_replay.stop(ctx.env.replay)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Check warning on line 272 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L269-L272

Added lines #L269 - L272 were not covered by tests


@replay_group.command(name="scale")
@click.argument("units", type=int, required=True)
@click.pass_obj
def scale_replay_cmd(ctx, units: int):
exitcode, message = logic_replay.scale(ctx.env.replay, units)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Check warning on line 282 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L279-L282

Added lines #L279 - L282 were not covered by tests


@replay_group.command(name="status")
@click.pass_obj
def status_replay_cmd(ctx):
exitcode, message = logic_replay.status(ctx.env.replay)
if exitcode != ExitCode.SUCCESS:
raise click.ClickException(message)
click.echo(message)

Check warning on line 291 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/cli.py#L288-L291

Added lines #L288 - L291 were not covered by tests


# ##################### METADATA ###################


@cli.group(name="metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from console_link.logic.backfill import get_backfill
from console_link.models.backfill_base import Backfill
from console_link.models.snapshot import FileSystemSnapshot, Snapshot, S3Snapshot
from console_link.models.replayer_base import Replayer
from console_link.models.replayer_ecs import ECSReplayer
import yaml
from cerberus import Validator

Expand All @@ -20,14 +22,21 @@
return S3Snapshot(config, source_cluster, target_cluster)


def get_replayer(config: Dict):
if 'ecs' in config:
return ECSReplayer(config)
raise ValueError("Invalid replayer config")

Check warning on line 28 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/environment.py#L28

Added line #L28 was not covered by tests


SCHEMA = {
"source_cluster": {"type": "dict", "required": False},
"target_cluster": {"type": "dict", "required": True},
"replayer": {"type": "dict", "required": False},
"backfill": {"type": "dict", "required": False},
"metrics_source": {"type": "dict", "required": False},
"snapshot": {"type": "dict", "required": False},
"metadata_migration": {"type": "dict", "required": False}
"metadata_migration": {"type": "dict", "required": False},
"replay": {"type": "dict", "required": False}
}


Expand All @@ -38,6 +47,7 @@
metrics_source: Optional[MetricsSource] = None
snapshot: Optional[Snapshot] = None
metadata: Optional[Metadata] = None
replay: Optional[Replayer] = None

def __init__(self, config_file: str):
logger.info(f"Loading config file: {config_file}")
Expand Down Expand Up @@ -77,6 +87,10 @@
else:
logger.info("No backfill provided")

if 'replay' in self.config:
self.replay: Replayer = get_replayer(self.config["replay"])
logger.info(f"Replay initialized: {self.replay}")

if 'snapshot' in self.config:
self.snapshot: Snapshot = get_snapshot(self.config["snapshot"],
source_cluster=self.source_cluster,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import logging
from typing import Tuple
from console_link.models.utils import ExitCode
from console_link.models.replayer_base import Replayer
import yaml


logger = logging.getLogger(__name__)


def describe(replayer: Replayer, as_json=False) -> str:
response = replayer.describe()
if as_json:
return json.dumps(response)
return yaml.safe_dump(response)

Check warning on line 16 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L13-L16

Added lines #L13 - L16 were not covered by tests


def start(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
try:
result = replayer.start(*args, **kwargs)
except NotImplementedError:
logger.error(f"Start is not implemented for replayer {type(replayer).__name__}")
return ExitCode.FAILURE, f"Start is not implemented for replayer {type(replayer).__name__}"
except Exception as e:
logger.error(f"Failed to start replayer: {e}")
return ExitCode.FAILURE, f"Failure when starting replayer: {type(e).__name__} {e}"

Check warning on line 27 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L20-L27

Added lines #L20 - L27 were not covered by tests

if result.success:
return ExitCode.SUCCESS, "Replayer started successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Replayer start failed." + "\n" + result.display()

Check warning on line 31 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L29-L31

Added lines #L29 - L31 were not covered by tests


def stop(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info("Stopping replayer")
try:
result = replayer.stop(*args, **kwargs)
except NotImplementedError:
logger.error(f"Stop is not implemented for replayer {type(replayer).__name__}")
return ExitCode.FAILURE, f"Stop is not implemented for replayer {type(replayer).__name__}"
except Exception as e:
logger.error(f"Failed to stop replayer: {e}")
return ExitCode.FAILURE, f"Failure when stopping replayer: {type(e).__name__} {e}"
if result.success:
return ExitCode.SUCCESS, "Replayer stopped successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Replayer stop failed." + "\n" + result.display()

Check warning on line 46 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L35-L46

Added lines #L35 - L46 were not covered by tests


def scale(replayer: Replayer, units: int, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info(f"Scaling replayer to {units} units")
try:
result = replayer.scale(units, *args, **kwargs)
except NotImplementedError:
logger.error(f"Scale is not implemented for replayer {type(replayer).__name__}")
return ExitCode.FAILURE, f"Scale is not implemented for replayer {type(replayer).__name__}"
except Exception as e:
logger.error(f"Failed to scale replayer: {e}")
return ExitCode.FAILURE, f"Failure when scaling replayer: {type(e).__name__} {e}"
if result.success:
return ExitCode.SUCCESS, "Replayer scaled successfully." + "\n" + result.display()
return ExitCode.FAILURE, "Replayer scale failed." + "\n" + result.display()

Check warning on line 61 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L50-L61

Added lines #L50 - L61 were not covered by tests


def status(replayer: Replayer, *args, **kwargs) -> Tuple[ExitCode, str]:
logger.info("Getting replayer status")
try:
status = replayer.get_status(*args, **kwargs)
except NotImplementedError:
logger.error(f"Status is not implemented for replayer {type(replayer).__name__}")
return ExitCode.FAILURE, f"Status is not implemented for replayer: {type(replayer).__name__}"
except Exception as e:
logger.error(f"Failed to get status of replayer: {e}")
return ExitCode.FAILURE, f"Failure when getting status of replayer: {type(e).__name__} {e}"
if status:
return ExitCode.SUCCESS, status.value
return ExitCode.FAILURE, "Replayer status retrieval failed." + "\n" + status

Check warning on line 76 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/logic/replay.py#L65-L76

Added lines #L65 - L76 were not covered by tests
peternied marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ def start(self, *args, **kwargs) -> CommandResult:
def stop(self, *args, **kwargs) -> CommandResult:
logger.info("Stopping RFS backfill by setting desired count to 0 instances")
return self.ecs_client.set_desired_count(0)

def scale(self, units: int, *args, **kwargs) -> CommandResult:
logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
return self.ecs_client.set_desired_count(units)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from enum import Enum
from typing import Dict
from abc import ABC, abstractmethod

from console_link.models.schema_tools import contains_one_of
from console_link.models.command_result import CommandResult

from cerberus import Validator

DOCKER_REPLAY_SCHEMA = {
"type": "dict",
"schema": {
"socket": {"type": "string", "required": False}
}
}

ECS_REPLAY_SCHEMA = {
"type": "dict",
"schema": {
"cluster_name": {"type": "string", "required": True},
"service_name": {"type": "string", "required": True},
"aws_region": {"type": "string", "required": False}
}
}

SCHEMA = {
"replay": {
"type": "dict",
"schema": {
"docker": DOCKER_REPLAY_SCHEMA,
"ecs": ECS_REPLAY_SCHEMA,
"scale": {"type": "integer", "required": False, "min": 1}
},
"check_with": contains_one_of({"docker", "ecs"})
}
}


ReplayStatus = Enum("ReplayStatus", ["NOT_STARTED", "RUNNING", "STOPPED", "FAILED"])


class Replayer(ABC):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice way to inject this abstraction

"""
Interface for replaying data from kafka to a target cluster.
"""
def __init__(self, config: Dict) -> None:
v = Validator(SCHEMA)
self.config = config
if not v.validate({"replay": self.config}):
raise ValueError("Invalid config file for replay", v.errors)
self.default_scale = self.config.get("scale", 1)

@abstractmethod
def start(self, *args, **kwargs) -> CommandResult:
"""Begin running the replayer. After running start, the user should be able to assume that--barring exceptions
or failures--their data will begin playing against the target cluster."""
pass

Check warning on line 57 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py#L57

Added line #L57 was not covered by tests

@abstractmethod
def stop(self, *args, **kwargs) -> CommandResult:
"""Stop or pause the replay. This does not make guarantees about resumeability."""
pass

Check warning on line 62 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py#L62

Added line #L62 was not covered by tests

@abstractmethod
def get_status(self, *args, **kwargs) -> ReplayStatus:
"""Return a status"""
pass

Check warning on line 67 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py#L67

Added line #L67 was not covered by tests

@abstractmethod
def scale(self, units: int, *args, **kwargs) -> CommandResult:
pass

Check warning on line 71 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py#L71

Added line #L71 was not covered by tests

def describe(self) -> Dict:
return self.config

Check warning on line 74 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_base.py#L74

Added line #L74 was not covered by tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import Dict
from console_link.models.command_result import CommandResult
from console_link.models.ecs_service import ECSService
from console_link.models.replayer_base import Replayer, ReplayStatus

import logging

logger = logging.getLogger(__name__)


class ECSReplayer(Replayer):
def __init__(self, config: Dict) -> None:
super().__init__(config)
self.ecs_config = self.config["ecs"]
self.ecs_client = ECSService(self.ecs_config["cluster_name"], self.ecs_config["service_name"],
self.ecs_config.get("aws_region", None))

def start(self, *args, **kwargs) -> CommandResult:
logger.info(f"Starting ECS replayer by setting desired count to {self.default_scale} instances")
return self.ecs_client.set_desired_count(self.default_scale)

def stop(self, *args, **kwargs) -> CommandResult:
logger.info("Stopping ECS replayer by setting desired count to 0 instances")
return self.ecs_client.set_desired_count(0)

def get_status(self, *args, **kwargs) -> ReplayStatus:
raise NotImplementedError()

Check warning on line 27 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_ecs.py

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/models/replayer_ecs.py#L27

Added line #L27 was not covered by tests

def scale(self, units: int, *args, **kwargs) -> CommandResult:
logger.info(f"Scaling ECS replayer by setting desired count to {units} instances")
return self.ecs_client.set_desired_count(units)
Loading
Loading