Issue 19781 : Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer #19785
Conversation
```rust
let poll;
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
    // If there is any completed batch ready, return it
```
nit: any -> a
changed.
```rust
    RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::coalesce::PushBatchStatus::LimitReached;
```
PushBatchStatus is imported below, so the LimitReached variant could be used as PushBatchStatus::LimitReached.
changed.
```rust
}
Err(e) => {
    poll = Poll::Ready(Some(Err(e)));
LimitReached => {
```
Suggested change:
```diff
-LimitReached => {
+PushBatchStatus::LimitReached => {
```
changed.
```rust
poll = Poll::Ready(Some(Err(e)));
LimitReached => {
    // limit was reached, so stop early
    self.batch_coalescer.finish()?
```
Suggested change:
```diff
-self.batch_coalescer.finish()?
+self.batch_coalescer.finish()?;
```
changed.
```rust
    Ok(())
}

pub fn is_finished(&self) -> bool {
```
Suggested change:
```diff
-pub fn is_finished(&self) -> bool {
+pub(crate) fn is_finished(&self) -> bool {
```
crate visibility added
Thanks @martin-g for the review!
Jefffrey left a comment
Perhaps @Dandandan or @alamb can take a look, since it seems it's related to this PR?
```
# tests with target partition set to 1
statement ok
set datafusion.execution.target_partitions = '1';
```
nit: can we reset these configs after this test? Just so if tests are added after this they don't inherit these config settings
I intentionally included the comment `# tests with target partition set to 1` to mark that all subsequent tests use this configuration.
I couldn't find a method to reset this setting (I'm unsure what the initial value was, and there's no obvious way to preserve and reapply it).
Looking at other slt files, I noticed that some settings are similarly modified without being reset; see order.slt as an example.
If there's a standard approach for handling this, I'd be happy to implement it.
Maybe we can move this to a separate SLT file that has this configuration as a kind of preamble?
The problem with these settings, as people are observing, is that they affect all subsequent tests
I like @pepijnve 's suggestion -- let's put this in a separate test. Perhaps something like limit_single_row_batches.slt
Using the setup of one target_partition and batch sizes of one to test all the limiting corner cases is a great idea. We should probably add some more basic cases to cover some other operators
I created an additional slt file as requested and explicitly added some comments where tests with target partitions set to 1 should start.
alamb left a comment
Thank you @bert-beyondloops @Jefffrey and @pepijnve and @martin-g
This is some great engineering.
I reviewed this PR carefully and I think it is correct, makes sense, and fixes the issue. I also spent some time trying to understand how our existing test coverage missed it (theory below).
I do think it would be nice to move the test to its own file, but we could also do that as a follow on PR too
If anyone else is curious, the error from the reproducer before this PR is
```sql
set datafusion.execution.target_partitions = '1';
set datafusion.execution.batch_size = '1';
create table test (i INTEGER) as values (1), (2);
select * from test where i <> 0 limit 1;
```
```
Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer: cannot push batch after finish.
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
```
Here is the plan
```
> explain format indent select * from test where i <> 0 limit 1;
+---------------+-----------------------------------------------------+
| plan_type     | plan                                                |
+---------------+-----------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=1                              |
|               |   Filter: test.i != Int32(0)                        |
|               |     TableScan: test projection=[i]                  |
| physical_plan | FilterExec: i@0 != 0, fetch=1                       |
|               |   DataSourceExec: partitions=1, partition_sizes=[2] |
+---------------+-----------------------------------------------------+
2 row(s) fetched.
Elapsed 0.006 seconds.
```
```rust
    return self.metrics.baseline_metrics.record_poll(poll);
}

if self.batch_coalescer.is_finished() {
```
I think this is the key change -- specifically that on subsequent calls to poll_next() we just drain the coalescer and don't get the next input / try and put it in.
Previously, if the input returns another batch, the FilterExec will try and add it to the BatchCoalescer (which is what triggers the assert)
This new code correctly drains the coalescer state 👍 .
I suspect that we haven't seen this on other queries because most/all the ExecutionPlans that can feed a FilterExec also implement Limit. Thus when the FilterExec calls poll_next on the input, the input returns None and no additional batch is pushed to the BatchCoalescer.
The default batch size for the memory exec means that it will most often only return a single batch.
I can't really figure out why setting the number of target partitions to 1 makes any difference (though I verified it is required to trigger the reproducer)
Perhaps the reason @bert-beyondloops saw this in his system is that you have some custom execution plan (that doesn't implement limit pushdown 🤔). This is fine; I am just trying to explain why we haven't hit this issue before / in our other tests.
We indeed specify target partition 1 for a specific case, since we know upfront our input is sorted and we do not want the overhead of a SortMergeExec.
Without the target partition set to 1, you get another physical plan (with multiple partitions), where the limit is handled by another type of Exec.
```rust
if self.batch_coalescer.is_finished() {
    // If input is done and no batches are ready, return None to signal end of stream.
    let poll = Poll::Ready(None);
    return self.metrics.baseline_metrics.record_poll(poll);
```
FWIW I don't think record_poll is needed here (it only records information when there is a batch). There is nothing wrong with this call, but it is somewhat confusing, as there are other paths below that return without also calling record_poll.
Indeed, this wasn't consistent. But honestly, you only know this (that it records information only when there is a batch) by looking at the internals of record_poll.
In my opinion, you should call record_poll everywhere, but that is another discussion as such :-)
I'll adapt.
Force-pushed a9f3798 to fc37c87
Looks good to me ❤️
alamb left a comment
Let's give this until tomorrow to allow time for others to comment, and then I'll plan to merge it in assuming nothing else comes up.
Thank you again @bert-beyondloops
Which issue does this PR close?
This PR closes issue #19781.
Rationale for this change
Fixes the internal error `Assertion failed: !self.finished: LimitedBatchCoalescer` reported in #19781.
What changes are included in this PR?
The code change is inspired by the CoalesceBatchesStream implementation.
Are these changes tested?
An additional sqllogictest was written in limit.slt, which triggered the issue before the fix.
Are there any user-facing changes?
No