I have two RDDs. The first looks like this:

39250E6B-DB60-496E-8770-225EDB29A85F,0,ar,2018-10-09 00:00:00.0,2018-10-09 00:00:04.0,United Arab Emirates
2b98d4f4-0c55-4906-82ec-cfc7c5380652,40967837,en,2018-10-09 00:00:01.0,2018-10-09 00:00:31.0,Qatar
5bb587bc-54a0-4873-b0ba-2da38458ba1c,0,en,2018-10-09 00:00:03.0,2018-10-09 00:04:02.0,United Arab Emirates
466B96B5-DC12-4A35-8865-3A8702037A23,0,ar,2018-10-09 00:00:04.0,2018-10-09 00:00:06.0,Saudi Arabia
be51265e-ab8b-462c-928b-8dd7fab608de,0,ar,2018-10-09 00:00:06.0,2018-10-09 00:02:23.0,Jordan
9abb8b2c-beca-43b3-ab15-3df34bbbe69c,0,ar,2018-10-09 00:00:06.0,2018-10-09 00:00:06.0,United Arab Emirates
0841630C-D869-48E5-BCD3-533F4BD8764C,0,ar,2018-10-09 00:00:09.0,2018-10-09 00:00:13.0,Turkey
bb2cbcad-1580-4aa0-b9bf-d4f8a99cc493,46130751,id,2018-10-09 00:00:09.0,2018-10-09 15:45:02.0,Indonesia
61EA3935-495B-4C5A-A9CD-4ACC66FD5A47,46130713,ar,2018-10-09 00:00:11.0,2018-10-09 00:00:21.0,Kuwait
A7CADD7A-36D7-4B21-ADED-A5CAA2DD73F7,0,ar,2018-10-09 00:00:12.0,2018-10-09 00:00:12.0,Saudi Arabia
7c3187ea-f0f7-4da4-96b3-d0b118cd12a4,0,ar,2018-10-08 00:00:03.0,2018-10-08 18:12:59.0,Saudi Arabia
a0b7e6ad-8ef0-4259-b791-2e0c921a078e,46085131,en,2018-10-08 00:00:05.0,2018-10-08 00:02:20.0,Saudi Arabia
5418ab39-2cf5-4b66-9b9b-bce2be6c17d2,0,en,2018-10-08 00:00:06.0,2018-10-08 00:00:06.0,United Arab Emirates
74314d88-0e5c-46b6-97b3-2cdbf1c23a3e,0,en,2018-10-08 00:00:10.0,2018-10-09 21:18:16.0,United Arab Emirates
2D5D50B5-9570-490F-8B1A-D36F73B34AE7,46085073,ar,2018-10-08 00:00:11.0,2018-10-08 02:17:47.0,Saudi Arabia
1c74c1a4-c921-44b4-9941-ba6c2995360b,0,en,2018-10-08 00:00:11.0,2018-10-08 00:03:34.0,Saudi Arabia
8443DD2B-8A11-4C59-A72C-F9CCA22A5D98,46085077,ar,2018-10-08 00:00:11.0,2018-10-08 00:00:23.0,Saudi Arabia
B49E4117-4BE6-46E0-A7C5-40E99CAD580B,0,ar,2018-10-08 00:00:12.0,2018-10-08 00:00:17.0,Saudi Arabia
3E144DB2-9BA9-4E2E-854A-59FF52AD91DC,10715907,ar,2018-10-08 00:00:14.0,2018-10-08 00:00:20.0,Saudi Arabia
d312445e-3a7d-4678-85f3-076353ffd13c,0,en,2018-10-08 00:00:15.0,2018-10-08 00:00:15.0,Kuwait

The second RDD looks like this:

39250E6B-DB60-496E-8770-225EDB29A85F
2b98d4f4-0c55-4906-82ec-cfc7c5380652

How can I select the rows of the first RDD whose ID is not contained in the second one?

Archer

1 Answer

One approach would be to convert the RDDs to DataFrames and apply a left_anti join, as shown below:

val rdd1 = sc.parallelize(Seq(
  ("39250E6B-DB60-496E-8770-225EDB29A85F",0,"ar","2018-10-09 00:00:00.0","2018-10-09 00:00:04.0","United Arab Emirates"),
  ("2b98d4f4-0c55-4906-82ec-cfc7c5380652",40967837,"en","2018-10-09 00:00:01.0","2018-10-09 00:00:31.0","Qatar"),
  ("5bb587bc-54a0-4873-b0ba-2da38458ba1c",0,"en","2018-10-09 00:00:03.0","2018-10-09 00:04:02.0","United Arab Emirates"),
  ("466B96B5-DC12-4A35-8865-3A8702037A23",0,"ar","2018-10-09 00:00:04.0","2018-10-09 00:00:06.0","Saudi Arabia")
))

val rdd2 = sc.parallelize(Seq(
  ("39250E6B-DB60-496E-8770-225EDB29A85F"),
  ("2b98d4f4-0c55-4906-82ec-cfc7c5380652")
))

// toDF needs spark.implicits._ in scope (spark-shell imports it automatically)
val dfResult = rdd1.toDF.join(rdd2.toDF("_1"), Seq("_1"), "left_anti")

dfResult.show
// +--------------------+---+---+--------------------+--------------------+--------------------+
// |                  _1| _2| _3|                  _4|                  _5|                  _6|
// +--------------------+---+---+--------------------+--------------------+--------------------+
// |5bb587bc-54a0-487...|  0| en|2018-10-09 00:00:...|2018-10-09 00:04:...|United Arab Emirates|
// |466B96B5-DC12-4A3...|  0| ar|2018-10-09 00:00:...|2018-10-09 00:00:...|        Saudi Arabia|
// +--------------------+---+---+--------------------+--------------------+--------------------+

If you would like the result as an RDD, simply call rdd on the joined DataFrame:

val rddResult = dfResult.rdd
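
Note that `dfResult.rdd` is an `RDD[Row]` rather than an RDD of the original tuples. If the tuple type is wanted back, one option is to go through a typed Dataset first. The following is only a minimal sketch: the object name, the app name, and the `local[*]` master are illustrative assumptions, and the data is a trimmed sample of the rows from the question.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object; in spark-shell only the body of main is needed.
object AntiJoinTypedRddSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration only.
    val spark = SparkSession.builder()
      .appName("anti-join-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // enables toDF and as[...]

    val rdd1 = spark.sparkContext.parallelize(Seq(
      ("39250E6B-DB60-496E-8770-225EDB29A85F", 0, "ar", "2018-10-09 00:00:00.0", "2018-10-09 00:00:04.0", "United Arab Emirates"),
      ("2b98d4f4-0c55-4906-82ec-cfc7c5380652", 40967837, "en", "2018-10-09 00:00:01.0", "2018-10-09 00:00:31.0", "Qatar"),
      ("5bb587bc-54a0-4873-b0ba-2da38458ba1c", 0, "en", "2018-10-09 00:00:03.0", "2018-10-09 00:04:02.0", "United Arab Emirates")
    ))
    val rdd2 = spark.sparkContext.parallelize(Seq(
      "39250E6B-DB60-496E-8770-225EDB29A85F",
      "2b98d4f4-0c55-4906-82ec-cfc7c5380652"
    ))

    // left_anti keeps the rows of the left side that have no match on the right.
    val dfResult = rdd1.toDF.join(rdd2.toDF("_1"), Seq("_1"), "left_anti")

    // Columns _1.._6 line up with the tuple field names, so as[...] can map
    // each Row back to a tuple before dropping down to an RDD.
    val typedRdd = dfResult
      .as[(String, Int, String, String, String, String)]
      .rdd

    typedRdd.collect().foreach(println)
    spark.stop()
  }
}
```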

Another approach would be to transform the RDDs to PairRDDs and apply leftOuterJoin to filter out any rows with common keys:

val rddKV1 = rdd1.map(r => (r._1, (r._2, r._3, r._4, r._5, r._6)))
val rddKV2 = rdd2.map(r => (r, 1))

// Keep only the rows whose key has no match in rddKV2, then flatten back to 6-tuples
val rddResult = rddKV1.leftOuterJoin(rddKV2).
  filter(r => r._2._2.isEmpty).
  map{ case (k, (v, _)) => (k, v._1, v._2, v._3, v._4, v._5) }

[UPDATE]

Per @Archer's comment, subtractByKey appears to be the most straightforward solution when taking the PairRDD conversion approach:

val rddResult = rddKV1.subtractByKey(rddKV2)
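
One thing to keep in mind is that `subtractByKey` returns the surviving key/value pairs, so the result is keyed as `(id, (remaining fields))` rather than a flat 6-tuple. If the original layout is needed, a final `map` can flatten it. This is a rough sketch under the same assumptions as the rest of the answer: the object name, app name, and `local[*]` master are illustrative, and the data is a trimmed sample from the question.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object; in spark-shell only the body of main is needed.
object SubtractByKeySketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration only.
    val spark = SparkSession.builder()
      .appName("subtract-by-key-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Seq(
      ("39250E6B-DB60-496E-8770-225EDB29A85F", 0, "ar", "2018-10-09 00:00:00.0", "2018-10-09 00:00:04.0", "United Arab Emirates"),
      ("2b98d4f4-0c55-4906-82ec-cfc7c5380652", 40967837, "en", "2018-10-09 00:00:01.0", "2018-10-09 00:00:31.0", "Qatar"),
      ("5bb587bc-54a0-4873-b0ba-2da38458ba1c", 0, "en", "2018-10-09 00:00:03.0", "2018-10-09 00:04:02.0", "United Arab Emirates")
    ))
    val rdd2 = sc.parallelize(Seq(
      "39250E6B-DB60-496E-8770-225EDB29A85F",
      "2b98d4f4-0c55-4906-82ec-cfc7c5380652"
    ))

    // Key both sides by the ID; the value on the right side is a placeholder.
    val rddKV1 = rdd1.map(r => (r._1, (r._2, r._3, r._4, r._5, r._6)))
    val rddKV2 = rdd2.map(id => (id, 1))

    // Pairs from rddKV1 whose key does not occur in rddKV2.
    val keyed = rddKV1.subtractByKey(rddKV2)

    // Flatten (id, (f2..f6)) back into the original 6-tuple shape.
    val flat = keyed.map { case (k, v) => (k, v._1, v._2, v._3, v._4, v._5) }

    flat.collect().foreach(println)
    spark.stop()
  }
}
```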
Leo C
  • Is it possible to do this without converting to a DataFrame? – Archer Oct 11 '18 at 05:44
  • @Archer, please see expanded answer. – Leo C Oct 11 '18 at 07:01
  • I found a way: `val rddres=rdd1.filter(x=> !(rdd2 contains x(0)))` – Archer Oct 12 '18 at 01:29
  • @Archer, as far as I know an RDD does not have a `contains` method. If your rdd2 is small enough, you can first `collect` it into a Scala collection and then apply `contains`. – Leo C Oct 12 '18 at 01:49
  • Sorry, it should be `val rddres=rdd1.filter(x=> !(rdd2.collect().toList contains x(0)))` – Archer Oct 12 '18 at 01:54
  • Please be aware that it works only if your rdd2 isn't large, as [collect](https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/dont_collect_large_rdds.html) will copy the entire dataset to the driver node. – Leo C Oct 12 '18 at 02:03
  • How about `rddkv1.subtractByKey(rddv2)`? – Archer Oct 12 '18 at 02:44
  • Brilliant, `subtractByKey` does look like a great solution if you take the PairRDD conversion approach. I've updated the answer to include your suggestion. – Leo C Oct 12 '18 at 03:30
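
As a follow-up to the `collect` idea discussed in the comments: when rdd2 really is small, one possible variant is to collect it once into a `Set` and broadcast it, so the lookup is a constant-time membership check and the collection is not rebuilt for every row. This is only a sketch and assumes the IDs fit comfortably in memory on the driver and on each executor. The object name, app name, and `local[*]` master are illustrative, and the data is a trimmed sample from the question.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object; in spark-shell only the body of main is needed.
object BroadcastFilterSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration only.
    val spark = SparkSession.builder()
      .appName("broadcast-filter-sketch")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.parallelize(Seq(
      ("39250E6B-DB60-496E-8770-225EDB29A85F", 0, "ar", "2018-10-09 00:00:00.0", "2018-10-09 00:00:04.0", "United Arab Emirates"),
      ("2b98d4f4-0c55-4906-82ec-cfc7c5380652", 40967837, "en", "2018-10-09 00:00:01.0", "2018-10-09 00:00:31.0", "Qatar"),
      ("5bb587bc-54a0-4873-b0ba-2da38458ba1c", 0, "en", "2018-10-09 00:00:03.0", "2018-10-09 00:04:02.0", "United Arab Emirates")
    ))
    val rdd2 = sc.parallelize(Seq(
      "39250E6B-DB60-496E-8770-225EDB29A85F",
      "2b98d4f4-0c55-4906-82ec-cfc7c5380652"
    ))

    // Collect the small RDD once on the driver and ship it to the executors.
    val excludedIds = sc.broadcast(rdd2.collect().toSet)

    // Keep the rows of rdd1 whose ID is not in the broadcast set.
    val result = rdd1.filter(r => !excludedIds.value.contains(r._1))

    result.collect().foreach(println)
    spark.stop()
  }
}
```

For a large rdd2, the join or `subtractByKey` approaches in the answer avoid pulling the data to the driver.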