Skip to content

Commit

Permalink
Include S3 object details in rejection logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sq2gxo committed Apr 15, 2024
1 parent cd4fa8e commit f52ef33
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
3 changes: 3 additions & 0 deletions src/log/processing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def process_log_object(log_processing_rule: LogProcessingRule, bucket: str, key:
context_log_attributes.update(
log_processing_rule.get_processing_log_annotations())

for log_sink in log_sinks:
log_sink.set_s3_source(bucket, key)

# Count log entries (can't len() a stream)
num_log_entries = 0
decompressed_log_object_size = 0
Expand Down
16 changes: 10 additions & 6 deletions src/log/sinks/dynatrace.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, dt_url: str, dt_api_key_parameter: str, verify_ssl: bool = Tr
self._approx_buffered_messages_size = LIST_BRACKETS_LENGTH
self._messages = []
self._batch_num = 1
self._s3_source = ""

retry_strategy = Retry(
total = 3,
Expand Down Expand Up @@ -96,6 +97,9 @@ def is_empty(self):
def get_environment_url(self):
    """Return the Dynatrace environment URL this sink posts logs to."""
    environment_url = self._environment_url
    return environment_url

def set_s3_source(self, bucket: str, key: str):
    """Record the S3 object currently being processed by this sink.

    The value ("bucket:key") is appended to rejection/error log messages
    so failed batches can be traced back to their source object.

    Args:
        bucket: Name of the S3 bucket containing the log object.
        key: Object key of the log file within the bucket.
    """
    # Must be named set_s3_source and write _s3_source: the caller in
    # processing.py invokes log_sink.set_s3_source(bucket, key), and the
    # logging paths (and empty_sink's reset) read self._s3_source.
    self._s3_source = f"{bucket}:{key}"

def push(self, message: dict):
# Validate that the message size doesn't reach DT limits. If so,
# truncate the "content" field.
Expand Down Expand Up @@ -128,6 +132,7 @@ def empty_sink(self):
self._messages = []
self._approx_buffered_messages_size = LIST_BRACKETS_LENGTH
self._batch_num = 1
self._s3_source = ""

def check_log_message_size_and_truncate(self, message: dict):
'''
Expand Down Expand Up @@ -211,28 +216,27 @@ def ingest_logs(self, logs: list, session=None,
unit=MetricUnit.Count, value=1)
elif resp.status_code == 200:
logger.warning(
'%s: Parts of batch %s were not successfully posted: %s',tenant_id, batch_num, resp.text)
f"{tenant_id}: Parts of batch {batch_num} were not successfully posted: {resp.text}. Source file: {self._s3_source}")
metrics.add_metric(
name='DynatraceHTTP200PartialSuccess', unit=MetricUnit.Count, value=1)
elif resp.status_code == 400:
logger.warning(
'%s: Parts of batch %s were not successfully posted: %s',tenant_id, batch_num, resp.text)
f"{tenant_id}: Parts of batch {batch_num} were not successfully posted: {resp.text}. Source file: {self._s3_source}")
metrics.add_metric(
name='DynatraceHTTP400InvalidLogEntries', unit=MetricUnit.Count, value=1)
elif resp.status_code == 429:
logger.error("%s: Throttled by Dynatrace. Exhausted retry attempts...", tenant_id)
logger.error(f"{tenant_id}: Throttled by Dynatrace. Exhausted retry attempts... Source file: {self._s3_source}")
metrics.add_metric(name='DynatraceHTTP429Throttled',unit=MetricUnit.Count, value=1)
metrics.add_metric(name='DynatraceHTTPErrors', unit=MetricUnit.Count, value=1)
raise DynatraceThrottlingException
elif resp.status_code == 503:
logger.error("%s: Usable space limit reached. Exhausted retry attempts...",tenant_id)
logger.error(f"{tenant_id}: Usable space limit reached. Exhausted retry attempts... Source file: {self._s3_source}")
metrics.add_metric(name='DynatraceHTTP503SpaceLimitReached',unit=MetricUnit.Count, value=1)
metrics.add_metric(name='DynatraceHTTPErrors', unit=MetricUnit.Count, value=1)
raise DynatraceThrottlingException
else:
logger.error(
"%s: There was a HTTP %d error posting batch %d to Dynatrace. %s",
tenant_id,resp.status_code, batch_num, resp.text)
f"{tenant_id}: There was a HTTP {resp.status_code} error posting batch {batch_num} to Dynatrace: {resp.text}. Source file: {self._s3_source}")
metrics.add_metric(name='DynatraceHTTPErrors',
unit=MetricUnit.Count, value=1)
raise DynatraceIngestionException
Expand Down

0 comments on commit f52ef33

Please sign in to comment.