Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tracer): [SVLS-5672] DynamoDB PutItem pointers #10824

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import wrapt

from ddtrace import config
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace.span import Span
from ddtrace._trace.utils import extract_DD_context_from_messages
Expand Down Expand Up @@ -629,6 +630,7 @@ def _on_botocore_patched_api_call_success(ctx, response):
set_botocore_response_metadata_tags(span, response)

for span_pointer_description in extract_span_pointers_from_successful_botocore_response(
dynamodb_primary_key_names_for_tables=config.botocore.dynamodb_primary_key_names_for_tables,
endpoint_name=ctx.get_item("endpoint_name"),
operation_name=ctx.get_item("operation"),
request_parameters=ctx.get_item("params"),
Expand Down
152 changes: 152 additions & 0 deletions ddtrace/_trace/utils_botocore/span_pointers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Set

from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
Expand All @@ -13,7 +14,19 @@
log = get_logger(__name__)


_DynamoDBTableName = str
_DynamoDBItemFieldName = str
_DynamoDBItemTypeTag = str

_DynamoDBItemValue = Dict[_DynamoDBItemTypeTag, Any]
_DynamoDBItem = Dict[_DynamoDBItemFieldName, _DynamoDBItemValue]

_DynamoDBItemPrimaryKeyValue = Dict[_DynamoDBItemTypeTag, str] # must be length 1
_DynamoDBItemPrimaryKey = Dict[_DynamoDBItemFieldName, _DynamoDBItemPrimaryKeyValue]


def extract_span_pointers_from_successful_botocore_response(
dynamodb_primary_key_names_for_tables: Dict[_DynamoDBTableName, Set[_DynamoDBItemFieldName]],
endpoint_name: str,
operation_name: str,
request_parameters: Dict[str, Any],
Expand All @@ -22,9 +35,148 @@ def extract_span_pointers_from_successful_botocore_response(
if endpoint_name == "s3":
return _extract_span_pointers_for_s3_response(operation_name, request_parameters, response)

if endpoint_name == "dynamodb":
return _extract_span_pointers_for_dynamodb_response(
dynamodb_primary_key_names_for_tables, operation_name, request_parameters
)

return []


def _extract_span_pointers_for_dynamodb_response(
dynamodb_primary_key_names_for_tables: Dict[_DynamoDBTableName, Set[_DynamoDBItemFieldName]],
operation_name: str,
request_parameters: Dict[str, Any],
) -> List[_SpanPointerDescription]:
if operation_name == "PutItem":
return _extract_span_pointers_for_dynamodb_putitem_response(
dynamodb_primary_key_names_for_tables, request_parameters
)

return []


def _extract_span_pointers_for_dynamodb_putitem_response(
dynamodb_primary_key_names_for_tables: Dict[_DynamoDBTableName, Set[_DynamoDBItemFieldName]],
request_parameters: Dict[str, Any],
) -> List[_SpanPointerDescription]:
try:
table_name = request_parameters["TableName"]
item = request_parameters["Item"]

return [
_aws_dynamodb_item_span_pointer_description(
pointer_direction=_SpanPointerDirection.DOWNSTREAM,
table_name=table_name,
primary_key=_aws_dynamodb_item_primary_key_from_item(
dynamodb_primary_key_names_for_tables[table_name], item
),
)
]

except Exception as e:
log.warning(
"failed to generate DynamoDB.PutItem span pointer: %s",
str(e),
)
return []


def _aws_dynamodb_item_primary_key_from_item(
primary_key_field_names: Set[_DynamoDBItemFieldName],
item: _DynamoDBItem,
) -> _DynamoDBItemPrimaryKey:
if len(primary_key_field_names) not in (1, 2):
raise ValueError(f"unexpected number of primary key fields: {len(primary_key_field_names)}")

return {
primary_key_field_name: _aws_dynamodb_extract_and_verify_primary_key_field_value_item(
item, primary_key_field_name
)
for primary_key_field_name in primary_key_field_names
}


def _aws_dynamodb_item_span_pointer_description(
pointer_direction: _SpanPointerDirection,
table_name: _DynamoDBTableName,
primary_key: _DynamoDBItemPrimaryKey,
) -> _SpanPointerDescription:
return _SpanPointerDescription(
pointer_kind="aws.dynamodb.item",
pointer_direction=pointer_direction,
pointer_hash=_aws_dynamodb_item_span_pointer_hash(table_name, primary_key),
extra_attributes={},
)


def _aws_dynamodb_extract_and_verify_primary_key_field_value_item(
item: _DynamoDBItem,
primary_key_field_name: _DynamoDBItemFieldName,
) -> _DynamoDBItemPrimaryKeyValue:
if primary_key_field_name not in item:
raise ValueError(f"missing primary key field: {primary_key_field_name}")

value_object = item[primary_key_field_name]

if len(value_object) != 1:
raise ValueError(f"primary key field {primary_key_field_name} must have exactly one value: {len(value_object)}")

value_type, value_data = next(iter(value_object.items()))
if value_type not in ("S", "N", "B"):
raise ValueError(f"unexpected primary key field {primary_key_field_name} value type: {value_type}")

if not isinstance(value_data, str):
raise ValueError(f"unexpected primary key field {primary_key_field_name} value data type: {type(value_data)}")

return {value_type: value_data}


def _aws_dynamodb_item_span_pointer_hash(table_name: _DynamoDBTableName, primary_key: _DynamoDBItemPrimaryKey) -> str:
if len(primary_key) == 1:
key, value_object = next(iter(primary_key.items()))
encoded_key_1 = key.encode("utf-8")
encoded_value_1 = _aws_dynamodb_item_encode_primary_key_value(value_object)
encoded_key_2 = b""
encoded_value_2 = b""

elif len(primary_key) == 2:
(key_1, value_object_1), (key_2, value_object_2) = sorted(
primary_key.items(), key=lambda x: x[0].encode("utf-8")
)
encoded_key_1 = key_1.encode("utf-8")
encoded_value_1 = _aws_dynamodb_item_encode_primary_key_value(value_object_1)
encoded_key_2 = key_2.encode("utf-8")
encoded_value_2 = _aws_dynamodb_item_encode_primary_key_value(value_object_2)

else:
raise ValueError(f"unexpected number of primary key fields: {len(primary_key)}")

return _standard_hashing_function(
table_name.encode("utf-8"),
encoded_key_1,
encoded_value_1,
encoded_key_2,
encoded_value_2,
)


def _aws_dynamodb_item_encode_primary_key_value(value_object: _DynamoDBItemPrimaryKeyValue) -> bytes:
if len(value_object) != 1:
raise ValueError(f"primary key value object must have exactly one field: {len(value_object)}")

value_type, value = next(iter(value_object.items()))

if value_type == "S":
return value.encode("utf-8")

if value_type in ("N", "B"):
# these should already be here as ASCII strings
return value.encode("ascii")

raise ValueError(f"unknown primary key value type: {value_type}")


def _extract_span_pointers_for_s3_response(
operation_name: str,
request_parameters: Dict[str, Any],
Expand Down
29 changes: 29 additions & 0 deletions ddtrace/contrib/internal/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
Trace queries to aws api done via botocore client
"""
import collections
import json
import os
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Set # noqa:F401
from typing import Union # noqa:F401
Expand Down Expand Up @@ -59,6 +61,32 @@
log = get_logger(__name__)


def _load_dynamodb_primary_key_names_for_tables() -> Dict[str, Set[str]]:
try:
encoded_table_primary_keys = os.getenv("DD_BOTOCORE_DYNAMODB_TABLE_PRIMARY_KEYS", "{}")
raw_table_primary_keys = json.loads(encoded_table_primary_keys)

table_primary_keys = {}
for table, primary_keys in raw_table_primary_keys.items():
if not isinstance(table, str):
raise ValueError(f"expected string table name: {table}")

if not isinstance(primary_keys, list):
raise ValueError(f"expected list of primary keys: {primary_keys}")

unique_primary_keys = set(primary_keys)
if not len(unique_primary_keys) == len(primary_keys):
raise ValueError(f"expected unique primary keys: {primary_keys}")

table_primary_keys[table] = unique_primary_keys

return table_primary_keys

except Exception as e:
log.warning("failed to load DD_BOTOCORE_DYNAMODB_TABLE_PRIMARY_KEYS: %s", e)
return {}


# Botocore default settings
config._add(
"botocore",
Expand All @@ -73,6 +101,7 @@
"instrument_internals": asbool(os.getenv("DD_BOTOCORE_INSTRUMENT_INTERNALS", default=False)),
"propagation_enabled": asbool(os.getenv("DD_BOTOCORE_PROPAGATION_ENABLED", default=False)),
"empty_poll_enabled": asbool(os.getenv("DD_BOTOCORE_EMPTY_POLL_ENABLED", default=True)),
"dynamodb_primary_key_names_for_tables": _load_dynamodb_primary_key_names_for_tables(),
},
)

Expand Down
68 changes: 68 additions & 0 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,74 @@ def test_dynamodb_put_get(self):
assert span.service == "test-botocore-tracing.dynamodb"
assert span.resource == "botocore.parsers.parse"

span = spans[2]
assert span.get_tag("aws.operation") == "PutItem"
# Since the dynamodb_primary_key_names_for_tables isn't configured, we
# cannot create span pointers for this item.
assert not span._links

@pytest.mark.skipif(
PYTHON_VERSION_INFO < (3, 8),
reason="Skipping for older py versions whose latest supported moto versions don't have the right dynamodb api",
)
@mock_dynamodb
def test_dynamodb_put_get_with_table_primary_key_mapping(self):
ddb = self.session.create_client("dynamodb", region_name="us-west-2")
Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(ddb)

with self.override_config(
"botocore",
dict(
instrument_internals=True,
dynamodb_primary_key_names_for_tables={
"foobar": {"myattr"},
},
),
):
ddb.create_table(
TableName="foobar",
AttributeDefinitions=[{"AttributeName": "myattr", "AttributeType": "S"}],
KeySchema=[{"AttributeName": "myattr", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="foobar", Item={"myattr": {"S": "baz"}})
ddb.get_item(TableName="foobar", Key={"myattr": {"S": "baz"}})

spans = self.get_spans()
assert spans
span = spans[0]
assert len(spans) == 6
assert_is_measured(span)
assert span.get_tag("aws.operation") == "CreateTable"
assert span.get_tag("component") == "botocore"
assert span.get_tag("span.kind"), "client"
assert_span_http_status_code(span, 200)
assert span.service == "test-botocore-tracing.dynamodb"
assert span.resource == "dynamodb.createtable"

span = spans[1]
assert span.name == "botocore.parsers.parse"
assert span.get_tag("component") == "botocore"
assert span.get_tag("span.kind"), "client"
assert span.service == "test-botocore-tracing.dynamodb"
assert span.resource == "botocore.parsers.parse"

span = spans[2]
assert span.get_tag("aws.operation") == "PutItem"
# This span pointer is only available if the
# dynamodb_primary_key_names_for_tables is properly configured with the
# table and its primary key field names.
assert span._links == [
_SpanPointer(
pointer_kind="aws.dynamodb.item",
pointer_direction=_SpanPointerDirection.DOWNSTREAM,
# We have more detailed tests for the hashing behavior
# elsewhere. Here we just want to make sure that the pointer is
# correctly attached to the span.
pointer_hash="de960284e8cba01c46f87b102ab1c9cb",
),
]

@mock_s3
def test_s3_client(self):
s3 = self.session.create_client("s3", region_name="us-west-2")
Expand Down
Loading
Loading