I'm running a Spark batch job that processes a large (51G) xml file using the spark-xml datasource. I'm running locally on my machine.

I want to use all my cores (8) but I want to keep each task size small so that the total RAM used by all tasks does not grow more than what I have available.

So I've used repartition like this:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "FAILFAST")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(30)

So I thought that each task would handle about 51 GB / 30 partitions / 8 cores ≈ 212 MB.

The thing is, each input split appears to be only about 33 MB (33554432 bytes), as you can see in the log output:

17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:0+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:134217728+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:33554432+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:234881024+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:201326592+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:100663296+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:167772160+33554432
17/04/28 19:40:02 INFO NewHadoopRDD: Input split: file:/media/felipe/SAMSUNG/StackHeavy/Posts.xml:67108864+33554432

Could anyone explain this to me? Why doesn't the size of the file split change as I change the number of partitions?


1 Answer

Why doesn't the size of the file split change as I change the number of partitions?

Because repartitioning doesn't affect input splits. repartition shuffles the data after it has already been loaded into Spark, so the split sizes you see in the logs are determined before the repartition runs.

If you want to modify the number of input splits, you should adjust the spark.sql.files.maxPartitionBytes parameter.
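A minimal sketch of that setting (assuming Spark 2.x and that the setting is picked up by this data source; the 256 MB value and the session names are illustrative, not prescriptive):

```scala
import org.apache.spark.sql.SparkSession

object XmlSplitSize {
  def main(args: Array[String]): Unit = {
    // Cap each input split at ~256 MB. The value is in bytes;
    // Spark's default is 128 MB (134217728).
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("xml-split-size")
      .config("spark.sql.files.maxPartitionBytes", (256L * 1024 * 1024).toString)
      .getOrCreate()

    // With larger splits produced up front, there is no need for a
    // post-load repartition(30) just to control per-task input size.
    val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "row")
      .option("mode", "FAILFAST")
      .load(args(0)) // path to the XML file, as in the question
  }
}
```

The key point is that this parameter influences how the file is split when it is read, whereas repartition only reshuffles rows that have already been read.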
