
I am comparing two dataframes, df1 (recent data) and df2 (previous data), which are derived from the same table at different timestamps, and I want to extract the rows of df1 whose id values are not present in df2.

I used a row number to extract the recent and previous data and store them in df1 (recent data) and df2 (previous data). I tried using a left join and subtract, but I am not sure if I am on the right track.

df1=

+---+--------------------+------+
| ID|Timestamp           |RowNum|
+---+--------------------+------+
|  1|2019-04-03 14:45:...|     1|
|  2|2019-04-03 14:45:...|     1|
|  3|2019-04-03 14:45:...|     1|
+---+--------------------+------+

df2 = 
+---+--------------------+------+
| ID|Timestamp           |RowNum|
+---+--------------------+------+
|  2|2019-04-03 13:45:...|     2|
|  3|2019-04-03 13:45:...|     2|
+---+--------------------+------+


%%spark
result2 = df1.join(df2.select(['id']), ['id'], how='left')
result2.show(10)

but it didn't give the desired output, since a left join keeps every row of df1 rather than filtering anything out.
Required Output:

+---+--------------------+------+
| ID|Timestamp           |RowNum|
+---+--------------------+------+
|  1|2019-04-03 14:45:...|     1|
+---+--------------------+------+
Tamil Babu

3 Answers


You can use the left_anti join type to do exactly what you want:

result2 = df1.join(df2, ['id'], how='left_anti')

This join type is not very well explained in the Spark documentation itself, but you can find more information about it here, for example.
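As a sanity check on what left_anti does, its semantics can be sketched in plain Python (no Spark required, and the rows below are just the sample data from the question): keep each row of the left table whose join key has no match in the right table.

```python
# Plain-Python sketch of left_anti join semantics:
# keep each left-side row whose join key does not appear on the right.
df1 = [
    {"id": 1, "timestamp": "2019-04-03 14:45:00", "rownum": 1},
    {"id": 2, "timestamp": "2019-04-03 14:45:00", "rownum": 1},
    {"id": 3, "timestamp": "2019-04-03 14:45:00", "rownum": 1},
]
df2_ids = {2, 3}  # ids present in the previous snapshot (df2)

# Equivalent of df1.join(df2, ['id'], how='left_anti')
result = [row for row in df1 if row["id"] not in df2_ids]
print(result)  # only the row with id 1 survives
```

Note that, unlike a plain left join, no columns from the right side ever appear in the output; left_anti only filters the left side.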

rluta

There are two ways to achieve this:

1. IS NOT IN: create a list (df2_list) from the lookup dataframe and use it within isin() == False.

df1 = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C"), (4, "D")], ['id', 'item'])

df2 = spark.createDataFrame([(1, 10), (2, 20)], ['id', 'otherItem'])

# Collect the lookup ids to the driver; fine for a small lookup table
df2_list = df2.select('id').rdd.map(lambda row: row[0]).collect()

from pyspark.sql.functions import col

# equivalently: df1.where(~col('id').isin(df2_list)).show()
df1.where(col('id').isin(df2_list) == False).show()

2. Left Anti Join: place the master table on the left side.

df1.join(df2,  df1.id==df2.id, 'left_anti').show()
Shantanu Sharma

Try this.

scala> val df1 = Seq(("1","2019-04-03 14:45:00","1"),("2","2019-04-03 14:45:00","1"),("3","2019-04-03 14:45:00","1")).toDF("ID","Timestamp","RowNum")
df1: org.apache.spark.sql.DataFrame = [ID: string, Timestamp: string ... 1 more field]

scala> df1.show
+---+-------------------+------+
| ID|          Timestamp|RowNum|
+---+-------------------+------+
|  1|2019-04-03 14:45:00|     1|
|  2|2019-04-03 14:45:00|     1|
|  3|2019-04-03 14:45:00|     1|
+---+-------------------+------+

scala> val df2 = Seq(("2","2019-04-03 13:45:00","2"),("3","2019-04-03 13:45:00","2")).toDF("ID","Timestamp","RowNum")
df2: org.apache.spark.sql.DataFrame = [ID: string, Timestamp: string ... 1 more field]

scala> df2.show
+---+-------------------+------+
| ID|          Timestamp|RowNum|
+---+-------------------+------+
|  2|2019-04-03 13:45:00|     2|
|  3|2019-04-03 13:45:00|     2|
+---+-------------------+------+

scala> val idDiff = df1.select("ID").except(df2.select("ID"))
idDiff: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ID: string]

scala> idDiff.show
+---+
| ID|
+---+
|  1|
+---+


scala> val outputDF = df1.join(idDiff, "ID")
outputDF: org.apache.spark.sql.DataFrame = [ID: string, Timestamp: string ... 1 more field]

scala> outputDF.show
+---+-------------------+------+
| ID|          Timestamp|RowNum|
+---+-------------------+------+
|  1|2019-04-03 14:45:00|     1|
+---+-------------------+------+
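The except-then-join approach above can be mirrored in plain Python (a stand-in for the Spark calls, using the same sample values): take the set difference of the keys, then keep the rows of df1 whose key survives.

```python
# Plain-Python analogue of:
#   val idDiff = df1.select("ID").except(df2.select("ID"))
#   val outputDF = df1.join(idDiff, "ID")
df1 = {
    "1": ("2019-04-03 14:45:00", "1"),
    "2": ("2019-04-03 14:45:00", "1"),
    "3": ("2019-04-03 14:45:00", "1"),
}
df2_ids = {"2", "3"}

id_diff = set(df1) - df2_ids            # set difference, like .except on the ID column
output = {i: df1[i] for i in id_diff}   # like the join back onto df1 by ID
print(output)  # {'1': ('2019-04-03 14:45:00', '1')}
```

The end result is the same as left_anti; the difference is that except deduplicates and materializes the distinct missing IDs before the join.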
C.S.Reddy Gadipally