
MemTable LIMIT silently ignored when SELECT projects columns in different order than table schema #21176

@RamakrishnaChilaka

Description


Describe the bug

When executing a SQL query with LIMIT against a MemTable, the limit is silently ignored if the SELECT clause projects columns in a different order than they appear in the table schema.

The optimized plan correctly shows Limit: skip=0, fetch=5 and TableScan: fetch=5, but execution returns all rows instead of the requested limit.

This only occurs when the SELECT reorders columns relative to the schema. Selecting columns in schema order works correctly. Adding ORDER BY also makes it work.

Environment:

  • DataFusion version: 53.0.0
  • Arrow version: 55.x (via DataFusion re-export)
  • Rust version: 1.94.0
  • OS: Linux (Ubuntu 22.04)

To Reproduce

Minimal standalone reproduction (copy-paste into any project with datafusion = "53"):

use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use datafusion::arrow::datatypes::{Schema, Field, DataType};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{StringArray, Float64Array};
use std::sync::Arc;

#[tokio::test]
async fn df53_limit_bug_repro() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_a", DataType::Utf8, false),
        Field::new("col_b", DataType::Float64, true),
        Field::new("col_c", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("a-{i}")).collect::<Vec<_>>(),
            )),
            Arc::new(Float64Array::from(
                (0..20).map(|i| i as f64).collect::<Vec<_>>(),
            )),
            Arc::new(StringArray::from(
                (0..20).map(|i| format!("c-{i}")).collect::<Vec<_>>(),
            )),
        ],
    )
    .unwrap();

    // Case 1: Schema-order SELECT → LIMIT works
    let t1 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx1 = SessionContext::new();
    ctx1.register_table("t", Arc::new(t1)).unwrap();
    let r1: usize = ctx1
        .sql("SELECT col_b, col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 2: Reverse-order SELECT → LIMIT silently ignored (BUG)
    let t2 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx2 = SessionContext::new();
    ctx2.register_table("t", Arc::new(t2)).unwrap();
    let df2 = ctx2.sql("SELECT col_c, col_b FROM t LIMIT 5").await.unwrap();
    let plan = df2.clone().into_optimized_plan().unwrap();
    println!("Optimized plan:\n{plan}");
    let r2: usize = df2.collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 3: Single column → LIMIT works
    let t3 = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]]).unwrap();
    let ctx3 = SessionContext::new();
    ctx3.register_table("t", Arc::new(t3)).unwrap();
    let r3: usize = ctx3
        .sql("SELECT col_c FROM t LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    // Case 4: Reverse-order + ORDER BY → LIMIT works
    let t4 = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
    let ctx4 = SessionContext::new();
    ctx4.register_table("t", Arc::new(t4)).unwrap();
    let r4: usize = ctx4
        .sql("SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5")
        .await.unwrap().collect().await.unwrap()
        .iter().map(|b| b.num_rows()).sum();

    println!("Schema-order (col_b, col_c) LIMIT 5: {r1} rows");
    println!("Reverse-order (col_c, col_b) LIMIT 5: {r2} rows");
    println!("Single column (col_c) LIMIT 5: {r3} rows");
    println!("Reverse + ORDER BY LIMIT 5: {r4} rows");

    assert_eq!(r1, 5, "schema-order should return 5");
    assert_eq!(r2, 5, "reverse-order should return 5");  // FAILS: returns 20
    assert_eq!(r3, 5, "single column should return 5");
    assert_eq!(r4, 5, "reverse + ORDER BY should return 5");
}

Actual output (DataFusion 53.0.0):

Optimized plan for reverse-order SELECT:
Projection: t.col_c, t.col_b
  Limit: skip=0, fetch=5
    TableScan: t projection=[col_b, col_c], fetch=5

Schema-order (col_b, col_c) LIMIT 5: 5 rows ✓
Reverse-order (col_c, col_b) LIMIT 5: 20 rows ✗ BUG
Single column (col_c) LIMIT 5: 5 rows ✓
Reverse + ORDER BY LIMIT 5: 5 rows ✓

Note: the optimized plan is correct: fetch=5 is present at both the Limit and TableScan nodes. But execution returns all 20 rows.

Summary of affected patterns:

| Query | Actual | Expected | Status |
|---|---|---|---|
| SELECT col_b, col_c FROM t LIMIT 5 (schema order) | 5 | 5 | ✓ |
| SELECT col_c, col_b FROM t LIMIT 5 (reverse order) | 20 | 5 | ✗ BUG |
| SELECT col_c FROM t LIMIT 5 (single column) | 5 | 5 | ✓ |
| SELECT col_c, col_b FROM t ORDER BY col_b LIMIT 5 | 5 | 5 | ✓ |

The DataFrame::limit() API is also affected:

ctx.table("t").await.unwrap()
    .select_columns(&["col_c", "col_b"]).unwrap()
    .limit(0, Some(5)).unwrap()
    .collect().await.unwrap()  // returns 20 rows, not 5

Expected behavior

SELECT col_c, col_b FROM t LIMIT 5 should return exactly 5 rows, regardless of whether the SELECT column order matches the table schema column order.

The optimized plan correctly shows fetch=5 propagated into the TableScan, so the limit semantics are correctly planned — the issue is in physical execution.

Additional context

Probable root cause:

The LIMIT fetch-pushdown optimization propagates fetch into MemTable's TableScan. The TableScan plan shows projection=[col_b, col_c] (ascending schema index order) regardless of SELECT order, and an outer Projection node handles the column reorder.

The issue appears to be that when MemTable applies the fetch parameter with a non-identity projection (indices not in ascending order relative to the output), the limit is lost or applied to the wrong stream.
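To make the suspected failure mode concrete, here is an illustrative stdlib-only sketch (not DataFusion's actual code) of how a projection permutation and a fetch limit interact over an in-memory batch. The bug described above is consistent with `fetch` being honored only on an identity-permutation fast path; the correct behavior, shown here, truncates rows regardless of the output column order:

```rust
// Illustrative sketch only. `batch` is a hypothetical column-major table
// (one Vec<i64> per column); `projection` gives the output column order.
// The key point: `fetch` must truncate rows on every path, identity
// permutation or not.
fn project_with_fetch(
    batch: &[Vec<i64>],
    projection: &[usize],
    fetch: Option<usize>,
) -> Vec<Vec<i64>> {
    projection
        .iter()
        .map(|&i| {
            let col = &batch[i];
            // Apply the row limit per column, independent of column order.
            let n = fetch.map_or(col.len(), |f| f.min(col.len()));
            col[..n].to_vec()
        })
        .collect()
}
```

With a reversed projection such as `[2, 1]` and `fetch = Some(5)`, every output column still has exactly 5 rows, which is the behavior the reverse-order SELECT should exhibit.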

Workaround:

Reorder the RecordBatch columns to match the SQL SELECT column order before creating the MemTable. This makes DataFusion's projection an identity operation, preventing the buggy code path:

// Before creating MemTable, reorder batch columns to match SELECT order
let reordered_schema = Arc::new(Schema::new(vec![
    Field::new("col_c", DataType::Utf8, true),
    Field::new("col_b", DataType::Float64, true),
]));
let reordered_batch = RecordBatch::try_new(reordered_schema.clone(), vec![
    batch.column_by_name("col_c").unwrap().clone(),
    batch.column_by_name("col_b").unwrap().clone(),
]).unwrap();
let table = MemTable::try_new(reordered_schema, vec![vec![reordered_batch]]).unwrap();
// Now SELECT col_c, col_b FROM t LIMIT 5 works correctly
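Generalizing the workaround requires computing, for an arbitrary desired SELECT order, the permutation of source column indices; the arrow-specific reordering is then one `column(i).clone()` per index. A minimal stdlib-only sketch of that index computation follows; `column_permutation` is a hypothetical helper for illustration, not a DataFusion or arrow API:

```rust
use std::collections::HashMap;

/// Hypothetical helper: map each desired output column name to its index
/// in the source schema. Returns None if any requested name is missing.
fn column_permutation(source: &[&str], desired: &[&str]) -> Option<Vec<usize>> {
    // Build a name → index lookup over the source schema.
    let index: HashMap<&str, usize> = source
        .iter()
        .enumerate()
        .map(|(i, &name)| (name, i))
        .collect();
    // Collect into Option<Vec<_>>: any missing name yields None.
    desired.iter().map(|name| index.get(name).copied()).collect()
}
```

For the schema above, `column_permutation(&["col_a", "col_b", "col_c"], &["col_c", "col_b"])` yields `Some(vec![2, 1])`, the indices to pull from the original batch when building the reordered one.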

Discovered in:

A distributed search engine where shard-level RecordBatches have a fixed schema (_id, score, data_col1, data_col2, ...) but user SQL projects columns in arbitrary order. Multi-column LIMIT queries returned N × num_shards rows instead of N rows on a 3-node cluster.

Labels: bug