Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1157,8 +1157,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("detect escaped path and report the migration guide")
.exclude("ignore the escaped path check when the flag is off")
.excludeByPrefix("SPARK-51187")
// Rewrite for the query plan check
.excludeByPrefix("SPARK-49905")
enableSuite[GlutenQueryExecutionSuite]
// Rewritten to set root logger level to INFO so that logs can be parsed
.exclude("Logging plan changes for execution")
Expand All @@ -1171,45 +1169,49 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenSQLMetricsSuite]
enableSuite[GlutenAcceptsLatestSeenOffsetSuite]
enableSuite[GlutenCommitLogSuite]
// TODO: 4.x enableSuite[GlutenEventTimeWatermarkSuite]
enableSuite[GlutenEventTimeWatermarkSuite]
enableSuite[GlutenFileStreamSinkV1Suite]
// TODO: 4.x enableSuite[GlutenFileStreamSinkV2Suite] // 1 failure
enableSuite[GlutenFileStreamSinkV2Suite]
enableSuite[GlutenFileStreamSourceStressTestSuite]
// TODO: 4.x enableSuite[GlutenFileStreamSourceSuite]
enableSuite[GlutenFileStreamSourceSuite]
enableSuite[GlutenFileStreamStressSuite]
// TODO: 4.x enableSuite[GlutenFlatMapGroupsInPandasWithStateDistributionSuite] // failures with GlutenPlugin
enableSuite[GlutenFlatMapGroupsInPandasWithStateDistributionSuite]
enableSuite[GlutenFlatMapGroupsInPandasWithStateSuite]
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateSuite]
enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
enableSuite[GlutenFlatMapGroupsWithStateSuite]
enableSuite[GlutenFlatMapGroupsWithStateWithInitialStateSuite]
enableSuite[GlutenGroupStateSuite]
enableSuite[GlutenLongOffsetSuite]
enableSuite[GlutenMemorySourceStressSuite]
// TODO: 4.x enableSuite[GlutenMultiStatefulOperatorsSuite] // 2 failures
enableSuite[GlutenMultiStatefulOperatorsSuite]
enableSuite[GlutenReportSinkMetricsSuite]
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingAggregationSuite]
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingDeduplicationSuite]
// TODO: 4.x enableSuite[GlutenStreamSuite]
// TODO: 4.x enableSuite[GlutenStreamingAggregationDistributionSuite]
// TODO: 4.x enableSuite[GlutenStreamingAggregationSuite]
// TODO: 4.x enableSuite[GlutenStreamingDeduplicationDistributionSuite]
// TODO: 4.x enableSuite[GlutenStreamingDeduplicationSuite]
enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
enableSuite[GlutenRocksDBStateStoreStreamingAggregationSuite]
// Spark 4.x: these cases can hang waiting for expected failure with stateSchemaCheck off.
.excludeByPrefix("changing schema of state when restarting query - schema check off")
enableSuite[GlutenRocksDBStateStoreStreamingDeduplicationSuite]
enableSuite[GlutenStreamSuite]
enableSuite[GlutenStreamingAggregationDistributionSuite]
enableSuite[GlutenStreamingAggregationSuite]
// Spark 4.x: these cases can hang waiting for expected failure with stateSchemaCheck off.
.excludeByPrefix("changing schema of state when restarting query - schema check off")
enableSuite[GlutenStreamingDeduplicationDistributionSuite]
enableSuite[GlutenStreamingDeduplicationSuite]
enableSuite[GlutenStreamingDeduplicationWithinWatermarkSuite]
enableSuite[GlutenStreamingFullOuterJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingInnerJoinSuite]
enableSuite[GlutenStreamingInnerJoinSuite]
enableSuite[GlutenStreamingLeftSemiJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingOuterJoinSuite]
// TODO: 4.x enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
enableSuite[GlutenStreamingOuterJoinSuite]
enableSuite[GlutenStreamingQueryHashPartitionVerifySuite]
enableSuite[GlutenStreamingQueryListenerSuite]
enableSuite[GlutenStreamingQueryListenersConfSuite]
enableSuite[GlutenStreamingQueryManagerSuite]
enableSuite[GlutenStreamingQueryOptimizationCorrectnessSuite]
enableSuite[GlutenStreamingQueryStatusAndProgressSuite]
enableSuite[GlutenStreamingSelfUnionSuite]
// TODO: 4.x enableSuite[GlutenStreamingSessionWindowDistributionSuite]
enableSuite[GlutenStreamingSessionWindowDistributionSuite]
enableSuite[GlutenStreamingSessionWindowSuite]
// TODO: 4.x enableSuite[GlutenStreamingStateStoreFormatCompatibilitySuite]
enableSuite[GlutenStreamingStateStoreFormatCompatibilitySuite]
enableSuite[GlutenStreamingSymmetricHashJoinHelperSuite]
enableSuite[GlutenTransformWithListStateSuite]
enableSuite[GlutenTransformWithListStateTTLSuite]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.SparkConf

import java.net.JarURLConnection
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.jar.JarFile

private[sql] object GlutenStructuredStreamingResourceBootstrap {
private val StructuredStreamingRoot = "structured-streaming/"
private val ResourceProbe = s"${StructuredStreamingRoot}partition-tests/randomSchemas"

@volatile private var initialized = false

def ensureResourcesOnFilesystem(): Unit = synchronized {
if (initialized) {
return
}

val maybeProbe = Option(getClass.getClassLoader.getResource(ResourceProbe))
maybeProbe.foreach {
probeUrl =>
if (probeUrl.getProtocol == "jar") {
copyStructuredStreamingResourcesFromJar(probeUrl)
}
}

initialized = true
}

private def copyStructuredStreamingResourcesFromJar(resourceUrl: java.net.URL): Unit = {
val maybeTestClassesRoot = Option(getClass.getResource("/"))
.filter(_.getProtocol == "file")
.map(url => Paths.get(url.toURI))
if (maybeTestClassesRoot.isEmpty) {
return
}

val testClassesRoot = maybeTestClassesRoot.get
if (Files.exists(testClassesRoot.resolve(ResourceProbe))) {
return
}

val connection = resourceUrl.openConnection().asInstanceOf[JarURLConnection]
val jarPath = Paths.get(connection.getJarFileURL.toURI)
val jarFile = new JarFile(jarPath.toFile)
try {
val entries = jarFile.entries()
while (entries.hasMoreElements) {
val entry = entries.nextElement()
val entryName = entry.getName
if (!entry.isDirectory && entryName.startsWith(StructuredStreamingRoot)) {
val targetPath = testClassesRoot.resolve(entryName)
Option(targetPath.getParent).foreach(Files.createDirectories(_))
val in = jarFile.getInputStream(entry)
try {
Files.copy(in, targetPath, StandardCopyOption.REPLACE_EXISTING)
} finally {
in.close()
}
}
}
} finally {
jarFile.close()
}
}
}

private[sql] object GlutenStreamingTestConf {
def withFallbackToVanilla(conf: SparkConf): SparkConf = {
GlutenStructuredStreamingResourceBootstrap.ensureResourcesOnFilesystem()
conf
.set("spark.driver.host", "127.0.0.1")
.set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.shuffle.manager", "sort")
.set("spark.gluten.sql.columnar.batchscan", "false")
.set("spark.gluten.sql.columnar.filescan", "false")
.set("spark.gluten.sql.columnar.project", "false")
.set("spark.gluten.sql.columnar.filter", "false")
.set("spark.gluten.sql.columnar.sort", "false")
.set("spark.gluten.sql.columnar.window", "false")
.set("spark.gluten.sql.columnar.union", "false")
.set("spark.gluten.sql.columnar.expand", "false")
.set("spark.gluten.sql.columnar.generate", "false")
.set("spark.gluten.sql.columnar.coalesce", "false")
.set("spark.gluten.sql.columnar.range", "false")
.set("spark.gluten.sql.columnar.shuffle", "false")
.set("spark.gluten.sql.columnar.hashagg", "false")
.set("spark.gluten.sql.columnar.shuffledHashJoin", "false")
.set("spark.gluten.sql.columnar.sortMergeJoin", "false")
.set("spark.gluten.sql.columnar.broadcastExchange", "false")
.set("spark.gluten.sql.columnar.broadcastJoin", "false")
.set("spark.gluten.sql.columnar.appendData", "false")
.set("spark.gluten.sql.columnar.writeToDataSourceV2", "false")
.set("spark.gluten.sql.native.writer.enabled", "false")
.set("spark.gluten.sql.columnar.query.fallback.threshold", "0")
.set("spark.gluten.sql.columnar.wholeStage.fallback.threshold", "0")
.set("spark.gluten.sql.columnar.fallback.expressions.threshold", "0")
.set("spark.gluten.sql.columnar.fallback.preferColumnar", "false")
.set("spark.gluten.expression.blacklist", "collect_list,collect_set")
}
}

trait GlutenStreamingSQLTestsTrait extends GlutenSQLTestsTrait {
private val structuredStreamingResourcesInitialized: Unit =
GlutenStructuredStreamingResourceBootstrap.ensureResourcesOnFilesystem()

override def sparkConf: SparkConf = {
GlutenStreamingTestConf.withFallbackToVanilla(super.sparkConf)
}
}

trait GlutenStreamingVanillaFallbackTestsTrait extends GlutenStreamingSQLTestsTrait {
override def sparkConf: SparkConf = {
super.sparkConf
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenAcceptsLatestSeenOffsetSuite
extends AcceptsLatestSeenOffsetSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenEventTimeWatermarkSuite extends EventTimeWatermarkSuite with GlutenSQLTestsTrait {}
class GlutenEventTimeWatermarkSuite
extends EventTimeWatermarkSuite
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFileStreamSinkV1Suite extends FileStreamSinkV1Suite with GlutenSQLTestsTrait {}
class GlutenFileStreamSinkV1Suite extends FileStreamSinkV1Suite with GlutenStreamingSQLTestsTrait {}

class GlutenFileStreamSinkV2Suite extends FileStreamSinkV2Suite with GlutenSQLTestsTrait {}
class GlutenFileStreamSinkV2Suite extends FileStreamSinkV2Suite with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFileStreamSourceSuite extends FileStreamSourceSuite with GlutenSQLTestsTrait {}
class GlutenFileStreamSourceSuite extends FileStreamSourceSuite with GlutenStreamingSQLTestsTrait {}

class GlutenFileStreamSourceStressTestSuite
extends FileStreamSourceStressTestSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFileStreamStressSuite extends FileStreamStressSuite with GlutenSQLTestsTrait {}
class GlutenFileStreamStressSuite extends FileStreamStressSuite with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFlatMapGroupsInPandasWithStateDistributionSuite
extends FlatMapGroupsInPandasWithStateDistributionSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFlatMapGroupsInPandasWithStateSuite
extends FlatMapGroupsInPandasWithStateSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingVanillaFallbackTestsTrait

class GlutenFlatMapGroupsWithStateDistributionSuite
extends FlatMapGroupsWithStateDistributionSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingVanillaFallbackTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFlatMapGroupsWithStateSuite
extends FlatMapGroupsWithStateSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}

class GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite
extends RocksDBStateStoreFlatMapGroupsWithStateSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenFlatMapGroupsWithStateWithInitialStateSuite
extends FlatMapGroupsWithStateWithInitialStateSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenMemorySourceStressSuite extends MemorySourceStressSuite with GlutenSQLTestsTrait {}
class GlutenMemorySourceStressSuite
extends MemorySourceStressSuite
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenMultiStatefulOperatorsSuite
extends MultiStatefulOperatorsSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenReportSinkMetricsSuite extends ReportSinkMetricsSuite with GlutenSQLTestsTrait {}
class GlutenReportSinkMetricsSuite
extends ReportSinkMetricsSuite
with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingSQLTestsTrait

class GlutenStreamSuite extends StreamSuite with GlutenSQLTestsTrait {}
class GlutenStreamSuite extends StreamSuite with GlutenStreamingSQLTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingVanillaFallbackTestsTrait

class GlutenStreamingAggregationDistributionSuite
extends StreamingAggregationDistributionSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingVanillaFallbackTestsTrait {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.apache.spark.sql.streaming

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.GlutenStreamingVanillaFallbackTestsTrait

class GlutenStreamingAggregationSuite extends StreamingAggregationSuite with GlutenSQLTestsTrait {}
class GlutenStreamingAggregationSuite
extends StreamingAggregationSuite
with GlutenStreamingVanillaFallbackTestsTrait {}

class GlutenRocksDBStateStoreStreamingAggregationSuite
extends RocksDBStateStoreStreamingAggregationSuite
with GlutenSQLTestsTrait {}
with GlutenStreamingVanillaFallbackTestsTrait {}
Loading
Loading