Skip to content

Commit

Permalink
Merge branch '2.12' into backport-10681-to-2.12
Browse files Browse the repository at this point in the history
  • Loading branch information
sabrenner committed Sep 23, 2024
2 parents 6353b3a + 6eb517d commit 6c6b724
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ddtrace/contrib/internal/botocore/services/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def _patched_kinesis_api_call(parent_ctx, original_func, instance, args, kwargs,
parent_ctx,
params,
time_estimate,
data_obj.get("_datadog"),
data_obj.get("_datadog") if data_obj else None,
record,
result,
config.botocore.propagation_enabled,
Expand Down
5 changes: 2 additions & 3 deletions ddtrace/contrib/internal/botocore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ def get_json_from_str(data_str: str) -> Tuple[str, Optional[Dict[str, Any]]]:
return None, data_obj


def get_kinesis_data_object(data: str) -> Tuple[str, Optional[Dict[str, Any]]]:
def get_kinesis_data_object(data: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""
:data: the data from a kinesis stream
The data from a kinesis stream comes as a string (could be json, base64 encoded, etc.)
We support injecting our trace context in the following three cases:
- json string
- byte encoded json string
- base64 encoded json string
If it's none of these, then we leave the message as it is.
If it's none of these, then we return None
"""

# check if data is a json string
try:
return get_json_from_str(data)
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/contrib/internal/kafka/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def _instrument_message(messages, pin, start_ns, instance, err):

for message in messages:
if message is not None and first_message is not None:
core.set_item("kafka_topic", first_message.topic())
core.set_item("kafka_topic", str(first_message.topic()))
core.dispatch("kafka.consume.start", (instance, first_message, span))

span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
Expand All @@ -260,7 +260,7 @@ def _instrument_message(messages, pin, start_ns, instance, err):
if first_message is not None:
message_key = first_message.key() or ""
message_offset = first_message.offset() or -1
span.set_tag_str(kafkax.TOPIC, first_message.topic())
span.set_tag_str(kafkax.TOPIC, str(first_message.topic()))

# If this is a deserializing consumer, do not set the key as a tag since we
# do not have the serialization function
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
kafka: Fixed an issue where a ``TypeError`` exception would be raised if the first message's ``topic()`` returned ``None`` during consumption.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
kinesis: This fix resolves an issue where unparsable data in a Kinesis record would cause a NoneType error.
13 changes: 11 additions & 2 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2858,7 +2858,9 @@ def _test_kinesis_put_record_trace_injection(self, test_name, data, client=None,

return decoded_record_data

def _test_kinesis_put_records_trace_injection(self, test_name, data, client=None, enable_stream_arn=False):
def _test_kinesis_put_records_trace_injection(
self, test_name, data, client=None, enable_stream_arn=False, verify=True
):
if not client:
client = self.session.create_client("kinesis", region_name="us-east-1")

Expand All @@ -2870,7 +2872,8 @@ def _test_kinesis_put_records_trace_injection(self, test_name, data, client=None
client.put_records(StreamName=stream_name, Records=data, StreamARN=stream_arn)
else:
client.put_records(StreamName=stream_name, Records=data)

if not verify:
return None
# assert commons for span
span = self._kinesis_assert_spans()

Expand Down Expand Up @@ -3261,6 +3264,12 @@ def test_kinesis_put_records_newline_json_trace_injection(self):

assert decoded_record_data.endswith("\n")

@mock_kinesis
def test_kinesis_put_records_unparsable_data_object_avoid_nonetype_error(self):
# If the data is unparsable we should not error in tracer code
records = [{"Data": b"", "PartitionKey": "1234"}]
self._test_kinesis_put_records_trace_injection("unparsable_data_obj", records, verify=False)

@mock_kinesis
def test_kinesis_put_records_newline_bytes_trace_injection(self):
# (dict -> json string -> bytes + new line)[]
Expand Down

0 comments on commit 6c6b724

Please sign in to comment.