24

I'd like to select a range of elements in a Spark RDD. For example, I have an RDD with a hundred elements, and I need to select elements from 60 to 80. How do I do that?

I see that RDD has a `take(i: Int)` method, which returns the first i elements. But there is no corresponding method to take the last i elements, or i elements from the middle starting at a certain index.

Sean Owen
  • 66,182
  • 23
  • 141
  • 173
PlinyTheElder
  • 1,454
  • 1
  • 10
  • 15
  • 2
    I think the most natural way to do this as of 1.0.0 is to register your RDD as a table and use Spark SQL to add a `ROW_NUMBER()` or `RANK()` to your dataset and then `SELECT` the desired rows. For a small RDD this is overkill, but this approach should work efficiently for very large RDDs. – Nick Chammas Jul 10 '14 at 17:38
  • 1
    Here is [a discussion about this on the Spark User mailing list](http://apache-spark-user-list.1001560.n3.nabble.com/How-to-RDD-take-middle-10-elements-tt9340.html). – Nick Chammas Jul 10 '14 at 20:43
  • @NickChammas Check out the updated answer – aaronman Jul 11 '14 at 18:15
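
To illustrate the Spark SQL approach Nick Chammas describes above, here is a rough sketch (written for Spark 2.x, which supports window functions; the table and column names are illustrative, not from the original comment):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// `pairs` is a key-value RDD like the one used in the answers below
val pairs = spark.sparkContext.parallelize((1 to 100).map(x => (x, x)))
pairs.toDF("key", "value").createOrReplaceTempView("pairs")

// Number every row by key, then keep rows 60 through 80
val middle = spark.sql("""
  SELECT key, value
  FROM (
    SELECT key, value, ROW_NUMBER() OVER (ORDER BY key) AS rn
    FROM pairs
  ) numbered
  WHERE rn BETWEEN 60 AND 80
""")
middle.show()

Note that a window with a global ORDER BY and no PARTITION BY pulls all rows into a single partition, so for very large RDDs the sorted-RDD approaches in the answers below may be preferable.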

4 Answers

12

I don't think there is an efficient method to do this yet. But the easy way is to use filter(). Let's say you have an RDD called `pairs` of key-value pairs, and you only want the elements whose keys are from 60 to 80 inclusive; just do:

val sixtyToEighty = pairs.filter {
  case (k, v) => k >= 60 && k <= 80
  case _      => false // in case of invalid input
}

I think it's possible that this could be done more efficiently in the future, by using sortByKey and saving information about the range of values mapped to each partition. Keep in mind that this approach would only pay off if you were planning to query the range multiple times, because the sort is obviously expensive.

From looking at the Spark source, it would definitely be possible to do efficient range queries using RangePartitioner:

// An array of upper bounds for the first (partitions - 1) partitions
  private val rangeBounds: Array[K] = {

This is a private member of RangePartitioner that knows all the upper bounds of the partitions, so it would be easy to query only the necessary partitions. It looks like this is something Spark users may see in the future: SPARK-911

UPDATE: Here is a much better answer, based on the pull request I'm writing for SPARK-911. It will run efficiently if the RDD is sorted and you query it multiple times.

import org.apache.spark.RangePartitioner

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]]
val (lower, upper) = (10, 20)
// Indices of the partitions that can contain keys in [lower, upper]
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
// Scan only the partitions in that range; skip the others entirely
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for ((k, v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

If having the whole partition in memory is acceptable, you could even do something like this:

val glommedAndCached = sorted.glom().cache()
glommedAndCached.map(a => a.slice(a.search(lower), a.search(upper) + 1)).collect()

By the way, `search` is not a member of Array; I just made an implicit class that has a binary search function, not shown here.
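
A minimal sketch of what such an implicit class might look like (the original isn't shown, so the name and exact semantics here are assumptions; it does a lower-bound binary search on the key of an already-sorted array of pairs):

implicit class KeyedArraySearch(arr: Array[(Int, Int)]) {
  // Returns the index of the first element whose key is >= target,
  // or arr.length if every key is smaller (assumes arr is sorted by key)
  def search(target: Int): Int = {
    var lo = 0
    var hi = arr.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (arr(mid)._1 < target) lo = mid + 1 else hi = mid
    }
    lo
  }
}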

Alberto Bonsanto
  • 17,556
  • 10
  • 64
  • 93
aaronman
  • 18,343
  • 7
  • 63
  • 78
8

How big is your data set? You might be able to do what you need with:

data.take(80).drop(59)

This seems inefficient, but for small to medium-sized data it should work.

Is it possible to solve this in another way? What's the case for picking exactly a certain range out of the middle of your data? Would takeSample serve you better?

DPM
  • 1,571
  • 11
  • 8
  • The RDD is potentially huge (tens to hundreds of gigabytes). `takeSample` won't work either, I do need a range (from...to). – PlinyTheElder Jul 11 '14 at 14:55
  • 1
    @user1698678 how is a range defined in such a huge set? Given that the RDD will be distributed among many nodes, you will need a way to index your data. `getRange (from,to)` will not work in a distributed model. – maasg Jul 11 '14 at 15:00
  • @maasg: are you telling me that range is only defined for small RDDs? Or that it is defined differently for small ones and large ones? The take() method seems to work for all sizes, so I don't see why takeLast() and takeMiddle() methods cannot exist. – PlinyTheElder Jul 12 '14 at 16:11
  • 1
    @user1698678 `take()` is an action and will force the RDD to be collected at the driver. It will indeed only work on small sets that fit in the driver's memory. `range(...)` is unstable if you don't have an indexing or ordering in place, so I guess you need some form of key to make your selection. – maasg Jul 12 '14 at 21:08
5

The following should be able to get the range. Note that the cache will save you some overhead, because internally zipWithIndex needs to scan the RDD partitions to get the number of elements in each partition.

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala> val r2 = r1.zipWithIndex
scala> val r3 = r2.filter(x => x._2 > 2 && x._2 < 4).map(x => x._1)
scala> r3.foreach(println)
d
zhang zhan
  • 1,596
  • 13
  • 10
  • You need to add this code, and refactor your code: `val r4 = r3.collect; r4.foreach(println)` – 3xCh1_23 Mar 23 '15 at 15:06
  • 1
    It's worth noting that zipWithIndex is not purely lazy - it does a lot of distributed processing even for the example in the documentation. – Marcin Jul 10 '15 at 18:21
1

For those who stumble on this question looking for a Spark 2.x-compatible answer, you can use `filterByRange`.
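
A quick sketch of how that might look (the data here is just illustrative):

val pairs = sc.parallelize((1 to 100).map(x => (x, x)))

// filterByRange keeps the pairs whose keys fall in [60, 80] (bounds inclusive).
// If the RDD was sorted with a RangePartitioner (e.g. via sortByKey), only the
// partitions that can contain those keys are scanned; otherwise it falls back
// to filtering every partition.
val ranged = pairs.sortByKey().filterByRange(60, 80)
ranged.collect().foreach(println)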

jrook
  • 3,459
  • 1
  • 16
  • 33