15

A good question for Spark experts.

I am processing data in a map operation on an RDD. Within the mapper function, I need to look up objects of class A to use when processing the elements of the RDD.

Since this will be performed on executors AND creation of elements of type A (that will be looked up) happens to be an expensive operation, I want to pre-load and cache these objects on each executor. What is the best way of doing it?

  • One idea is to broadcast a lookup table, but class A is not serializable (no control over its implementation).

  • Another idea is to load them up in a singleton object. However, I want to control what gets loaded into that lookup table (e.g. possibly different data on different Spark jobs).

Ideally, I want to specify what will be loaded on executors once (including the case of Streaming, so that the lookup table stays in memory between batches), through a parameter that will be available on the driver during its start-up, before any data gets processed.

Is there a clean and elegant way of doing it or is it impossible to achieve?

DruckerBg
  • 223
  • 3
  • 8
  • why not have the lookup table also distributed across? So that you can join two sets of data using DataFrames? If data is always needed to be looked up, what's the need to suffer the cost of broadcasting it across every time you need to run the calculation? – DevZer0 Nov 05 '16 at 13:41
  • 1
    @DevZer0 _A is not serializable_. –  Nov 05 '16 at 14:07

3 Answers

6

This is exactly the targeted use case for broadcast. Broadcast variables are transmitted to each executor once, using a BitTorrent-like protocol to distribute them efficiently, and they stay in memory / on local disk until you no longer need them.

Serialization often pops up as an issue when using others' interfaces. If you can enforce that the objects you consume are serializable, that's going to be the best solution. If this is impossible, your life gets a little more complicated. If you can't serialize the A objects, then you have to create them on the executors for each task. If they're stored in a file somewhere, this would look something like:

rdd.mapPartitions { it => 
  val lookupTable = loadLookupTable(path)
  it.map(elem => fn(lookupTable, elem))
}

Note that if you're using this model, then you have to load the lookup table once per task -- you can't benefit from the cross-task persistence of broadcast variables.

EDIT: Here's another model, which I believe lets you share the lookup table across tasks per JVM.

class BroadcastableLookupTable extends Serializable {
  @transient private var lookupTable: LookupTable[A] = null

  def get: LookupTable[A] = synchronized {
    if (lookupTable == null)
      lookupTable = ??? // load lookup table from disk
    lookupTable
  }
}

This class can be broadcast (nothing substantial is transmitted, since the table itself is `@transient`), and the first time `get` is called in each executor JVM, it loads the lookup table and caches it for subsequent tasks.
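For concreteness, here is a minimal, self-contained sketch of the same per-JVM lazy-initialization idea. The `Map[String, Int]` table, the `path` parameter, and the `load` helper are stand-ins for the asker's `LookupTable[A]` and real load-from-disk logic, not part of the original answer:

```scala
// Sketch of the per-JVM lazy-init pattern. The wrapper is Serializable,
// but the table itself is @transient, so the broadcast ships only the
// lightweight shell; each executor JVM rebuilds the table on first access.
class LazyLookupHolder(path: String) extends Serializable {
  // A @transient lazy val re-initializes after deserialization in each JVM
  @transient private lazy val table: Map[String, Int] = load(path)

  private def load(p: String): Map[String, Int] =
    Map("a" -> 1, "b" -> 2) // placeholder for the real load-from-disk

  def get: Map[String, Int] = table
}

// Driver-side usage would look roughly like this (needs a live SparkContext):
//   val bc = sc.broadcast(new LazyLookupHolder("/path/to/table"))
//   rdd.map(elem => bc.value.get(elem))
```

Using `@transient lazy val` instead of a `var` plus null-check gets the same lazy, once-per-JVM behavior with thread safety handled by the compiler.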

Tim
  • 3,675
  • 12
  • 25
  • Unfortunately the objects are not serializable, so we do need to go the second approach, as you described. However, we also have to be able to share lookup table across tasks. – DruckerBg Nov 05 '16 at 17:10
  • Why do you need to share across tasks? Are you updating the lookup table inside of the map operation? – Tim Nov 05 '16 at 17:46
  • 1
    Added a possible way to do this. – Tim Nov 05 '16 at 17:49
  • Thanks, this is what I am looking for. I haven't tested it yet, but I think it will work. By any chance, would you be able to suggest a good way of initializing these maps in Spark Streaming, if I want to specify what to load at the beginning with a variable/parameter sent from the driver? – DruckerBg Nov 06 '16 at 18:27
  • Sorry, can't help you there -- I don't have any experience with Spark Streaming. – Tim Nov 06 '16 at 18:46
3

In case serialisation turns out to be impossible, how about storing the lookup objects in a database? It's not the easiest solution, granted, but it should work just fine. I could recommend checking e.g. spark-redis, but I am sure there are better solutions out there.
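As a rough illustration of the external-store idea, values can be round-tripped to byte arrays that any store (Redis, a SQL blob column, etc.) can hold. Note this still presupposes Java-serializability, which is the thread's sticking point, so in practice it would apply to a serializable projection of A rather than A itself; the helper names here are illustrative:

```scala
import java.io._

// Sketch: round-trip a (Java-serializable) value to bytes suitable
// for storage in an external key-value store or database.
def toBytes(o: java.io.Serializable): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  try oos.writeObject(o) finally oos.close()
  bos.toByteArray
}

def fromBytes[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}
```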

Lukasz Tracewski
  • 10,794
  • 3
  • 34
  • 53
  • Thanks, that is a nice solution. One problem is that these are actually objects in the JVM. – DruckerBg Nov 05 '16 at 17:09
  • I updated the question to include this: "... creation of elements of type A (that will be looked up) happens to be an expensive operation..." – DruckerBg Nov 05 '16 at 17:09
  • How about storing the JVM objects as byte-array in, say, Redis? – Lukasz Tracewski Nov 05 '16 at 17:21
  • Thanks! That is possible maybe, I am not sure if serialization issues come up again. Also, it is best to avoid bringing in any other dependence, such as Redis. – DruckerBg Nov 06 '16 at 18:25
1

Since A is not serializable, the easiest solution is to create your own serializable type A1 containing all the data from A required for the computation. Then use the new lookup table in a broadcast.
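A minimal sketch of that approach; `A`, its fields, and `toA1` are hypothetical stand-ins, since the real class A is not described in the thread:

```scala
// Hypothetical non-serializable A with expensive-to-build state:
class A(val key: String, val weight: Int) // imagine: not Serializable

// A1 keeps only the fields the computation needs;
// case classes are Serializable by default:
case class A1(key: String, weight: Int)

def toA1(a: A): A1 = A1(a.key, a.weight)

// Driver side: build the table from the expensive A objects once,
// then broadcast the lightweight A1 table (needs a live SparkContext):
//   val table: Map[String, A1] = as.map(a => a.key -> toA1(a)).toMap
//   val bc = sc.broadcast(table)
```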

Mariusz
  • 13,481
  • 3
  • 60
  • 64