When calling exceptAll after a dropDuplicates with a subset of columns, the Spark process throws an INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND error. It appears to be a bug in query evaluation.
The following snippet:
df = spark.createDataFrame([("1", "Alice")], schema="id STRING, name STRING")
expected = spark.createDataFrame([("1", "Alice")], schema="id STRING, name STRING")
df.dropDuplicates(["id"]).exceptAll(expected).count()
Results in the following error:
ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.SparkException: [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find name#1 in [id#0,sum#11L]. SQLSTATE: XX000
at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:491)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:107)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:491)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:496)
at scala.collection.immutable.List.map(List.scala:240)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:729)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:496)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:467)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:435)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
at org.apache.spark.sql.execution.GenerateExec.boundGenerator$lzycompute(GenerateExec.scala:76)
at org.apache.spark.sql.execution.GenerateExec.boundGenerator(GenerateExec.scala:76)
at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$1(GenerateExec.scala:82)
at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$1$adapted(GenerateExec.scala:81)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
at org.apache.spark.scheduler.Task.run(Task.scala:147)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Notably, this error does not occur when dropDuplicates is used without arguments, when distinct is used, or when subtract is used instead of exceptAll.
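For reference, the semantic difference between the two set operations can be sketched in plain Python with collections.Counter. This is an illustrative sketch only, not part of the PySpark API: exceptAll is a multiset difference (left rows survive according to their counts), while subtract is a set difference over distinct rows.

```python
from collections import Counter

def except_all(left, right):
    # Multiset difference: keep each left row as many times as its count
    # exceeds its count on the right (EXCEPT ALL semantics).
    remaining = Counter(left) - Counter(right)
    return list(remaining.elements())

def subtract(left, right):
    # Set difference over distinct rows (EXCEPT DISTINCT semantics).
    return [row for row in set(left) if row not in set(right)]
```

With the single-row data from the snippet above, both variants return an empty result; they only diverge once the left side contains duplicate rows, which is why subtract masks the bug while exceptAll exercises the failing plan.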
The error can also be circumvented by materializing the result of the dropDuplicates operation before calling exceptAll:
deduped = df.dropDuplicates(["id"])
materialized = spark.createDataFrame(deduped.collect(), schema=deduped.schema)
materialized.exceptAll(expected).count() # No error
deduped.cache().localCheckpoint().exceptAll(expected).count() # No error
Tested with pyspark 4.1.1 on Python 3.13.