-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Add support for async callables in PythonOperator #60268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9124c9a to
cd4ec26
Compare
ebf2873 to
0b341e6
Compare
…epends on modified comms supervisor which cannot be backported to older Airflow versions Add support for async callables in PythonOperator (apache#59087) * refactor: Implemented BaseAsyncOperator in task-sdk * refactor: Now PythonOperator extends BaseAsyncOperator * refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions --------- Co-authored-by: Jason(Zhe-You) Liu <[email protected]> (cherry picked from commit 9cab6fb)
…andard/operators.py Co-authored-by: Kaxil Naik <[email protected]>
597f08a to
f829c3e
Compare
…nstead of NotImplementedError
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there, once newsfragment is added we are good
And the newfragment ;-) |
|
@kaxil If newsfragment is ok for you then I think it's finally ready to merge. |
|
Static check is failing, worth looking at https://github.com/apache/airflow/blob/main/airflow-core/newsfragments/54505.significant.rst or other examples |
|
Since rebase of PR branch I now get following error, which is strange as the 'use next version' is defined in toml file: |
|
@dabla I think that was because the versions were bumped for the release yesterday: #60437 So you'd need to update the following line: airflow/providers/standard/pyproject.toml Line 77 in 33cc0ac
to add |
Yes indeed, cause I knew I added it, but due to rebase it was gone again as versions where indeed bumped. Re-added it now so hopefully we will be fine. |
|
@kaxil last build was fine, I suppose we can merge now? |
| if not AIRFLOW_V_3_1_PLUS: | ||
|
|
||
| @property | ||
| def xcom_push(self) -> bool: | ||
| return self.do_xcom_push | ||
|
|
||
| @xcom_push.setter | ||
| def xcom_push(self, value: bool): | ||
| self.do_xcom_push = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this part correct @dabla ? It's breaking my AF2 dags, when installing standard provider from main. Should it have another if/elif for AF2?
Example dag:
from datetime import datetime
from airflow.providers.common.compat.assets import Asset
from airflow.providers.standard.operators.python import PythonOperator
from airflow import DAG
def do_nothing():
pass
DAG_ID = "test_dag"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
schedule=None,
catchup=False,
default_args={"retries": 0},
) as dag:
task = PythonOperator(
task_id="task",
python_callable=do_nothing,
inlets=[Asset(uri="s3://bucket2/dir2/file2.txt"), Asset(uri="s3://bucket2/dir2/file3.txt")],
)
Error:
[2026-01-16, 14:20:00 UTC] {__init__.py:77} DEBUG - Lineage called with inlets: [Dataset(uri='s3://bucket2/dir2/file2.txt', extra=None), Dataset(uri='s3://bucket2/dir2/file3.txt', extra=None)], outlets: []
[2026-01-16, 14:20:00 UTC] {taskinstance.py:3336} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 282, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3194, in _execute_task_with_callbacks
self.task.post_execute(context=context, result=result)
File "/usr/local/lib/python3.12/site-packages/airflow/lineage/__init__.py", line 88, in wrapper
self.xcom_push(context, key=PIPELINE_INLETS, value=inlets)
TypeError: 'bool' object is not callable
Full logs:
dag_id=test_dag_run_id=manual__2026-01-16T14_19_59.788247+00_00_task_id=task_attempt=1.log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @kacpermuda, thanks for reporting this is indeed an issue will check asap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open PR for this.
This PR is related to the discussion I started on the devlist and which allows you to natively execute async code on PythonOperators.
There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3
Below an example which show you how it can be used with async hooks:
This PR will fix additional remarks made by @kaxil on the original PR which has been reverted.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.