[SPARK-50840][SQL] Fixing the timestamp parsing failure for Hive table metadata reload when the timestamp alias points to NTZ #49521

Open: wants to merge 2 commits into master
HiveExternalCatalog.scala

@@ -45,8 +45,8 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, QuotingUtils}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
@@ -122,7 +122,36 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* before returning it.
*/
private[hive] def getRawTable(db: String, table: String): CatalogTable = {
client.getTable(db, table)

// Rewrites TimestampNTZType fields that carry the Hive "timestamp" marker back to
// TimestampType (LTZ), recursing into nested struct types.
def convertNTZToLTZ(st: StructType): StructType = {
val rectifiedFields = st.map(sf => sf.dataType match {
case TimestampNTZType
if sf.metadata.contains(HiveClientImpl.HIVE_DATA_TYPE_KEY) &&
HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP == sf.metadata.getString(
HiveClientImpl.HIVE_DATA_TYPE_KEY) =>
val newMd = new MetadataBuilder()
.withMetadata(sf.metadata)
.remove(HiveClientImpl.HIVE_DATA_TYPE_KEY)
.build()
sf.copy(dataType = TimestampType, metadata = newMd)

case structType: StructType => sf.copy(dataType = convertNTZToLTZ(structType))

case _ => sf
})
StructType(rectifiedFields)
}

val ct = client.getTable(db, table)
// Since we are in HiveExternalCatalog and Hive does not support TimestampNTZType, even if
// this Spark engine converted the timestamp type coming from the Hive Metastore to NTZ, we
// need to convert it back to TimestampType.
if (SQLConf.get.timestampType == TimestampNTZType) {
val rectifiedSchema = convertNTZToLTZ(ct.schema)
ct.copy(schema = rectifiedSchema)
} else {
ct
}
}
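For illustration, a minimal standalone sketch (not part of the patch; the metadata key and value literals mirror the HiveClientImpl constants introduced below) of what convertNTZToLTZ does to a schema loaded under the NTZ alias:

import org.apache.spark.sql.types._

// Schema as it comes back from the metastore when spark.sql.timestampType = TIMESTAMP_NTZ:
// timestamp columns are parsed as NTZ and tagged with "hiveDataType" -> "timestamp".
val marker = new MetadataBuilder().putString("hiveDataType", "timestamp").build()
val loaded = StructType(Seq(
  StructField("ts", TimestampNTZType, nullable = true, metadata = marker),
  StructField("nstd", StructType(Seq(
    StructField("ts1", TimestampNTZType, nullable = true, metadata = marker))))))
// convertNTZToLTZ(loaded) rewrites both fields to TimestampType (LTZ) and drops the
// marker entry, so the reloaded table keeps the type it was created with.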

private[hive] def getRawTablesByNames(db: String, tables: Seq[String]): Seq[CatalogTable] = {
HiveClientImpl.scala

@@ -56,7 +56,9 @@ import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, N
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.parser.{AstBuilder, CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.ComplexColTypeContext
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -1090,6 +1092,9 @@ private[hive] class HiveClientImpl(
}

private[hive] object HiveClientImpl extends Logging {
val HIVE_DATA_TYPE_KEY = "hiveDataType"
val HIVE_DATA_TYPE_TIME_STAMP = "timestamp"

/** Converts the native StructField to Hive's FieldSchema. */
def toHiveColumn(c: StructField): FieldSchema = {
// For Hive SerDe, we still need to restore the raw type for char and varchar types.
@@ -1117,7 +1122,7 @@ private[hive] object HiveClientImpl extends Logging {
// map<string,struct<x:int,y.z:int>> -> map<string,struct<`x`:int,`y.z`:int>>
val typeStr = hc.getType.replaceAll("(?<=struct<|,)([^,<:]+)(?=:)", "`$1`")
try {
CatalystSqlParser.parseDataType(typeStr)
HiveCatalystSqlParser.parseDataType(typeStr)
} catch {
case e: ParseException =>
throw QueryExecutionErrors.cannotRecognizeHiveTypeError(e, typeStr, hc.getName)
@@ -1127,13 +1132,35 @@
/** Builds the native StructField from Hive's FieldSchema. */
def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = getSparkSQLDataType(hc)
// Since we are loading the metadata of an existing table, and Hive's "timestamp" column
// type maps to Spark's TimestampType (timestamp_ltz), we must preserve that mapping even
// when Spark's timestamp alias points to NTZ: such columns are tagged with metadata here so
// the type can be restored to TimestampType later. Once the Spark-to-Hive mapping is
// actually rectified, this fix needs to be revisited.
val metadata = getMetadataForNtz(columnType, hc.getType)
val field = StructField(
name = hc.getName,
dataType = columnType,
nullable = true)
nullable = true,
metadata = metadata)
Option(hc.getComment).map(field.withComment).getOrElse(field)
}

/** Tags columns parsed as NTZ whose raw Hive type is plain "timestamp", for later restore. */
def getMetadataForNtz(
    colType: DataType,
    hiveType: String,
    existingMd: Metadata = Metadata.empty): Metadata =
  colType match {
    case TimestampNTZType if hiveType == HiveClientImpl.HIVE_DATA_TYPE_TIME_STAMP =>
      new MetadataBuilder()
        .withMetadata(existingMd)
        .putString(HIVE_DATA_TYPE_KEY, hiveType)
        .build()

    case _ => existingMd
  }

private def verifyColumnDataType(schema: StructType): Unit = {
schema.foreach(col => getSparkSQLDataType(toHiveColumn(col)))
}

private def toInputFormat(name: String) =
Utils.classForName[org.apache.hadoop.mapred.InputFormat[_, _]](name)

@@ -1417,3 +1444,13 @@ private[hive] object HiveClientImpl extends Logging {
}
}
}

/** CatalystSqlParser variant that tags struct fields parsed as NTZ from Hive's plain timestamp. */
object HiveCatalystSqlParser extends CatalystSqlParser {
override val astBuilder = new AstBuilder {
override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
val sf = super.visitComplexColType(ctx)
val md = HiveClientImpl.getMetadataForNtz(sf.dataType, ctx.dataType().getText, sf.metadata)
sf.copy(metadata = md)
}
}
}
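A sketch of the intended effect (hypothetical call; the quoted type string mirrors what getSparkSQLDataType feeds the parser). Nested struct members now pick up the marker too, because visitComplexColType runs once per field:

// With spark.sql.timestampType = TIMESTAMP_NTZ:
val parsed = HiveCatalystSqlParser.parseDataType("struct<`name`:string,`ts1`:timestamp>")
// `ts1` parses as TimestampNTZType but carries {"hiveDataType": "timestamp"} in its
// field metadata, so HiveExternalCatalog.getRawTable can later restore it to LTZ.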
HiveDDLSuite.scala

@@ -42,7 +42,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
import org.apache.spark.sql.internal.SQLConf.{ORC_IMPLEMENTATION, TimestampTypes}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -3362,4 +3362,33 @@ class HiveDDLSuite
checkAnswer(sql("SELECT * FROM t1"), Row(0))
}
}

test("SPARK-50840: Hive table created with timestamp LTZ, should retain the same on reload") {
withTable("t1", "t2") {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_LTZ.toString) {
val tblDef =
  """
    |CREATE TABLE t1 (
    |  ts timestamp,
    |  nstd struct<name: string, ts1: timestamp>
    |)
    |USING parquet""".stripMargin
sql(tblDef)
}
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> TimestampTypes.TIMESTAMP_NTZ.toString) {
def assertNoTimestampNTZ(structType: StructType): Unit = {
structType.foreach {
_.dataType match {
case TimestampNTZType => fail("TimestampNTZType not expected")
case st: StructType => assertNoTimestampNTZ(st)
case _ =>
}
}
}

sql("alter table t1 rename to t2")
assertNoTimestampNTZ(spark.table("t2").schema)
}
}
}
}
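For context, a minimal repro sketch of the scenario this patch targets (hypothetical spark-shell session against a Hive-backed catalog; not part of the patch):

spark.conf.set("spark.sql.timestampType", "TIMESTAMP_LTZ")
spark.sql("CREATE TABLE tbl (ts timestamp) USING parquet")

spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
// Any path that reloads the table metadata from the Hive Metastore, e.g.:
spark.sql("ALTER TABLE tbl RENAME TO tbl2")
// Before this fix, the reloaded `ts` column came back as timestamp_ntz, a type Hive
// cannot represent; with the fix it is restored to timestamp (LTZ).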