
I want to throttle the Kafka consumers in my Flink job.

Looking through the source code for Flink 1.12, I find FlinkConnectorRateLimiter and GuavaFlinkConnectorRateLimiter. But I can't find anything connecting this rate limiter to FlinkKafkaConsumer.

How can I implement rate limiting for Kafka in Flink 1.12?

David Anderson
shenshijie

1 Answer


FlinkConnectorRateLimiter was available with the legacy Kafka consumer (flink-connector-kafka-0.10), which was dropped in Flink 1.12. The current Kafka consumer, FlinkKafkaConsumer, does not offer rate limiting.

See this mailing list thread for some discussion: https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop. In short, there should be little reason to rate-limit sources once the in-progress improvements to checkpointing under backpressure and to handling event-time skew are completed, so there isn't much appetite for adding rate-limiting support.

However, the mailing list thread above does include an example showing how to implement rate limiting for Kafka yourself by extending FlinkKafkaConsumer to override emitRecord and emitRecordWithTimestamp.


Note that you should be careful never to block checkpointing, which means you should avoid sleeping in the main processing thread. Deserialization schemas run in a different thread, which makes them the best place to do the rate limiting.
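To illustrate throttling inside deserialization, here is a minimal sketch, assuming simple per-record pacing (a fixed minimum interval between records) rather than a token bucket. The class `ThrottledDeserializer` is hypothetical and is not the code from the mailing list thread. It wraps whatever function turns bytes into records and sleeps the calling thread until the next record is allowed. Because each parallel source instance gets its own copy, the rate is per subtask, so you would divide the desired total rate by the source parallelism.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * Hypothetical helper: paces calls to an inner deserializer so that at most
 * `recordsPerSecond` records pass through this instance (one parallel subtask).
 */
public class ThrottledDeserializer<T> {

    private final Function<byte[], T> inner; // the real bytes-to-record logic
    private final long intervalNanos;        // minimum spacing between records
    private long nextAllowedNanos;           // earliest time the next record may be released

    public ThrottledDeserializer(Function<byte[], T> inner, double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        this.inner = inner;
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / recordsPerSecond);
        this.nextAllowedNanos = System.nanoTime();
    }

    /** Sleeps the calling (fetcher) thread until the next slot, then deserializes. */
    public T deserialize(byte[] message) throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextAllowedNanos - now;
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        // Schedule the next slot from whichever is later: the planned slot or now.
        // Using "now" after an idle period prevents a burst of catch-up records.
        nextAllowedNanos = Math.max(nextAllowedNanos, now) + intervalNanos;
        return inner.apply(message);
    }
}
```

In a job, you would call something like `throttle.deserialize(record.value())` from the `deserialize(ConsumerRecord<byte[], byte[]>)` method of your own `KafkaDeserializationSchema` and pass that schema to FlinkKafkaConsumer. Since Flink serializes the schema and ships it to each subtask, the helper would also need to be `Serializable`, or be created lazily (for example in the schema's `open` method), which this sketch does not handle.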

David Anderson