
I'm running a Spark job to aggregate data. I have a custom data structure called a Profile, which basically contains a mutable.HashMap[Zone, Double]. I want to merge all profiles that share a given key (a UUID), with the following code:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()

Curiously, Spark fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 116318 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

The obvious solution is to increase "spark.driver.maxResultSize" (see the sketch after this list), but two things puzzle me.

  1. It seems like too much of a coincidence that the reported total is exactly 1024.0 MB against a limit of 1024.0 MB.
  2. All the documentation and help I found while googling this particular error and configuration parameter indicate that it affects functions that bring a value back to the driver (say take() or collect()), but I'm not bringing ANYTHING back to the driver, just reading from HDFS, aggregating, and saving back to HDFS.
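
For reference, this is how I would raise the limit; a minimal sketch, assuming the setting is applied before the SparkContext is created (the 2g value and the app name are placeholders, not my actual settings):

// Minimal sketch: raising spark.driver.maxResultSize (default 1g) to 2g.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("profile-aggregation")
  .set("spark.driver.maxResultSize", "2g")
val sc = new SparkContext(conf)

// Equivalent at submit time:
//   spark-submit --conf spark.driver.maxResultSize=2g ...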

Does anyone know why I'm getting this error?

Daniel Langdon
    I'll upvote it, but sadly I no longer have access to that code (or company), nor does your answer solve piece #2, which is that the operation should not be happening in the first place :-S – Daniel Langdon Dec 20 '16 at 17:55

1 Answer


Yes. It fails because the values shown in the exception message are rounded to one decimal place, while the actual comparison happens in bytes.

The total serialized result size must be just over the 1 GiB limit (1073741824 bytes), yet still under roughly 1024.05 MB, so both numbers round to 1024.0 MB when formatted.

Have a look at the Apache Spark code snippets below; it's a very rare edge case to hit. :)

In the check totalResultSize > maxResultSize, both sides are Long values holding sizes in bytes, but msg holds the values rounded by Utils.bytesToString().

//TaskSetManager.scala
  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
        s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
        s"(${Utils.bytesToString(maxResultSize)})"
      logError(msg)
      abort(msg)
      false
    } else {
      true
    }
  }

Apache Spark 1.3 - source


//Utils.scala
  def bytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
      if (size >= 2*TB) {
        (size.asInstanceOf[Double] / TB, "TB")
      } else if (size >= 2*GB) {
        (size.asInstanceOf[Double] / GB, "GB")
      } else if (size >= 2*MB) {
        (size.asInstanceOf[Double] / MB, "MB")
      } else if (size >= 2*KB) {
        (size.asInstanceOf[Double] / KB, "KB")
      } else {
        (size.asInstanceOf[Double], "B")
      }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }

Apache Spark 1.3 - source
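
To make the edge case concrete, here is a small sketch (not part of the Spark sources above) that mimics the same formatting: a total just one byte over the default 1 GiB limit already aborts the job, yet both sides of the comparison print as 1024.0 MB.

import java.util.Locale

// Simplified re-implementation of Utils.bytesToString, for illustration only:
// it always formats in MB with one decimal place, as the real method does for
// sizes between 2 MB and 2 GB.
def bytesToString(size: Long): String = {
  val MB = 1L << 20
  "%.1f MB".formatLocal(Locale.US, size.toDouble / MB)
}

val maxResultSize   = 1L << 30            // default spark.driver.maxResultSize: 1 GiB
val totalResultSize = maxResultSize + 1L  // one byte over the limit

println(totalResultSize > maxResultSize)  // true  -> canFetchMoreResults aborts the stage
println(bytesToString(totalResultSize))   // 1024.0 MB
println(bytesToString(maxResultSize))     // 1024.0 MB

Any total up to roughly 1024.05 MB still prints as 1024.0 MB, which is why the message looks self-contradictory.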

mrsrinivas