From 383ce079971d70ca0270aad87fa9c4d79f773a8d Mon Sep 17 00:00:00 2001
From: ashahid
Date: Wed, 15 Jan 2025 16:34:59 -0800
Subject: [PATCH] SPARK-50840: Fix the timestamp parsing failure on Hive table
 metadata reload when the timestamp alias points to NTZ

---
 .../spark/sql/hive/HiveExternalCatalog.scala  | 35 +++++++++++++++--
 .../sql/hive/client/HiveClientImpl.scala      | 39 +++++++++++++++++--
 .../sql/hive/execution/HiveDDLSuite.scala     | 31 ++++++++++++++-
 3 files changed, 98 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index db28352635775..3434119e59ca8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -45,8 +45,8 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -122,7 +122,36 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * before returning it.
    */
   private[hive] def getRawTable(db: String, table: String): CatalogTable = {
-    client.getTable(db, table)
+
+    def convertNTZToLTZ(st: StructType): StructType = {
+      val rectifiedFields = st.map(sf => sf.dataType match {
+        case TimestampNTZType
+            if sf.metadata.contains(HiveClientImpl.HIVE_DATA_TYPE_KEY) &&
+              HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP == sf.metadata.getString(
+                HiveClientImpl.HIVE_DATA_TYPE_KEY) =>
+          val newMd = new MetadataBuilder()
+            .withMetadata(sf.metadata)
+            .remove(HiveClientImpl.HIVE_DATA_TYPE_KEY)
+            .build()
+          sf.copy(dataType = TimestampType, metadata = newMd)
+
+        case structType: StructType => sf.copy(dataType = convertNTZToLTZ(structType))
+
+        case _ => sf
+      })
+      StructType(rectifiedFields)
+    }
+
+    val ct = client.getTable(db, table)
+    // Since we are in HiveExternalCatalog and Hive does not support TIMESTAMP_NTZ, even if
+    // this Spark engine converted the timestamp type coming from the Hive Metastore to NTZ,
+    // we need to convert it back to TimestampType (i.e. TIMESTAMP_LTZ).
+    if (SQLConf.get.timestampType == TimestampNTZType) {
+      val rectifiedSchema = convertNTZToLTZ(ct.schema)
+      ct.copy(schema = rectifiedSchema)
+    } else {
+      ct
+    }
   }
 
   private[hive] def getRawTablesByNames(db: String, tables: Seq[String]): Seq[CatalogTable] = {
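[Reviewer note] The rectification walk that getRawTable now performs can be tried on its own.
Below is a minimal standalone sketch of the same idea, assuming only spark-catalyst on the
classpath; NtzRectifySketch and MarkerKey are illustrative names, and only the "hiveDataType"
key mirrors HiveClientImpl.HIVE_DATA_TYPE_KEY from this patch.

import org.apache.spark.sql.types._

object NtzRectifySketch {
  val MarkerKey = "hiveDataType"

  // Walk a schema and turn every marked TIMESTAMP_NTZ field back into TIMESTAMP_LTZ,
  // dropping the marker so it does not leak into downstream metadata.
  def rectify(st: StructType): StructType = StructType(st.map { sf =>
    sf.dataType match {
      case TimestampNTZType if sf.metadata.contains(MarkerKey) &&
          sf.metadata.getString(MarkerKey) == "timestamp" =>
        val cleaned = new MetadataBuilder().withMetadata(sf.metadata).remove(MarkerKey).build()
        sf.copy(dataType = TimestampType, metadata = cleaned)
      // Recurse so timestamps nested inside structs are rectified as well.
      case nested: StructType => sf.copy(dataType = rectify(nested))
      case _ => sf
    }
  })

  def main(args: Array[String]): Unit = {
    val tagged = new MetadataBuilder().putString(MarkerKey, "timestamp").build()
    val schema = StructType(Seq(
      StructField("ts", TimestampNTZType, nullable = true, metadata = tagged),
      StructField("nested", StructType(Seq(
        StructField("inner_ts", TimestampNTZType, nullable = true, metadata = tagged))))))
    // Both timestamp fields print as "timestamp" (LTZ) after rectification.
    println(rectify(schema).treeString)
  }
}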
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 00407f0ecc178..d0fb26f0e09c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -55,7 +55,9 @@ import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, N
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.parser.{AstBuilder, CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComplexColTypeContext
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1090,6 +1092,9 @@ private[hive] class HiveClientImpl(
 }
 
 private[hive] object HiveClientImpl extends Logging {
+  val HIVE_DATA_TYPE_KEY = "hiveDataType"
+  val HIVE_DATA_TYPE_TIME_STAMP = "timestamp"
+
   /** Converts the native StructField to Hive's FieldSchema. */
   def toHiveColumn(c: StructField): FieldSchema = {
     // For Hive Serde, we still need to restore the raw type for char and varchar type.
@@ -1117,7 +1122,7 @@ private[hive] object HiveClientImpl extends Logging {
     // e.g. map<string, struct<f1: int>> -> map<string, struct<`f1`: int>>
     val typeStr = hc.getType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
     try {
-      CatalystSqlParser.parseDataType(typeStr)
+      HiveCatalystSqlParser.parseDataType(typeStr)
     } catch {
       case e: ParseException =>
         throw QueryExecutionErrors.cannotRecognizeHiveTypeError(e, typeStr, hc.getName)
@@ -1127,13 +1132,31 @@ private[hive] object HiveClientImpl extends Logging {
   /** Builds the native StructField from Hive's FieldSchema. */
   def fromHiveColumn(hc: FieldSchema): StructField = {
     val columnType = getSparkSQLDataType(hc)
+    // Since we are loading the metadata of an existing table, and Hive's "timestamp" column
+    // type has historically mapped to Spark's TimestampType (timestamp_ltz), we must keep
+    // that mapping even if Spark's timestamp alias currently points to NTZ, so the column is
+    // overridden back to Spark's TimestampType. Once the Spark-to-Hive type mapping is
+    // actually rectified, this fix needs to be revisited.
+    val metadata = getMetadataForNtz(columnType, hc.getType)
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
+  def getMetadataForNtz(
+      colType: DataType,
+      hiveType: String,
+      existingMd: Metadata = Metadata.empty): Metadata =
+    colType match {
+      case TimestampNTZType if hiveType == HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP =>
+        new MetadataBuilder().withMetadata(existingMd).putString(HIVE_DATA_TYPE_KEY, hiveType).build()
+
+      case _ => existingMd
+    }
+
   private def verifyColumnDataType(schema: StructType): Unit = {
     schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
   }
@@ -1421,3 +1444,13 @@ private[hive] object HiveClientImpl extends Logging {
     }
   }
 }
+
+object HiveCatalystSqlParser extends CatalystSqlParser {
+  override val astBuilder = new AstBuilder {
+    override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
+      val sf = super.visitComplexColType(ctx)
+      val md = HiveClientImpl.getMetadataForNtz(sf.dataType, ctx.dataType().getText, sf.metadata)
+      sf.copy(metadata = md)
+    }
+  }
+}
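[Reviewer note] The tagging half of the fix lives in fromHiveColumn and the parser override
above: when the session alias resolves Hive's "timestamp" to TIMESTAMP_NTZ, the original Hive
type name is recorded in the field metadata so getRawTable can undo the aliasing later. The
sketch below shows that round trip; tagIfAliasedNtz is a hypothetical helper written for this
note, not an API added by the patch.

import org.apache.spark.sql.types._

object NtzTaggingSketch {
  // Mimics the patch's getMetadataForNtz: tag a field whose parsed type is TIMESTAMP_NTZ
  // but whose raw Hive type text was plain "timestamp".
  def tagIfAliasedNtz(name: String, parsed: DataType, hiveTypeText: String): StructField = {
    val md = parsed match {
      case TimestampNTZType if hiveTypeText == "timestamp" =>
        new MetadataBuilder().putString("hiveDataType", hiveTypeText).build()
      case _ => Metadata.empty
    }
    StructField(name, parsed, nullable = true, metadata = md)
  }

  def main(args: Array[String]): Unit = {
    // Under spark.sql.timestampType=TIMESTAMP_NTZ, Hive's "timestamp" parses to TimestampNTZType.
    val field = tagIfAliasedNtz("ts", TimestampNTZType, "timestamp")
    // The marker is what the catalog later keys on to restore TIMESTAMP_LTZ.
    assert(field.metadata.getString("hiveDataType") == "timestamp")
    println(field)
  }
}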
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a58adbce7ec52..0639c24436030 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
+import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION, TimestampTypes}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -3383,4 +3383,33 @@ class HiveDDLSuite
       checkAnswer(sql("SELECT * FROM t1"), Row(0))
     }
   }
+
+  test("SPARK-50840: Hive table created with timestamp LTZ should retain the same type on reload") {
+    withTable("t1", "t2") {
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_LTZ.toString) {
+        val tblDef =
+          s"""
+             |CREATE TABLE t1 (
+             |  ts timestamp,
+             |  nstd Struct<f1: timestamp, f2: Struct<f3: timestamp>>
+             |)
+             |using parquet""".stripMargin
+        sql(tblDef)
+      }
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_NTZ.toString) {
+        def assertNoTimestampNTZ(structType: StructType): Unit = {
+          structType.foreach {
+            _.dataType match {
+              case TimestampNTZType => fail("TimestampNTZType not expected")
+              case st: StructType => assertNoTimestampNTZ(st)
+              case _ =>
+            }
+          }
+        }
+
+        sql("alter table t1 rename to t2")
+        assertNoTimestampNTZ(spark.table("t2").schema)
+      }
+    }
+  }
 }
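[Reviewer note] The new test exercises the reload path via ALTER TABLE ... RENAME, which
forces the table metadata to be fetched from the metastore again. For manual verification, a
sketch of an end-to-end repro in a Hive-enabled session follows; the table names and the
local[2] master are illustrative, not part of the patch.

import org.apache.spark.sql.SparkSession

object Spark50840Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    // Create the table while the session alias maps "timestamp" to TIMESTAMP_LTZ.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
    spark.sql("CREATE TABLE demo_ts (ts timestamp) USING parquet")

    // Flip the alias to TIMESTAMP_NTZ and force a metadata reload; before SPARK-50840
    // the reloaded column could surface as timestamp_ntz instead of timestamp.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    spark.sql("ALTER TABLE demo_ts RENAME TO demo_ts2")
    spark.table("demo_ts2").printSchema() // expected: ts: timestamp (not timestamp_ntz)

    spark.stop()
  }
}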