Added concurrency helpers to retrieve task results #890

Draft
agronholm wants to merge 28 commits into master from enhanced-taskgroup

Conversation

@agronholm
Owner

Changes

Adds an enhanced version of the task group that allows task-by-task cancellation as well as awaiting on the results of individual tasks. Two convenience functions are also provided:

  • amap(): calls the given one-parameter coroutine function with each item from the given iterable of arguments and runs them concurrently in a task group
  • race(): launches all the given coroutines as tasks in a task group and returns the return value of whichever task completes first

Concurrency limiting and rate limiting are provided by both functions.
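To make the semantics concrete, here is a rough stdlib-asyncio sketch of what amap() and race() are meant to do. This is illustrative only, not the PR's implementation; the PR's versions additionally support concurrency and rate limits:

```python
import asyncio


async def amap(func, iterable):
    """Call func on each item concurrently; results come back in input order."""
    return await asyncio.gather(*(func(item) for item in iterable))


async def race(*coros):
    """Return the result of whichever coroutine finishes first; cancel the rest."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return next(iter(done)).result()


async def demo():
    async def double(x):
        await asyncio.sleep(0.01)
        return x * 2

    return await amap(double, [1, 2, 3])


print(asyncio.run(demo()))  # [2, 4, 6]
```

Note that a failing child task would surface its exception through gather() or result() here; the discussion below covers how the real helpers should handle that case.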

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:

- Fix big bad boo-boo in task groups
  (`#123 <https://github.com/agronholm/anyio/issues/123>`_; PR by @yourgithubaccount)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

return get_async_backend().create_task_group()


class TaskHandle(Generic[T]):
Collaborator

It looks like a TaskHandle could be used to implement a create_future() on the EnhancedTaskGroup, to have something like asyncio's Future.
What do you think?

Owner Author

What exact addition are you suggesting?

Collaborator

I was going to say something like this:

class EnhancedTaskGroup:
    def create_future(self) -> TaskHandle[T]:
        handle = TaskHandle[T]()
        return handle

But now I realize a Future is just a TaskHandle, so there's nothing to do?

Owner Author

Futures are a lower level primitive, not tied to any task, unlike TaskHandle. And a Future reports the exception it receives verbatim while TaskHandle handles base exceptions in a special manner.

@davidbrochart
Collaborator

A bit far-fetched, but what do you think of a free function create_task() that would use the current EnhancedTaskGroup if there is one, and errors out otherwise?
On one hand this is going against structured concurrency, but on the other hand it removes the need to pass a task group down the stack, when you know there must be one.

kwargs: Mapping[str, Any] | None = None,
) -> TaskHandle[T]:
handle = TaskHandle[T]()
handle._start_value = await self._task_group.start(
Collaborator

If we want to be able to await the start value in the TaskHandle, I guess we should wrap start() with a create_task()?

Owner Author

I don't understand what you mean. The start value will already be available in the TaskHandle once start() returns.

Collaborator

Ah indeed 👍

Collaborator

I thought that the TaskHandle was returned immediately, but it's returned only when the task has started. Which means we cannot e.g. cancel the task before it has started. Not sure if that should be allowed?

Owner Author

If you really need to do that, you can just cancel an outer cancel scope.

@smurfix
Collaborator

smurfix commented Mar 19, 2025

it removes the need to pass a task group down the stack,

On the other hand, that's the point of passing down the taskgroup -- when it's not there we know that the method we're calling won't start things it doesn't wait for.

If you want to circumvent that, for whatever reason, there's already a fairly-high-performance way to do it -- set a contextvar. So IMHO "explicit is better than implicit" and thus we shouldn't support that natively.

@agronholm changed the title from "Added EnhancedTaskGroup, amap() and race()" to "Added EnhancedTaskGroup, amap() and as_completed()" on Mar 20, 2025
@agronholm force-pushed the enhanced-taskgroup branch from 6a9bec2 to 33f6919 on March 20, 2025
@agronholm
Owner Author

A bit far-fetched, but what do you think of a free function create_task() that would use the current EnhancedTaskGroup if there is one, and errors out otherwise? On one hand this is going against structured concurrency, but on the other hand it removes the need to pass a task group down the stack, when you know there must be one.

I'm -1 on an implicit task group.

@dhirschfeld

The one thing I'm after is a nursery/TaskGroup where I can start tasks and then iterate over the results asynchronously as soon as they become available, so that I can interleave work and don't have to wait for the slowest task before starting the next part of the pipeline.

With this API, race is trivial to implement: you just cancel the TaskGroup as soon as the first result is made available. gather just collects all results into a list of length n_inputs, in the order they were provided, and returns the list when the TaskGroup exits.

@agronholm
Owner Author

With this API, race is trivial to implement

And how do we deal with exceptions occurring in the child tasks?

@dhirschfeld

dhirschfeld commented Apr 15, 2025

With this API, race is trivial to implement

And how do we deal with exceptions occurring in the child tasks?

In my implementation I actually return Outcome instances, so the user processes the Outcomes as they're made available. It's then up to the user to decide what to do with any errors. If they blindly unwrap an error, it will raise and cancel the TaskGroup.

It would of course be nice to have this built-in, so I'm lurking here to see if I can replace my own custom solution. For me it's important to be able to process tasks as soon as they're finished and to not have to wait for the slowest.
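The "process Outcomes as soon as they finish" model described here can be approximated with stdlib asyncio, using (ok, value) pairs in place of a real Outcome type. iter_outcomes is a hypothetical helper, not the PR's API:

```python
import asyncio


async def iter_outcomes(coros):
    """Yield (ok, value_or_exception) pairs as each task finishes."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    for fut in asyncio.as_completed(tasks):
        try:
            yield True, await fut
        except Exception as exc:
            yield False, exc


async def demo():
    async def work(delay, value, fail=False):
        await asyncio.sleep(delay)
        if fail:
            raise ValueError(value)
        return value

    results = []
    async for ok, value in iter_outcomes([
        work(0.02, "slow"),
        work(0.0, "fast"),
        work(0.01, "boom", fail=True),
    ]):
        # Record the value on success, the exception type on failure.
        results.append((ok, value if ok else type(value).__name__))
    return results


print(asyncio.run(demo()))  # [(True, 'fast'), (False, 'ValueError'), (True, 'slow')]
```

The key property is that the error does not cancel the other tasks; the consumer decides what to do with each outcome.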

@agronholm
Owner Author

Ok, so help me understand. What do you suggest race() returns then? What if the first result from a child task is an exception? Do you want to return the Outcome (or equivalent) of that?

@dhirschfeld

I think that depends on the use-case. If amap returned an asynchronous iterator that returned Outcome wrapped results as soon as they were ready I'd leave it at that and let users implement race themselves with the semantics that made sense for their problem.

Given the amap primitive, race becomes trivial to implement however you want so there's no need to provide an implementation which may not be ideal for all use-cases.

@agronholm
Owner Author

agronholm commented Apr 21, 2025

I think that depends on the use-case. If amap returned an asynchronous iterator that returned Outcome wrapped results as soon as they were ready I'd leave it at that and let users implement race themselves with the semantics that made sense for their problem.

Given the amap primitive, race becomes trivial to implement however you want so there's no need to provide an implementation which may not be ideal for all use-cases.

But as_completed() does exactly that (with TaskHandles), doesn't it? I don't think amap() is suitable for implementing race().

@dhirschfeld

I think that depends on the use-case. If amap returned an asynchronous iterator that returned Outcome wrapped results as soon as they were ready I'd leave it at that and let users implement race themselves with the semantics that made sense for their problem.
Given the amap primitive, race becomes trivial to implement however you want so there's no need to provide an implementation which may not be ideal for all use-cases.

But as_completed() does exactly that (with TaskHandles), doesn't it? I don't think amap() is suitable for implementing race().

Ah yep, I missed the as_completed implementation - that does look like it does what I'm after!

@dhirschfeld

I'm actually pretty excited about this functionality as it will let me replace a bunch of custom code, so I'm curious if there are plans to land this in a release sometime soonish?

@agronholm
Owner Author

I think I'll stop spamming this PR and make a separate one for rate limiting, as it's not tied to the improved concurrency API.

@agronholm
Owner Author

For interested parties, here it is: #989

@agronholm
Owner Author

agronholm commented Oct 3, 2025

Here's what I understand our options are:

1. Expose a new task group class (EnhancedTaskGroup or whatever)

Pros:

  • It's a new API and we're free to shape it however we like

Cons:

  • It's a new API which we have to support for the foreseeable future
  • Having two distinct task groups complicates things for users too

2. Expose only the concurrency helpers but not a new task group API

Pros:

  • Users get increased convenience without any duplication

Cons:

  • The convenience functions don't cater to all use cases
  • Only being able to retrieve task results via these concurrency helpers feels odd from an API design perspective

3. Shoehorn task handles into the existing API

Pros:

  • The existing task groups gain low-level APIs that can actually return task results

Cons:

  • Backwards compatibility will be tricky, particularly with the start() method which will likely require an extra keyword argument to return a task handle instead of the task_status.started() value
  • On Trio, we always end up wrapping the target coroutine function in start_soon() as there is no other way to get the return value
  • If we mess this up, it will hurt both us and the users

@agronholm
Owner Author

For option 3, it would probably look something like this:

class TaskGroup: # never mind the name or inheritance here
    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> TaskHandle[T]:
        ...

    @overload
    async def start(
        self,
        func: Callable[..., Awaitable[Any]],
        *args: object,
        name: object = None,
        return_handle: Literal[False] = False,
    ) -> Any:
        ...

    @overload
    async def start(
        self,
        func: Callable[..., Awaitable[T]],
        *args: object,
        name: object = None,
        return_handle: Literal[True],
    ) -> TaskHandle[T]:
        ...

@Graeme22
Contributor

Graeme22 commented Oct 3, 2025

My order of preference would be 2, 3, 1. I bet the majority of users who are asking for ways to collect task results would be satisfied with some combination of gather/amap/whatever. It's also a very low-cost option that wouldn't break anything and can be adapted to future API changes easily.

@agronholm
Owner Author

My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.

@Graeme22
Contributor

Graeme22 commented Oct 3, 2025

My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.

I don't have any strong objections to 3. If it's likely to be a breaking change anyway, maybe start could just return a tuple[Any, TaskHandle[T]]?

Something like this could also be considered, sort of a combination of 1 and 3 that maintains backwards compatibility:

@overload
@classmethod
def create_task_group(cls, collect_results: Literal[False] = False) -> TaskGroup: ...

@overload
@classmethod
def create_task_group(cls, collect_results: Literal[True]) -> ResultCollectingTaskGroup: ...

class ResultCollectingTaskGroup:
    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> TaskHandle[T]:
        ...

    async def start(
        self,
        func: Callable[..., Awaitable[Any]],
        *args: object,
        name: object = None,
    ) -> tuple[Any, TaskHandle[T]]:
        ...

Regardless, I think the utility functions should definitely be present (maybe this was implied already idk)

@agronholm
Owner Author

My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.

I don't have any strong objections to 3. If it's likely to be a breaking change anyway, maybe start could just return a tuple[Any, TaskHandle[T]]?

Wait, what? Who said anything about a breaking change?

@Graeme22
Contributor

Graeme22 commented Oct 3, 2025

My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.

I don't have any strong objections to 3. If it's likely to be a breaking change anyway, maybe start could just return a tuple[Any, TaskHandle[T]]?

Wait, what? Who said anything about a breaking change?

I just assumed it was possible when you said backwards compatibility would be tricky.

@agronholm
Owner Author

agronholm commented Oct 3, 2025

My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.

I don't have any strong objections to 3. If it's likely to be a breaking change anyway, maybe start could just return a tuple[Any, TaskHandle[T]]?

Wait, what? Who said anything about a breaking change?

I just assumed it was possible when you said backwards compatibility would be tricky

What I meant was that the API becomes clunkier with the shoehorning of this functionality to what was basically copied from Trio's nurseries. But in no event do I want to compromise the compatibility guarantees.

@agronholm
Owner Author

One more thing: TaskHandle is already capable of storing the start value of a task, so a tuple is not necessary.

@smurfix
Collaborator

smurfix commented Oct 4, 2025

If it's likely to be a breaking change anyway

There's a difference between "we need to change some detail and there's a deprecation notice and grace period where both work and everything" (like we had with Event.set and friends) and "we want to change something major, it's suddenly incompatible and you need to update your complete codebase all at once".

The latter is not happening. Unless it's for a very important overriding reason, which this clearly is not.

@agronholm
Owner Author

agronholm commented Oct 4, 2025

What still bothers me a lot is the start() mechanism, where we cannot establish type safety for the start value. I have been unable to figure out a clearly better alternative to the task_status.started() call. The only idea I came up with is a contextvar-based mechanism where you call a free function that sets an event stored in the contextvar, but there is a downside to that: it would allow starting functions that have no intention of ever calling that free function. Plus, it would still not give us type safety.

If async generators could return a value like normal generators can, that would make for an interesting alternative.

@agronholm
Copy link
Owner Author

OTOH, if we assume that we don't need the return value from such a task, then async generators would be a viable alternative, as you can still return from them without a value. I'll whip up a test branch for that.

@smurfix
Collaborator

smurfix commented Oct 4, 2025

The Trio people have been discussing this for more than two years now. python-trio/trio#2633

Though I have to say that our discussion here feels a bit more constructive.

@smurfix
Collaborator

smurfix commented Oct 4, 2025

Another advantage of option 3 is (most probably) that it's the least amount of additional work. Option 1 will lead to either some code duplication or some additional method calls (including annoying wrappers to catch the results; while the user doesn't see them any more, they'll still be there and slow things down). Option 2 definitely requires task wrappers.

@agronholm
Owner Author

agronholm commented Oct 7, 2025

I have a new branch, concurrency-helpers, which contains a reimagined implementation of the task group. It deals solely with coroutine objects and offers start()-like functionality using an async generator rather than a keyword argument, thus offering a type-safe way to provide a start value.

Additionally, its start_task() method takes a concurrency limiter and a rate limiter, thus adding native support for these features. It still provides a synchronous create_task() method which works just like its asyncio counterpart.

@agronholm
Owner Author

Another advantage of option 3 is (most probably) that it's the least amount of additional work. Option 1 will lead to either some code duplication or some additional method calls (including annoying wrappers to catch the results; while the user doesn't see them any more, they'll still be there and slow things down). Option 2 definitely requires task wrappers.

No matter how we implement it, task wrappers will be needed as there is otherwise no way to extract the return value on Trio.

@smurfix
Collaborator

smurfix commented Oct 8, 2025

No matter how we implement it, task wrappers will be needed as there is otherwise no way to extract the return value on Trio.

… assuming the Trio people don't also go that route.

python-trio/trio#2633

Maybe you want to weigh in there.

@agronholm
Owner Author

No matter how we implement it, task wrappers will be needed as there is otherwise no way to extract the return value on Trio.

… assuming the Trio people don't also go that route.

python-trio/trio#2633

Maybe you want to weigh in there.

Frankly I don't think this is going to be resolved upstream any time soon. But if Trio gets this feature, we have the option of changing our implementation behind the scenes. Plus my latest change would still need the wrapper for releasing the concurrency limiter.

@smurfix
Collaborator

smurfix commented Oct 8, 2025

Frankly I don't think this is going to be resolved upstream any time soon

You may have a point here. On the other hand, maybe they'll follow your lead for once …

@agronholm agronholm modified the milestones: 4.12, 4.13 Nov 28, 2025
@Graeme22
Contributor

Graeme22 commented Dec 29, 2025

Any updates on this? It seems like option 3 was acceptable to everyone who's commented thus far; it could look like:

def start_soon(
    self,
    func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
    *args: Unpack[PosArgsT],
    name: object = None,
) -> Future[T]:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[Any]],
    *args: object,
    name: object = None,
    return_future: Literal[False] = False,
) -> Any:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[T]],
    *args: object,
    name: object = None,
    return_future: Literal[True],
) -> Tuple[Future[T], Any]:
    ...

@agronholm
Owner Author

Any updates on this? It seems like option 3 was acceptable to everyone who's commented thus far, it could look like:

def start_soon(
    self,
    func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
    *args: Unpack[PosArgsT],
    name: object = None,
) -> Future[T]:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[Any]],
    *args: object,
    name: object = None,
    return_future: Literal[False] = False,
) -> Any:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[T]],
    *args: object,
    name: object = None,
    return_future: Literal[True],
) -> Tuple[Future[T], Any]:
    ...

So the thing is, after an extensive discussion, there was a loose consensus that we wanted to include rate limiting in these enhanced task groups as that is not properly doable from the outside. But that effort got stuck as there was no real consensus on how the rate limiters should work. I have a couple related PRs open. I'm not sure how to get unstuck with this.

@Graeme22
Contributor

So the thing is, after an extensive discussion, there was a loose consensus that we wanted to include rate limiting in these enhanced task groups as that is not properly doable from the outside. But that effort got stuck as there was no real consensus on how the rate limiters should work. I have a couple related PRs open. I'm not sure how to get unstuck with this.

Couldn't the rate limiter code be separated out so we can get the task results implementation worked out?

@agronholm
Owner Author

So the thing is, after an extensive discussion, there was a loose consensus that we wanted to include rate limiting in these enhanced task groups as that is not properly doable from the outside. But that effort got stuck as there was no real consensus on how the rate limiters should work. I have a couple related PRs open. I'm not sure how to get unstuck with this.

Couldn't the rate limiter code be separated out so we can get the task results implementation worked out?

If the rate limiters can be added later w/o breaking the API. There is also the possibility of using the rate limiting interface w/o a concrete implementation, though I think that would be an odd thing to provide.

@agronholm agronholm removed this from the 4.13 milestone Feb 9, 2026