diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index 9c25aed0..e0c8d357 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -774,13 +774,67 @@ def gauge( """ return self._report(metric, "g", value, tags, sample_rate) + # Minimum Datadog Agent version: 7.40.0 + def gauge_with_timestamp( + self, + metric, # type: Text + value, # type: float + timestamp, # type: int + tags=None, # type: Optional[List[str]] + sample_rate=None, # type: Optional[float] + ): # type(...) -> None + """u + Record the value of a gauge with a Unix timestamp (in seconds), + optionally setting a list of tags and a sample rate. + + Minimum Datadog Agent version: 7.40.0 + + >>> statsd.gauge("users.online", 123, 1713804588) + >>> statsd.gauge("active.connections", 1001, 1713804588, tags=["protocol:http"]) + """ + return self._report(metric, "g", value, tags, sample_rate, timestamp) + + def count( + self, + metric, # type: Text + value, # type: float + tags=None, # type: Optional[List[str]] + sample_rate=None, # type: Optional[float] + ): # type(...) -> None + """ + Count tracks how many times something happened per second, tags and a sample + rate. + + >>> statsd.count("page.views", 123) + """ + self._report(metric, "c", value, tags, sample_rate) + + # Minimum Datadog Agent version: 7.40.0 + def count_with_timestamp( + self, + metric, # type: Text + value, # type: float + timestamp=0, # type: int + tags=None, # type: Optional[List[str]] + sample_rate=None, # type: Optional[float] + ): # type(...) -> None + """ + Count how many times something happened at a given Unix timestamp in seconds, + tags and a sample rate. + + Minimum Datadog Agent version: 7.40.0 + + >>> statsd.count("files.transferred", 124, timestamp=1713804588) + """ + self._report(metric, "c", value, tags, sample_rate, timestamp) + def increment( self, metric, # type: Text value=1, # type: float tags=None, # type: Optional[List[str]] sample_rate=None, # type: Optional[float] - ): # type: (...) -> None + ): # type(...) -> None """ Increment a counter, optionally setting a value, tags and a sample rate. @@ -934,23 +988,27 @@ def close_socket(self): log.error("Unexpected error: %s", str(e)) self.telemetry_socket = None - def _serialize_metric(self, metric, metric_type, value, tags, sample_rate=1): + def _serialize_metric( + self, metric, metric_type, value, tags, sample_rate=1, timestamp=0 + ): # Create/format the metric packet - return "%s%s:%s|%s%s%s%s" % ( + return "%s%s:%s|%s%s%s%s%s" % ( (self.namespace + ".") if self.namespace else "", metric, value, metric_type, ("|@" + text(sample_rate)) if sample_rate != 1 else "", ("|#" + ",".join(normalize_tags(tags))) if tags else "", - ("|c:" + self._container_id if self._container_id else "") + ("|c:" + self._container_id if self._container_id else ""), + ("|T" + text(timestamp)) if timestamp > 0 else "", ) - def _report(self, metric, metric_type, value, tags, sample_rate): + def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0): """ Create a metric packet and send it. - More information about the packets' format: http://docs.datadoghq.com/guides/dogstatsd/ + More information about the packets' format: + https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol """ if value is None: return @@ -967,9 +1025,16 @@ def _report(self, metric, metric_type, value, tags, sample_rate): if sample_rate != 1 and random() > sample_rate: return + # timestamps (protocol v1.3) only allowed on gauges and counts + allows_timestamp = metric_type == "g" or metric_type == "c" + if not allows_timestamp or timestamp < 0: + timestamp = 0 + # Resolve the full tag list tags = self._add_constant_tags(tags) - payload = self._serialize_metric(metric, metric_type, value, tags, sample_rate) + payload = self._serialize_metric( + metric, metric_type, value, tags, sample_rate, timestamp + ) # Send it self._send(payload) diff --git a/tests/unit/dogstatsd/test_statsd.py b/tests/unit/dogstatsd/test_statsd.py index 24816344..a5fe7a0e 100644 --- a/tests/unit/dogstatsd/test_statsd.py +++ b/tests/unit/dogstatsd/test_statsd.py @@ -304,10 +304,31 @@ def test_set(self): self.statsd.set('set', 123) self.assert_equal_telemetry('set:123|s\n', self.recv(2)) + def test_report(self): + self.statsd._report('report', 'g', 123.4, tags=None, sample_rate=None) + self.assert_equal_telemetry('report:123.4|g\n', self.recv(2)) + + def test_report_metric_with_unsupported_ts(self): + self.statsd._reset_telemetry() + self.statsd._report('report', 'h', 123.5, tags=None, sample_rate=None, timestamp=100) + self.assert_equal_telemetry('report:123.5|h\n', self.recv(2)) + + self.statsd._reset_telemetry() + self.statsd._report('set', 's', 123, tags=None, sample_rate=None, timestamp=100) + self.assert_equal_telemetry('set:123|s\n', self.recv(2)) + def test_gauge(self): self.statsd.gauge('gauge', 123.4) self.assert_equal_telemetry('gauge:123.4|g\n', self.recv(2)) + def test_gauge_with_ts(self): + self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=1066) + self.assert_equal_telemetry("gauge:123.4|g|T1066\n", self.recv(2)) + + def test_gauge_with_invalid_ts_should_be_ignored(self): + self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=-500) + self.assert_equal_telemetry("gauge:123.4|g\n", self.recv(2)) + def test_counter(self): self.statsd.increment('page.views') self.statsd.flush() @@ -328,6 +349,26 @@ def test_counter(self): self.statsd.flush() self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2)) + def test_count(self): + self.statsd.count('page.views', 11) + self.statsd.flush() + self.assert_equal_telemetry('page.views:11|c\n', self.recv(2)) + + def test_count_with_ts(self): + self.statsd.count_with_timestamp("page.views", 1, timestamp=1066) + self.statsd.flush() + self.assert_equal_telemetry("page.views:1|c|T1066\n", self.recv(2)) + + self.statsd._reset_telemetry() + self.statsd.count_with_timestamp("page.views", 11, timestamp=2121) + self.statsd.flush() + self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2)) + + def test_count_with_invalid_ts_should_be_ignored(self): + self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066) + self.statsd.flush() + self.assert_equal_telemetry("page.views:1|c\n", self.recv(2)) + def test_histogram(self): self.statsd.histogram('histo', 123.4) self.assert_equal_telemetry('histo:123.4|h\n', self.recv(2)) @@ -518,7 +559,6 @@ def func(): # check that the method does not fail with a small payload self.statsd.event("title", "message") - def test_service_check(self): now = int(time.time()) self.statsd.service_check( @@ -1106,7 +1146,6 @@ def test_batching_sequential(self): self.recv(2), telemetry=expected_metrics1) - expected2 = 'page.views:123|g\ntimer:123|ms\n' self.assert_equal_telemetry( expected2, @@ -1276,7 +1315,6 @@ def test_telemetry(self): self.statsd.bytes_dropped_queue = 8 self.statsd.packets_dropped_queue = 9 - self.statsd.open_buffer() self.statsd.gauge('page.views', 123) self.statsd.close_buffer() @@ -1383,7 +1421,6 @@ def test_telemetry_flush_interval_batch(self): # assert that _last_flush_time has been updated self.assertTrue(time1 < dogstatsd._last_flush_time) - def test_dedicated_udp_telemetry_dest(self): listener_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) listener_sock.bind(('localhost', 0))