diff --git a/agenta-cli/agenta/sdk/decorators/routing.py b/agenta-cli/agenta/sdk/decorators/routing.py index 600b0dddb..c601d4a19 100644 --- a/agenta-cli/agenta/sdk/decorators/routing.py +++ b/agenta-cli/agenta/sdk/decorators/routing.py @@ -640,13 +640,18 @@ def handle_terminal_run( ) ) + log.info("") log.info("========= Result =========") log.info("") - print("-> data") - print(json.dumps(result.data, indent=2)) - print("-> trace") - print(json.dumps(result.trace, indent=2)) + # print("-> data") + # print(json.dumps(result.data, indent=2)) + # print("-> trace") + # print(json.dumps(result.trace, indent=2)) + for span in result.trace["spans"]: + log.info( + f"{span['id']} {span['spankind'].ljust(10)} {span['status'].ljust(5)} {span['name']}" + ) log.info("") log.info("==========================") diff --git a/agenta-cli/agenta/sdk/tracing/opentelemetry.py b/agenta-cli/agenta/sdk/tracing/opentelemetry.py index 57e77c021..71596aa60 100644 --- a/agenta-cli/agenta/sdk/tracing/opentelemetry.py +++ b/agenta-cli/agenta/sdk/tracing/opentelemetry.py @@ -1,7 +1,7 @@ import json from threading import Lock -from datetime import datetime, timezone +from datetime import datetime from typing import Optional, Dict, Any, List, Literal, Sequence from contextlib import contextmanager, suppress from importlib.metadata import version @@ -94,6 +94,8 @@ def __init__( def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: super().on_start(span, parent_context=parent_context) + log.info(f"> {span.context.span_id.to_bytes(8).hex()} {span.name}") + if span.context.trace_id not in self.registry: self.registry[span.context.trace_id] = dict() @@ -102,6 +104,8 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None def on_end(self, span: ReadableSpan): super().on_end(span) + log.info(f" < {span.context.span_id.to_bytes(8).hex()} {span.name}") + del self.registry[span.context.trace_id][span.context.span_id] if self.is_done(): @@ -167,10 +171,10 @@ def __init__( project_id: Optional[str] = None, api_key: Optional[str] = None, experiment_id: Optional[str] = None, - set_global_tracer_provider: Optional[bool] = True, ) -> None: + # ENDPOINT - self.url = url + self.url = "http://localhost:4318/v1/traces" # url # AUTHENTICATION self.project_id = project_id # AUTHORIZATION @@ -190,30 +194,35 @@ def __init__( # SPANS (INLINE TRACE) self.spans: Dict[str:ReadableSpan] = dict() + # SPAN PROCESSOR + # self.processor = SynchronousMultiSpanProcessor() + self.processor = ConcurrentMultiSpanProcessor(num_threads=2) + + base_shutdown = self.processor.shutdown + + def safe_shutdown(): + with suppress(Exception): + base_shutdown() + + self.processor.shutdown = safe_shutdown + # TRACER PROVIDER - self.tracer_provider = TracerProvider( - active_span_processor=SynchronousMultiSpanProcessor() - # active_span_processor=ConcurrentMultiSpanProcessor( - # num_threads=2, # inline_processor + remote_processor - # ) - ) + self.tracer_provider = TracerProvider(active_span_processor=self.processor) - # self.tracer_provider.add_span_processor( - # TraceProcessor( - # OTLPSpanExporter( - # endpoint=self.url, - # headers=self.headers, - # ) - # ) - # ) + # TRACE PROCESSORS self.inline_processor = TraceProcessor(InlineTraceExporter(registry=self.spans)) self.tracer_provider.add_span_processor(self.inline_processor) - if set_global_tracer_provider is True: - set_tracer_provider(self.tracer_provider) + self.remote_processor = TraceProcessor( + OTLPSpanExporter(endpoint=self.url, headers=self.headers) + ) + self.tracer_provider.add_span_processor(self.remote_processor) + + # GLOBAL TRACER PROVIDER + set_tracer_provider(self.tracer_provider) # TRACER - self.tracer = self.tracer_provider.get_tracer("ag.tracing") + self.tracer = self.tracer_provider.get_tracer("agenta.tracer") # @suppress(Exception) @contextmanager @@ -357,7 +366,7 @@ def get_inline_trace(self, trace_id_only: bool = False): for span in self.spans.values(): span: ReadableSpan - trace_id = hex(span.context.trace_id)[2:] + trace_id = span.context.trace_id.to_bytes(16).hex() if trace_id not in spans_idx: spans_idx[trace_id] = list() @@ -398,7 +407,7 @@ def _parse_to_legacy_span( attributes.update(**event.attributes) legacy_span = CreateSpan( - id=hex(span.context.span_id)[2:], + id=span.context.span_id.to_bytes(8).hex(), spankind=self._get_attributes("extra", span).get("kind", "UNKNOWN"), name=span.name, status=str(span.status.status_code.name), @@ -410,7 +419,9 @@ def _parse_to_legacy_span( span.end_time / 1_000_000_000, ).isoformat(), # - parent_span_id=hex(span.parent.span_id)[2:] if span.parent else None, + parent_span_id=( + span.parent.span_id.to_bytes(8).hex() if span.parent else None + ), # inputs=self._get_attributes("data.inputs", span), internals=self._get_attributes("data.internals", span),