Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import java.nio.file.{Files, Paths}
import java.sql.Timestamp
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters
import scala.collection.JavaConverters._

/**
 * Row type for the struct-identity UDF test below. The single field name must
 * stay `touch_time` so it matches the struct built in SQL via
 * `named_struct('touch_time', ts)`; Spark maps the struct to this case class
 * by field name when invoking the Scala UDF.
 */
final case class TsContributor(touch_time: Timestamp)

class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val resourcePath: String = "/tpch-data-parquet"
Expand Down Expand Up @@ -2258,4 +2261,30 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
}

test("ColumnarPartialProject for null struct row with timestamp child") {
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
    // Identity UDF over a struct-typed column: forces a partial-project
    // fallback path that must round-trip a NULL struct containing a
    // timestamp child without error.
    val passThrough = udf((row: TsContributor) => row)
    spark.udf.register("struct_identity", passThrough)

    withTable("t") {
      spark.sql("create table t (id int, ts timestamp) using parquet")
      spark.sql("insert into t values (1, timestamp'2026-01-01 00:00:00')")
      spark.sql("insert into t values (2, null)")

      // Row id=2 produces a NULL struct; the UDF is guarded by CASE so it
      // only runs on non-null structs, but the column batch still carries
      // the null row through ColumnarPartialProjectExec.
      val query =
        """with v as (
          |select id, case when ts is null then null else named_struct('touch_time', ts) end as ts
          |from t
          |)
          |select id, case when ts is null then null else struct_identity(ts) end from v
          |""".stripMargin

      runQueryAndCompare(query) { df =>
        val plan = getExecutedPlan(df)
        // Expect full offload: no vanilla ProjectExec remains, and exactly
        // one ColumnarPartialProjectExec handles the UDF.
        assert(plan.count(_.isInstanceOf[ProjectExec]) == 0)
        assert(plan.count(_.isInstanceOf[ColumnarPartialProjectExec]) == 1)
      }
    }
  }
}
}
Loading