diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 2a86f31fd63..4621b54e1d4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -28,19 +28,41 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
 import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
 
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
 
 object HiveConnectorUtils extends Logging {
 
+  // Reflective handle to Cast's constructor. NOTE(review): only the 3-arg
+  // (child, dataType, timeZoneId) constructor shape is registered here, but
+  // SPARK-40054 (Spark 3.4+) adds an `evalMode` parameter to Cast — confirm that
+  // DynConstructors resolves this constructor on every Spark version the connector supports.
+  private val castCtor: DynConstructors.Ctor[Expression] =
+    DynConstructors.builder(classOf[Expression])
+      .impl(
+        classOf[Cast],
+        classOf[Expression],
+        classOf[DataType],
+        classOf[Option[_]])
+      .build[Expression]()
+
+  // SPARK-40054, ensuring cross-version compatibility.
+  // Builds a Cast of `child` to `dataType`; `timeZoneId` is required for casts
+  // involving date/timestamp types to be resolvable.
+  def castExpression(
+      child: Expression,
+      dataType: DataType,
+      timeZoneId: Option[String] = None): Expression = {
+    castCtor.newInstance(child, dataType, timeZoneId)
+  }
+
   def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = Try {
     // SPARK-43186: 3.5.0
     DynConstructors.builder()
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index 27749158b96..73b32eb591b 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.spark.connector.hive
 
+import java.net.URI
 import java.util
 import java.util.Locale
 
@@ -26,8 +27,11 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal}
+import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
@@ -35,7 +39,7 @@ import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET} @@ -46,7 +50,7 @@ case class HiveTable( sparkSession: SparkSession, catalogTable: CatalogTable, hiveTableCatalog: HiveTableCatalog) - extends Table with SupportsRead with SupportsWrite with Logging { + extends Table with SupportsRead with SupportsWrite with SupportsPartitionManagement with Logging { lazy val dataSchema: StructType = catalogTable.dataSchema @@ -112,4 +116,92 @@ case class HiveTable( override def capabilities(): util.Set[TableCapability] = { util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC) } + + override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = { + val spec = toPartitionSpec(ident) + val location = Option(properties.get(HiveTableProperties.LOCATION)).map(new URI(_)) + val newPart = CatalogTablePartition( + spec, + catalogTable.storage.copy(locationUri = location), + properties.asScala.toMap) + hiveTableCatalog.externalCatalog.createPartitions( + catalogTable.database, + catalogTable.identifier.table, + Seq(newPart), + ignoreIfExists = false) + } + + override def dropPartition(ident: InternalRow): Boolean = { + try { + hiveTableCatalog.externalCatalog.dropPartitions( + catalogTable.database, + catalogTable.identifier.table, + Seq(toPartitionSpec(ident)), + ignoreIfNotExists = false, + purge = false, + retainData = false) + true + } catch { + case _: 
NoSuchPartitionException => false + } + } + + override def replacePartitionMetadata( + ident: InternalRow, + properties: util.Map[String, String]): Unit = { + throw new UnsupportedOperationException("Replace partition is not supported") + } + + override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = { + val spec = toPartitionSpec(ident) + val partition = hiveTableCatalog.externalCatalog.getPartition( + catalogTable.database, + catalogTable.identifier.table, + spec) + val metadata = new util.HashMap[String, String](partition.parameters.asJava) + partition.storage.locationUri.foreach { uri => + metadata.put(HiveTableProperties.LOCATION, uri.toString) + } + metadata + } + + override def listPartitionIdentifiers( + names: Array[String], + ident: InternalRow): Array[InternalRow] = { + val partialSpec = if (names.isEmpty) { + None + } else { + val fields = names.map(partitionSchema(_)) + val schema = StructType(fields) + Some(toPartitionSpec(ident, schema)) + } + hiveTableCatalog.externalCatalog.listPartitions( + catalogTable.database, + catalogTable.identifier.table, + partialSpec).map { part => + val values = partitionSchema.map { field => + val strValue = part.spec(field.name) + HiveConnectorUtils.castExpression(Literal(strValue), field.dataType).eval() + } + new GenericInternalRow(values.toArray) + }.toArray + } + + private def toPartitionSpec(ident: InternalRow, schema: StructType): Map[String, String] = { + require( + schema.size == ident.numFields, + s"Schema size (${schema.size}) does not match numFields (${ident.numFields})") + schema.zipWithIndex.map { case (field, index) => + val value = ident.get(index, field.dataType) + val fieldValue = HiveConnectorUtils.castExpression( + Literal(value, field.dataType), + StringType, + Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval().toString + field.name -> fieldValue + }.toMap + } + + private def toPartitionSpec(ident: InternalRow): Map[String, String] = { + 
+    toPartitionSpec(ident, partitionSchema)
+  }
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableProperties.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableProperties.scala
new file mode 100644
index 00000000000..fa67b5a5f64
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableProperties.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+object HiveTableProperties { // well-known property keys for the Hive connector's V2 tables
+  val LOCATION = "location" // partition storage path key; presumably mirrors TableCatalog.PROP_LOCATION — verify
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/PartitionManagementSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/PartitionManagementSuite.scala
new file mode 100644
index 00000000000..e775ccce2f4
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/PartitionManagementSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
+
+trait PartitionManagementSuite extends DDLCommandTestUtils { // shared ADD/DROP/SHOW PARTITIONS tests for V1 and V2 catalogs
+  override protected def command: String = "PARTITION MANAGEMENT"
+
+  test("create partition") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
+      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t"),
+        Row("year=2023/month=01") :: Nil)
+      intercept[PartitionsAlreadyExistException] { // re-adding an existing partition without IF NOT EXISTS must fail
+        sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
+      }
+    }
+  }
+
+  test("drop partition") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
+      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
+      sql(s"ALTER TABLE $t DROP PARTITION (year='2023', month='01')")
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t"),
+        Nil)
+      intercept[NoSuchPartitionsException] { // dropping a missing partition without IF EXISTS must fail
+        sql(s"ALTER TABLE $t DROP PARTITION (year='9999', month='99')")
+      }
+    }
+  }
+
+  test("show partitions") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
+      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
+      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='02')")
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t"),
+        Row("year=2023/month=01") :: Row("year=2023/month=02") :: Nil)
+
+      checkAnswer( // exact partition spec narrows the listing to one partition
+        sql(s"SHOW PARTITIONS $t PARTITION (year='2023', month='01')"),
+        Row("year=2023/month=01") :: Nil)
+
+      checkAnswer( // partial partition spec matches every partition under that prefix
+        sql(s"SHOW PARTITIONS $t PARTITION (year='2023')"),
+        Row("year=2023/month=01") :: Row("year=2023/month=02") :: Nil)
+    }
+  }
+}
+
+class PartitionManagementV2Suite extends PartitionManagementSuite {
+  override protected def catalogVersion: String = "Hive V2"
+  override protected def commandVersion: String = V2_COMMAND_VERSION
+
+  test("create partition with location") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
+      val loc = "file:///tmp/kyuubi/hive_catalog_part_loc"
+      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01') LOCATION '$loc'")
+      checkAnswer(
+        sql(s"SHOW PARTITIONS $t"),
+        Row("year=2023/month=01") :: Nil)
+      val catalog = spark.sessionState.catalogManager
+        .catalog(catalogName).asInstanceOf[TableCatalog]
+      val partManagement = catalog.loadTable(Identifier.of(Array("ns"), "tbl"))
+        .asInstanceOf[SupportsPartitionManagement]
+      val partIdent = InternalRow.fromSeq( // partition values are carried as UTF8String inside InternalRow
+        Seq(UTF8String.fromString("2023"), UTF8String.fromString("01")))
+      val metadata = partManagement.loadPartitionMetadata(partIdent)
+      assert(metadata.containsKey("location"))
+      assert(metadata.get("location").contains("hive_catalog_part_loc"))
+    }
+  }
+}
+
+class PartitionManagementV1Suite extends PartitionManagementSuite {
+  val SESSION_CATALOG_NAME: String = "spark_catalog"
+  override protected val catalogName: String = SESSION_CATALOG_NAME
+  override protected def catalogVersion: String = "V1"
+  override protected def commandVersion: String = V1_COMMAND_VERSION
+}