diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index be84662e1a50..2d1c1e0ccecf 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1306,6 +1306,13 @@ jobs:
         with:
           name: arrow-jars-centos-7-${{github.sha}}
           path: /root/.m2/repository/org/apache/arrow/
+      - name: Prepare
+        run: |
+          dnf module -y install python39 && \
+          alternatives --set python3 /usr/bin/python3.9 && \
+          pip3 install setuptools==77.0.3 && \
+          pip3 install pyspark==3.5.5 cython && \
+          pip3 install pandas==2.2.3 pyarrow==20.0.0
       - name: Prepare Spark Resources for Spark 4.0.1 #TODO remove after image update
         run: |
           rm -rf /opt/shims/spark40
@@ -1412,6 +1419,15 @@ jobs:
         with:
          name: arrow-jars-centos-7-${{github.sha}}
           path: /root/.m2/repository/org/apache/arrow/
+      - name: Prepare
+        run: |
+          dnf install -y python3.11 python3.11-pip python3.11-devel && \
+          ls -la /usr/bin/python3.11 && \
+          alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 && \
+          alternatives --set python3 /usr/bin/python3.11 && \
+          pip3 install setuptools==77.0.3 && \
+          pip3 install pyspark==3.5.5 cython && \
+          pip3 install pandas==2.2.3 pyarrow==20.0.0
       - name: Prepare Spark Resources for Spark 4.1.1 #TODO remove after image update
         run: |
           rm -rf /opt/shims/spark41
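Note: the two "Prepare" steps above pin a per-job Python toolchain (3.9 for the Spark 4.0 job, 3.11 for the Spark 4.1 job) together with pyspark 3.5.5, pandas 2.2.3, and pyarrow 20.0.0, which is what lets the Python test suites below run at all. As a rough sanity check, a hypothetical snippet like the following (not part of this change) confirms inside the container that the interpreter switch and the pins took effect:

    import sys

    import pandas
    import pyarrow
    import pyspark

    # Versions pinned by the "Prepare" steps above.
    assert pyspark.__version__ == "3.5.5", pyspark.__version__
    assert pandas.__version__ == "2.2.3", pandas.__version__
    assert pyarrow.__version__ == "20.0.0", pyarrow.__version__
    # One job pins Python 3.9, the other Python 3.11.
    assert sys.version_info[:2] in ((3, 9), (3, 11)), sys.version_info
    print("python", sys.version.split()[0], "toolchain ok")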
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
index 7747bd219375..e36fbb19f8a9 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
@@ -44,8 +44,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
       .set("spark.executor.cores", "1")
   }
 
-  // TODO: fix on spark-4.1
-  testWithMaxSparkVersion("arrow_udf test: without projection", "4.0") {
+  test("arrow_udf test: without projection") {
     lazy val base =
       Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
         .toDF("a", "b")
@@ -65,8 +64,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
     checkAnswer(df2, expected)
   }
 
-  // TODO: fix on spark-4.1
-  testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "4.0") {
+  test("arrow_udf test: with unrelated projection") {
     lazy val base =
       Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
         .toDF("a", "b")
@@ -87,8 +85,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
     checkAnswer(df, expected)
   }
 
-  // TODO: fix on spark-4.1
-  testWithMaxSparkVersion("arrow_udf test: with preprojection", "4.0") {
+  test("arrow_udf test: with preprojection") {
     lazy val base =
       Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0", 1), ("3", 0))
         .toDF("a", "b")
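Note: dropping the testWithMaxSparkVersion cap re-enables these tests on Spark 4.1 now that the CI image ships a usable python3/pyspark (see the workflow change above). For orientation only, a minimal Series-to-Series pandas UDF of the kind these tests exercise looks like the sketch below; the suite's actual arrow_udf is defined elsewhere, and all names here are illustrative:

    import pandas as pd

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    # Spark plans a scalar pandas UDF as ArrowEvalPythonExec, the operator
    # these suites expect Gluten to replace with its columnar equivalent.
    @pandas_udf("long")
    def add_one(s: pd.Series) -> pd.Series:
        return s + 1

    df = spark.createDataFrame([("1", 1), ("2", 2)], ["a", "b"])
    df.select("a", add_one("b").alias("b_plus_one")).show()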
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b6a19c61ffd1..4e13d9cff7d4 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1134,9 +1134,11 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("remove redundant WindowGroupLimits")
   enableSuite[GlutenSQLCollectLimitExecSuite]
   // Generated suites for org.apache.spark.sql.execution.python
-  // TODO: 4.x enableSuite[GlutenPythonDataSourceSuite] // 1 failure
-  // TODO: 4.x enableSuite[GlutenPythonUDFSuite] // 1 failure
-  // TODO: 4.x enableSuite[GlutenPythonUDTFSuite]
+  enableSuite[GlutenPythonDataSourceSuite]
+    .exclude("SPARK-50426: should not trigger static Python data source lookup")
+  enableSuite[GlutenPythonUDFSuite]
+    .exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
+  enableSuite[GlutenPythonUDTFSuite]
   enableSuite[GlutenRowQueueSuite]
   enableSuite[GlutenBatchEvalPythonExecSuite]
     // Replaced with other tests that check for native operations
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDFSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDFSuite.scala
index 473884893733..2fb7114ee111 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDFSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDFSuite.scala
@@ -16,6 +16,22 @@
  */
 package org.apache.spark.sql.execution.python
 
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.{AnalysisException, GlutenSQLTestsTrait, IntegratedUDFTestUtils}
+import org.apache.spark.sql.functions.{array, transform}
 
-class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {}
+class GlutenPythonUDFSuite extends PythonUDFSuite with GlutenSQLTestsTrait {
+
+  // Override: the original test uses this.getClass.getSimpleName in the ExpectedContext pattern,
+  // which returns "GlutenPythonUDFSuite", but the actual callSite records "PythonUDFSuite".
+  testGluten("SPARK-48706: Negative test case for Python UDF in higher order functions") {
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect()
+      },
+      condition = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
+      parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""),
+      context = ExpectedContext("transform", s".*PythonUDFSuite.*")
+    )
+  }
+}
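Note: the override above exists because checkError matches the ExpectedContext pattern against the call site captured when the test body ran, and for a test inherited from PythonUDFSuite that call site names PythonUDFSuite, not the Gluten subclass, so a pattern built from this.getClass.getSimpleName never matches. The mismatch is easy to reproduce with plain regular expressions (a standalone illustration, not suite code):

    import re

    # Call-site class recorded for the inherited test body.
    call_site = "PythonUDFSuite"

    # Pattern derived from this.getClass.getSimpleName in the subclass: no match.
    assert re.search(r".*GlutenPythonUDFSuite.*", call_site) is None
    # Pattern used by the override: matches.
    assert re.search(r".*PythonUDFSuite.*", call_site) is not None
    print("context pattern check ok")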
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala
index e9067eecd75d..534ce669bbab 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala
@@ -18,6 +18,4 @@ package org.apache.spark.sql.execution.python
 
 import org.apache.spark.sql.GlutenSQLTestsTrait
 
-// TODO: 4.x extends PythonUDTFSuite, currently PythonUDTFSuite requires Python executable
-// [python3] and pyspark to be available, which are not present in the 4.0 CI environment.
-class GlutenPythonUDTFSuite extends GlutenSQLTestsTrait {}
+class GlutenPythonUDTFSuite extends PythonUDTFSuite with GlutenSQLTestsTrait {}
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 968f28a6a963..af9d68c28a4d 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1140,10 +1140,12 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("remove redundant WindowGroupLimits")
   enableSuite[GlutenSQLCollectLimitExecSuite]
   // Generated suites for org.apache.spark.sql.execution.python
-  // TODO: 4.x enableSuite[GlutenPythonDataSourceSuite]
-  // TODO: 4.x enableSuite[GlutenPythonUDFSuite]
-  // TODO: 4.x enableSuite[GlutenPythonUDTFSuite]
-  // TODO: 4.x enableSuite[GlutenRowQueueSuite]
+  enableSuite[GlutenPythonDataSourceSuite]
+    .exclude("data source reader with filter pushdown")
+  enableSuite[GlutenPythonUDFSuite]
+    .exclude("SPARK-48706: Negative test case for Python UDF in higher order functions")
+  enableSuite[GlutenPythonUDTFSuite]
+  enableSuite[GlutenRowQueueSuite]
   enableSuite[GlutenBatchEvalPythonExecSuite]
     // Replaced with other tests that check for native operations
     .exclude("Python UDF: push down deterministic FilterExec predicates")
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala
index 155516b583d9..b7323da73a40 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonDataSourceSuite.scala
@@ -16,6 +16,82 @@
  */
 package org.apache.spark.sql.execution.python
 
-import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.gluten.execution.FilterExecTransformerBase
 
-class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {}
+import org.apache.spark.sql.{GlutenSQLTestsTrait, IntegratedUDFTestUtils, Row}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.python.PythonScan
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+class GlutenPythonDataSourceSuite extends PythonDataSourceSuite with GlutenSQLTestsTrait {
+
+  import IntegratedUDFTestUtils._
+
+  // Gluten replaces FilterExec with FilterExecTransformer and
+  // BatchScanExec with BatchScanExecTransformer
+  testGluten("data source reader with filter pushdown") {
+    assume(shouldTestPandasUDFs)
+    val dataSourceScript =
+      s"""
+        |from pyspark.sql.datasource import (
+        |    DataSource,
+        |    DataSourceReader,
+        |    EqualTo,
+        |    InputPartition,
+        |)
+        |
+        |class SimpleDataSourceReader(DataSourceReader):
+        |    def partitions(self):
+        |        return [InputPartition(i) for i in range(2)]
+        |
+        |    def pushFilters(self, filters):
+        |        for filter in filters:
+        |            if filter != EqualTo(("partition",), 0):
+        |                yield filter
+        |
+        |    def read(self, partition):
+        |        yield (0, partition.value)
+        |        yield (1, partition.value)
+        |        yield (2, partition.value)
+        |
+        |class SimpleDataSource(DataSource):
+        |    def schema(self):
+        |        return "id int, partition int"
+        |
+        |    def reader(self, schema):
+        |        return SimpleDataSourceReader()
+        |""".stripMargin
+    val schema = StructType.fromDDL("id INT, partition INT")
+    val dataSource =
+      createUserDefinedPythonDataSource(name = dataSourceName, pythonScript = dataSourceScript)
+    withSQLConf(SQLConf.PYTHON_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      spark.dataSource.registerPython(dataSourceName, dataSource)
+      val df =
+        spark.read.format(dataSourceName).schema(schema).load().filter("id = 1 and partition = 0")
+      val plan = df.queryExecution.executedPlan
+
+      val filter = collectFirst(plan) {
+        case s: FilterExecTransformerBase =>
+          val condition = s.cond.toString
+          assert(!condition.contains("= 0"))
+          assert(condition.contains("= 1"))
+          s
+      }.getOrElse(
+        fail(s"FilterExecTransformerBase not found in the plan. Actual plan:\n$plan")
+      )
+
+      // Gluten does not replace PythonScan's BatchScanExec - it stays as vanilla
+      // BatchScanExec with RowToVeloxColumnar transition
+      collectFirst(filter) {
+        case s: BatchScanExec if s.scan.isInstanceOf[PythonScan] =>
+          val p = s.scan.asInstanceOf[PythonScan]
+          assert(p.getMetaData().get("PushedFilters").contains("[EqualTo(partition,0)]"))
+      }.getOrElse(
+        fail(s"BatchScanExec with PythonScan not found. Actual plan:\n$plan")
+      )
+
+      checkAnswer(df, Seq(Row(1, 0), Row(1, 1)))
+    }
+  }
+}
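Note: in the pushFilters protocol used by the script above, the reader yields back every filter it cannot (or chooses not to) handle, and Spark re-applies those on top of the scan; only the consumed EqualTo(("partition",), 0) surfaces in PushedFilters, which is exactly what the test asserts. A standalone sketch of that split, reusing the pyspark.sql.datasource.EqualTo class the script imports:

    from pyspark.sql.datasource import EqualTo

    def split_filters(filters):
        """Mimic SimpleDataSourceReader.pushFilters: consume the one filter
        the reader handles natively, hand the rest back to Spark."""
        pushed, remaining = [], []
        for f in filters:
            (pushed if f == EqualTo(("partition",), 0) else remaining).append(f)
        return pushed, remaining

    pushed, remaining = split_filters(
        [EqualTo(("partition",), 0), EqualTo(("id",), 1)])
    print("pushed to reader:", pushed)
    print("re-applied by Spark:", remaining)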
+ testGluten("SPARK-48706: Negative test case for Python UDF in higher order functions") { + assume(IntegratedUDFTestUtils.shouldTestPythonUDFs) + checkError( + exception = intercept[AnalysisException] { + spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect() + }, + condition = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF", + parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""), + context = ExpectedContext("transform", s".*PythonUDFSuite.*") + ) + } +} diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala index 20cf3dabc7a2..534ce669bbab 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/python/GlutenPythonUDTFSuite.scala @@ -18,6 +18,4 @@ package org.apache.spark.sql.execution.python import org.apache.spark.sql.GlutenSQLTestsTrait -// TODO: 4.x extends PythonUDTFSuite, currently PythonUDTFSuite requires Python executable -// [python3] and pyspark to be available, which are not present in the 4.1 CI environment. -class GlutenPythonUDTFSuite extends GlutenSQLTestsTrait {} +class GlutenPythonUDTFSuite extends PythonUDTFSuite with GlutenSQLTestsTrait {}