
This is a minimal example of the problem I am facing, where I am getting an error. Let me try to reproduce it here.

I am saving a DataFrame as a parquet file, but when I reload the DataFrame from the parquet file and save it again to the same location as parquet, I get an error.

from pyspark.sql.functions import col

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - this produces the ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

Error message -

executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Another SO question addresses this issue. The proposed solution was to refresh the table, as in the code below, but that did not help. The issue seems to be with refreshing the metadata, and I don't know how to refresh it properly.

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
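
Since the load here is path-based rather than table-based, Spark also has a path-based variant of this refresh call (`spark.catalog.refreshByPath`). A minimal sketch with the same placeholder path is below; I am not certain it addresses the underlying metadata issue, but noting it for completeness.

# Path-based variant of the catalog refresh; it invalidates the cached metadata
# of any DataFrame that contains the given path. Placeholder path as above.
spark.catalog.refreshByPath('.../temp')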

Workaround for this problem: an inelegant way to solve this issue is to save the DataFrame as a parquet file under a different name, then delete the original parquet folder and, finally, rename the new folder to the old name.

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')

But the problem is that some DataFrames are quite big, so this may not be the best way to deal with them. I am also not sure whether renaming the folder will cause any problems with the metadata.
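
If the `os`/`shutil` calls are a concern (they only work when the MapR-FS paths are also visible through a local or NFS mount), the same delete-and-rename can be done through Hadoop's FileSystem API via Spark's JVM gateway. A rough sketch, keeping the placeholder paths from above and relying on the internal `_jsc`/`_jvm` attributes:

# Sketch: delete-and-rename through the Hadoop FileSystem API instead of os/shutil.
# Paths are placeholders, as elsewhere in the question.
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

fs.delete(Path('.../temp'), True)               # recursive delete of the old folder
fs.rename(Path('.../temp1'), Path('.../temp'))  # rename temp1 -> temp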

cph_sto
  • Possible duplicate of [Spark HiveContext : Insert Overwrite the same table it is read from](https://stackoverflow.com/questions/46143084/spark-hivecontext-insert-overwrite-the-same-table-it-is-read-from) – 10465355 Oct 02 '19 at 09:17
  • Hello, I have seen this link before, but since I am not on Hive, I don't know how to use it in my case. If you have a solution to it, kindly write it as an answer. It would help a lot. Thanks. – cph_sto Oct 02 '19 at 09:30

2 Answers


One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()) and then save the parquet file in "overwrite" mode.

In Python:

save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")

# ... apply your transformations to df here, producing new_df

new_df.cache()
new_df.show()

new_df.write.format("parquet") \
    .mode(save_mode) \
    .save("path_to_parquet")
prashanth
Andre Aguiar

When the data is taken out of a cache, it seems to work fine.

val df = spark.read.format("parquet").load("temp").cache()

`cache` is a lazy operation and doesn't trigger any computation by itself, so we have to add a dummy action.

println(df.count()) //count over parquet files should be very fast  

Now it should work:

import org.apache.spark.sql.SaveMode

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
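
For reference, a rough PySpark equivalent of the same cache-then-overwrite idea, keeping the placeholder path from the question:

# PySpark version of the snippet above (placeholder path).
df = spark.read.format('parquet').load('.../temp').cache()
print(df.count())  # dummy action so the cached data is actually materialized

df.repartition(1).write.mode('overwrite').parquet('.../temp')
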
Gelerion
  • I ran the code with `.cache()`. Sometimes it works and other times it fails with the error `Caused by: java.io.IOException: Error: Stale file handle`. But I think it has nothing to do with your code, as I have seen this error before while saving DFs. Let me test it for some time before I accept this as an answer. Thank you. – cph_sto Sep 26 '19 at 13:35
  • Can you offer an explanation as to what difference `cache()` makes? – cph_sto Sep 26 '19 at 13:43
  • Well, I am not 100% sure it will work on a big cluster; I have tested it only on my local environment. I could imagine a situation where the job is run on spot nodes and all the nodes with cached data are taken back by the cloud provider. In that situation, Spark will try to recompute the lost data and eventually fail with the same exception. – Gelerion Sep 26 '19 at 13:47
  • Oh I see. So, would you say that the workaround that I have proposed remains more robust? Do you think that refreshing the `metadata` as I posted in my question will help? – cph_sto Sep 26 '19 at 13:51
  • `cache` stores data on a local disk per executor, so when we write the data, it is taken from the disk and not from the remote file. – Gelerion Sep 26 '19 at 13:51
  • So that's why `.cache()` works: because `df` is not pointing to the original `parquet` file, but to a location on the disk, correct? – cph_sto Sep 26 '19 at 13:53
  • Yes, exactly. I had no success with refreshing the `metadata`. – Gelerion Sep 26 '19 at 13:54
  • It might be a viable solution if you use an on-prem or EMR cluster with static nodes; you could also increase the replication factor. – Gelerion Sep 26 '19 at 13:56
  • Thank you so much. It helped me get a perspective :) – cph_sto Sep 26 '19 at 13:58