
I have a DataFrame (df) with more than 1 billion rows

df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)

From the above command I understand that only 5 worker nodes in my 100-worker-node cluster (Spark 2.4.5) will be performing all the tasks. Using coalesce(5), the process takes 7 hours to complete.

Should I try repartition instead of coalesce?

Is there a faster/more efficient way to write out 128 MB Parquet files, or do I need to first calculate the size of my DataFrame to determine how many partitions are required?

For example, if the size of my DataFrame is 1 GB and spark.sql.files.maxPartitionBytes = 128 MB, should I first calculate the number of partitions required as 1 GB / 128 MB ≈ 8 and then do repartition(8) or coalesce(8)?

The idea is to maximize the size of the Parquet files in the output at write time, and to be able to do so quickly.
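For illustration, a minimal sketch of that arithmetic (the variable names, and the assumption that the 1 GB figure refers to the size of the data as it will land on disk, are not from the original post):

```scala
// Rough estimate: number of partitions needed for ~128 MB output files.
val estimatedOutputSizeMB = 1024                  // assumed on-disk size of the data to be written
val targetFileSizeMB      = 128
val numPartitions         = math.ceil(estimatedOutputSizeMB.toDouble / targetFileSizeMB).toInt  // ≈ 8

// repartition shuffles the rows evenly across numPartitions partitions before the write;
// note that partitionBy("Country", "Date") then splits each of those partitions further,
// writing one file per Country/Date value it contains.
df.repartition(numPartitions)
  .write
  .partitionBy("Country", "Date")
  .mode("append")
  .parquet(datalake_output_path)
```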

puligun
  • What is the significance of the 128 MB output file size in your case? It sounds as if this is the maximum file size you can tolerate? – Chris May 06 '21 at 09:46
  • I just want to avoid the "too many small files" issue. The property `spark.sql.files.maxPartitionBytes` is set to 128 MB, so I want the partitioned files to be as close to 128 MB as possible. For example, I would rather have 10 part files of 128 MB than, say, 64 part files of 20 MB. I also noticed that even though `spark.sql.files.maxPartitionBytes` is set to 128 MB, I see files of 200 MB and 400 MB in the output path. I was expecting Spark to pack them into 128 MB files, but that is related to another issue. – puligun May 06 '21 at 11:46
  • "spark.sql.files.maxPartitionBytes" is used while reading from files. It can't control output files. For output files, you can use "spark.sql.files.maxRecordsPerFile". If your rows are more or less uniform in length, you can estimate the number of rows that would give your desired size (128MB). – A B Aug 01 '23 at 10:35

2 Answers


You can get the size (dfSizeDiskMB) of your dataframe df by persisting it and then checking the Storage tab on the Web UI as in this answer. Armed with this information and an estimate of the expected Parquet compression ratio you can then estimate the number of partitions you need to achieve your desired output file partition size e.g.

val targetOutputPartitionSizeMB = 128
val parquetCompressionRatio = 0.1
val numOutputPartitions = (dfSizeDiskMB * parquetCompressionRatio / targetOutputPartitionSizeMB).ceil.toInt
df.coalesce(numOutputPartitions).write.parquet(path)

Note that spark.sql.files.maxPartitionBytes is not relevant here as it is:

The maximum number of bytes to pack into a single partition when reading files.

(Unless df is the direct result of reading an input data source, with no intermediate dataframes created. More likely, the number of partitions for df is dictated by spark.sql.shuffle.partitions, the number of partitions Spark uses for dataframes created from joins and aggregations.)
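A quick way to check this for your own job (a sketch, not from the original answer):

```scala
// Current number of partitions backing df:
println(df.rdd.getNumPartitions)

// Partition count used for shuffles from joins/aggregations (defaults to 200):
println(spark.conf.get("spark.sql.shuffle.partitions"))
```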

Should I try repartition instead of coalesce?

coalesce is usually better as it can avoid the shuffle associated with repartition, but note the warning in the docs about potentially losing parallelism in the upstream stages depending on your use case.
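A small sketch of the trade-off being described, reusing numOutputPartitions from above:

```scala
// coalesce: merges existing partitions without a full shuffle. Cheaper, but the reduced
// parallelism can propagate back into the upstream stages that compute df.
val coalesced = df.coalesce(numOutputPartitions)

// repartition: full shuffle into evenly sized partitions. More expensive, but upstream
// stages keep their original parallelism.
val repartitioned = df.repartition(numOutputPartitions)
```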

Chris
  • 1
    So if i want to do it in code it would be something like this `val blockSize= 1024*1024*128 sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize) sc.hadoopConfiguration.setInt("parquet.block.size",blockSize) df.write.option("parquet.block.size", 128 * 1024 * 1024) ` – puligun May 07 '21 at 11:20

coalesce is better if you are going from a higher number of partitions to a lower number. However, if your code isn't doing a shuffle before writing the df, the coalesce will be pushed down to the earliest point possible in the DAG. What you can do is process your df with, say, 100 partitions (or whatever number seems appropriate) and then persist it before writing. Then bring the partitions down to 5 using coalesce and write. This should give you better performance.
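A sketch of that pattern, reusing the write from the question (the 100-partition figure is just the example number above):

```scala
import org.apache.spark.storage.StorageLevel

// Do the heavy processing with plenty of parallelism, then materialise the result
// so the later coalesce is not pushed back into these stages.
val processed = df.repartition(100)
  .persist(StorageLevel.MEMORY_AND_DISK)
processed.count()                          // forces the persist

// Only the final write runs with 5 tasks, giving at most 5 files per Country/Date directory.
processed.coalesce(5)
  .write
  .partitionBy("Country", "Date")
  .mode("append")
  .parquet(datalake_output_path)
```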

Mohd Avais
  • If I want the minimum number of parquet files per partition, do you think this is a good idea: `df.repartition("Country", "Date").write.partitionBy("Country", "Date").mode("append").parquet(datalake_output_path)` – puligun May 05 '21 at 19:25
  • repartition controls the partitioning in memory, while partitionBy controls it on disk. I guess you should specify the number of partitions in repartition along with the columns to control the number of files. – Mohd Avais May 05 '21 at 19:47
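A sketch of what that suggestion could look like in code (the partition count of 5 is carried over from the original write and would need tuning):

```scala
import org.apache.spark.sql.functions.col

// Hash-partition by the same columns used in partitionBy, with an explicit count:
// all rows for a given Country/Date land in a single in-memory partition, so each
// Country=/Date= directory is written as one file (watch out for skewed, oversized files).
df.repartition(5, col("Country"), col("Date"))
  .write
  .partitionBy("Country", "Date")
  .mode("append")
  .parquet(datalake_output_path)
```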