
I have a dataframe like the one below:

Id    linkedIn
1     [l1,l2]
2     [l5,l6,l3]
3     [l4,l5]
4     [l8,l10]
5     [l7,l9,l1]

Rows 1 and 5 have l1 in common, so they should be merged into one row with Id=1. Similarly, rows 2 and 3 have l5 in common, so they should be merged into one row with Id=2. Row 4 should remain unchanged, as it shares no value with any other row.

I want the output to be like this:

Id    linkedIn
1     [l1,l2,l7,l9]
2     [l4,l5,l6,l3]
4     [l8,l10]

I am using Spark 2.3.
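For reference, a minimal snippet that builds this input (the array-of-strings column type is my assumption from the example above):

import spark.implicits._

val input = Seq(
  (1, Seq("l1", "l2")),
  (2, Seq("l5", "l6", "l3")),
  (3, Seq("l4", "l5")),
  (4, Seq("l8", "l10")),
  (5, Seq("l7", "l9", "l1"))
).toDF("Id", "linkedIn")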


2 Answers


An alternative (I like the approach above as well, but have not tested it): this solution takes performance into account, with my own data added:

import spark.implicits._
import org.apache.spark.sql.functions._

val distinctUDF = udf( (s: Seq[String]) => s.distinct ) // courtesy of LeoC

val df = Seq( (1, Array("l1", "l2", "l700")),
          (2, Array("l5", "l6", "l3")),
          (3, Array("l4", "l5")),
          (4, Array("l8", "l10")),
          (5, Array("l7", "l8", "l1", "l700")) ).toDF("k", "lv")

val df2 = df.withColumn("lv", explode($"lv")).repartition($"lv") // 200 partitions default

// collect_set() keeps only distinct elements, while collect_list() keeps all elements (except nulls).
// e.g. for the values 1, 1, 2: collect_list -> [1, 1, 2], collect_set -> [1, 2]
val df3 = df2.groupBy("lv").agg(collect_list("k").as("kv"))  // ids that contain each value
val df4 = df3.filter(size($"kv") > 1).select("kv").distinct  // keep only values shared by 2+ ids
val df5 = df4.withColumn("j", explode($"kv"))                // one row per (id group, member id)
val df6 = df5.join(df, df5("j") === df("k"))                 // re-attach the original arrays
val df7 = df6.groupBy("kv").agg(collect_set("lv").as("lv"))  // collect the arrays per id group

df7.withColumn("key", array_min($"kv"))
   .withColumn("values", distinctUDF(flatten($"lv")))
   .select("key", "values")
   .show(false)
// Order the output as you wish; the lazily evaluated steps above are fused by Catalyst.
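Note that array_min and flatten were only added in Spark 2.4, while the question targets Spark 2.3. A minimal sketch of a 2.3-compatible last step, replacing both with UDFs (arrayMinUDF and flattenUDF are illustrative names, not part of the original answer):

val arrayMinUDF = udf((ks: Seq[Int]) => ks.min)               // stand-in for array_min (Spark 2.4+)
val flattenUDF  = udf((vss: Seq[Seq[String]]) => vss.flatten) // stand-in for flatten (Spark 2.4+)

df7.withColumn("key", arrayMinUDF($"kv"))
   .withColumn("values", distinctUDF(flattenUDF($"lv")))
   .select("key", "values")
   .show(false)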

Results in (for this set of data):

+---+-----------------------+
|key|values                 |
+---+-----------------------+
|2  |[l4, l5, l6, l3]       |
|1  |[l1, l2, l700, l7, l8] |
|4  |[l8, l10, l7, l1, l700]|
+---+-----------------------+
  • The result for your sample data should have only two rows: 2 -> [l4, l5, l6, l3] and 1 -> [l8, l10, l7, l1, l700, l2]. If you look at your result, the rows with keys 1 and 4 share common values such as l1, l7, l8 and l700. – Haridesh Yadav May 22 '20 at 10:03
  • I reran with your input and got the right answer as well. This is my input. On your question, there should be no 4 in the output, BTW. – thebluephantom May 22 '20 at 10:21
  • It works fine because in my input only two rows have commonality. But regarding your question "What if there are 3 rows with commonality?", I realised that there are cases in my data set where that happens, and there I do not get the right output, as you can see from the result of your input. – Haridesh Yadav May 22 '20 at 11:08
  • @thebluephantom can you help and suggest how to handle this https://stackoverflow.com/questions/62036791/while-writing-to-hdfs-path-getting-error-java-io-ioexception-failed-to-rename – BdEngineer May 27 '20 at 06:42

For two-row commonality you can try the code below.

val df = Seq(
  (1,Seq("l1","l2")),
  (2,Seq("l5","l6","l3")),
  (3,Seq("l4","l5")),
  (4,Seq("l8","l10")),
  (5,Seq("l7","l9","l1")),
  (6,Seq("l20","l10","l1"))
).toDF("id","values")

val df2 = df.select('id, explode('values).as("value"))             // one row per (id, value)
val df3 = df2.join(df2, "value").toDF("value", "id", "id2")        // self-join: pairs of ids sharing a value
val df4 = df3.groupBy('id).agg(hash(collect_set('id2)).as("hash")) // fingerprint each id's set of related ids
val df5 = df2.join(df4, "id").groupBy('hash).agg(collect_set('value).as("values")) // merge values per fingerprint
val df6 = df5.join(df4.groupBy('hash).agg(min('id).as("id")), "hash").select('id, 'values).orderBy('id) // keep the smallest id per group
df6.show()

output:

+---+----------------+
| id|          values|
+---+----------------+
|  1|[l7, l9, l2, l1]|
|  2|[l4, l3, l6, l5]|
|  4|       [l8, l10]|
+---+----------------+
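For chains of commonality spanning more than two rows (the case raised in the comments on the other answer), one option, sketched here under the assumption that the GraphFrames package is available (it is not part of either answer), is to treat ids as vertices, values shared between ids as edges, and run connected components:

import org.graphframes.GraphFrame

val vertices = df.select('id)                                  // one vertex per id
val edges = df2.join(df2, "value").toDF("value", "src", "dst")
  .filter('src =!= 'dst)                                       // an edge between any two ids sharing a value
  .select("src", "dst")

spark.sparkContext.setCheckpointDir("/tmp/cc")                 // required by connectedComponents
val components = GraphFrame(vertices, edges).connectedComponents.run()

components.join(df2, "id")
  .groupBy('component)
  .agg(min('id).as("id"), collect_set('value).as("values"))    // smallest id and merged values per component
  .select('id, 'values)
  .orderBy('id)
  .show()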