
[core] Support btree global index with embedded file metadata #7563

Open
lilei1128 wants to merge 2 commits into apache:master from lilei1128:global_index_with_manifest

Conversation

@lilei1128
Contributor

Purpose

Add with-file-meta option for btree global index to embed ManifestEntry
data directly in index files, enabling manifest-skip query planning.

Key changes:

  • New index type "btree_file_meta": SST file mapping fileName -> ManifestEntry bytes
  • BTreeWithFileMetaBuilder: builds btree key-index + btree_file_meta atomically
  • BTreeWithFileMetaReader: reads both indexes and returns FilePathGlobalIndexResult
  • DataEvolutionBatchScan: fast path using file-meta to build DataSplits without
    manifest reads, with staleness detection via fileIO.exists()
  • BTreeIndexOptions.BTREE_WITH_FILE_META: config to enable the feature

When enabled, query planning reads only:

  • btree key-index SST (for matching row IDs)
  • btree_file_meta SST (for ManifestEntry data)

See: https://cwiki.apache.org/confluence/display/PAIMON/PIP-41%3A+Introduce+FilePath+Global+Index+And+Optimizations+For+Lookup+In+Append+Table
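The two-lookup planning path above can be sketched as follows. This is a hypothetical, simplified illustration in plain Java: the maps stand in for the key-index and file_meta SST files, and none of the names are the PR's actual classes.

```java
import java.util.*;

// Hypothetical sketch of manifest-skip planning: the three maps stand in for
// the btree key-index SST, a rowId->file mapping, and the btree_file_meta SST.
public class ManifestSkipPlanSketch {
    // key-index: predicate key -> matching row IDs (a bitmap in the real index)
    static final Map<Integer, List<Long>> KEY_INDEX = Map.of(42, List.of(100L, 205L));
    // row ID -> containing data file (derived from rowId ranges in practice)
    static final Map<Long, String> ROW_ID_TO_FILE =
            Map.of(100L, "data-0.parquet", 205L, "data-1.parquet");
    // file_meta: fileName -> serialized ManifestEntry bytes
    static final Map<String, byte[]> FILE_META = Map.of(
            "data-0.parquet", new byte[]{1}, "data-1.parquet", new byte[]{2});

    /** Plans a query by reading only the two index structures, never the manifests. */
    static List<String> plan(int key) {
        Set<String> files = new LinkedHashSet<>();
        for (long rowId : KEY_INDEX.getOrDefault(key, List.of())) {
            String file = ROW_ID_TO_FILE.get(rowId);
            byte[] entry = FILE_META.get(file); // enough to build a DataSplit
            if (entry != null) files.add(file);
        }
        return new ArrayList<>(files);
    }

    public static void main(String[] args) {
        System.out.println(plan(42)); // -> [data-0.parquet, data-1.parquet]
    }
}
```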

Tests

CI

Contributor

@steFaiz steFaiz left a comment


Hi, thanks for this PR!
I'm just wondering if it's necessary to reuse the current BTree codebase. For example:

  1. In the btree index build topology, the current implementation decides the partition number by the number of records per range and splits ranges by partition, which may not be suitable for your case.
  2. Also, it seems that the BTREE_WITH_FILE_META option creates a totally different index type compared to BTree.

@lilei1128
Contributor Author

lilei1128 commented Mar 31, 2026

> Hi, thanks for this PR! I'm just wondering if it's necessary to reuse the current BTree codebase. For example:
>
>   1. In the btree index build topology, the current implementation decides the partition number by the number of records per range and splits ranges by partition, which may not be suitable for your case.
>   2. Also, it seems that the BTREE_WITH_FILE_META option creates a totally different index type compared to BTree.

The "with-file-meta" is NOT a completely different index type. It's:

  • btree key-index (key → rowId bitmap) - same as the regular btree
  • btree_file_meta (fileName → ManifestEntry) - an additional metadata file

On the first question, you're right that the parallelism logic is designed for the key-index.
Currently, when parallelism > 1, each subtask writes a complete file-meta SST.
I've handled this with read-time deduplication using LinkedHashMap.putIfAbsent()
in BTreeWithFileMetaReader - duplicate entries are filtered out during the query. As a long-term solution, we could:

  1. Write file-meta only in subtask 0 (write-time serialization)
  2. Or deduplicate at commit time (though this leaves orphan files)?
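The read-time deduplication described above can be sketched as follows — a minimal, hypothetical illustration of the LinkedHashMap.putIfAbsent() approach, not the actual BTreeWithFileMetaReader code:

```java
import java.util.*;

// Minimal sketch of read-time deduplication: each parallel subtask may have
// written a complete file-meta SST, so the same fileName can appear in several
// SSTs; putIfAbsent keeps only the first occurrence while preserving encounter
// order. Names are illustrative, not the PR's classes.
public class FileMetaDedup {
    static Map<String, byte[]> mergeFileMetaSsts(List<Map<String, byte[]>> ssts) {
        Map<String, byte[]> merged = new LinkedHashMap<>();
        for (Map<String, byte[]> sst : ssts) {
            // duplicate fileNames from other subtasks are silently dropped
            sst.forEach(merged::putIfAbsent);
        }
        return merged;
    }
}
```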

To skip manifest reads, we need two capabilities:

  1. Predicate evaluation (key → matching rowIds)
  2. RowId → file info (to build DataSplit without manifest)

If we don't reuse BTree:
For capability 1, we would need to either:

  • Reimplement the predicate evaluation logic (essentially duplicating BTree)
  • Use a Bloom filter, which can only filter at the file level and cannot
    evaluate predicates precisely to get matching rowIds

For capability 2, alternatives like manifest caching still require manifest
reads - they just reduce the cost, not eliminate it.

That's why reusing BTree makes sense:

  • Capability 1: Inherited from existing BTree implementation
  • Capability 2: Added by embedding ManifestEntry in file-meta SST
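The staleness detection mentioned in the PR description (via fileIO.exists()) could look roughly like this. This is a hypothetical sketch: the Predicate stands in for the existence check, and the wholesale fallback policy is an assumption for illustration, not necessarily the PR's exact behavior.

```java
import java.util.*;
import java.util.function.Predicate;

// Sketch of staleness detection: embedded ManifestEntry data can go stale if a
// data file was deleted or compacted after the index was built, so the fast
// path verifies each file still exists before trusting the embedded metadata.
// The Predicate stands in for fileIO.exists(); names are illustrative.
public class StalenessCheckSketch {
    /** Returns the embedded entries if all their files still exist, or empty if
     *  any is stale (signalling a fall back to a normal manifest-based plan). */
    static Optional<Map<String, byte[]>> freshEntries(
            Map<String, byte[]> embedded, Predicate<String> fileExists) {
        for (String file : embedded.keySet()) {
            if (!fileExists.test(file)) {
                return Optional.empty(); // stale index: fall back to manifest reads
            }
        }
        return Optional.of(embedded);
    }
}
```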

@steFaiz
Contributor

steFaiz commented Mar 31, 2026

@lilei1128
Thank you for the explanation! I misunderstood it earlier. I thought it was a general-purpose metadata index that could accelerate any metadata access.

int partitionNum = Math.max((int) (range.count() / recordsPerRange), 1);
partitionNum = Math.min(partitionNum, maxParallelism);

// Pre-serialize ManifestEntries for file-meta index (if withFileMeta is enabled)
Contributor

@steFaiz steFaiz Mar 31, 2026


It seems that the key (i.e. the filename) is just used for deduplication? Can I imagine this index as actually a Range to Collection<ManifestEntry> index?

Contributor Author


Yes, currently fileName is mainly for deduplication; the runtime path does not do fileName point lookups yet.

This is a follow-up optimization point.

@JingsongLi
Contributor

Hi @lilei1128 , thanks for the contribution!

Do you have any benchmarks for this PR? I am curious about the performance comparison between the file-meta-based and the rowId-based approach in big-data scenarios.

@lilei1128
Copy link
Copy Markdown
Contributor Author

lilei1128 commented Apr 1, 2026

> Hi @lilei1128 , thanks for the contribution!
>
> Do you have any benchmarks for this PR? I am curious about the performance comparison between the file-meta-based and the rowId-based approach in big-data scenarios.

Hi, this is my test result on a Mac:
[image: benchmark results]

Range queries perform better than point queries, and the gains would be larger if the data were on OSS/S3.

Appendix:

  1. Create Table

// test_index/GlobalIndexPerfTest.scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import java.io.{File, PrintWriter}

val spark = SparkSession.builder().appName("GlobalIndexPerfTest").getOrCreate()

println("=" * 60)
println("Global Index Performance Test")
println("=" * 60)

// ==================== Configuration ====================
val NUM_WRITES = 30 // number of writes (determines manifest count)
val RECORDS_PER_WRITE = 50000 // records per write
val NUM_QUERIES = 50 // number of queries

// ==================== 1. Create test tables ====================
println("\n[Step 1] Creating test tables...")
spark.sql("USE paimon")
spark.sql("CREATE DATABASE IF NOT EXISTS perf_db")
spark.sql("USE perf_db")

// Drop old tables
spark.sql("DROP TABLE IF EXISTS perf_test_no_index")
spark.sql("DROP TABLE IF EXISTS perf_test_btree")
spark.sql("DROP TABLE IF EXISTS perf_test_with_meta")

// Table without index
spark.sql("""
CREATE TABLE perf_test_no_index (
id INT,
name STRING,
embedding ARRAY<FLOAT>,
content STRING
) USING paimon
TBLPROPERTIES (
'bucket' = '-1',
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true'
)
""")

// Table with a plain btree index
spark.sql("""
CREATE TABLE perf_test_btree (
id INT,
name STRING,
embedding ARRAY<FLOAT>,
content STRING
) USING paimon
TBLPROPERTIES (
'bucket' = '-1',
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'global-index.enabled' = 'true'
)
""")

// Table with the with-file-meta index
spark.sql("""
CREATE TABLE perf_test_with_meta (
id INT,
name STRING,
embedding ARRAY<FLOAT>,
content STRING
) USING paimon
TBLPROPERTIES (
'bucket' = '-1',
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'global-index.enabled' = 'true'
)
""")

println(s"Created 3 test tables")

// ==================== 2. Write data ====================
println(s"\n[Step 2] Writing data ($NUM_WRITES writes × $RECORDS_PER_WRITE records)...")

val writeStartTime = System.currentTimeMillis()
for (i <- 0 until NUM_WRITES) {
val offset = i * RECORDS_PER_WRITE
val df = spark.range(offset, offset + RECORDS_PER_WRITE).map { id =>
val name = s"name_$id"
val embedding = (1 to 10).map(_ => scala.util.Random.nextFloat()).toArray
val content = s"Content for id $id with random text ${scala.util.Random.nextDouble()}"
(id.toInt, name, embedding, content)
}.toDF("id", "name", "embedding", "content")

  // Write to all three tables
  df.write.mode("append").insertInto("perf_test_no_index")
  df.write.mode("append").insertInto("perf_test_btree")
  df.write.mode("append").insertInto("perf_test_with_meta")

  if ((i + 1) % 10 == 0) {
      println(s"  Write progress: ${i + 1}/$NUM_WRITES")
  }

}

val writeEndTime = System.currentTimeMillis()
println(s"Data write completed in ${(writeEndTime - writeStartTime) / 1000}s")

// ==================== 4. Create indexes ====================
println("\n[Step 4] Creating global indexes...")

println("Creating btree index on perf_test_btree...")
spark.sql("CALL sys.create_global_index(table =>'perf_db.perf_test_btree', index_column =>'id', index_type =>'btree')")

println("Creating btree index with file-meta on perf_test_with_meta...")
spark.sql("CALL sys.create_global_index(table =>'perf_db.perf_test_with_meta', index_column =>'id', index_type =>'btree',options => 'index.with-file-meta=true')")

println("Indexes created successfully")

// ==================== 5. Data statistics ====================
println("\n[Step 5] Data statistics...")

val countNoIndex = spark.sql("SELECT COUNT(*) FROM perf_test_no_index").collect()(0)(0)
val countBtree = spark.sql("SELECT COUNT(*) FROM perf_test_btree").collect()(0)(0)
val countWithMeta = spark.sql("SELECT COUNT(*) FROM perf_test_with_meta").collect()(0)(0)

println(s"Total records: $countNoIndex")
println(s"Total records: $countBtree")
println(s"Total records: $countWithMeta")

// ==================== 6. Performance test ====================
println(s"\n[Step 6] Running performance tests ($NUM_QUERIES queries)...")

// Generate random query values
val maxId = NUM_WRITES * RECORDS_PER_WRITE
val queryIds = (1 to NUM_QUERIES).map(_ => scala.util.Random.nextInt(maxId))

case class QueryResult(
scenario: String,
queryId: Int,
latencyMs: Long,
rowCount: Long
)

def runQuery(table: String, queryId: Int): (Long, Long) = {
val start = System.currentTimeMillis()
val rows = spark.sql(s"SELECT * FROM $table WHERE id = $queryId").collect()
val latency = System.currentTimeMillis() - start
(latency, rows.length)
}

def runBenchmark(tableName: String, scenario: String): Seq[QueryResult] = {
println(s"\nTesting: $scenario")
val results = ListBuffer[QueryResult]()

  // Warmup query (excluded from stats)
  runQuery(tableName, 0)

  // Measured queries
  queryIds.foreach { queryId =>
      val (latency, rowCount) = runQuery(tableName, queryId)
      results += QueryResult(scenario, queryId, latency, rowCount)
      print(".")
  }
  println(" Done")
  results.toSeq

}

// Run benchmarks
val resultsNoIndex = runBenchmark("perf_test_no_index", "No Index")
val resultsBtree = runBenchmark("perf_test_btree", "BTree Index")
val resultsWithMeta = runBenchmark("perf_test_with_meta", "BTree + FileMeta")

// ==================== 7. Analyze results ====================
println("\n" + "=" * 60)
println("PERFORMANCE RESULTS")
println("=" * 60)

case class Statistics(
scenario: String,
totalMs: Long,
avgMs: Double,
minMs: Long,
maxMs: Long,
p50Ms: Long,
p99Ms: Long
)

def calcStats(results: Seq[QueryResult]): Statistics = {
val latencies = results.map(_.latencyMs).sorted
val n = latencies.size

  Statistics(
      scenario = results.head.scenario,
      totalMs = latencies.sum,
      avgMs = latencies.sum.toDouble / n,
      minMs = latencies.min,
      maxMs = latencies.max,
      p50Ms = latencies(n / 2),
      p99Ms = latencies((n * 99) / 100)
  )

}

val statsNoIndex = calcStats(resultsNoIndex)
val statsBtree = calcStats(resultsBtree)
val statsWithMeta = calcStats(resultsWithMeta)

def printStats(stats: Statistics, baseline: Statistics): Unit = {
val improvement = if (baseline != null) {
val imp = (baseline.avgMs - stats.avgMs) / baseline.avgMs * 100
f" ($imp%.1f%% faster)"
} else ""

  println(f"""

| ${stats.scenario}$improvement
| Total: ${stats.totalMs} ms
| Avg: ${stats.avgMs}%.2f ms
| Min: ${stats.minMs} ms
| Max: ${stats.maxMs} ms
| P50: ${stats.p50Ms} ms
| P99: ${stats.p99Ms} ms
""".stripMargin)
}

printStats(statsNoIndex, null)
printStats(statsBtree, statsNoIndex)
printStats(statsWithMeta, statsNoIndex)

// Performance improvement comparison
println("\n" + "-" * 60)
println("SUMMARY")
println("-" * 60)

val improvementBtree = (statsNoIndex.avgMs - statsBtree.avgMs) / statsNoIndex.avgMs * 100
val improvementWithMeta = (statsNoIndex.avgMs - statsWithMeta.avgMs) / statsNoIndex.avgMs * 100
val improvementMetaVsBtree = (statsBtree.avgMs - statsWithMeta.avgMs) / statsBtree.avgMs * 100

println(f"""
| No Index → BTree Index: ${improvementBtree}%.1f%% faster
| No Index → BTree + FileMeta: ${improvementWithMeta}%.1f%% faster
| BTree → BTree + FileMeta: ${improvementMetaVsBtree}%.1f%% faster (manifest-skip benefit)
""".stripMargin)

// ==================== 8. Save results ====================
println("\n[Step 8] Saving results...")

val reportFile = new java.io.PrintWriter(new java.io.File("/tmp/global_index_perf_report.txt"))
reportFile.println(s"Global Index Performance Report")
reportFile.println(s"Generated: ${java.time.LocalDateTime.now()}")
reportFile.println(s"=" * 60)
reportFile.println(s"\nConfiguration:")
reportFile.println(s" Number of writes: $NUM_WRITES")
reportFile.println(s" Records per write: $RECORDS_PER_WRITE")
reportFile.println(s" Total records: $countNoIndex")
reportFile.println(s" Query count: $NUM_QUERIES")
reportFile.println(s"\nResults:")
reportFile.println(f" No Index Avg: ${statsNoIndex.avgMs}%.2f ms")
reportFile.println(f" BTree Index Avg: ${statsBtree.avgMs}%.2f ms")
reportFile.println(f" BTree+FileMeta Avg: ${statsWithMeta.avgMs}%.2f ms")
reportFile.println(f"\nImprovement vs No Index:")
reportFile.println(f" BTree Index: ${improvementBtree}%.1f%%")
reportFile.println(f" BTree + FileMeta: ${improvementWithMeta}%.1f%%")
reportFile.println(f"\nManifest-skip benefit (BTree vs BTree+FileMeta):")
reportFile.println(f" ${improvementMetaVsBtree}%.1f%% faster")
reportFile.close()

println(s"Report saved to: /tmp/global_index_perf_report.txt")

// ==================== Done ====================
println("\n" + "=" * 60)
println("Test completed successfully!")
println("=" * 60)

  2. Query script

// test_index/GlobalIndexQueryTest.scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer

val spark = SparkSession.builder().appName("GlobalIndexQueryTest").getOrCreate()

println("=" * 60)
println("Global Index Query Test")
println("=" * 60)

// ==================== Configuration ====================
val NUM_QUERIES = 50 // number of queries
val NUM_WARMUP = 10 // number of warmup queries

// ==================== 1. Verify tables ====================
println("\n[Step 1] Checking tables...")

spark.sql("USE paimon")
spark.sql("USE perf_db")

val countNoIndex = spark.sql("SELECT COUNT(*) FROM perf_test_no_index").collect()(0)(0)
println(s"Total rows: $countNoIndex")

// ==================== 2. Warmup queries ====================
println(s"\n[Step 2] Warmup ($NUM_WARMUP queries)...")

for (i <- 1 to NUM_WARMUP) {
spark.sql("SELECT * FROM perf_test_no_index WHERE id = 12345").collect()
spark.sql("SELECT * FROM perf_test_btree WHERE id = 12345").collect()
spark.sql("SELECT * FROM perf_test_with_meta WHERE id = 12345").collect()
}
println("Warmup completed")

// ==================== 3. Performance test ====================
println(s"\n[Step 3] Running performance tests ($NUM_QUERIES queries)...")

val maxId = countNoIndex.asInstanceOf[Long]
val queryIds = (1 to NUM_QUERIES).map(_ => scala.util.Random.nextInt(maxId.toInt))

case class QueryResult(
scenario: String,
queryId: Int,
latencyMs: Long,
rowCount: Long
)

def runQuery(table: String, queryId: Int): (Long, Long) = {
val start = System.currentTimeMillis()
val rows = spark.sql(s"SELECT * FROM $table WHERE id = $queryId").collect()
val latency = System.currentTimeMillis() - start
(latency, rows.length)
}

def runBenchmark(tableName: String, scenario: String): Seq[QueryResult] = {
println(s"\nTesting: $scenario")
val results = ListBuffer[QueryResult]()

queryIds.foreach { queryId =>
    val (latency, rowCount) = runQuery(tableName, queryId)
    results += QueryResult(scenario, queryId, latency, rowCount)
    print(".")
}
println(" Done")
results.toSeq

}

// Run benchmarks
val resultsNoIndex = runBenchmark("perf_test_no_index", "No Index")
val resultsBtree = runBenchmark("perf_test_btree", "BTree Index")
val resultsWithMeta = runBenchmark("perf_test_with_meta", "BTree + FileMeta")

// ==================== 4. Analyze results ====================
println("\n" + "=" * 60)
println("PERFORMANCE RESULTS")
println("=" * 60)

case class Statistics(
scenario: String,
totalMs: Long,
avgMs: Double,
minMs: Long,
maxMs: Long,
p50Ms: Long,
p99Ms: Long
)

def calcStats(results: Seq[QueryResult]): Statistics = {
val latencies = results.map(_.latencyMs).sorted
val n = latencies.size

Statistics(
    scenario = results.head.scenario,
    totalMs = latencies.sum,
    avgMs = latencies.sum.toDouble / n,
    minMs = latencies.min,
    maxMs = latencies.max,
    p50Ms = latencies(n / 2),
    p99Ms = latencies((n * 99) / 100)
)

}

val statsNoIndex = calcStats(resultsNoIndex)
val statsBtree = calcStats(resultsBtree)
val statsWithMeta = calcStats(resultsWithMeta)

def printStats(stats: Statistics, baseline: Statistics): Unit = {
val improvement = if (baseline != null) {
val imp = (baseline.avgMs - stats.avgMs) / baseline.avgMs * 100
f" ($imp%.1f%% faster)"
} else ""

println(f"""

| ${stats.scenario}$improvement
| Total: ${stats.totalMs} ms
| Avg: ${stats.avgMs}%.2f ms
| Min: ${stats.minMs} ms
| Max: ${stats.maxMs} ms
| P50: ${stats.p50Ms} ms
| P99: ${stats.p99Ms} ms
""".stripMargin)
}

printStats(statsNoIndex, null)
printStats(statsBtree, statsNoIndex)
printStats(statsWithMeta, statsNoIndex)

// Performance improvement comparison
println("\n" + "-" * 60)
println("SUMMARY")
println("-" * 60)

val improvementBtree = (statsNoIndex.avgMs - statsBtree.avgMs) / statsNoIndex.avgMs * 100
val improvementWithMeta = (statsNoIndex.avgMs - statsWithMeta.avgMs) / statsNoIndex.avgMs * 100
val improvementMetaVsBtree = (statsBtree.avgMs - statsWithMeta.avgMs) / statsBtree.avgMs * 100

println(f"""
| No Index → BTree Index: ${improvementBtree}%.1f%% faster
| No Index → BTree + FileMeta: ${improvementWithMeta}%.1f%% faster
| BTree → BTree + FileMeta: ${improvementMetaVsBtree}%.1f%% faster (manifest-skip benefit)
""".stripMargin)

// ==================== 5. Range query test ====================
println("\n" + "=" * 60)
println("RANGE QUERY TEST")
println("=" * 60)

def runRangeQuery(table: String, low: Int, high: Int): (Long, Long) = {
val start = System.currentTimeMillis()
val rows = spark.sql(s"SELECT COUNT(*) FROM $table WHERE id BETWEEN $low AND $high").collect()
val latency = System.currentTimeMillis() - start
(latency, rows(0)(0).asInstanceOf[Long])
}

val rangeTests = Seq(
(10000, 20000),
(100000, 200000),
(500000, 600000)
)

println("\nRange Query Results:")
println(f"${"Table"}%-25s ${"Range"}%-20s ${"Count"}%-12s ${"Latency(ms)"}%-12s")
println("-" * 70)

rangeTests.foreach { case (low, high) =>
val (t1, c1) = runRangeQuery("perf_test_no_index", low, high)
val (t2, c2) = runRangeQuery("perf_test_btree", low, high)
val (t3, c3) = runRangeQuery("perf_test_with_meta", low, high)

println(f"perf_test_no_index       [$low-$high]   $c1%-12d $t1%-12d")
println(f"perf_test_btree          [$low-$high]   $c2%-12d $t2%-12d")
println(f"perf_test_with_meta      [$low-$high]   $c3%-12d $t3%-12d")
println("-" * 70)

}

// ==================== Done ====================
println("\n" + "=" * 60)
println("Test completed!")
println("=" * 60)
