
Conversation

@bert-beyondloops (Contributor)

Which issue does this PR close?

This PR closes issue #19781.

Rationale for this change

Fixes the internal error `Assertion failed: !self.finished: LimitedBatchCoalescer: cannot push batch after finish`.

What changes are included in this PR?

The code change is inspired by the CoalesceBatchesStream implementation.

Are these changes tested?

An additional sqllogictest was written in limit.slt; it triggered the issue before the fix.

Are there any user-facing changes?

No

github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) and physical-plan (Changes to the physical-plan crate) labels on Jan 13, 2026.
```rust
let poll;
let elapsed_compute = self.metrics.baseline_metrics.elapsed_compute().clone();
loop {
    // If there is any completed batch ready, return it
```
Contributor
nit: any -> a

Contributor Author

changed.

```rust
    RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::coalesce::LimitedBatchCoalescer;
use crate::coalesce::PushBatchStatus::LimitReached;
```
Member

`PushBatchStatus` is imported below, so the `LimitReached` variant could be written as `PushBatchStatus::LimitReached`.

Contributor Author

changed.

```rust
}
Err(e) => {
    poll = Poll::Ready(Some(Err(e)));
LimitReached => {
```
Member

Suggested change:

```diff
-LimitReached => {
+PushBatchStatus::LimitReached => {
```

Contributor Author

changed.

```rust
poll = Poll::Ready(Some(Err(e)));
LimitReached => {
    // limit was reached, so stop early
    self.batch_coalescer.finish()?
```
Member

Suggested change:

```diff
-self.batch_coalescer.finish()?
+self.batch_coalescer.finish()?;
```

Contributor Author

changed.

```rust
    Ok(())
}

pub fn is_finished(&self) -> bool {
```
Member

Suggested change:

```diff
-pub fn is_finished(&self) -> bool {
+pub(crate) fn is_finished(&self) -> bool {
```

Contributor Author

Crate visibility added.

@bert-beyondloops (Contributor Author)

Thanks @martin-g for the review!

@Jefffrey (Contributor) left a comment

Perhaps @Dandandan or @alamb can take a look, since it seems it's related to this PR?


```
# tests with target partition set to 1
statement ok
set datafusion.execution.target_partitions = '1';
```
Contributor

nit: can we reset these configs after this test? Just so tests added after this don't inherit these config settings.

Contributor Author

I intentionally included the comment `# tests with target partition set to 1` to mark that all subsequent tests use this configuration.

I couldn't find a method to reset this setting (I'm unsure what the initial value was, and there's no obvious way to preserve and reapply it).
Looking at other slt files, I noticed that some settings are similarly modified without being reset; see order.slt as an example.
If there's a standard approach for handling this, I'd be happy to implement it.

Contributor

Maybe we can move this to a separate SLT file that has this configuration as a kind of preamble?

Contributor

The problem with these settings, as people are observing, is that they affect all subsequent tests

I like @pepijnve 's suggestion -- let's put this in a separate test. Perhaps something like limit_single_row_batches.slt

Using the setup of one target_partition and batch sizes of one to test all the limiting corner cases is a great idea. We should probably add some more basic cases to cover some other operators
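If the tests do move to their own file, the preamble could look something like the sketch below. The file name `limit_single_row_batches.slt` comes from the suggestion above; the table, query, and expected result mirror the reproducer discussed later in this thread, and the exact directives are illustrative rather than a finished test file:

```
# limit_single_row_batches.slt
# All tests in this file run with a single target partition and
# single-row batches, to exercise limit corner cases.

statement ok
set datafusion.execution.target_partitions = '1';

statement ok
set datafusion.execution.batch_size = '1';

statement ok
create table test (i INTEGER) as values (1), (2);

query I
select * from test where i <> 0 limit 1;
----
1
```

Because the settings live in a dedicated file, later tests in other files no longer inherit them.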

Contributor Author

I created an additional slt file as requested and explicitly added comments marking where the tests with target partitions set to 1 start.

@alamb (Contributor) left a comment

Thank you @bert-beyondloops @Jefffrey and @pepijnve and @martin-g

This is some great engineering.

I reviewed this PR carefully and I think it is correct, makes sense, and fixes the issue. I also spent some time trying to understand how our existing test coverage missed it (theory below).

I do think it would be nice to move the test to its own file, but we could also do that in a follow-on PR.

If anyone else is curious, the error from the reproducer before this PR is

```sql
set datafusion.execution.target_partitions = '1';
set datafusion.execution.batch_size = '1';
create table test (i INTEGER) as values (1), (2);
select * from test where i <> 0 limit 1;
```

```
Internal error: Assertion failed: !self.finished: LimitedBatchCoalescer: cannot push batch after finish.
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
```

Here is the plan

```
> explain format indent select * from test where i <> 0 limit 1;
+---------------+-----------------------------------------------------+
| plan_type     | plan                                                |
+---------------+-----------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=1                              |
|               |   Filter: test.i != Int32(0)                        |
|               |     TableScan: test projection=[i]                  |
| physical_plan | FilterExec: i@0 != 0, fetch=1                       |
|               |   DataSourceExec: partitions=1, partition_sizes=[2] |
|               |                                                     |
+---------------+-----------------------------------------------------+
2 row(s) fetched.
Elapsed 0.006 seconds.
```



```rust
    return self.metrics.baseline_metrics.record_poll(poll);
}

if self.batch_coalescer.is_finished() {
```
Contributor

I think this is the key change -- specifically that on subsequent calls to poll_next() we just drain the coalescer and don't fetch the next input batch or try to push it in.

Previously, if the input returned another batch, the FilterExec would try to add it to the BatchCoalescer (which is what triggers the assert).

This new code correctly drains the coalescer state 👍.

I suspect we haven't seen this on other queries because most/all of the ExecutionPlans that can feed a FilterExec also implement limit pushdown. Thus when the FilterExec calls poll_next on the input, the input returns None and no additional batch is pushed to the BatchCoalescer.

The default batch size for the memory exec means that it will most often only return a single batch.

I can't really figure out why setting the number of target partitions to 1 makes any difference (though I verified it is required to trigger the reproducer)

Perhaps the reason @bert-beyondloops saw this in his system is that he has some custom execution plan (that doesn't implement limit pushdown 🤔). This is fine; I am just trying to explain why we haven't hit this issue before / in our other tests.

Contributor Author

We indeed specify target partition 1 for some specific cases, since we know up front that our input is sorted and we do not want the overhead of a SortMergeExec.

Contributor Author

Without target partitions set to 1, you get a different physical plan (with multiple partitions), where the limit is applied by another type of Exec.

```rust
if self.batch_coalescer.is_finished() {
    // If input is done and no batches are ready, return None to signal end of stream.
    let poll = Poll::Ready(None);
    return self.metrics.baseline_metrics.record_poll(poll);
```
Contributor

FWIW I don't think record_poll is needed here (it only records information when there is a batch). There is nothing wrong with this call, but it is somewhat confusing as there are other paths below that return without also calling record_poll.

Contributor Author

Indeed, this wasn't consistent. But honestly, you only know this (that it records information only when there is a batch) by looking at the internals of record_poll.
In my opinion, record_poll should be called everywhere, but that is another discussion :-)

I'll adapt.

@alamb (Contributor)

alamb commented Jan 14, 2026

Looks good to me ❤️

@alamb (Contributor) left a comment

Let's give this until tomorrow to allow time for others to comment, and then I'll plan to merge it, assuming nothing else comes up.

Thank you again @bert-beyondloops
