Added concurrency helpers to retrieve task results #890
Conversation
```python
return get_async_backend().create_task_group()
```

```python
class TaskHandle(Generic[T]):
```
It looks like a TaskHandle could be used to implement a create_future() on the EnhancedTaskGroup, to have something like asyncio's Future.
What do you think?
What exact addition are you suggesting?
I was going to say something like this:

```python
class EnhancedTaskGroup:
    def create_future(self) -> TaskHandle[T]:
        handle = TaskHandle[T]()
        return handle
```

But now I realize a Future is just a TaskHandle, so there's nothing to do?
Futures are a lower level primitive, not tied to any task, unlike TaskHandle. And a Future reports the exception it receives verbatim while TaskHandle handles base exceptions in a special manner.
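The distinction can be seen with plain asyncio, where a `Future` is a bare result slot that is not tied to any task and can be resolved by arbitrary code; a minimal sketch (the names here are illustrative, not anyio API):

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    # A Future is a bare result slot: no task runs it,
    # and any code may resolve it.
    fut: asyncio.Future[str] = loop.create_future()
    # An unrelated callback resolves it later.
    loop.call_soon(fut.set_result, "done")
    return await fut

print(asyncio.run(main()))  # -> done
```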
A bit far-fetched, but what do you think of a free function …?
```python
    kwargs: Mapping[str, Any] | None = None,
) -> TaskHandle[T]:
    handle = TaskHandle[T]()
    handle._start_value = await self._task_group.start(
```
If we want to be able to await the start value in the TaskHandle, I guess we should wrap start() with a create_task()?
I don't understand what you mean. The start value will already be available in the TaskHandle once start() returns.
I thought that the TaskHandle was returned immediately, but it's returned only when the task has started. Which means we cannot e.g. cancel the task before it has started. Not sure if that should be allowed?
If you really need to do that, you can just cancel an outer cancel scope.
On the other hand, that's the point of passing down the taskgroup -- when it's not there, we know that the method we're calling won't start things it doesn't wait for. If you want to circumvent that, for whatever reason, there's already a fairly-high-performance way to do it -- set a contextvar. So IMHO "explicit is better than implicit" and thus we shouldn't support that natively.
I'm -1 on an implicit task group.

The one thing I'm after is a nursery/TaskGroup where I can start tasks and then iterate over the results asynchronously as soon as they become available, so that I can interleave work and don't have to wait for the slowest task to start the next part of the pipeline. With this API, …
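For reference, this interleaving pattern is roughly what `asyncio.as_completed` already gives on the asyncio side; a minimal sketch (the `job` and `pipeline` names are illustrative):

```python
import asyncio

async def job(n: int) -> int:
    # Slower jobs finish later; faster results are consumed immediately.
    await asyncio.sleep(n / 100)
    return n

async def pipeline() -> list[int]:
    tasks = [asyncio.create_task(job(n)) for n in (3, 1, 2)]
    out: list[int] = []
    for fut in asyncio.as_completed(tasks):
        # Further pipeline work could be interleaved here, per result.
        out.append(await fut)
    return out

print(asyncio.run(pipeline()))  # -> [1, 2, 3]
```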
And how do we deal with exceptions occurring in the child tasks?
In my implementation I actually return …. It would of course be nice to have this built-in, so I'm lurking here to see if I can replace my own custom solution. For me it's important to be able to process tasks as soon as they're finished, and to not have to wait for the slowest.
Ok, so help me understand. What do you suggest …?
I think that depends on the use case. If … Given the ….
> this doesn't help with the refcycles

But …

Ah yep, I missed the ….
I'm actually pretty excited about this functionality as it will let me replace a bunch of custom code, so I'm curious if there are plans to land this in a release sometime soonish?
I think I'll stop spamming this PR and make a separate one for rate limiting, as it's not tied to the improved concurrency API.
For interested parties, here it is: #989
Here's what I understand our options are:

1. Expose a new task group class (…)
   - Pros: …
   - Cons: …
2. Expose only the concurrency helpers but not a new task group API
   - Pros: …
   - Cons: …
3. Shoehorn task handles into the existing API
   - Pros: …
   - Cons: …
For option 3, it would probably look something like this:

```python
class TaskGroup:  # never mind the name or inheritance here
    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> TaskHandle[T]:
        ...

    @overload
    async def start(
        self,
        func: Callable[..., Awaitable[Any]],
        *args: object,
        name: object = None,
        return_handle: Literal[False] = False,
    ) -> Any:
        ...

    @overload
    async def start(
        self,
        func: Callable[..., Awaitable[T]],
        *args: object,
        name: object = None,
        return_handle: Literal[True],
    ) -> TaskHandle[T]:
        ...
```
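The `Literal`-based overload trick above can be demonstrated in isolation; `Handle` and `start` below are toy stand-ins for illustration, not the proposed API:

```python
from typing import Any, Literal, overload

class Handle:
    """Stand-in for the proposed TaskHandle; purely illustrative."""
    def __init__(self, value: Any) -> None:
        self.value = value

@overload
def start(value: Any, return_handle: Literal[False] = False) -> Any: ...
@overload
def start(value: Any, return_handle: Literal[True]) -> Handle: ...
def start(value: Any, return_handle: bool = False) -> Any:
    # The Literal overloads let a type checker narrow the return type at
    # the call site, so existing callers keep getting the plain start value.
    return Handle(value) if return_handle else value

print(start(42))                            # -> 42
print(start(42, return_handle=True).value)  # -> 42
```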
My order of preference would be 2, 3, 1. I bet the majority of users who are asking for ways to collect task results would be satisfied with some combination of gather/amap/whatever. It's also a very low-cost option that wouldn't break anything and can be adapted to future API changes easily.
My own order of preference is 3, 2, 1. @Graeme22 what objections do you have against option 3? I feel that 2) is only kicking the can down the road. We need a final solution at some point.
I don't have any strong objections to 3. If it's likely to be a breaking change anyway, maybe start could just return a ….

Something like this could also be considered, sort of a combination of 1 and 3 that maintains backwards compatibility:

```python
@overload
@classmethod
def create_task_group(cls, collect_results: Literal[False] = False) -> TaskGroup: ...
@overload
@classmethod
def create_task_group(cls, collect_results: Literal[True]) -> ResultCollectingTaskGroup: ...


class ResultCollectingTaskGroup:
    def start_soon(
        self,
        func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
        *args: Unpack[PosArgsT],
        name: object = None,
    ) -> TaskHandle[T]: ...

    async def start(
        self,
        func: Callable[..., Awaitable[Any]],
        *args: object,
        name: object = None,
    ) -> tuple[Any, TaskHandle[T]]: ...
```

Regardless, I think the utility functions should definitely be present (maybe this was implied already, idk).
Wait, what? Who said anything about a breaking change?

I just assumed it was possible when you said backwards compatibility would be tricky.

What I meant was that the API becomes clunkier with the shoehorning of this functionality to what was basically copied from Trio's nurseries. But in no event do I want to compromise the compatibility guarantees.
One more thing: …

There's a difference between "we need to change some detail, and there's a deprecation notice and grace period where both work and everything" (like we had with …) and …. The latter is not happening. Unless it's for a very important overriding reason, which this clearly is not.
What bothers me a lot still is the ….

If async generators could return a value like normal generators can, that would make for an interesting alternative.
OTOH, if we assume that we don't need the return value from such a task, then async generators would be a viable alternative, as you can still ….
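For context on the limitation being discussed: a normal generator's `return` value lands in `StopIteration.value`, while a `return` with a value inside an async generator is a `SyntaxError`. A small demonstration:

```python
import asyncio

def sync_gen():
    yield 1
    return "final"  # legal: lands in StopIteration.value

async def async_gen():
    yield 1
    # `return "final"` here would be a SyntaxError: async generators
    # may only use a bare `return`.

g = sync_gen()
next(g)
try:
    next(g)
except StopIteration as exc:
    print(exc.value)  # -> final

async def drain() -> list:
    # The items can still be consumed, just not a return value.
    return [item async for item in async_gen()]

print(asyncio.run(drain()))  # -> [1]
```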
The Trio people have been discussing this for more than two years now: python-trio/trio#2633. Though I have to say that our discussion here feels a bit more constructive.
Another advantage of option 3 is (most probably) that it's the least amount of additional work. Option 1 will lead to either some code duplication or some additional method calls (including annoying wrappers to catch the results; while the user doesn't see them any more, they'll still be there and slow things down). Option 2 definitely requires task wrappers.
I have a new branch, …. Additionally, its ….

No matter how we implement it, task wrappers will be needed, as there is otherwise no way to extract the return value on Trio.
… assuming the Trio people don't also go that route. Maybe you want to weigh in there.

Frankly I don't think this is going to be resolved upstream any time soon. But if Trio gets this feature, we have the option of changing our implementation behind the scenes. Plus my latest change would still need the wrapper for releasing the concurrency limiter.

You may have a point here. On the other hand, maybe they'll follow your lead for once …
Any updates on this? It seems like option 3 was acceptable to everyone who's commented thus far. It could look like:

```python
def start_soon(
    self,
    func: Callable[[Unpack[PosArgsT]], Awaitable[T]],
    *args: Unpack[PosArgsT],
    name: object = None,
) -> Future[T]:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[Any]],
    *args: object,
    name: object = None,
    return_future: Literal[False] = False,
) -> Any:
    ...

@overload
async def start(
    self,
    func: Callable[..., Awaitable[T]],
    *args: object,
    name: object = None,
    return_future: Literal[True],
) -> Tuple[Future[T], Any]:
    ...
```
So the thing is, after an extensive discussion, there was a loose consensus that we wanted to include rate limiting in these enhanced task groups, as that is not properly doable from the outside. But that effort got stuck as there was no real consensus on how the rate limiters should work. I have a couple related PRs open. I'm not sure how to get unstuck with this.

Couldn't the rate limiter code be separated out so we can get the task results implementation worked out?

If the rate limiters can be added later w/o breaking the API. There is also the possibility of using the rate limiting interface w/o a concrete implementation, though I think that would be an odd thing to provide.
Changes
Adds an enhanced version of the task group that allows task-by-task cancellation as well as awaiting on the results of individual tasks. Two other convenience functions are also provided:
- `amap()`: calls the given one-parameter coroutine function with each item from the given iterable of arguments and runs them concurrently in a task group
- `race()`: launches all given coroutines as tasks in a task group and returns the return value of whichever task completes first

Concurrency and rate limiting are provided by both functions.
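As a rough illustration of the `race()` semantics described above, here is a sketch built on plain asyncio rather than the PR's actual implementation (no rate limiting shown; all names are illustrative):

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")

async def race(*coros: Awaitable[T]) -> T:
    # Run everything, return the first completed result, cancel the rest.
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to finish, swallowing CancelledError.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

async def main() -> str:
    return await race(
        asyncio.sleep(0.05, result="slow"),
        asyncio.sleep(0.0, result="fast"),
    )

print(asyncio.run(main()))  # -> fast
```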
Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):
- Test cases (in `tests/`) added which would fail without your patch
- Documentation updated (in `docs/`, in case of behavior changes or new features)
- Changelog entry added (in `docs/versionhistory.rst`)

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.
Updating the changelog
If there are no entries after the last release, use `**UNRELEASED**` as the version.

If, say, your patch fixes issue #123, the entry should look like this:

…

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.