
take(count) is an action on an RDD which returns an Array with the first count items.

Is there a transformation that returns an RDD with the first count items? (It is OK if count is approximate.)

The best I can get is

val countPerPartition = (count / rdd.getNumPartitions.toDouble).ceil.toInt
rdd.mapPartitions(_.take(countPerPartition))

Update: I do not want the data to be transferred to the driver. In my case, count may be quite large, and the driver does not have enough memory to hold it. I want the data to remain parallelized for further transformations.

ov7a
    You could use `sample` but that works off of a percentage of the rdd rather than a specific count. – puhlen Jun 21 '17 at 14:20
  • How expensive is `sample`? If it is expensive to get the 1000th item, then `sample` will also be expensive, right? – ov7a Jun 21 '17 at 14:28
  • I haven't used it before but it should be pretty cheap. It doesn't do very much work especially if you sample without replacement. – puhlen Jun 21 '17 at 14:42
  • I've looked at the source code of `sample` and it seems like it applies a `filter` on each partition, so it is `O(N)` in the worst case. – ov7a Jun 21 '17 at 14:48
  • This filter example works more like what your code sample tries to do: https://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd Edit: Today, you'd `zipWithIndex` and `filter` (see the sketch after these comments). – Rick Moritz Jun 21 '17 at 16:06
  • If I use `zipWithIndex`, this will lead to uneven partition sizes: "The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index." – ov7a Jun 22 '17 at 07:36
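
Summarizing the two transformation-only ideas from the comments, a rough sketch (both keep the data distributed; count and rdd are as in the question, and the variable names are placeholders):

// Approximate: sample a fraction sized to roughly `count` items.
// rdd.count() is an action, but it only returns a number to the driver.
val total = rdd.count()
val approxFirst = rdd.sample(withReplacement = false, math.min(1.0, count / total.toDouble))

// Exact: index every element, then keep the first `count`.
// Note: zipWithIndex itself runs a Spark job, and the result is
// skewed towards the first partitions, as the last comment points out.
val exactFirst = rdd.zipWithIndex().filter(_._2 < count).map(_._1)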

1 Answer


Why not rdd.map(..).take(X)? I.e. transform and then take. Don't be afraid of doing redundant work: until you call take, all the computations are lazily evaluated in Spark (so only ~X transformations will happen).
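
A minimal sketch of this, assuming a placeholder transformation f and a count x:

val transformed = rdd.map(f)     // lazy: nothing is computed yet
val firstX = transformed.take(x) // take only evaluates as many partitions as needed to collect x items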

Igor Berman
  • What if I want to use .mapPartitions() after take? What if I want data to remain parallelized? – ov7a Jun 21 '17 at 14:25
  • take is an action (i.e. the end of the computation chain); it also brings data to the driver, so as a result you'll just get a list of objects. You can do the following: `val transformed = rdd.map(...)`, then `val myList = transformed.take(X)`, then `val withMapPartition = transformed.mapPartitions(...)` – Igor Berman Jun 21 '17 at 15:09
  • The point is - I do not want to bring data to the driver. I've edited the post to stress that. – ov7a Jun 21 '17 at 15:28
  • So you can't use take. You probably want to use sample(), or a mapPartitions that filters each partition down to a subset – in that case it will be a sort of sample. You might also use mapPartitionsWithIndex and filter by partition index (e.g. all partitions with index > X get filtered out; see the sketch below). – Igor Berman Jun 21 '17 at 16:32
  • See discussion about `sample` in comments below the question. – ov7a Jun 22 '17 at 07:37
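
A rough sketch of the mapPartitionsWithIndex suggestion from the comment above (x, the number of partitions to keep, is a placeholder; note this keeps whole partitions, not a count of items):

// Keep only the first x partitions; the rest become empty iterators.
val firstPartitions = rdd.mapPartitionsWithIndex { (idx, iter) =>
  if (idx < x) iter else Iterator.empty
}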