
I am trying to concatenate the values of matching columns from two data frames into a single data frame.

For example:

df1=
name | department| state | id|hash
-----+-----------+-------+---+---
James|Sales      |NY     |101| c123
Maria|Finance    |CA     |102| d234
Jen  |Marketing  |NY     |103| df34



df2=
name | department| state | id|hash
-----+-----------+-------+---+----
James|  Sales1   |null   |101|4df2
Maria|  Finance  |       |102|5rfg
Jen  |           |NY2    |103|2f34

Since both have the same column names, I renamed the columns of df1:

new_col = [c + '_r' for c in df1.columns]
df1 = df1.toDF(*new_col)
joined_df = df1.join(df2, df1.id_r == df2.id, "inner")
+------+------------+-------+----+------+-----+----------+-----+---+----+
|name_r|department_r|state_r|id_r|hash_r|name |department|state|id |hash|
+------+------------+-------+----+------+-----+----------+-----+---+----+
|James |Sales       |NY     |101 |c123  |James|Sales1    |null |101|4df2|
|Maria |Finance     |CA     |102 |d234  |Maria|Finance   |     |102|5rfg|
|Jen   |Marketing   |NY     |103 |df34  |Jen  |          |NY2  |103|2f34|
+------+------------+-------+----+------+-----+----------+-----+---+----+


Now I am trying to concatenate the values of the matching columns and create a single data frame:

from pyspark.sql.functions import concat, col, lit
from pyspark.sql.types import StructType

combined_df = spark.createDataFrame([], StructType([]))
for col1 in df1.columns:
    for col2 in df2.columns:
        if col1[:-2] == col2:
            joindf = joined_df.select(concat(lit('['), col(col1), lit(","), col(col2), lit(']')).alias("arraycol" + col2))
            col_to_select = "arraycol" + col2
            filtered_df = joindf.select(col_to_select)
            renamed_df = filtered_df.withColumnRenamed(col_to_select, col2)
            renamed_df.show()
            if combined_df.count() == 0:
                combined_df = renamed_df
            else:
                combined_df = combined_df.rdd.zip(renamed_df.rdd).map(lambda x: x[0] + x[1])

   
new_combined_df = spark.createDataFrame(combined_df, df2.schema)
new_combined_df.show()
         

but it returns an error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob: Can only zip RDDs with same number of elements in each partition

Inside the loop, renamed_df.show() produces the expected column with values, e.g.:
renamed_df.show()
+-----------------+
|name             |
+-----------------+
|['James','James']|
|['Maria','Maria']|
|['Jen','Jen']    |
+-----------------+

but I am expecting to create a combined data frame as shown below:

+-----------------+---------------------+------------+-------------+---------------+
|name             |department           |state       |id           |hash           |
+-----------------+---------------------+------------+-------------+---------------+
|['James','James']|['Sales','Sales']    |['NY',null] |['101','101']|['c123','4df2']|
|['Maria','Maria']|['Finance','Finance']|['CA','']   |['102','102']|['d234','5rfg']|
|['Jen','Jen']    |['Marketing','']     |['NY','NY2']|['103','103']|['df34','2f34']|
+-----------------+---------------------+------------+-------------+---------------+

Any solution to this?

Adhi cloud
  • Does this answer your question? [pyspark collect_set or collect_list with groupby](https://stackoverflow.com/questions/37580782/pyspark-collect-set-or-collect-list-with-groupby) – Matt Andruff May 16 '22 at 12:13

1 Answer


You actually want to use collect_list for this: gather all the data into one data frame, then group it so that collect_list can be used.

from pyspark.sql import functions as f

# Stack both data frames; allowMissingColumns=True fills columns missing from one side with nulls
union_all = df1.unionByName(df2, allowMissingColumns=True)

# One collect_list aggregate per column, so no column name is hard coded
myArray = []
for myCol in union_all.columns:
    myArray += [f.collect_list(myCol)]

result = (
    union_all
    .withColumn("temp_name", f.col("id"))  # extra copy of id, used only for grouping
    .groupBy("temp_name")
    .agg(*myArray)
    .drop("temp_name")  # cleanup of the extra column used for grouping
)
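
Note that .agg(*myArray) gives the output columns generated names such as collect_list(name), collect_list(department), and so on. If you want them to keep the original names, you can alias each aggregate inside the loop; a minimal sketch, assuming the same f alias as above:

# alias each aggregate back to its source column name
myArray = []
for myCol in union_all.columns:
    myArray += [f.collect_list(myCol).alias(myCol)]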

If you only want unique values you can use collect_set instead.
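
For example, swapping the aggregate function in the loop above is the only change needed (same assumptions as before; collect_set keeps only the distinct values within each group):

# collect_set instead of collect_list gathers only the distinct values per group
myArray = [f.collect_set(myCol).alias(myCol) for myCol in union_all.columns]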

Matt Andruff
  • Hi @Matt.. thanks for your response.. however I need to dynamically pass the column names from the data frame, because we may get different datasets having the same columns in both data frames, and in that case we cannot hard code the column names – Adhi cloud May 17 '22 at 09:16
  • I'm not sure if it's a good idea, but I updated my statement to dynamically adjust to the column names. – Matt Andruff May 17 '22 at 12:27
  • Hi @Matt.. the above code is not returning the expected output as mentioned in the question. I also need to include null and missing values in the result.. could you please check the code? Thanks – Adhi cloud May 18 '22 at 19:57