Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support passing Unix timestamps to dogstatsd #831

Merged
merged 4 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 72 additions & 7 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,13 +774,67 @@ def gauge(
"""
return self._report(metric, "g", value, tags, sample_rate)

# Minimum Datadog Agent version: 7.40.0
def gauge_with_timestamp(
    self,
    metric,  # type: Text
    value,  # type: float
    timestamp,  # type: int
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Record the value of a gauge with a Unix timestamp (in seconds),
    optionally setting a list of tags and a sample rate.

    The arguments are forwarded to ``self._report`` with metric type ``"g"``
    and the timestamp as the last positional argument.

    Minimum Datadog Agent version: 7.40.0

    :param metric: name of the gauge
    :param value: value to record
    :param timestamp: Unix timestamp in seconds to attach to the sample
    :param tags: optional list of tags
    :param sample_rate: optional sample rate

    >>> statsd.gauge_with_timestamp("users.online", 123, 1713804588)
    >>> statsd.gauge_with_timestamp("active.connections", 1001, 1713804588, tags=["protocol:http"])
    """
    return self._report(metric, "g", value, tags, sample_rate, timestamp)

def count(
    self,
    metric,  # type: Text
    value,  # type: float
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Count how many times something happened, optionally setting tags and a
    sample rate.

    The arguments are forwarded to ``self._report`` with metric type ``"c"``.

    :param metric: name of the counter
    :param value: amount to add to the counter
    :param tags: optional list of tags
    :param sample_rate: optional sample rate

    >>> statsd.count("page.views", 123)
    """
    self._report(metric, "c", value, tags, sample_rate)

# Minimum Datadog Agent version: 7.40.0
def count_with_timestamp(
    self,
    metric,  # type: Text
    value,  # type: float
    timestamp=0,  # type: int
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Count how many times something happened at a given Unix timestamp in seconds,
    optionally setting tags and a sample rate.

    The arguments are forwarded to ``self._report`` with metric type ``"c"``
    and the timestamp as the last positional argument. The default
    timestamp of ``0`` is passed through unchanged.

    Minimum Datadog Agent version: 7.40.0

    :param metric: name of the counter
    :param value: amount to add to the counter
    :param timestamp: Unix timestamp in seconds to attach to the sample
    :param tags: optional list of tags
    :param sample_rate: optional sample rate

    >>> statsd.count_with_timestamp("files.transferred", 124, timestamp=1713804588)
    """
    self._report(metric, "c", value, tags, sample_rate, timestamp)

def increment(
self,
metric, # type: Text
value=1, # type: float
tags=None, # type: Optional[List[str]]
sample_rate=None, # type: Optional[float]
): # type: (...) -> None
): # type(...) -> None
"""
Increment a counter, optionally setting a value, tags and a sample
rate.
Expand Down Expand Up @@ -934,23 +988,27 @@ def close_socket(self):
log.error("Unexpected error: %s", str(e))
self.telemetry_socket = None

def _serialize_metric(self, metric, metric_type, value, tags, sample_rate=1):
def _serialize_metric(
self, metric, metric_type, value, tags, sample_rate=1, timestamp=0
):
# Create/format the metric packet
return "%s%s:%s|%s%s%s%s" % (
return "%s%s:%s|%s%s%s%s%s" % (
(self.namespace + ".") if self.namespace else "",
metric,
value,
metric_type,
("|@" + text(sample_rate)) if sample_rate != 1 else "",
("|#" + ",".join(normalize_tags(tags))) if tags else "",
("|c:" + self._container_id if self._container_id else "")
("|c:" + self._container_id if self._container_id else ""),
("|T" + text(timestamp)) if timestamp > 0 else "",
)

def _report(self, metric, metric_type, value, tags, sample_rate):
def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
"""
Create a metric packet and send it.

More information about the packets' format: http://docs.datadoghq.com/guides/dogstatsd/
More information about the packets' format:
https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol
"""
if value is None:
return
Expand All @@ -967,9 +1025,16 @@ def _report(self, metric, metric_type, value, tags, sample_rate):
if sample_rate != 1 and random() > sample_rate:
return

# timestamps (protocol v1.3) only allowed on gauges and counts
allows_timestamp = metric_type == "g" or metric_type == "c"
if not allows_timestamp or timestamp < 0:
timestamp = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that would be worth a warning? It's a developer error at this point (somebody is calling _report with a wrong metric_type)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes worth a test (as said in the other comment) but maybe we don't want to flood stdout with logs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test, but I'm not sure about adding a warning. In other SDKs we don't log a warning if an invalid timestamp is used. Here, if they are trying to send a metric that doesn't support timestamps, then they're accessing methods they shouldn't be in the first place.


# Resolve the full tag list
tags = self._add_constant_tags(tags)
payload = self._serialize_metric(metric, metric_type, value, tags, sample_rate)
payload = self._serialize_metric(
metric, metric_type, value, tags, sample_rate, timestamp
)

# Send it
self._send(payload)
Expand Down
45 changes: 41 additions & 4 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,31 @@ def test_set(self):
self.statsd.set('set', 123)
self.assert_equal_telemetry('set:123|s\n', self.recv(2))

def test_report(self):
    # Calling _report directly with no tags and no sample rate should emit
    # a plain gauge packet.
    expected_packet = 'report:123.4|g\n'
    self.statsd._report('report', 'g', 123.4, tags=None, sample_rate=None)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_report_metric_with_unsupported_ts(self):
    # A timestamp handed to _report for a metric type other than gauge or
    # count ("h" and "s" here) must not show up in the emitted packet.
    cases = (
        ('report', 'h', 123.5, 'report:123.5|h\n'),
        ('set', 's', 123, 'set:123|s\n'),
    )
    for name, metric_type, value, expected_packet in cases:
        self.statsd._reset_telemetry()
        self.statsd._report(name, metric_type, value, tags=None, sample_rate=None, timestamp=100)
        self.assert_equal_telemetry(expected_packet, self.recv(2))

def test_gauge(self):
    # A plain gauge is serialized as "<name>:<value>|g".
    expected_packet = 'gauge:123.4|g\n'
    self.statsd.gauge('gauge', 123.4)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_gauge_with_ts(self):
    # A positive timestamp is appended to the gauge packet as "|T<timestamp>".
    expected_packet = "gauge:123.4|g|T1066\n"
    self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=1066)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_gauge_with_invalid_ts_should_be_ignored(self):
    # A negative timestamp is dropped: the packet carries no "|T" field.
    expected_packet = "gauge:123.4|g\n"
    self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=-500)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_counter(self):
self.statsd.increment('page.views')
self.statsd.flush()
Expand All @@ -328,6 +349,26 @@ def test_counter(self):
self.statsd.flush()
self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2))

def test_count(self):
    # count() emits a counter packet carrying the given value.
    expected_packet = 'page.views:11|c\n'
    self.statsd.count('page.views', 11)
    self.statsd.flush()
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_count_with_ts(self):
    # First sample: a count of 1 with a positive timestamp.
    first_expected = "page.views:1|c|T1066\n"
    self.statsd.count_with_timestamp("page.views", 1, timestamp=1066)
    self.statsd.flush()
    first_received = self.recv(2)
    self.assert_equal_telemetry(first_expected, first_received)

    # Second sample: telemetry is reset first, then a different value and
    # timestamp are sent.
    self.statsd._reset_telemetry()
    second_expected = "page.views:11|c|T2121\n"
    self.statsd.count_with_timestamp("page.views", 11, timestamp=2121)
    self.statsd.flush()
    second_received = self.recv(2)
    self.assert_equal_telemetry(second_expected, second_received)

def test_count_with_invalid_ts_should_be_ignored(self):
    # A negative timestamp is dropped: the packet carries no "|T" field.
    expected_packet = "page.views:1|c\n"
    self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066)
    self.statsd.flush()
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be worth a direct call to _report with a timestamp and a wrong metric type to make sure it sets the timestamp to 0? Not sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I feel there should be one more test to make sure that if statement that checks metric types with timestamps is covered correctly.

def test_histogram(self):
self.statsd.histogram('histo', 123.4)
self.assert_equal_telemetry('histo:123.4|h\n', self.recv(2))
Expand Down Expand Up @@ -518,7 +559,6 @@ def func():
# check that the method does not fail with a small payload
self.statsd.event("title", "message")


def test_service_check(self):
now = int(time.time())
self.statsd.service_check(
Expand Down Expand Up @@ -1106,7 +1146,6 @@ def test_batching_sequential(self):
self.recv(2),
telemetry=expected_metrics1)


expected2 = 'page.views:123|g\ntimer:123|ms\n'
self.assert_equal_telemetry(
expected2,
Expand Down Expand Up @@ -1276,7 +1315,6 @@ def test_telemetry(self):
self.statsd.bytes_dropped_queue = 8
self.statsd.packets_dropped_queue = 9


self.statsd.open_buffer()
self.statsd.gauge('page.views', 123)
self.statsd.close_buffer()
Expand Down Expand Up @@ -1383,7 +1421,6 @@ def test_telemetry_flush_interval_batch(self):
# assert that _last_flush_time has been updated
self.assertTrue(time1 < dogstatsd._last_flush_time)


def test_dedicated_udp_telemetry_dest(self):
listener_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener_sock.bind(('localhost', 0))
Expand Down
Loading