Spark Streaming's textFileStream and fileStream can monitor a directory and process the new files in a DStream of RDDs.

How can I get the names of the files being processed by the DStream's RDDs in a particular interval?
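
For example (the directory path and app name below are just placeholders), the stream only exposes the file contents:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("MonitorDir").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(30))
// Lines of each new file, but no way to tell which file a line came from:
val lines = ssc.textFileStream("/some/directory")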

Vijay Innamuri
  • duplicate of http://stackoverflow.com/questions/29935732/spark-streaming-textfilestream-filestream-get-file-name – Irene Jun 18 '15 at 13:27
  • @Irene This was posted in March 2015 and that question was posted in April 2015. How could this be a duplicate? BTW, the other question is also still unanswered. – Vijay Innamuri Jun 19 '15 at 05:20
  • Hehe, I read the date wrongly. The other question is answered in a comment. – Irene Jun 19 '15 at 07:40

3 Answers

fileStream produces a UnionRDD of NewHadoopRDDs. The nice thing about the NewHadoopRDDs created by sc.newAPIHadoopFile is that their names are set to their paths.
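
You can check this in a spark-shell session (the path below is hypothetical; sc is the shell's SparkContext):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// The RDD's name is the path it was loaded from.
val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/some/directory/file.txt")
println(rdd.name) // prints "/some/directory/file.txt"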

Here's an example of what you can do with that knowledge:

import scala.reflect.ClassTag

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// Re-union the batch's per-file RDDs, mapping each one to its lines while
// carrying the source path along in the RDD's name.
def namedTextFileStream(ssc: StreamingContext, directory: String): DStream[String] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
    .transform( rdd =>
      new UnionRDD(rdd.context,
        rdd.dependencies.map( dep =>
          dep.rdd.asInstanceOf[RDD[(LongWritable, Text)]].map(_._2.toString).setName(dep.rdd.name)
        )
      )
    )

// Apply a per-file transformation, using the file path stored in each
// dependency RDD's name. Note that dep.rdd.isEmpty is an action, so empty
// files are skipped at the cost of a small job per file.
def transformByFile[U: ClassTag](unionrdd: RDD[String],
                                 transformFunc: String => RDD[String] => RDD[U]): RDD[U] = {
  new UnionRDD(unionrdd.context,
    unionrdd.dependencies.map{ dep =>
      if (dep.rdd.isEmpty) None
      else {
        val filename = dep.rdd.name
        Some(
          transformFunc(filename)(dep.rdd.asInstanceOf[RDD[String]])
            .setName(filename)
        )
      }
    }.flatten
  )
}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("Process by file")
    .setMaster("local[2]")

  val ssc = new StreamingContext(conf, Seconds(30))

  val dstream = namedTextFileStream(ssc, "/some/directory")

  // Tag every line with the path of the file it came from.
  def byFileTransformer(filename: String)(rdd: RDD[String]): RDD[(String, String)] =
    rdd.map(line => (filename, line))

  val transformed = dstream
    .transform(rdd => transformByFile(rdd, byFileTransformer))

  // Do some stuff with transformed

  ssc.start()
  ssc.awaitTermination()
}
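
In place of the // Do some stuff with transformed placeholder, a minimal sketch of inspecting the result; each batch now carries (file path, line) pairs:

transformed.foreachRDD { rdd =>
  rdd.take(5).foreach { case (file, line) => println(s"$file -> $line") }
}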
nonsleepr
    Thanks for this! Though, `val filename` is confusing as it is the entire path to the file. `new File(dep.rdd.name).getName` seems to work. – OneCricketeer Jan 27 '17 at 00:30

For those who want some Java code instead of Scala:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.UnionRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.dstream.FileInputDStream;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.reflect.ClassTag;

// jsc is an existing JavaStreamingContext; inputPath is the directory to monitor.
JavaPairInputDStream<LongWritable, Text> textFileStream = 
        jsc.fileStream(
            inputPath, 
            LongWritable.class, 
            Text.class,
            TextInputFormat.class, 
            FileInputDStream::defaultFilter,
            false
        );
// Rebuild the batch's UnionRDD, turning each per-file RDD into
// (file path, line) pairs.
JavaDStream<Tuple2<String, String>> namedTextFileStream = textFileStream.transform((pairRdd, time) -> {
        UnionRDD<Tuple2<LongWritable, Text>> rdd = (UnionRDD<Tuple2<LongWritable, Text>>) pairRdd.rdd();
        List<RDD<Tuple2<LongWritable, Text>>> deps = JavaConverters.seqAsJavaListConverter(rdd.rdds()).asJava();
        List<RDD<Tuple2<String, String>>> collectedRdds = deps.stream().map( depRdd -> {
            if (depRdd.isEmpty()) {
                return null;
            }
            JavaRDD<Tuple2<LongWritable, Text>> depJavaRdd = depRdd.toJavaRDD();
            String filename = depRdd.name();
            // Pair every line with the path of the file it came from.
            JavaPairRDD<String, String> newDep = JavaPairRDD.fromJavaRDD(depJavaRdd)
                    .mapToPair(t -> new Tuple2<String, String>(filename, t._2().toString()))
                    .setName(filename);
            return newDep.rdd();
        }).filter(t -> t != null).collect(Collectors.toList());
        Seq<RDD<Tuple2<String, String>>> rddSeq = JavaConverters.asScalaBufferConverter(collectedRdds).asScala().toIndexedSeq();
        ClassTag<Tuple2<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);
        return new UnionRDD<Tuple2<String, String>>(rdd.sparkContext(), rddSeq, classTag).toJavaRDD();
});
racc

Alternatively, you can modify FileInputDStream so that rather than loading the contents of the files into the RDD, it simply creates an RDD from the filenames.

This gives a performance boost if you don't actually want to read the data itself into the RDD, or if you want to pass the filenames to an external command as one of your steps.

Simply change filesToRDD(..) so that it makes an RDD of the filenames, rather than loading the data into the RDD.

See: https://github.com/HASTE-project/bin-packing-paper/blob/master/spark/spark-scala-cellprofiler/src/main/scala/FileInputDStream2.scala#L278
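
A minimal, self-contained sketch of the core idea (the helper name fileNamesToRDD is hypothetical; in the linked FileInputDStream2 the equivalent logic replaces the body of filesToRDD, which is private to the class):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext

// An RDD whose records are the file names themselves, not the file contents.
def fileNamesToRDD(ssc: StreamingContext, files: Seq[String]): RDD[String] =
  ssc.sparkContext.parallelize(files)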

Ben
  • There is some info about the application here: https://arxiv.org/pdf/2001.10865 – Ben May 04 '20 at 21:58