**Describe the bug**

When executing a SQL query with `LIMIT` against a `MemTable`, the limit is silently ignored if the `SELECT` clause projects columns in a different order than they appear in the table schema.

The optimized plan correctly shows `Limit: skip=0, fetch=5` and `TableScan: fetch=5`, but execution returns all rows instead of the requested limit.

This only occurs when the `SELECT` reorders columns relative to the schema. Selecting columns in schema order works correctly, and adding `ORDER BY` also makes the limit take effect.
**Environment:**
- DataFusion version: 53.0.0
- Arrow version: 55.x (via the DataFusion re-export)
- Rust version: 1.94.0
- OS: Linux (Ubuntu 22.04)
**To Reproduce**

Minimal standalone reproduction (copy-paste into any project with `datafusion = "53"`):
```rust
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Float64Array, StringArray};
use std::sync::Arc;

#[tokio::test]
async fn df53_limit_bug_repro() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_a", DataType::Utf8, false),
        Field::new("col_b", DataType::Float64, true),
        Field::new("col_c", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("a-{i}")).collect::<Vec<_>>(),
            )),
            Arc::new(Float64Array::from(
                (0..20).map(|i| i as f64).collect::<Vec<_>>(),
            )),
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("c-{i}")).collect::<Vec<_>>(),
            )),
        ],
    )
    .unwrap();

    // Case 1: Schema-order SELECT → LIMIT works
    let t1 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx1 = SessionContext::new();
    ctx1.register_table("t", Arc::new(t1)).unwrap();
    let r1: usize = ctx1
        .sql("SELECT col_b, col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 2: Reverse-order SELECT → LIMIT silently ignored (BUG)
    let t2 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx2 = SessionContext::new();
    ctx2.register_table("t", Arc::new(t2)).unwrap();
    let df2 = ctx2.sql("SELECT col_c, col_b FROM t LIMIT 5").await.unwrap();
    let plan = df2.clone().into_optimized_plan().unwrap();
    println!("Optimized plan:\n{plan}");
    let r2: usize = df2.collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 3: Single column → LIMIT works
    let t3 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx3 = SessionContext::new();
    ctx3.register_table("t", Arc::new(t3)).unwrap();
    let r3: usize = ctx3
        .sql("SELECT col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 4: Reverse-order + ORDER BY → LIMIT works
    let t4 = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
    let ctx4 = SessionContext::new();
    ctx4.register_table("t", Arc::new(t4)).unwrap();
    let r4: usize = ctx4
        .sql("SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    println!("Schema-order (col_b, col_c) LIMIT 5: {r1} rows");
    println!("Reverse-order (col_c, col_b) LIMIT 5: {r2} rows");
    println!("Single column (col_c) LIMIT 5: {r3} rows");
    println!("Reverse + ORDER BY LIMIT 5: {r4} rows");

    assert_eq!(r1, 5, "schema-order should return 5");
    assert_eq!(r2, 5, "reverse-order should return 5"); // FAILS: returns 20
    assert_eq!(r3, 5, "single column should return 5");
    assert_eq!(r4, 5, "reverse + ORDER BY should return 5");
}
```
**Actual output (DataFusion 53.0.0):**

Optimized plan for the reverse-order SELECT:

```
Projection: t.col_c, t.col_b
  Limit: skip=0, fetch=5
    TableScan: t projection=[col_b, col_c], fetch=5
```

```
Schema-order (col_b, col_c) LIMIT 5: 5 rows ✓
Reverse-order (col_c, col_b) LIMIT 5: 20 rows ✗ BUG
Single column (col_c) LIMIT 5: 5 rows ✓
Reverse + ORDER BY LIMIT 5: 5 rows ✓
```

Note: the optimized plan is correct; `fetch=5` is present at both the `Limit` and `TableScan` nodes. Yet execution returns all 20 rows.
Summary of affected patterns:

| Query | Actual | Expected | Status |
|---|---|---|---|
| `SELECT col_b, col_c FROM t LIMIT 5` (schema order) | 5 | 5 | ✓ |
| `SELECT col_c, col_b FROM t LIMIT 5` (reverse order) | 20 | 5 | ✗ BUG |
| `SELECT col_c FROM t LIMIT 5` (single column) | 5 | 5 | ✓ |
| `SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5` | 5 | 5 | ✓ |
The `DataFrame::limit()` API is also affected:

```rust
ctx.table("t").await.unwrap()
    .select_columns(&["col_c", "col_b"]).unwrap()
    .limit(0, Some(5)).unwrap()
    .collect().await.unwrap(); // returns 20 rows, not 5
```
**Expected behavior**

`SELECT col_c, col_b FROM t LIMIT 5` should return exactly 5 rows, regardless of whether the `SELECT` column order matches the table schema column order.

The optimized plan correctly shows `fetch=5` propagated into the `TableScan`, so the limit is correctly planned; the issue is in physical execution.
**Additional context**

Probable root cause:

The `LIMIT` fetch-pushdown optimization propagates `fetch` into the `MemTable` `TableScan`. The `TableScan` shows `projection=[col_b, col_c]` (ascending schema-index order) regardless of the `SELECT` order, and an outer `Projection` node handles the column reorder.

The issue appears to be that when `MemTable` applies the `fetch` parameter together with a non-identity projection (the output column order differs from the scan order), the limit is lost or applied to the wrong stream.
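To make the "non-identity projection" condition concrete, the split described above can be sketched in plain Rust. This is an illustration of the general planning pattern, not DataFusion's actual code; `split_projection` is a hypothetical helper:

```rust
// Illustration (not DataFusion internals): a reordered SELECT is split into
// a scan projection in ascending schema order plus an outer remap.
fn split_projection(schema: &[&str], select: &[&str]) -> (Vec<usize>, Vec<usize>) {
    // Schema indices of the selected columns, in SELECT order.
    let select_idx: Vec<usize> = select
        .iter()
        .map(|c| schema.iter().position(|s| s == c).unwrap())
        .collect();
    // The scan reads columns in ascending schema-index order...
    let mut scan = select_idx.clone();
    scan.sort_unstable();
    scan.dedup();
    // ...and an outer projection remaps scan output back to SELECT order.
    let remap: Vec<usize> = select_idx
        .iter()
        .map(|i| scan.iter().position(|s| s == i).unwrap())
        .collect();
    (scan, remap)
}

fn main() {
    let schema = ["col_a", "col_b", "col_c"];
    // SELECT col_c, col_b: scan=[1, 2], remap=[1, 0] — a non-identity remap,
    // which is exactly the case where the pushed-down fetch is lost.
    let (scan, remap) = split_projection(&schema, &["col_c", "col_b"]);
    assert_eq!(scan, vec![1, 2]);
    assert_eq!(remap, vec![1, 0]);
    // SELECT col_b, col_c: remap=[0, 1] — identity, and the limit works.
    let (_, remap2) = split_projection(&schema, &["col_b", "col_c"]);
    assert_eq!(remap2, vec![0, 1]);
}
```

This matches the observed behavior: schema-order and single-column SELECTs produce an identity remap and honor the limit, while any reordering produces a non-identity remap and loses it.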
Workaround:

Reorder the `RecordBatch` columns to match the SQL `SELECT` column order before creating the `MemTable`. This makes DataFusion's projection an identity operation, avoiding the buggy code path:
```rust
// Before creating the MemTable, reorder batch columns to match SELECT order.
let reordered_schema = Arc::new(Schema::new(vec![
    Field::new("col_c", DataType::Utf8, true),
    Field::new("col_b", DataType::Float64, true),
]));
let reordered_batch = RecordBatch::try_new(reordered_schema.clone(), vec![
    batch.column_by_name("col_c").unwrap().clone(),
    batch.column_by_name("col_b").unwrap().clone(),
]).unwrap();
let table = MemTable::try_new(reordered_schema, vec![vec![reordered_batch]]).unwrap();
// Now SELECT col_c, col_b FROM t LIMIT 5 works correctly.
```
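When reordering the source batches isn't feasible (e.g. the SELECT order is only known at query time), a defensive cap can also be applied client-side after `collect()`: keep whole batches until the limit is reached, then truncate the boundary batch (in arrow, via `RecordBatch::slice`). The per-batch row accounting at the heart of that is sketched below with plain row counts; `rows_to_keep` is a hypothetical helper and assumes batches are consumed in order:

```rust
// Given the row counts of collected batches, compute how many rows to keep
// from each batch so the total does not exceed `limit`. The boundary batch
// is partially kept; batches past the limit contribute 0.
fn rows_to_keep(batch_rows: &[usize], limit: usize) -> Vec<usize> {
    let mut remaining = limit;
    batch_rows
        .iter()
        .map(|&n| {
            let keep = n.min(remaining);
            remaining -= keep;
            keep
        })
        .collect()
}

fn main() {
    // LIMIT 5 over batches of 8, 8, 4 rows: only the first batch contributes.
    assert_eq!(rows_to_keep(&[8, 8, 4], 5), vec![5, 0, 0]);
    // LIMIT 5 spanning a batch boundary: 3 rows, then 2, then none.
    assert_eq!(rows_to_keep(&[3, 8, 4], 5), vec![3, 2, 0]);
}
```

This doesn't recover the performance benefit of the pushed-down fetch (the scan still reads everything), but it restores correct row counts until the bug is fixed.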
Discovered in:

A distributed search engine where shard-level `RecordBatch`es have a fixed schema (`_id`, `score`, `data_col1`, `data_col2`, ...) but user SQL projects columns in arbitrary order. Multi-column `LIMIT` queries returned `N × num_shards` rows instead of `N` rows on a 3-node cluster.