example extension for comparison based on utpilla's design #2267
gouslu wants to merge 11 commits into open-telemetry:main
Conversation
- Add `BearerTokenProvider` trait + `BearerTokenProviderHandle` in engine
- Add `Secret`, `BearerToken`, `BearerTokenError` types
- Add bearer auth extension (client-side only, periodic refresh)
- Add Azure Identity auth extension with token cache + watch channel
- Add `azure-identity-auth-extension` feature flag in contrib-nodes
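For orientation, here is a minimal, synchronous sketch of the shapes the summary names. The type and trait names come from the PR; the bodies, the `StaticProvider` helper, and the error variants are hypothetical stand-ins (the real trait is async):

```rust
/// Hypothetical stand-ins for the types named in the PR summary.
/// The real trait is async; this sketch is synchronous for brevity.
#[derive(Clone, Debug, PartialEq)]
pub struct BearerToken(pub String);

#[derive(Debug)]
pub enum BearerTokenError {
    Expired,
    Fetch(String),
}

/// Capability trait implemented by auth extensions.
pub trait BearerTokenProvider {
    fn get_token(&self) -> Result<BearerToken, BearerTokenError>;
}

/// Handle consumed by receivers/exporters; wraps any provider.
pub struct BearerTokenProviderHandle {
    inner: Box<dyn BearerTokenProvider>,
}

impl BearerTokenProviderHandle {
    pub fn new(provider: impl BearerTokenProvider + 'static) -> Self {
        Self { inner: Box::new(provider) }
    }
    pub fn get_token(&self) -> Result<BearerToken, BearerTokenError> {
        self.inner.get_token()
    }
}

/// A fixed-token provider, useful for tests.
pub struct StaticProvider(pub &'static str);

impl BearerTokenProvider for StaticProvider {
    fn get_token(&self) -> Result<BearerToken, BearerTokenError> {
        Ok(BearerToken(self.0.to_string()))
    }
}
```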
```rust
pub async fn get_token(&self) -> Result<BearerToken, BearerTokenError> {
    self.inner.lock().await.get_token().await
}
```
We need async methods, but that forces the handle's mutex to also be async tokio::Mutex::lock().await. This serializes all callers: if component A is mid-way through an async call (e.g., an HTTP request) while holding the lock, component B must wait for A to fully complete before it can even start. Without the mutex, both calls would be concurrent — their I/O would overlap. In this extension concurrent calls may not matter, but any extension whose handle wraps an async I/O method will hit this limitation.
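The serialization effect can be reproduced with plain std threads as a stand-in for the async case (tokio's `Mutex` behaves analogously for tasks): holding the handle's lock across simulated I/O forces the two callers to run back-to-back, while the lock-free version lets them overlap. Everything here is illustrative, not code from the PR:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Two callers whose "I/O" (a sleep) runs while holding the handle's
/// mutex: the second caller cannot start until the first finishes.
fn serialized_calls(io: Duration) -> Duration {
    let lock = Arc::new(Mutex::new(()));
    let start = Instant::now();
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let lock = Arc::clone(&lock);
            thread::spawn(move || {
                let _guard = lock.lock().unwrap(); // held across the "I/O"
                thread::sleep(io);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    start.elapsed()
}

/// The same two callers without the mutex: their "I/O" overlaps.
fn concurrent_calls(io: Duration) -> Duration {
    let start = Instant::now();
    let handles: Vec<_> = (0..2)
        .map(|_| thread::spawn(move || thread::sleep(io)))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    start.elapsed()
}
```

With a 50 ms "I/O", the serialized version takes at least 100 ms while the concurrent one finishes in roughly half that.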
Also, I think not having async methods is a deal-breaker.
> Also, I think not having async methods is a deal-breaker.
I agree that, with my approach in #2113, we don't have good support for async methods in the handles, which your approach in #2141 covers well. However, I don't know if we actually need async methods in the handle.
I like the clear separation of concerns, where the Extensions::start method is responsible for keeping the state updated (with async operations if required) but the handles consumed by the receivers, exporters etc. are only reading the updated state. I left a comment here with more details: #2267 (comment)
I don't think that constraint is acceptable. What if you wanted a cache processor that can use different databases? Something like a `KeyValueCacheProcessor` that depends on a `KeyValueCacheProvider`, which you can implement as `RedisKeyValueCacheProvider`, or for Amazon/Azure, etc. — you name it. Those providers can be extension capabilities, and the cache processor doesn't have to know which technology it is using underneath. This would require async. I am just imagining scenarios, but things like that would be useful.
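A sketch of that scenario, with the async I/O elided so it stays self-contained. The `KeyValueCacheProcessor`/`KeyValueCacheProvider` names come from the comment above; `InMemoryCacheProvider` and the method signatures are made up for illustration:

```rust
use std::collections::HashMap;

/// Capability trait from the comment's example. A real Redis/Azure
/// provider would do async I/O here; this sketch keeps it sync.
trait KeyValueCacheProvider {
    fn get(&self, key: &str) -> Option<String>;
    fn set(&mut self, key: &str, value: String);
}

/// In-memory stand-in for a RedisKeyValueCacheProvider, etc.
struct InMemoryCacheProvider {
    map: HashMap<String, String>,
}

impl KeyValueCacheProvider for InMemoryCacheProvider {
    fn get(&self, key: &str) -> Option<String> {
        self.map.get(key).cloned()
    }
    fn set(&mut self, key: &str, value: String) {
        self.map.insert(key.to_string(), value);
    }
}

/// The processor only sees the capability, never the backing technology.
struct KeyValueCacheProcessor<P: KeyValueCacheProvider> {
    provider: P,
}

impl<P: KeyValueCacheProvider> KeyValueCacheProcessor<P> {
    /// Returns the cached value, computing and storing it on a miss.
    fn get_or_insert_with(&mut self, key: &str, f: impl FnOnce() -> String) -> String {
        if let Some(v) = self.provider.get(key) {
            return v;
        }
        let v = f();
        self.provider.set(key, v.clone());
        v
    }
}
```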
```rust
/// Reads the latest token from a shared cache (updated by the extension's
/// background refresh loop) and exposes a watch channel for subscribers.
/// If the cache is empty or expired, falls back to fetching from Azure directly.
pub(crate) struct AzureTokenProvider {
```
This struct exists in addition to the extension's internal state on line 122. It is needed for state sharing because the lifecycle and the extension are two different instances/structs, and that causes redundancy.
Here's a way to implement a functionally equivalent version of the BearerToken extension with the handle approach, without requiring the extension author to add any Mutex or redundant code:
Check these files:
- rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs (capability trait and handle definition)
- rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs (concrete implementation)
```rust
let token_sender = Arc::new(token_sender);
let token_cache_for_demonstration = Arc::new(Mutex::new(None));

let provider = AzureTokenProvider {
```
The compiler will error if you move the same value into both structs, but it won't stop you from creating independent instances with separate state. The author must understand that `Arc::clone` is needed to share state — the type system doesn't enforce it. So the claim that the handle-based design is not error-prone in this regard is not entirely correct. It is more explicit, I'll grant that, but it is still error-prone.
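A small illustration of the failure mode, using hypothetical `Extension`/`Handle` structs: both versions compile, but only the `Arc::clone` version actually shares state:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical pair mirroring the PR's lifecycle/handle split.
struct Extension {
    cache: Arc<Mutex<Option<String>>>,
}
struct Handle {
    cache: Arc<Mutex<Option<String>>>,
}

/// Correct: one allocation, shared via Arc::clone.
fn shared() -> (Extension, Handle) {
    let cache = Arc::new(Mutex::new(None));
    let ext = Extension { cache: Arc::clone(&cache) };
    let handle = Handle { cache };
    (ext, handle)
}

/// Compiles just as happily, but the two sides never see each other's writes.
fn disjoint() -> (Extension, Handle) {
    (
        Extension { cache: Arc::new(Mutex::new(None)) },
        Handle { cache: Arc::new(Mutex::new(None)) },
    )
}
```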
```rust
/// Creates a new handle wrapping the given authenticator implementation.
pub fn new(auth: impl ServerAuthenticator + 'static) -> Self {
    Self {
        inner: Arc::new(Mutex::new(Box::new(auth))),
```
The tokio::sync::Mutex on the handle creates an additional layer of indirection. Benchmarks show ~6ns per call (lock + unlock) compared to <1ns for a direct vtable call — roughly 6x overhead. This is small in absolute terms, but worth noting if an extension method is called in a hot path (e.g., per-request auth header injection).
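A rough sketch of how such a measurement can be taken, with `std::sync::Mutex` standing in for `tokio::sync::Mutex`; absolute numbers will differ by machine and build mode, and are not the ~6ns figure quoted above:

```rust
use std::sync::Mutex;
use std::time::Instant;

trait Auth {
    fn header(&self) -> u64;
}
struct Static(u64);
impl Auth for Static {
    fn header(&self) -> u64 {
        self.0
    }
}

/// Returns (direct_ns, locked_ns): the per-call cost of a bare dynamic
/// call vs. the same call made through a Mutex. Illustrative only.
fn bench(iters: u64) -> (f64, f64) {
    let direct: Box<dyn Auth> = Box::new(Static(1));
    let locked: Mutex<Box<dyn Auth>> = Mutex::new(Box::new(Static(1)));

    let mut sum = 0u64;
    let t = Instant::now();
    for _ in 0..iters {
        sum += direct.header();
    }
    let direct_ns = t.elapsed().as_nanos() as f64 / iters as f64;

    let t = Instant::now();
    for _ in 0..iters {
        sum += locked.lock().unwrap().header();
    }
    let locked_ns = t.elapsed().as_nanos() as f64 / iters as f64;

    assert_eq!(sum, 2 * iters); // keep the loops from being optimized away
    (direct_ns, locked_ns)
}
```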
I think all the concerns in this PR trace back to one design question: should handle methods be async? My position is no. The way I'm modeling it, the handle is a read endpoint. Its only job is to return cached state that the extension's background loop already produced. All async work (token acquisition, retries, refresh scheduling) belongs in the extension's `start` method. This isn't a novel constraint — the Go Collector's auth extension interfaces follow the same principle. With that context, here's how the specific concerns in this PR play out:

**On the tokio Mutex serializing the callers:** The handle in #2113 holds no mutex at all; it reads the latest value from a watch channel, so callers never block each other.

**On the redundant `AzureTokenProvider` struct:** In #2113, the equivalent struct is:

```rust
struct AzureIdentityClientAuth {
    token_rx: watch::Receiver<Option<HeaderValue>>,
}
```

One field. No credentials, no scope, no cache, no fallback fetching. It reads from the watch channel and returns the cached header. The "redundancy" was introduced by making the handle async, not by the handle-based pattern itself.

**On "same error-proneness" / `Arc::clone`:** In #2113, there's no shared `Arc` to wire up:

```rust
let (token_tx, token_rx) = watch::channel(None);
let auth = AzureIdentityClientAuth { token_rx };
let extension = AzureIdentityAuthExtension { token_tx, .. };
```

Nothing to clone, nothing to get wrong.

**On the mutex cost:** 6ns per call is not a meaningful factor in a system where a single request involves socket I/O, deserialization, pipeline processing, and serialization — orders of magnitude more expensive. This shouldn't be a deciding factor for the architecture.
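The read-endpoint split argued for here can be sketched without tokio by using `std::sync::RwLock` as a stand-in for `watch::channel`: a background loop publishes into a shared slot, and the handle only ever reads the latest published value. All names below are illustrative, not the actual #2113 code:

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Write side, owned by the extension's background refresh loop.
struct TokenRefresher {
    slot: Arc<RwLock<Option<String>>>,
}

/// Read-only handle: no async, no lock held across I/O — just a read
/// of whatever the refresher last published.
#[derive(Clone)]
struct TokenHandle {
    slot: Arc<RwLock<Option<String>>>,
}

impl TokenHandle {
    fn current(&self) -> Option<String> {
        self.slot.read().unwrap().clone()
    }
}

/// Wires the two sides together, like watch::channel(None).
fn channel() -> (TokenRefresher, TokenHandle) {
    let slot = Arc::new(RwLock::new(None));
    (
        TokenRefresher { slot: Arc::clone(&slot) },
        TokenHandle { slot },
    )
}

/// Spawns a background "refresh loop" that publishes one token and exits.
fn start_refresh(refresher: TokenRefresher, token: &'static str) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        *refresher.slot.write().unwrap() = Some(token.to_string());
    })
}
```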
I was able to implement this, and nothing stopped me from implementing my extension the way I did in this PR. If you are going to hand authors all these constraints and principles so they "not make mistakes" and make "good design decisions", then why not just put one simple constraint like I did in my design: "if you are going to share state, put it behind an `Arc`"?

On mutex cost: 6ns is the best case. Under production load this will likely be much higher, due to CPU cache pressure. Also, there is no real mechanism that protects anyone from doing anything multi-threaded; anyone could still run the handle from multiple threads.

If the argument is that you should never share state between the lifecycle and the extension, isn't that also a constraint you are putting on the developer? The same principle can be applied to the design proposal I made; however, I wouldn't go that far. I would only say: if you are going to share state, put it behind an `Arc`.
# Change Summary

This PR adds a design proposal describing the extension system for the **OTel Dataflow Engine**. The document introduces a capability-based extension architecture allowing receivers, processors, and exporters to access non-pdata functionality through well-defined capability interfaces maintained in the engine core.

The proposal covers:

* core concepts such as **capabilities**, **extension providers**, and **extension instances**
* integration of extensions into the **existing configuration model**
* the **user experience** for declaring extensions and binding capabilities
* the **developer experience** for implementing extension providers
* the **runtime architecture** for resolving and instantiating extensions
* the **execution models** supported by extensions (local vs shared)
* comparison with the **Go Collector extension model**
* a **phased evolution plan** (native extensions → hierarchical placement → WASM extensions)
* implementation recommendations for building **high-performance extensions aligned with the engine's thread-per-core design**

The goal of this document is to provide maintainers with a clear architectural proposal to review before implementing the extension system.

## What issue does this PR close?

* Related to #2267, #2230, #2141, #2113

## How are these changes tested?

This PR introduces **documentation only** and does not modify runtime code.

## Are there any user-facing changes?

Yes. This proposal describes a **future extension system** that will introduce new configuration capabilities such as:

* an `extensions` section in pipeline configurations
* a `capabilities` section in node definitions

These changes are not implemented yet but outline the intended user-facing configuration model for extensions.

---------

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>