@GaneshPatil7517
Contributor

Which issue does this PR close?

Closes #18150
Fixes #18149

Rationale for this change

The previous implementation called try_plan_async_exprs() at three separate locations (projection, filter, aggregation), each with its own match arms handling PlanAsyncExpr::Sync vs PlanAsyncExpr::Async. This caused:

  1. Duplication - The same pattern repeated three times
  2. Cognitive overhead - Understanding async UDF handling required reading all three locations
  3. Bug #18149 (Async UDF rewrite input to Aggregate breaks UDAFs) - Aggregation passed the wrong schema to AggregateExec, causing an expression/schema mismatch

What changes are included in this PR?

Introduces two helper methods that consolidate async UDF handling:

  • maybe_wrap_async_exec() — for plain expressions (filter, aggregation)
  • maybe_wrap_async_exec_with_names() — for projection expressions with aliases

Both methods:

  1. Scan expressions for async UDFs
  2. If found, wrap the input with AsyncFuncExec
  3. Return the (possibly wrapped) input and rewritten expressions

This makes each call site a simple function call instead of a multi-arm match.
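
The scan / wrap / return shape described above can be sketched with a toy model. This is not the DataFusion code: `Expr`, `Plan`, `rewrite`, and `maybe_wrap_async` are hypothetical stand-ins invented for illustration, the async result columns are assumed to be appended after the input columns, and async calls nested inside other async calls are not handled.

```rust
// Toy model only: hypothetical stand-ins, not DataFusion's types or API.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    SyncCall(String, Vec<Expr>),
    AsyncCall(String, Vec<Expr>),
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    // Stand-in for AsyncFuncExec: evaluates the async calls and exposes
    // each result as an extra column (assumed to be appended at the end).
    AsyncFunc { input: Box<Plan>, async_exprs: Vec<Expr> },
}

impl Plan {
    // Output column names of this node.
    fn schema(&self) -> Vec<String> {
        match self {
            Plan::Scan { columns } => columns.clone(),
            Plan::AsyncFunc { input, async_exprs } => {
                let mut cols = input.schema();
                for i in 0..async_exprs.len() {
                    cols.push(format!("__async_fn_{i}"));
                }
                cols
            }
        }
    }
}

// Step 1: scan an expression, replacing each async call with a column
// reference and recording the call in `found`.
fn rewrite(expr: &Expr, found: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::Column(_) => expr.clone(),
        Expr::SyncCall(name, args) => Expr::SyncCall(
            name.clone(),
            args.iter().map(|a| rewrite(a, found)).collect(),
        ),
        Expr::AsyncCall(..) => {
            let column = format!("__async_fn_{}", found.len());
            found.push(expr.clone());
            Expr::Column(column)
        }
    }
}

// Steps 2 and 3: wrap the input only if an async call was found, then
// return the (possibly wrapped) input together with the rewritten expressions.
fn maybe_wrap_async(input: Plan, exprs: &[Expr]) -> (Plan, Vec<Expr>) {
    let mut found = Vec::new();
    let rewritten: Vec<Expr> = exprs.iter().map(|e| rewrite(e, &mut found)).collect();
    if found.is_empty() {
        (input, rewritten)
    } else {
        let wrapped = Plan::AsyncFunc {
            input: Box::new(input),
            async_exprs: found,
        };
        (wrapped, rewritten)
    }
}
```

In this sketch a call site reduces to `let (input, exprs) = maybe_wrap_async(input, &exprs);`, with no match on a sync/async result enum. When no async call is present, the input plan and expressions come back unchanged.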

Removed code

  • try_plan_async_exprs() method
  • PlannedExprResult enum
  • PlanAsyncExpr enum

How this fixes #18149

The bug occurred because aggregation expressions were rewritten to reference __async_fn_0@N, but AggregateExec was created with the original schema (which didn't include async columns).

The fix uses input_exec.schema() after wrapping with AsyncFuncExec, ensuring the aggregate sees a schema that includes the async columns.
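
A minimal sketch of why the schema choice matters, again using a toy model rather than DataFusion's types. `ColumnRef`, `check_column`, and `wrapped_schema` are hypothetical names, and the example assumes async columns are appended after the input columns.

```rust
// Toy model: a schema is an ordered list of column names, and a column
// reference is a (name, index) pair, written name@index in plan output.
#[derive(Debug, Clone, PartialEq)]
struct ColumnRef {
    name: String,
    index: usize,
}

// Check that a column reference resolves against a schema.
fn check_column(schema: &[String], col: &ColumnRef) -> Result<(), String> {
    match schema.get(col.index) {
        Some(field) if *field == col.name => Ok(()),
        Some(field) => Err(format!(
            "{}@{} points at field '{}'",
            col.name, col.index, field
        )),
        None => Err(format!(
            "{}@{} is out of bounds for a schema with {} fields",
            col.name,
            col.index,
            schema.len()
        )),
    }
}

// Schema after wrapping: the input columns followed by one column per
// async call (assumption made for this sketch).
fn wrapped_schema(input: &[String], async_count: usize) -> Vec<String> {
    let mut out = input.to_vec();
    out.extend((0..async_count).map(|i| format!("__async_fn_{i}")));
    out
}
```

With an original schema of `[a, b]`, a rewritten reference `__async_fn_0@2` fails `check_column` against the original schema but resolves against `wrapped_schema(&original, 1)`. That is the mismatch described above: the expressions were rewritten for the wrapped schema while the aggregate was built with the unwrapped one.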

Are these changes tested?

Existing async_udf.slt tests cover async UDF usage in projections, filters, and aggregations. Physical plan outputs remain unchanged.

Are there any user-facing changes?

No. This is an internal refactor with no public API changes.

Replace scattered try_plan_async_exprs() calls with two focused helpers:
- maybe_wrap_async_exec(): for plain expressions
- maybe_wrap_async_exec_with_names(): for projections

This simplifies Projection, Filter, and Aggregation handling,
and fixes apache#18149 by using the correct input schema for aggregates.

Removes PlannedExprResult, PlanAsyncExpr enums and try_plan_async_exprs().
@github-actions bot added the core (Core DataFusion crate) label on Jan 13, 2026