From f259683155563d9110e423c619c55cca2be0ca11 Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Thu, 12 Sep 2024 16:30:53 +0200 Subject: [PATCH] Fix exception handling to suppress tracing errors --- agenta-cli/agenta/sdk/decorators/routing.py | 33 ++----- agenta-cli/agenta/sdk/decorators/tracing.py | 44 +++++----- .../agenta/sdk/tracing/opentelemetry.py | 88 +++++++++++++------ 3 files changed, 94 insertions(+), 71 deletions(-) diff --git a/agenta-cli/agenta/sdk/decorators/routing.py b/agenta-cli/agenta/sdk/decorators/routing.py index c92148832..ea72c8138 100644 --- a/agenta-cli/agenta/sdk/decorators/routing.py +++ b/agenta-cli/agenta/sdk/decorators/routing.py @@ -13,7 +13,7 @@ from tempfile import NamedTemporaryFile from typing import Any, Callable, Dict, Optional, Tuple, List from importlib.metadata import version - +from contextlib import suppress from fastapi.middleware.cors import CORSMiddleware from fastapi import Body, FastAPI, UploadFile, HTTPException @@ -383,6 +383,9 @@ async def execute_function( FINALSTEP = 0.001 NOFSTEPS = TIMEOUT / TIMESTEP + data = None + trace = {} + try: result = ( await func(*args, **func_params["params"]) @@ -391,7 +394,10 @@ async def execute_function( ) data = self.patch_result(result) + except Exception as e: + self.handle_exception(e) + with suppress(Exception): if inline_trace: if WAIT_FOR_SPANS: remaining_steps = NOFSTEPS @@ -406,30 +412,9 @@ async def execute_function( else: trace = ag.tracing.get_trace_id_only() - response = BaseResponse(data=data, trace=trace) - - return response - - except Exception as e: - if WAIT_FOR_SPANS: - remaining_steps = NOFSTEPS + response = BaseResponse(data=data, trace=trace) - while ag.tracing.is_processing() and remaining_steps > 0: - await asyncio.sleep(TIMESTEP) - remaining_steps -= 1 - - await asyncio.sleep(FINALSTEP) - - trace = ag.tracing.get_inline_trace(trace_id_only=(not inline_trace)) - - log.info("========= Error ==========") - log.info("") - - print("-> trace") - log.info(trace) - print("-> exception") - - self.handle_exception(e) + return response def handle_exception(self, e: Exception): status_code = e.status_code if hasattr(e, "status_code") else 500 diff --git a/agenta-cli/agenta/sdk/decorators/tracing.py b/agenta-cli/agenta/sdk/decorators/tracing.py index 78676a99c..72ebdef90 100644 --- a/agenta-cli/agenta/sdk/decorators/tracing.py +++ b/agenta-cli/agenta/sdk/decorators/tracing.py @@ -1,8 +1,8 @@ import inspect from functools import wraps from itertools import chain -from contextvars import ContextVar -from typing import Callable, Optional, Union, Any, Dict, List +from contextlib import suppress +from typing import Callable, Optional, Any, Dict, List import agenta as ag @@ -89,7 +89,7 @@ def patch(result: Any) -> Dict[str, Any]: async def async_wrapper(*args, **kwargs): async def wrapped_func(*args, **kwargs): with ag.tracing.start_as_current_span(func.__name__, self.kind): - try: + with suppress(Exception): rctx = tracing_context.get() ag.tracing.set_attributes( "metadata", {"config": rctx.get("config", {})} @@ -109,8 +109,16 @@ async def wrapped_func(*args, **kwargs): redact(parse(*args, **kwargs), self.ignore_inputs), ) + try: result = await func(*args, **kwargs) + except Exception as e: + ag.tracing.record_exception(e) + + ag.tracing.set_status("ERROR") + + raise e + with suppress(Exception): cost = 0.0 usage = {} if isinstance(result, dict): @@ -139,14 +147,7 @@ async def wrapped_func(*args, **kwargs): ag.tracing.set_status("OK") - return result - - except Exception as e: - ag.tracing.record_exception(e) - - ag.tracing.set_status("ERROR") - - raise e + return result return await wrapped_func(*args, **kwargs) @@ -154,7 +155,7 @@ async def wrapped_func(*args, **kwargs): def sync_wrapper(*args, **kwargs): def wrapped_func(*args, **kwargs): with ag.tracing.start_as_current_span(func.__name__, self.kind): - try: + with suppress(Exception): rctx = tracing_context.get() ag.tracing.set_attributes( "metadata", {"config": rctx.get("config", {})} @@ -174,8 +175,16 @@ def wrapped_func(*args, **kwargs): redact(parse(*args, **kwargs), self.ignore_inputs), ) + try: result = func(*args, **kwargs) + except Exception as e: + ag.tracing.record_exception(e) + + ag.tracing.set_status("ERROR") + + raise e + with suppress(Exception): cost = 0.0 usage = {} if isinstance(result, dict): @@ -196,20 +205,15 @@ def wrapped_func(*args, **kwargs): } ), ) + ag.tracing.set_attributes( "data.outputs", redact(patch(result), self.ignore_outputs), ) - ag.tracing.set_status("OK") - - return result - - except Exception as e: - ag.tracing.record_exception(e) - ag.tracing.set_status("ERROR") + ag.tracing.set_status("OK") - raise e + return result return wrapped_func(*args, **kwargs) diff --git a/agenta-cli/agenta/sdk/tracing/opentelemetry.py b/agenta-cli/agenta/sdk/tracing/opentelemetry.py index dc82777c0..331405bae 100644 --- a/agenta-cli/agenta/sdk/tracing/opentelemetry.py +++ b/agenta-cli/agenta/sdk/tracing/opentelemetry.py @@ -1,7 +1,7 @@ import httpx from typing import Optional, Union, Any, Dict -from contextlib import contextmanager +from contextlib import contextmanager, suppress from opentelemetry.trace import set_tracer_provider from opentelemetry.trace.propagation import get_current_span @@ -89,25 +89,31 @@ def __init__( @contextmanager def start_as_current_span(self, name: str, kind: str): - with self.tracer.start_as_current_span(name) as span: + try: + with self.tracer.start_as_current_span(name) as span: + self.set_attributes( + namespace="extra", + attributes={"kind": kind}, + span=span, + ) + + yield span + except Exception as e: + yield None + + def start_span(self, name: str, kind: str) -> Optional[Span]: + try: + span = self.tracer.start_span(name) + self.set_attributes( namespace="extra", attributes={"kind": kind}, span=span, ) - yield span - - def start_span(self, name: str, kind: str) -> Span: - span = self.tracer.start_span(name) - - self.set_attributes( - namespace="extra", - attributes={"kind": kind}, - span=span, - ) - - return span + return span + except Exception as e: + return None def set_status( self, @@ -118,7 +124,8 @@ def set_status( if span is None: span = get_current_span() - otel_set_status(span, status, message) + with suppress(Exception): + otel_set_status(span, status, message) def add_event( self, @@ -130,7 +137,8 @@ def add_event( if span is None: span = get_current_span() - otel_add_event(span, name, attributes, timestamp) + with suppress(Exception): + otel_add_event(span, name, attributes, timestamp) def record_exception( self, @@ -142,7 +150,8 @@ def record_exception( if span is None: span = get_current_span() - otel_record_exception(span, exception, attributes, timestamp) + with suppress(Exception): + otel_record_exception(span, exception, attributes, timestamp) def set_attributes( self, @@ -153,7 +162,8 @@ def set_attributes( if span is None: span = get_current_span() - otel_set_attributes(span, namespace, attributes) + with suppress(Exception): + otel_set_attributes(span, namespace, attributes) def get_attributes( self, @@ -163,24 +173,48 @@ def get_attributes( if span is None: span = get_current_span() - return otel_get_attributes(span, namespace) + attributes = {} + + with suppress(Exception): + attributes = otel_get_attributes(span, namespace) + + return attributes def store_internals( self, attributes: Dict[str, Any], span: Optional[Span] = None, ) -> None: - self.set_attributes( - namespace="data.internals", - attributes=attributes, - span=span, - ) + if span is None: + span = get_current_span() + + with suppress(Exception): + self.set_attributes( + namespace="data.internals", + attributes=attributes, + span=span, + ) def is_processing(self) -> bool: - return not self.inline_processor.is_done() + processing = False + + with suppress(Exception): + processing = self.inline_processor.is_done() + + return processing def get_inline_trace(self) -> Dict[str, Any]: - return inline_get_trace(self.spans) + trace = {} + + with suppress(Exception): + trace = inline_get_trace(self.spans) + + return trace def get_trace_id_only(self) -> Dict[str, Any]: - return inline_get_trace_id(self.spans) + trace = {} + + with suppress(Exception): + trace = inline_get_trace_id(self.spans) + + return trace