
I have to perform batch queries (basically in a loop) from Kafka via Spark, each time starting from the last offset read in the previous iteration, so that I only read new data.

Dataset<Row> df = spark
                .read()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test-reader")
                .option("enable.auto.commit", true)
                .option("kafka.group.id", "demo-reader") // not sure about the one to use
                .option("group.id", "demo-reader")
                .option("startingOffsets", "latest")
                .load();

It seems that latest is not supported in batch queries. I'm wondering if it is possible to do something similar in another way (without dealing directly with offsets).

EDIT: earliest seems to retrieve all the data contained in the topic.

  • Could you please explain why you want to do a batch loop instead of streaming? – Rishabh Sharma Jun 05 '21 at 18:21
  • I have constraints to process data of the same "type" together; there is logic that decides when to process a certain batch. Streaming does not make these distinctions, but only considers time periods – Roberto Bressani Jun 06 '21 at 07:25
  • I believe this is not possible up to Spark 2.4.3 at least (consumer-to-offset mapping is based on group id, which is not supported in Spark 2.4.3 [https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html]). However, it seems that Spark 3.1 does support consumer group id [https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html]. Which version are you working with? – Rishabh Sharma Jun 07 '21 at 04:47
  • I'm on 3.1.1, but it still does not work – Roberto Bressani Jun 07 '21 at 10:06

1 Answer


Can you try earliest instead of latest for startingOffsets, as shown in the example below:

Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-reader")
  .option("enable.auto.commit", true)
  .option("kafka.group.id", "demo-reader") //not sure about the one to use
  .option("group.id", "demo-reader")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
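
For batch queries, startingOffsets and endingOffsets can also be given as a JSON string with an explicit offset per partition; in this form -2 stands for earliest and -1 for latest (latest is not allowed as a starting offset in batch queries). A minimal sketch, assuming a single partition 0 and an arbitrary starting offset of 23:

  // per-partition JSON form; the partition and offset values here are illustrative
  .option("startingOffsets", "{\"test-reader\":{\"0\":23}}")
  .option("endingOffsets", "{\"test-reader\":{\"0\":-1}}")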

Please refer to the Spark docs.

As per the documentation, you should use "latest" for streaming and "earliest" for batch.
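
If every iteration must see only records that were not read before, I don't think the consumer group will help: as far as I know, the Spark Kafka source does not resume from offsets committed by a consumer group, so enable.auto.commit and group.id have no effect here. One option is to track the offsets between batches yourself (which does mean dealing with offsets directly, contrary to what the question hoped for). A minimal sketch under that assumption; the topic, bootstrap servers, and the single partition 0 come from the question, everything else is illustrative:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;

public class IncrementalKafkaBatch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("incremental-kafka-batch")
                .master("local[*]")
                .getOrCreate();

        // Next starting offset per partition; -2 means "earliest" in the JSON form.
        Map<Integer, Long> nextOffsets = new HashMap<>();
        nextOffsets.put(0, -2L); // single partition 0 assumed

        while (true) { // loop forever for the sketch; real code would add a termination/sleep policy
            Dataset<Row> df = spark
                    .read()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "test-reader")
                    .option("startingOffsets", toJson("test-reader", nextOffsets))
                    .option("endingOffsets", "latest") // "latest" is allowed (and the default) for ending offsets
                    .load();

            // ... process df here ...

            // Advance: the next batch starts at (max offset read) + 1 per partition.
            // Partitions with no new rows keep their previous starting offset.
            for (Row r : df.groupBy("partition").max("offset").collectAsList()) {
                nextOffsets.put(r.getInt(0), r.getLong(1) + 1);
            }
        }
    }

    // Builds the per-partition JSON, e.g. {"test-reader":{"0":23}}
    private static String toJson(String topic, Map<Integer, Long> offsets) {
        StringBuilder sb = new StringBuilder("{\"").append(topic).append("\":{");
        String sep = "";
        for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
            sb.append(sep).append("\"").append(e.getKey()).append("\":").append(e.getValue());
            sep = ",";
        }
        return sb.append("}}").toString();
    }
}

Each pass reads from the tracked offsets up to the current end of the topic, then advances the tracked offsets past the highest offset read, so the next pass only sees new records.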

  • I'm still getting the whole topic – Roberto Bressani Jun 05 '21 at 08:04
  • I hope your original issue has been solved. – pcsutar Jun 05 '21 at 08:34
  • Unfortunately no, I want only data that have not been previously queried – Roberto Bressani Jun 05 '21 at 08:45
  • In description you have mentioned "It seems that latest is not supported in batch queries. I'm wondering if it is possible to do something similar in another way (without dealing directly with offsets)". If that is not the issue then, could you please update your question? – pcsutar Jun 05 '21 at 09:04
  • Maybe I didn't get what "latest" is for. From what I understood, it should be used in startingOffsets to read only data that has not been processed yet by the consumer. If that is wrong, could you please clarify? – Roberto Bressani Jun 05 '21 at 09:06
  • Please check https://stackoverflow.com/questions/48320672/what-is-the-difference-between-kafka-earliest-and-latest-offset-values – pcsutar Jun 05 '21 at 09:14