
dropDuplicates(columns) followed by ExceptAll results in INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND #54724

Description

@BaseKan

When calling exceptAll after a dropDuplicates with a subset of columns, the Spark process throws an INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND error. This appears to be a bug in query evaluation.

The following snippet:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("1", "Alice")], schema="id STRING, name STRING")
expected = spark.createDataFrame([("1", "Alice")], schema="id STRING, name STRING")

df.dropDuplicates(["id"]).exceptAll(expected).count()

Results in the following error:

ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.SparkException: [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find name#1 in [id#0,sum#11L]. SQLSTATE: XX000
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:491)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:107)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:491)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:496)
	at scala.collection.immutable.List.map(List.scala:240)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:729)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:496)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:467)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:435)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
	at org.apache.spark.sql.execution.GenerateExec.boundGenerator$lzycompute(GenerateExec.scala:76)
	at org.apache.spark.sql.execution.GenerateExec.boundGenerator(GenerateExec.scala:76)
	at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$1(GenerateExec.scala:82)
	at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$1$adapted(GenerateExec.scala:81)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Notably, this error does not occur when dropDuplicates is used without arguments, when distinct is used, or when subtract is used instead of exceptAll.
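For completeness, a sketch of those non-failing variants, reusing df and expected from the snippet above (this only restates the observed behavior described here):

# These variants do not trigger the error:
df.dropDuplicates().exceptAll(expected).count()       # dropDuplicates without a column subset
df.distinct().exceptAll(expected).count()             # distinct instead of dropDuplicates
df.dropDuplicates(["id"]).subtract(expected).count()  # subtract instead of exceptAll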

The error can also be circumvented by materializing the result of the dropDuplicates operation before calling exceptAll:

deduped = df.dropDuplicates(["id"])

# Workaround 1: round-trip the deduplicated rows through the driver
materialized = spark.createDataFrame(deduped.collect(), schema=deduped.schema)
materialized.exceptAll(expected).count()  # no error

# Workaround 2: truncate the lineage with cache + localCheckpoint
deduped.cache().localCheckpoint().exceptAll(expected).count()  # no error

Tested with PySpark 4.1.1 on Python 3.13.
