Adding observability metrics to sync-cli (#283)
* Add metrics to sync-cli
* Add files to .gitignore
* Fixed notebooks timeout by changing pagesize
michael-richey committed Sep 27, 2024
1 parent 58f0b78 commit dfdb3de
Showing 13 changed files with 188 additions and 34 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,6 +1,7 @@
datadog_sync/version.py
**/*.pyc
**/*.pyo
**/*.swp
.vscode/
.idea/
.coverage
@@ -145,3 +146,6 @@ dmypy.json

# Cython debug symbols
cython_debug/

# sync-cli resources
resources/
7 changes: 6 additions & 1 deletion datadog_sync/commands/_import.py
@@ -6,12 +6,17 @@
from click import command

from datadog_sync.constants import Command
from datadog_sync.commands.shared.options import common_options, source_auth_options
from datadog_sync.commands.shared.options import (
common_options,
destination_auth_options,
source_auth_options,
)
from datadog_sync.commands.shared.utils import run_cmd


@command(Command.IMPORT.value, short_help="Import Datadog resources.")
@source_auth_options
@destination_auth_options
@common_options
def _import(**kwargs):
"""Import Datadog resources."""
8 changes: 8 additions & 0 deletions datadog_sync/commands/shared/options.py
@@ -164,6 +164,14 @@ def click_config_file_provider(ctx: Context, opts: CustomOptionClass, value: Non
"only source api key is validated. On sync/diffs, only destination api key is validated.",
cls=CustomOptionClass,
),
option(
"--send-metrics",
type=bool,
required=False,
default=True,
help="Enables sync-cli metrics being sent to both source and destination",
cls=CustomOptionClass,
),
]


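The new `--send-metrics` option defaults to `True`, so telemetry is opt-out. A minimal, self-contained sketch of how a Click option declared this way behaves (the `demo` command here is hypothetical, not part of sync-cli):

```python
# Hypothetical sketch of the --send-metrics option pattern; `demo` is not sync-cli code.
from click import command, option


@command()
@option(
    "--send-metrics",
    type=bool,
    required=False,
    default=True,
    help="Enables sync-cli metrics being sent to both source and destination",
)
def demo(**kwargs):
    # Click's BOOL type converts values such as "true"/"false" and "1"/"0".
    print(kwargs["send_metrics"])


if __name__ == "__main__":
    demo()
```

Because the option uses `type=bool` rather than `is_flag=True`, disabling it takes an explicit value, e.g. `--send-metrics false`.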
13 changes: 13 additions & 0 deletions datadog_sync/constants.py
@@ -45,3 +45,16 @@ class Origin(Enum):
ALL = "all"
SOURCE = "source"
DESTINATION = "destination"


class Metrics(Enum):
PREFIX = "datadog.org-sync"
ACTION = "action"
ORIGIN_PRODUCT = 24


# Status
class Status(Enum):
SUCCESS = "success"
SKIPPED = "skipped"
FAILURE = "failure"
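The enum centralizes the metric namespace: `send_metric` (added below in `custom_client.py`) prefixes every metric name with `PREFIX`, and `ORIGIN_PRODUCT` identifies the submitting product in the series metadata. A quick sketch of the name construction:

```python
# Sketch: the full metric name send_metric derives from the Metrics enum.
full_metric = f"{Metrics.PREFIX.value}.{Metrics.ACTION.value}"
assert full_metric == "datadog.org-sync.action"
```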
2 changes: 1 addition & 1 deletion datadog_sync/model/notebooks.py
@@ -27,7 +27,7 @@ class Notebooks(BaseResource):
)
# Additional Notebooks specific attributes
pagination_config = PaginationConfig(
page_size=500,
page_size=100,
page_size_param="count",
page_number_param="start",
remaining_func=lambda idx, resp, page_size, page_number: (resp["meta"]["page"]["total_count"])
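Dropping `page_size` from 500 to 100 makes each notebooks request return less data, so it completes inside the HTTP client timeout; the pagination loop simply issues more, smaller requests. A rough sketch of count/start pagination under these parameters, assuming `start` is a record offset as in the public notebooks API (`fetch` is a hypothetical stand-in, not sync-cli code):

```python
# Rough sketch of count/start pagination; fetch() is a hypothetical stand-in
# for a single GET against the notebooks endpoint.
async def fetch_all_notebooks(fetch, page_size=100):
    start, resources = 0, []
    while True:
        resp = await fetch(count=page_size, start=start)  # one small, fast request
        resources.extend(resp["data"])
        if start + page_size >= resp["meta"]["page"]["total_count"]:
            return resources
        start += page_size
```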
21 changes: 21 additions & 0 deletions datadog_sync/utils/base_resource.py
@@ -17,6 +17,7 @@
find_attr,
ResourceConnectionError,
)
from datadog_sync.constants import Metrics

if TYPE_CHECKING:
from datadog_sync.utils.configuration import Configuration
@@ -218,3 +219,23 @@ def filter(self, resource: Dict) -> bool:
return True
# Filter was specified for resource type but resource did not match any
return False

async def _send_action_metrics(self, action: str, _id: str, status: str, tags: Optional[List[str]] = None) -> None:
if not tags:
tags = []
if _id:
tags.append(f"id:{_id}")
tags.append(f"action_type:{action}")
tags.append(f"status:{status}")
tags.append(f"resource_type:{self.resource_type}")
try:
await self.config.destination_client.send_metric(Metrics.ACTION.value, tags + ["client_type:destination"])
self.config.logger.debug(f"Sent metrics to destination for {self.resource_type}")
except Exception as e:
self.config.logger.debug(f"Failed to send metrics to destination for {self.resource_type}: {str(e)}")

try:
await self.config.source_client.send_metric(Metrics.ACTION.value, tags + ["client_type:source"])
self.config.logger.debug(f"Sent metrics to source for {self.resource_type}")
except Exception as e:
self.config.logger.debug(f"Failed to send metrics to source for {self.resource_type}: {str(e)}")
7 changes: 5 additions & 2 deletions datadog_sync/utils/configuration.py
@@ -43,6 +43,7 @@ class Configuration(object):
cleanup: int
create_global_downtime: bool
validate: bool
send_metrics: bool
state: State
resources: Dict[str, BaseResource] = field(default_factory=dict)
resources_arg: List[str] = field(default_factory=list)
@@ -87,20 +88,21 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
# Initialize the datadog API Clients based on cmd
retry_timeout = kwargs.get("http_client_retry_timeout")
timeout = kwargs.get("http_client_timeout")
send_metrics = kwargs.get("send_metrics")

source_auth = {}
if k := kwargs.get("source_api_key"):
source_auth["apiKeyAuth"] = k
if k := kwargs.get("source_app_key"):
source_auth["appKeyAuth"] = k
source_client = CustomClient(source_api_url, source_auth, retry_timeout, timeout)
source_client = CustomClient(source_api_url, source_auth, retry_timeout, timeout, send_metrics)

destination_auth = {}
if k := kwargs.get("destination_api_key"):
destination_auth["apiKeyAuth"] = k
if k := kwargs.get("destination_app_key"):
destination_auth["appKeyAuth"] = k
destination_client = CustomClient(destination_api_url, destination_auth, retry_timeout, timeout)
destination_client = CustomClient(destination_api_url, destination_auth, retry_timeout, timeout, send_metrics)

# Additional settings
force_missing_dependencies = kwargs.get("force_missing_dependencies")
@@ -133,6 +135,7 @@ def build_config(cmd: Command, **kwargs: Optional[Any]) -> Configuration:
cleanup=cleanup,
create_global_downtime=create_global_downtime,
validate=validate,
send_metrics=send_metrics,
state=state,
)

54 changes: 48 additions & 6 deletions datadog_sync/utils/custom_client.py
@@ -3,18 +3,19 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import asyncio
from datetime import datetime
import ssl
import time
import logging
import platform
from dataclasses import dataclass
from typing import Awaitable, Dict, Optional, Callable
from typing import Awaitable, Dict, List, Optional, Callable
from urllib.parse import urlparse

import aiohttp
import certifi

from datadog_sync.constants import LOGGER_NAME
from datadog_sync.constants import LOGGER_NAME, Metrics
from datadog_sync.utils.resource_utils import CustomClientHTTPError

log = logging.getLogger(LOGGER_NAME)
@@ -66,13 +67,21 @@ async def wrapper(*args, **kwargs):


class CustomClient:
def __init__(self, host: Optional[str], auth: Dict[str, str], retry_timeout: int, timeout: int) -> None:
def __init__(
self,
host: Optional[str],
auth: Dict[str, str],
retry_timeout: int,
timeout: int,
send_metrics: bool,
) -> None:
self.url_object = UrlObject.from_str(host)
self.timeout = timeout
self.session = None
self.retry_timeout = retry_timeout
self.default_pagination = PaginationConfig()
self.auth = auth
self.send_metrics = send_metrics

async def _init_session(self):
ssl_context = ssl.create_default_context(cafile=certifi.where())
@@ -124,7 +133,8 @@ async def wrapper(*args, **kwargs):
log.debug(
f"fetching {args[0]} "
f"{pagination_config.page_number_param}: {page_number} "
f"{pagination_config.page_size_param}: {page_size}"
f"{pagination_config.page_size_param}: {page_size} "
f"remaining: {remaining}"
)
params = {
pagination_config.page_size_param: page_size,
@@ -152,6 +162,29 @@ async def wrapper(*args, **kwargs):

return wrapper

async def send_metric(self, metric: str, tags: List[str] = None) -> None:
if not self.send_metrics:
return None
path = "/api/v2/series"
timestamp = int(datetime.now().timestamp())
full_metric = f"{Metrics.PREFIX.value}.{metric}"
body = {
"series": [
{
"metadata": {
"origin": {
"origin_product": Metrics.ORIGIN_PRODUCT.value,
},
},
"metric": full_metric,
"type": 0,
"points": [{"timestamp": timestamp, "value": 1}],
"tags": tags,
}
]
}
await self.post(path, body)


def build_default_headers(auth_obj: Dict[str, str]) -> Dict[str, str]:
headers = {
@@ -215,11 +248,20 @@ def from_str(cls, url: str):
domain = ".".join(res[-2:])
subdomain = ".".join(res[:-2])

return cls(protocol=parsed_url.scheme, domain=domain, subdomain=subdomain, _default=url)
return cls(
protocol=parsed_url.scheme,
domain=domain,
subdomain=subdomain,
_default=url,
)
return cls()

def build_url(
self, path, protocol: Optional[str] = None, domain: Optional[str] = None, subdomain: Optional[str] = None
self,
path,
protocol: Optional[str] = None,
domain: Optional[str] = None,
subdomain: Optional[str] = None,
) -> str:
if all(arg is None for arg in (protocol, domain, subdomain)):
return self._default + path
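Stepping back to the new `send_metric` method: every call submits a single point of value 1 stamped with the current time, with `origin_product` set in the series metadata. Roughly, the body it posts to `/api/v2/series` looks like this (timestamp and tags are illustrative):

```python
# Sketch of the body send_metric("action", tags) posts to /api/v2/series.
body = {
    "series": [
        {
            "metadata": {"origin": {"origin_product": 24}},  # Metrics.ORIGIN_PRODUCT
            "metric": "datadog.org-sync.action",             # Metrics.PREFIX + metric
            "type": 0,  # 0 = "unspecified" in the v2 series API
            "points": [{"timestamp": 1727449200, "value": 1}],
            "tags": ["action_type:sync", "status:success"],
        }
    ]
}
```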
50 changes: 36 additions & 14 deletions datadog_sync/utils/resources_handler.py
@@ -12,7 +12,7 @@
from click import confirm
from pprint import pformat

from datadog_sync.constants import TRUE, FALSE, FORCE, Origin
from datadog_sync.constants import TRUE, FALSE, FORCE, Command, Origin, Status
from datadog_sync.utils.resource_utils import (
CustomClientHTTPError,
ResourceConnectionError,
@@ -108,29 +108,42 @@ async def _apply_resource_cb(self, q_item: List) -> None:

if _id in self.config.state.destination[resource_type]:
diff = check_diff(r_class.resource_config, resource, self.config.state.destination[resource_type][_id])
if diff:
self.config.logger.debug("running update", resource_type=resource_type, _id=_id)

prep_resource(r_class.resource_config, resource)
await r_class._update_resource(_id, resource)
if not diff:
raise SkipResource(_id, resource_type, "No differences detected.")

self.config.logger.debug("finished update", resource_type=resource_type, _id=_id)
self.config.logger.debug(f"Running update for {resource_type} with {_id}")
prep_resource(r_class.resource_config, resource)
await r_class._update_resource(_id, resource)
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:update"]
)
self.config.logger.debug(f"Finished update for {resource_type} with {_id}")
else:
self.config.logger.debug("running create", resource_type=resource_type, _id=_id)

self.config.logger.debug(f"Running create for {resource_type} with id: {_id}")
prep_resource(r_class.resource_config, resource)
await r_class._create_resource(_id, resource)
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SUCCESS.value, tags=["action_sub_type:create"]
)
self.config.logger.debug(f"finished create for {resource_type} with id: {_id}")

self.config.logger.debug("finished create", resource_type=resource_type, _id=_id)
self.worker.counter.increment_success()

except SkipResource as e:
self.config.logger.info(str(e), resource_type=resource_type, _id=_id)
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:up_to_date"]
)
except ResourceConnectionError:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(
Command.SYNC.value, _id, Status.SKIPPED.value, tags=["reason:connection_error"]
)
except Exception as e:
self.worker.counter.increment_failure()
self.config.logger.error(str(e), resource_type=resource_type, _id=_id)
self.config.logger.error(str(e))
await r_class._send_action_metrics(Command.SYNC.value, _id, Status.FAILURE.value)
finally:
# always place in done queue regardless of exception thrown
self.sorter.done(q_item)
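Taken together, the rewritten sync callback emits exactly one action metric per resource, with status and reason tags distinguishing the outcome. A summary sketch of the mapping implemented above:

```python
# Sketch: outcome -> (status tag, extra tags) emitted by _apply_resource_cb.
sync_outcomes = {
    "updated":          ("success", ["action_sub_type:update"]),
    "created":          ("success", ["action_sub_type:create"]),
    "no diff":          ("skipped", ["reason:up_to_date"]),
    "connection error": ("skipped", ["reason:connection_error"]),
    "other exception":  ("failure", []),
}
```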
@@ -207,7 +220,7 @@ async def import_resources(self) -> None:
for resource in v:
self.worker.work_queue.put_nowait((k, resource))
await self.worker.schedule_workers_with_pbar(total=total)
self.config.logger.info(f"finished importng individual resource items: {self.worker.counter}.")
self.config.logger.info(f"finished importing individual resource items: {self.worker.counter}.")

# Dump resources
self.config.state.dump_state(Origin.SOURCE)
@@ -222,12 +235,16 @@ async def _import_get_resources_cb(self, resource_type: str, tmp_storage) -> Non
get_resp = await r_class._get_resources(self.config.source_client)
self.worker.counter.increment_success()
tmp_storage[resource_type] = get_resp
except TimeoutError:
self.worker.counter.increment_failure()
self.config.logger.error(f"TimeoutError while getting resources {resource_type}")
except Exception as e:
self.config.logger.error(f"error while getting resources: {str(e)}", resource_type=resource_type)
self.worker.counter.increment_failure()
self.config.logger.error(f"Error while getting resources {resource_type}: {str(e)}")

async def _import_resource(self, q_item: List) -> None:
resource_type, resource = q_item
_id = resource.get("id")
r_class = self.config.resources[resource_type]

if not r_class.filter(resource):
@@ -237,11 +254,14 @@ async def _import_resource(self, q_item: List) -> None:
try:
await r_class._import_resource(resource=resource)
self.worker.counter.increment_success()
await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SUCCESS.value)
except SkipResource as e:
self.worker.counter.increment_skipped()
await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.SKIPPED.value)
self.config.logger.debug(str(e))
except Exception as e:
self.worker.counter.increment_failure()
await r_class._send_action_metrics(Command.IMPORT.value, _id, Status.FAILURE.value)
self.config.logger.error(f"error while importing resource: {str(e)}", resource_type=resource_type)

async def _force_missing_dep_import_cb(self, q_item: List):
@@ -268,9 +288,11 @@ async def _cleanup_worker(self, q_item: List) -> None:

await r_class._delete_resource(_id)
self.worker.counter.increment_success()
await r_class._send_action_metrics("delete", _id, Status.SUCCESS.value)
except Exception as e:
self.config.logger.error(f"error deleting resource: {str(e)}", resource_type=resource_type, _id=_id)
self.worker.counter.increment_failure()
await r_class._send_action_metrics("delete", _id, Status.FAILURE.value)
self.config.logger.error(f"error deleting resource {resource_type} with id {_id}: {str(e)}")
finally:
if not r_class.resource_config.concurrent:
r_class.resource_config.async_lock.release()
3 changes: 2 additions & 1 deletion tests/conftest.py
@@ -123,7 +123,7 @@ def vcr_config():

@pytest.fixture(scope="module")
def config():
custom_client = CustomClient(None, {"apiKeyAuth": "123", "appKeyAuth": "123"}, None, None)
custom_client = CustomClient(None, {"apiKeyAuth": "123", "appKeyAuth": "123"}, None, None, True)

cfg = Configuration(
logger=logging.getLogger(__name__),
@@ -138,6 +138,7 @@ def config():
create_global_downtime=False,
validate=False,
state=State(),
send_metrics=True,
)

resources = init_resources(cfg)