
example extension for comparison based on utpilla's design #2267

Closed
gouslu wants to merge 11 commits into open-telemetry:main from gouslu:gouslu/utpilla-handle-based-design

Conversation

@gouslu gouslu commented Mar 11, 2026

Change Summary

What issue does this PR close?

  • Closes #NNN

How are these changes tested?

Are there any user-facing changes?

utpilla and others added 11 commits February 25, 2026 23:43
- Add BearerTokenProvider trait + BearerTokenProviderHandle in engine
- Add Secret, BearerToken, BearerTokenError types
- Add bearer auth extension (client-side only, periodic refresh)
- Add Azure Identity auth extension with token cache + watch channel
- Add azure-identity-auth-extension feature flag in contrib-nodes
@github-actions github-actions bot added the rust Pull requests that update Rust code label Mar 11, 2026
Comment on lines +209 to +210
pub async fn get_token(&self) -> Result<BearerToken, BearerTokenError> {
    self.inner.lock().await.get_token().await
Contributor Author

We need async methods, but that forces the handle's mutex to also be async tokio::Mutex::lock().await. This serializes all callers: if component A is mid-way through an async call (e.g., an HTTP request) while holding the lock, component B must wait for A to fully complete before it can even start. Without the mutex, both calls would be concurrent — their I/O would overlap. In this extension concurrent calls may not matter, but any extension whose handle wraps an async I/O method will hit this limitation.

Contributor Author


Also, I think not having async methods is a deal-breaker.

Contributor


> Also, I think not having async methods is a deal-breaker.

I agree that, with my approach in #2113, we don't have good support for async methods in the handles, which is covered well in your approach #2141. However, I don't know if we actually need async methods in the handle.

I like the clear separation of concerns, where the Extensions::start method is responsible for keeping the state updated (with async operations if required) but the handles consumed by the receivers, exporters etc. are only reading the updated state. I left a comment here with more details: #2267 (comment)

Contributor Author


I don't think that constraint is acceptable. What if you wanted a cache processor that can use different databases? Something like a "KeyValueCacheProcessor" that depends on a "KeyValueCacheProvider", which you could implement as "RedisKeyValueCacheProvider", or for Amazon, Azure, etc. Those providers can be extension capabilities, and the cache processor doesn't have to know which technology it is using underneath. That would require async. I am just imagining scenarios, but things like that would be useful.

/// Reads the latest token from a shared cache (updated by the extension's
/// background refresh loop) and exposes a watch channel for subscribers.
/// If the cache is empty or expired, falls back to fetching from Azure directly.
pub(crate) struct AzureTokenProvider {
Contributor Author


This struct exists in addition to the extension's internal state on line 122. It is needed for state sharing because the lifecycle and the extension are two different instances/structs. This causes redundancy.

Contributor


Here's a way you can implement a functionally equivalent implementation of BearerToken extension with the handle approach without requiring the extension author to add any Mutex or redundant code:

utpilla#4

Check these files:

  • rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs (Capability trait and handle definition)
  • rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs (Concrete implementation)

let token_sender = Arc::new(token_sender);
let token_cache_for_demonstration = Arc::new(Mutex::new(None));

let provider = AzureTokenProvider {
Contributor Author


The compiler will error if you move the same value into both structs, but it won't stop you from creating independent instances with separate state. The author must understand that Arc::clone is needed to share state; the type system doesn't enforce it. So the claim that the handle-based design is not error-prone in that regard is not entirely correct. It is more explicit, I will grant that, but it is still error-prone.

/// Creates a new handle wrapping the given authenticator implementation.
pub fn new(auth: impl ServerAuthenticator + 'static) -> Self {
    Self {
        inner: Arc::new(Mutex::new(Box::new(auth))),
Contributor Author


The tokio::sync::Mutex on the handle creates an additional layer of indirection. Benchmarks show ~6ns per call (lock + unlock) compared to <1ns for a direct vtable call — roughly 6x overhead. This is small in absolute terms, but worth noting if an extension method is called in a hot path (e.g., per-request auth header injection).


utpilla commented Mar 11, 2026

I think all the concerns in this PR trace back to one design question: should handle methods be async?

My position is no. The way I'm modeling it, handle is a read endpoint. Its only job is to return cached state that the extension's background loop already produced. All async work (token acquisition, retries, refresh scheduling) belongs in start(). Consumers never do I/O through the handle; they read what's already there.

This isn't a novel constraint — the Go Collector's auth extension interfaces follow the same principle. Server.Authenticate, HTTPClient.RoundTripper, and GRPCClient.PerRPCCredentials are all sync. The gRPC receiver calls Authenticate synchronously inside its interceptor, and the exporter calls PerRPCCredentials synchronously during setup. Background goroutines handle all the async I/O; the consumer-facing interfaces return cached state.

With that context, here's how the specific concerns in this PR play out:

On tokio Mutex serializing the callers:

The BearerTokenProvider trait introduced here has async fn get_token(), which forces the handle to use tokio::sync::Mutex (std::sync::Mutex can't be held across .await). Yes, that serializes callers: if one is mid-way through an async operation, all others wait. Keeping handles sync avoids this entirely.

On AzureTokenProvider being "redundant":

The AzureTokenProvider here has 4 fields (credential, scope, token_sender, token_cache_for_demonstration) and includes a fallback path that fetches tokens directly from Azure on a cache miss. That complexity comes from the async get_token() design choice — the handle needs enough state to perform I/O on demand.

In #2113, the equivalent struct is:

struct AzureIdentityClientAuth {
    token_rx: watch::Receiver<Option<HeaderValue>>,
}

One field. No credentials, no scope, no cache, no fallback fetching. It reads from the watch channel and returns the cached header. The "redundancy" was introduced by making the handle async, not by the handle-based pattern itself.

On "same error-proneness" / Arc::clone:

The Arc::clone usage here is needed because the extension and the provider both need shared access to token_sender and token_cache_for_demonstration — again, because the handle does its own I/O and needs the full credential state.

In #2113, there's no Arc::clone for state sharing. The factory calls watch::channel(), which produces two endpoints by construction — the sender moves into the extension, the receiver moves into the handle:

let (token_tx, token_rx) = watch::channel(None);
let auth = AzureIdentityClientAuth { token_rx };
let extension = AzureIdentityAuthExtension { token_tx, .. };

Nothing to clone, nothing to get wrong.

On tokio::Mutex overhead (6ns vs <1ns):

6ns per call is not a meaningful factor in a system where a single request involves socket I/O, deserialization, pipeline processing, and serialization — orders of magnitude more expensive. This shouldn't be a deciding factor for the architecture.


gouslu commented Mar 11, 2026

> I think all the concerns in this PR trace back to one design question: should handle methods be async? […]

I was able to implement this, and nothing stopped me from implementing my extension the way I did in this PR. If you are going to impose all these constraints and principles on authors so they "don't make mistakes" and make "good design decisions", why not just state one simple constraint, as I did in my design: "if you are going to share state, put Arc<> around it, and be aware that your extension will be cloned for every consumer". With that knowledge, the design I propose gives you much more freedom and simplicity.

On mutex cost, 6ns is best case. Under production load, this will likely be much higher, due to CPU cache pressure.

Also, there is no real mechanism that protects anyone from doing anything multi-threaded; anyone could still run tokio::task::spawn_blocking and use your handle in it. Many of the "mistakes" in the code have to be guarded against by PR review; this is nothing new, and PRs exist for a reason. So the one design flaw you pointed out in my proposed design can easily be managed by documentation and PR reviews. This is the same level of discipline we already apply to multi-threading, memory-safety patterns, and API usage across the codebase.

If the argument is that state should never be shared between the lifecycle and the extension, isn't that also a constraint you are putting on the developer? The same principle can be applied to the design I proposed; however, I wouldn't go that far. I would only say: Arc<>-wrap your shared fields, and the rest is business as usual.

github-merge-queue bot pushed a commit that referenced this pull request Mar 18, 2026
# Change Summary

This PR adds a design proposal describing the extension system for the
**OTel Dataflow Engine**.

The document introduces a capability-based extension architecture
allowing receivers, processors, and exporters to access non-pdata
functionality through well-defined capability interfaces maintained in
the engine core.

The proposal covers:

* core concepts such as **capabilities**, **extension providers**, and
**extension instances**
* integration of extensions into the **existing configuration model**
* the **user experience** for declaring extensions and binding
capabilities
* the **developer experience** for implementing extension providers
* the **runtime architecture** for resolving and instantiating
extensions
* the **execution models** supported by extensions (local vs shared)
* comparison with the **Go Collector extension model**
* a **phased evolution plan** (native extensions → hierarchical
placement → WASM extensions)
* implementation recommendations for building **high-performance
extensions aligned with the engine's thread-per-core design**

The goal of this document is to provide maintainers with a clear
architectural proposal to review before implementing the extension
system.

## What issue does this PR close?

* Related to #2267, #2230, #2141, #2113 

## How are these changes tested?

This PR introduces **documentation only** and does not modify runtime
code.

## Are there any user-facing changes?

Yes.

This proposal describes a **future extension system** that will
introduce new configuration capabilities such as:

* an `extensions` section in pipeline configurations
* a `capabilities` section in node definitions

These changes are not implemented yet but outline the intended
user-facing configuration model for extensions.

---------

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
@gouslu gouslu closed this Mar 19, 2026

Labels

rust Pull requests that update Rust code

Projects

Status: Done


4 participants