From e3cc76d4516c8d4945d92c9b5f8846f69a36b3cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 11:24:31 +0100 Subject: [PATCH 01/23] fix(anthropic): Respect iterator protocol in synchronous streamed responses --- sentry_sdk/integrations/anthropic.py | 334 +++++++++++++-- .../integrations/anthropic/test_anthropic.py | 405 ++++++++++++++++++ 2 files changed, 692 insertions(+), 47 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bc208ac4f5..efb8638642 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,7 +39,11 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager + from anthropic.lib.streaming import ( + MessageStreamManager, + MessageStream, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -56,7 +60,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union + from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -67,7 +71,7 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import MessageStream, AsyncMessageStream + from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -89,13 +93,36 @@ def setup_once() -> None: version = package_version("anthropic") _check_minimum_version(AnthropicIntegration, version) + """ + client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. 
+ + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.create = _wrap_message_create(Messages.create) + Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) + Stream.__next__ = _wrap_stream_next(Stream.__next__) + Stream.close = _wrap_stream_close(Stream.close) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + """ + client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -398,20 +425,14 @@ def _set_create_input_data( def _wrap_synchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. 
+ Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - + generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -430,36 +451,25 @@ def _wrap_synchronous_message_iterator( yield event continue - (model, usage, content_blocks, response_id) = _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + ) + del stream._span async def _wrap_asynchronous_message_iterator( @@ -612,9 +622,8 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs if isinstance(result, Stream): - result._iterator = _wrap_synchronous_message_iterator( - result._iterator, span, integration - ) + result._span = span + 
result._integration = integration return result if isinstance(result, AsyncStream): @@ -698,6 +707,155 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): + """ + Initialize fields for accumulating output on the Stream instance. + """ + if not hasattr(stream, "_model"): + stream._model = None + stream._usage = _RecordedUsage() + stream._content_blocks: "list[str]" = [] + stream._response_id = None + + +def _accumulate_event_data( + self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" +): + """ + Update accumulated output from a single stream event. + """ + (model, usage, content_blocks, response_id) = _collect_ai_data( + event, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + + self._model = model + self._usage = usage + self._content_blocks = content_blocks + self._response_id = response_id + + +def _finish_streaming_span( + span: "Span", + integration: "AnthropicIntegration", + model: "Optional[str]", + usage: "_RecordedUsage", + content_blocks: "list[str]", + response_id: "Optional[str]", +): + """ + Set output attributes on the AI Client Span and end the span. + """ + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. 
+ total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + response_id=response_id, + ) + + +def _wrap_stream_iter( + f: "Callable[..., Iterator[RawMessageStreamEvent]]", +) -> "Callable[..., Iterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_stream_next( + f: "Callable[..., RawMessageStreamEvent]", +) -> "Callable[..., RawMessageStreamEvent]": + """ + Accumulates output data from the returned event. + """ + + def __next__(self) -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. 
+ """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_message_create_async(f: "Any") -> "Any": async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": gen = _sentry_patched_create_common(f, *args, **kwargs) @@ -805,17 +963,99 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": tools=self._tools, ) - stream._iterator = _wrap_synchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_enter +def _wrap_message_stream_iter( + f: "Callable[..., Iterator[MessageStreamEvent]]", +) -> "Callable[..., Iterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_message_stream_next( + f: "Callable[..., MessageStreamEvent]", +) -> "Callable[..., MessageStreamEvent]": + """ + Accumulates output data from the returned event. 
+ """ + + def __next__(self) -> "MessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_message_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. 
diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 3a854e3a4e..e3cde11b62 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +from itertools import islice try: from unittest.mock import AsyncMock @@ -325,6 +326,207 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_streaming_create_message_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, 
messages=messages, model="model", stream=True + ) + + while True: + next(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_streaming_create_message_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + 
MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + next(messages) + next(messages) + list(islice(messages, 1)) + next(messages) + messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" 
+ + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.parametrize( "send_default_pii, include_prompts", [ @@ -441,6 +643,209 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_stream_messages_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + 
messages=messages, + model="model", + ) as stream: + while True: + next(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_stream_messages_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + 
delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + next(stream) + next(stream) + list(islice(stream, 2)) + next(stream) + stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" 
+ + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", From 7bd7ed42c9efb24eb94ae07ef4b9f76c8c44db44 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:00:50 +0100 Subject: [PATCH 02/23] version check and typing --- sentry_sdk/integrations/anthropic.py | 78 ++++++++++--------- .../integrations/anthropic/test_anthropic.py | 7 +- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index efb8638642..5a0b511708 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -120,9 +120,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) - MessageStream.close = _wrap_message_stream_close(MessageStream.close) + + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. 
+ if version >= (0, 26, 2): + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -779,7 +783,7 @@ def _wrap_stream_iter( output attributes on the AI Client Span and end the span. """ - def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -801,24 +805,26 @@ def _wrap_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "RawMessageStreamEvent": + def __next__(self: "Stream") -> "RawMessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -833,7 +839,7 @@ def _wrap_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self) -> None: + def close(self: "Stream") -> None: if not hasattr(self, "_span"): return f(self) @@ -979,7 +985,7 @@ def _wrap_message_stream_iter( output attributes on the AI Client Span and end the span. 
""" - def __iter__(self) -> "Iterator[MessageStreamEvent]": + def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -1001,24 +1007,26 @@ def _wrap_message_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "MessageStreamEvent": + def __next__(self: "MessageStream") -> "MessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -1033,7 +1041,7 @@ def _wrap_message_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. 
""" - def close(self) -> None: + def close(self: "MessageStream") -> None: if not hasattr(self, "_span"): return f(self) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index e3cde11b62..49c32abf75 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,6 +13,7 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic +from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent @@ -815,8 +816,10 @@ def test_stream_messages_iterator_methods( ) as stream: next(stream) next(stream) - list(islice(stream, 2)) - next(stream) + list(islice(stream, 1)) + # New versions add TextEvent, so consume one more event. + if isinstance(next(stream), TextEvent): + next(stream) stream.close() assert len(events) == 1 From c8a6ec306f1d413295b9dbbd213317fcc7c1241e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:03:40 +0100 Subject: [PATCH 03/23] fix tests --- tests/integrations/anthropic/test_anthropic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 49c32abf75..d9d5c0c30e 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,7 +13,6 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic -from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import 
ContentBlockStartEvent @@ -30,6 +29,11 @@ async def __call__(self, *args, **kwargs): except ImportError: pass +try: + from anthropic.lib.streaming import TextEvent +except ImportError: + TextEvent = None + try: # 0.27+ from anthropic.types.raw_message_delta_event import Delta @@ -818,7 +822,7 @@ def test_stream_messages_iterator_methods( next(stream) list(islice(stream, 1)) # New versions add TextEvent, so consume one more event. - if isinstance(next(stream), TextEvent): + if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) stream.close() From 6f1d833b6d290bd413cee40d4afe55981615a2cc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:07:01 +0100 Subject: [PATCH 04/23] missing types --- sentry_sdk/integrations/anthropic.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5a0b511708..27acdafbe4 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -711,35 +711,36 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync -def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: """ Initialize fields for accumulating output on the Stream instance. """ if not hasattr(stream, "_model"): stream._model = None stream._usage = _RecordedUsage() - stream._content_blocks: "list[str]" = [] + stream._content_blocks = [] stream._response_id = None def _accumulate_event_data( - self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" -): + stream: "Union[Stream, MessageStream]", + event: "Union[RawMessageStreamEvent, MessageStreamEvent]", +) -> None: """ Update accumulated output from a single stream event. 
""" (model, usage, content_blocks, response_id) = _collect_ai_data( event, - self._model, - self._usage, - self._content_blocks, - self._response_id, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, ) - self._model = model - self._usage = usage - self._content_blocks = content_blocks - self._response_id = response_id + stream._model = model + stream._usage = usage + stream._content_blocks = content_blocks + stream._response_id = response_id def _finish_streaming_span( @@ -749,7 +750,7 @@ def _finish_streaming_span( usage: "_RecordedUsage", content_blocks: "list[str]", response_id: "Optional[str]", -): +) -> None: """ Set output attributes on the AI Client Span and end the span. """ From 40e2bf029dd609bb9b1ded5acb7ba78b052001cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 13:02:19 +0100 Subject: [PATCH 05/23] . --- sentry_sdk/integrations/anthropic.py | 2 +- tests/integrations/anthropic/test_anthropic.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 27acdafbe4..98d4b50ed9 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. 
- if version >= (0, 26, 2): + if version is not None and version >= (0, 26, 2): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index d9d5c0c30e..bf89037660 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -821,6 +821,7 @@ def test_stream_messages_iterator_methods( next(stream) next(stream) list(islice(stream, 1)) + next(stream) # New versions add TextEvent, so consume one more event. if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) From a21406f13fa28ce29fca0506f9f890cffe29d457 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 13:59:05 +0100 Subject: [PATCH 06/23] fix(anthropic): Respect iterator protocol in asynchronous streamed responses --- sentry_sdk/integrations/anthropic.py | 296 +++++++++--- .../integrations/anthropic/test_anthropic.py | 430 ++++++++++++++++++ 2 files changed, 664 insertions(+), 62 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 98d4b50ed9..ab1422b545 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -43,6 +43,7 @@ MessageStreamManager, MessageStream, AsyncMessageStreamManager, + AsyncMessageStream, ) from anthropic.types import ( @@ -60,7 +61,15 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable + from typing import ( + Any, + AsyncIterator, + Iterator, + Optional, + Union, + Callable, + Awaitable, + ) from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -71,7 +80,6 @@ TextBlockParam, ToolUnionParam, ) - from 
anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -94,12 +102,15 @@ def setup_once() -> None: _check_minimum_version(AnthropicIntegration, version) """ - client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. + Analogously, an AsyncStream instance can be returned, which implements the asynchronous iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + streamed events (and analogously, asynchronous iterators are consumed using __aiter__ or __anext__). The + streamed events are used to populate output attributes on the AI Client Span. The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + the finally block in the __iter__/__aiter__ patch closes the span. """ Messages.create = _wrap_message_create(Messages.create) Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) @@ -107,6 +118,9 @@ def setup_once() -> None: Stream.close = _wrap_stream_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + AsyncStream.__aiter__ = _wrap_async_stream_aiter(AsyncStream.__aiter__) + AsyncStream.__anext__ = _wrap_async_stream_anext(AsyncStream.__anext__) + AsyncStream.close = _wrap_async_stream_close(AsyncStream.close) """ client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. 
@@ -121,6 +135,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ ) + AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) + AsyncMessageStreamManager.__aenter__ = ( + _wrap_async_message_stream_manager_aenter( + AsyncMessageStreamManager.__aenter__ + ) + ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if version is not None and version >= (0, 26, 2): @@ -128,12 +149,15 @@ def setup_once() -> None: MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) - AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) - AsyncMessageStreamManager.__aenter__ = ( - _wrap_async_message_stream_manager_aenter( - AsyncMessageStreamManager.__aenter__ + AsyncMessageStream.__aiter__ = _wrap_async_message_stream_aiter( + AsyncMessageStream.__aiter__ + ) + AsyncMessageStream.__anext__ = _wrap_async_message_stream_anext( + AsyncMessageStream.__anext__ + ) + AsyncMessageStream.close = _wrap_async_message_stream_close( + AsyncMessageStream.close ) - ) def _capture_exception(exc: "Any") -> None: @@ -477,19 +501,14 @@ def _wrap_synchronous_message_iterator( async def _wrap_asynchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. 
""" - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - + generator_exit = False try: async for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -508,41 +527,25 @@ async def _wrap_asynchronous_message_iterator( yield event continue - ( - model, - usage, - content_blocks, - response_id, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + ) + del stream._span def _set_output_data( @@ -625,17 +628,11 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs - if isinstance(result, Stream): + if isinstance(result, (Stream, AsyncStream)): result._span = span result._integration = integration return result - if isinstance(result, AsyncStream): - result._iterator = _wrap_asynchronous_message_iterator( - result._iterator, 
span, integration - ) - return result - with capture_internal_exceptions(): if hasattr(result, "content"): ( @@ -901,6 +898,94 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_async +def _wrap_async_stream_aiter( + f: "Callable[..., AsyncIterator[RawMessageStreamEvent]]", +) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + async def __aiter__(self: "AsyncStream") -> "AsyncIterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + async for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + async for event in _wrap_asynchronous_message_iterator( + self, + f(self), + ): + yield event + + return __aiter__ + + +def _wrap_async_stream_anext( + f: "Callable[..., Awaitable[RawMessageStreamEvent]]", +) -> "Callable[..., Awaitable[RawMessageStreamEvent]]": + """ + Accumulates output data from the returned event. + """ + + async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = await f(self) + except StopAsyncIteration: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) + + _accumulate_event_data(self, event) + return event + + return __anext__ + + +def _wrap_async_stream_close( + f: "Callable[..., Awaitable[None]]", +) -> "Callable[..., Awaitable[None]]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. 
+ """ + + async def close(self: "Stream") -> None: + if not hasattr(self, "_span"): + return await f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return await f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return await f(self) + + return close + + def _wrap_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. @@ -1138,17 +1223,104 @@ async def _sentry_patched_aenter( tools=self._tools, ) - stream._iterator = _wrap_asynchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_aenter +def _wrap_async_message_stream_aiter( + f: "Callable[..., AsyncIterator[MessageStreamEvent]]", +) -> "Callable[..., AsyncIterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + async def __aiter__( + self: "AsyncMessageStream", + ) -> "AsyncIterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + async for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + async for event in _wrap_asynchronous_message_iterator( + self, + f(self), + ): + yield event + + return __aiter__ + + +def _wrap_async_message_stream_anext( + f: "Callable[..., Awaitable[MessageStreamEvent]]", +) -> "Callable[..., Awaitable[MessageStreamEvent]]": + """ + Accumulates output data from the returned event. 
+    """
+
+    async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent":
+        _initialize_data_accumulation_state(self)
+        try:
+            event = await f(self)
+        except StopAsyncIteration:
+            exc_info = sys.exc_info()
+            with capture_internal_exceptions():
+                if not hasattr(self, "_span"):
+                    raise
+
+                _finish_streaming_span(
+                    self._span,
+                    self._integration,
+                    self._model,
+                    self._usage,
+                    self._content_blocks,
+                    self._response_id,
+                )
+                del self._span
+            reraise(*exc_info)
+
+        _accumulate_event_data(self, event)
+        return event
+
+    return __anext__
+
+
+def _wrap_async_message_stream_close(
+    f: "Callable[..., Awaitable[None]]",
+) -> "Callable[..., Awaitable[None]]":
+    """
+    Closes the AI Client Span, unless the finally block in `_wrap_asynchronous_message_iterator()` runs first.
+    """
+
+    async def close(self: "AsyncMessageStream") -> None:
+        if not hasattr(self, "_span"):
+            return await f(self)
+
+        if not hasattr(self, "_model"):
+            self._span.__exit__(None, None, None)
+            return await f(self)
+
+        _finish_streaming_span(
+            self._span,
+            self._integration,
+            self._model,
+            self._usage,
+            self._content_blocks,
+            self._response_id,
+        )
+        del self._span
+
+        return await f(self)
+
+    return close
+
+
 def _is_given(obj: "Any") -> bool:
     """
     Check for givenness safely across different anthropic versions.
diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index bf89037660..4818e47ed8 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -2,6 +2,7 @@ from unittest import mock import json from itertools import islice +from builtins import anext try: from unittest.mock import AsyncMock @@ -974,6 +975,218 @@ async def test_streaming_create_message_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +@pytest.mark.asyncio +async def test_streaming_create_message_async_next_consumption( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopAsyncIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + 
with start_transaction(name="anthropic"): + messages = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + while True: + await anext(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +@pytest.mark.asyncio +async def test_streaming_create_message_async_iterator_methods( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + 
ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = await client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + await anext(messages) + await anext(messages) + + async for item in messages: + break + + await anext(messages) + await messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" 
+ + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", @@ -1095,6 +1308,223 @@ async def test_stream_message_async( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +@pytest.mark.asyncio +async def test_stream_messages_async_next_consumption( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopAsyncIteration), mock.patch.object( + client._client, + "send", + return_value=response, + 
) as _: + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + while True: + await anext(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +@pytest.mark.asyncio +async def test_stream_messages_async_iterator_methods( + sentry_init, + capture_events, + get_model_response, + async_iterator, + server_side_event_chunks, +): + client = AsyncAnthropic(api_key="z") + + response = get_model_response( + async_iterator( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + 
ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + async with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + await anext(stream) + await anext(stream) + + async for item in stream: + break + + await anext(stream) + # New versions add TextEvent, so consume one more event. + if TextEvent is not None and isinstance(await anext(stream), TextEvent): + await anext(stream) + await stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" 
+ + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.skipif( ANTHROPIC_VERSION < (0, 27), reason="Versions <0.27.0 do not include InputJSONDelta, which was introduced in >=0.27.0 along with a new message delta type for tool calling.", From a3cc18ff19c59bd708abce2cd715b8bb59e0f2bd Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 14:03:45 +0100 Subject: [PATCH 07/23] simplify --- sentry_sdk/integrations/anthropic.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 98d4b50ed9..4da1752895 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. 
- if version is not None and version >= (0, 26, 2): + if not issubclass(MessageStream, Stream): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) @@ -786,8 +786,7 @@ def _wrap_stream_iter( def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) @@ -988,8 +987,7 @@ def _wrap_message_stream_iter( def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) From dd26abcaebd8e033915e1a2bb37472e19b6e2005 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 14:06:31 +0100 Subject: [PATCH 08/23] simplify --- sentry_sdk/integrations/anthropic.py | 15 +++++++----- .../integrations/anthropic/test_anthropic.py | 23 ++++++++++--------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index d49341d16b..71b6915efc 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -135,6 +135,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ ) + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. 
+        if not issubclass(MessageStream, Stream):
+            MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__)
+            MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__)
+            MessageStream.close = _wrap_message_stream_close(MessageStream.close)
+
         AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream)
         AsyncMessageStreamManager.__aenter__ = (
             _wrap_async_message_stream_manager_aenter(
@@ -143,12 +150,8 @@ def setup_once() -> None:
         )
 
         # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a
-        # MessageStream inherits from Stream, so patching Stream is sufficient on these versions.
-        if not issubclass(MessageStream, Stream):
-            MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__)
-            MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__)
-            MessageStream.close = _wrap_message_stream_close(MessageStream.close)
-
+        # AsyncMessageStream inherits from AsyncStream, so patching AsyncStream is sufficient on these versions.
+ if not issubclass(AsyncMessageStream, AsyncStream): AsyncMessageStream.__aiter__ = _wrap_async_message_stream_aiter( AsyncMessageStream.__aiter__ ) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 4818e47ed8..b250aefccf 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -2,7 +2,6 @@ from unittest import mock import json from itertools import islice -from builtins import anext try: from unittest.mock import AsyncMock @@ -1049,7 +1048,7 @@ async def test_streaming_create_message_async_next_consumption( ) while True: - await anext(messages) + await messages.__anext__() assert len(events) == 1 (event,) = events @@ -1151,13 +1150,13 @@ async def test_streaming_create_message_async_iterator_methods( max_tokens=1024, messages=messages, model="model", stream=True ) - await anext(messages) - await anext(messages) + await messages.__anext__() + await messages.__anext__() async for item in messages: break - await anext(messages) + await messages.__anext__() await messages.close() assert len(events) == 1 @@ -1383,7 +1382,7 @@ async def test_stream_messages_async_next_consumption( model="model", ) as stream: while True: - await anext(stream) + await stream.__anext__() assert len(events) == 1 (event,) = events @@ -1486,16 +1485,18 @@ async def test_stream_messages_async_iterator_methods( messages=messages, model="model", ) as stream: - await anext(stream) - await anext(stream) + await stream.__anext__() + await stream.__anext__() async for item in stream: break - await anext(stream) + await stream.__anext__() # New versions add TextEvent, so consume one more event. 
- if TextEvent is not None and isinstance(await anext(stream), TextEvent): - await anext(stream) + if TextEvent is not None and isinstance( + await stream.__anext__(), TextEvent + ): + await stream.__anext__() await stream.close() assert len(events) == 1 From 0aeec7262f09978d76cce84c7d9d5ee649b63a95 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 12:56:47 +0100 Subject: [PATCH 09/23] update tests --- tests/integrations/anthropic/test_anthropic.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 487c69e208..4fd3c48228 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -371,7 +371,7 @@ def test_streaming_create_message_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -431,6 +431,7 @@ def test_streaming_create_message_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_streaming_create_message_iterator_methods( @@ -470,7 +471,7 @@ def test_streaming_create_message_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -689,7 +690,7 @@ def test_stream_messages_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), 
usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -750,6 +751,7 @@ def test_stream_messages_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_stream_messages_iterator_methods( @@ -789,7 +791,7 @@ def test_stream_messages_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), From c5cd95933f6e81c7923b75c18bd159d5d8c17f65 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:03:38 +0100 Subject: [PATCH 10/23] . --- sentry_sdk/integrations/anthropic.py | 2 ++ tests/integrations/anthropic/test_anthropic.py | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5a19127545..e447f6e393 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -962,6 +962,7 @@ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": self._usage, self._content_blocks, self._response_id, + self._finish_reason, ) del self._span reraise(*exc_info) @@ -994,6 +995,7 @@ async def close(self: "Stream") -> None: self._usage, self._content_blocks, self._response_id, + self._finish_reason, ) del self._span diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 6038917a3b..faf7b4c262 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1083,6 +1083,7 @@ async def test_streaming_create_message_async_next_consumption( assert 
span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1192,7 +1193,6 @@ async def test_streaming_create_message_async_iterator_methods( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1418,6 +1418,7 @@ async def test_stream_messages_async_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.asyncio @@ -1533,7 +1534,6 @@ async def test_stream_messages_async_iterator_methods( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] @pytest.mark.skipif( From b92db6df0e143f8b1b57c383852ed81dfb80a496 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:15:04 +0100 Subject: [PATCH 11/23] docstring --- sentry_sdk/integrations/anthropic.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 575ff26fd4..ed87a8a530 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -94,12 +94,15 @@ def 
setup_once() -> None: _check_minimum_version(AnthropicIntegration, version) """ - client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.create = _wrap_message_create(Messages.create) Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) @@ -109,12 +112,15 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. 
+ The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( From beb8f2c792b7b1108807e7d08d5b6fc080977178 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:26:45 +0100 Subject: [PATCH 12/23] docstrings --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index ed87a8a530..4cbaba0f4a 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -802,7 +802,8 @@ def _wrap_stream_iter( ) -> "Callable[..., Iterator[RawMessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": @@ -824,6 +825,7 @@ def _wrap_stream_next( ) -> "Callable[..., RawMessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "Stream") -> "RawMessageStreamEvent": @@ -858,7 +860,8 @@ def _wrap_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. 
+ Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ def close(self: "Stream") -> None: From 7ec95fe46b97728d62717b4a9d84503e1fc3a6a1 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:28:38 +0100 Subject: [PATCH 13/23] docs --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 4cbaba0f4a..bdad51adef 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -1008,7 +1008,8 @@ def _wrap_message_stream_iter( ) -> "Callable[..., Iterator[MessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": @@ -1030,6 +1031,7 @@ def _wrap_message_stream_next( ) -> "Callable[..., MessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "MessageStream") -> "MessageStreamEvent": @@ -1064,7 +1066,8 @@ def _wrap_message_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. 
""" def close(self: "MessageStream") -> None: From 8e9bfabbd7211e505c39855eb666f444d19f9559 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:31:37 +0100 Subject: [PATCH 14/23] docstrings --- sentry_sdk/integrations/anthropic.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 3fcf9d4bed..8727578a0c 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -929,7 +929,8 @@ def _wrap_async_stream_aiter( ) -> "Callable[..., AsyncIterator[RawMessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ async def __aiter__(self: "AsyncStream") -> "AsyncIterator[RawMessageStreamEvent]": @@ -953,6 +954,7 @@ def _wrap_async_stream_anext( ) -> "Callable[..., Awaitable[RawMessageStreamEvent]]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": @@ -987,7 +989,8 @@ def _wrap_async_stream_close( f: "Callable[..., Awaitable[None]]", ) -> "Callable[..., Awaitable[None]]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ async def close(self: "Stream") -> None: @@ -1268,7 +1271,8 @@ def _wrap_async_message_stream_aiter( ) -> "Callable[..., AsyncIterator[MessageStreamEvent]]": """ Accumulates output data while iterating. 
When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ async def __aiter__( @@ -1294,6 +1298,7 @@ def _wrap_async_message_stream_anext( ) -> "Callable[..., Awaitable[MessageStreamEvent]]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent": @@ -1328,7 +1333,8 @@ def _wrap_async_message_stream_close( f: "Callable[..., Awaitable[None]]", ) -> "Callable[..., Awaitable[None]]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ async def close(self: "AsyncMessageStream") -> None: From fab5d93b6503af639e7362ab0f62899ef21116e0 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:46:51 +0100 Subject: [PATCH 15/23] type annotation --- sentry_sdk/integrations/anthropic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 8727578a0c..9056b8dbd3 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -993,7 +993,7 @@ def _wrap_async_stream_close( the except block in the `__next__()` patch runs first. 
""" - async def close(self: "Stream") -> None: + async def close(self: "AsyncStream") -> None: if not hasattr(self, "_span"): return await f(self) From cda41e20c60ed25fc4a9fc99d62c90531c50ace6 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:53:52 +0100 Subject: [PATCH 16/23] review --- sentry_sdk/integrations/anthropic.py | 50 +++++++++++++--------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bdad51adef..40afd82d3d 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -835,19 +835,17 @@ def __next__(self: "Stream") -> "RawMessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -870,6 +868,7 @@ def close(self: "Stream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span( @@ -1041,19 +1040,17 @@ def __next__(self: "MessageStream") -> "MessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + 
self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -1076,6 +1073,7 @@ def close(self: "MessageStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span( From e17e0364090c01af3979cccd3027b727319ff9ae Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:57:04 +0100 Subject: [PATCH 17/23] review --- sentry_sdk/integrations/anthropic.py | 50 +++++++++++++--------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 7f7dbae534..4f54f18cf9 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -963,19 +963,17 @@ async def __anext__(self: "AsyncStream") -> "RawMessageStreamEvent": except StopAsyncIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -998,6 +996,7 @@ async def close(self: "AsyncStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return await f(self) _finish_streaming_span( @@ -1306,19 +1305,17 @@ async def __anext__(self: "AsyncMessageStream") -> "MessageStreamEvent": except StopAsyncIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - 
self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -1341,6 +1338,7 @@ async def close(self: "AsyncMessageStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return await f(self) _finish_streaming_span( From 31869afeff1c812d127825e5602dd3754be716f3 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:37:28 +0100 Subject: [PATCH 18/23] simplify --- sentry_sdk/integrations/anthropic.py | 165 +++---------- .../integrations/anthropic/test_anthropic.py | 217 +----------------- 2 files changed, 34 insertions(+), 348 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 40afd82d3d..5c3f94b68f 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -95,32 +95,30 @@ def setup_once() -> None: """ client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. - The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. - The span is finished in two possible places: - - When the user exits the context manager or directly calls close(), the patched close() ends the span. 
- - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. + - When iteration ends, the finally block in the _iterator wrapper finishes the span. - Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.create = _wrap_message_create(Messages.create) - Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) - Stream.__next__ = _wrap_stream_next(Stream.__next__) Stream.close = _wrap_stream_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. - The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + client.messages.stream() can return an instance of the Stream class, which implements the iterator protocol. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. - The span is finished in two possible places: - - When the user exits the context manager or directly calls close(), the patched close() ends the span. - - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. 
+ - When iteration ends, the finally block in the _iterator wrapper finishes the span. - Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( @@ -130,8 +128,6 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if not issubclass(MessageStream, Stream): - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) @@ -443,7 +439,6 @@ def _wrap_synchronous_message_iterator( Sets information received while iterating the response stream on the AI Client Span. Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. 
""" - generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -464,14 +459,9 @@ def _wrap_synchronous_message_iterator( _accumulate_event_data(stream, event) yield event - except ( - GeneratorExit - ): # https://docs.python.org/3/reference/expressions.html#generator.close - generator_exit = True - raise finally: with capture_internal_exceptions(): - if not generator_exit and hasattr(stream, "_span"): + if hasattr(stream, "_span"): _finish_streaming_span( stream._span, stream._integration, @@ -643,6 +633,13 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A if isinstance(result, Stream): result._span = span result._integration = integration + + _initialize_data_accumulation_state(result) + result._iterator = _wrap_synchronous_message_iterator( + result, + result._iterator, + ) + return result if isinstance(result, AsyncStream): @@ -797,63 +794,6 @@ def _finish_streaming_span( ) -def _wrap_stream_iter( - f: "Callable[..., Iterator[RawMessageStreamEvent]]", -) -> "Callable[..., Iterator[RawMessageStreamEvent]]": - """ - Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and ends the span (unless the `close()` - or `__next__()` patches have already closed it). - """ - - def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": - if not hasattr(self, "_span"): - yield from f(self) - return - - _initialize_data_accumulation_state(self) - yield from _wrap_synchronous_message_iterator( - self, - f(self), - ) - - return __iter__ - - -def _wrap_stream_next( - f: "Callable[..., RawMessageStreamEvent]", -) -> "Callable[..., RawMessageStreamEvent]": - """ - Accumulates output data from the returned event. - Closes the AI Client Span if `StopIteration` is raised. 
- """ - - def __next__(self: "Stream") -> "RawMessageStreamEvent": - _initialize_data_accumulation_state(self) - try: - event = f(self) - except StopIteration: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - if hasattr(self, "_span"): - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - reraise(*exc_info) - - _accumulate_event_data(self, event) - return event - - return __next__ - - def _wrap_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": @@ -997,66 +937,15 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": stream._span = span stream._integration = integration - return stream - - return _sentry_patched_enter - - -def _wrap_message_stream_iter( - f: "Callable[..., Iterator[MessageStreamEvent]]", -) -> "Callable[..., Iterator[MessageStreamEvent]]": - """ - Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and ends the span (unless the `close()` - or `__next__()` patches have already closed it). - """ - - def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": - if not hasattr(self, "_span"): - yield from f(self) - return - - _initialize_data_accumulation_state(self) - yield from _wrap_synchronous_message_iterator( - self, - f(self), + _initialize_data_accumulation_state(stream) + stream._iterator = _wrap_synchronous_message_iterator( + stream, + stream._iterator, ) - return __iter__ - - -def _wrap_message_stream_next( - f: "Callable[..., MessageStreamEvent]", -) -> "Callable[..., MessageStreamEvent]": - """ - Accumulates output data from the returned event. - Closes the AI Client Span if `StopIteration` is raised. 
- """ - - def __next__(self: "MessageStream") -> "MessageStreamEvent": - _initialize_data_accumulation_state(self) - try: - event = f(self) - except StopIteration: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - if hasattr(self, "_span"): - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - reraise(*exc_info) - - _accumulate_event_data(self, event) - return event + return stream - return __next__ + return _sentry_patched_enter def _wrap_message_stream_close( diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 4fd3c48228..2139d74a1b 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,7 +1,6 @@ import pytest from unittest import mock import json -from itertools import islice try: from unittest.mock import AsyncMock @@ -334,7 +333,7 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] -def test_streaming_create_message_next_consumption( +def test_streaming_create_message_close( sentry_init, capture_events, get_model_response, @@ -393,7 +392,7 @@ def test_streaming_create_message_next_consumption( } ] - with pytest.raises(StopIteration), mock.patch.object( + with mock.patch.object( client._client, "send", return_value=response, @@ -403,110 +402,9 @@ def test_streaming_create_message_next_consumption( max_tokens=1024, messages=messages, model="model", stream=True ) - while True: + for _ in range(4): next(messages) - assert len(events) == 1 - (event,) = events - - assert event["type"] == "transaction" - assert event["transaction"] == "anthropic" - - span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) - - assert span["op"] == OP.GEN_AI_CHAT - assert span["description"] == "chat model" - assert 
span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" - assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" - assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" - - assert ( - span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] - == '[{"role": "user", "content": "Hello, Claude"}]' - ) - assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" - - assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 - assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True - assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] - - -def test_streaming_create_message_iterator_methods( - sentry_init, - capture_events, - get_model_response, - server_side_event_chunks, -): - client = Anthropic(api_key="z") - - response = get_model_response( - server_side_event_chunks( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="max_tokens"), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] - ) - ) - - sentry_init( - integrations=[AnthropicIntegration(include_prompts=True)], - traces_sample_rate=1.0, - send_default_pii=True, - ) - events = capture_events() - - messages 
= [ - { - "role": "user", - "content": "Hello, Claude", - } - ] - - with mock.patch.object( - client._client, - "send", - return_value=response, - ) as _: - with start_transaction(name="anthropic"): - messages = client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) - - next(messages) - next(messages) - list(islice(messages, 1)) - next(messages) messages.close() assert len(events) == 1 @@ -653,7 +551,7 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] -def test_stream_messages_next_consumption( +def test_stream_messages_close( sentry_init, capture_events, get_model_response, @@ -712,7 +610,7 @@ def test_stream_messages_next_consumption( } ] - with pytest.raises(StopIteration), mock.patch.object( + with mock.patch.object( client._client, "send", return_value=response, @@ -723,114 +621,13 @@ def test_stream_messages_next_consumption( messages=messages, model="model", ) as stream: - while True: + for _ in range(4): next(stream) - assert len(events) == 1 - (event,) = events - - assert event["type"] == "transaction" - assert event["transaction"] == "anthropic" - - span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) - - assert span["op"] == OP.GEN_AI_CHAT - assert span["description"] == "chat model" - assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" - assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" - assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" - - assert ( - span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] - == '[{"role": "user", "content": "Hello, Claude"}]' - ) - assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" 
- - assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 - assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True - assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] - - -def test_stream_messages_iterator_methods( - sentry_init, - capture_events, - get_model_response, - server_side_event_chunks, -): - client = Anthropic(api_key="z") - - response = get_model_response( - server_side_event_chunks( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="max_tokens"), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] - ) - ) - - sentry_init( - integrations=[AnthropicIntegration(include_prompts=True)], - traces_sample_rate=1.0, - send_default_pii=True, - ) - events = capture_events() - - messages = [ - { - "role": "user", - "content": "Hello, Claude", - } - ] - - with mock.patch.object( - client._client, - "send", - return_value=response, - ) as _: - with start_transaction(name="anthropic"): - with client.messages.stream( - max_tokens=1024, - messages=messages, - model="model", - ) as stream: - next(stream) - next(stream) - list(islice(stream, 1)) - next(stream) # New 
versions add TextEvent, so consume one more event. if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) + stream.close() assert len(events) == 1 From 1017e9eee15b2325934ee2e2cca8482db56a86dc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:40:19 +0100 Subject: [PATCH 19/23] simplify --- sentry_sdk/integrations/anthropic.py | 44 ++++------------------------ 1 file changed, 5 insertions(+), 39 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5c3f94b68f..514063f3f3 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -105,7 +105,7 @@ def setup_once() -> None: Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.create = _wrap_message_create(Messages.create) - Stream.close = _wrap_stream_close(Stream.close) + Stream.close = _wrap_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) @@ -128,7 +128,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if not issubclass(MessageStream, Stream): - MessageStream.close = _wrap_message_stream_close(MessageStream.close) + MessageStream.close = _wrap_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -794,15 +794,14 @@ def _finish_streaming_span( ) -def _wrap_stream_close( +def _wrap_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or - the except block in the `__next__()` patch runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. 
""" - def close(self: "Stream") -> None: + def close(self: "Union[Stream, MessageStream]") -> None: if not hasattr(self, "_span"): return f(self) @@ -948,39 +947,6 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": return _sentry_patched_enter -def _wrap_message_stream_close( - f: "Callable[..., None]", -) -> "Callable[..., None]": - """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or - the except block in the `__next__()` patch runs first. - """ - - def close(self: "MessageStream") -> None: - if not hasattr(self, "_span"): - return f(self) - - if not hasattr(self, "_model"): - self._span.__exit__(None, None, None) - del self._span - return f(self) - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - - return f(self) - - return close - - def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. From fcedba70cf97c2163883f19a46f840d9aadbd390 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:59:35 +0100 Subject: [PATCH 20/23] . --- sentry_sdk/integrations/anthropic.py | 34 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 514063f3f3..0592e337de 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -96,7 +96,7 @@ def setup_once() -> None: """ client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, - information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. 
+ information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. The span can be finished in two places: - When the user exits the context manager or directly calls close(), the patched close() finishes the span. @@ -110,9 +110,9 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() can return an instance of the Stream class, which implements the iterator protocol. + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, - information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. + information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. The span can be finished in two places: - When the user exits the context manager or directly calls close(), the patched close() finishes the span. 
@@ -463,13 +463,13 @@ def _wrap_synchronous_message_iterator( with capture_internal_exceptions(): if hasattr(stream, "_span"): _finish_streaming_span( - stream._span, - stream._integration, - stream._model, - stream._usage, - stream._content_blocks, - stream._response_id, - stream._finish_reason, + span=stream._span, + integration=stream._integration, + model=stream._model, + usage=stream._usage, + content_blocks=stream._content_blocks, + response_id=stream._response_id, + finish_reason=stream._finish_reason, ) del stream._span @@ -811,13 +811,13 @@ def close(self: "Union[Stream, MessageStream]") -> None: return f(self) _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, + span=self._span, + integration=self._integration, + model=self._model, + usage=self._usage, + content_blocks=self._content_blocks, + response_id=self._response_id, + finish_reason=self._finish_reason, ) del self._span From e2f7afc2cf38472318d1362273ace897b6b85cc5 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 16:03:40 +0100 Subject: [PATCH 21/23] . --- sentry_sdk/integrations/anthropic.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 59ff89a868..1dcef160de 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -498,7 +498,6 @@ async def _wrap_asynchronous_message_iterator( Sets information received while iterating the response stream on the AI Client Span. Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. 
""" - generator_exit = False try: async for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -519,14 +518,9 @@ async def _wrap_asynchronous_message_iterator( _accumulate_event_data(stream, event) yield event - except ( - GeneratorExit - ): # https://docs.python.org/3/reference/expressions.html#generator.close - generator_exit = True - raise finally: with capture_internal_exceptions(): - if not generator_exit and hasattr(stream, "_span"): + if hasattr(stream, "_span"): _finish_streaming_span( span=stream._span, integration=stream._integration, From c79fe89ef0301073a6212bc4b52d3d42a11e9e7b Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Wed, 18 Mar 2026 14:25:13 +0100 Subject: [PATCH 22/23] docstring --- sentry_sdk/integrations/anthropic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 7b1f3d9201..9ed0f2d650 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -487,7 +487,7 @@ async def _wrap_asynchronous_message_iterator( ) -> "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. + Responsible for closing the AI Client Span unless the span has already been closed in the close() patch. """ try: async for event in iterator: @@ -851,7 +851,7 @@ def _wrap_async_close( f: "Callable[..., Awaitable[None]]", ) -> "Callable[..., Awaitable[None]]": """ - Closes the AI Client Span, unless the finally block in `_wrap_asynchronous_message_iterator()` runs first. + Closes the AI Client Span unless the finally block in `_wrap_asynchronous_message_iterator()` runs first. 
""" async def close(self: "AsyncStream") -> None: From e3b6b606b285ffd46c675bed58f6c0339d57922c Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Wed, 18 Mar 2026 14:31:48 +0100 Subject: [PATCH 23/23] fix type annotation --- sentry_sdk/integrations/anthropic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 9ed0f2d650..981e8d5056 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -482,7 +482,7 @@ def _wrap_synchronous_message_iterator( async def _wrap_asynchronous_message_iterator( - stream: "Union[Stream, MessageStream]", + stream: "Union[AsyncStream, AsyncMessageStream]", iterator: "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", ) -> "AsyncIterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """