
I have the two data frames below, each with a hash added as an additional column to identify differences for the same id across both data frames

df1=
name | department| state | id|hash
-----+-----------+-------+---+---
James|Sales      |NY     |101| c123
Maria|Finance    |CA     |102| d234
Jen  |Marketing  |NY     |103| df34



df2=
name | department| state | id|hash
-----+-----------+-------+---+----
James|  Sales1   |null   |101|4df2
Maria|  Finance  |       |102|5rfg
Jen  |           |NY2    |103|2f34

#identify unmatched row for same id from both data frame

df1_un_match_indf2=df1.join(df2,df1.hash==df2.hash,"leftanti")

df2_un_match_indf1=df2.join(df1,df2.hash==df1.hash,"leftanti")

#The above joins return every row of both data frames, since all hashes for the same id are different

Now I am trying to find the differences in row values for the same id between the 'df1_un_match_indf2' and 'df2_un_match_indf1' data frames, so that the differences are shown row by row

df3=df1_un_match_indf2
df4=df2_un_match_indf1
common_diff=df3.join(df4,df3.id==df4.id,"inner")
common_diff.show()

but the result shows the differences like this

+-----+----------+-----+---+----+-----+----------+-----+---+----+
|name |department|state|id |hash|name |department|state|id |hash|
+-----+----------+-----+---+----+-----+----------+-----+---+----+
|James|Sales     |NY   |101|c123|James|Sales1    |null |101|4df2|
|Maria|Finance   |CA   |102|d234|Maria|Finance   |     |102|5rfg|
|Jen  |Marketing |NY   |103|df34|Jen  |          |NY2  |103|2f34|
+-----+----------+-----+---+----+-----+----------+-----+---+----+

What I am expecting is

+-----------------+---------------------+------------+-------------+---------------+
|name             |department           |state       |id           |hash           |
+-----------------+---------------------+------------+-------------+---------------+
|['James','James']|['Sales','Sales1']   |['NY',null] |['101','101']|['c123','4df2']|
|['Maria','Maria']|['Finance','Finance']|['CA','']   |['102','102']|['d234','5rfg']|
|['Jen','Jen']    |['Marketing','']     |['NY','NY2']|['103','103']|['df34','2f34']|
+-----------------+---------------------+------------+-------------+---------------+

I tried different approaches, but couldn't find the right way to produce this expected format

Can anyone give a solution or idea to this? Thanks

Adhi cloud
  • The issue is that Spark's dataframe is unordered which means at scale, the name's 0-index value and the department's 0-index value might not be from the same record. In small dataset, you can get lucky that this issue doesn't occur but you can never guarantee whatever solution can continue to work. The real solution is that you need to have something to order by. (instead of hash, you need some sort of date, some incrementing number for the hash, etc) – Emma Apr 28 '22 at 16:20

1 Answer


What you want to use is likely collect_list, or maybe collect_set.

The following example shows how they behave:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# SparkSession has been the entry point since Spark 2.0; HiveContext is deprecated
spark = SparkSession.builder.master("local").getOrCreate()

df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

In your case you need to slightly change your join into a union to enable you to group the data.

df3=df1_un_match_indf2
df4=df2_un_match_indf1
common_diff = df3.union(df4)
(common_diff
  .groupby("id")
  .agg(F.collect_set("name"),
       F.collect_list("department"))
  .show())

If you can't do a union, just use an array:

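from pyspark.sql.functions import array

# Assumes common_diff is the joined data frame with a single id column, and that
# each side's columns were renamed beforehand (thisState/thatState,
# thisDept/thatDept) so they do not clash.
common_diff.select(
  common_diff.id,
  array(
    common_diff.thisState,
    common_diff.thatState
  ).alias("State"),
  array(
    common_diff.thisDept,
    common_diff.thatDept
  ).alias("Department")
)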

It's a lot more typing and a little more fragile. I suggest that renaming columns and using the groupby is likely cleaner and clearer.

Matt Andruff
  • Hi @Matt, thanks for your response. By the way, the above solution does not work for this case. I already looked at the above example, and it seems to work column-wise, not row-wise. Let me know how I can apply the above approach to the data frame 'common_diff' – Adhi cloud Apr 28 '22 at 15:44
  • I am sorry for not giving you the full solution. Here I think I have now provided the full solution instead of just hints toward the solution. – Matt Andruff Apr 28 '22 at 16:03
  • OK, in this case the column schema may change, so I am not sure whether union will work; the column names also differ between the data sets consumed by the logic – Adhi cloud Apr 28 '22 at 16:35