|
1 | 1 | import ast |
| 2 | +import hashlib |
2 | 3 | import json |
| 4 | +from collections import defaultdict |
3 | 5 | from collections.abc import Mapping, Sequence |
4 | 6 | from datetime import datetime |
5 | 7 | from typing import Any |
6 | 8 |
|
7 | 9 | from opentelemetry.sdk.trace import ReadableSpan |
8 | 10 |
|
9 | | -from ..models import ToolCall, ToolOutput |
| 11 | +from ..models import ( |
| 12 | + AgentExecution, |
| 13 | + EvaluationResult, |
| 14 | + NumericEvaluationResult, |
| 15 | + ToolCall, |
| 16 | + ToolOutput, |
| 17 | +) |
10 | 18 |
|
11 | 19 | COMPARATOR_MAPPINGS = { |
12 | 20 | ">": "gt", |
|
21 | 29 | COMMUNITY_agents_SUFFIX = "-community-agents" |
22 | 30 |
|
23 | 31 |
|
def generate_datapoint_id(agent_execution: "AgentExecution") -> str:
    """Generate a collision-safe but readable datapoint ID from agent_input.

    Creates a short, readable ID that includes meaningful content from the input
    plus a hash suffix for collision safety.

    Args:
        agent_execution: The agent execution containing agent_input

    Returns:
        String datapoint ID in format: "readable_part_HASH"
    """
    if not agent_execution.agent_input:
        # Empty/missing input still yields a stable, deterministic ID.
        raw_input = "empty_input"
    else:
        # Canonical JSON (sorted keys, no whitespace) so logically-equal
        # inputs always hash to the same value.
        raw_input = json.dumps(
            agent_execution.agent_input, sort_keys=True, separators=(",", ":")
        )

    # Build the human-readable prefix from the first non-empty "text-like"
    # field we recognize (first 30 chars, restricted to alnum, "_", "-").
    readable_part = ""
    if isinstance(agent_execution.agent_input, dict):
        for key in ("query", "question", "input", "prompt", "text", "message"):
            value = agent_execution.agent_input.get(key)
            if value:
                text = str(value)
                readable_part = "".join(c for c in text if c.isalnum() or c in " _-")
                readable_part = readable_part.replace(" ", "_").lower()[:30]
                break

    # Fall back to a generic prefix when no readable text was found.
    if not readable_part:
        readable_part = "input"

    # 8 hex chars of MD5 for collision safety. MD5 is a fingerprint here, not
    # a security primitive, so flag it as such — this also keeps the function
    # working on FIPS-restricted OpenSSL builds (keyword available since 3.9).
    hash_part = hashlib.md5(
        raw_input.encode("utf-8"), usedforsecurity=False
    ).hexdigest()[:8]

    return f"{readable_part}_{hash_part}"
| 72 | + |
| 73 | + |
24 | 74 | def extract_tool_calls_names(spans: Sequence[ReadableSpan]) -> list[str]: |
25 | 75 | """Extract the tool call names from execution spans IN ORDER. |
26 | 76 |
|
@@ -456,3 +506,89 @@ def trace_to_str(agent_trace: Sequence[ReadableSpan]) -> str: |
456 | 506 | platform_history.append("") |
457 | 507 |
|
458 | 508 | return "\n".join(platform_history) |
| 509 | + |
| 510 | + |
| 511 | +def calculate_final_score( |
| 512 | + evaluation_results: list[EvaluationResult], |
| 513 | + evaluator_weights: dict[str, float] | None = None, |
| 514 | + default_weight: float = 1.0, |
| 515 | +) -> tuple[float, dict[str, float]]: |
| 516 | + """Aggregate evaluation results with deduplication and weighted scoring. |
| 517 | +
|
| 518 | + Only NumericEvaluationResult can be aggregated, other types of results are ignored. |
| 519 | +
|
| 520 | + This function performs the following steps: |
| 521 | + 1. Deduplicates results by datapoint_id and evaluator_name (averages duplicates) |
| 522 | + 2. Calculates average score per evaluator across all datapoints |
| 523 | + 3. Computes final weighted score across evaluators |
| 524 | +
|
| 525 | + Args: |
| 526 | + evaluation_results: List of EvaluationResult objects with datapoint_id and evaluator_name |
| 527 | + evaluator_weights: Optional dict mapping evaluator names to weights |
| 528 | +
|
| 529 | + Returns: |
| 530 | + Tuple of (final_score, agg_metrics_per_evaluator) |
| 531 | + - final_score: Weighted average across evaluators |
| 532 | + - agg_metrics_per_evaluator: Dict mapping evaluator names to their average scores |
| 533 | + """ |
| 534 | + if not evaluation_results: |
| 535 | + return 0.0, {} |
| 536 | + |
| 537 | + if evaluator_weights is None: |
| 538 | + evaluator_weights = {} |
| 539 | + |
| 540 | + # Step 1: Group by datapoint_id and evaluator_name for deduplication |
| 541 | + grouped_by_datapoint_evaluator = defaultdict( |
| 542 | + lambda: defaultdict(list[NumericEvaluationResult]) |
| 543 | + ) |
| 544 | + |
| 545 | + for result in evaluation_results: |
| 546 | + # Only NumericEvaluationResult can be aggregated |
| 547 | + if isinstance(result, NumericEvaluationResult): |
| 548 | + datapoint_id = result.datapoint_id or "unknown_datapoint" |
| 549 | + evaluator_name = result.evaluator_name or "unknown_evaluator" |
| 550 | + grouped_by_datapoint_evaluator[datapoint_id][evaluator_name].append(result) |
| 551 | + |
| 552 | + # Step 2: Deduplicate by averaging same evaluator results for same datapoint |
| 553 | + dedup_results: list[NumericEvaluationResult] = [] |
| 554 | + for datapoint_id, evaluators_dict in grouped_by_datapoint_evaluator.items(): |
| 555 | + for evaluator_name, results_list in evaluators_dict.items(): |
| 556 | + if results_list: |
| 557 | + # Average the scores for this evaluator on this datapoint |
| 558 | + avg_score = sum(r.score for r in results_list) / len(results_list) |
| 559 | + # Create a representative result (use first result as template) |
| 560 | + first_result = results_list[0] |
| 561 | + dedup_result = type(first_result)( |
| 562 | + score=avg_score, |
| 563 | + datapoint_id=datapoint_id, |
| 564 | + evaluator_name=evaluator_name, |
| 565 | + details=first_result.details, |
| 566 | + evaluation_time=first_result.evaluation_time, |
| 567 | + ) |
| 568 | + dedup_results.append(dedup_result) |
| 569 | + |
| 570 | + # Step 3: Group by evaluator and calculate average score per evaluator |
| 571 | + grouped_by_evaluator = defaultdict(list[NumericEvaluationResult]) |
| 572 | + for result in dedup_results: |
| 573 | + grouped_by_evaluator[result.evaluator_name].append(result) |
| 574 | + |
| 575 | + agg_metrics_per_evaluator = {} |
| 576 | + for evaluator_name, results_list in grouped_by_evaluator.items(): |
| 577 | + avg_score = sum(r.score for r in results_list) / len(results_list) |
| 578 | + agg_metrics_per_evaluator[evaluator_name] = avg_score |
| 579 | + |
| 580 | + # Step 4: Calculate final weighted score |
| 581 | + if not agg_metrics_per_evaluator: |
| 582 | + return 0.0, {} |
| 583 | + |
| 584 | + total_weighted_score = 0.0 |
| 585 | + total_weight = 0.0 |
| 586 | + |
| 587 | + for evaluator_name, avg_score in agg_metrics_per_evaluator.items(): |
| 588 | + weight = evaluator_weights.get(evaluator_name, default_weight) |
| 589 | + total_weighted_score += avg_score * weight |
| 590 | + total_weight += weight |
| 591 | + |
| 592 | + final_score = total_weighted_score / total_weight if total_weight > 0 else 0.0 |
| 593 | + |
| 594 | + return final_score, agg_metrics_per_evaluator |
0 commit comments