|
| 1 | +"""OpenTelemetry SpanProcessor for normalizing LlamaIndex tool call attributes. |
| 2 | +
|
| 3 | +LlamaIndex wraps tool arguments in {"kwargs": {...}} which differs from other |
| 4 | +frameworks like LangChain that use flat {"arg": value} format. This processor |
| 5 | +normalizes the format at the span level before exporters or dev terminal read it. |
| 6 | +""" |
| 7 | + |
| 8 | +import json |
| 9 | +import logging |
| 10 | +from typing import Any, Optional |
| 11 | + |
| 12 | +from opentelemetry.context import Context |
| 13 | +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | + |
| 18 | +class AttributeNormalizingSpanProcessor(SpanProcessor): |
| 19 | + """Normalizes LlamaIndex tool call attributes to match other frameworks. |
| 20 | +
|
| 21 | + Unwraps {"kwargs": {...}} to flat {...} format for consistency with LangChain. |
| 22 | + """ |
| 23 | + |
| 24 | + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: |
| 25 | + """Called when span starts - no action needed.""" |
| 26 | + pass |
| 27 | + |
| 28 | + def on_end(self, span: ReadableSpan) -> None: |
| 29 | + """Normalize tool call attributes before span is consumed by exporters/terminal.""" |
| 30 | + if not span._attributes: |
| 31 | + return |
| 32 | + |
| 33 | + try: |
| 34 | + # Get the mutable internal attributes dict |
| 35 | + attrs: dict = span._attributes # type: ignore[attr-defined] |
| 36 | + |
| 37 | + if attrs.get("openinference.span.kind", None) == "TOOL": |
| 38 | + # Normalize tool call attributes |
| 39 | + for key in ("input.value", "output.value"): |
| 40 | + if key in attrs: |
| 41 | + original = attrs[key] |
| 42 | + normalized = self._normalize_attribute(key, original) |
| 43 | + |
| 44 | + if normalized != original: |
| 45 | + attrs[key] = normalized |
| 46 | + if logger.isEnabledFor(logging.DEBUG): |
| 47 | + logger.debug( |
| 48 | + f"Normalized {key} in span '{span.name}': " |
| 49 | + f"{original[:50]}... → {normalized[:50]}..." |
| 50 | + ) |
| 51 | + |
| 52 | + except Exception as e: |
| 53 | + # Don't crash span processing if normalization fails |
| 54 | + logger.debug( |
| 55 | + f"Failed to normalize span '{getattr(span, 'name', 'unknown')}': {e}" |
| 56 | + ) |
| 57 | + |
| 58 | + def _normalize_attribute(self, key: str, value: Any) -> str: |
| 59 | + """Unwrap LlamaIndex's kwargs wrapper if present.""" |
| 60 | + if isinstance(value, str): |
| 61 | + try: |
| 62 | + value = json.loads(value) |
| 63 | + except Exception: |
| 64 | + pass |
| 65 | + if isinstance(value, dict): |
| 66 | + if key == "input.value": |
| 67 | + if "kwargs" in value: |
| 68 | + value = json.dumps(value["kwargs"]) |
| 69 | + elif key == "output.value": |
| 70 | + value = json.dumps( |
| 71 | + { |
| 72 | + "content": value.get("raw_output"), |
| 73 | + "status": "success" |
| 74 | + if not value.get("is_error", False) |
| 75 | + else "error", |
| 76 | + "tool_call_id": value.get("tool_call_id"), |
| 77 | + } |
| 78 | + ) |
| 79 | + return str(value) |
| 80 | + |
| 81 | + def shutdown(self) -> None: |
| 82 | + """Called on processor shutdown - no cleanup needed.""" |
| 83 | + pass |
| 84 | + |
| 85 | + def force_flush(self, timeout_millis: int = 30000) -> bool: |
| 86 | + """Force flush - always succeeds (nothing to flush).""" |
| 87 | + return True |
0 commit comments