Skip to content

Commit

Permalink
Fix exception handling to suppress tracing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jp-agenta committed Sep 12, 2024
1 parent 2fe9f02 commit f259683
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 71 deletions.
33 changes: 9 additions & 24 deletions agenta-cli/agenta/sdk/decorators/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, Optional, Tuple, List
from importlib.metadata import version

from contextlib import suppress
from fastapi.middleware.cors import CORSMiddleware
from fastapi import Body, FastAPI, UploadFile, HTTPException

Expand Down Expand Up @@ -383,6 +383,9 @@ async def execute_function(
FINALSTEP = 0.001
NOFSTEPS = TIMEOUT / TIMESTEP

data = None
trace = {}

try:
result = (
await func(*args, **func_params["params"])
Expand All @@ -391,7 +394,10 @@ async def execute_function(
)

data = self.patch_result(result)
except Exception as e:
self.handle_exception(e)

with suppress(Exception):
if inline_trace:
if WAIT_FOR_SPANS:
remaining_steps = NOFSTEPS
Expand All @@ -406,30 +412,9 @@ async def execute_function(
else:
trace = ag.tracing.get_trace_id_only()

response = BaseResponse(data=data, trace=trace)

return response

except Exception as e:
if WAIT_FOR_SPANS:
remaining_steps = NOFSTEPS
response = BaseResponse(data=data, trace=trace)

while ag.tracing.is_processing() and remaining_steps > 0:
await asyncio.sleep(TIMESTEP)
remaining_steps -= 1

await asyncio.sleep(FINALSTEP)

trace = ag.tracing.get_inline_trace(trace_id_only=(not inline_trace))

log.info("========= Error ==========")
log.info("")

print("-> trace")
log.info(trace)
print("-> exception")

self.handle_exception(e)
return response

def handle_exception(self, e: Exception):
status_code = e.status_code if hasattr(e, "status_code") else 500
Expand Down
44 changes: 24 additions & 20 deletions agenta-cli/agenta/sdk/decorators/tracing.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import inspect
from functools import wraps
from itertools import chain
from contextvars import ContextVar
from typing import Callable, Optional, Union, Any, Dict, List
from contextlib import suppress
from typing import Callable, Optional, Any, Dict, List

import agenta as ag

Expand Down Expand Up @@ -89,7 +89,7 @@ def patch(result: Any) -> Dict[str, Any]:
async def async_wrapper(*args, **kwargs):
async def wrapped_func(*args, **kwargs):
with ag.tracing.start_as_current_span(func.__name__, self.kind):
try:
with suppress(Exception):
rctx = tracing_context.get()
ag.tracing.set_attributes(
"metadata", {"config": rctx.get("config", {})}
Expand All @@ -109,8 +109,16 @@ async def wrapped_func(*args, **kwargs):
redact(parse(*args, **kwargs), self.ignore_inputs),
)

try:
result = await func(*args, **kwargs)
except Exception as e:
ag.tracing.record_exception(e)

ag.tracing.set_status("ERROR")

raise e

with suppress(Exception):
cost = 0.0
usage = {}
if isinstance(result, dict):
Expand Down Expand Up @@ -139,22 +147,15 @@ async def wrapped_func(*args, **kwargs):

ag.tracing.set_status("OK")

return result

except Exception as e:
ag.tracing.record_exception(e)

ag.tracing.set_status("ERROR")

raise e
return result

return await wrapped_func(*args, **kwargs)

@wraps(func)
def sync_wrapper(*args, **kwargs):
def wrapped_func(*args, **kwargs):
with ag.tracing.start_as_current_span(func.__name__, self.kind):
try:
with suppress(Exception):
rctx = tracing_context.get()
ag.tracing.set_attributes(
"metadata", {"config": rctx.get("config", {})}
Expand All @@ -174,8 +175,16 @@ def wrapped_func(*args, **kwargs):
redact(parse(*args, **kwargs), self.ignore_inputs),
)

try:
result = func(*args, **kwargs)
except Exception as e:
ag.tracing.record_exception(e)

ag.tracing.set_status("ERROR")

raise e

with suppress(Exception):
cost = 0.0
usage = {}
if isinstance(result, dict):
Expand All @@ -196,20 +205,15 @@ def wrapped_func(*args, **kwargs):
}
),
)

ag.tracing.set_attributes(
"data.outputs",
redact(patch(result), self.ignore_outputs),
)
ag.tracing.set_status("OK")

return result

except Exception as e:
ag.tracing.record_exception(e)

ag.tracing.set_status("ERROR")
ag.tracing.set_status("OK")

raise e
return result

return wrapped_func(*args, **kwargs)

Expand Down
88 changes: 61 additions & 27 deletions agenta-cli/agenta/sdk/tracing/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import httpx

from typing import Optional, Union, Any, Dict
from contextlib import contextmanager
from contextlib import contextmanager, suppress

from opentelemetry.trace import set_tracer_provider
from opentelemetry.trace.propagation import get_current_span
Expand Down Expand Up @@ -89,25 +89,31 @@ def __init__(

@contextmanager
def start_as_current_span(self, name: str, kind: str):
with self.tracer.start_as_current_span(name) as span:
try:
with self.tracer.start_as_current_span(name) as span:
self.set_attributes(
namespace="extra",
attributes={"kind": kind},
span=span,
)

yield span
except Exception as e:
yield None

def start_span(self, name: str, kind: str) -> Optional[Span]:
try:
span = self.tracer.start_span(name)

self.set_attributes(
namespace="extra",
attributes={"kind": kind},
span=span,
)

yield span

def start_span(self, name: str, kind: str) -> Span:
span = self.tracer.start_span(name)

self.set_attributes(
namespace="extra",
attributes={"kind": kind},
span=span,
)

return span
return span
except Exception as e:
return None

def set_status(
self,
Expand All @@ -118,7 +124,8 @@ def set_status(
if span is None:
span = get_current_span()

otel_set_status(span, status, message)
with suppress(Exception):
otel_set_status(span, status, message)

def add_event(
self,
Expand All @@ -130,7 +137,8 @@ def add_event(
if span is None:
span = get_current_span()

otel_add_event(span, name, attributes, timestamp)
with suppress(Exception):
otel_add_event(span, name, attributes, timestamp)

def record_exception(
self,
Expand All @@ -142,7 +150,8 @@ def record_exception(
if span is None:
span = get_current_span()

otel_record_exception(span, exception, attributes, timestamp)
with suppress(Exception):
otel_record_exception(span, exception, attributes, timestamp)

def set_attributes(
self,
Expand All @@ -153,7 +162,8 @@ def set_attributes(
if span is None:
span = get_current_span()

otel_set_attributes(span, namespace, attributes)
with suppress(Exception):
otel_set_attributes(span, namespace, attributes)

def get_attributes(
self,
Expand All @@ -163,24 +173,48 @@ def get_attributes(
if span is None:
span = get_current_span()

return otel_get_attributes(span, namespace)
attributes = {}

with suppress(Exception):
attributes = otel_get_attributes(span, namespace)

return attributes

def store_internals(
self,
attributes: Dict[str, Any],
span: Optional[Span] = None,
) -> None:
self.set_attributes(
namespace="data.internals",
attributes=attributes,
span=span,
)
if span is None:
span = get_current_span()

with suppress(Exception):
self.set_attributes(
namespace="data.internals",
attributes=attributes,
span=span,
)

def is_processing(self) -> bool:
return not self.inline_processor.is_done()
processing = False

with suppress(Exception):
processing = self.inline_processor.is_done()

return processing

def get_inline_trace(self) -> Dict[str, Any]:
return inline_get_trace(self.spans)
trace = {}

with suppress(Exception):
trace = inline_get_trace(self.spans)

return trace

def get_trace_id_only(self) -> Dict[str, Any]:
return inline_get_trace_id(self.spans)
trace = {}

with suppress(Exception):
trace = inline_get_trace_id(self.spans)

return trace

0 comments on commit f259683

Please sign in to comment.