Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 157 additions & 48 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages
from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager
from anthropic.lib.streaming import (
MessageStreamManager,
MessageStream,
AsyncMessageStreamManager,
)

from anthropic.types import (
MessageStartEvent,
Expand All @@ -56,7 +60,7 @@
raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
from typing import Any, AsyncIterator, Iterator, Optional, Union
from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

Expand All @@ -67,7 +71,7 @@
TextBlockParam,
ToolUnionParam,
)
from anthropic.lib.streaming import MessageStream, AsyncMessageStream
from anthropic.lib.streaming import AsyncMessageStream


class _RecordedUsage:
Expand All @@ -89,14 +93,35 @@
version = package_version("anthropic")
_check_minimum_version(AnthropicIntegration, version)

"""
client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol.
The private _iterator variable and the close() method are patched. During iteration over the _iterator generator,
information from intercepted events is accumulated and used to populate output attributes on the AI Client Span.

The span can be finished in two places:
- When the user exits the context manager or directly calls close(), the patched close() finishes the span.
- When iteration ends, the finally block in the _iterator wrapper finishes the span.

Both paths may run. For example, the context manager exit can follow iterator exhaustion.
"""
Messages.create = _wrap_message_create(Messages.create)
Stream.close = _wrap_close(Stream.close)

Check warning on line 108 in sentry_sdk/integrations/anthropic.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Missing exception handling in _wrap_close may prevent HTTP connection from closing

The `_wrap_close` function doesn't wrap its Sentry tracing logic with `capture_internal_exceptions`. If `_finish_streaming_span` or any internal Sentry operation throws an exception, the original `close()` method (`f(self)`) will never be called. This could leave the underlying HTTP connection open, causing resource leaks. The analogous `_wrap_synchronous_message_iterator` already correctly uses `capture_internal_exceptions` in its finally block.

AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

"""
client.messages.stream() patches are analogous to the patches for client.messages.create(stream=True) described above.
"""
Messages.stream = _wrap_message_stream(Messages.stream)
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
MessageStreamManager.__enter__
)

# On SDK versions prior to https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a,
# MessageStream inherited from Stream, so patching Stream alone is sufficient on those older versions.
if not issubclass(MessageStream, Stream):
MessageStream.close = _wrap_close(MessageStream.close)

Check warning on line 123 in sentry_sdk/integrations/anthropic.py

View check run for this annotation

@sentry/warden / warden: code-review

AsyncStream.close() and AsyncMessageStream.close() not patched, spans may not finish

The PR patches `Stream.close()` and conditionally `MessageStream.close()` to finish spans when the connection is closed early, but does not patch the corresponding `AsyncStream.close()` or `AsyncMessageStream.close()` methods. If a user calls `close()` on an async stream before fully consuming the iterator, the span will not be finished properly, leaving orphaned spans. This creates inconsistent behavior between sync and async APIs.

AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream)
AsyncMessageStreamManager.__aenter__ = (
_wrap_async_message_stream_manager_aenter(
Expand Down Expand Up @@ -399,21 +424,13 @@


def _wrap_synchronous_message_iterator(
stream: "Union[Stream, MessageStream]",
iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]",
span: "Span",
integration: "AnthropicIntegration",
) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
"""

model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []
response_id = None
finish_reason = None

try:
for event in iterator:
# Message and content types are aliases for corresponding Raw* types, introduced in
Expand All @@ -432,40 +449,21 @@
yield event
continue

(model, usage, content_blocks, response_id, finish_reason) = (
_collect_ai_data(
event,
model,
usage,
content_blocks,
response_id,
finish_reason,
)
)
_accumulate_event_data(stream, event)
yield event
finally:
with capture_internal_exceptions():
# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
response_id=response_id,
finish_reason=finish_reason,
)
if hasattr(stream, "_span"):
_finish_streaming_span(
span=stream._span,
integration=stream._integration,
model=stream._model,
usage=stream._usage,
content_blocks=stream._content_blocks,
response_id=stream._response_id,
finish_reason=stream._finish_reason,
)
del stream._span


async def _wrap_asynchronous_message_iterator(
Expand Down Expand Up @@ -625,9 +623,15 @@
result = yield f, args, kwargs

if isinstance(result, Stream):
result._span = span
result._integration = integration

_initialize_data_accumulation_state(result)
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
result,
result._iterator,
)

return result

if isinstance(result, AsyncStream):
Expand Down Expand Up @@ -712,6 +716,108 @@
return _sentry_patched_create_sync


def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None:
    """
    Attach the accumulator fields used to collect streaming output to the
    Stream instance, unless they are already present.
    """
    if hasattr(stream, "_model"):
        # Already initialized; leave previously accumulated state untouched.
        return

    stream._model = None
    stream._usage = _RecordedUsage()
    stream._content_blocks = []
    stream._response_id = None
    stream._finish_reason = None


def _accumulate_event_data(
    stream: "Union[Stream, MessageStream]",
    event: "Union[RawMessageStreamEvent, MessageStreamEvent]",
) -> None:
    """
    Fold a single stream event into the output state accumulated on *stream*.
    """
    # _collect_ai_data returns the updated accumulator tuple; write it
    # straight back onto the stream instance. The current attribute values
    # are read as call arguments before any assignment happens.
    (
        stream._model,
        stream._usage,
        stream._content_blocks,
        stream._response_id,
        stream._finish_reason,
    ) = _collect_ai_data(
        event,
        stream._model,
        stream._usage,
        stream._content_blocks,
        stream._response_id,
        stream._finish_reason,
    )


def _finish_streaming_span(
    span: "Span",
    integration: "AnthropicIntegration",
    model: "Optional[str]",
    usage: "_RecordedUsage",
    content_blocks: "list[str]",
    response_id: "Optional[str]",
    finish_reason: "Optional[str]",
) -> None:
    """
    Record the accumulated streaming output on the AI Client Span and end it.
    """
    # Anthropic reports input_tokens excluding cache-read/cache-write tokens;
    # fold them back in so downstream cost calculations see the true total.
    cache_read = usage.cache_read_input_tokens or 0
    cache_write = usage.cache_write_input_tokens or 0
    total_input_tokens = usage.input_tokens + cache_read + cache_write

    # All streamed text deltas are joined into one text content block.
    combined_text = {"text": "".join(content_blocks), "type": "text"}

    _set_output_data(
        span=span,
        integration=integration,
        model=model,
        input_tokens=total_input_tokens,
        output_tokens=usage.output_tokens,
        cache_read_input_tokens=usage.cache_read_input_tokens,
        cache_write_input_tokens=usage.cache_write_input_tokens,
        content_blocks=[combined_text],
        finish_span=True,
        response_id=response_id,
        finish_reason=finish_reason,
    )


def _wrap_close(
    f: "Callable[..., None]",
) -> "Callable[..., None]":
    """
    Wrap Stream.close()/MessageStream.close() so the AI Client Span is
    finished when the stream is closed before the finally block in
    `_wrap_synchronous_message_iterator()` has run (e.g. an early close
    inside a context manager).

    The original close() is always invoked — even if finishing the span
    raises internally — so the underlying HTTP connection is never leaked.
    """

    def close(self: "Union[Stream, MessageStream]") -> None:
        # capture_internal_exceptions() swallows and reports any SDK-internal
        # error, guaranteeing that the original close() below still runs
        # (mirrors the finally block in _wrap_synchronous_message_iterator).
        with capture_internal_exceptions():
            if hasattr(self, "_span"):
                if hasattr(self, "_model"):
                    # Accumulation state exists: record the collected output
                    # on the span and end it.
                    _finish_streaming_span(
                        span=self._span,
                        integration=self._integration,
                        model=self._model,
                        usage=self._usage,
                        content_blocks=self._content_blocks,
                        response_id=self._response_id,
                        finish_reason=self._finish_reason,
                    )
                else:
                    # A span was attached but nothing was accumulated yet;
                    # end the span without output attributes.
                    self._span.__exit__(None, None, None)
                # Drop the marker so the iterator wrapper's finally block
                # does not try to finish the span a second time.
                del self._span

        return f(self)

    return close


def _wrap_message_create_async(f: "Any") -> "Any":
async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
gen = _sentry_patched_create_common(f, *args, **kwargs)
Expand Down Expand Up @@ -819,10 +925,13 @@
tools=self._tools,
)

stream._span = span
stream._integration = integration

_initialize_data_accumulation_state(stream)
stream._iterator = _wrap_synchronous_message_iterator(
iterator=stream._iterator,
span=span,
integration=integration,
stream,
stream._iterator,
)

return stream
Expand Down
Loading
Loading