
I am trying to compare two sets of data. One is a DataFrame of static data that is written out in Avro format. The comparison then reads the data back from Avro and checks a timestamp column, and it fails because Avro stores the timestamp as a long, and the SQL type conversion gives a different value.

**-- Create the DataFrame**
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.databricks.spark.avro._  // provides the .avro() reader/writer methods

val data = Seq(Row("1", java.sql.Timestamp.valueOf("2019-03-15 18:20:06.456")))
val schemaOrig = List(
  StructField("rowkey", StringType, true),
  StructField("txn_ts", TimestampType, true))

val sourceDf = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schemaOrig))
sourceDf.write.avro("test")
sourceDf.printSchema
root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: timestamp (nullable = true)
sourceDf.show(false)


+----------------+-----------------------+
|rowkey          |txn_ts                 |
+----------------+-----------------------+
|1               |2019-03-15 18:20:06.456|
+----------------+-----------------------+

-- As shown above, the source DataFrame has the expected schema, i.e. string and timestamp.
-- Now read the data back from Avro:
val avroDf=spark.read.avro("test")
avroDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: long (nullable = true)

avroDf.show(false)
-- The Avro DataFrame's schema prints the timestamp field as long, and the data shows the epoch time in milliseconds:

+----------------+-------------+
|rowkey          |txn_ts       |
+----------------+-------------+
|1               |1552688406456|
+----------------+-------------+
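For reference, that long is exactly the epoch-millisecond value of the original timestamp. A quick sketch to verify this (the UTC-4 timezone is my assumption, implied by the numbers above, e.g. America/New_York under DST):

// Timestamp.valueOf parses in the JVM's default timezone, and getTime returns
// epoch milliseconds, the same encoding spark-avro used for the column.
val ts = java.sql.Timestamp.valueOf("2019-03-15 18:20:06.456")
ts.getTime  // 1552688406456 when the JVM timezone is UTC-4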
-- Compare the two DataFrames:
sourceDf.except(avroDf).show(false)
-- Gives an error due to the datatype mismatch:
org.apache.spark.sql.AnalysisException: Except can only be performed on tables with the compatible column types. bigint <> timestamp at the second column of the second table;;
'Except
:- AnalysisBarrier

-- Cast the Avro long field back to timestamp:
val modifiedAvroDf=avroDf.withColumn("txn_ts", col("txn_ts").cast(TimestampType))
modifiedAvroDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: timestamp (nullable = true)

modifiedAvroDf.show(false)
-- Shows a wrong timestamp value:
+----------------+-----------------------+
|rowkey          |txn_ts                 |
+----------------+-----------------------+
|1               |51172-09-26 11:07:366.0|
+----------------+-----------------------+
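The cast itself is the culprit here: Spark's cast from an integral column to timestamp interprets the number as seconds since the epoch, while spark-avro wrote milliseconds, so the result lands tens of thousands of years in the future. A minimal sketch of a fix on the read side (fixedAvroDf is my name; going through a decimal keeps the .456 fraction exact, where a plain double division can round):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DecimalType, TimestampType}

// 1552688406456 ms / 1000 = 1552688406.456 s, which the timestamp cast
// then interprets correctly (rendered in the session timezone).
val fixedAvroDf = avroDf.withColumn(
  "txn_ts",
  (col("txn_ts").cast(DecimalType(20, 3)) / 1000).cast(TimestampType))
fixedAvroDf.show(false)  // 1 | 2019-03-15 18:20:06.456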

-- Now try to cast the source column to long:
val sourceModDf=sourceDf.withColumn("txn_ts",col("txn_ts").cast(LongType))
sourceModDf.printSchema

root
 |-- rowkey: string (nullable = true)
 |-- txn_ts: long (nullable = true)
sourceModDf.show(false)
sourceModDf.except(avroDf).show(false)
-- Still shows a difference: casting a timestamp to long truncates to whole seconds, while Avro stored milliseconds.
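A comparison in long space can be made lossless by converting the source timestamps to epoch milliseconds instead of seconds. A minimal sketch with a hypothetical toMillis UDF:

import org.apache.spark.sql.functions.{col, udf}

// Timestamp.getTime returns exactly the epoch milliseconds that spark-avro
// wrote, so except() now compares like with like.
val toMillis = udf((t: java.sql.Timestamp) => t.getTime)
val sourceMillisDf = sourceDf.withColumn("txn_ts", toMillis(col("txn_ts")))
sourceMillisDf.except(avroDf).show(false)  // expect an empty result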

1 Answer


I created a UDF to convert the long to a timestamp string. Please check the code below.

scala> val df = Seq(1552688406456L).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> import org.joda.time.DateTime
import org.joda.time.DateTime

scala> import org.joda.time.DateTimeZone
import org.joda.time.DateTimeZone

scala> val datetime = udf((date: Long) => new DateTime(date, DateTimeZone.UTC).toString.replace("Z","").replace("T"," "))
datetime: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> df.select(datetime($"value").as("dt")).show(false)
+------------------------+
|dt                      |
+------------------------+
|2019-03-15 22:20:06.456 |
+------------------------+

scala> df.select(datetime($"value").as("dt").cast("timestamp")).show(false)
+-----------------------+
|dt                     |
+-----------------------+
|2019-03-15 22:20:06.456|
+-----------------------+


scala> df.select(datetime($"value").as("dt").cast("timestamp")).printSchema
root
 |-- dt: timestamp (nullable = true)
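Regarding the timezone and java.sql.Timestamp questions in the comments below, a variant worth noting (my sketch, not part of the answer above): build the java.sql.Timestamp straight from the epoch milliseconds, skipping string formatting entirely, so the value round-trips exactly and is rendered in the session timezone.

import org.apache.spark.sql.functions.udf

// java.sql.Timestamp's constructor takes epoch milliseconds, so no timezone
// arithmetic or string replacing is needed; the display matches the original
// when the session timezone matches the one the data was written in.
val toTs = udf((ms: Long) => new java.sql.Timestamp(ms))
df.select(toTs($"value").as("dt")).printSchema  // dt: timestamp (nullable = true)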

  • Thanks a lot. How can I make this generic? In my case different tables have different sets of columns, and I am invoking it like val modifiedDf = filter._2.fields.foldLeft(sourceDF) { case (newDf, SchemaField(name, fieldType)) => newDf.withColumn(name, newDf.col(name).cast(DataTypeUtil.mapDataType(fieldType))) } (see the generic sketch after this thread) – Ajith Kannan May 08 '20 at 05:40
  • datetime is a UDF; declare it somewhere top level and use it wherever required, like this: newDf.withColumn("ts", datetime($"ts")).withColumn(name, newDf.col(name).cast(DataTypeUtil.mapDataType(fieldType))) – Srinivas May 08 '20 at 05:46
  • Not really. When I try the generic approach using the UDF, the value is not getting changed. Please see this post: https://stackoverflow.com/questions/61708010/scala-spark-dataframe-modify-column-with-udf-return-value – Ajith Kannan May 10 '20 at 06:28
  • Hi Srinivas, there is an issue with the timezone: the returned value is 2019-03-16T03:50:06.456+05:30, whereas the original timestamp was 2019-03-15 18:20:06.456. I guess the issue is due to the timezone. Is there a way to force the same timezone, so that the stored long value reads back to the exact same timestamp? – Ajith Kannan May 22 '20 at 10:16
  • Updated the answer to convert to the UTC timezone; if you want, you can change it to a different timezone. Please accept or upvote if it helps solve the issue. – Srinivas May 22 '20 at 10:31
  • Thanks. Sorry, but the result is 2019-03-15T22:20:06.456Z, whereas the original was 2019-03-15 18:20:06.456. Is there a way to get the exact value back? Our reconciliation is having issues due to this. Thanks a lot. – Ajith Kannan May 22 '20 at 10:38
  • Great, that worked. One last thing: I need to convert to java.sql.Timestamp, since the datatype of the original column is Timestamp. Can you please add that as well? – Ajith Kannan May 22 '20 at 11:01
  • Why do you want to convert that into java.sql.Timestamp? Is the Spark timestamp type OK? – Srinivas May 22 '20 at 11:03
  • We have used sql.Timestamp. Do you suggest the Spark timestamp? Any reason? I am able to get a sql.Timestamp by doing Timestamp.valueOf(). Please give the Spark timestamp version. – Ajith Kannan May 22 '20 at 11:05
  • Great. Just to know, what does replace("Z","").replace("T"," ") do? Convert UTC to EDT? – Ajith Kannan May 22 '20 at 11:08
  • No, those just strip the extra characters from the timestamp string. – Srinivas May 22 '20 at 11:10
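For the generic, many-column case raised in the comments above, a minimal sketch building on the question's own foldLeft snippet (SchemaField, DataTypeUtil.mapDataType, filter._2.fields, and sourceDF are assumed from that snippet; matching on a "timestamp" fieldType string is my hypothetical marker for millisecond-long columns):

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical generic pass: millisecond-long columns get the exact UDF
// conversion; every other column keeps the existing cast-based mapping.
val millisToTs = udf((ms: Long) => new java.sql.Timestamp(ms))
val modifiedDf = filter._2.fields.foldLeft(sourceDF) {
  case (newDf, SchemaField(name, "timestamp")) =>
    newDf.withColumn(name, millisToTs(col(name)))
  case (newDf, SchemaField(name, fieldType)) =>
    newDf.withColumn(name, col(name).cast(DataTypeUtil.mapDataType(fieldType)))
}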