1

I have a large table saved as Parquet, and when I try to load it I get a crazy amount of GC time, around 80%. I use Spark 2.4.3. The Parquet is saved with the following directory layout:

/parentfolder/part_0001/parquet.file
/parentfolder/part_0002/parquet.file
/parentfolder/part_0003/parquet.file
                [...]
2432 in total

The table is 2.6 TiB in total and looks like this (both fields are 64-bit ints):

+-----------+------------+
|        a  |         b  |
+-----------+------------+
|85899366440|515396105374|
|85899374731|463856482626|
|85899353599|661424977446|

          [...]

I have a total of 7.4 TiB of cluster memory, with 480 cores on 10 workers, and I read the Parquet like this:

df = spark.read.parquet('/main/parentfolder/*/').cache()

and as I said I get a crazy amount of garbage collection time. Right now it stands at Task Time (GC Time) | 116.9 h (104.8 h), with only 110 GiB loaded after 22 min of wall time. I monitor one of the workers and memory usually hovers around 546G/748G.

What am I doing wrong here? Do I need a larger cluster? If my dataset is 2.6 TiB, why isn't 7.4 TiB of memory enough? But then again, why isn't the memory full on my worker?

Thagor
  • Possible duplicate of [Garbage collection time very high in spark application causing program halt](https://stackoverflow.com/questions/34589051/garbage-collection-time-very-high-in-spark-application-causing-program-halt) – Rob Jul 26 '19 at 13:46
  • Provide your driver and executor memory in spark-submit here. I think you are using the defaults (`bin/spark-submit` will also read configuration options from `conf/spark-defaults.conf`), and that's the reason it is cribbing. – user3190018 Jul 26 '19 at 21:44
  • Hey, good guess, but no, I'm not using the defaults. I set the values right in the app with: `.set('spark.executor.memory','740g').set('spark.driver.memory','740g')` – Thagor Jul 28 '19 at 11:27

2 Answers

1

Just try to remove .cache(). There are only a few cases where you need to cache your data; the most obvious one is one dataframe with several actions. But if your dataframe is that big, do not use cache. Use persist:

from pyspark import StorageLevel
df = spark.read.parquet('/main/parentfolder/*/').persist(StorageLevel.DISK_ONLY)
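
If it helps, a minimal sketch of how the persisted dataframe might then be reused across several actions (column `a` is taken from the sample above; the unpersist call is optional housekeeping):

df.count()                      # first action materializes the DISK_ONLY copy
df.groupBy('a').count().show()  # later actions reread the local on-disk blocks, not the source Parquet
df.unpersist()                  # release the persisted blocks once the dataframe is no longer needed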
Steven
  • I will try this as well, but the problem is that after I load the `dataframe` I do several actions on it. It's turned into a GraphFrame and PageRanked, which iterates about 50 times over the `dataframe` – Thagor Jul 28 '19 at 11:35
  • @Thagor That is why `persist` on disk is a good solution for your problem, I think. – Steven Jul 29 '19 at 08:34
0

See the Databricks article on this:

Tuning Java Garbage Collection for Apache Spark Applications

G1 GC Running Status (after Tuning)

-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms88g -Xmx88g -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20

You need garbage collector tuning in this case. Try the example configuration above.

  • Also make sure that in your spark-submit you are passing the right parameters, like executor memory and driver memory (see the sketch below).
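
A minimal sketch of how these options might be wired together from PySpark, using the flag values quoted above. Note that the heap size comes from spark.executor.memory / spark.driver.memory, not from -Xms/-Xmx in extraJavaOptions (Spark rejects -Xmx there), and driver-side settings generally need to be in place before the driver JVM starts (spark-submit or spark-defaults.conf):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# GC flags from the example above, without the heap size flags
gc_opts = ('-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC '
           '-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps '
           '-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20')

conf = (SparkConf()
        .set('spark.executor.memory', '740g')   # memory values quoted by the asker
        .set('spark.driver.memory', '740g')
        .set('spark.executor.extraJavaOptions', gc_opts)
        .set('spark.driver.extraJavaOptions', gc_opts))

spark = SparkSession.builder.config(conf=conf).getOrCreate()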

Use `getExecutorMemoryStatus()`, which returns a map from each executor to the maximum memory available for caching and the remaining memory available for caching:

scala.collection.Map<String,scala.Tuple2<Object,Object>> getExecutorMemoryStatus()

Call and debug the `getExecutorMemoryStatus` API through PySpark's Py4J bridge:

sc._jsc.sc().getExecutorMemoryStatus()
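
A rough sketch of inspecting the returned Scala map from PySpark, assuming standard Py4J method access on the Scala objects (`size`, `toString`, and the `Tuple2` accessors):

status = sc._jsc.sc().getExecutorMemoryStatus()

print(status.size())      # number of block managers (driver + executors)
print(status.toString())  # Map(host:port -> (maxMemForCaching, remainingMemForCaching), ...)

# iterate over the entries through Py4J
it = status.iterator()
while it.hasNext():
    entry = it.next()          # a scala.Tuple2 of (host:port, (maxMem, remainingMem))
    host = entry._1()
    max_mem = entry._2()._1()
    remaining = entry._2()._2()
    print(host, max_mem, remaining)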
Ram Ghadiyaram
  • Hey, I will look into the garbage collection! I'm pretty sure master and worker memory are set correctly; I do it "in app" with `.set('spark.executor.memory','740g').set('spark.driver.memory','740g')`. Can I pass the above config with `.set('spark.executor.extraJavaOptions','-XX:+UseG1GC -XX:+PrintFlagsFinal [...]')`? – Thagor Jul 28 '19 at 11:29