
I use fileStream in Spark Streaming (via the streaming context) to read files from an HDFS directory. If my Spark application shuts down and starts again after some time, I would like to read only the new files in the directory. I don't want to re-read old files that were already read and processed by Spark; I am trying to avoid duplicates here.

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
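For reference, the snippet above compiles against these imports (assuming the new Hadoop MapReduce API TextInputFormat, which fileStream expects):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat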

Any code snippets that could help?

– user1125829

2 Answers


You can use the FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

// Obtain a handle on HDFS from the SparkContext's Hadoop configuration
val fs = FileSystem.get(sc.hadoopConfiguration)

val outPutPath = new Path("/abc")

// Recursively delete the path if it already exists
if (fs.exists(outPutPath))
  fs.delete(outPutPath, true)
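
Applied to the question, the same FileSystem API can also move files out of the watched directory once they have been processed, so a restarted job never picks them up again. A minimal sketch, where the archive directory and the move-after-processing idea are assumptions:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)

val watchedDir = new Path("/home/File")            // directory monitored by fileStream
val archiveDir = new Path("/home/File_processed")  // hypothetical archive location

if (!fs.exists(archiveDir))
  fs.mkdirs(archiveDir)

// Move (rename) every file currently in the watched directory into the archive
fs.listStatus(watchedDir).filter(_.isFile).foreach { status =>
  fs.rename(status.getPath, new Path(archiveDir, status.getPath.getName))
}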
– Ishan Kumar

fileStream already handles that for you - from its Scaladoc:

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.

This means that fileStream only loads new files (created after the streaming context was started); any files that already existed in the folder before you started your streaming application are ignored.
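
For illustration, there is also an overload of fileStream that exposes this behaviour explicitly through a path filter and a newFilesOnly flag. A minimal sketch, reusing the question's directory and types (the filter is an assumption; newFilesOnly = false would also consider files already present, subject to the stream's remember window):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Ignore hidden/temporary files; accept everything else
val acceptFile = (path: Path) => !path.getName.startsWith(".")

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/home/File",
  acceptFile,
  newFilesOnly = true
)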

– Tzach Zohar
  • I am trying to ask about fault tolerance here. Say I have files 1 to 10 in HDFS and Spark Streaming has read them. Now my Spark Streaming job has unfortunately stopped. The HDFS directory then has files 1 to 20, where files 1 to 10 were already parsed by Spark Streaming and 11 to 20 were newly added. By the time I start Spark Streaming again, I can see files 1 to 30. Since I started Spark around the time of the 21st file in HDFS, my Spark Streaming will lose files 11 to 20. How do I get the lost files? – user1125829 Jul 17 '17 at 06:25
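
For the restart scenario described in this comment, one common pattern is to create the streaming context through StreamingContext.getOrCreate with a checkpoint directory, so that a restart recovers the previous context and its progress instead of starting from scratch. A minimal sketch, where the checkpoint path, app name, and batch interval are placeholders (whether files written during the downtime are picked up still depends on the file stream's remember window):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/checkpoint"  // hypothetical HDFS path for checkpoint data

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("file-stream-restart")  // placeholder app name
  val ssc = new StreamingContext(conf, Seconds(30))             // placeholder batch interval
  ssc.checkpoint(checkpointDir)

  // Monitor the directory from the question for new files
  val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
  lines.map { case (_, text) => text.toString }.print()

  ssc
}

// Recover the context from the checkpoint if one exists, otherwise create it fresh
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
ssc.start()
ssc.awaitTermination()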