Lower notebook batch size, send metrics for resource imports
michael-richey committed Sep 17, 2024
1 parent 5cb933a commit 32d7f82
Showing 8 changed files with 55 additions and 27 deletions.
datadog_sync/commands/_import.py (3 changes: 2 additions & 1 deletion)
@@ -6,12 +6,13 @@
from click import command

from datadog_sync.constants import Command
- from datadog_sync.commands.shared.options import common_options, source_auth_options
+ from datadog_sync.commands.shared.options import common_options, destination_auth_options, source_auth_options
from datadog_sync.commands.shared.utils import run_cmd


@command(Command.IMPORT.value, short_help="Import Datadog resources.")
@source_auth_options
+ @destination_auth_options
@common_options
def _import(**kwargs):
"""Import Datadog resources."""
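Import previously required only source credentials. Because importing now also emits telemetry to the destination org (see base_resource.py below), the command accepts the destination auth options as well. A minimal sketch of how the stacked click decorators compose, assuming hypothetical flag names rather than the CLI's real ones:

    from click import command, option

    def source_auth_options(func):
        # Hypothetical flags; the real option groups live in shared/options.py.
        func = option("--source-api-key", required=False)(func)
        func = option("--source-app-key", required=False)(func)
        return func

    def destination_auth_options(func):
        func = option("--destination-api-key", required=False)(func)
        func = option("--destination-app-key", required=False)(func)
        return func

    @command("import", short_help="Import Datadog resources.")
    @source_auth_options
    @destination_auth_options
    def _import(**kwargs):
        print(sorted(kwargs))

Each decorator just applies a list of option() wrappers in order, so adding a whole option group to the command is a one-line change.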
datadog_sync/commands/shared/options.py (8 changes: 8 additions & 0 deletions)
@@ -164,6 +164,14 @@ def click_config_file_provider(ctx: Context, opts: CustomOptionClass, value: Non
"only source api key is validated. On sync/diffs, only destination api key is validated.",
cls=CustomOptionClass,
),
+ option(
+ "--send-metrics",
+ type=bool,
+ required=False,
+ default=True,
+ help="Enables sync-cli metrics being sent to both source and destination",
+ cls=CustomOptionClass,
+ ),
]


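The new flag defaults to True and is a value-taking boolean rather than an on/off switch (type=bool, not is_flag), so disabling it requires an explicit value on the command line. A standalone sketch of the parsing behavior, not the CLI itself:

    from click import command, option
    from click.testing import CliRunner

    @command()
    @option("--send-metrics", type=bool, required=False, default=True)
    def cli(send_metrics):
        print(send_metrics)

    runner = CliRunner()
    print(runner.invoke(cli, []).output)                           # True
    print(runner.invoke(cli, ["--send-metrics", "false"]).output)  # False

click's BOOL type accepts the usual spellings (true/false, yes/no, 1/0).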
datadog_sync/constants.py (7 changes: 3 additions & 4 deletions)
@@ -3,7 +3,7 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

- from enum import Enum, StrEnum
+ from enum import Enum

# Environment variables
DD_SOURCE_API_URL = "DD_SOURCE_API_URL"
@@ -47,14 +47,13 @@ class Origin(Enum):
DESTINATION = "destination"


- # Metrics
- class Metrics(StrEnum):
+ class Metrics(Enum):
PREFIX = "datadog.ddr.sync_cli"
ACTION = "action"


# Status
- class Status(StrEnum):
+ class Status(Enum):
SUCCESS = "success"
SKIPPED = "skipped"
FAILURE = "failure"
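StrEnum only exists on Python 3.11+, so replacing it with Enum restores compatibility with older interpreters. The cost, visible throughout the rest of this commit, is that members are no longer str subclasses and every use site must read .value explicitly:

    from enum import Enum

    class Status(Enum):
        SUCCESS = "success"

    # A plain Enum member interpolates as its qualified name, not its value:
    print(f"status:{Status.SUCCESS}")        # status:Status.SUCCESS
    print(f"status:{Status.SUCCESS.value}")  # status:success

Under StrEnum the first form would have printed status:success, which is why the .value suffixes below were previously unnecessary.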
datadog_sync/model/notebooks.py (2 changes: 1 addition & 1 deletion)
@@ -27,7 +27,7 @@ class Notebooks(BaseResource):
)
# Additional Notebooks specific attributes
pagination_config = PaginationConfig(
- page_size=500,
+ page_size=100,
page_size_param="count",
page_number_param="start",
remaining_func=lambda idx, resp, page_size, page_number: (resp["meta"]["page"]["total_count"])
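Notebook payloads can be large, so cutting the page size from 500 to 100 trades more requests for much smaller responses. As a rough illustration under assumed numbers, an org with 1,250 notebooks is now fetched in 13 requests instead of 3:

    # Simplified sketch of the pagination loop; the real remaining_func
    # reads the total from resp["meta"]["page"]["total_count"].
    page_size, total_count = 100, 1250
    requests, remaining = 0, total_count
    while remaining > 0:
        requests += 1
        remaining -= min(page_size, remaining)
    print(requests)  # 13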
datadog_sync/utils/base_resource.py (12 changes: 7 additions & 5 deletions)
@@ -17,7 +17,7 @@
find_attr,
ResourceConnectionError,
)
- from datadog_sync.constants import Command, Metrics, Status
+ from datadog_sync.constants import Metrics

if TYPE_CHECKING:
from datadog_sync.utils.configuration import Configuration
@@ -229,11 +229,13 @@ async def _send_action_metrics(self, action: str, _id: str, status: str, tags: O
tags.append(f"status:{status}")
tags.append(f"resource_type:{self.resource_type}")
try:
- await self.config.destination_client.send_metric(Metrics.ACTION, tags)
+ await self.config.destination_client.send_metric(Metrics.ACTION.value, tags + ["client_type:destination"])
+ self.config.logger.debug(f"Sent metrics to destination for {self.resource_type}")
except Exception as e:
self.config.logger.warning(f"Failed to send metrics to destination for {self.resource_type}: {str(e)}")
self.config.logger.debug(f"Failed to send metrics to destination for {self.resource_type}: {str(e)}")

try:
- await self.config.source_client.send_metric(Metrics.ACTION, tags)
+ await self.config.source_client.send_metric(Metrics.ACTION.value, tags + ["client_type:source"])
+ self.config.logger.debug(f"Sent metrics to source for {self.resource_type}")
except Exception as e:
self.config.logger.warning(f"Failed to send metrics to source for {self.resource_type}: {str(e)}")
self.config.logger.debug(f"Failed to send metrics to source for {self.resource_type}: {str(e)}")
datadog_sync/utils/configuration.py (7 changes: 5 additions & 2 deletions)
@@ -43,6 +43,7 @@ class Configuration(object):
cleanup: int
create_global_downtime: bool
validate: bool
+ send_metrics: bool
state: State
resources: Dict[str, BaseResource] = field(default_factory=dict)
resources_arg: List[str] = field(default_factory=list)
@@ -87,20 +88,21 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
# Initialize the datadog API Clients based on cmd
retry_timeout = kwargs.get("http_client_retry_timeout")
timeout = kwargs.get("http_client_timeout")
+ send_metrics = kwargs.get("send_metrics")

source_auth = {}
if k := kwargs.get("source_api_key"):
source_auth["apiKeyAuth"] = k
if k := kwargs.get("source_app_key"):
source_auth["appKeyAuth"] = k
- source_client = CustomClient(source_api_url, source_auth, retry_timeout, timeout)
+ source_client = CustomClient(source_api_url, source_auth, retry_timeout, timeout, send_metrics)

destination_auth = {}
if k := kwargs.get("destination_api_key"):
destination_auth["apiKeyAuth"] = k
if k := kwargs.get("destination_app_key"):
destination_auth["appKeyAuth"] = k
- destination_client = CustomClient(destination_api_url, destination_auth, retry_timeout, timeout)
+ destination_client = CustomClient(destination_api_url, destination_auth, retry_timeout, timeout, send_metrics)

# Additional settings
force_missing_dependencies = kwargs.get("force_missing_dependencies")
@@ -133,6 +135,7 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
cleanup=cleanup,
create_global_downtime=create_global_downtime,
validate=validate,
+ send_metrics=send_metrics,
state=state,
)

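The flag is read once and threaded into both clients as well as the Configuration dataclass. One subtlety worth noting: kwargs.get("send_metrics") returns None when the key is absent, and None is falsy, so a caller that builds a config without going through the click layer (which always supplies the default) gets metrics silently disabled:

    kwargs = {}  # e.g. build_config invoked programmatically
    send_metrics = kwargs.get("send_metrics")
    print(send_metrics, bool(send_metrics))  # None False, so send_metric() no-ops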
datadog_sync/utils/custom_client.py (9 changes: 7 additions & 2 deletions)
@@ -73,13 +73,15 @@ def __init__(
auth: Dict[str, str],
retry_timeout: int,
timeout: int,
+ send_metrics: bool,
) -> None:
self.url_object = UrlObject.from_str(host)
self.timeout = timeout
self.session = None
self.retry_timeout = retry_timeout
self.default_pagination = PaginationConfig()
self.auth = auth
+ self.send_metrics = send_metrics

async def _init_session(self):
ssl_context = ssl.create_default_context(cafile=certifi.where())
@@ -131,7 +133,8 @@ async def wrapper(*args, **kwargs):
log.debug(
f"fetching {args[0]} "
f"{pagination_config.page_number_param}: {page_number} "
f"{pagination_config.page_size_param}: {page_size}"
f"{pagination_config.page_size_param}: {page_size} "
f"remaining: {remaining}"
)
params = {
pagination_config.page_size_param: page_size,
@@ -160,9 +163,11 @@ async def wrapper(*args, **kwargs):
return wrapper

async def send_metric(self, metric: str, tags: List[str] = None) -> None:
+ if not self.send_metrics:
+ return None
path = "/api/v2/series"
timestamp = int(datetime.now().timestamp())
full_metric = f"{Metrics.PREFIX}.{metric}"
full_metric = f"{Metrics.PREFIX.value}.{metric}"
body = {
"series": [
{
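With the new guard, a disabled client returns before any HTTP work, and the metric name is now assembled from the Enum value. A sketch of the resulting name and a plausible v2 series payload; the body shown is an assumption for illustration, since the diff truncates the real one:

    from datetime import datetime

    prefix = "datadog.ddr.sync_cli"     # Metrics.PREFIX.value
    full_metric = f"{prefix}.action"    # Metrics.ACTION.value
    timestamp = int(datetime.now().timestamp())
    body = {
        "series": [
            {
                "metric": full_metric,  # datadog.ddr.sync_cli.action
                "type": 1,              # count (assumed)
                "points": [{"timestamp": timestamp, "value": 1}],
                "tags": ["status:success", "client_type:source"],
            }
        ]
    }

Had StrEnum survived, f"{Metrics.PREFIX}.{metric}" would have produced datadog.ddr.sync_cli.action directly; with plain Enum it yields Metrics.PREFIX.action, hence the .value fix.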
datadog_sync/utils/resources_handler.py (34 changes: 22 additions & 12 deletions)
@@ -116,7 +116,7 @@ async def _apply_resource_cb(self, q_item: List) -> None:
prep_resource(r_class.resource_config, resource)
await r_class._update_resource(_id, resource)
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SUCCESS, tags=["action_sub_type:update"]
Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:update"]
)
self.config.logger.debug(f"Finished update for {resource_type} with {_id}")

@@ -125,23 +125,27 @@ async def _apply_resource_cb(self, q_item: List) -> None:
prep_resource(r_class.resource_config, resource)
await r_class._create_resource(_id, resource)
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SUCCESS, tags=["action_sub_type:create"]
Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:create"]
)
self.config.logger.debug(f"finished create for {resource_type} with id: {_id}")

self.worker.counter.increment_success()

except SkipResource as e:
- self.config.logger.info(str(e))
+ self.config.logger.debug(str(e))
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(Command.SYNC.value, _id, Status.SKIPPED, tags=["reason:up_to_date"])
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:up_to_date"]
)
except ResourceConnectionError:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(Command.SYNC.value, _id, Status.SKIPPED, tags=["reason:connection_error"])
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:connection_error"]
)
except Exception as e:
self.worker.counter.increment_failure()
self.config.logger.error(str(e))
- await r_class._send_action_metrics(Command.SYNC.value, _id, Status.FAILURE)
+ await r_class._send_action_metrics(Command.SYNC.value, _id, Status.FAILURE.value)
finally:
# always place in done queue regardless of exception thrown
self.sorter.done(q_item)
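Taken together, the sync callback now maps every outcome to a metric status and a reason tag, and the skip log drops from info to debug to match the quieter logging elsewhere in this commit. A compact summary of the mapping, mirroring the except blocks above:

    # Outcome -> (status, extra tag), as wired in the handlers above.
    OUTCOME_METRICS = {
        "SkipResource": ("skipped", "reason:up_to_date"),
        "ResourceConnectionError": ("skipped", "reason:connection_error"),
        "Exception": ("failure", None),
    }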
@@ -235,13 +239,19 @@ async def _import_get_resources_cb(self, resource_type: str, tmp_storage) -> Non
get_resp = await r_class._get_resources(self.config.source_client)
self.worker.counter.increment_success()
tmp_storage[resource_type] = get_resp
await r_class._send_action_metrics(Command.IMPORT.value + "_resources", resource_type, Status.SUCCESS.value)
except TimeoutError:
self.worker.counter.increment_failure()
self.config.logger.error(f"TimeoutError while getting resources {resource_type}")
await r_class._send_action_metrics(Command.IMPORT.value + "_resources", resource_type, Status.FAILURE.value, tags=["reason:timeout"])
except Exception as e:
self.worker.counter.increment_failure()
self.config.logger.error(f"Error while getting resources {resource_type}: {str(e)}")
await r_class._send_action_metrics(Command.IMPORT.value + "_resources", resource_type, Status.FAILURE.value, tags=["reason:unknown"])

async def _import_resource(self, q_item: List) -> None:
resource_type, resource = q_item
_id = resource["id"]
_id = resource.get("id")
r_class = self.config.resources[resource_type]

if not r_class.filter(resource):
@@ -251,14 +261,14 @@ async def _import_resource(self, q_item: List) -> None:
try:
await r_class._import_resource(resource=resource)
self.worker.counter.increment_success()
- await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SUCCESS)
+ await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SUCCESS.value)
except SkipResource as e:
self.worker.counter.increment_skipped()
- await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SKIPPED)
+ await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SKIPPED.value)
self.config.logger.debug(str(e))
except Exception as e:
self.worker.counter.increment_failure()
- await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.FAILURE)
+ await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.FAILURE.value)
self.config.logger.error(f"Error while importing resource {resource_type}: {str(e)}")

async def _force_missing_dep_import_cb(self, q_item: List):
@@ -285,10 +295,10 @@ async def _cleanup_worker(self, q_item: List) -> None:

await r_class._delete_resource(_id)
self.worker.counter.increment_success()
await r_class._send_action_metrics("delete", _id, Status.SUCCESS)
await r_class._send_action_metrics("delete", _id, Status.SUCCESS.value)
except Exception as e:
self.worker.counter.increment_failure()
await r_class._send_action_metrics("delete", _id, Status.FAILURE)
await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}")
finally:
if not r_class.resource_config.concurrent:
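One small but meaningful change on the import path: resource.get("id") replaces resource["id"], so a payload without an id field yields None instead of raising KeyError before the metric can be emitted:

    resource = {"attributes": {"name": "example"}}  # no "id" key yet
    print(resource.get("id"))  # None, no KeyError, the import still runs

Together with the new TimeoutError branch in _import_get_resources_cb, every import outcome (success, timeout, unknown failure) now produces an action metric with a reason tag.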
