
I am using Spark 2.3.1 in order to process a stream of text files. Context setup is as follows:

val ssc = new StreamingContext(sparkSession.sparkContext, 
    Seconds(config.sparkStreamingBatchTime))
ssc.checkpoint(config.sparkCheckpointLocation)

Everything works fine as long as the job is given enough resources; when it is not, the batches start piling up. The solution is to kill the job, give it more resources and start it again.

Unfortunately, when doing so, the pending batches that were triggered, but not yet processed, are lost.

Example: the job is scheduled once per minute and only does a Thread.sleep(3 * 60 * 1000) (three minutes). The batches start piling up, as expected.
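
I have omitted the stream definition; roughly, the repro looks like this (a simplified sketch, the monitored path and the use of textFileStream are placeholders, not the actual code):

// Minimal sketch of the repro job; path and textFileStream are placeholders.
val lines = ssc.textFileStream("hdfs://nameservice1/[...]/input")
lines.foreachRDD { _ =>
  // Simulate a slow batch: each batch takes three minutes while a new one is
  // triggered every minute, so batches pile up.
  Thread.sleep(3 * 60 * 1000)
}
ssc.start()
ssc.awaitTermination()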

A new file is introduced for a batch:

20/09/23 08:39:00 DEBUG FileInputDStream: Getting new files for time 1600850340000, ignoring files older than 1600850280000
20/09/23 08:39:00 DEBUG FileInputDStream: hdfs://nameservice1/[...]/test_1 accepted with mod time 1600850308293
20/09/23 08:39:00 INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 18 ms

The checkpoint system is apparently working, as the file set is persisted:

20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]

Then, I kill the job (apparently gracefully, since shutdown hooks are invoked):

20/09/23 08:40:00 INFO JobScheduler: Stopped JobScheduler
20/09/23 08:40:00 INFO StreamingContext: StreamingContext stopped successfully
20/09/23 08:40:00 INFO SparkContext: Invoking stop() from shutdown hook
[...]
20/09/23 08:40:00 INFO org.apache.spark.util.ShutdownHookManager: Shutdown hook called
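
The kill itself is external. For reference, this is only a sketch of how a graceful stop can be requested in-process, plus the related submit flag; whether my kill actually goes through a graceful stop is part of what I am asking:

// Sketch only: the in-process way to request a graceful stop, and the flag that asks
// Spark to stop the streaming context gracefully from its own JVM shutdown hook.
// (Not necessarily what my external kill triggers.)
//   spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true ...
ssc.stop(stopSparkContext = true, stopGracefully = true)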

Finally, when I start the job again, the file is ignored due to its timestamp, even though it was never processed in the first place!

20/09/23 08:44:00 DEBUG FileInputDStream: Getting new files for time 1600850640000, ignoring files older than 1600850580000
20/09/23 08:44:00 DEBUG FileInputDStream: hdfs://nameservice1/[...]/test_1 ignored as mod time 1600850308293 <= ignore time 1600850580000
20/09/23 08:44:00 INFO FileInputDStream: Finding new files took 3 ms

There is a workaround that involves "touching" or copying the unprocessed files, but it is a nightmare to maintain in production. Do you think I am missing something at shutdown time, so that the pending batches are persisted to the checkpoint folder? Is it possible to override the timestamp check? Any other ideas?
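
For the record, the "touching" part of that workaround just bumps the modification time through the HDFS API so that the restarted job sees the file as new again; a sketch, with a placeholder path:

// Sketch of the "touch" workaround: bump the modification time of an unprocessed file so
// that the restarted job considers it new again. The path is a placeholder.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val unprocessed = new Path("hdfs://nameservice1/[...]/test_1")
fs.setTimes(unprocessed, System.currentTimeMillis(), -1)  // -1 leaves the access time untouched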

Thanks!

UPDATE 09/24/2020: I have a workaround, but it is really ugly. Based on this question, I have managed to get the files that generated the RDD supplied in each batch:

import org.apache.spark.rdd.RDD

/**
  * Recursive method to extract the original metadata files involved in this batch.
  * @param rdd Each RDD created for each batch.
  * @return All HDFS files originally read.
  */
def extractSourceHDFSFiles(rdd: RDD[_]): Set[String] = {
  def extractSourceHDFSFilesWithAcc(rdds: List[RDD[_]]): Set[String] = {
    rdds match {
      case Nil => Set()
      case head :: tail =>
        // RDDs whose string representation starts with "hdfs" carry the source file path
        // as their first token; also walk the dependency graph and the rest of the list.
        val name = head.toString()
        val fromDependencies = extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList)
        if (name.startsWith("hdfs")) {
          Set(name.split(" ")(0)) ++ fromDependencies ++ extractSourceHDFSFilesWithAcc(tail)
        } else {
          fromDependencies ++ extractSourceHDFSFilesWithAcc(tail)
        }
    }
  }
  extractSourceHDFSFilesWithAcc(List(rdd))
}

With it, at least, I can log which files have been processed and then manually search for the ones that were not. It is terrible, but it is the only workaround I have.
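
For completeness, this is roughly how I hook the helper in; `lines` stands for the input DStream and `log` for whatever logging is in place (both placeholders):

// Log the source files of every batch that actually runs, so the unprocessed ones can
// later be found by diffing against the directory listing.
lines.foreachRDD { rdd =>
  extractSourceHDFSFiles(rdd).foreach(f => log.info(s"Processed source file: $f"))
}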

Update #2 09/24/2020: I noticed that I can use --conf spark.streaming.fileStream.minRememberDuration to make Spark consider older metadata files, but it won't prevent it from reprocessing files that have already been processed. I would need to use the information from the previous update to go and delete those files, as well as forcing an administrator to clean up old metadata files before launching the job...
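
A sketch of what that combination could look like, assuming a fileStream-based source and some external register of already-processed files (both are assumptions, and the duration value is only an example):

// Widen the remember window so older files are considered again, and filter out the files
// a previous run already handled. 'alreadyProcessed' is a placeholder for whatever store
// records processed files.
//   spark-submit --conf spark.streaming.fileStream.minRememberDuration=1h ...
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def alreadyProcessed(p: Path): Boolean = ???  // look the file up in an external register

val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://nameservice1/[...]/input",
  (p: Path) => !alreadyProcessed(p),  // skip files a previous run already handled
  newFilesOnly = false)               // also consider files older than the job start time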

Update #3 09/30/2020: I have dug into the source code of FileInputDStream and I have a better view of what the checkpoint does. It recovers the "snapshot" of the directory being monitored, that is, which files appeared at what time.

However, the missing piece here is: how can those "file sets" (i.e. batches) that were generated but never processed be recovered after relaunching the job?

20/09/23 08:39:00 DEBUG FileInputDStream: Updated checkpoint data for time 1600850340000 ms: [
3 file sets
1600850280000 ms ->
1600850340000 ms -> hdfs://nameservice1/[...]/test_1
1600850220000 ms ->
]
Victor
  • what is the reason for not using structured streaming? – Alex Ott Sep 25 '20 at 14:05
  • No need to look into past data or perform complex Spark SQL transformations. The tool basically groups incoming files and merges them (actually the source folder is a _spark_metadata folder coming from a Spark Structured Streaming job). I am not considering migrating; instead, I would prefer to use the checkpoint feature. – Victor Sep 25 '20 at 14:13
  • with each set of files, are you performing window operations or operations that update state across intervals? Or are you using the Streaming API because of its ability to detect changes? – ggordon Oct 01 '20 at 21:59
  • No, no windowing or state. Just need to accumulate new files during the batch duration so change detection, as you put it, is enough for me. The metrics and monitoring are also useful for me (and the error recovery - if it worked) – Victor Oct 01 '20 at 23:27
  • have you considered multiple jobs, i.e. one to record file changes in a temporary data store/queue and another that processes changes from that data store/queue and marks the completed ones as such, if a data store was used. The first job would have less workload, and the second job can always restart by checking its data store for incompletely processed files. You can then schedule the second one periodically and optimize there based on what is required of each job. – ggordon Oct 02 '20 at 01:11
  • Thanks for the idea but that sounds to me like reinventing the wheel. If for Kafka streams you can stop and restart at an offset, the FileInputDStream should be able to do the same, right? – Victor Oct 02 '20 at 01:24
  • problem seems to be that detected files may/may not be processed. Maybe you could use an increased `--conf spark.streaming.fileStream.minRememberDuration` to increase detection along with a `filter` to ignore previously processed files (you could cache this in a data store of choice) . This still leaves the probability that files out of the `minRememberDuration` may be missed – ggordon Oct 02 '20 at 02:36
  • Yes, that is what I have managed to get so far (see Update #2), but it is a solution that I don't like much. – Victor Oct 02 '20 at 07:25
