Skip to content

Commit

Permalink
Add OTLP processor
Browse files Browse the repository at this point in the history
  • Loading branch information
jp-agenta committed Sep 4, 2024
1 parent e34df9e commit f1cc055
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
13 changes: 9 additions & 4 deletions agenta-cli/agenta/sdk/decorators/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,13 +640,18 @@ def handle_terminal_run(
)
)

log.info("")
log.info("========= Result =========")
log.info("")

print("-> data")
print(json.dumps(result.data, indent=2))
print("-> trace")
print(json.dumps(result.trace, indent=2))
# print("-> data")
# print(json.dumps(result.data, indent=2))
# print("-> trace")
# print(json.dumps(result.trace, indent=2))
for span in result.trace["spans"]:
log.info(
f"{span['id']} {span['spankind'].ljust(10)} {span['status'].ljust(5)} {span['name']}"
)
log.info("")
log.info("==========================")

Expand Down
57 changes: 34 additions & 23 deletions agenta-cli/agenta/sdk/tracing/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json

from threading import Lock
from datetime import datetime, timezone
from datetime import datetime
from typing import Optional, Dict, Any, List, Literal, Sequence
from contextlib import contextmanager, suppress
from importlib.metadata import version
Expand Down Expand Up @@ -94,6 +94,8 @@ def __init__(
def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
super().on_start(span, parent_context=parent_context)

log.info(f"> {span.context.span_id.to_bytes(8).hex()} {span.name}")

if span.context.trace_id not in self.registry:
self.registry[span.context.trace_id] = dict()

Expand All @@ -102,6 +104,8 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None
def on_end(self, span: ReadableSpan):
super().on_end(span)

log.info(f" < {span.context.span_id.to_bytes(8).hex()} {span.name}")

del self.registry[span.context.trace_id][span.context.span_id]

if self.is_done():
Expand Down Expand Up @@ -167,10 +171,10 @@ def __init__(
project_id: Optional[str] = None,
api_key: Optional[str] = None,
experiment_id: Optional[str] = None,
set_global_tracer_provider: Optional[bool] = True,
) -> None:

# ENDPOINT
self.url = url
self.url = "http://localhost:4318/v1/traces" # url
# AUTHENTICATION
self.project_id = project_id
# AUTHORIZATION
Expand All @@ -190,30 +194,35 @@ def __init__(
# SPANS (INLINE TRACE)
self.spans: Dict[str:ReadableSpan] = dict()

# SPAN PROCESSOR
# self.processor = SynchronousMultiSpanProcessor()
self.processor = ConcurrentMultiSpanProcessor(num_threads=2)

base_shutdown = self.processor.shutdown

def safe_shutdown():
with suppress(Exception):
base_shutdown()

self.processor.shutdown = safe_shutdown

# TRACER PROVIDER
self.tracer_provider = TracerProvider(
active_span_processor=SynchronousMultiSpanProcessor()
# active_span_processor=ConcurrentMultiSpanProcessor(
# num_threads=2, # inline_processor + remote_processor
# )
)
self.tracer_provider = TracerProvider(active_span_processor=self.processor)

# self.tracer_provider.add_span_processor(
# TraceProcessor(
# OTLPSpanExporter(
# endpoint=self.url,
# headers=self.headers,
# )
# )
# )
# TRACE PROCESSORS
self.inline_processor = TraceProcessor(InlineTraceExporter(registry=self.spans))
self.tracer_provider.add_span_processor(self.inline_processor)

if set_global_tracer_provider is True:
set_tracer_provider(self.tracer_provider)
self.remote_processor = TraceProcessor(
OTLPSpanExporter(endpoint=self.url, headers=self.headers)
)
self.tracer_provider.add_span_processor(self.remote_processor)

# GLOBAL TRACER PROVIDER
set_tracer_provider(self.tracer_provider)

# TRACER
self.tracer = self.tracer_provider.get_tracer("ag.tracing")
self.tracer = self.tracer_provider.get_tracer("agenta.tracer")

# @suppress(Exception)
@contextmanager
Expand Down Expand Up @@ -357,7 +366,7 @@ def get_inline_trace(self, trace_id_only: bool = False):
for span in self.spans.values():
span: ReadableSpan

trace_id = hex(span.context.trace_id)[2:]
trace_id = span.context.trace_id.to_bytes(16).hex()

if trace_id not in spans_idx:
spans_idx[trace_id] = list()
Expand Down Expand Up @@ -398,7 +407,7 @@ def _parse_to_legacy_span(
attributes.update(**event.attributes)

legacy_span = CreateSpan(
id=hex(span.context.span_id)[2:],
id=span.context.span_id.to_bytes(8).hex(),
spankind=self._get_attributes("extra", span).get("kind", "UNKNOWN"),
name=span.name,
status=str(span.status.status_code.name),
Expand All @@ -410,7 +419,9 @@ def _parse_to_legacy_span(
span.end_time / 1_000_000_000,
).isoformat(),
#
parent_span_id=hex(span.parent.span_id)[2:] if span.parent else None,
parent_span_id=(
span.parent.span_id.to_bytes(8).hex() if span.parent else None
),
#
inputs=self._get_attributes("data.inputs", span),
internals=self._get_attributes("data.internals", span),
Expand Down

0 comments on commit f1cc055

Please sign in to comment.