From e3cc76d4516c8d4945d92c9b5f8846f69a36b3cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 11:24:31 +0100 Subject: [PATCH 01/15] fix(anthropic): Respect iterator protocol in synchronous streamed responses --- sentry_sdk/integrations/anthropic.py | 334 +++++++++++++-- .../integrations/anthropic/test_anthropic.py | 405 ++++++++++++++++++ 2 files changed, 692 insertions(+), 47 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bc208ac4f5..efb8638642 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -39,7 +39,11 @@ from anthropic import Stream, AsyncStream from anthropic.resources import AsyncMessages, Messages - from anthropic.lib.streaming import MessageStreamManager, AsyncMessageStreamManager + from anthropic.lib.streaming import ( + MessageStreamManager, + MessageStream, + AsyncMessageStreamManager, + ) from anthropic.types import ( MessageStartEvent, @@ -56,7 +60,7 @@ raise DidNotEnable("Anthropic not installed") if TYPE_CHECKING: - from typing import Any, AsyncIterator, Iterator, Optional, Union + from typing import Any, AsyncIterator, Iterator, Optional, Union, Callable from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart @@ -67,7 +71,7 @@ TextBlockParam, ToolUnionParam, ) - from anthropic.lib.streaming import MessageStream, AsyncMessageStream + from anthropic.lib.streaming import AsyncMessageStream class _RecordedUsage: @@ -89,13 +93,36 @@ def setup_once() -> None: version = package_version("anthropic") _check_minimum_version(AnthropicIntegration, version) + """ + client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.create = _wrap_message_create(Messages.create) + Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) + Stream.__next__ = _wrap_stream_next(Stream.__next__) + Stream.close = _wrap_stream_close(Stream.close) + AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) + """ + client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept + streamed events. The streamed events are used to populate output attributes on the AI Client Span. + + The close() method is patched for situations in which the method is directly invoked by the user, and otherwise + the finally block in the __iter__ patch closes the span. + """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -398,20 +425,14 @@ def _set_create_input_data( def _wrap_synchronous_message_iterator( + stream: "Union[Stream, MessageStream]", iterator: "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]", - span: "Span", - integration: "AnthropicIntegration", ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span. + Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - response_id = None - + generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -430,36 +451,25 @@ def _wrap_synchronous_message_iterator( yield event continue - (model, usage, content_blocks, response_id) = _collect_ai_data( - event, - model, - usage, - content_blocks, - response_id, - ) + _accumulate_event_data(stream, event) yield event + except ( + GeneratorExit + ): # https://docs.python.org/3/reference/expressions.html#generator.close + generator_exit = True + raise finally: with capture_internal_exceptions(): - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - response_id=response_id, - ) + if not generator_exit and hasattr(stream, "_span"): + _finish_streaming_span( + stream._span, + stream._integration, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, + ) + del stream._span async def _wrap_asynchronous_message_iterator( @@ -612,9 +622,8 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs if isinstance(result, Stream): - result._iterator = _wrap_synchronous_message_iterator( - result._iterator, span, integration - ) + result._span = span + result._integration = integration return result if isinstance(result, AsyncStream): @@ -698,6 +707,155 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): + """ + Initialize fields for accumulating output on the Stream instance. + """ + if not hasattr(stream, "_model"): + stream._model = None + stream._usage = _RecordedUsage() + stream._content_blocks: "list[str]" = [] + stream._response_id = None + + +def _accumulate_event_data( + self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" +): + """ + Update accumulated output from a single stream event. + """ + (model, usage, content_blocks, response_id) = _collect_ai_data( + event, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + + self._model = model + self._usage = usage + self._content_blocks = content_blocks + self._response_id = response_id + + +def _finish_streaming_span( + span: "Span", + integration: "AnthropicIntegration", + model: "Optional[str]", + usage: "_RecordedUsage", + content_blocks: "list[str]", + response_id: "Optional[str]", +): + """ + Set output attributes on the AI Client Span and end the span. + """ + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + response_id=response_id, + ) + + +def _wrap_stream_iter( + f: "Callable[..., Iterator[RawMessageStreamEvent]]", +) -> "Callable[..., Iterator[RawMessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_stream_next( + f: "Callable[..., RawMessageStreamEvent]", +) -> "Callable[..., RawMessageStreamEvent]": + """ + Accumulates output data from the returned event. + """ + + def __next__(self) -> "RawMessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_message_create_async(f: "Any") -> "Any": async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": gen = _sentry_patched_create_common(f, *args, **kwargs) @@ -805,17 +963,99 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": tools=self._tools, ) - stream._iterator = _wrap_synchronous_message_iterator( - iterator=stream._iterator, - span=span, - integration=integration, - ) + stream._span = span + stream._integration = integration return stream return _sentry_patched_enter +def _wrap_message_stream_iter( + f: "Callable[..., Iterator[MessageStreamEvent]]", +) -> "Callable[..., Iterator[MessageStreamEvent]]": + """ + Accumulates output data while iterating. When the returned iterator ends, set + output attributes on the AI Client Span and end the span. + """ + + def __iter__(self) -> "Iterator[MessageStreamEvent]": + if not hasattr(self, "_span"): + for event in f(self): + yield event + return + + _initialize_data_accumulation_state(self) + yield from _wrap_synchronous_message_iterator( + self, + f(self), + ) + + return __iter__ + + +def _wrap_message_stream_next( + f: "Callable[..., MessageStreamEvent]", +) -> "Callable[..., MessageStreamEvent]": + """ + Accumulates output data from the returned event. + """ + + def __next__(self) -> "MessageStreamEvent": + _initialize_data_accumulation_state(self) + try: + event = f(self) + except StopIteration: + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + raise + + _accumulate_event_data(self, event) + return event + + return __next__ + + +def _wrap_message_stream_close( + f: "Callable[..., None]", +) -> "Callable[..., None]": + """ + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + """ + + def close(self) -> None: + if not hasattr(self, "_span"): + return f(self) + + if not hasattr(self, "_model"): + self._span.__exit__(None, None, None) + return f(self) + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + + return f(self) + + return close + + def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 3a854e3a4e..e3cde11b62 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,6 +1,7 @@ import pytest from unittest import mock import json +from itertools import islice try: from unittest.mock import AsyncMock @@ -325,6 +326,207 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_streaming_create_message_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + while True: + next(messages) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_streaming_create_message_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + messages = client.messages.create( + max_tokens=1024, messages=messages, model="model", stream=True + ) + + next(messages) + next(messages) + list(islice(messages, 1)) + next(messages) + messages.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.parametrize( "send_default_pii, include_prompts", [ @@ -441,6 +643,209 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" +def test_stream_messages_next_consumption( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with pytest.raises(StopIteration), mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + while True: + next(stream) + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + +def test_stream_messages_iterator_methods( + sentry_init, + capture_events, + get_model_response, + server_side_event_chunks, +): + client = Anthropic(api_key="z") + + response = get_model_response( + server_side_event_chunks( + [ + MessageStartEvent( + message=EXAMPLE_MESSAGE, + type="message_start", + ), + ContentBlockStartEvent( + type="content_block_start", + index=0, + content_block=TextBlock(type="text", text=""), + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="Hi", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text="!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockDeltaEvent( + delta=TextDelta(text=" I'm Claude!", type="text_delta"), + index=0, + type="content_block_delta", + ), + ContentBlockStopEvent(type="content_block_stop", index=0), + MessageDeltaEvent( + delta=Delta(), + usage=MessageDeltaUsage(output_tokens=10), + type="message_delta", + ), + ] + ) + ) + + sentry_init( + integrations=[AnthropicIntegration(include_prompts=True)], + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + messages = [ + { + "role": "user", + "content": "Hello, Claude", + } + ] + + with mock.patch.object( + client._client, + "send", + return_value=response, + ) as _: + with start_transaction(name="anthropic"): + with client.messages.stream( + max_tokens=1024, + messages=messages, + model="model", + ) as stream: + next(stream) + next(stream) + list(islice(stream, 2)) + next(stream) + stream.close() + + assert len(events) == 1 + (event,) = events + + assert event["type"] == "transaction" + assert event["transaction"] == "anthropic" + + span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) + + assert span["op"] == OP.GEN_AI_CHAT + assert span["description"] == "chat model" + assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" + assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" + assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" + + assert ( + span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] + == '[{"role": "user", "content": "Hello, Claude"}]' + ) + assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi!" + + assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 20 + assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 30 + assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True + assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + + @pytest.mark.asyncio @pytest.mark.parametrize( "send_default_pii, include_prompts", From 7bd7ed42c9efb24eb94ae07ef4b9f76c8c44db44 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:00:50 +0100 Subject: [PATCH 02/15] version check and typing --- sentry_sdk/integrations/anthropic.py | 78 ++++++++++--------- .../integrations/anthropic/test_anthropic.py | 7 +- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index efb8638642..5a0b511708 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -120,9 +120,13 @@ def setup_once() -> None: MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( MessageStreamManager.__enter__ ) - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) - MessageStream.close = _wrap_message_stream_close(MessageStream.close) + + # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a + # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. + if version >= (0, 26, 2): + MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) + MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) + MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -779,7 +783,7 @@ def _wrap_stream_iter( output attributes on the AI Client Span and end the span. """ - def __iter__(self) -> "Iterator[RawMessageStreamEvent]": + def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -801,24 +805,26 @@ def _wrap_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "RawMessageStreamEvent": + def __next__(self: "Stream") -> "RawMessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -833,7 +839,7 @@ def _wrap_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self) -> None: + def close(self: "Stream") -> None: if not hasattr(self, "_span"): return f(self) @@ -979,7 +985,7 @@ def _wrap_message_stream_iter( output attributes on the AI Client Span and end the span. """ - def __iter__(self) -> "Iterator[MessageStreamEvent]": + def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): for event in f(self): yield event @@ -1001,24 +1007,26 @@ def _wrap_message_stream_next( Accumulates output data from the returned event. """ - def __next__(self) -> "MessageStreamEvent": + def __next__(self: "MessageStream") -> "MessageStreamEvent": _initialize_data_accumulation_state(self) try: event = f(self) except StopIteration: - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - ) - del self._span - raise + exc_info = sys.exc_info() + with capture_internal_exceptions(): + if not hasattr(self, "_span"): + raise + + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + ) + del self._span + reraise(*exc_info) _accumulate_event_data(self, event) return event @@ -1033,7 +1041,7 @@ def _wrap_message_stream_close( Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self) -> None: + def close(self: "MessageStream") -> None: if not hasattr(self, "_span"): return f(self) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index e3cde11b62..49c32abf75 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,6 +13,7 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic +from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent @@ -815,8 +816,10 @@ def test_stream_messages_iterator_methods( ) as stream: next(stream) next(stream) - list(islice(stream, 2)) - next(stream) + list(islice(stream, 1)) + # New versions add TextEvent, so consume one more event. + if isinstance(next(stream), TextEvent): + next(stream) stream.close() assert len(events) == 1 From c8a6ec306f1d413295b9dbbd213317fcc7c1241e Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:03:40 +0100 Subject: [PATCH 03/15] fix tests --- tests/integrations/anthropic/test_anthropic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 49c32abf75..d9d5c0c30e 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -13,7 +13,6 @@ async def __call__(self, *args, **kwargs): from anthropic import Anthropic, AnthropicError, AsyncAnthropic -from anthropic.lib.streaming import TextEvent from anthropic.types import MessageDeltaUsage, TextDelta, Usage from anthropic.types.content_block_delta_event import ContentBlockDeltaEvent from anthropic.types.content_block_start_event import ContentBlockStartEvent @@ -30,6 +29,11 @@ async def __call__(self, *args, **kwargs): except ImportError: pass +try: + from anthropic.lib.streaming import TextEvent +except ImportError: + TextEvent = None + try: # 0.27+ from anthropic.types.raw_message_delta_event import Delta @@ -818,7 +822,7 @@ def test_stream_messages_iterator_methods( next(stream) list(islice(stream, 1)) # New versions add TextEvent, so consume one more event. - if isinstance(next(stream), TextEvent): + if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) stream.close() From 6f1d833b6d290bd413cee40d4afe55981615a2cc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 12:07:01 +0100 Subject: [PATCH 04/15] missing types --- sentry_sdk/integrations/anthropic.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5a0b511708..27acdafbe4 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -711,35 +711,36 @@ def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any": return _sentry_patched_create_sync -def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]"): +def _initialize_data_accumulation_state(stream: "Union[Stream, MessageStream]") -> None: """ Initialize fields for accumulating output on the Stream instance. """ if not hasattr(stream, "_model"): stream._model = None stream._usage = _RecordedUsage() - stream._content_blocks: "list[str]" = [] + stream._content_blocks = [] stream._response_id = None def _accumulate_event_data( - self, event: "Union[RawMessageStreamEvent, MessageStreamEvent]" -): + stream: "Union[Stream, MessageStream]", + event: "Union[RawMessageStreamEvent, MessageStreamEvent]", +) -> None: """ Update accumulated output from a single stream event. """ (model, usage, content_blocks, response_id) = _collect_ai_data( event, - self._model, - self._usage, - self._content_blocks, - self._response_id, + stream._model, + stream._usage, + stream._content_blocks, + stream._response_id, ) - self._model = model - self._usage = usage - self._content_blocks = content_blocks - self._response_id = response_id + stream._model = model + stream._usage = usage + stream._content_blocks = content_blocks + stream._response_id = response_id def _finish_streaming_span( @@ -749,7 +750,7 @@ def _finish_streaming_span( usage: "_RecordedUsage", content_blocks: "list[str]", response_id: "Optional[str]", -): +) -> None: """ Set output attributes on the AI Client Span and end the span. """ From 40e2bf029dd609bb9b1ded5acb7ba78b052001cf Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 13:02:19 +0100 Subject: [PATCH 05/15] . --- sentry_sdk/integrations/anthropic.py | 2 +- tests/integrations/anthropic/test_anthropic.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 27acdafbe4..98d4b50ed9 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. - if version >= (0, 26, 2): + if version is not None and version >= (0, 26, 2): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index d9d5c0c30e..bf89037660 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -821,6 +821,7 @@ def test_stream_messages_iterator_methods( next(stream) next(stream) list(islice(stream, 1)) + next(stream) # New versions add TextEvent, so consume one more event. if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) From a3cc18ff19c59bd708abce2cd715b8bb59e0f2bd Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Mon, 16 Mar 2026 14:03:45 +0100 Subject: [PATCH 06/15] simplify --- sentry_sdk/integrations/anthropic.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 98d4b50ed9..4da1752895 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -123,7 +123,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. - if version is not None and version >= (0, 26, 2): + if not issubclass(MessageStream, Stream): MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) @@ -786,8 +786,7 @@ def _wrap_stream_iter( def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) @@ -988,8 +987,7 @@ def _wrap_message_stream_iter( def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": if not hasattr(self, "_span"): - for event in f(self): - yield event + yield from f(self) return _initialize_data_accumulation_state(self) From 0aeec7262f09978d76cce84c7d9d5ee649b63a95 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 12:56:47 +0100 Subject: [PATCH 07/15] update tests --- tests/integrations/anthropic/test_anthropic.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 487c69e208..4fd3c48228 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -371,7 +371,7 @@ def test_streaming_create_message_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -431,6 +431,7 @@ def test_streaming_create_message_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_streaming_create_message_iterator_methods( @@ -470,7 +471,7 @@ def test_streaming_create_message_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -689,7 +690,7 @@ def test_stream_messages_next_consumption( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), @@ -750,6 +751,7 @@ def test_stream_messages_next_consumption( assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" + assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] def test_stream_messages_iterator_methods( @@ -789,7 +791,7 @@ def test_stream_messages_iterator_methods( ), ContentBlockStopEvent(type="content_block_stop", index=0), MessageDeltaEvent( - delta=Delta(), + delta=Delta(stop_reason="max_tokens"), usage=MessageDeltaUsage(output_tokens=10), type="message_delta", ), From b92db6df0e143f8b1b57c383852ed81dfb80a496 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:15:04 +0100 Subject: [PATCH 08/15] docstring --- sentry_sdk/integrations/anthropic.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 575ff26fd4..ed87a8a530 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -94,12 +94,15 @@ def setup_once() -> None: _check_minimum_version(AnthropicIntegration, version) """ - client.messages.create(stream=True) returns an instance of the Stream class, which implements the iterator protocol. + client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.create = _wrap_message_create(Messages.create) Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) @@ -109,12 +112,15 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() returns an instance of the MessageStream class, which implements the iterator protocol. + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept streamed events. The streamed events are used to populate output attributes on the AI Client Span. - The close() method is patched for situations in which the method is directly invoked by the user, and otherwise - the finally block in the __iter__ patch closes the span. + The span is finished in two possible places: + - When the user exits the context manager or directly calls close(), the patched close() ends the span. + - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + + Both paths may run, for example, when the iterator is exhausted and then the context manager exits. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( From beb8f2c792b7b1108807e7d08d5b6fc080977178 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:26:45 +0100 Subject: [PATCH 09/15] docstrings --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index ed87a8a530..4cbaba0f4a 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -802,7 +802,8 @@ def _wrap_stream_iter( ) -> "Callable[..., Iterator[RawMessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": @@ -824,6 +825,7 @@ def _wrap_stream_next( ) -> "Callable[..., RawMessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "Stream") -> "RawMessageStreamEvent": @@ -858,7 +860,8 @@ def _wrap_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ def close(self: "Stream") -> None: From 7ec95fe46b97728d62717b4a9d84503e1fc3a6a1 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:28:38 +0100 Subject: [PATCH 10/15] docs --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 4cbaba0f4a..bdad51adef 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -1008,7 +1008,8 @@ def _wrap_message_stream_iter( ) -> "Callable[..., Iterator[MessageStreamEvent]]": """ Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and end the span. + output attributes on the AI Client Span and ends the span (unless the `close()` + or `__next__()` patches have already closed it). """ def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": @@ -1030,6 +1031,7 @@ def _wrap_message_stream_next( ) -> "Callable[..., MessageStreamEvent]": """ Accumulates output data from the returned event. + Closes the AI Client Span if `StopIteration` is raised. """ def __next__(self: "MessageStream") -> "MessageStreamEvent": @@ -1064,7 +1066,8 @@ def _wrap_message_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or + the except block in the `__next__()` patch runs first. """ def close(self: "MessageStream") -> None: From cda41e20c60ed25fc4a9fc99d62c90531c50ace6 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 13:53:52 +0100 Subject: [PATCH 11/15] review --- sentry_sdk/integrations/anthropic.py | 50 +++++++++++++--------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index bdad51adef..40afd82d3d 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -835,19 +835,17 @@ def __next__(self: "Stream") -> "RawMessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -870,6 +868,7 @@ def close(self: "Stream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span( @@ -1041,19 +1040,17 @@ def __next__(self: "MessageStream") -> "MessageStreamEvent": except StopIteration: exc_info = sys.exc_info() with capture_internal_exceptions(): - if not hasattr(self, "_span"): - raise - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span + if hasattr(self, "_span"): + _finish_streaming_span( + self._span, + self._integration, + self._model, + self._usage, + self._content_blocks, + self._response_id, + self._finish_reason, + ) + del self._span reraise(*exc_info) _accumulate_event_data(self, event) @@ -1076,6 +1073,7 @@ def close(self: "MessageStream") -> None: if not hasattr(self, "_model"): self._span.__exit__(None, None, None) + del self._span return f(self) _finish_streaming_span( From 31869afeff1c812d127825e5602dd3754be716f3 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:37:28 +0100 Subject: [PATCH 12/15] simplify --- sentry_sdk/integrations/anthropic.py | 165 +++---------- .../integrations/anthropic/test_anthropic.py | 217 +----------------- 2 files changed, 34 insertions(+), 348 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 40afd82d3d..5c3f94b68f 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -95,32 +95,30 @@ def setup_once() -> None: """ client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. - The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. - The span is finished in two possible places: - - When the user exits the context manager or directly calls close(), the patched close() ends the span. - - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. + - When iteration ends, the finally block in the _iterator wrapper finishes the span. - Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.create = _wrap_message_create(Messages.create) - Stream.__iter__ = _wrap_stream_iter(Stream.__iter__) - Stream.__next__ = _wrap_stream_next(Stream.__next__) Stream.close = _wrap_stream_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. - The underlying stream can be consumed using either __iter__ or __next__, so both are patched to intercept - streamed events. The streamed events are used to populate output attributes on the AI Client Span. + client.messages.stream() can return an instance of the Stream class, which implements the iterator protocol. + The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, + information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. - The span is finished in two possible places: - - When the user exits the context manager or directly calls close(), the patched close() ends the span. - - When iteration ends, the finally block in the __iter__ patch or the except block in the __next__ patch finishes the span. + The span can be finished in two places: + - When the user exits the context manager or directly calls close(), the patched close() finishes the span. + - When iteration ends, the finally block in the _iterator wrapper finishes the span. - Both paths may run, for example, when the iterator is exhausted and then the context manager exits. + Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( @@ -130,8 +128,6 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if not issubclass(MessageStream, Stream): - MessageStream.__iter__ = _wrap_message_stream_iter(MessageStream.__iter__) - MessageStream.__next__ = _wrap_message_stream_next(MessageStream.__next__) MessageStream.close = _wrap_message_stream_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) @@ -443,7 +439,6 @@ def _wrap_synchronous_message_iterator( Sets information received while iterating the response stream on the AI Client Span. Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. """ - generator_exit = False try: for event in iterator: # Message and content types are aliases for corresponding Raw* types, introduced in @@ -464,14 +459,9 @@ def _wrap_synchronous_message_iterator( _accumulate_event_data(stream, event) yield event - except ( - GeneratorExit - ): # https://docs.python.org/3/reference/expressions.html#generator.close - generator_exit = True - raise finally: with capture_internal_exceptions(): - if not generator_exit and hasattr(stream, "_span"): + if hasattr(stream, "_span"): _finish_streaming_span( stream._span, stream._integration, @@ -643,6 +633,13 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A if isinstance(result, Stream): result._span = span result._integration = integration + + _initialize_data_accumulation_state(result) + result._iterator = _wrap_synchronous_message_iterator( + result, + result._iterator, + ) + return result if isinstance(result, AsyncStream): @@ -797,63 +794,6 @@ def _finish_streaming_span( ) -def _wrap_stream_iter( - f: "Callable[..., Iterator[RawMessageStreamEvent]]", -) -> "Callable[..., Iterator[RawMessageStreamEvent]]": - """ - Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and ends the span (unless the `close()` - or `__next__()` patches have already closed it). - """ - - def __iter__(self: "Stream") -> "Iterator[RawMessageStreamEvent]": - if not hasattr(self, "_span"): - yield from f(self) - return - - _initialize_data_accumulation_state(self) - yield from _wrap_synchronous_message_iterator( - self, - f(self), - ) - - return __iter__ - - -def _wrap_stream_next( - f: "Callable[..., RawMessageStreamEvent]", -) -> "Callable[..., RawMessageStreamEvent]": - """ - Accumulates output data from the returned event. - Closes the AI Client Span if `StopIteration` is raised. - """ - - def __next__(self: "Stream") -> "RawMessageStreamEvent": - _initialize_data_accumulation_state(self) - try: - event = f(self) - except StopIteration: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - if hasattr(self, "_span"): - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - reraise(*exc_info) - - _accumulate_event_data(self, event) - return event - - return __next__ - - def _wrap_stream_close( f: "Callable[..., None]", ) -> "Callable[..., None]": @@ -997,66 +937,15 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": stream._span = span stream._integration = integration - return stream - - return _sentry_patched_enter - - -def _wrap_message_stream_iter( - f: "Callable[..., Iterator[MessageStreamEvent]]", -) -> "Callable[..., Iterator[MessageStreamEvent]]": - """ - Accumulates output data while iterating. When the returned iterator ends, set - output attributes on the AI Client Span and ends the span (unless the `close()` - or `__next__()` patches have already closed it). - """ - - def __iter__(self: "MessageStream") -> "Iterator[MessageStreamEvent]": - if not hasattr(self, "_span"): - yield from f(self) - return - - _initialize_data_accumulation_state(self) - yield from _wrap_synchronous_message_iterator( - self, - f(self), + _initialize_data_accumulation_state(stream) + stream._iterator = _wrap_synchronous_message_iterator( + stream, + stream._iterator, ) - return __iter__ - - -def _wrap_message_stream_next( - f: "Callable[..., MessageStreamEvent]", -) -> "Callable[..., MessageStreamEvent]": - """ - Accumulates output data from the returned event. - Closes the AI Client Span if `StopIteration` is raised. - """ - - def __next__(self: "MessageStream") -> "MessageStreamEvent": - _initialize_data_accumulation_state(self) - try: - event = f(self) - except StopIteration: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - if hasattr(self, "_span"): - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - reraise(*exc_info) - - _accumulate_event_data(self, event) - return event + return stream - return __next__ + return _sentry_patched_enter def _wrap_message_stream_close( diff --git a/tests/integrations/anthropic/test_anthropic.py b/tests/integrations/anthropic/test_anthropic.py index 4fd3c48228..2139d74a1b 100644 --- a/tests/integrations/anthropic/test_anthropic.py +++ b/tests/integrations/anthropic/test_anthropic.py @@ -1,7 +1,6 @@ import pytest from unittest import mock import json -from itertools import islice try: from unittest.mock import AsyncMock @@ -334,7 +333,7 @@ def test_streaming_create_message( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] -def test_streaming_create_message_next_consumption( +def test_streaming_create_message_close( sentry_init, capture_events, get_model_response, @@ -393,7 +392,7 @@ def test_streaming_create_message_next_consumption( } ] - with pytest.raises(StopIteration), mock.patch.object( + with mock.patch.object( client._client, "send", return_value=response, @@ -403,110 +402,9 @@ def test_streaming_create_message_next_consumption( max_tokens=1024, messages=messages, model="model", stream=True ) - while True: + for _ in range(4): next(messages) - assert len(events) == 1 - (event,) = events - - assert event["type"] == "transaction" - assert event["transaction"] == "anthropic" - - span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) - - assert span["op"] == OP.GEN_AI_CHAT - assert span["description"] == "chat model" - assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" - assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" - assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" - - assert ( - span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] - == '[{"role": "user", "content": "Hello, Claude"}]' - ) - assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" - - assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 - assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True - assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] - - -def test_streaming_create_message_iterator_methods( - sentry_init, - capture_events, - get_model_response, - server_side_event_chunks, -): - client = Anthropic(api_key="z") - - response = get_model_response( - server_side_event_chunks( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="max_tokens"), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] - ) - ) - - sentry_init( - integrations=[AnthropicIntegration(include_prompts=True)], - traces_sample_rate=1.0, - send_default_pii=True, - ) - events = capture_events() - - messages = [ - { - "role": "user", - "content": "Hello, Claude", - } - ] - - with mock.patch.object( - client._client, - "send", - return_value=response, - ) as _: - with start_transaction(name="anthropic"): - messages = client.messages.create( - max_tokens=1024, messages=messages, model="model", stream=True - ) - - next(messages) - next(messages) - list(islice(messages, 1)) - next(messages) messages.close() assert len(events) == 1 @@ -653,7 +551,7 @@ def test_stream_messages( assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] -def test_stream_messages_next_consumption( +def test_stream_messages_close( sentry_init, capture_events, get_model_response, @@ -712,7 +610,7 @@ def test_stream_messages_next_consumption( } ] - with pytest.raises(StopIteration), mock.patch.object( + with mock.patch.object( client._client, "send", return_value=response, @@ -723,114 +621,13 @@ def test_stream_messages_next_consumption( messages=messages, model="model", ) as stream: - while True: + for _ in range(4): next(stream) - assert len(events) == 1 - (event,) = events - - assert event["type"] == "transaction" - assert event["transaction"] == "anthropic" - - span = next(span for span in event["spans"] if span["op"] == OP.GEN_AI_CHAT) - - assert span["op"] == OP.GEN_AI_CHAT - assert span["description"] == "chat model" - assert span["data"][SPANDATA.GEN_AI_SYSTEM] == "anthropic" - assert span["data"][SPANDATA.GEN_AI_OPERATION_NAME] == "chat" - assert span["data"][SPANDATA.GEN_AI_REQUEST_MODEL] == "model" - - assert ( - span["data"][SPANDATA.GEN_AI_REQUEST_MESSAGES] - == '[{"role": "user", "content": "Hello, Claude"}]' - ) - assert span["data"][SPANDATA.GEN_AI_RESPONSE_TEXT] == "Hi! I'm Claude!" - - assert span["data"][SPANDATA.GEN_AI_USAGE_INPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS] == 10 - assert span["data"][SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS] == 20 - assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True - assert span["data"][SPANDATA.GEN_AI_RESPONSE_ID] == "msg_01XFDUDYJgAACzvnptvVoYEL" - assert span["data"][SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS] == ["max_tokens"] - - -def test_stream_messages_iterator_methods( - sentry_init, - capture_events, - get_model_response, - server_side_event_chunks, -): - client = Anthropic(api_key="z") - - response = get_model_response( - server_side_event_chunks( - [ - MessageStartEvent( - message=EXAMPLE_MESSAGE, - type="message_start", - ), - ContentBlockStartEvent( - type="content_block_start", - index=0, - content_block=TextBlock(type="text", text=""), - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="Hi", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text="!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockDeltaEvent( - delta=TextDelta(text=" I'm Claude!", type="text_delta"), - index=0, - type="content_block_delta", - ), - ContentBlockStopEvent(type="content_block_stop", index=0), - MessageDeltaEvent( - delta=Delta(stop_reason="max_tokens"), - usage=MessageDeltaUsage(output_tokens=10), - type="message_delta", - ), - ] - ) - ) - - sentry_init( - integrations=[AnthropicIntegration(include_prompts=True)], - traces_sample_rate=1.0, - send_default_pii=True, - ) - events = capture_events() - - messages = [ - { - "role": "user", - "content": "Hello, Claude", - } - ] - - with mock.patch.object( - client._client, - "send", - return_value=response, - ) as _: - with start_transaction(name="anthropic"): - with client.messages.stream( - max_tokens=1024, - messages=messages, - model="model", - ) as stream: - next(stream) - next(stream) - list(islice(stream, 1)) - next(stream) # New versions add TextEvent, so consume one more event. if TextEvent is not None and isinstance(next(stream), TextEvent): next(stream) + stream.close() assert len(events) == 1 From 1017e9eee15b2325934ee2e2cca8482db56a86dc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:40:19 +0100 Subject: [PATCH 13/15] simplify --- sentry_sdk/integrations/anthropic.py | 44 ++++------------------------ 1 file changed, 5 insertions(+), 39 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 5c3f94b68f..514063f3f3 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -105,7 +105,7 @@ def setup_once() -> None: Both paths may run. For example, the context manager exit can follow iterator exhaustion. """ Messages.create = _wrap_message_create(Messages.create) - Stream.close = _wrap_stream_close(Stream.close) + Stream.close = _wrap_close(Stream.close) AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) @@ -128,7 +128,7 @@ def setup_once() -> None: # Before https://github.com/anthropics/anthropic-sdk-python/commit/b1a1c0354a9aca450a7d512fdbdeb59c0ead688a # MessageStream inherits from Stream, so patching Stream is sufficient on these versions. if not issubclass(MessageStream, Stream): - MessageStream.close = _wrap_message_stream_close(MessageStream.close) + MessageStream.close = _wrap_close(MessageStream.close) AsyncMessages.stream = _wrap_async_message_stream(AsyncMessages.stream) AsyncMessageStreamManager.__aenter__ = ( @@ -794,15 +794,14 @@ def _finish_streaming_span( ) -def _wrap_stream_close( +def _wrap_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or - the except block in the `__next__()` patch runs first. + Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ - def close(self: "Stream") -> None: + def close(self: "Union[Stream, MessageStream]") -> None: if not hasattr(self, "_span"): return f(self) @@ -948,39 +947,6 @@ def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream": return _sentry_patched_enter -def _wrap_message_stream_close( - f: "Callable[..., None]", -) -> "Callable[..., None]": - """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` or - the except block in the `__next__()` patch runs first. - """ - - def close(self: "MessageStream") -> None: - if not hasattr(self, "_span"): - return f(self) - - if not hasattr(self, "_model"): - self._span.__exit__(None, None, None) - del self._span - return f(self) - - _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, - ) - del self._span - - return f(self) - - return close - - def _wrap_async_message_stream(f: "Any") -> "Any": """ Attaches user-provided arguments to the returned context manager. From fcedba70cf97c2163883f19a46f840d9aadbd390 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 17 Mar 2026 15:59:35 +0100 Subject: [PATCH 14/15] . --- sentry_sdk/integrations/anthropic.py | 34 ++++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 514063f3f3..0592e337de 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -96,7 +96,7 @@ def setup_once() -> None: """ client.messages.create(stream=True) can return an instance of the Stream class, which implements the iterator protocol. The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, - information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. + information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. The span can be finished in two places: - When the user exits the context manager or directly calls close(), the patched close() finishes the span. @@ -110,9 +110,9 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() can return an instance of the Stream class, which implements the iterator protocol. + client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, - information from intercepted events are accumulated and used to populate output attributes on the AI Client Span. + information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. The span can be finished in two places: - When the user exits the context manager or directly calls close(), the patched close() finishes the span. @@ -463,13 +463,13 @@ def _wrap_synchronous_message_iterator( with capture_internal_exceptions(): if hasattr(stream, "_span"): _finish_streaming_span( - stream._span, - stream._integration, - stream._model, - stream._usage, - stream._content_blocks, - stream._response_id, - stream._finish_reason, + span=stream._span, + integration=stream._integration, + model=stream._model, + usage=stream._usage, + content_blocks=stream._content_blocks, + response_id=stream._response_id, + finish_reason=stream._finish_reason, ) del stream._span @@ -811,13 +811,13 @@ def close(self: "Union[Stream, MessageStream]") -> None: return f(self) _finish_streaming_span( - self._span, - self._integration, - self._model, - self._usage, - self._content_blocks, - self._response_id, - self._finish_reason, + span=self._span, + integration=self._integration, + model=self._model, + usage=self._usage, + content_blocks=self._content_blocks, + response_id=self._response_id, + finish_reason=self._finish_reason, ) del self._span From 13240def781c40095f5289aa73207c7ee7120d03 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Wed, 18 Mar 2026 14:08:05 +0100 Subject: [PATCH 15/15] docstrings --- sentry_sdk/integrations/anthropic.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 0592e337de..ea73cdbfdb 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -110,15 +110,7 @@ def setup_once() -> None: AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create) """ - client.messages.stream() can return an instance of the MessageStream class, which implements the iterator protocol. - The private _iterator variable and the close() method are patched. During iteration over the _iterator generator, - information from intercepted events is accumulated and used to populate output attributes on the AI Client Span. - - The span can be finished in two places: - - When the user exits the context manager or directly calls close(), the patched close() finishes the span. - - When iteration ends, the finally block in the _iterator wrapper finishes the span. - - Both paths may run. For example, the context manager exit can follow iterator exhaustion. + client.messages.stream() patches are analogous to the patches for client.messages.create(stream=True) described above. """ Messages.stream = _wrap_message_stream(Messages.stream) MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter( @@ -437,7 +429,7 @@ def _wrap_synchronous_message_iterator( ) -> "Iterator[Union[RawMessageStreamEvent, MessageStreamEvent]]": """ Sets information received while iterating the response stream on the AI Client Span. - Responsible for closing the AI Client Span, unless the span has already been closed in the close() patch. + Responsible for closing the AI Client Span unless the span has already been closed in the close() patch. """ try: for event in iterator: @@ -798,7 +790,7 @@ def _wrap_close( f: "Callable[..., None]", ) -> "Callable[..., None]": """ - Closes the AI Client Span, unless the finally block in `_wrap_synchronous_message_iterator()` runs first. + Closes the AI Client Span unless the finally block in `_wrap_synchronous_message_iterator()` runs first. """ def close(self: "Union[Stream, MessageStream]") -> None: