Skip to content

Conversation

@dabla
Copy link
Contributor

@dabla dabla commented Jan 8, 2026

This PR is related to the discussion I started on the devlist and which allows you to natively execute async code on PythonOperators.

There is also an AIP for this: https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-98%3A+Rethinking+deferrable+operators%2C+async+hooks+and+performance+in+Airflow+3

Below an example which show you how it can be used with async hooks:

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
		 # Do something with result or accumulate it and return it as an XCom

This PR will fix additional remarks made by @kaxil on the original PR which has been reverted.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@dabla dabla changed the title Feature/async python operator Add support for async callables in PythonOperator Jan 8, 2026
@dabla dabla requested a review from uranusjr January 9, 2026 08:41
@dabla dabla force-pushed the feature/async-python-operator branch from 9124c9a to cd4ec26 Compare January 9, 2026 12:58
@dabla dabla closed this Jan 9, 2026
@dabla dabla force-pushed the feature/async-python-operator branch from ebf2873 to 0b341e6 Compare January 9, 2026 13:14
…epends on modified comms supervisor which cannot be backported to older Airflow versions

Add support for async callables in PythonOperator (apache#59087)

* refactor: Implemented BaseAsyncOperator in task-sdk

* refactor: Now PythonOperator extends BaseAsyncOperator

* refactor: Also implement BaseAsyncOperator in common-compat provider to support older Airflow versions

---------

Co-authored-by: Jason(Zhe-You) Liu <[email protected]>
(cherry picked from commit 9cab6fb)
@dabla dabla reopened this Jan 9, 2026
@dabla dabla force-pushed the feature/async-python-operator branch from 597f08a to f829c3e Compare January 10, 2026 08:58
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there, once newsfragment is added we are good

@dabla
Copy link
Contributor Author

dabla commented Jan 13, 2026

Almost there, once newsfragment is added we are good

And the newfragment ;-)

@dabla
Copy link
Contributor Author

dabla commented Jan 13, 2026

@kaxil If newsfragment is ok for you then I think it's finally ready to merge.

@kaxil
Copy link
Member

kaxil commented Jan 13, 2026

Static check is failing, worth looking at https://github.com/apache/airflow/blob/main/airflow-core/newsfragments/54505.significant.rst or other examples

@dabla
Copy link
Contributor Author

dabla commented Jan 14, 2026

Since rebase of PR branch I now get following error, which is strange as the 'use next version' is defined in toml file:

common.compat changed with providers: ['standard']
common.compat provider changed but the following providers don't have '# use next version' comment for their common-compat dependency!

  - standard (providers/standard/pyproject.toml)

When common.compat changes with other providers in the same PR, add '# use next version' comment where they depend on common-compat.
Example: "apache-airflow-providers-common-compat>=1.8.0",  # use next version

To bypass this check, add the label: 'skip common compat check'

Error: Process completed with exit code 1.

@dabla dabla requested a review from kaxil January 14, 2026 08:29
@kaxil
Copy link
Member

kaxil commented Jan 14, 2026

@dabla I think that was because the versions were bumped for the release yesterday: #60437

So you'd need to update the following line:

"apache-airflow-providers-common-compat",

to add # use next version

@dabla
Copy link
Contributor Author

dabla commented Jan 14, 2026

@dabla I think that was because the versions were bumped for the release yesterday: #60437

So you'd need to update the following line:

"apache-airflow-providers-common-compat",

to add # use next version

Yes indeed, cause I knew I added it, but due to rebase it was gone again as versions where indeed bumped. Re-added it now so hopefully we will be fine.

@dabla
Copy link
Contributor Author

dabla commented Jan 15, 2026

@kaxil last build was fine, I suppose we can merge now?

@kaxil kaxil merged commit faf847c into apache:main Jan 15, 2026
129 checks passed
Comment on lines +69 to +77
if not AIRFLOW_V_3_1_PLUS:

@property
def xcom_push(self) -> bool:
return self.do_xcom_push

@xcom_push.setter
def xcom_push(self, value: bool):
self.do_xcom_push = value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part correct @dabla ? It's breaking my AF2 dags, when installing standard provider from main. Should it have another if/elif for AF2?

Example dag:

from datetime import datetime

from airflow.providers.common.compat.assets import Asset
from airflow.providers.standard.operators.python import PythonOperator

from airflow import DAG


def do_nothing():
    pass

DAG_ID = "test_dag"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 0},
) as dag:

    task = PythonOperator(
        task_id="task",
        python_callable=do_nothing,
        inlets=[Asset(uri="s3://bucket2/dir2/file2.txt"), Asset(uri="s3://bucket2/dir2/file3.txt")],
    )

Error:

[2026-01-16, 14:20:00 UTC] {__init__.py:77} DEBUG - Lineage called with inlets: [Dataset(uri='s3://bucket2/dir2/file2.txt', extra=None), Dataset(uri='s3://bucket2/dir2/file3.txt', extra=None)], outlets: []
[2026-01-16, 14:20:00 UTC] {taskinstance.py:3336} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 282, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3194, in _execute_task_with_callbacks
    self.task.post_execute(context=context, result=result)
  File "/usr/local/lib/python3.12/site-packages/airflow/lineage/__init__.py", line 88, in wrapper
    self.xcom_push(context, key=PIPELINE_INLETS, value=inlets)
TypeError: 'bool' object is not callable

Full logs:

dag_id=test_dag_run_id=manual__2026-01-16T14_19_59.788247+00_00_task_id=task_attempt=1.log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @kacpermuda, thanks for reporting this is indeed an issue will check asap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open PR for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants