
I use window functions with huge windows in Spark 2.4.4, e.g.:

Window
  .partitionBy("id")
  .orderBy("timestamp")
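
For context, here is a minimal, self-contained sketch of this kind of job (the DataFrame, the column names and the window function are only illustrative, not my actual code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("window-oom-repro").getOrCreate()
import spark.implicits._

// Tiny stand-in for the real data: about 70 ids, up to ~200,000 rows per id.
val df = Seq(
  ("a", 1L, 10.0), ("a", 2L, 20.0),
  ("b", 1L, 30.0), ("b", 2L, 40.0)
).toDF("id", "timestamp", "value")

val w = Window.partitionBy("id").orderBy("timestamp")

// Any window function over this spec makes WindowExec buffer all rows of a
// partition (one id) before emitting results for that partition.
df.withColumn("running_sum", sum($"value").over(w)).show()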

In my tests, I have about 70 distinct IDs, but there may be about 200,000 rows per ID. With no further configuration, I have to allocate a lot of memory to my executors to avoid this OOM:

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:161)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:128)
at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(ExternalAppendOnlyUnsafeRowArray.scala:115)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextPartition(WindowExec.scala:345)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:371)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.next(WindowExec.scala:303)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage15.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$1.hasNext(WholeStageCodegenExec.scala:631)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

Looking into the source code, I discovered this parameter, which is not documented at all:

spark.sql.windowExec.buffer.in.memory.threshold

Giving it a large value (e.g. 1,000,000), I no longer need as much memory. As I understand it, this is the number of rows that are buffered; I guess that increasing this parameter means rows are not duplicated as much in executor memory, but this is not really clear to me.
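
For reference, this is how I set it in code (1,000,000 is just the large value I tried, not a recommendation):

// Setting the internal, undocumented threshold on the active session.
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", 1000000L)
// The same can be passed at submit time:
//   spark-submit --conf spark.sql.windowExec.buffer.in.memory.threshold=1000000 ...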

Can someone explain exactly how windows are processed on the executor side? Why is the data duplicated? How can I avoid this duplication and make the processing faster when there are many rows in each window? Which parameters can be used?

Thanks.

Rolintocour

1 Answer


I discovered this parameter, which is not documented at all:

It's an internal configuration property.

From reading the source code I managed to "collect" the following:

spark.sql.windowExec.buffer.in.memory.threshold (internal) Threshold for number of rows guaranteed to be held in memory by WindowExec physical operator.

Default: 4096

Use SQLConf.windowExecBufferInMemoryThreshold method to access the current value.

Speaking of the internal properties of the WindowExec operator, there is another one you may need for performance tuning:

spark.sql.windowExec.buffer.spill.threshold (internal) Threshold for number of rows buffered in a WindowExec physical operator.

Default: 4096

Use SQLConf.windowExecBufferSpillThreshold method to access the current value.
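
A quick way to inspect and adjust both thresholds from an application or Spark shell, using only the public configuration API (a sketch; the values below are illustrative and these internal properties may change between versions):

// Read the current values; unset internal configs fall back to their registered defaults.
val inMemoryThreshold = spark.conf.get("spark.sql.windowExec.buffer.in.memory.threshold")
val spillThreshold = spark.conf.get("spark.sql.windowExec.buffer.spill.threshold")
println(s"in.memory.threshold=$inMemoryThreshold, spill.threshold=$spillThreshold")

// Raise the spill threshold so WindowExec buffers more rows before spilling to disk.
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", 2000000L)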

Alas, I can't explain the internals fully.

Jacek Laskowski