
I'm using Spark Streaming 2.1. I'd like to periodically refresh some cached tables (loaded from a Spark-provided data source such as Parquet or MySQL, or from a user-defined data source).

  1. How do I refresh the table?

    Suppose I have some table loaded by

    spark.read.format("").load().createTempView("my_table")

    and it is also cached by

    spark.sql("cache table my_table")

    is the following enough to refresh the table, so that when the table is loaded next it will automatically be cached again:

    spark.sql("refresh table my_table")

    or do I have to do that manually with

    spark.table("my_table").unpersist
    spark.read.format("").load().createOrReplaceTempView("my_table")
    spark.sql("cache table my_table")

  2. Is it safe to refresh the table concurrently?

    By concurrent I mean using a ScheduledThreadPoolExecutor to do the refresh work on a thread other than the main thread (see the sketch below).

    What will happen if Spark is using the cached table at the moment I call refresh on it?
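
For reference, this is roughly the kind of scheduled refresh I have in mind, using the manual unpersist/reload/cache variant (only a sketch; the format, path and interval are placeholders, and `spark` is the active SparkSession):

    import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

    // Single-threaded scheduler running apart from the main streaming thread.
    val scheduler = new ScheduledThreadPoolExecutor(1)

    val refreshTask = new Runnable {
      override def run(): Unit = {
        // Manual variant: drop the cached data, rebuild the view, cache again.
        spark.table("my_table").unpersist()
        spark.read.format("parquet").load("/data/my_table")
          .createOrReplaceTempView("my_table")
        spark.sql("cache table my_table")
      }
    }

    // Refresh every 10 minutes.
    scheduler.scheduleAtFixedRate(refreshTask, 10, 10, TimeUnit.MINUTES)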

Jacek Laskowski
宇宙人

2 Answers


Spark 2.2.0 introduced a feature for refreshing the metadata of a table when it has been updated by Hive or some external tool.

You can achieve this by using the API:

spark.catalog.refreshTable("my_table")

This API will update the metadata for that table to keep it consistent.
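
If in doubt, here is a small sketch of combining this with the rest of the Catalog API to make sure the table ends up cached again (`my_table` is the temp view from the question; whether `refreshTable` alone re-populates the cache may depend on the Spark version):

    // Refresh the metadata and invalidate any cached data for the table.
    spark.catalog.refreshTable("my_table")

    // If the refresh dropped the cached data, cache the table again explicitly.
    if (!spark.catalog.isCached("my_table")) {
      spark.catalog.cacheTable("my_table")
    }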

Ganesh
  • The API `spark.catalog.refreshTable` exists prior to Spark 2.2.0, and I guess it is the same as `spark.sql("refresh table ...")`. Do you mean that before Spark 2.2.0 using `spark.catalog.refreshTable` is not enough, and I have to manually unpersist and recache? And what about concurrency? – 宇宙人 Aug 22 '17 at 05:19
  • Using spark.catalog.refreshTable should be enough. And what did you mean by concurrency? If you meant parallelization, then it doesn't harm parallelization as far as I know. – Ganesh Aug 22 '17 at 05:30
  • I need to explain more; my English is poor. Your answer and your comment are inconsistent. In your answer, you point out that the feature was introduced in Spark 2.2.0. In your comment, you then say that even with Spark 2.1.0 `spark.catalog.refreshTable` is enough. That confuses me very much. By concurrency, I have an example: when Spark is executing a physical plan that uses a cached table t1 and I call refresh on t1 at the same time, what will Spark do? Wait until the physical plan has finished and then refresh the table, or reload the table before actually using it in the plan? – 宇宙人 Aug 22 '17 at 05:42
  • I feel it would reload the table and update the plan. But it would be better if you test it once and confirm. – Ganesh Aug 22 '17 at 05:49
  • Did you test it? I cached a table and updated the files underneath it, and that gives me a file-not-found error, so I used the refresh table command; but while running a parallel process to update the files and read them, I still get the same error. What would be the best approach? Should I use a task context to control parallelism? – Abhijit Jagtap Nov 28 '18 at 12:59

I had a problem reading a table from Hive using a SparkSession, specifically with the table method, i.e. spark.table(table_name). Every time after writing the table and trying to read it, I got this error:

java.io.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

I tried to refresh the table using spark.catalog.refreshTable(table_name) and also via sqlContext; neither worked.

My solution was to write the table and afterwards read it back using:

val usersDF = spark.read.load(s"/path/table_name")

It works fine.

Is this a problem? Maybe the data on HDFS is not updated yet?
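
For completeness, a sketch of the workaround described above: write the files, then read them back directly by path. spark.catalog.refreshByPath is also worth trying, to invalidate anything cached for that location (the path is a placeholder):

    // df is the DataFrame that was just written out, overwriting the previous files.
    df.write.mode("overwrite").save("/path/table_name")

    // Invalidate anything Spark may have cached for that location, then
    // read the files back by path instead of going through spark.table(...).
    spark.catalog.refreshByPath("/path/table_name")
    val usersDF = spark.read.load("/path/table_name")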

user1658843