I am using Spark 2.3.1 in order to process a stream of text files. Context setup is as follows:
val ssc = new StreamingContext(sparkSession.sparkContext,
  Seconds(config.sparkStreamingBatchTime))
ssc.checkpoint(config.sparkCheckpointLocation)
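For context, the stream itself is a plain text file stream over an HDFS directory. A minimal sketch of how it is wired up (the directory path is a placeholder, and the processing step is omitted):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// textFileStream creates the FileInputDStream that appears in the logs below;
// it monitors the directory and builds one file set per batch interval.
val inputDir = "hdfs://nameservice1/<monitored-dir>"  // placeholder path
val stream = ssc.textFileStream(inputDir)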
Everything works fine until the job is not given enough resources and the batches start piling up. The solution is to kill the job, assign more resources, and start it again.
Unfortunately, when doing so, the pending batches that were triggered but not yet processed are lost.
Example: I schedule the job with a one-minute batch interval and a processing step that only does a Thread.sleep(3 * 60 * 1000)
(three minutes). The batches start piling up, as expected.
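A minimal sketch of that repro, building on the stream above:

// The batch interval is one minute, but each batch only sleeps for three minutes,
// so pending batches accumulate.
stream.foreachRDD { _ =>
  Thread.sleep(3 * 60 * 1000)
}

ssc.start()
ssc.awaitTermination()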
A new file is introduced for a batch:
20/09/23 08:39:00 DEBUG FileInputDStream: Getting new files for time 1600850340000, ignoring files older than 1600850280000
20/09/23 08:39:00 DEBUG FileInputDStream: hdfs://nameservice1/[...]/test_1 accepted with mod time 1600850308293
20/09/23 08:39:00 INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 18 ms
The checkpoint system is apparently working, as the file set is persisted:
20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]
Then, I kill the job (apparently gracefully, since the shutdown hooks are invoked):
20/09/23 08:40:00 INFO JobScheduler: Stopped JobScheduler
20/09/23 08:40:00 INFO StreamingContext: StreamingContext stopped successfully
20/09/23 08:40:00 INFO SparkContext: Invoking stop() from shutdown hook
[...]
20/09/23 08:40:00 INFO org.apache.spark.util.ShutdownHookManager: Shutdown hook called
Finally, when I launch the job again, the file is ignored because of its modification time, even though it was never processed in the first place!
20/09/23 08:44:00 DEBUG FileInputDStream: Getting new files for time 1600850640000, ignoring files older than 1600850580000
20/09/23 08:44:00 DEBUG FileInputDStream: hdfs://nameservice1/[...]/test_1 ignored as mod time 1600850308293 <= ignore time 1600850580000
20/09/23 08:44:00 INFO FileInputDStream: Finding new files took 3 ms
There is a workaround that involves "touching" or copying the unprocessed files, but it is a nightmare to maintain in production. Do you think I am missing something at shutdown time, so that the pending batches would be persisted to the checkpoint folder? Is it possible to override the timestamp check? Any other ideas?
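For reference, the "touch" part of that workaround looks roughly like this (the path is a placeholder, and it has to be repeated for every unprocessed file):

import org.apache.hadoop.fs.{FileSystem, Path}

// Bump the modification time of an unprocessed file so that, after the restart,
// FileInputDStream considers it new and picks it up again.
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val unprocessed = new Path("hdfs://nameservice1/<monitored-dir>/test_1")  // placeholder path
fs.setTimes(unprocessed, System.currentTimeMillis(), -1)  // -1 leaves the access time unchanged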
Thanks!
UPDATE 09/24/2020: I have a workaround, but it is really ugly. Based on this question, I have managed to get the files that generated the RDD supplied in each batch:
import org.apache.spark.rdd.RDD

/**
 * Recursive method to extract the original metadata files involved in this batch.
 * @param rdd Each RDD created for each batch.
 * @return All HDFS files originally read.
 */
def extractSourceHDFSFiles(rdd: RDD[_]): Set[String] = {
  def extractSourceHDFSFilesWithAcc(rdds: List[RDD[_]]): Set[String] = {
    rdds match {
      case Nil => Set()
      case head :: tail =>
        val name = head.toString()
        if (name.startsWith("hdfs")) {
          Set(name.split(" ")(0)) ++
            extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++
            extractSourceHDFSFilesWithAcc(tail)
        } else {
          extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++
            extractSourceHDFSFilesWithAcc(tail)
        }
    }
  }
  extractSourceHDFSFilesWithAcc(List(rdd))
}
With it, at least, I can log which files have been processed, and then manually search for the ones that were not. It is terrible, but it is the only workaround I have.
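For completeness, this is roughly how I hook it into the output operation (a plain println stands in for whatever logging is actually used):

stream.foreachRDD { rdd =>
  // Record which HDFS files back this batch, so they can later be cross-checked
  // against the contents of the monitored directory.
  extractSourceHDFSFiles(rdd).foreach(file => println(s"Processed source file: $file"))
  // ... actual processing ...
}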
Update #2 09/24/2020: I noticed that I can use --conf spark.streaming.fileStream.minRememberDuration
to make Spark consider older metadata files, but it won't prevent it from reprocessing files that have already been processed. I would need to use the information from the previous update to go and delete those files, as well as force an administrator to clean up old metadata files before launching the job...
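For example (the 10-minute value is only illustrative; it has to cover the downtime between stopping and relaunching the job):

spark-submit \
  --conf spark.streaming.fileStream.minRememberDuration=600s \
  ...  # rest of the usual submit arguments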
Update #3: 09/30/2020 I have dug into the source code of FileInputDStream and now have a better view of what the checkpoint does. It recovers a "snapshot" of the monitored directory: which files appeared at what time.
However, the missing piece is: how can those "file sets" (i.e. batches) that were generated but never processed be recovered after relaunching the job?
20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]
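For reference, the recovery pattern described in the Spark Streaming programming guide is to create the context through StreamingContext.getOrCreate, so that the driver restores the checkpointed DStream graph instead of building a new one. I am not sure whether that would also replay these pending file sets; a minimal sketch of that pattern, reusing my config object, would be:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Called only when no usable checkpoint exists; otherwise the context (and its
// DStream graph) is restored from config.sparkCheckpointLocation.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkSession.sparkContext,
    Seconds(config.sparkStreamingBatchTime))
  ssc.checkpoint(config.sparkCheckpointLocation)
  // ... wire up the textFileStream and output operations here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(config.sparkCheckpointLocation, createContext _)
ssc.start()
ssc.awaitTermination()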