chore(llmobs): implement skeleton code for ragas faithfulness evaluator #10662

Merged · 31 commits from evan.li/ragas-skeleton into main on Sep 30, 2024

Changes from 22 commits

Commits (31)
571d317
implement ragas faithfulenss runner with dummy ragas score generator
Sep 13, 2024
4b3d840
remove newline
Sep 13, 2024
7b9c929
pydantic v1
Sep 16, 2024
2e883a0
refactor into evaluator list
Sep 16, 2024
7b31443
add unit tests
Sep 17, 2024
13229bd
fix expectde span event
Sep 17, 2024
b493e20
merg conf
Sep 17, 2024
d290dcd
remove config option, use only env var
Sep 17, 2024
6e49cca
address comments
Sep 17, 2024
fcf9991
refactor into one evaluator service
Sep 18, 2024
10b276f
dont cancel futures
Sep 18, 2024
b6fa4e0
refactor dummy faithfulness into class
Sep 19, 2024
be893a1
rename field to label
Sep 19, 2024
a309330
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 19, 2024
d849067
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 19, 2024
fd73621
Merge branch 'evan.li/ragas-skeleton' of github.com:DataDog/dd-trace-…
Sep 19, 2024
2f48461
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 20, 2024
04d202e
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 20, 2024
a991f15
refactor so we store the service only in ragas
Sep 23, 2024
38e0a23
Merge branch 'evan.li/ragas-skeleton' of github.com:DataDog/dd-trace-…
Sep 23, 2024
ea5d4fa
rename a test
Sep 23, 2024
e66ee7e
clean up
Sep 26, 2024
c04dac4
rename, fix test
Sep 26, 2024
0a956e1
Merge branch 'main' of github.com:DataDog/dd-trace-py into evan.li/ra…
Sep 26, 2024
6d9c136
fork safety
Sep 26, 2024
7d7192c
fix tests
Sep 26, 2024
bb8d388
add more comments
Sep 26, 2024
3c17dee
delete unused fixture
Sep 26, 2024
62ecbe4
remove debug
Sep 26, 2024
f43dceb
fix wrong patch
Sep 30, 2024
d37b328
Merge branch 'main' into evan.li/ragas-skeleton
lievan Sep 30, 2024
30 changes: 30 additions & 0 deletions ddtrace/llmobs/_evaluators/ragas/faithfulness.py
@@ -0,0 +1,30 @@
import math
import time
from typing import Optional


class RagasFaithfulnessEvaluator:
LABEL = "ragas_faithfulness"
METRIC_TYPE = "score"

def __init__(self, llmobs_service):
self.llmobs_service = llmobs_service

def run(self, span: dict) -> None:
if not span:
return
score_result = self.evaluate(span)
if score_result:
self.llmobs_service.submit_evaluation(
span_context=span,
label=RagasFaithfulnessEvaluator.LABEL,
metric_type=RagasFaithfulnessEvaluator.METRIC_TYPE,
value=score_result,
timestamp_ms=math.floor(time.time() * 1000),
)

def evaluate(self, span: dict) -> Optional[float]:
span_id, trace_id = span.get("span_id"), span.get("trace_id")
if not span_id or not trace_id:
return None
return 1.0
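
The evaluator's contract is deliberately small: given a span event dict carrying a span_id and trace_id, evaluate() produces a score (hardcoded to 1.0 in this skeleton) and run() submits it through the LLMObs service as a "ragas_faithfulness" score metric. A minimal sketch of exercising the class directly, with a MagicMock standing in for the real service (the mock is illustrative, not part of this PR):

import mock

from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator

# Stand-in for the LLMObs service; only submit_evaluation() is needed here.
llmobs = mock.MagicMock()
evaluator = RagasFaithfulnessEvaluator(llmobs_service=llmobs)

# The skeleton scores any span event that carries both IDs...
evaluator.run({"span_id": "123", "trace_id": "1234"})
llmobs.submit_evaluation.assert_called_once()

# ...and silently skips empty or incomplete ones.
evaluator.run({"span_id": "123"})
llmobs.submit_evaluation.assert_called_once()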
83 changes: 83 additions & 0 deletions ddtrace/llmobs/_evaluators/runner.py
@@ -0,0 +1,83 @@
import atexit
from concurrent import futures
import os
from typing import Dict

from ddtrace.internal import forksafe
from ddtrace.internal import service
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService

from .ragas.faithfulness import RagasFaithfulnessEvaluator


logger = get_logger(__name__)

SUPPORTED_EVALUATORS = {
"ragas_faithfulness": RagasFaithfulnessEvaluator,
}


class EvaluatorRunner(PeriodicService):
"""Base class for evaluating LLM Observability span events"""

def __init__(self, interval: float, llmobs_service=None):
super(EvaluatorRunner, self).__init__(interval=interval)
self._lock = forksafe.RLock()
self._buffer = [] # type: list[Dict]
self._buffer_limit = 1000

self.llmobs_service = llmobs_service
self.executor = futures.ThreadPoolExecutor()
self.evaluators = []

evaluator_str = os.getenv("_DD_LLMOBS_EVALUATORS")
if evaluator_str is None:
return

evaluators = evaluator_str.split(",")
for evaluator in evaluators:
if evaluator in SUPPORTED_EVALUATORS:
self.evaluators.append(SUPPORTED_EVALUATORS[evaluator](llmobs_service=llmobs_service))

def start(self, *args, **kwargs):
if not self.evaluators:
logger.debug("no evaluators configured, not starting %r", self.__class__.__name__)
return
super(EvaluatorRunner, self).start()
logger.debug("started %r to %r", self.__class__.__name__)
atexit.register(self.on_shutdown)

def stop(self, *args, **kwargs):
if not self.evaluators and self.status == service.ServiceStatus.STOPPED:
logger.debug("no evaluators configured, not stopping %r", self.__class__.__name__)
return
super(EvaluatorRunner, self).stop()

def on_shutdown(self):
self.executor.shutdown()

def enqueue(self, span_event: Dict) -> None:
with self._lock:
if len(self._buffer) >= self._buffer_limit:
logger.warning(
"%r event buffer full (limit is %d), dropping event", self.__class__.__name__, self._buffer_limit
)
return
self._buffer.append(span_event)

def periodic(self) -> None:
with self._lock:
if not self._buffer:
return
events = self._buffer
self._buffer = []

try:
self.run(events)
except RuntimeError as e:
logger.debug("failed to run evaluation: %s", e)

def run(self, spans):
for evaluator in self.evaluators:
self.executor.map(evaluator.run, spans)
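
Note that the runner discovers its evaluators solely through the private _DD_LLMOBS_EVALUATORS environment variable, a comma-separated list of keys from SUPPORTED_EVALUATORS; with the variable unset, start() is a no-op. A sketch of how a user might opt in, assuming the variable stays internal and subject to change (the ml_app name is illustrative):

import os

# Internal/experimental flag; must be set before LLMObs.enable() constructs the runner.
os.environ["_DD_LLMOBS_EVALUATORS"] = "ragas_faithfulness"

from ddtrace.llmobs import LLMObs

# enable() wires the EvaluatorRunner up alongside the span and eval metric writers.
LLMObs.enable(ml_app="my-ml-app")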
19 changes: 18 additions & 1 deletion ddtrace/llmobs/_llmobs.py
@@ -40,6 +40,7 @@
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import _get_llmobs_parent_id
@@ -88,7 +89,13 @@ def __init__(self, tracer=None):
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)),
)
self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer)

self._evaluator_runner = EvaluatorRunner(
interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)),
llmobs_service=self,
)

self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner)
forksafe.register(self._child_after_fork)

def _child_after_fork(self):
@@ -114,13 +121,23 @@ def _start_service(self) -> None:
except ServiceStatusError:
log.debug("Error starting LLMObs writers")

try:
self._evaluator_runner.start()
except ServiceStatusError:
log.debug("Error starting evaluator runner")

def _stop_service(self) -> None:
try:
self._llmobs_span_writer.stop()
self._llmobs_eval_metric_writer.stop()
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

try:
self._evaluator_runner.stop()
except ServiceStatusError:
log.debug("Error stopping evaluator runner")

try:
forksafe.unregister(self._child_after_fork)
self.tracer.shutdown()
9 changes: 8 additions & 1 deletion ddtrace/llmobs/_trace_processor.py
@@ -44,8 +44,9 @@ class LLMObsTraceProcessor(TraceProcessor):
Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
"""

def __init__(self, llmobs_span_writer):
def __init__(self, llmobs_span_writer, evaluator_runner=None):
self._span_writer = llmobs_span_writer
self._evaluator_runner = evaluator_runner

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
@@ -57,11 +58,17 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:

def submit_llmobs_span(self, span: Span) -> None:
"""Generate and submit an LLMObs span event to be sent to LLMObs."""
span_event = None
try:
span_event = self._llmobs_span_event(span)
self._span_writer.enqueue(span_event)
except (KeyError, TypeError):
log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span)
finally:
if not span_event:
return
if self._evaluator_runner:
self._evaluator_runner.enqueue(span_event)

def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"""Span event object structure."""
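
With this change the processor fans each generated span event out to two sinks: the span writer unconditionally, and the evaluator runner when one was provided. A minimal sketch of that wiring with mock sinks (mocks are illustrative; the real wiring happens inside LLMObs.enable()):

import mock

from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor

writer, runner = mock.MagicMock(), mock.MagicMock()
processor = LLMObsTraceProcessor(writer, evaluator_runner=runner)

# Each span event produced by process_trace() is enqueued to both writer and runner;
# a malformed span is logged as an error and reaches neither sink.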
4 changes: 4 additions & 0 deletions tests/llmobs/_utils.py
@@ -200,6 +200,10 @@ def _llmobs_base_span_event(
span_event["meta"]["error.type"] = error
span_event["meta"]["error.message"] = error_message
span_event["meta"]["error.stack"] = error_stack
if tags:
span_event["ml_app"] = tags.get("ml_app", "unnamed-ml-app")
else:
span_event["ml_app"] = "unnamed-ml-app"
return span_event


24 changes: 24 additions & 0 deletions tests/llmobs/conftest.py
@@ -5,6 +5,7 @@

from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs as llmobs_service
from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
from tests.llmobs._utils import logs_vcr
from tests.utils import DummyTracer
from tests.utils import override_global_config
@@ -58,6 +59,16 @@ def mock_llmobs_eval_metric_writer():
patcher.stop()


@pytest.fixture
def mock_llmobs_ragas_evaluator():
patcher = mock.patch("ddtrace.llmobs._evaluators.ragas.faithfulness.RagasFaithfulnessEvaluator.run")
RagasEvaluator = patcher.start()
m = mock.MagicMock()
RagasEvaluator.return_value = m
yield m
patcher.stop()


@pytest.fixture
def mock_http_writer_send_payload_response():
with mock.patch(
@@ -88,6 +99,12 @@ def mock_writer_logs():
yield m


@pytest.fixture
def mock_evaluator_logs():
with mock.patch("ddtrace.llmobs._evaluators.runner.logger") as m:
yield m


@pytest.fixture
def mock_http_writer_logs():
with mock.patch("ddtrace.internal.writer.writer.log") as m:
@@ -125,3 +142,10 @@ def AgentlessLLMObs(mock_llmobs_span_agentless_writer, mock_llmobs_eval_metric_w
llmobs_service.enable(_tracer=dummy_tracer)
yield llmobs_service
llmobs_service.disable()


@pytest.fixture
def mock_ragas_evaluator(mock_llmobs_eval_metric_writer):
with mock.patch("ddtrace.llmobs._evaluators.ragas.faithfulness.RagasFaithfulnessEvaluator.evaluate") as m:
m.return_value = 1.0
yield RagasFaithfulnessEvaluator
98 changes: 98 additions & 0 deletions tests/llmobs/test_llmobs_evaluator_runner.py
@@ -0,0 +1,98 @@
import time

import mock
import pytest

from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._writer import LLMObsEvaluationMetricEvent


def _dummy_ragas_eval_metric_event(span_id, trace_id):
return LLMObsEvaluationMetricEvent(
span_id=span_id,
trace_id=trace_id,
score_value=1.0,
ml_app="unnamed-ml-app",
timestamp_ms=mock.ANY,
metric_type="score",
label="ragas_faithfulness",
tags=["ddtrace.version:2.14.0.dev78+g929ae6186", "ml_app:unnamed-ml-app"],
)


def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
evaluator_runner.evaluators.append(mock_ragas_evaluator)
evaluator_runner.start()
mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")])


def test_evaluator_runner_buffer_limit(mock_evaluator_logs):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
for _ in range(1001):
evaluator_runner.enqueue({})
mock_evaluator_logs.warning.assert_called_with(
"%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000
)


def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
evaluator_runner.periodic()
mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
_dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
)


@pytest.mark.vcr_logs
def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
evaluator_runner.start()

evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})

time.sleep(0.1)

mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
_dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
)


def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subprocess):
out, err, status, pid = run_python_code_in_subprocess(
"""
import os
import time
import mock

from ddtrace.internal.utils.http import Response
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator

with mock.patch(
"ddtrace.internal.writer.HTTPWriter._send_payload",
return_value=Response(
status=200,
body="{}",
),
):
LLMObs.enable(
site="datad0g.com",
api_key=os.getenv("DD_API_KEY"),
ml_app="unnamed-ml-app",
)
evaluator_runner = EvaluatorRunner(
interval=0.01, llmobs_service=LLMObs
)
evaluator_runner.evaluators.append(RagasFaithfulnessEvaluator(llmobs_service=LLMObs))
evaluator_runner.start()
evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"})
""",
)
assert status == 0, err
assert out == b""
assert err == b""