Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e3cc76d
fix(anthropic): Respect iterator protocol in synchronous streamed res…
alexander-alderman-webb Mar 16, 2026
7bd7ed4
version check and typing
alexander-alderman-webb Mar 16, 2026
c8a6ec3
fix tests
alexander-alderman-webb Mar 16, 2026
6f1d833
missing types
alexander-alderman-webb Mar 16, 2026
40e2bf0
.
alexander-alderman-webb Mar 16, 2026
a21406f
fix(anthropic): Respect iterator protocol in asynchronous streamed re…
alexander-alderman-webb Mar 16, 2026
a3cc18f
simplify
alexander-alderman-webb Mar 16, 2026
631e727
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 16, 2026
dd26abc
simplify
alexander-alderman-webb Mar 16, 2026
f6b8909
merge master
alexander-alderman-webb Mar 17, 2026
0aeec72
update tests
alexander-alderman-webb Mar 17, 2026
005eb66
merge
alexander-alderman-webb Mar 17, 2026
c5cd959
.
alexander-alderman-webb Mar 17, 2026
b92db6d
docstring
alexander-alderman-webb Mar 17, 2026
a956644
docstring
alexander-alderman-webb Mar 17, 2026
beb8f2c
docstrings
alexander-alderman-webb Mar 17, 2026
e84a63f
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
7ec95fe
docs
alexander-alderman-webb Mar 17, 2026
288a065
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
8e9bfab
docstrings
alexander-alderman-webb Mar 17, 2026
fab5d93
type annotation
alexander-alderman-webb Mar 17, 2026
cda41e2
review
alexander-alderman-webb Mar 17, 2026
4156446
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
e17e036
review
alexander-alderman-webb Mar 17, 2026
31869af
simplify
alexander-alderman-webb Mar 17, 2026
1017e9e
simplify
alexander-alderman-webb Mar 17, 2026
8897151
simplify
alexander-alderman-webb Mar 17, 2026
fcedba7
.
alexander-alderman-webb Mar 17, 2026
fdf770a
.
alexander-alderman-webb Mar 17, 2026
e2f7afc
.
alexander-alderman-webb Mar 17, 2026
e36a4d4
merge master
alexander-alderman-webb Mar 18, 2026
c79fe89
docstring
alexander-alderman-webb Mar 18, 2026
e3b6b60
fix type annotation
alexander-alderman-webb Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 76 additions & 50 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
MessageStreamManager,
MessageStream,
AsyncMessageStreamManager,
AsyncMessageStream,
)

from anthropic.types import (
Expand All @@ -60,7 +61,15 @@
raise DidNotEnable("Anthropic not installed")

if TYPE_CHECKING:
from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable
from typing import (
Any,
AsyncIterator,
Iterator,
Optional,
Union,
Callable,
Awaitable,
)
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

Expand All @@ -71,7 +80,6 @@
TextBlockParam,
ToolUnionParam,
)
from anthropic.lib.streaming import AsyncMessageStream


class _RecordedUsage:
Expand All @@ -95,6 +103,7 @@ def setup_once() -> None:

"""
client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol.
Analogously, the function can return an AsyncStream, which implements the asynchronous iterator protocol.
The private _iterator variable and the close() method are patched. During iteration over the _iterator generator,
information from intercepted events is accumulated and used to populate output attributes on the AI Client Span.

Expand All @@ -108,6 +117,7 @@ def setup_once() -> None:
Stream.close = _wrap_close(Stream.close)

AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)
AsyncStream.close = _wrap_async_close(AsyncStream.close)

"""
client.messages.stream() patches are analogous to the patches for client.messages.create(stream=True) described above.
Expand All @@ -129,6 +139,11 @@ def setup_once() -> None:
)
)

# Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a
# AsyncMessageStream inherits from AsyncStream, so patching Stream is sufficient on these versions.
if not issubclass(AsyncMessageStream, AsyncStream):
AsyncMessageStream.close = _wrap_async_close(AsyncMessageStream.close)


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -467,20 +482,13 @@ def _wrap_synchronous_message_iterator(


async def _wrap_asynchronous_message_iterator(
stream: "Union[AsyncStream, AsyncMessageStream]",
iterator: "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]",
span: "Span",
integration: "AnthropicIntegration",
) -> "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]":
"""
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span.
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
"""
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []
response_id = None
finish_reason = None

try:
async for event in iterator:
# Message and content types are aliases for corresponding Raw* types, introduced in
Expand All @@ -499,44 +507,21 @@ async def _wrap_asynchronous_message_iterator(
yield event
continue

(
model,
usage,
content_blocks,
response_id,
finish_reason,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
response_id,
finish_reason,
)
_accumulate_event_data(stream, event)
yield event
finally:
with capture_internal_exceptions():
# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
response_id=response_id,
finish_reason=finish_reason,
)
if hasattr(stream, "_span"):
_finish_streaming_span(
span=stream._span,
integration=stream._integration,
model=stream._model,
usage=stream._usage,
content_blocks=stream._content_blocks,
response_id=stream._response_id,
finish_reason=stream._finish_reason,
)
del stream._span


def _set_output_data(
Expand Down Expand Up @@ -635,9 +620,15 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
return result

if isinstance(result, AsyncStream):
result._span = span
result._integration = integration

_initialize_data_accumulation_state(result)
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
result,
result._iterator,
)

return result

with capture_internal_exceptions():
Expand Down Expand Up @@ -856,6 +847,38 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
return _sentry_patched_create_async


def _wrap_async_close(
f: "Callable[..., Awaitable[None]]",
) -> "Callable[..., Awaitable[None]]":
"""
Closes the AI Client Span unless the finally block in `_wrap_asynchronous_message_iterator()` runs first.
"""

async def close(self: "AsyncStream") -> None:
if not hasattr(self, "_span"):
return await f(self)

if not hasattr(self, "_model"):
self._span.__exit__(None, None, None)
del self._span
return await f(self)

_finish_streaming_span(
span=self._span,
integration=self._integration,
model=self._model,
usage=self._usage,
content_blocks=self._content_blocks,
response_id=self._response_id,
finish_reason=self._finish_reason,
)
del self._span

return await f(self)

return close


def _wrap_message_stream(f: "Any") -> "Any":
"""
Attaches user-provided arguments to the returned context manager.
Expand Down Expand Up @@ -1012,10 +1035,13 @@ async def _sentry_patched_aenter(
tools=self._tools,
)

stream._span = span
stream._integration = integration

_initialize_data_accumulation_state(stream)
stream._iterator = _wrap_asynchronous_message_iterator(
iterator=stream._iterator,
span=span,
integration=integration,
stream,
stream._iterator,
)

return stream
Expand Down
Loading
Loading