
I have a table in Hive as below.

spark.sql("select * from custom_log where custm_id = '9012457WXR'").show(100,False)
+----------+--------------------------+---------------------------------------------------------------------------------+----------+
|custm_id  |transf_date               |transfr                                                                          |pdt       |
+----------+--------------------------+---------------------------------------------------------------------------------+----------+
|9012457WXR|2020-12-14 20:24:08.213000|[{'oldstatus': ['new'], 'newValues': ['addition Done']}]                         |20201214  |
|9012457WXR|2020-12-14 20:24:09.175000|[{'oldstatus': ['addition Done'], 'newValues': ['update pref']}]                 |20201214  |
|9012457WXR|2020-12-14 20:24:09.241000|[{'oldstatus': ['update pref'], 'newValues': ['update personal pref']}]          |20201214  |
|9012457WXR|2020-12-14 20:24:09.241000|[{'oldstatus': ['update personal pref'], 'newValues': ['Acct settings']}]        |20201214  |
|9012457WXR|2020-12-14 23:06:23.197000|[{'oldstatus': ['Acct settings'], 'newValues': ['update pref']}]                 |20201214  |
|9012457WXR|2020-12-15 00:03:05.496000|[{'oldstatus': ['update pref'], 'newValues': ['Notificatios mod']}]              |20201215  |
|9012457WXR|2020-12-15 00:03:05.568000|[{'oldstatus': ['Notificatios mod'], 'newValues': ['shop']}]                     |20201215  |
|9012457WXR|2020-12-15 00:03:05.568000|[{'oldstatus': ['shop'], 'newValues': ['Fav']}]                                  |20201215  |
+----------+--------------------------+---------------------------------------------------------------------------------+----------+

Now I have a dataframe with same fields and I need to filter out rows if distinct custm_id, trans_date, transfr in the dataframe is present in the table custom_log. Below is the Dataframe

+----------+--------------------------+-------------------------------------------------------------------+----------+
|custm_id  |transf_date               |transfr                                                            |pdt       |
+----------+--------------------------+-------------------------------------------------------------------+----------+
|9012457WXR|2020-12-14 20:24:08.213000|[{'oldstatus': ['Acct settings'], 'newValues': ['update pref']}]   |20201216  |
|9012457WXR|2020-12-16 08:24:18.175000|[{'oldstatus': ['Fav'], 'newValues': ['Wishlist']}]                |20201216  |
|9012457WXR|2020-12-16 08:28:21.241000|[{'oldstatus': ['Wishlist'], 'newValues': ['Rm Wishlist']}]        |20201216  |
|9012457WXR|2020-12-16 11:13:46.241000|[{'oldstatus': ['Rm Wishlist'], 'newValues': ['Shop']}]            |20201216  |
|9012457WXR|2020-12-16 19:06:12.197000|[{'oldstatus': ['Shop'], 'newValues': ['Fav']}]                    |20201216  |
|9012457WXR|2020-12-16 20:03:18.496000|[{'oldstatus': ['Fav'], 'newValues': ['brk']}]                     |20201216  |
+----------+--------------------------+-------------------------------------------------------------------+----------+

In the above DataFrame, the 1st row is identical to the 5th row of the table except for pdt. So I need to compare the distinct (custm_id, transf_date, transfr) values of the table against the DataFrame and filter those rows out of the DataFrame.

How can this be achieved in PySpark?
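
For context, a minimal sketch of how the two inputs could be referenced (the names df1 and df2 are illustrative and are reused in the answer below):

# Sketch only: illustrative setup, not part of the original post.
# df1: the existing Hive table custom_log
df1 = spark.table("custom_log")

# df2: the DataFrame holding the new rows shown above
# (built here with one sample row just for illustration)
df2 = spark.createDataFrame(
    [("9012457WXR", "2020-12-14 20:24:08.213000",
      "[{'oldstatus': ['Acct settings'], 'newValues': ['update pref']}]", "20201216")],
    ["custm_id", "transf_date", "transfr", "pdt"]
)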

1 Answer

You can try an anti join, which keeps only the rows of the left DataFrame (here df2, the new DataFrame) that have no match in the right DataFrame (df1, the Hive table custom_log) under the given condition:

import pyspark.sql.functions as F

# Keep only the df2 rows for which no df1 row satisfies this condition
result = df2.alias('df2').join(df1.alias('df1'),
    F.expr("""
        df2.custm_id = df1.custm_id and
        df2.transf_date = df1.transf_date and
        df2.transfr = df1.transfr and
        df2.pdt != df1.pdt
    """),
    'anti'
)
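
If the intent is to drop a df2 row whenever its (custm_id, transf_date, transfr) combination already exists in the table, regardless of whether pdt differs, a plain left-anti join on just those three columns is a possible variant (a sketch, reusing the df1/df2 names from above):

# Sketch: remove df2 rows whose key columns already appear in df1, ignoring pdt entirely
keys = ['custm_id', 'transf_date', 'transfr']
result = df2.join(df1.select(*keys).distinct(), on=keys, how='left_anti')
result.show(100, False)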